diff --git a/Gemfile b/Gemfile
index 5eee0307..db939f38 100644
--- a/Gemfile
+++ b/Gemfile
@@ -2,5 +2,6 @@ source 'https://rubygems.org'
gemspec
+gem "pry"
gem "hiredis-client"
# gem 'bunny', '=0.7.10', :path => "#{ENV['HOME']}/src/bunny"
diff --git a/lib/beetle/base.rb b/lib/beetle/base.rb
index ea5f49db..5d3cf774 100644
--- a/lib/beetle/base.rb
+++ b/lib/beetle/base.rb
@@ -18,6 +18,10 @@ def initialize(client, options = {}) #:nodoc:
private
+ def single_broker_mode?
+ @client.single_broker_mode?
+ end
+
def error(text)
logger.error text
raise Error.new(text)
diff --git a/lib/beetle/client.rb b/lib/beetle/client.rb
index 2c62b88a..85f61635 100644
--- a/lib/beetle/client.rb
+++ b/lib/beetle/client.rb
@@ -68,6 +68,10 @@ def initialize(config = Beetle.config)
)
end
+ def single_broker_mode?
+ servers.size == 1
+ end
+
# register an exchange with the given _name_ and a set of _options_:
# [:type]
# the type option will be overwritten and always be :topic, beetle does not allow fanout exchanges
diff --git a/lib/beetle/message.rb b/lib/beetle/message.rb
index aa047166..d1da3518 100644
--- a/lib/beetle/message.rb
+++ b/lib/beetle/message.rb
@@ -62,6 +62,14 @@ class Message
# value returned by handler execution
attr_reader :handler_result
+ def self.create(queue, header, body, opts = {})
+ new(queue, header, body, opts)
+ end
+
+ def self.single_broker(queue, header, body, opts = {})
+ SingleBrokerMessage.new(queue, header, body, opts)
+ end
+
def initialize(queue, header, body, opts = {})
@queue = queue
@header = header
@@ -101,6 +109,7 @@ def decode #:nodoc:
def self.publishing_options(opts = {}) #:nodoc:
flags = 0
flags |= FLAG_REDUNDANT if opts[:redundant]
+
expires_at = now + (opts[:ttl] || DEFAULT_TTL).to_i
opts = opts.slice(*PUBLISHING_KEYS)
opts[:message_id] = generate_uuid.to_s
diff --git a/lib/beetle/publisher.rb b/lib/beetle/publisher.rb
index c96829e0..6746173e 100644
--- a/lib/beetle/publisher.rb
+++ b/lib/beetle/publisher.rb
@@ -43,6 +43,7 @@ def publish(message_name, data, opts={}) #:nodoc:
opts = @client.messages[message_name].merge(opts.symbolize_keys)
exchange_name = opts.delete(:exchange)
opts.delete(:queue)
+
recycle_dead_servers unless @dead_servers.empty?
throttle!
if opts[:redundant]
diff --git a/lib/beetle/single_broker_message.rb b/lib/beetle/single_broker_message.rb
new file mode 100644
index 00000000..8a476e5c
--- /dev/null
+++ b/lib/beetle/single_broker_message.rb
@@ -0,0 +1,101 @@
+module Beetle
+ class SingleBrokerMessage < Message
+ def initialize(*args, **kwargs)
+ super(*args, **kwargs)
+ @store = nil
+ end
+
+ def ack!
+ logger.debug "Beetle: ack! for message #{msg_id}"
+ header.ack
+ end
+
+ def set_delay!
+ log_not_supported("delay between retries")
+ end
+
+ def delayed?
+ log_not_supported("delay between retries")
+ false
+ end
+
+ def redundant?
+ false
+ end
+
+ def increment_execution_attempts!; end
+
+ def attempts_limit_reached?(_attempts = nil)
+ # TODO: implement
+ false
+ end
+
+ def exceptions_limit_reached?
+ # TODO: implement
+ false
+ end
+
+ private
+
+ def log_not_supported(what)
+ logger.warn "Beetle: Feature not supported in single broker mode => #{what}"
+ end
+
+ def run_handler!(handler)
+ case result = run_handler(handler)
+ when RC::OK
+ ack!
+ result
+ else
+ handler_failed!(result)
+ end
+ end
+
+ def handler_failed!(result)
+ if attempts_limit_reached?
+ ack!
+ logger.debug "Beetle: reached the handler execution attempts limit: #{attempts_limit} on #{msg_id}"
+ RC::AttemptsLimitReached
+ elsif exceptions_limit_reached?
+ ack!
+ logger.debug "Beetle: reached the handler exceptions limit: #{exceptions_limit} on #{msg_id}"
+ RC::ExceptionsLimitReached
+ elsif !exception_accepted?
+ ack!
+ logger.debug "Beetle: `#{@exception.class.name}` not accepted: `retry_on`=[#{retry_on.join(',')}] on #{msg_id}"
+ RC::ExceptionNotAccepted
+ else
+ result
+ end
+ end
+
+ # Open questions:
+ # - do we need to support timeouts that span executions?
+ def process_internal(handler)
+ if @exception
+ ack!
+ RC::DecodingError
+ elsif @pre_exception
+ ack!
+ RC::PreprocessingError
+ elsif expired?
+ logger.warn "Beetle: ignored expired message (#{msg_id})!"
+ ack!
+ RC::Ancient
+ elsif simple?
+ ack!
+ run_handler(handler) == RC::HandlerCrash ? RC::AttemptsLimitReached : RC::OK
+ elsif attempts_limit_reached?
+ ack!
+ logger.warn "Beetle: reached the handler execution attempts limit: #{attempts_limit} on #{msg_id}"
+ RC::AttemptsLimitReached
+ elsif exceptions_limit_reached?
+ ack!
+ logger.warn "Beetle: reached the handler exceptions limit: #{exceptions_limit} on #{msg_id}"
+ RC::ExceptionsLimitReached
+ else
+ run_handler!(handler)
+ end
+ end
+ end
+end
diff --git a/lib/beetle/subscriber.rb b/lib/beetle/subscriber.rb
index 46b8f26a..aaad7f19 100644
--- a/lib/beetle/subscriber.rb
+++ b/lib/beetle/subscriber.rb
@@ -177,7 +177,11 @@ def create_subscription_callback(queue_name, amqp_queue_name, handler, opts)
end
begin
message_options = opts.merge(:server => server, :store => @client.deduplication_store)
- m = Message.new(amqp_queue_name, header, data, message_options)
+ m = if single_broker_mode?
+ SingleBrokerMessage.create(amqp_queue_name, header, data, message_options)
+ else
+ Message.create(amqp_queue_name, header, data, message_options)
+ end
processor = Handler.create(handler, opts)
result = m.process(processor)
if result.reject?
diff --git a/test/beetle/bunny_behavior_test.rb b/test/beetle/bunny_behavior_test.rb
index 6149e977..d3ac7e7c 100644
--- a/test/beetle/bunny_behavior_test.rb
+++ b/test/beetle/bunny_behavior_test.rb
@@ -26,8 +26,6 @@ class BunnyBehaviorTest < Minitest::Test
assert_equal({"bar" => "baz"}, headers["table"])
end
-
-
test "publishing redundantly does not leave the garbage in dedup store" do
Beetle.config.servers = "localhost:5672,localhost:5673"
client = Beetle::Client.new
@@ -38,7 +36,7 @@ class BunnyBehaviorTest < Minitest::Test
# empty the dedup store
client.deduplication_store.flushdb
- handler = TestHandler.new(stop_listening_after_n_post_processes = 2, client = client)
+ handler = TestHandler.new(2, client = client)
client.register_handler(:test_garbage, handler)
published = client.publish(:test_garbage, 'bam', :redundant =>true)
listen(client)
@@ -49,9 +47,9 @@ class BunnyBehaviorTest < Minitest::Test
message = messages_processed.first
assert_equal 2, published
assert_equal "bam", message.data
- Beetle::DeduplicationStore::KEY_SUFFIXES.each{|suffix|
- assert_equal false, client.deduplication_store.exists(message.msg_id, suffix)
- }
+ Beetle::DeduplicationStore::KEY_SUFFIXES.each do |suffix|
+ assert_equal false, client.deduplication_store.exists(message.msg_id, suffix)
+ end
end
test "process redundant message once" do
@@ -64,7 +62,7 @@ class BunnyBehaviorTest < Minitest::Test
# empty the dedup store
client.deduplication_store.flushdb
- handler = TestHandler.new(stop_listening_after_n_post_processes = 2, client = client)
+ handler = TestHandler.new(2, client = client)
client.register_handler(:test_processing, handler)
published = client.publish(:test_processing, 'bam', :redundant =>true)
listen(client)
@@ -98,16 +96,20 @@ class BunnyBehaviorTest < Minitest::Test
assert_equal 1, published
assert_equal "bam", message.data
- Beetle::DeduplicationStore::KEY_SUFFIXES.map{|suffix|
- assert_equal false, client.deduplication_store.exists(message.msg_id, suffix)
- }
+ Beetle::DeduplicationStore::KEY_SUFFIXES.map do |suffix|
+ assert_equal false, client.deduplication_store.exists(message.msg_id, suffix)
+ end
end
+ # FIXME: that's not really testing that publisher confirms work
test "publishing with confirms works as expected" do
- Beetle.config.servers = "localhost:5672"
- client = Beetle::Client.new
+ config = Beetle::Configuration.new
+ config.servers = "localhost:5672"
+ config.publisher_confirms = true
+ client = Beetle::Client.new(config)
+
client.register_queue(:test_publisher_confirms)
- client.register_message(:test_publisher_confirms, :publisher_confirms => true)
+ client.register_message(:test_publisher_confirms)
# purge the test queue
client.purge(:test_publisher_confirms)
@@ -119,26 +121,21 @@ class BunnyBehaviorTest < Minitest::Test
client.stop_publishing
assert_equal 1, published
- assert_equal "bam", message.data
-
+ assert_equal "bam", message.data
end
-
- def listen(client , timeout = 1)
- Timeout.timeout(timeout) do
- client.listen
+ def listen(client, timeout = 1)
+ Timeout.timeout(timeout) do
+ client.listen
end
- rescue Timeout::Error
- puts "Client listen timed out after #{timeout} seconds"
- nil
+ rescue Timeout::Error
+ puts "Client listen timed out after #{timeout} seconds"
+ nil
end
-
class TestHandler < Beetle::Handler
- attr_reader :messages_processed
- attr_reader :pre_process_invocations
- attr_reader :post_process_invocations
+ attr_reader :messages_processed, :pre_process_invocations, :post_process_invocations
def initialize(stop_listening_after_n_post_processes, client)
super()
@@ -149,22 +146,20 @@ def initialize(stop_listening_after_n_post_processes, client)
@messages_processed = []
end
- def pre_process(message)
+ def pre_process(_message)
@pre_process_invocations += 1
end
-
+
def process
@messages_processed << message
end
def post_process
@post_process_invocations += 1
- if @post_process_invocations >= @stop_listening_after_n_post_processes
- @client.stop_listening
- end
+ return unless @post_process_invocations >= @stop_listening_after_n_post_processes
+ @client.stop_listening
end
end
-
end
diff --git a/test/beetle/client_test.rb b/test/beetle/client_test.rb
index 3f61aa0f..f6458fc6 100644
--- a/test/beetle/client_test.rb
+++ b/test/beetle/client_test.rb
@@ -477,4 +477,22 @@ def setup
end
end
+ class SingleBrokerModeTest < Minitest::Test
+ test "when only one server is configured we set single broker mode" do
+ config = Configuration.new
+ config.servers = "localhost:5672"
+ client = Client.new(config)
+
+ assert client.single_broker_mode?
+ end
+
+ test "when more than one server is configured we set single broker mode to false" do
+ config = Configuration.new
+ config.servers = "localhost:5672,localhost:5673"
+ client = Client.new(config)
+
+ refute client.single_broker_mode?
+ end
+ end
+
end
diff --git a/test/beetle/message_test.rb b/test/beetle/message_test.rb
index e0aab1bd..0d44b3f7 100644
--- a/test/beetle/message_test.rb
+++ b/test/beetle/message_test.rb
@@ -219,7 +219,6 @@ def setup
end
class AckingTest < Minitest::Test
-
def setup
@store = DeduplicationStore.new
@store.flushdb
@@ -237,6 +236,17 @@ def setup
assert_equal :no, processed
end
+ test "[SingleBroker] an expired message should be acked without calling the handler" do
+ header = header_with_params(:ttl => -1)
+ header.expects(:ack)
+ message = SingleBrokerMessage.new("somequeue", header, 'foo', :store => @store)
+ assert message.expired?
+
+ processed = :no
+ message.process(Handler.create(lambda {|*args| processed = true}))
+ assert_equal :no, processed
+ end
+
test "a delayed message should not be acked and the handler should not be called" do
header = header_with_params()
header.expects(:ack).never
@@ -250,6 +260,10 @@ def setup
assert_equal :no, processed
end
+ test "[SingleBroker] a delayed message should not be acked and the handler should not be called" do
+ assert true, "Not supported in single broker mode"
+ end
+
test "acking a non redundant message should remove the ack_count key" do
header = header_with_params({})
header.expects(:ack)
@@ -260,6 +274,10 @@ def setup
assert !@store.exists(message.msg_id, :ack_count)
end
+ test "[SingleBroker] acking a non redundant message should remove the ack_count key" do
+ assert true, "Not supported in single broker mode. No redundant messages."
+ end
+
test "a redundant message should be acked after calling the handler" do
header = header_with_params({:redundant => true})
message = Message.new("somequeue", header, 'foo', :store => @store)
@@ -269,6 +287,10 @@ def setup
message.process(@null_handler)
end
+ test "[SingleBroker] a redundant message should be acked after calling the handler" do
+ assert true, "Not supported in single broker mode. No redundant messages."
+ end
+
test "acking a redundant message should increment the ack_count key" do
header = header_with_params({:redundant => true})
header.expects(:ack)
@@ -277,7 +299,12 @@ def setup
assert_nil @store.get(message.msg_id, :ack_count)
message.process(@null_handler)
assert message.redundant?
- assert_equal "1", @store.get(message.msg_id, :ack_count)
+ assert_equal "1", @store.get(message.msg_id, :ack_count).to_s
+ end
+
+
+ test "[SingleBroker] acking a redundant message should increment the ack_count key" do
+ assert true, "Not supported in single broker mode. No redundant messages."
end
test "acking a redundant message twice should remove the ack_count key" do
@@ -291,6 +318,10 @@ def setup
assert !@store.exists(message.msg_id, :ack_count)
end
+
+ test "[SingleBroker] acking a redundant message twice should remove the ack_count key" do
+ assert true, "Not supported in single broker mode. No redundant messages."
+ end
end
class FreshMessageTest < Minitest::Test
@@ -312,6 +343,20 @@ def setup
assert_equal RC::OK, message.process(handler)
end
+ test "[SingleBroker] processing a fresh message sucessfully should first run the handler and then ack it" do
+ header = header_with_params({})
+ message = SingleBrokerMessage.new("somequeue", header, 'foo', :attempts => 2, :store => @store)
+ assert !message.attempts_limit_reached?
+
+ handler = mock("handler")
+ s = sequence("s")
+ handler.expects(:pre_process).with(message).in_sequence(s)
+ handler.expects(:call).in_sequence(s)
+ header.expects(:ack).in_sequence(s)
+ assert_equal RC::OK, message.process(handler)
+ end
+
+
test "after processing a redundant fresh message successfully the ack count should be 1 and the status should be completed" do
header = header_with_params({:redundant => true})
message = Message.new("somequeue", header, 'foo', :timeout => 10.seconds, :store => @store)
@@ -324,11 +369,17 @@ def setup
message.expects(:completed!).in_sequence(s)
header.expects(:ack).in_sequence(s)
assert_equal RC::OK, message.__send__(:process_internal, proc)
- assert_equal "1", @store.get(message.msg_id, :ack_count)
+ assert_equal "1", @store.get(message.msg_id, :ack_count).to_s
end
+
+ test "[SingleBroker] after processing a redundant fresh message successfully the ack count should be 1 and the status should be completed" do
+
+ assert true, "Not supported in single broker mode. No redundant messages."
+ end
end
+
class SimpleMessageTest < Minitest::Test
def setup
@config = Configuration.new
@@ -349,6 +400,20 @@ def setup
assert_equal RC::OK, message.process(handler)
end
+ test "[SingleBroker] when processing a simple message, ack should follow calling the handler" do
+ header = header_with_params({})
+ message = SingleBrokerMessage.new("somequeue", header, 'foo', :attempts => 1, :store => nil)
+
+ handler = mock("handler")
+ s = sequence("s")
+ handler.expects(:pre_process).with(message).in_sequence(s)
+ header.expects(:ack).in_sequence(s)
+ handler.expects(:call).in_sequence(s)
+ assert_equal RC::OK, message.process(handler)
+ end
+
+
+
test "when processing a simple message, RC::AttemptsLimitReached should be returned if the handler crashes" do
header = header_with_params({})
message = Message.new("somequeue", header, 'foo', :attempts => 1, :store => @store)
@@ -364,6 +429,22 @@ def setup
assert_equal RC::AttemptsLimitReached, message.process(handler)
end
+ test "[SingleBroker] when processing a simple message, RC::AttemptsLimitReached should be returned if the handler crashes" do
+ header = header_with_params({})
+ message = SingleBrokerMessage.new("somequeue", header, 'foo', :attempts => 1, :store => nil)
+
+ handler = mock("handler")
+ s = sequence("s")
+ handler.expects(:pre_process).with(message).in_sequence(s)
+ header.expects(:ack).in_sequence(s)
+ e = Exception.new("ohoh")
+ handler.expects(:call).in_sequence(s).raises(e)
+ handler.expects(:process_exception).with(e).in_sequence(s)
+ handler.expects(:process_failure).with(RC::AttemptsLimitReached).in_sequence(s)
+ assert_equal RC::AttemptsLimitReached, message.process(handler)
+ end
+
+
test "when processing a simple message, the handler should be executed only once if status keys are used" do
@config.redis_status_key_expiry_interval = 1.minute
header = header_with_params({})
@@ -405,9 +486,9 @@ def setup
header.expects(:ack).never
assert_equal RC::HandlerCrash, message.__send__(:process_internal, proc)
assert !message.completed?
- assert_equal "1", @store.get(message.msg_id, :exceptions)
- assert_equal "0", @store.get(message.msg_id, :timeout)
- assert_equal "52", @store.get(message.msg_id, :delay)
+ assert_equal "1", @store.get(message.msg_id, :exceptions).to_s
+ assert_equal "0", @store.get(message.msg_id, :timeout).to_s
+ assert_equal "52", @store.get(message.msg_id, :delay).to_s
end
test "a message should delete the mutex before resetting the timer if attempts and exception limits haven't been reached" do
@@ -744,7 +825,6 @@ def setup
result = message.process(handler)
assert_equal RC::ExceptionsLimitReached, result
end
-
end
class MySQLFailoverTest < Minitest::Test
diff --git a/test/beetle/single_broker_mode_behavior_test.rb b/test/beetle/single_broker_mode_behavior_test.rb
new file mode 100644
index 00000000..2f00ef44
--- /dev/null
+++ b/test/beetle/single_broker_mode_behavior_test.rb
@@ -0,0 +1,81 @@
+require 'timeout'
+require File.expand_path(File.dirname(__FILE__) + '/../test_helper')
+
+class SingleBrokerModeBehaviorTest < Minitest::Test
+ attr_reader :client
+
+ def setup
+ Beetle.config.servers = "localhost:5672"
+ @client = Beetle::Client.new
+ client.register_queue(:test_single_broker)
+ client.register_message(:test_single_broker)
+ client.purge(:test_single_broker)
+ end
+
+ test "handles retries correctly" do
+ assert true
+ end
+
+ test "handles attempts correctly" do
+ assert true
+ end
+
+ test "handles timeouts correctly" do
+ message = nil
+
+ client.register_handler(:test_single_broker, timeout: 1) do |msg|
+ sleep 1.5
+ message = msg
+ client.stop_listening
+ end
+ published = client.publish(:test_single_broker, 'single-broker')
+
+ listen(client, 2)
+ client.stop_publishing
+
+ assert_equal 1, published
+ refute message
+ end
+
+ test "ignores expired messages" do
+ message = nil
+
+ client.register_handler(:test_single_broker) do |msg|
+ message = msg
+ client.stop_listening
+ end
+ published = client.publish(:test_single_broker, 'single-broker', ttl: -3600) # expired 1 hour ago
+
+ listen(client)
+ client.stop_publishing
+
+ assert_equal 1, published
+ refute message
+ end
+
+ test "processes fresh message" do
+ message = nil
+
+ client.register_handler(:test_single_broker) do |msg|
+ message = msg
+ client.stop_listening
+ end
+ published = client.publish(:test_single_broker, 'single-broker', ttl: 30)
+
+ listen(client)
+ client.stop_publishing
+
+ assert_equal 1, published
+ assert_equal "single-broker", message.data
+ end
+
+ def listen(client, timeout = 1)
+ Timeout.timeout(timeout) do
+ client.listen
+ end
+ rescue Timeout::Error
+ puts "Client listen timed out after #{timeout} seconds"
+ nil
+ end
+
+end