---
title: Kafka Destination
disable_toc: false
products:
---
{{< product-availability >}}
Use Observability Pipelines' Kafka destination to send logs to Kafka topics.
Common scenarios when you might use this destination:
- To route logs to the following destinations:
  - ClickHouse: An open source column-oriented database management system used for analyzing large volumes of logs.
  - Snowflake: A data warehouse used for storage and querying.
    - Snowflake's API integration uses Kafka to ingest logs into its platform.
  - Databricks: A data lakehouse for analytics and storage.
  - Azure Event Hubs: An ingest and processing service in the Microsoft Azure ecosystem.
- To route data to Kafka and use the Kafka Connect ecosystem.
- To process and normalize your data with Observability Pipelines before routing to Apache Spark with Kafka to analyze data and run machine learning workloads.
Set up the Kafka destination and its environment variables when you set up a pipeline. The information below is configured in the pipelines UI.
- Enter the identifier for your Kafka bootstrap servers. If you leave it blank, the default is used.
- Enter the name of the topic you want to send logs to.
- In the Encoding dropdown menu, select either JSON or Raw message as the output format.
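To illustrate the difference between the two output formats, here is a minimal sketch with a hypothetical log event; the field names are examples, not a fixed schema:

```python
import json

# A hypothetical log event as a pipeline might represent it.
event = {
    "timestamp": "2024-05-01T12:00:00Z",
    "host": "web-01",
    "message": "GET /health 200",
}

# JSON encoding writes the full event as a JSON object.
json_payload = json.dumps(event, separators=(",", ":"))

# Raw message encoding writes only the log's message text.
raw_payload = event["message"]

print(json_payload)
print(raw_payload)
```

Downstream consumers that expect structured fields (for example, a ClickHouse table per attribute) generally want JSON; consumers that only need the original log line can use Raw message.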
{{< img src="observability_pipelines/destinations/kafka_settings.png" alt="The Kafka destination with sample values" style="width:30%;" >}}
{{% observability_pipelines/tls_settings %}}
- Toggle the switch to enable SASL Authentication.
- Enter the identifiers for your Kafka SASL username and password. If you leave them blank, the defaults are used.
- Select the SASL mechanism (PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512) in the dropdown menu.
- Toggle the switch to Enable Compression.
- In the Compression Algorithm dropdown menu, select a compression algorithm (gzip, zstd, lz4, or snappy).
- (Optional) Select a Compression Level in the dropdown menu. If the level is not specified, the algorithm's default level is used.
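To see what the compression level trades off, here is a small illustrative sketch using Python's zlib (the same DEFLATE algorithm that underlies Kafka's gzip codec); the payload is made up, and real ratios depend on your log content:

```python
import zlib

# A repetitive, log-like payload; real logs compress less uniformly.
payload = b'{"level":"info","message":"request completed"}\n' * 200

# Higher levels trade CPU time for smaller output.
for level in (1, 6, 9):
    compressed = zlib.compress(payload, level)
    print(f"level {level}: {len(compressed)} bytes (from {len(payload)})")
```

In general, a higher level yields output no larger than a lower level at the cost of more CPU, which is why the algorithm's default level is a reasonable starting point.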
{{% observability_pipelines/destination_buffer %}}
Click Advanced if you want to set any of the following fields:
- Message Key Field: Specify which log field contains the message key for partitioning, grouping, and ordering.
- Headers Key: Specify which log field contains your Kafka headers. If left blank, no headers are written.
- Message Timeout (ms): Local message timeout, in milliseconds. Default is 300,000 ms.
- Socket Timeout (ms): Default timeout, in milliseconds, for network requests. Default is 60,000 ms.
- Rate Limit Events: The maximum number of requests the Kafka client can send within the rate limit time window. Default is no rate limit.
- Rate Limit Time Window (secs): The time window used for the rate limit option.
  - This setting has no effect if Rate Limit Events is not set.
  - Default is 1 second if Rate Limit Events is set but Rate Limit Time Window is not.
- To add additional librdkafka options, click Add Option and select an option in the dropdown menu.
- Enter a value for that option.
- Check your values against the librdkafka documentation to make sure they have the correct type and are within the set range.
- Click Add Option to add another librdkafka option.
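To illustrate why the Message Key Field matters for partitioning and ordering, here is a minimal sketch. The hash used is illustrative rather than librdkafka's exact default partitioner, but the property it demonstrates is the same: events with the same key always map to the same partition, which preserves their relative order.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Illustrative only: hash the key and map it onto a partition.
    # The same key always lands on the same partition, so all events
    # for that key are consumed in the order they were produced.
    return zlib.crc32(key) % num_partitions

p1 = partition_for(b"user-123", 6)
p2 = partition_for(b"user-123", 6)
assert p1 == p2  # same key -> same partition -> per-key ordering
```

Without a message key, events are spread across partitions and only per-partition ordering is guaranteed.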
{{% observability_pipelines/set_secrets_intro %}}
{{< tabs >}} {{% tab "Secrets Management" %}}
- Kafka bootstrap servers identifier:
- References the bootstrap server that the client uses to connect to the Kafka cluster and discover all the other hosts in the cluster.
- In your secrets manager, the host and port must be entered in the format `host:port`, such as `10.14.22.123:9092`. If there is more than one server, use commas to separate them.
- The default identifier is `DESTINATION_KAFKA_BOOTSTRAP_SERVERS`.
- Kafka TLS passphrase identifier (when TLS is enabled):
  - The default identifier is `DESTINATION_KAFKA_KEY_PASS`.
- SASL authentication (when enabled):
  - Kafka SASL username identifier:
    - The default identifier is `DESTINATION_KAFKA_SASL_USERNAME`.
  - Kafka SASL password identifier:
    - The default identifier is `DESTINATION_KAFKA_SASL_PASSWORD`.
{{% /tab %}}
{{% tab "Environment Variables" %}}
{{< img src="observability_pipelines/destinations/kafka_env_var.png" alt="The install page showing the Kafka environment variable field" style="width:70%;" >}}
{{% observability_pipelines/configure_existing_pipelines/destination_env_vars/kafka %}}
{{% /tab %}} {{< /tabs >}}
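The bootstrap servers value is a comma-separated list of `host:port` pairs. As a minimal sketch (the function name and validation rules here are illustrative, not part of Observability Pipelines), you could sanity-check the value before storing it in your secrets manager:

```python
def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Validate a comma-separated list of host:port pairs."""
    servers = []
    for entry in value.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers

print(parse_bootstrap_servers("10.14.22.123:9092,10.14.22.124:9092"))
```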
These are the available librdkafka options:
- `client.id`
- `queue.buffering.max.messages`
- `transactional.id`
- `enable.idempotence`
- `acks`
See the librdkafka documentation for more information and to ensure your values have the correct type and are within range.
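As a rough sketch, the options above map onto a client configuration like the following; the values shown are illustrative only, and each one should still be checked against the librdkafka documentation for its exact type and valid range:

```python
# Illustrative values only -- not defaults or recommendations.
librdkafka_options = {
    "client.id": "observability-pipelines",  # string
    "queue.buffering.max.messages": 100000,  # integer
    "transactional.id": "op-kafka-dest",     # string
    "enable.idempotence": True,              # boolean
    "acks": "all",                           # "0", "1", or "all"
}

# A minimal type check, in the spirit of validating values before use.
expected_types = {
    "client.id": str,
    "queue.buffering.max.messages": int,
    "transactional.id": str,
    "enable.idempotence": bool,
    "acks": str,
}

for name, value in librdkafka_options.items():
    assert isinstance(value, expected_types[name]), name
```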
See Observability Pipelines Metrics for a full list of available health metrics.
{{% observability_pipelines/metrics/component %}}
{{% observability_pipelines/metrics/buffer/destinations %}}
{{% observability_pipelines/metrics/buffer/deprecated_destination_metrics %}}
A batch of events is flushed as soon as any one of these thresholds is reached. See event batching for more information.
| Maximum Events | Maximum Size (MB) | Timeout (seconds) |
|---|---|---|
| 10,000 | 1 | 1 |
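The flush rule above (whichever threshold is reached first) can be sketched as follows, assuming 1 MB means 2^20 bytes:

```python
MAX_EVENTS = 10_000
MAX_BYTES = 1 * 1024 * 1024  # 1 MB, assuming 2^20 bytes
TIMEOUT_S = 1.0

def should_flush(num_events: int, num_bytes: int, elapsed_s: float) -> bool:
    # A batch is flushed as soon as any one threshold is reached.
    return (
        num_events >= MAX_EVENTS
        or num_bytes >= MAX_BYTES
        or elapsed_s >= TIMEOUT_S
    )

assert should_flush(10_000, 500, 0.2)     # event count reached
assert should_flush(3, 1024 * 1024, 0.2)  # size reached
assert should_flush(3, 500, 1.0)          # timeout reached
assert not should_flush(3, 500, 0.2)      # none reached yet
```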