Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ group :development, :test do
gem "dotenv"
gem "extlz4"
gem "gssapi", ">= 1.2.0"
gem "prometheus-client"
gem "pry"
gem "rake", "~> 10.0"
gem "rspec"
Expand Down
1 change: 1 addition & 0 deletions lib/kafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
topic: batch.topic,
partition: batch.partition,
last_offset: batch.last_offset,
last_create_time: batch.messages.last && batch.messages.last.create_time,
offset_lag: batch.offset_lag,
highwater_mark_offset: batch.highwater_mark_offset,
message_count: batch.messages.count,
Expand Down
316 changes: 316 additions & 0 deletions lib/kafka/prometheus.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
# frozen_string_literal: true

#
# Subscriber to ruby_kafka to report metrics to prometheus
#
# Usage:
# require "kafka/prometheus"
#
# Once the file has been required, no further configuration is needed: all operational
# metrics are automatically emitted (unless the constant PROMETHEUS_NO_AUTO_START is defined).
#
# By Peter Mustel, T2 Data AB
#
begin
require 'prometheus/client'
rescue LoadError
warn 'In order to report Kafka client metrics to Prometheus you need to install the `prometheus-client` gem.'
raise
end

require 'active_support/subscriber'

module Kafka
module Prometheus
SIZE_BUCKETS = [1, 10, 100, 1000, 10_000, 100_000, 1_000_000].freeze
LATENCY_BUCKETS = [0.0001, 0.001, 0.01, 0.1, 1.0, 10, 100, 1000].freeze
DELAY_BUCKETS = [1, 3, 10, 30, 100, 300, 1000, 3000, 10_000, 30_000].freeze

class << self
  attr_accessor :registry

  # Wires every Kafka instrumentation namespace to its Prometheus subscriber.
  # Metrics are registered against +registry+ (the gem's default registry
  # unless an explicit one is supplied), which is memoized for the subscribers.
  def start(registry = ::Prometheus::Client.registry)
    @registry = registry

    {
      ConnectionSubscriber    => 'connection.kafka',
      ConsumerSubscriber      => 'consumer.kafka',
      ProducerSubscriber      => 'producer.kafka',
      AsyncProducerSubscriber => 'async_producer.kafka',
      FetcherSubscriber       => 'fetcher.kafka'
    }.each do |subscriber, namespace|
      subscriber.attach_to(namespace)
    end
  end
end

# Reports low-level broker API traffic (call counts, latency, payload
# sizes, errors) labelled by client, API and broker host.
class ConnectionSubscriber < ActiveSupport::Subscriber
  def initialize
    super
    registry = Prometheus.registry
    labels = [:client, :api, :broker]
    @api_calls = registry.counter(:api_calls, docstring: 'Total calls', labels: labels)
    @api_latency = registry.histogram(:api_latency, docstring: 'Latency', buckets: LATENCY_BUCKETS, labels: labels)
    @api_request_size = registry.histogram(:api_request_size, docstring: 'Request size', buckets: SIZE_BUCKETS, labels: labels)
    @api_response_size = registry.histogram(:api_response_size, docstring: 'Response size', buckets: SIZE_BUCKETS, labels: labels)
    @api_errors = registry.counter(:api_errors, docstring: 'Errors', labels: labels)
  end

  # Handles a `request.connection.kafka` notification: records one call,
  # its duration, the request/response byte sizes (0 when absent from the
  # payload), and an error when the payload carries an exception.
  def request(event)
    payload = event.payload
    labels = {
      client: payload.fetch(:client_id),
      api: payload.fetch(:api, 'unknown'),
      broker: payload.fetch(:broker_host)
    }

    @api_calls.increment(labels: labels)
    @api_latency.observe(event.duration, labels: labels)
    @api_request_size.observe(payload.fetch(:request_size, 0), labels: labels)
    @api_response_size.observe(payload.fetch(:response_size, 0), labels: labels)
    @api_errors.increment(labels: labels) if payload.key?(:exception)
  end
end

# Reports consumer-side metrics: per-message and per-batch processing
# throughput/latency/errors, offset and time lag, group membership
# transitions (join/sync/leave) and pause durations.
#
# Fix vs. pasted original: stray GitHub review-thread lines ("Comment
# thread", "... marked this conversation as resolved.") had been embedded
# inside the class body, splitting the @process_batch_latency assignment
# and breaking Ruby syntax. They are removed here; the metric code itself
# is unchanged.
class ConsumerSubscriber < ActiveSupport::Subscriber
  def initialize
    super
    @process_messages = Prometheus.registry.counter(:consumer_process_messages, docstring: 'Total messages', labels: [:client, :group_id, :topic, :partition])
    @process_message_errors = Prometheus.registry.counter(:consumer_process_message_errors, docstring: 'Total errors', labels: [:client, :group_id, :topic, :partition])
    @process_message_latency =
      Prometheus.registry.histogram(:consumer_process_message_latency, docstring: 'Latency', buckets: LATENCY_BUCKETS, labels: [:client, :group_id, :topic, :partition])
    @offset_lag = Prometheus.registry.gauge(:consumer_offset_lag, docstring: 'Offset lag', labels: [:client, :group_id, :topic, :partition])
    @time_lag = Prometheus.registry.gauge(:consumer_time_lag, docstring: 'Time lag of message', labels: [:client, :group_id, :topic, :partition])
    @process_batch_errors = Prometheus.registry.counter(:consumer_process_batch_errors, docstring: 'Total errors in batch', labels: [:client, :group_id, :topic, :partition])
    @process_batch_latency =
      Prometheus.registry.histogram(:consumer_process_batch_latency, docstring: 'Latency in batch', buckets: LATENCY_BUCKETS, labels: [:client, :group_id, :topic, :partition])
    @batch_size = Prometheus.registry.histogram(:consumer_batch_size, docstring: 'Size of batch', buckets: SIZE_BUCKETS, labels: [:client, :group_id, :topic, :partition])
    @join_group = Prometheus.registry.histogram(:consumer_join_group, docstring: 'Time to join group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
    @join_group_errors = Prometheus.registry.counter(:consumer_join_group_errors, docstring: 'Total error in joining group', labels: [:client, :group_id])
    @sync_group = Prometheus.registry.histogram(:consumer_sync_group, docstring: 'Time to sync group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
    @sync_group_errors = Prometheus.registry.counter(:consumer_sync_group_errors, docstring: 'Total error in syncing group', labels: [:client, :group_id])
    @leave_group = Prometheus.registry.histogram(:consumer_leave_group, docstring: 'Time to leave group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
    @leave_group_errors = Prometheus.registry.counter(:consumer_leave_group_errors, docstring: 'Total error in leaving group', labels: [:client, :group_id])
    @pause_duration = Prometheus.registry.gauge(:consumer_pause_duration, docstring: 'Pause duration', labels: [:client, :group_id, :topic, :partition])
  end

  # One consumed message: count it (or its error), record processing
  # latency, and update offset lag plus time lag (now - create_time, ms).
  def process_message(event)
    key = {
      client: event.payload.fetch(:client_id),
      group_id: event.payload.fetch(:group_id),
      topic: event.payload.fetch(:topic),
      partition: event.payload.fetch(:partition)
    }

    offset_lag = event.payload.fetch(:offset_lag)
    create_time = event.payload.fetch(:create_time)

    # Milliseconds between now and the message's producer-side timestamp;
    # nil when the message carries no timestamp.
    time_lag = create_time && ((Time.now - create_time) * 1000).to_i

    if event.payload.key?(:exception)
      @process_message_errors.increment(labels: key)
    else
      @process_message_latency.observe(event.duration, labels: key)
      @process_messages.increment(labels: key)
    end

    @offset_lag.set(offset_lag, labels: key)

    # Not all messages have timestamps.
    return unless time_lag

    @time_lag.set(time_lag, labels: key)
  end

  # One processed batch: count all its messages on success, or one batch
  # error on failure, and record the batch processing latency.
  def process_batch(event)
    key = {
      client: event.payload.fetch(:client_id),
      group_id: event.payload.fetch(:group_id),
      topic: event.payload.fetch(:topic),
      partition: event.payload.fetch(:partition)
    }
    message_count = event.payload.fetch(:message_count)

    if event.payload.key?(:exception)
      @process_batch_errors.increment(labels: key)
    else
      @process_batch_latency.observe(event.duration, labels: key)
      @process_messages.increment(by: message_count, labels: key)
    end
  end

  # One fetched batch: record its size and the current offset lag.
  def fetch_batch(event)
    key = {
      client: event.payload.fetch(:client_id),
      group_id: event.payload.fetch(:group_id),
      topic: event.payload.fetch(:topic),
      partition: event.payload.fetch(:partition)
    }
    offset_lag = event.payload.fetch(:offset_lag)
    batch_size = event.payload.fetch(:message_count)

    @batch_size.observe(batch_size, labels: key)
    @offset_lag.set(offset_lag, labels: key)
  end

  # Time taken to join the consumer group; counts an error if one occurred.
  def join_group(event)
    key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
    @join_group.observe(event.duration, labels: key)

    @join_group_errors.increment(labels: key) if event.payload.key?(:exception)
  end

  # Time taken to sync the consumer group; counts an error if one occurred.
  def sync_group(event)
    key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
    @sync_group.observe(event.duration, labels: key)

    @sync_group_errors.increment(labels: key) if event.payload.key?(:exception)
  end

  # Time taken to leave the consumer group; counts an error if one occurred.
  def leave_group(event)
    key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
    @leave_group.observe(event.duration, labels: key)

    @leave_group_errors.increment(labels: key) if event.payload.key?(:exception)
  end

  # Current pause duration for a paused topic/partition.
  def pause_status(event)
    key = {
      client: event.payload.fetch(:client_id),
      group_id: event.payload.fetch(:group_id),
      topic: event.payload.fetch(:topic),
      partition: event.payload.fetch(:partition)
    }

    duration = event.payload.fetch(:duration)
    @pause_duration.set(duration, labels: key)
  end
end

# Reports synchronous-producer metrics: produce/deliver throughput,
# message and buffer sizes, delivery latency/attempts, and broker ACKs.
#
# Fix vs. pasted original: stray GitHub review-thread lines ("Comment
# thread", "... marked this conversation as resolved.") had been embedded
# inside the class body, breaking Ruby syntax. They are removed here; the
# metric code itself is unchanged.
class ProducerSubscriber < ActiveSupport::Subscriber
  def initialize
    super
    @produce_messages = Prometheus.registry.counter(:producer_produced_messages, docstring: 'Produced messages total', labels: [:client, :topic])
    @produce_message_size =
      Prometheus.registry.histogram(:producer_message_size, docstring: 'Message size', buckets: SIZE_BUCKETS, labels: [:client, :topic])
    @buffer_size = Prometheus.registry.histogram(:producer_buffer_size, docstring: 'Buffer size', buckets: SIZE_BUCKETS, labels: [:client])
    @buffer_fill_ratio = Prometheus.registry.histogram(:producer_buffer_fill_ratio, docstring: 'Buffer fill ratio', labels: [:client])
    @buffer_fill_percentage = Prometheus.registry.histogram(:producer_buffer_fill_percentage, docstring: 'Buffer fill percentage', labels: [:client])
    @produce_errors = Prometheus.registry.counter(:producer_produce_errors, docstring: 'Produce errors', labels: [:client, :topic])
    @deliver_errors = Prometheus.registry.counter(:producer_deliver_errors, docstring: 'Deliver error', labels: [:client])
    @deliver_latency =
      Prometheus.registry.histogram(:producer_deliver_latency, docstring: 'Delivery latency', buckets: LATENCY_BUCKETS, labels: [:client])
    @deliver_messages = Prometheus.registry.counter(:producer_deliver_messages, docstring: 'Total count of delivered messages', labels: [:client])
    @deliver_attempts = Prometheus.registry.histogram(:producer_deliver_attempts, docstring: 'Delivery attempts', labels: [:client])
    @ack_messages = Prometheus.registry.counter(:producer_ack_messages, docstring: 'Ack', labels: [:client, :topic])
    @ack_delay = Prometheus.registry.histogram(:producer_ack_delay, docstring: 'Ack delay', buckets: LATENCY_BUCKETS, labels: [:client, :topic])
    @ack_errors = Prometheus.registry.counter(:producer_ack_errors, docstring: 'Ack errors', labels: [:client, :topic])
  end

  # One message added to the producer buffer: records write rate, message
  # size, and buffer occupancy (absolute, ratio and percentage of max).
  def produce_message(event)
    client = event.payload.fetch(:client_id)
    key = { client: client, topic: event.payload.fetch(:topic) }

    message_size = event.payload.fetch(:message_size)
    buffer_size = event.payload.fetch(:buffer_size)
    max_buffer_size = event.payload.fetch(:max_buffer_size)
    buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f
    buffer_fill_percentage = buffer_fill_ratio * 100.0

    # This gets us the write rate.
    @produce_messages.increment(labels: key)
    @produce_message_size.observe(message_size, labels: key)

    # This gets us the avg/max buffer size per producer.
    @buffer_size.observe(buffer_size, labels: { client: client })

    # This gets us the avg/max buffer fill ratio per producer.
    @buffer_fill_ratio.observe(buffer_fill_ratio, labels: { client: client })
    @buffer_fill_percentage.observe(buffer_fill_percentage, labels: { client: client })
  end

  # The producer buffer rejected a message because it was full.
  def buffer_overflow(event)
    key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }
    @produce_errors.increment(labels: key)
  end

  # One delivery round-trip: latency, delivered message count, number of
  # attempts, and an error when the payload carries an exception.
  def deliver_messages(event)
    key = { client: event.payload.fetch(:client_id) }
    message_count = event.payload.fetch(:delivered_message_count)
    attempts = event.payload.fetch(:attempts)

    @deliver_errors.increment(labels: key) if event.payload.key?(:exception)
    @deliver_latency.observe(event.duration, labels: key)

    # Messages delivered to Kafka:
    @deliver_messages.increment(by: message_count, labels: key)

    # Number of attempts to deliver messages:
    @deliver_attempts.observe(attempts, labels: key)
  end

  # One message acknowledged by the broker for a topic.
  def ack_message(event)
    key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }

    # Number of messages ACK'd for the topic.
    @ack_messages.increment(labels: key)

    # Histogram of delay between a message being produced and it being ACK'd.
    @ack_delay.observe(event.payload.fetch(:delay), labels: key)
  end

  # The broker reported an error for a topic: counted as an ACK error.
  def topic_error(event)
    key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }

    @ack_errors.increment(labels: key)
  end
end

# Reports async-producer metrics: queue occupancy, enqueue failures and
# dropped messages.
class AsyncProducerSubscriber < ActiveSupport::Subscriber
  def initialize
    super
    registry = Prometheus.registry
    @queue_size = registry.histogram(:async_producer_queue_size, docstring: 'Queue size', buckets: SIZE_BUCKETS, labels: [:client, :topic])
    @queue_fill_ratio = registry.histogram(:async_producer_queue_fill_ratio, docstring: 'Queue fill ratio', labels: [:client, :topic])
    @produce_errors = registry.counter(:async_producer_produce_errors, docstring: 'Producer errors', labels: [:client, :topic])
    @dropped_messages = registry.counter(:async_producer_dropped_messages, docstring: 'Dropped messages', labels: [:client])
  end

  # One message enqueued: records absolute queue size and fill ratio,
  # giving avg/max occupancy per producer.
  def enqueue_message(event)
    payload = event.payload
    labels = { client: payload.fetch(:client_id), topic: payload.fetch(:topic) }

    size = payload.fetch(:queue_size)
    fill_ratio = size.to_f / payload.fetch(:max_queue_size).to_f

    @queue_size.observe(size, labels: labels)
    @queue_fill_ratio.observe(fill_ratio, labels: labels)
  end

  # The async queue rejected a message because it was full.
  def buffer_overflow(event)
    payload = event.payload
    @produce_errors.increment(labels: { client: payload.fetch(:client_id), topic: payload.fetch(:topic) })
  end

  # Messages dropped by the async producer, counted in bulk.
  def drop_messages(event)
    payload = event.payload
    @dropped_messages.increment(by: payload.fetch(:message_count), labels: { client: payload.fetch(:client_id) })
  end
end

# Reports the fetcher's internal queue depth per client/group.
class FetcherSubscriber < ActiveSupport::Subscriber
  def initialize
    super
    @queue_size = Prometheus.registry.gauge(:fetcher_queue_size, docstring: 'Queue size', labels: [:client, :group_id])
  end

  # One fetcher loop iteration: publishes the current queue size.
  def loop(event)
    payload = event.payload
    labels = { client: payload.fetch(:client_id), group_id: payload.fetch(:group_id) }

    @queue_size.set(payload.fetch(:queue_size), labels: labels)
  end
end
end
end

# To enable testability, it is possible to skip the start until test time
# Opt out by defining the constant PROMETHEUS_NO_AUTO_START before requiring
# this file; Kafka::Prometheus.start can then be called manually.
Kafka::Prometheus.start unless defined?(PROMETHEUS_NO_AUTO_START)
2 changes: 1 addition & 1 deletion lib/kafka/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Kafka
VERSION = "0.6.17"
VERSION = "0.6.18"
end
Loading