From d0234fec937cd4928f875678137c722ba76984b6 Mon Sep 17 00:00:00 2001 From: mats Date: Mon, 6 Apr 2026 20:43:12 +0900 Subject: [PATCH 1/7] plugins: add ZeroBus output plugin Introduce the ZeroBus output plugin for Fluent Bit, enabling integration with Databricks ZeroBus. This commit includes the necessary CMake configuration, plugin registration, and implementation files. The plugin is disabled by default and requires specific library paths to be set for proper functionality. Signed-off-by: Kazuki Matsuda Signed-off-by: mats --- cmake/plugins_options.cmake | 1 + plugins/CMakeLists.txt | 1 + plugins/out_zerobus/CMakeLists.txt | 23 ++ plugins/out_zerobus/zerobus.c | 573 +++++++++++++++++++++++++++++ plugins/out_zerobus/zerobus.h | 132 +++++++ 5 files changed, 730 insertions(+) create mode 100644 plugins/out_zerobus/CMakeLists.txt create mode 100644 plugins/out_zerobus/zerobus.c create mode 100644 plugins/out_zerobus/zerobus.h diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 7fbd0a96bd5..eed9550a20b 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -155,3 +155,4 @@ DEFINE_OPTION(FLB_OUT_TCP "Enable TCP output plugin" DEFINE_OPTION(FLB_OUT_UDP "Enable UDP output plugin" ON) DEFINE_OPTION(FLB_OUT_VIVO_EXPORTER "Enable Vivo exporter output plugin" ON) DEFINE_OPTION(FLB_OUT_WEBSOCKET "Enable Websocket output plugin" ON) +DEFINE_OPTION(FLB_OUT_ZEROBUS "Enable Databricks ZeroBus output plugin" OFF) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 36fcc94ad12..bd9fe9beaff 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -413,6 +413,7 @@ REGISTER_OUT_PLUGIN("out_prometheus_remote_write") REGISTER_OUT_PLUGIN("out_s3") REGISTER_OUT_PLUGIN("out_vivo_exporter") REGISTER_OUT_PLUGIN("out_chronicle") +REGISTER_OUT_PLUGIN("out_zerobus") if(FLB_ZIG) REGISTER_OUT_PLUGIN("out_zig_demo" "zig") diff --git a/plugins/out_zerobus/CMakeLists.txt b/plugins/out_zerobus/CMakeLists.txt new file mode 100644 index 00000000000..e485fc4d972 --- /dev/null +++ b/plugins/out_zerobus/CMakeLists.txt @@ -0,0 +1,23 @@ +set(src + zerobus.c) + +FLB_PLUGIN(out_zerobus "${src}" "") + +if(NOT ZEROBUS_LIB_DIR) + message(FATAL_ERROR + "ZEROBUS_LIB_DIR must be set to the directory containing libzerobus_ffi.a " + "(e.g. -DZEROBUS_LIB_DIR=/path/to/zerobus-sdk/lib/darwin_arm64)") +endif() + +target_link_libraries(flb-plugin-out_zerobus ${ZEROBUS_LIB_DIR}/libzerobus_ffi.a) + +# Platform-specific linker flags required by the Rust FFI static library +if(APPLE) + target_link_libraries(flb-plugin-out_zerobus + "-framework CoreFoundation" + "-framework Security" + -liconv) +elseif(UNIX) + target_link_libraries(flb-plugin-out_zerobus + -ldl -lpthread -lm -lresolv -lgcc_s) +endif() diff --git a/plugins/out_zerobus/zerobus.c b/plugins/out_zerobus/zerobus.c new file mode 100644 index 00000000000..32d7b546ca9 --- /dev/null +++ b/plugins/out_zerobus/zerobus.c @@ -0,0 +1,573 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "zerobus.h" + +static flb_sds_t ensure_url_scheme(const char *url) +{ + size_t url_len; + flb_sds_t out; + + if (strncmp(url, "https://", 8) == 0 || + strncmp(url, "http://", 7) == 0) { + return flb_sds_create(url); + } + + url_len = strlen(url); + out = flb_sds_create_size(url_len + 9); + if (!out) { + return NULL; + } + out = flb_sds_cat(out, "https://", 8); + if (!out) { + return NULL; + } + out = flb_sds_cat(out, url, url_len); + return out; +} + +static int format_timestamp_rfc3339nano(struct flb_time *tm, + char *buf, size_t size) +{ + struct tm gmt; + time_t sec = (time_t) tm->tm.tv_sec; + + if (!gmtime_r(&sec, &gmt)) { + return -1; + } + return snprintf(buf, size, + "%04d-%02d-%02dT%02d:%02d:%02d.%09luZ", + gmt.tm_year + 1900, gmt.tm_mon + 1, gmt.tm_mday, + gmt.tm_hour, gmt.tm_min, gmt.tm_sec, + (unsigned long) tm->tm.tv_nsec); +} + +static int key_in_log_keys(const char *key, int key_len, + struct mk_list *log_keys) +{ + struct mk_list *head; + struct flb_slist_entry *entry; + + mk_list_foreach(head, log_keys) { + entry = mk_list_entry(head, struct flb_slist_entry, _head); + if ((int) flb_sds_len(entry->str) == key_len && + memcmp(entry->str, key, key_len) == 0) { + return 1; + } + } + return 0; +} + +static inline int str_key_equals(const msgpack_object *k, + const char *name, int name_len) +{ + return k->type == MSGPACK_OBJECT_STR && + (int) k->via.str.size == name_len && + memcmp(k->via.str.ptr, name, name_len) == 0; +} + +/* Log a CResult error and free the message. Returns FLB_RETRY or FLB_ERROR. */ +static int log_cresult_error(struct flb_output_instance *ins, + CResult *r, const char *context) +{ + int ret = r->is_retryable ? FLB_RETRY : FLB_ERROR; + + flb_plg_error(ins, "%s: %s", + context, + r->error_message ? r->error_message : "unknown"); + if (r->error_message) { + zerobus_free_error_message(r->error_message); + r->error_message = NULL; + } + return ret; +} + +/* + * Convert a log event body to a JSON string for ZeroBus ingestion. + * + * Semantics match the Go plugin's recordToJSON: + * 1. Capture raw record as JSON before filtering (raw_log_key) + * 2. Apply log_keys filter (if configured) + * 3. Inject raw_log_key, time_key, _tag -- without overwriting + * + * Uses flb_mp_map_header for single-pass packing (no pre-counting). + * The caller-owned msgpack_sbuffer is reused across records. + */ +static flb_sds_t record_to_json(struct flb_out_zerobus *ctx, + msgpack_object *body, + struct flb_time *tm, + const char *tag, int tag_len, + msgpack_sbuffer *sbuf, + int escape_unicode) +{ + int i; + int has_time_key = 0; + int has_tag_key = 0; + int has_raw_key = 0; + int time_key_len; + int raw_key_len; + int include; + char *raw_json = NULL; + char time_buf[64]; + int time_len = 0; + msgpack_packer pk; + struct flb_mp_map_header mh; + flb_sds_t json; + + if (body->type != MSGPACK_OBJECT_MAP) { + return NULL; + } + + msgpack_object_map *map = &body->via.map; + time_key_len = (ctx->time_key) ? (int) flb_sds_len(ctx->time_key) : 0; + raw_key_len = (ctx->raw_log_key) ? (int) flb_sds_len(ctx->raw_log_key) : 0; + + /* + * Capture the full record as JSON before filtering. + * The Go plugin calls json.Marshal(m) before applying LogKeys. + */ + if (raw_key_len > 0) { + raw_json = flb_msgpack_to_json_str(0, body, escape_unicode); + if (!raw_json) { + return NULL; + } + } + + msgpack_sbuffer_clear(sbuf); + msgpack_packer_init(&pk, sbuf, msgpack_sbuffer_write); + flb_mp_map_header_init(&mh, &pk); + + /* Single pass: pack included body keys, track collision flags */ + for (i = 0; i < (int) map->size; i++) { + msgpack_object *k = &map->ptr[i].key; + + if (ctx->log_keys) { + if (k->type != MSGPACK_OBJECT_STR) { + continue; + } + include = key_in_log_keys(k->via.str.ptr, + (int) k->via.str.size, + ctx->log_keys); + if (!include) { + continue; + } + } + + flb_mp_map_header_append(&mh); + msgpack_pack_object(&pk, map->ptr[i].key); + msgpack_pack_object(&pk, map->ptr[i].val); + + if (k->type == MSGPACK_OBJECT_STR) { + if (time_key_len > 0 && + str_key_equals(k, ctx->time_key, time_key_len)) { + has_time_key = 1; + } + if (ctx->add_tag && str_key_equals(k, "_tag", 4)) { + has_tag_key = 1; + } + if (raw_key_len > 0 && + str_key_equals(k, ctx->raw_log_key, raw_key_len)) { + has_raw_key = 1; + } + } + } + + if (raw_key_len > 0 && raw_json && !has_raw_key) { + size_t rj_len = strlen(raw_json); + + flb_mp_map_header_append(&mh); + msgpack_pack_str(&pk, raw_key_len); + msgpack_pack_str_body(&pk, ctx->raw_log_key, raw_key_len); + msgpack_pack_str(&pk, rj_len); + msgpack_pack_str_body(&pk, raw_json, rj_len); + } + + if (time_key_len > 0 && !has_time_key) { + time_len = format_timestamp_rfc3339nano(tm, time_buf, sizeof(time_buf)); + if (time_len > 0) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(&pk, time_key_len); + msgpack_pack_str_body(&pk, ctx->time_key, time_key_len); + msgpack_pack_str(&pk, time_len); + msgpack_pack_str_body(&pk, time_buf, time_len); + } + } + + if (ctx->add_tag && tag_len > 0 && !has_tag_key) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(&pk, 4); + msgpack_pack_str_body(&pk, "_tag", 4); + msgpack_pack_str(&pk, tag_len); + msgpack_pack_str_body(&pk, tag, tag_len); + } + + flb_mp_map_header_end(&mh); + + json = flb_msgpack_raw_to_json_sds(sbuf->data, sbuf->size, escape_unicode); + + if (raw_json) { + flb_free(raw_json); + } + + return json; +} + +static int cb_zerobus_init(struct flb_output_instance *ins, + struct flb_config *config, + void *data) +{ + int ret; + const char *tmp; + struct flb_out_zerobus *ctx; + CResult result; + CStreamConfigurationOptions opts; + + (void) config; + (void) data; + + ctx = flb_calloc(1, sizeof(struct flb_out_zerobus)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + /* + * URL fields need https:// prepending when no scheme is present + * (the ZeroBus SDK requires a scheme), so they are read manually. + */ + tmp = flb_output_get_property("zerobus_endpoint", ins); + if (!tmp || strlen(tmp) == 0) { + flb_plg_error(ins, "'zerobus_endpoint' is required"); + goto init_error; + } + ctx->zerobus_endpoint = ensure_url_scheme(tmp); + if (!ctx->zerobus_endpoint) { + goto init_error; + } + + tmp = flb_output_get_property("workspace_url", ins); + if (!tmp || strlen(tmp) == 0) { + flb_plg_error(ins, "'workspace_url' is required"); + goto init_error; + } + ctx->workspace_url = ensure_url_scheme(tmp); + if (!ctx->workspace_url) { + goto init_error; + } + + if (!ctx->table_name || flb_sds_len(ctx->table_name) == 0) { + flb_plg_error(ins, "'table_name' is required"); + goto init_error; + } + if (!ctx->client_id || flb_sds_len(ctx->client_id) == 0) { + flb_plg_error(ins, "'client_id' is required"); + goto init_error; + } + if (!ctx->client_secret || flb_sds_len(ctx->client_secret) == 0) { + flb_plg_error(ins, "'client_secret' is required"); + goto init_error; + } + + memset(&result, 0, sizeof(result)); + ctx->sdk = zerobus_sdk_new(ctx->zerobus_endpoint, + ctx->workspace_url, + &result); + if (!ctx->sdk || !result.success) { + log_cresult_error(ins, &result, "failed to create ZeroBus SDK"); + goto init_error; + } + + if (strncmp(ctx->zerobus_endpoint, "http://", 7) == 0) { + zerobus_sdk_set_use_tls(ctx->sdk, false); + } + + opts = zerobus_get_default_config(); + opts.record_type = ZEROBUS_RECORD_TYPE_JSON; + + memset(&result, 0, sizeof(result)); + ctx->stream = zerobus_sdk_create_stream(ctx->sdk, + ctx->table_name, + NULL, 0, + ctx->client_id, + ctx->client_secret, + &opts, + &result); + if (!ctx->stream || !result.success) { + log_cresult_error(ins, &result, "failed to create ZeroBus stream"); + zerobus_sdk_free(ctx->sdk); + ctx->sdk = NULL; + goto init_error; + } + + flb_plg_info(ins, "connected to %s, table: %s", + ctx->zerobus_endpoint, ctx->table_name); + + flb_output_set_context(ins, ctx); + return 0; + +init_error: + if (ctx->zerobus_endpoint) { + flb_sds_destroy(ctx->zerobus_endpoint); + } + if (ctx->workspace_url) { + flb_sds_destroy(ctx->workspace_url); + } + flb_free(ctx); + return -1; +} + +static void cb_zerobus_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + size_t capacity; + size_t num_records = 0; + int convert_errors = 0; + struct flb_out_zerobus *ctx = out_context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + flb_sds_t *json_records = NULL; + flb_sds_t json; + msgpack_sbuffer sbuf; + CResult result; + int64_t offset; + size_t i; + int tag_len; + + (void) i_ins; + + tag_len = event_chunk->tag ? (int) flb_sds_len(event_chunk->tag) : 0; + + ret = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "log event decoder initialization error: %d", ret); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + capacity = event_chunk->total_events > 0 ? event_chunk->total_events : 64; + json_records = flb_malloc(sizeof(flb_sds_t) * capacity); + if (!json_records) { + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Reuse a single sbuffer across all record conversions */ + msgpack_sbuffer_init(&sbuf); + + while (flb_log_event_decoder_next(&log_decoder, + &log_event) == FLB_EVENT_DECODER_SUCCESS) { + json = record_to_json(ctx, + log_event.body, + &log_event.timestamp, + event_chunk->tag, tag_len, + &sbuf, + config->json_escape_unicode); + if (!json) { + convert_errors++; + flb_plg_warn(ctx->ins, "failed to convert record to JSON"); + continue; + } + + if (num_records == capacity) { + size_t new_cap = capacity * 2; + flb_sds_t *tmp = flb_realloc(json_records, + sizeof(flb_sds_t) * new_cap); + if (!tmp) { + flb_sds_destroy(json); + break; + } + json_records = tmp; + capacity = new_cap; + } + + json_records[num_records] = json; + num_records++; + } + + msgpack_sbuffer_destroy(&sbuf); + flb_log_event_decoder_destroy(&log_decoder); + + if (num_records == 0) { + flb_free(json_records); + if (convert_errors > 0) { + flb_plg_error(ctx->ins, + "all %d records failed conversion", convert_errors); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + FLB_OUTPUT_RETURN(FLB_OK); + } + + if (convert_errors > 0) { + flb_plg_warn(ctx->ins, + "skipped %d records due to conversion errors", + convert_errors); + } + + /* flb_sds_t is char*, so the cast to const char** is safe */ + memset(&result, 0, sizeof(result)); + offset = zerobus_stream_ingest_json_records(ctx->stream, + (const char **) json_records, + num_records, + &result); + if (!result.success) { + ret = log_cresult_error(ctx->ins, &result, "ingestion error"); + goto flush_cleanup; + } + + memset(&result, 0, sizeof(result)); + zerobus_stream_wait_for_offset(ctx->stream, offset, &result); + if (!result.success) { + ret = log_cresult_error(ctx->ins, &result, "wait_for_offset error"); + goto flush_cleanup; + } + + ret = FLB_OK; + +flush_cleanup: + for (i = 0; i < num_records; i++) { + flb_sds_destroy(json_records[i]); + } + flb_free(json_records); + + FLB_OUTPUT_RETURN(ret); +} + +static int cb_zerobus_exit(void *data, struct flb_config *config) +{ + struct flb_out_zerobus *ctx = data; + CResult result; + + (void) config; + + if (!ctx) { + return 0; + } + + if (ctx->stream) { + memset(&result, 0, sizeof(result)); + zerobus_stream_close(ctx->stream, &result); + if (!result.success && result.error_message) { + flb_plg_error(ctx->ins, "stream close error: %s", + result.error_message); + zerobus_free_error_message(result.error_message); + } + zerobus_stream_free(ctx->stream); + } + + if (ctx->sdk) { + zerobus_sdk_free(ctx->sdk); + } + + if (ctx->zerobus_endpoint) { + flb_sds_destroy(ctx->zerobus_endpoint); + } + if (ctx->workspace_url) { + flb_sds_destroy(ctx->workspace_url); + } + + flb_free(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "zerobus_endpoint", NULL, + 0, FLB_FALSE, 0, + "ZeroBus gRPC endpoint URL" + }, + { + FLB_CONFIG_MAP_STR, "workspace_url", NULL, + 0, FLB_FALSE, 0, + "Databricks workspace URL" + }, + { + FLB_CONFIG_MAP_STR, "table_name", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_zerobus, table_name), + "Fully qualified table name (catalog.schema.table)" + }, + { + FLB_CONFIG_MAP_STR, "client_id", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_zerobus, client_id), + "OAuth2 client ID for authentication" + }, + { + FLB_CONFIG_MAP_STR, "client_secret", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_zerobus, client_secret), + "OAuth2 client secret for authentication" + }, + { + FLB_CONFIG_MAP_BOOL, "add_tag", "true", + 0, FLB_TRUE, offsetof(struct flb_out_zerobus, add_tag), + "Add Fluent Bit tag as _tag field in each record" + }, + { + FLB_CONFIG_MAP_STR, "time_key", "_time", + 0, FLB_TRUE, offsetof(struct flb_out_zerobus, time_key), + "Key name for the injected timestamp (RFC 3339 with nanoseconds)" + }, + { + FLB_CONFIG_MAP_CLIST, "log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_zerobus, log_keys), + "Comma-separated list of record keys to include (all if unset)" + }, + { + FLB_CONFIG_MAP_STR, "raw_log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_zerobus, raw_log_key), + "If set, store the full original record as a JSON string under this key" + }, + {0} +}; + +struct flb_output_plugin out_zerobus_plugin = { + .name = "zerobus", + .description = "Send logs to Databricks ZeroBus", + .cb_init = cb_zerobus_init, + .cb_flush = cb_zerobus_flush, + .cb_exit = cb_zerobus_exit, + .config_map = config_map, + .event_type = FLB_OUTPUT_LOGS, + .flags = 0, + .workers = 1, +}; diff --git a/plugins/out_zerobus/zerobus.h b/plugins/out_zerobus/zerobus.h new file mode 100644 index 00000000000..09b0d333db0 --- /dev/null +++ b/plugins/out_zerobus/zerobus.h @@ -0,0 +1,132 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_ZEROBUS_H +#define FLB_OUT_ZEROBUS_H + +#include +#include +#include + +#include +#include + +/* + * ZeroBus FFI declarations + * + * These types and functions are provided by the prebuilt Rust FFI static + * library (libzerobus_ffi.a). The declarations below are extracted from + * the Go SDK CGO preamble at: + * github.com/databricks/zerobus-sdk/go@v1.0.0/ffi.go + */ + +/* Opaque SDK / stream handles */ +typedef struct CZerobusSdk CZerobusSdk; +typedef struct CZerobusStream CZerobusStream; + +/* Result returned by every fallible FFI call */ +typedef struct CResult { + bool success; + char *error_message; + bool is_retryable; +} CResult; + +/* Stream configuration passed to create_stream */ +typedef struct CStreamConfigurationOptions { + uintptr_t max_inflight_requests; + bool recovery; + uint64_t recovery_timeout_ms; + uint64_t recovery_backoff_ms; + uint32_t recovery_retries; + uint64_t server_lack_of_ack_timeout_ms; + uint64_t flush_timeout_ms; + int32_t record_type; + uint64_t stream_paused_max_wait_time_ms; + bool has_stream_paused_max_wait_time_ms; + uint64_t callback_max_wait_time_ms; + bool has_callback_max_wait_time_ms; +} CStreamConfigurationOptions; + +/* Record type enum values */ +#define ZEROBUS_RECORD_TYPE_JSON 2 + +/* --- SDK lifecycle --- */ +extern CZerobusSdk *zerobus_sdk_new(const char *endpoint, + const char *unity_catalog_url, + CResult *result); +extern void zerobus_sdk_free(CZerobusSdk *sdk); +extern void zerobus_sdk_set_use_tls(CZerobusSdk *sdk, bool use_tls); + +/* --- Stream lifecycle --- */ +extern CZerobusStream *zerobus_sdk_create_stream( + CZerobusSdk *sdk, + const char *table_name, + const uint8_t *descriptor_proto_bytes, + uintptr_t descriptor_proto_len, + const char *client_id, + const char *client_secret, + const CStreamConfigurationOptions *options, + CResult *result); + +extern bool zerobus_stream_close(CZerobusStream *stream, CResult *result); +extern void zerobus_stream_free(CZerobusStream *stream); + +/* --- Ingestion --- */ +extern int64_t zerobus_stream_ingest_json_records( + CZerobusStream *stream, + const char **json_records, + uintptr_t num_records, + CResult *result); + +extern bool zerobus_stream_wait_for_offset(CZerobusStream *stream, + int64_t offset, + CResult *result); + +/* --- Utilities --- */ +extern void zerobus_free_error_message(char *error_message); +extern CStreamConfigurationOptions zerobus_get_default_config(void); + +/* ------------------------------------------------------------------ */ + +/* Plugin context */ +struct flb_out_zerobus { + /* ZeroBus handles */ + CZerobusSdk *sdk; + CZerobusStream *stream; + + /* Required config -- URL fields are built manually (need scheme fix) */ + flb_sds_t zerobus_endpoint; + flb_sds_t workspace_url; + + /* Required config -- auto-populated by config_map */ + flb_sds_t table_name; + flb_sds_t client_id; + flb_sds_t client_secret; + + /* Optional config -- auto-populated by config_map */ + int add_tag; /* FLB_TRUE / FLB_FALSE */ + flb_sds_t time_key; /* default "_time" */ + struct mk_list *log_keys; /* CLIST, NULL when unset */ + flb_sds_t raw_log_key; /* NULL when unset */ + + /* Fluent Bit instance reference (used for logging macros) */ + struct flb_output_instance *ins; +}; + +#endif /* FLB_OUT_ZEROBUS_H */ From 296caf8f6666a44da31ccaff85c29d82061735f8 Mon Sep 17 00:00:00 2001 From: mats Date: Mon, 6 Apr 2026 20:57:27 +0900 Subject: [PATCH 2/7] refactor: update ZeroBus plugin to require URL scheme for endpoint Changed the configuration property from 'zerobus_endpoint' to 'endpoint' to enforce the inclusion of a URL scheme (http:// or https://). Updated related error messages and context handling to reflect this change. The workspace_url will now automatically prepend https:// if no scheme is provided. Signed-off-by: Kazuki Matsuda Signed-off-by: mats --- plugins/out_zerobus/zerobus.c | 36 ++++++++++++++++++++--------------- plugins/out_zerobus/zerobus.h | 6 +++--- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/plugins/out_zerobus/zerobus.c b/plugins/out_zerobus/zerobus.c index 32d7b546ca9..9497e100925 100644 --- a/plugins/out_zerobus/zerobus.c +++ b/plugins/out_zerobus/zerobus.c @@ -268,16 +268,22 @@ static int cb_zerobus_init(struct flb_output_instance *ins, } /* - * URL fields need https:// prepending when no scheme is present - * (the ZeroBus SDK requires a scheme), so they are read manually. + * endpoint must include a URL scheme (https:// or http://). + * workspace_url gets https:// prepended when no scheme is present. */ - tmp = flb_output_get_property("zerobus_endpoint", ins); + tmp = flb_output_get_property("endpoint", ins); if (!tmp || strlen(tmp) == 0) { - flb_plg_error(ins, "'zerobus_endpoint' is required"); + flb_plg_error(ins, "'endpoint' is required"); goto init_error; } - ctx->zerobus_endpoint = ensure_url_scheme(tmp); - if (!ctx->zerobus_endpoint) { + if (strncmp(tmp, "https://", 8) != 0 && strncmp(tmp, "http://", 7) != 0) { + flb_plg_error(ins, + "'endpoint' must include a URL scheme " + "(e.g. https://...), got: %s", tmp); + goto init_error; + } + ctx->endpoint = flb_sds_create(tmp); + if (!ctx->endpoint) { goto init_error; } @@ -305,7 +311,7 @@ static int cb_zerobus_init(struct flb_output_instance *ins, } memset(&result, 0, sizeof(result)); - ctx->sdk = zerobus_sdk_new(ctx->zerobus_endpoint, + ctx->sdk = zerobus_sdk_new(ctx->endpoint, ctx->workspace_url, &result); if (!ctx->sdk || !result.success) { @@ -313,7 +319,7 @@ static int cb_zerobus_init(struct flb_output_instance *ins, goto init_error; } - if (strncmp(ctx->zerobus_endpoint, "http://", 7) == 0) { + if (strncmp(ctx->endpoint, "http://", 7) == 0) { zerobus_sdk_set_use_tls(ctx->sdk, false); } @@ -336,14 +342,14 @@ static int cb_zerobus_init(struct flb_output_instance *ins, } flb_plg_info(ins, "connected to %s, table: %s", - ctx->zerobus_endpoint, ctx->table_name); + ctx->endpoint, ctx->table_name); flb_output_set_context(ins, ctx); return 0; init_error: - if (ctx->zerobus_endpoint) { - flb_sds_destroy(ctx->zerobus_endpoint); + if (ctx->endpoint) { + flb_sds_destroy(ctx->endpoint); } if (ctx->workspace_url) { flb_sds_destroy(ctx->workspace_url); @@ -500,8 +506,8 @@ static int cb_zerobus_exit(void *data, struct flb_config *config) zerobus_sdk_free(ctx->sdk); } - if (ctx->zerobus_endpoint) { - flb_sds_destroy(ctx->zerobus_endpoint); + if (ctx->endpoint) { + flb_sds_destroy(ctx->endpoint); } if (ctx->workspace_url) { flb_sds_destroy(ctx->workspace_url); @@ -513,9 +519,9 @@ static int cb_zerobus_exit(void *data, struct flb_config *config) static struct flb_config_map config_map[] = { { - FLB_CONFIG_MAP_STR, "zerobus_endpoint", NULL, + FLB_CONFIG_MAP_STR, "endpoint", NULL, 0, FLB_FALSE, 0, - "ZeroBus gRPC endpoint URL" + "ZeroBus gRPC endpoint URL (must include https:// scheme)" }, { FLB_CONFIG_MAP_STR, "workspace_url", NULL, diff --git a/plugins/out_zerobus/zerobus.h b/plugins/out_zerobus/zerobus.h index 09b0d333db0..0f8484994e8 100644 --- a/plugins/out_zerobus/zerobus.h +++ b/plugins/out_zerobus/zerobus.h @@ -110,9 +110,9 @@ struct flb_out_zerobus { CZerobusSdk *sdk; CZerobusStream *stream; - /* Required config -- URL fields are built manually (need scheme fix) */ - flb_sds_t zerobus_endpoint; - flb_sds_t workspace_url; + /* Required config -- URL fields are read manually */ + flb_sds_t endpoint; /* must include https:// scheme */ + flb_sds_t workspace_url; /* https:// auto-prepended if missing */ /* Required config -- auto-populated by config_map */ flb_sds_t table_name; From 919de560e031e3610bf9c20530377156a17ef42b Mon Sep 17 00:00:00 2001 From: mats Date: Tue, 7 Apr 2026 07:41:54 +0900 Subject: [PATCH 3/7] fix: update ZeroBus plugin for platform compatibility and enhance code documentation Added a fatal error message for Windows builds to indicate that the ZeroBus plugin is not supported. Enhanced code documentation by adding comments to clarify the purpose of several functions, including URL scheme validation, timestamp formatting, and plugin initialization and cleanup processes. Signed-off-by: Kazuki Matsuda Signed-off-by: mats --- plugins/out_zerobus/CMakeLists.txt | 6 +++++- plugins/out_zerobus/zerobus.c | 17 ++++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/plugins/out_zerobus/CMakeLists.txt b/plugins/out_zerobus/CMakeLists.txt index e485fc4d972..31b71a98f3e 100644 --- a/plugins/out_zerobus/CMakeLists.txt +++ b/plugins/out_zerobus/CMakeLists.txt @@ -12,7 +12,11 @@ endif() target_link_libraries(flb-plugin-out_zerobus ${ZEROBUS_LIB_DIR}/libzerobus_ffi.a) # Platform-specific linker flags required by the Rust FFI static library -if(APPLE) +if(WIN32) + message(FATAL_ERROR + "out_zerobus does not support Windows builds. " + "Disable it with -DFLB_OUT_ZEROBUS=OFF") +elseif(APPLE) target_link_libraries(flb-plugin-out_zerobus "-framework CoreFoundation" "-framework Security" diff --git a/plugins/out_zerobus/zerobus.c b/plugins/out_zerobus/zerobus.c index 9497e100925..9037c1b9e27 100644 --- a/plugins/out_zerobus/zerobus.c +++ b/plugins/out_zerobus/zerobus.c @@ -32,6 +32,7 @@ #include "zerobus.h" +/* Ensure the URL has an http:// or https:// scheme prefix. Returns a new sds string. */ static flb_sds_t ensure_url_scheme(const char *url) { size_t url_len; @@ -55,6 +56,7 @@ static flb_sds_t ensure_url_scheme(const char *url) return out; } +/* Format a flb_time as an RFC 3339 timestamp with nanosecond precision into buf. */ static int format_timestamp_rfc3339nano(struct flb_time *tm, char *buf, size_t size) { @@ -71,6 +73,7 @@ static int format_timestamp_rfc3339nano(struct flb_time *tm, (unsigned long) tm->tm.tv_nsec); } +/* Return 1 if the given key matches any entry in the log_keys list, 0 otherwise. */ static int key_in_log_keys(const char *key, int key_len, struct mk_list *log_keys) { @@ -87,6 +90,7 @@ static int key_in_log_keys(const char *key, int key_len, return 0; } +/* Return true if msgpack object k is a string equal to name. */ static inline int str_key_equals(const msgpack_object *k, const char *name, int name_len) { @@ -241,6 +245,7 @@ static flb_sds_t record_to_json(struct flb_out_zerobus *ctx, return json; } +/* Initialize the ZeroBus output plugin: validate config, create SDK and stream. */ static int cb_zerobus_init(struct flb_output_instance *ins, struct flb_config *config, void *data) @@ -358,6 +363,7 @@ static int cb_zerobus_init(struct flb_output_instance *ins, return -1; } +/* Flush callback: convert log events to JSON and ingest them via ZeroBus. */ static void cb_zerobus_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -421,8 +427,16 @@ static void cb_zerobus_flush(struct flb_event_chunk *event_chunk, flb_sds_t *tmp = flb_realloc(json_records, sizeof(flb_sds_t) * new_cap); if (!tmp) { + flb_plg_error(ctx->ins, + "realloc failed, retrying entire batch"); flb_sds_destroy(json); - break; + for (i = 0; i < num_records; i++) { + flb_sds_destroy(json_records[i]); + } + flb_free(json_records); + msgpack_sbuffer_destroy(&sbuf); + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_RETRY); } json_records = tmp; capacity = new_cap; @@ -480,6 +494,7 @@ static void cb_zerobus_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(ret); } +/* Cleanup callback: close the stream, free SDK resources and plugin context. */ static int cb_zerobus_exit(void *data, struct flb_config *config) { struct flb_out_zerobus *ctx = data; From 6dbccf7213cf45a81c60acec193fd3e56aec79dd Mon Sep 17 00:00:00 2001 From: mats Date: Tue, 7 Apr 2026 10:41:02 +0900 Subject: [PATCH 4/7] refactor: simplify URL scheme handling in ZeroBus plugin Updated the ZeroBus plugin to automatically prepend https:// to both the endpoint and workspace_url if no scheme is provided. Revised related comments and documentation to reflect this change, enhancing clarity for users. Signed-off-by: mats --- plugins/out_zerobus/zerobus.c | 14 ++++---------- plugins/out_zerobus/zerobus.h | 2 +- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/plugins/out_zerobus/zerobus.c b/plugins/out_zerobus/zerobus.c index 9037c1b9e27..9abdbf5fc14 100644 --- a/plugins/out_zerobus/zerobus.c +++ b/plugins/out_zerobus/zerobus.c @@ -273,21 +273,15 @@ static int cb_zerobus_init(struct flb_output_instance *ins, } /* - * endpoint must include a URL scheme (https:// or http://). - * workspace_url gets https:// prepended when no scheme is present. + * Both endpoint and workspace_url get https:// prepended when no + * scheme is present. */ tmp = flb_output_get_property("endpoint", ins); if (!tmp || strlen(tmp) == 0) { flb_plg_error(ins, "'endpoint' is required"); goto init_error; } - if (strncmp(tmp, "https://", 8) != 0 && strncmp(tmp, "http://", 7) != 0) { - flb_plg_error(ins, - "'endpoint' must include a URL scheme " - "(e.g. https://...), got: %s", tmp); - goto init_error; - } - ctx->endpoint = flb_sds_create(tmp); + ctx->endpoint = ensure_url_scheme(tmp); if (!ctx->endpoint) { goto init_error; } @@ -536,7 +530,7 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "endpoint", NULL, 0, FLB_FALSE, 0, - "ZeroBus gRPC endpoint URL (must include https:// scheme)" + "ZeroBus gRPC endpoint URL (https:// prepended if no scheme)" }, { FLB_CONFIG_MAP_STR, "workspace_url", NULL, diff --git a/plugins/out_zerobus/zerobus.h b/plugins/out_zerobus/zerobus.h index 0f8484994e8..bc67b81fdcb 100644 --- a/plugins/out_zerobus/zerobus.h +++ b/plugins/out_zerobus/zerobus.h @@ -111,7 +111,7 @@ struct flb_out_zerobus { CZerobusStream *stream; /* Required config -- URL fields are read manually */ - flb_sds_t endpoint; /* must include https:// scheme */ + flb_sds_t endpoint; /* https:// auto-prepended if missing */ flb_sds_t workspace_url; /* https:// auto-prepended if missing */ /* Required config -- auto-populated by config_map */ From 24f796ab40a65baf9ba11c97f8d89f3c5e490596 Mon Sep 17 00:00:00 2001 From: mats Date: Tue, 7 Apr 2026 12:11:34 +0900 Subject: [PATCH 5/7] feat: add ZeroBus output plugin support and enable by default Introduced the ZeroBus output plugin for Fluent Bit, allowing integration with Databricks ZeroBus. The plugin is now enabled by default in the configuration. Added CMake support to include the necessary ZeroBus FFI library setup, and enhanced documentation for clarity on usage and platform compatibility. Signed-off-by: mats --- CMakeLists.txt | 6 ++ cmake/plugins_options.cmake | 2 +- cmake/zerobus-ffi.cmake | 95 +++++++++++++++++++++++++++++ plugins/out_zerobus/CMakeLists.txt | 6 +- plugins/out_zerobus/zerobus.c | 97 ++++++++++++++++++++---------- 5 files changed, 172 insertions(+), 34 deletions(-) create mode 100644 cmake/zerobus-ffi.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b61fec3693..f5d9b178a2e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1369,6 +1369,12 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND)) FLB_OPTION(FLB_OUT_PGSQL OFF) endif() +# ZeroBus FFI +# =========== +if(FLB_OUT_ZEROBUS) + include(cmake/zerobus-ffi.cmake) +endif() + # Arrow GLib # ========== find_package(PkgConfig) diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index eed9550a20b..7c15849f22b 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -155,4 +155,4 @@ DEFINE_OPTION(FLB_OUT_TCP "Enable TCP output plugin" DEFINE_OPTION(FLB_OUT_UDP "Enable UDP output plugin" ON) DEFINE_OPTION(FLB_OUT_VIVO_EXPORTER "Enable Vivo exporter output plugin" ON) DEFINE_OPTION(FLB_OUT_WEBSOCKET "Enable Websocket output plugin" ON) -DEFINE_OPTION(FLB_OUT_ZEROBUS "Enable Databricks ZeroBus output plugin" OFF) +DEFINE_OPTION(FLB_OUT_ZEROBUS "Enable Databricks ZeroBus output plugin" ON) diff --git a/cmake/zerobus-ffi.cmake b/cmake/zerobus-ffi.cmake new file mode 100644 index 00000000000..ca461467e02 --- /dev/null +++ b/cmake/zerobus-ffi.cmake @@ -0,0 +1,95 @@ +# Set up the ZeroBus FFI prebuilt static library. +# +# If ZEROBUS_LIB_DIR is already set by the user, that path is used as-is. +# Otherwise the official release tarball is downloaded and the correct +# platform subdirectory is selected automatically. +# +# On unsupported platforms or when the download fails, the plugin is +# disabled automatically (FLB_OUT_ZEROBUS is set to OFF). +# +# After this module runs, ZEROBUS_LIB_DIR points to the directory +# containing libzerobus_ffi.a for the current platform. + +if(ZEROBUS_LIB_DIR) + return() +endif() + +set(_ZEROBUS_URL + "https://github.com/databricks/zerobus-sdk/releases/download/ffi-v1.0.0/zerobus-ffi-1.0.0.tar.gz") +set(_ZEROBUS_SHA256 + "c38609f5bddc160b43b35f9047919b35f66375308be69a0d0d6cd20bc01cee5a") + +# Determine the platform subdirectory inside the tarball +if(CMAKE_SYSTEM_NAME STREQUAL "Linux") + if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|ARM64") + set(_ZEROBUS_PLATFORM "linux-aarch64") + elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "x86_64|AMD64") + set(_ZEROBUS_PLATFORM "linux-x86-64") + else() + message(STATUS + "ZeroBus FFI: unsupported Linux architecture '${CMAKE_SYSTEM_PROCESSOR}', " + "disabling out_zerobus. " + "To build manually, set -DZEROBUS_LIB_DIR=/path/to/lib.") + FLB_OPTION(FLB_OUT_ZEROBUS OFF) + return() + endif() +else() + message(STATUS + "ZeroBus FFI: no prebuilt library available for ${CMAKE_SYSTEM_NAME}, " + "disabling out_zerobus. " + "To build manually, set -DZEROBUS_LIB_DIR=/path/to/lib.") + FLB_OPTION(FLB_OUT_ZEROBUS OFF) + return() +endif() + +# Download the tarball if not already cached +set(_ZEROBUS_TARBALL "${CMAKE_BINARY_DIR}/zerobus-ffi-1.0.0.tar.gz") +if(NOT EXISTS "${_ZEROBUS_TARBALL}") + message(STATUS "ZeroBus FFI: downloading ${_ZEROBUS_URL}") + file(DOWNLOAD + "${_ZEROBUS_URL}" + "${_ZEROBUS_TARBALL}" + EXPECTED_HASH "SHA256=${_ZEROBUS_SHA256}" + SHOW_PROGRESS + STATUS _DOWNLOAD_STATUS + ) + list(GET _DOWNLOAD_STATUS 0 _DOWNLOAD_ERROR) + if(_DOWNLOAD_ERROR) + message(STATUS + "ZeroBus FFI: download failed (${_DOWNLOAD_STATUS}), " + "disabling out_zerobus. " + "To build manually, set -DZEROBUS_LIB_DIR=/path/to/lib.") + file(REMOVE "${_ZEROBUS_TARBALL}") + FLB_OPTION(FLB_OUT_ZEROBUS OFF) + return() + endif() +endif() + +# Extract the tarball +set(_ZEROBUS_EXTRACT_DIR "${CMAKE_BINARY_DIR}") +if(NOT EXISTS "${_ZEROBUS_EXTRACT_DIR}/native") + execute_process( + COMMAND ${CMAKE_COMMAND} -E tar xzf "${_ZEROBUS_TARBALL}" + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}" + RESULT_VARIABLE _EXTRACT_RESULT + ) + if(_EXTRACT_RESULT) + message(STATUS + "ZeroBus FFI: extraction failed, disabling out_zerobus.") + FLB_OPTION(FLB_OUT_ZEROBUS OFF) + return() + endif() +endif() + +set(ZEROBUS_LIB_DIR "${_ZEROBUS_EXTRACT_DIR}/native/${_ZEROBUS_PLATFORM}" + CACHE PATH "Path to ZeroBus FFI library directory" FORCE) + +if(NOT EXISTS "${ZEROBUS_LIB_DIR}/libzerobus_ffi.a") + message(STATUS + "ZeroBus FFI: libzerobus_ffi.a not found at ${ZEROBUS_LIB_DIR}, " + "disabling out_zerobus.") + FLB_OPTION(FLB_OUT_ZEROBUS OFF) + return() +endif() + +message(STATUS "ZeroBus FFI library: ${ZEROBUS_LIB_DIR}/libzerobus_ffi.a") diff --git a/plugins/out_zerobus/CMakeLists.txt b/plugins/out_zerobus/CMakeLists.txt index 31b71a98f3e..4dbbd4cf26c 100644 --- a/plugins/out_zerobus/CMakeLists.txt +++ b/plugins/out_zerobus/CMakeLists.txt @@ -3,10 +3,12 @@ set(src FLB_PLUGIN(out_zerobus "${src}" "") +# ZEROBUS_LIB_DIR is set automatically by cmake/zerobus-ffi.cmake or +# can be overridden by the user via -DZEROBUS_LIB_DIR=/path/to/lib. if(NOT ZEROBUS_LIB_DIR) message(FATAL_ERROR - "ZEROBUS_LIB_DIR must be set to the directory containing libzerobus_ffi.a " - "(e.g. -DZEROBUS_LIB_DIR=/path/to/zerobus-sdk/lib/darwin_arm64)") + "ZEROBUS_LIB_DIR is not set. This should not happen when " + "FLB_OUT_ZEROBUS is ON — check that cmake/zerobus-ffi.cmake is included.") endif() target_link_libraries(flb-plugin-out_zerobus ${ZEROBUS_LIB_DIR}/libzerobus_ffi.a) diff --git a/plugins/out_zerobus/zerobus.c b/plugins/out_zerobus/zerobus.c index 9abdbf5fc14..3ea5ba2de5d 100644 --- a/plugins/out_zerobus/zerobus.c +++ b/plugins/out_zerobus/zerobus.c @@ -32,11 +32,16 @@ #include "zerobus.h" -/* Ensure the URL has an http:// or https:// scheme prefix. Returns a new sds string. */ +/* + * Prepend "https://" to url if no scheme is present. + * Returns a newly allocated sds string; caller must flb_sds_destroy it. + * Returns NULL on allocation failure. + */ static flb_sds_t ensure_url_scheme(const char *url) { size_t url_len; flb_sds_t out; + flb_sds_t tmp; if (strncmp(url, "https://", 8) == 0 || strncmp(url, "http://", 7) == 0) { @@ -48,15 +53,25 @@ static flb_sds_t ensure_url_scheme(const char *url) if (!out) { return NULL; } - out = flb_sds_cat(out, "https://", 8); - if (!out) { + tmp = flb_sds_cat(out, "https://", 8); + if (!tmp) { + flb_sds_destroy(out); + return NULL; + } + out = tmp; + tmp = flb_sds_cat(out, url, url_len); + if (!tmp) { + flb_sds_destroy(out); return NULL; } - out = flb_sds_cat(out, url, url_len); - return out; + return tmp; } -/* Format a flb_time as an RFC 3339 timestamp with nanosecond precision into buf. */ +/* + * Format tm as an RFC 3339 timestamp with nanosecond precision into buf. + * Returns the number of characters written (excluding the null terminator), + * or -1 if gmtime_r fails. + */ static int format_timestamp_rfc3339nano(struct flb_time *tm, char *buf, size_t size) { @@ -73,7 +88,10 @@ static int format_timestamp_rfc3339nano(struct flb_time *tm, (unsigned long) tm->tm.tv_nsec); } -/* Return 1 if the given key matches any entry in the log_keys list, 0 otherwise. */ +/* + * Return 1 if key (with length key_len) matches any entry in log_keys, + * 0 otherwise. + */ static int key_in_log_keys(const char *key, int key_len, struct mk_list *log_keys) { @@ -90,7 +108,10 @@ static int key_in_log_keys(const char *key, int key_len, return 0; } -/* Return true if msgpack object k is a string equal to name. */ +/* + * Return 1 if msgpack object k is a string of length name_len equal to name, + * 0 otherwise. + */ static inline int str_key_equals(const msgpack_object *k, const char *name, int name_len) { @@ -99,7 +120,10 @@ static inline int str_key_equals(const msgpack_object *k, memcmp(k->via.str.ptr, name, name_len) == 0; } -/* Log a CResult error and free the message. Returns FLB_RETRY or FLB_ERROR. */ +/* + * Log an error from a failed CResult and free its error_message. + * Returns FLB_RETRY if is_retryable, FLB_ERROR otherwise. + */ static int log_cresult_error(struct flb_output_instance *ins, CResult *r, const char *context) { @@ -118,10 +142,9 @@ static int log_cresult_error(struct flb_output_instance *ins, /* * Convert a log event body to a JSON string for ZeroBus ingestion. * - * Semantics match the Go plugin's recordToJSON: - * 1. Capture raw record as JSON before filtering (raw_log_key) - * 2. Apply log_keys filter (if configured) - * 3. Inject raw_log_key, time_key, _tag -- without overwriting + * Matches the Go plugin's recordToJSON: applies log_keys filter, then + * injects raw_log_key (full pre-filter record), time_key, and _tag + * without overwriting existing keys. * * Uses flb_mp_map_header for single-pass packing (no pre-counting). * The caller-owned msgpack_sbuffer is reused across records. @@ -142,7 +165,6 @@ static flb_sds_t record_to_json(struct flb_out_zerobus *ctx, int include; char *raw_json = NULL; char time_buf[64]; - int time_len = 0; msgpack_packer pk; struct flb_mp_map_header mh; flb_sds_t json; @@ -155,17 +177,6 @@ static flb_sds_t record_to_json(struct flb_out_zerobus *ctx, time_key_len = (ctx->time_key) ? (int) flb_sds_len(ctx->time_key) : 0; raw_key_len = (ctx->raw_log_key) ? (int) flb_sds_len(ctx->raw_log_key) : 0; - /* - * Capture the full record as JSON before filtering. - * The Go plugin calls json.Marshal(m) before applying LogKeys. - */ - if (raw_key_len > 0) { - raw_json = flb_msgpack_to_json_str(0, body, escape_unicode); - if (!raw_json) { - return NULL; - } - } - msgpack_sbuffer_clear(sbuf); msgpack_packer_init(&pk, sbuf, msgpack_sbuffer_write); flb_mp_map_header_init(&mh, &pk); @@ -205,8 +216,21 @@ static flb_sds_t record_to_json(struct flb_out_zerobus *ctx, } } - if (raw_key_len > 0 && raw_json && !has_raw_key) { - size_t rj_len = strlen(raw_json); + if (raw_key_len > 0 && !has_raw_key) { + size_t rj_len; + + /* + * Serialize the original (pre-filter) body only when the key is + * absent. Deferring to here avoids a full serialize+discard on + * every record that already carries the field. + * body is unchanged by the loop above, so the result is identical + * to capturing it before filtering (matching Go's json.Marshal(m)). + */ + raw_json = flb_msgpack_to_json_str(0, body, escape_unicode); + if (!raw_json) { + return NULL; + } + rj_len = strlen(raw_json); flb_mp_map_header_append(&mh); msgpack_pack_str(&pk, raw_key_len); @@ -216,7 +240,7 @@ static flb_sds_t record_to_json(struct flb_out_zerobus *ctx, } if (time_key_len > 0 && !has_time_key) { - time_len = format_timestamp_rfc3339nano(tm, time_buf, sizeof(time_buf)); + int time_len = format_timestamp_rfc3339nano(tm, time_buf, sizeof(time_buf)); if (time_len > 0) { flb_mp_map_header_append(&mh); msgpack_pack_str(&pk, time_key_len); @@ -245,7 +269,10 @@ static flb_sds_t record_to_json(struct flb_out_zerobus *ctx, return json; } -/* Initialize the ZeroBus output plugin: validate config, create SDK and stream. */ +/* + * Plugin init callback: validate required config, then create the ZeroBus + * SDK handle and stream. Returns 0 on success, -1 on failure. + */ static int cb_zerobus_init(struct flb_output_instance *ins, struct flb_config *config, void *data) @@ -357,7 +384,12 @@ static int cb_zerobus_init(struct flb_output_instance *ins, return -1; } -/* Flush callback: convert log events to JSON and ingest them via ZeroBus. */ +/* + * Plugin flush callback: decode incoming log events, convert each to JSON, + * and ingest the batch via ZeroBus. Waits for server-side acknowledgment + * before returning. Returns FLB_OK, FLB_RETRY, or FLB_ERROR via + * FLB_OUTPUT_RETURN. + */ static void cb_zerobus_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -488,7 +520,10 @@ static void cb_zerobus_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(ret); } -/* Cleanup callback: close the stream, free SDK resources and plugin context. */ +/* + * Plugin exit callback: close the ZeroBus stream, free the SDK handle, + * and release the plugin context. Returns 0. + */ static int cb_zerobus_exit(void *data, struct flb_config *config) { struct flb_out_zerobus *ctx = data; From 5843a3dff399eec1aea378ab98ab18e2b3cbc3dd Mon Sep 17 00:00:00 2001 From: mats Date: Tue, 7 Apr 2026 12:37:15 +0900 Subject: [PATCH 6/7] refactor: update ZeroBus FFI CMake configuration for improved library handling Enhanced the CMake configuration for the ZeroBus FFI by introducing a new variable, ZEROBUS_LIB_FILE, to specify the full path to the static library. Updated related logic to ensure proper handling of library paths across different platforms, including improved error messaging for unsupported architectures. This change aims to streamline the build process and enhance clarity in library management. Signed-off-by: mats --- cmake/zerobus-ffi.cmake | 30 +++++++++++++++++++++++++----- plugins/out_zerobus/CMakeLists.txt | 13 ++++++------- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/cmake/zerobus-ffi.cmake b/cmake/zerobus-ffi.cmake index ca461467e02..9b841fc2e1c 100644 --- a/cmake/zerobus-ffi.cmake +++ b/cmake/zerobus-ffi.cmake @@ -7,10 +7,14 @@ # On unsupported platforms or when the download fails, the plugin is # disabled automatically (FLB_OUT_ZEROBUS is set to OFF). # -# After this module runs, ZEROBUS_LIB_DIR points to the directory -# containing libzerobus_ffi.a for the current platform. +# After this module runs: +# ZEROBUS_LIB_DIR — directory containing the static library +# ZEROBUS_LIB_FILE — full path to the static library if(ZEROBUS_LIB_DIR) + set(ZEROBUS_LIB_FILE + "${ZEROBUS_LIB_DIR}/${CMAKE_STATIC_LIBRARY_PREFIX}zerobus_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}" + CACHE FILEPATH "Full path to ZeroBus FFI static library" FORCE) return() endif() @@ -33,6 +37,17 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux") FLB_OPTION(FLB_OUT_ZEROBUS OFF) return() endif() +elseif(CMAKE_SYSTEM_NAME STREQUAL "Windows") + if(CMAKE_SYSTEM_PROCESSOR MATCHES "x86_64|AMD64") + set(_ZEROBUS_PLATFORM "windows-x86-64") + else() + message(STATUS + "ZeroBus FFI: unsupported Windows architecture '${CMAKE_SYSTEM_PROCESSOR}', " + "disabling out_zerobus. " + "To build manually, set -DZEROBUS_LIB_DIR=/path/to/lib.") + FLB_OPTION(FLB_OUT_ZEROBUS OFF) + return() + endif() else() message(STATUS "ZeroBus FFI: no prebuilt library available for ${CMAKE_SYSTEM_NAME}, " @@ -42,6 +57,9 @@ else() return() endif() +set(_ZEROBUS_LIB_FILENAME + "${CMAKE_STATIC_LIBRARY_PREFIX}zerobus_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}") + # Download the tarball if not already cached set(_ZEROBUS_TARBALL "${CMAKE_BINARY_DIR}/zerobus-ffi-1.0.0.tar.gz") if(NOT EXISTS "${_ZEROBUS_TARBALL}") @@ -83,13 +101,15 @@ endif() set(ZEROBUS_LIB_DIR "${_ZEROBUS_EXTRACT_DIR}/native/${_ZEROBUS_PLATFORM}" CACHE PATH "Path to ZeroBus FFI library directory" FORCE) +set(ZEROBUS_LIB_FILE "${ZEROBUS_LIB_DIR}/${_ZEROBUS_LIB_FILENAME}" + CACHE FILEPATH "Full path to ZeroBus FFI static library" FORCE) -if(NOT EXISTS "${ZEROBUS_LIB_DIR}/libzerobus_ffi.a") +if(NOT EXISTS "${ZEROBUS_LIB_FILE}") message(STATUS - "ZeroBus FFI: libzerobus_ffi.a not found at ${ZEROBUS_LIB_DIR}, " + "ZeroBus FFI: ${_ZEROBUS_LIB_FILENAME} not found at ${ZEROBUS_LIB_DIR}, " "disabling out_zerobus.") FLB_OPTION(FLB_OUT_ZEROBUS OFF) return() endif() -message(STATUS "ZeroBus FFI library: ${ZEROBUS_LIB_DIR}/libzerobus_ffi.a") +message(STATUS "ZeroBus FFI library: ${ZEROBUS_LIB_FILE}") diff --git a/plugins/out_zerobus/CMakeLists.txt b/plugins/out_zerobus/CMakeLists.txt index 4dbbd4cf26c..c10a95a9fd0 100644 --- a/plugins/out_zerobus/CMakeLists.txt +++ b/plugins/out_zerobus/CMakeLists.txt @@ -3,21 +3,20 @@ set(src FLB_PLUGIN(out_zerobus "${src}" "") -# ZEROBUS_LIB_DIR is set automatically by cmake/zerobus-ffi.cmake or +# ZEROBUS_LIB_FILE is set automatically by cmake/zerobus-ffi.cmake or # can be overridden by the user via -DZEROBUS_LIB_DIR=/path/to/lib. -if(NOT ZEROBUS_LIB_DIR) +if(NOT ZEROBUS_LIB_FILE) message(FATAL_ERROR - "ZEROBUS_LIB_DIR is not set. This should not happen when " + "ZEROBUS_LIB_FILE is not set. This should not happen when " "FLB_OUT_ZEROBUS is ON — check that cmake/zerobus-ffi.cmake is included.") endif() -target_link_libraries(flb-plugin-out_zerobus ${ZEROBUS_LIB_DIR}/libzerobus_ffi.a) +target_link_libraries(flb-plugin-out_zerobus "${ZEROBUS_LIB_FILE}") # Platform-specific linker flags required by the Rust FFI static library if(WIN32) - message(FATAL_ERROR - "out_zerobus does not support Windows builds. " - "Disable it with -DFLB_OUT_ZEROBUS=OFF") + target_link_libraries(flb-plugin-out_zerobus + ws2_32 ntdll userenv advapi32 bcrypt) elseif(APPLE) target_link_libraries(flb-plugin-out_zerobus "-framework CoreFoundation" From 4729836e2c8bd20aa88116e6db0f8182eeb49bba Mon Sep 17 00:00:00 2001 From: mats Date: Tue, 7 Apr 2026 12:56:31 +0900 Subject: [PATCH 7/7] fix: enhance ZeroBus FFI CMake configuration for library validation Added a check to verify the existence of the ZeroBus FFI library file, providing a status message if the library is not found. Updated platform detection logic to include case variations for ARM architecture. Improved extraction condition to ensure the correct library file is checked during the build process. Signed-off-by: mats --- cmake/zerobus-ffi.cmake | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cmake/zerobus-ffi.cmake b/cmake/zerobus-ffi.cmake index 9b841fc2e1c..442e184a0a0 100644 --- a/cmake/zerobus-ffi.cmake +++ b/cmake/zerobus-ffi.cmake @@ -11,10 +11,20 @@ # ZEROBUS_LIB_DIR — directory containing the static library # ZEROBUS_LIB_FILE — full path to the static library +set(_ZEROBUS_LIB_FILENAME + "${CMAKE_STATIC_LIBRARY_PREFIX}zerobus_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}") + if(ZEROBUS_LIB_DIR) set(ZEROBUS_LIB_FILE - "${ZEROBUS_LIB_DIR}/${CMAKE_STATIC_LIBRARY_PREFIX}zerobus_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}" + "${ZEROBUS_LIB_DIR}/${_ZEROBUS_LIB_FILENAME}" CACHE FILEPATH "Full path to ZeroBus FFI static library" FORCE) + if(NOT EXISTS "${ZEROBUS_LIB_FILE}") + message(STATUS + "ZeroBus FFI: library not found at ${ZEROBUS_LIB_FILE}, " + "disabling out_zerobus.") + unset(ZEROBUS_LIB_FILE CACHE) + FLB_OPTION(FLB_OUT_ZEROBUS OFF) + endif() return() endif() @@ -25,9 +35,9 @@ set(_ZEROBUS_SHA256 # Determine the platform subdirectory inside the tarball if(CMAKE_SYSTEM_NAME STREQUAL "Linux") - if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|ARM64") + if(CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64|arm64|ARM64|AARCH64)$") set(_ZEROBUS_PLATFORM "linux-aarch64") - elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "x86_64|AMD64") + elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "^(x86_64|AMD64)$") set(_ZEROBUS_PLATFORM "linux-x86-64") else() message(STATUS @@ -38,7 +48,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux") return() endif() elseif(CMAKE_SYSTEM_NAME STREQUAL "Windows") - if(CMAKE_SYSTEM_PROCESSOR MATCHES "x86_64|AMD64") + if(CMAKE_SYSTEM_PROCESSOR MATCHES "^(x86_64|AMD64)$") set(_ZEROBUS_PLATFORM "windows-x86-64") else() message(STATUS @@ -57,9 +67,6 @@ else() return() endif() -set(_ZEROBUS_LIB_FILENAME - "${CMAKE_STATIC_LIBRARY_PREFIX}zerobus_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}") - # Download the tarball if not already cached set(_ZEROBUS_TARBALL "${CMAKE_BINARY_DIR}/zerobus-ffi-1.0.0.tar.gz") if(NOT EXISTS "${_ZEROBUS_TARBALL}") @@ -85,7 +92,7 @@ endif() # Extract the tarball set(_ZEROBUS_EXTRACT_DIR "${CMAKE_BINARY_DIR}") -if(NOT EXISTS "${_ZEROBUS_EXTRACT_DIR}/native") +if(NOT EXISTS "${_ZEROBUS_EXTRACT_DIR}/native/${_ZEROBUS_PLATFORM}/${_ZEROBUS_LIB_FILENAME}") execute_process( COMMAND ${CMAKE_COMMAND} -E tar xzf "${_ZEROBUS_TARBALL}" WORKING_DIRECTORY "${CMAKE_BINARY_DIR}"