diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 45fe4230f7e..05e2fc1c8e2 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -23,6 +23,8 @@ #include #include +struct flb_input_instance; + /* Lib engine status */ #define FLB_LIB_ERROR -1 #define FLB_LIB_NONE 0 @@ -70,6 +72,16 @@ FLB_EXPORT int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void *, size_t, void *), void *out_callback_data, void *test_ctx); +FLB_EXPORT int flb_output_set_test_with_ctx_callback( + flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_callback) (void *, int, int, + void *, size_t, void *), + void *out_callback_data, + void *test_ctx, + void *(*test_ctx_callback) ( + struct flb_config *, + struct flb_input_instance *, + void *, void *)); FLB_EXPORT int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)); FLB_EXPORT int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 90827b584db..0e853fa2844 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -154,9 +154,24 @@ struct flb_test_out_formatter { */ void *rt_data; - /* optional context for flush callback */ + /* optional context for "flush context callback" */ void *flush_ctx; + /* + * Callback + * ========= + * Optional "flush context callback": it references the function that extracts + * optional flush context for "formatter callback". + */ + void *(*flush_ctx_callback) (/* Fluent Bit context */ + struct flb_config *, + /* plugin that ingested the records */ + struct flb_input_instance *, + /* plugin instance context */ + void *plugin_context, + /* context for "flush context callback" */ + void *flush_ctx); + /* * Callback * ========= diff --git a/include/fluent-bit/flb_sds.h b/include/fluent-bit/flb_sds.h index b430829be3a..5f10de9cd67 100644 --- a/include/fluent-bit/flb_sds.h +++ b/include/fluent-bit/flb_sds.h @@ -35,6 +35,12 @@ typedef char *flb_sds_t; +struct flb_sds_view { + const char *buf; + size_t len; +}; +typedef struct flb_sds_view flb_sds_view_t; + #pragma pack(push, 1) struct flb_sds { uint64_t len; /* used */ @@ -95,8 +101,33 @@ static inline int flb_sds_casecmp(flb_sds_t s, const char *str, int len) return strncasecmp(s, str, len); } +static inline flb_sds_view_t flb_sds_view_create(const char *str, size_t len) +{ + flb_sds_view_t view; + + view.buf = str; + view.len = len; + + return view; +} + +static inline flb_sds_view_t flb_sds_view_create_from_sds(flb_sds_t s) +{ + return flb_sds_view_create(s, flb_sds_len(s)); +} + +static inline int flb_sds_view_is_empty(flb_sds_view_t view) +{ + if (view.len == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + flb_sds_t flb_sds_create(const char *str); flb_sds_t flb_sds_create_len(const char *str, int len); +flb_sds_t flb_sds_create_from_view(flb_sds_view_t view); flb_sds_t flb_sds_create_size(size_t size); int flb_sds_trim(flb_sds_t s); flb_sds_t flb_sds_cat(flb_sds_t s, const char *str, int len); diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 459da80cbe5..d2d3e71c195 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -44,11 +44,219 @@ struct flb_output_plugin out_es_plugin; static int es_pack_array_content(msgpack_packer *tmp_pck, msgpack_object array, - struct flb_elasticsearch *ctx); + int replace_dots); + +static flb_sds_view_t es_get_property_view(const char *property, + struct flb_upstream_node *node, + struct flb_elasticsearch *ctx) +{ + /* + * Lifetime strategy: + * + * This helper always returns a borrowed view over memory owned by either: + * + * - the upstream node property table, or + * - the output instance property table. + * + * The returned view must be consumed within the current flush/config call + * path and must never be stored across calls or freed by the caller. + * + * Precedence strategy: + * - node property (if present) + * - output instance property + * - empty view + */ + const char *value; + + if (node != NULL) { + value = flb_upstream_node_get_property(property, node); + if (value != NULL) { + return flb_sds_view_create(value, strlen(value)); + } + } + + value = flb_output_get_property(property, ctx->ins); + if (value != NULL) { + return flb_sds_view_create(value, strlen(value)); + } + + return flb_sds_view_create("", 0); +} + +static int es_get_property_bool(const char *property, + struct flb_upstream_node *node, + struct flb_es_node_ctx *node_ctx, + int base_val) +{ + /* + * Lifetime/ownership strategy: + * + * This helper returns a copied scalar value (int), so there is no borrowed + * lifetime to manage at the call site. + * + * Resolution strategy: + * - explicit value cached in node_ctx (for fast/validated booleans), + * - raw node property value parsed as bool, + * - plugin/base value fallback. + */ + const char *value; + int ret; + + if (node_ctx != NULL) { + if (strcmp(property, "logstash_format") == 0 && + node_ctx->has_logstash_format == FLB_TRUE) { + return node_ctx->logstash_format; + } + if (strcmp(property, "suppress_type_name") == 0 && + node_ctx->has_suppress_type_name == FLB_TRUE) { + return node_ctx->suppress_type_name; + } + if (strcmp(property, "replace_dots") == 0 && + node_ctx->has_replace_dots == FLB_TRUE) { + return node_ctx->replace_dots; + } + if (strcmp(property, "current_time_index") == 0 && + node_ctx->has_current_time_index == FLB_TRUE) { + return node_ctx->current_time_index; + } + if (strcmp(property, "generate_id") == 0 && + node_ctx->has_generate_id == FLB_TRUE) { + return node_ctx->generate_id; + } +#ifdef FLB_HAVE_AWS + if (strcmp(property, "aws_auth") == 0 && + node_ctx->has_aws_auth_override == FLB_TRUE) { + return node_ctx->has_aws_auth; + } +#endif + } + + if (node != NULL) { + value = flb_upstream_node_get_property(property, node); + if (value != NULL) { + ret = flb_utils_bool(value); + if (ret != -1) { + return ret; + } + } + } + + return base_val; +} + +static size_t es_get_property_size(const char *property, + struct flb_upstream_node *node, + size_t base_val) +{ + const char *value; + int64_t ret; + + if (node != NULL) { + value = flb_upstream_node_get_property(property, node); + if (value != NULL) { + ret = flb_utils_size_to_bytes(value); + if (ret >= 0) { + return (size_t) ret; + } + if (ret == -1) { + return 0; + } + } + } + + return base_val; +} + +static int es_get_property_compress(struct flb_upstream_node *node, + int base_val) +{ + const char *value; + + if (node != NULL) { + value = flb_upstream_node_get_property("compress", node); + if (value != NULL) { + if (strcasecmp(value, "gzip") == 0) { + return FLB_TRUE; + } + return FLB_FALSE; + } + } + + return base_val; +} + +static const char *es_get_action_from_write_operation(flb_sds_view_t write_operation, + const char *base_action) +{ + if (flb_sds_view_is_empty(write_operation)) { + return base_action; + } + + if (flb_sds_casecmp(write_operation.buf, FLB_ES_WRITE_OP_INDEX, + write_operation.len) == 0) { + return FLB_ES_WRITE_OP_INDEX; + } + + if (flb_sds_casecmp(write_operation.buf, FLB_ES_WRITE_OP_CREATE, + write_operation.len) == 0) { + return FLB_ES_WRITE_OP_CREATE; + } + + if (flb_sds_casecmp(write_operation.buf, FLB_ES_WRITE_OP_UPDATE, + write_operation.len) == 0 || + flb_sds_casecmp(write_operation.buf, FLB_ES_WRITE_OP_UPSERT, + write_operation.len) == 0) { + return FLB_ES_WRITE_OP_UPDATE; + } + + return base_action; +} + +static flb_sds_t es_compose_bulk_uri(struct flb_elasticsearch *ctx, + struct flb_upstream_node *node) +{ + flb_sds_view_t path; + flb_sds_view_t pipeline; + flb_sds_t uri; + + path = es_get_property_view("path", node, ctx); + pipeline = es_get_property_view("pipeline", node, ctx); + + if (flb_sds_view_is_empty(path)) { + path = flb_sds_view_create("", 0); + } + + if (!flb_sds_view_is_empty(pipeline)) { + uri = flb_sds_create_size(path.len + pipeline.len + 19); + if (uri == NULL) { + flb_errno(); + return NULL; + } + + uri = flb_sds_printf(&uri, "%.*s/_bulk?pipeline=%.*s", + (int) path.len, path.buf, + (int) pipeline.len, pipeline.buf); + } + else { + uri = flb_sds_create_size(path.len + 8); + if (uri == NULL) { + flb_errno(); + return NULL; + } + + uri = flb_sds_printf(&uri, "%.*s/_bulk", + (int) path.len, path.buf); + } + + return uri; +} #ifdef FLB_HAVE_AWS static flb_sds_t add_aws_auth(struct flb_http_client *c, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch *ctx, + struct flb_aws_provider *provider, + const char *region, + const char *service_name) { flb_sds_t signature = NULL; int ret; @@ -66,9 +274,9 @@ static flb_sds_t add_aws_auth(struct flb_http_client *c, flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21); signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL), - ctx->aws_region, ctx->aws_service_name, + region, service_name, S3_MODE_SIGNED_PAYLOAD, ctx->aws_unsigned_headers, - ctx->aws_provider); + provider); if (!signature) { flb_plg_error(ctx->ins, "could not sign request with sigv4"); return NULL; @@ -79,7 +287,7 @@ static flb_sds_t add_aws_auth(struct flb_http_client *c, static int es_pack_map_content(msgpack_packer *tmp_pck, msgpack_object map, - struct flb_elasticsearch *ctx) + int replace_dots) { int i; char *ptr_key = NULL; @@ -128,7 +336,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, * * https://goo.gl/R5NMTr */ - if (ctx->replace_dots == FLB_TRUE) { + if (replace_dots == FLB_TRUE) { char *p = ptr_key; char *end = ptr_key + key_size; while (p != end) { @@ -153,7 +361,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ if (v->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, v->via.map.size); - es_pack_map_content(tmp_pck, *v, ctx); + es_pack_map_content(tmp_pck, *v, replace_dots); } /* * The value can be any data type, if it's an array we need to @@ -161,7 +369,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ else if (v->type == MSGPACK_OBJECT_ARRAY) { msgpack_pack_array(tmp_pck, v->via.array.size); - es_pack_array_content(tmp_pck, *v, ctx); + es_pack_array_content(tmp_pck, *v, replace_dots); } else { msgpack_pack_object(tmp_pck, *v); @@ -176,7 +384,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ static int es_pack_array_content(msgpack_packer *tmp_pck, msgpack_object array, - struct flb_elasticsearch *ctx) + int replace_dots) { int i; msgpack_object *e; @@ -186,12 +394,12 @@ static int es_pack_array_content(msgpack_packer *tmp_pck, if (e->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, e->via.map.size); - es_pack_map_content(tmp_pck, *e, ctx); + es_pack_map_content(tmp_pck, *e, replace_dots); } else if (e->type == MSGPACK_OBJECT_ARRAY) { msgpack_pack_array(tmp_pck, e->via.array.size); - es_pack_array_content(tmp_pck, *e, ctx); + es_pack_array_content(tmp_pck, *e, replace_dots); } else { @@ -207,19 +415,21 @@ static int es_pack_array_content(msgpack_packer *tmp_pck, * If it failed, return NULL. */ static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx, + struct flb_record_accessor *ra_id_key, + flb_sds_view_t id_key_name, msgpack_object *map) { struct flb_ra_value *rval = NULL; flb_sds_t tmp_str; - rval = flb_ra_get_value_object(ctx->ra_id_key, *map); + rval = flb_ra_get_value_object(ra_id_key, *map); if (rval == NULL) { - flb_plg_warn(ctx->ins, "the value of %s is missing", - ctx->id_key); + flb_plg_warn(ctx->ins, "the value of %.*s is missing", + (int) id_key_name.len, id_key_name.buf); return NULL; } else if(rval->o.type != MSGPACK_OBJECT_STR) { - flb_plg_warn(ctx->ins, "the value of %s is not string", - ctx->id_key); + flb_plg_warn(ctx->ins, "the value of %.*s is not string", + (int) id_key_name.len, id_key_name.buf); flb_ra_key_value_destroy(rval); return NULL; } @@ -235,10 +445,11 @@ static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx, return tmp_str; } -static int compose_index_header(struct flb_elasticsearch *ctx, - int es_index_custom_len, +static int compose_index_header(int es_index_custom_len, char *logstash_index, size_t logstash_index_size, - char *separator_str, + flb_sds_view_t logstash_prefix, + flb_sds_view_t separator, + flb_sds_view_t dateformat, struct tm *tm) { int ret; @@ -250,20 +461,20 @@ static int compose_index_header(struct flb_elasticsearch *ctx, if (es_index_custom_len > 0) { p = logstash_index + es_index_custom_len; } else { - p = logstash_index + flb_sds_len(ctx->logstash_prefix); + p = logstash_index + logstash_prefix.len; } len = p - logstash_index; - ret = snprintf(p, logstash_index_size - len, "%s", - separator_str); + ret = snprintf(p, logstash_index_size - len, "%.*s", + (int) separator.len, separator.buf); if (ret > logstash_index_size - len) { /* exceed limit */ return -1; } - p += strlen(separator_str); - len += strlen(separator_str); + p += separator.len; + len += separator.len; s = strftime(p, logstash_index_size - len, - ctx->logstash_dateformat, tm); + dateformat.buf, tm); if (s==0) { /* exceed limit */ return -1; @@ -317,9 +528,29 @@ static int elasticsearch_format(struct flb_config *config, msgpack_packer tmp_pck; uint16_t hash[8]; int es_index_custom_len; + int logstash_format; + int suppress_type_name; + int replace_dots; + int current_time_index; + int generate_id; + int write_op_update; + int write_op_upsert; struct flb_elasticsearch *ctx = plugin_context; + struct flb_upstream_node *node = flush_ctx; + struct flb_es_node_ctx *node_ctx; + struct flb_record_accessor *ra_prefix_key; + struct flb_record_accessor *ra_id_key; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; + flb_sds_view_t index; + flb_sds_view_t id_key_name; + flb_sds_view_t write_operation; + flb_sds_view_t logstash_prefix; + flb_sds_view_t logstash_prefix_separator; + flb_sds_view_t logstash_dateformat; + flb_sds_view_t type_name; + const char *es_action; + flb_sds_t v; j_index = flb_sds_create_size(ES_BULK_HEADER); if (j_index == NULL) { @@ -345,12 +576,95 @@ static int elasticsearch_format(struct flb_config *config, return -1; } + node_ctx = NULL; + if (node != NULL) { + node_ctx = flb_upstream_node_get_data(node); + } + + logstash_format = es_get_property_bool("logstash_format", node, node_ctx, + ctx->logstash_format); + suppress_type_name = es_get_property_bool("suppress_type_name", node, node_ctx, + ctx->suppress_type_name); + replace_dots = es_get_property_bool("replace_dots", node, node_ctx, + ctx->replace_dots); + current_time_index = es_get_property_bool("current_time_index", node, node_ctx, + ctx->current_time_index); + generate_id = es_get_property_bool("generate_id", node, node_ctx, ctx->generate_id); + + write_operation = es_get_property_view("write_operation", node, ctx); + es_action = es_get_action_from_write_operation(write_operation, ctx->es_action); + + write_op_update = FLB_FALSE; + write_op_upsert = FLB_FALSE; + if (!flb_sds_view_is_empty(write_operation)) { + if (flb_sds_casecmp(write_operation.buf, FLB_ES_WRITE_OP_UPDATE, + write_operation.len) == 0) { + write_op_update = FLB_TRUE; + } + else if (flb_sds_casecmp(write_operation.buf, FLB_ES_WRITE_OP_UPSERT, + write_operation.len) == 0) { + write_op_upsert = FLB_TRUE; + } + } + else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) { + write_op_update = FLB_TRUE; + } + else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) { + write_op_upsert = FLB_TRUE; + } + + ra_prefix_key = ctx->ra_prefix_key; + if (node_ctx != NULL && node_ctx->ra_prefix_key != NULL) { + ra_prefix_key = node_ctx->ra_prefix_key; + } + + ra_id_key = ctx->ra_id_key; + if (node_ctx != NULL && node_ctx->ra_id_key != NULL) { + ra_id_key = node_ctx->ra_id_key; + } + + id_key_name = es_get_property_view("id_key", node, ctx); + if (flb_sds_view_is_empty(id_key_name) && ctx->id_key != NULL) { + id_key_name = flb_sds_view_create(ctx->id_key, flb_sds_len(ctx->id_key)); + } + + logstash_prefix = es_get_property_view("logstash_prefix", node, ctx); + if (flb_sds_view_is_empty(logstash_prefix) && ctx->logstash_prefix != NULL) { + logstash_prefix = flb_sds_view_create(ctx->logstash_prefix, + flb_sds_len(ctx->logstash_prefix)); + } + + logstash_prefix_separator = es_get_property_view("logstash_prefix_separator", + node, ctx); + if (flb_sds_view_is_empty(logstash_prefix_separator) && + ctx->logstash_prefix_separator != NULL) { + logstash_prefix_separator = flb_sds_view_create(ctx->logstash_prefix_separator, + flb_sds_len(ctx->logstash_prefix_separator)); + } + + logstash_dateformat = es_get_property_view("logstash_dateformat", node, ctx); + if (flb_sds_view_is_empty(logstash_dateformat) && + ctx->logstash_dateformat != NULL) { + logstash_dateformat = flb_sds_view_create(ctx->logstash_dateformat, + flb_sds_len(ctx->logstash_dateformat)); + } + /* Copy logstash prefix if logstash format is enabled */ - if (ctx->logstash_format == FLB_TRUE) { - strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index)); + if (logstash_format == FLB_TRUE) { + strncpy(logstash_index, logstash_prefix.buf, sizeof(logstash_index)); logstash_index[sizeof(logstash_index) - 1] = '\0'; } + index = es_get_property_view("index", node, ctx); + if (flb_sds_view_is_empty(index) && ctx->index != NULL) { + index = flb_sds_view_create(ctx->index, strlen(ctx->index)); + } + + type_name = es_get_property_view("type", node, ctx); + if (flb_sds_view_is_empty(type_name) && ctx->type != NULL) { + type_name = flb_sds_view_create(ctx->type, strlen(ctx->type)); + } + /* * If logstash format and id generation are disabled, pre-generate * the index line for all records. @@ -358,25 +672,25 @@ static int elasticsearch_format(struct flb_config *config, * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (logstash_format == FLB_FALSE && generate_id == FLB_FALSE) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + index.buf, &tm); es_index = index_formatted; - if (ctx->suppress_type_name) { + if (suppress_type_name == FLB_TRUE) { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_WITHOUT_TYPE, - ctx->es_action, + es_action, es_index); } else { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT, - ctx->es_action, - es_index, ctx->type); + es_action, + es_index, type_name.buf); } } @@ -386,7 +700,7 @@ static int elasticsearch_format(struct flb_config *config, * in order to prevent generating millions of indexes * we can set to always use current time for index generation */ - if (ctx->current_time_index == FLB_TRUE) { + if (current_time_index == FLB_TRUE) { flb_time_get(&tms); } @@ -396,7 +710,7 @@ static int elasticsearch_format(struct flb_config *config, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { /* Only pop time from record if current_time_index is disabled */ - if (ctx->current_time_index == FLB_FALSE) { + if (current_time_index == FLB_FALSE) { flb_time_copy(&tms, &log_event.timestamp); } @@ -404,10 +718,9 @@ static int elasticsearch_format(struct flb_config *config, map_size = map.via.map.size; es_index_custom_len = 0; - if (ctx->logstash_prefix_key) { - flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key, - (char *) tag, tag_len, - map, NULL); + if (ra_prefix_key != NULL) { + v = flb_ra_translate(ra_prefix_key, (char *) tag, tag_len, + map, NULL); if (v) { len = flb_sds_len(v); if (len > 128) { @@ -454,40 +767,44 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; - if (ctx->logstash_format == FLB_TRUE) { - ret = compose_index_header(ctx, es_index_custom_len, + es_index = (char *) index.buf; + if (logstash_format == FLB_TRUE) { + ret = compose_index_header(es_index_custom_len, &logstash_index[0], sizeof(logstash_index), - ctx->logstash_prefix_separator, &tm); + logstash_prefix, + logstash_prefix_separator, + logstash_dateformat, &tm); if (ret < 0) { /* retry with default separator */ - compose_index_header(ctx, es_index_custom_len, + compose_index_header(es_index_custom_len, &logstash_index[0], sizeof(logstash_index), - "-", &tm); + logstash_prefix, + flb_sds_view_create("-", 1), + logstash_dateformat, &tm); } es_index = logstash_index; - if (ctx->generate_id == FLB_FALSE) { - if (ctx->suppress_type_name) { + if (generate_id == FLB_FALSE) { + if (suppress_type_name == FLB_TRUE) { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_WITHOUT_TYPE, - ctx->es_action, + es_action, es_index); } else { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT, - ctx->es_action, - es_index, ctx->type); + es_action, + es_index, type_name.buf); } } } - else if (ctx->current_time_index == FLB_TRUE) { + else if (current_time_index == FLB_TRUE) { /* Make sure we handle index time format for index */ strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + index.buf, &tm); es_index = index_formatted; } @@ -506,7 +823,7 @@ static int elasticsearch_format(struct flb_config *config, * Elasticsearch have a restriction that key names cannot contain * a dot; if some dot is found, it's replaced with an underscore. */ - ret = es_pack_map_content(&tmp_pck, map, ctx); + ret = es_pack_map_content(&tmp_pck, map, replace_dots); if (ret == -1) { flb_log_event_decoder_destroy(&log_decoder); msgpack_sbuffer_destroy(&tmp_sbuf); @@ -515,43 +832,43 @@ static int elasticsearch_format(struct flb_config *config, return -1; } - if (ctx->generate_id == FLB_TRUE) { + if (generate_id == FLB_TRUE) { MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash); snprintf(es_uuid, sizeof(es_uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x", hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7]); - if (ctx->suppress_type_name) { + if (suppress_type_name == FLB_TRUE) { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, - ctx->es_action, + es_action, es_index, es_uuid); } else { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_ID, - ctx->es_action, - es_index, ctx->type, es_uuid); + es_action, + es_index, type_name.buf, es_uuid); } } - if (ctx->ra_id_key) { - id_key_str = es_get_id_value(ctx ,&map); + if (ra_id_key != NULL) { + id_key_str = es_get_id_value(ctx, ra_id_key, id_key_name, &map); if (id_key_str) { - if (ctx->suppress_type_name) { + if (suppress_type_name == FLB_TRUE) { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, - ctx->es_action, + es_action, es_index, id_key_str); } else { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_ID, - ctx->es_action, - es_index, ctx->type, id_key_str); + es_action, + es_index, type_name.buf, id_key_str); } flb_sds_destroy(id_key_str); id_key_str = NULL; @@ -570,13 +887,13 @@ static int elasticsearch_format(struct flb_config *config, } out_buf_len = flb_sds_len(out_buf); - if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) { + if (write_op_update == FLB_TRUE) { tmp_buf = out_buf; out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2); out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf); flb_sds_destroy(tmp_buf); } - else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) { + else if (write_op_upsert == FLB_TRUE) { tmp_buf = out_buf; out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2); out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf); @@ -822,20 +1139,53 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, size_t b_sent; struct flb_elasticsearch *ctx = out_context; struct flb_connection *u_conn; - struct flb_http_client *c; + struct flb_http_client *c = NULL; + struct flb_upstream *upstream; + struct flb_upstream_node *node = NULL; + struct flb_es_node_ctx *node_ctx; flb_sds_t signature = NULL; + flb_sds_t uri = NULL; int compressed = FLB_FALSE; + int compress_gzip; + size_t buffer_size; flb_sds_t header_line = NULL; + flb_sds_t tmp_sds = NULL; + flb_sds_view_t http_user; + flb_sds_view_t http_passwd; + flb_sds_view_t http_api_key; +#ifdef FLB_HAVE_AWS + struct flb_aws_provider *aws_provider; + const char *aws_region; + const char *aws_service_name; + int has_aws_auth; +#endif + + node_ctx = NULL; + if (ctx->ha_mode == FLB_TRUE) { + node = flb_upstream_ha_node_get(ctx->ha); + if (node == NULL) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + upstream = node->u; + node_ctx = flb_upstream_node_get_data(node); + } + else { + upstream = ctx->u; + } + + compress_gzip = es_get_property_compress(node, ctx->compress_gzip); + buffer_size = es_get_property_size("buffer_size", node, ctx->buffer_size); /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); + u_conn = flb_upstream_conn_get(upstream); if (!u_conn) { FLB_OUTPUT_RETURN(FLB_RETRY); } /* Convert format */ ret = elasticsearch_format(config, ins, - ctx, NULL, + ctx, node, event_chunk->type, event_chunk->tag, flb_sds_len(event_chunk->tag), event_chunk->data, event_chunk->size, @@ -849,7 +1199,7 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, pack_size = out_size; /* Should we compress the payload ? */ - if (ctx->compress_gzip == FLB_TRUE) { + if (compress_gzip == FLB_TRUE) { ret = flb_gzip_compress((void *) pack, pack_size, &out_buf, &out_size); if (ret == -1) { @@ -872,10 +1222,15 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, } /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + uri = es_compose_bulk_uri(ctx, node); + if (uri == NULL) { + goto retry; + } + + c = flb_http_client(u_conn, FLB_HTTP_POST, uri, pack, pack_size, NULL, 0, NULL, 0); - flb_http_buffer_size(c, ctx->buffer_size); + flb_http_buffer_size(c, buffer_size); #ifndef FLB_HAVE_AWS flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); @@ -883,20 +1238,29 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); - if (ctx->http_user && ctx->http_passwd) { - flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); - } - else if (ctx->cloud_user && ctx->cloud_passwd) { - flb_http_basic_auth(c, ctx->cloud_user, ctx->cloud_passwd); + http_user = es_get_property_view("http_user", node, ctx); + http_passwd = es_get_property_view("http_passwd", node, ctx); + http_api_key = es_get_property_view("http_api_key", node, ctx); + + if (http_user.buf != NULL && http_passwd.buf != NULL) { + flb_http_basic_auth(c, (char *) http_user.buf, (char *) http_passwd.buf); } - else if (ctx->http_api_key) { + else if (!flb_sds_view_is_empty(http_api_key)) { /* 7 for ApiKey + space */ - header_line = flb_sds_create_size(strlen(ctx->http_api_key)+7); + header_line = flb_sds_create_size(http_api_key.len + 7); if (header_line == NULL) { flb_plg_error(ctx->ins, "failed to format API key auth header"); goto retry; } - header_line = flb_sds_printf(&header_line, "ApiKey %s", ctx->http_api_key); + tmp_sds = flb_sds_printf(&header_line, "ApiKey %.*s", + (int) http_api_key.len, + http_api_key.buf); + if (tmp_sds == NULL) { + flb_plg_error(ctx->ins, "failed to format API key auth header"); + flb_sds_destroy(header_line); + goto retry; + } + header_line = tmp_sds; if (flb_http_add_header(c, FLB_HTTP_HEADER_AUTH, strlen(FLB_HTTP_HEADER_AUTH), @@ -908,10 +1272,27 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, flb_sds_destroy(header_line); } + else if (ctx->cloud_user && ctx->cloud_passwd) { + flb_http_basic_auth(c, ctx->cloud_user, ctx->cloud_passwd); + } #ifdef FLB_HAVE_AWS - if (ctx->has_aws_auth == FLB_TRUE) { - signature = add_aws_auth(c, ctx); + has_aws_auth = es_get_property_bool("aws_auth", node, node_ctx, ctx->has_aws_auth); + aws_provider = ctx->aws_provider; + aws_region = ctx->aws_region; + aws_service_name = ctx->aws_service_name; + + if (node_ctx != NULL && node_ctx->has_aws_auth == FLB_TRUE && + node_ctx->aws_provider != NULL) { + aws_provider = node_ctx->aws_provider; + aws_region = node_ctx->aws_region; + aws_service_name = node_ctx->aws_service_name; + has_aws_auth = FLB_TRUE; + } + + if (has_aws_auth == FLB_TRUE) { + signature = add_aws_auth(c, ctx, aws_provider, aws_region, + aws_service_name); if (!signature) { goto retry; } @@ -931,20 +1312,20 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, ret = flb_http_do(c, &b_sent); if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, uri); goto retry; } else { /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, uri); if (c->resp.status != 200 && c->resp.status != 201) { if (c->resp.payload_size > 0) { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", - c->resp.status, ctx->uri, c->resp.payload); + c->resp.status, uri, c->resp.payload); } else { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", - c->resp.status, ctx->uri); + c->resp.status, uri); } goto retry; } @@ -992,23 +1373,31 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, } /* Cleanup */ - flb_http_client_destroy(c); + if (c != NULL) { + flb_http_client_destroy(c); + } flb_free(pack); flb_upstream_conn_release(u_conn); if (signature) { flb_sds_destroy(signature); } + flb_sds_destroy(uri); FLB_OUTPUT_RETURN(FLB_OK); /* Issue a retry */ retry: - flb_http_client_destroy(c); + if (c != NULL) { + flb_http_client_destroy(c); + } flb_free(pack); - + if (signature != NULL) { + flb_sds_destroy(signature); + } if (out_buf != pack) { flb_free(out_buf); } + flb_sds_destroy(uri); flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); } @@ -1146,6 +1535,11 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Elastic cloud authentication credentials" }, + { + FLB_CONFIG_MAP_STR, "upstream", NULL, + 0, FLB_FALSE, 0, + "Path to an upstream configuration file to define multiple backend nodes" + }, /* AWS Authentication */ #ifdef FLB_HAVE_AWS diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 60575379d48..119e8850dbc 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -20,6 +20,8 @@ #ifndef FLB_OUT_ES_H #define FLB_OUT_ES_H +#include + #define FLB_ES_DEFAULT_HOST "127.0.0.1" #define FLB_ES_DEFAULT_PORT 92000 #define FLB_ES_DEFAULT_INDEX "fluent-bit" @@ -45,6 +47,31 @@ #define FLB_ES_STATUS_DUPLICATES (1 << 6) #define FLB_ES_STATUS_ERROR (1 << 7) +struct flb_es_node_ctx { + struct flb_record_accessor *ra_id_key; + struct flb_record_accessor *ra_prefix_key; + int has_logstash_format; + int logstash_format; + int has_suppress_type_name; + int suppress_type_name; + int has_replace_dots; + int replace_dots; + int has_current_time_index; + int current_time_index; + int has_generate_id; + int generate_id; +#ifdef FLB_HAVE_AWS + struct flb_aws_provider *aws_provider; + struct flb_aws_provider *base_aws_provider; + struct flb_tls *aws_tls; + struct flb_tls *aws_sts_tls; + char *aws_region; + char *aws_service_name; + int has_aws_auth_override; + int has_aws_auth; +#endif +}; + struct flb_elasticsearch { /* Elasticsearch index (database) and type (table) */ char *index; @@ -142,6 +169,8 @@ struct flb_elasticsearch { /* Upstream connection to the backend server */ struct flb_upstream *u; + int ha_mode; + struct flb_upstream_ha *ha; /* Plugin output instance reference */ struct flb_output_instance *ins; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index fe771b80ba5..07d2ee01efc 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -25,10 +25,346 @@ #include #include #include +#include #include "es.h" #include "es_conf.h" +static void flb_es_node_ctx_destroy(struct flb_es_node_ctx *node_ctx) +{ + if (node_ctx == NULL) { + return; + } + + if (node_ctx->ra_id_key != NULL) { + flb_ra_destroy(node_ctx->ra_id_key); + } + + if (node_ctx->ra_prefix_key != NULL) { + flb_ra_destroy(node_ctx->ra_prefix_key); + } + +#ifdef FLB_HAVE_AWS + if (node_ctx->base_aws_provider != NULL) { + flb_aws_provider_destroy(node_ctx->base_aws_provider); + } + + if (node_ctx->aws_provider != NULL) { + flb_aws_provider_destroy(node_ctx->aws_provider); + } + + if (node_ctx->aws_tls != NULL) { + flb_tls_destroy(node_ctx->aws_tls); + } + + if (node_ctx->aws_sts_tls != NULL) { + flb_tls_destroy(node_ctx->aws_sts_tls); + } +#endif + + flb_free(node_ctx); +} + +static int flb_es_node_ctx_set_bool_property(struct flb_elasticsearch *ctx, + struct flb_upstream_node *node, + const char *property, + int *is_set, + int *value) +{ + const char *tmp; + int ret; + + *is_set = FLB_FALSE; + + tmp = flb_upstream_node_get_property(property, node); + if (tmp == NULL) { + return 0; + } + + ret = flb_utils_bool(tmp); + if (ret == -1) { + flb_plg_error(ctx->ins, + "invalid value for boolean property '%s=%s' in node '%s'", + property, tmp, node->name); + return -1; + } + + *is_set = FLB_TRUE; + *value = ret; + + return 0; +} + +#ifdef FLB_HAVE_AWS +static int flb_es_node_ctx_aws_init(struct flb_es_node_ctx *node_ctx, + struct flb_upstream_node *node, + struct flb_elasticsearch *ctx, + struct flb_output_instance *ins, + struct flb_config *config) +{ + const char *tmp; + const char *node_region; + const char *node_sts_endpoint; + const char *node_role_arn; + const char *node_external_id; + const char *node_profile; + const char *node_service_name; + char *aws_session_name; + struct flb_aws_provider *provider; + + if (node_ctx->has_aws_auth_override == FLB_FALSE) { + node_ctx->has_aws_auth = ctx->has_aws_auth; + } + + if (node_ctx->has_aws_auth != FLB_TRUE) { + return 0; + } + + node_region = flb_upstream_node_get_property("aws_region", node); + if (node_region == NULL) { + node_region = ctx->aws_region; + } + if (node_region == NULL) { + flb_plg_error(ctx->ins, + "aws_auth enabled but aws_region not set for node '%s'", + node->name); + return -1; + } + node_ctx->aws_region = (char *) node_region; + + node_service_name = flb_upstream_node_get_property("aws_service_name", node); + if (node_service_name == NULL) { + node_service_name = ctx->aws_service_name; + } + node_ctx->aws_service_name = (char *) node_service_name; + + node_sts_endpoint = flb_upstream_node_get_property("aws_sts_endpoint", node); + if (node_sts_endpoint == NULL) { + node_sts_endpoint = ctx->aws_sts_endpoint; + } + + node_profile = flb_upstream_node_get_property("aws_profile", node); + if (node_profile == NULL) { + node_profile = ctx->aws_profile; + } + + node_role_arn = flb_upstream_node_get_property("aws_role_arn", node); + node_external_id = flb_upstream_node_get_property("aws_external_id", node); + + node_ctx->aws_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (node_ctx->aws_tls == NULL) { + flb_errno(); + return -1; + } + + provider = flb_standard_chain_provider_create(config, + node_ctx->aws_tls, + node_ctx->aws_region, + (char *) node_sts_endpoint, + NULL, + flb_aws_client_generator(), + (char *) node_profile); + if (provider == NULL) { + flb_plg_error(ctx->ins, + "failed to create AWS Credential Provider for node '%s'", + node->name); + return -1; + } + node_ctx->aws_provider = provider; + + if (node_role_arn != NULL) { + node_ctx->base_aws_provider = node_ctx->aws_provider; + + aws_session_name = flb_sts_session_name(); + if (aws_session_name == NULL) { + flb_plg_error(ctx->ins, + "failed to create aws session name for node '%s'", + node->name); + return -1; + } + + node_ctx->aws_sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (node_ctx->aws_sts_tls == NULL) { + flb_free(aws_session_name); + flb_errno(); + return -1; + } + + provider = flb_sts_provider_create(config, + node_ctx->aws_sts_tls, + node_ctx->base_aws_provider, + (char *) node_external_id, + (char *) node_role_arn, + aws_session_name, + node_ctx->aws_region, + (char *) node_sts_endpoint, + NULL, + flb_aws_client_generator()); + flb_free(aws_session_name); + if (provider == NULL) { + flb_plg_error(ctx->ins, + "failed to create AWS STS Credential Provider for node '%s'", + node->name); + return -1; + } + + node_ctx->aws_provider = provider; + } + + node_ctx->aws_provider->provider_vtable->sync(node_ctx->aws_provider); + node_ctx->aws_provider->provider_vtable->init(node_ctx->aws_provider); + node_ctx->aws_provider->provider_vtable->async(node_ctx->aws_provider); + node_ctx->aws_provider->provider_vtable->upstream_set(node_ctx->aws_provider, + ctx->ins); + + return 0; +} +#endif + +static int flb_es_node_ctx_create(struct flb_elasticsearch *ctx, + struct flb_output_instance *ins, + struct flb_config *config) +{ + int len; + const char *tmp; + int ret; + char *buf; + struct mk_list *head; + struct flb_upstream_node *node; + struct flb_es_node_ctx *node_ctx; + + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + + node_ctx = flb_calloc(1, sizeof(struct flb_es_node_ctx)); + if (node_ctx == NULL) { + flb_errno(); + return -1; + } + + ret = flb_es_node_ctx_set_bool_property(ctx, node, "logstash_format", + &node_ctx->has_logstash_format, + &node_ctx->logstash_format); + if (ret != 0) { + flb_es_node_ctx_destroy(node_ctx); + return -1; + } + + ret = flb_es_node_ctx_set_bool_property(ctx, node, "replace_dots", + &node_ctx->has_replace_dots, + &node_ctx->replace_dots); + if (ret != 0) { + flb_es_node_ctx_destroy(node_ctx); + return -1; + } + + ret = flb_es_node_ctx_set_bool_property(ctx, node, "suppress_type_name", + &node_ctx->has_suppress_type_name, + &node_ctx->suppress_type_name); + if (ret != 0) { + flb_es_node_ctx_destroy(node_ctx); + return -1; + } + + ret = flb_es_node_ctx_set_bool_property(ctx, node, "current_time_index", + &node_ctx->has_current_time_index, + &node_ctx->current_time_index); + if (ret != 0) { + flb_es_node_ctx_destroy(node_ctx); + return -1; + } + + ret = flb_es_node_ctx_set_bool_property(ctx, node, "generate_id", + &node_ctx->has_generate_id, + &node_ctx->generate_id); + if (ret != 0) { + flb_es_node_ctx_destroy(node_ctx); + return -1; + } + +#ifdef FLB_HAVE_AWS + ret = flb_es_node_ctx_set_bool_property(ctx, node, "aws_auth", + &node_ctx->has_aws_auth_override, + &node_ctx->has_aws_auth); + if (ret != 0) { + flb_es_node_ctx_destroy(node_ctx); + return -1; + } +#endif + + tmp = flb_upstream_node_get_property("id_key", node); + if (tmp != NULL) { + node_ctx->ra_id_key = flb_ra_create((char *) tmp, FLB_FALSE); + if (node_ctx->ra_id_key == NULL) { + flb_plg_error(ctx->ins, + "could not create node id_key record accessor for '%s'", + node->name); + flb_es_node_ctx_destroy(node_ctx); + return -1; + } + } + + tmp = flb_upstream_node_get_property("logstash_prefix_key", node); + if (tmp != NULL) { + if (tmp[0] != '$') { + len = strlen(tmp); + buf = flb_malloc(len + 2); + if (buf == NULL) { + flb_errno(); + flb_es_node_ctx_destroy(node_ctx); + return -1; + } + + buf[0] = '$'; + memcpy(buf + 1, tmp, len); + buf[len + 1] = '\0'; + + node_ctx->ra_prefix_key = flb_ra_create(buf, FLB_TRUE); + flb_free(buf); + } + else { + node_ctx->ra_prefix_key = flb_ra_create((char *) tmp, FLB_TRUE); + } + + if (node_ctx->ra_prefix_key == NULL) { + flb_plg_error(ctx->ins, + "invalid node logstash_prefix_key pattern '%s'", + tmp); + flb_es_node_ctx_destroy(node_ctx); + return -1; + } + } + +#ifdef FLB_HAVE_AWS + ret = flb_es_node_ctx_aws_init(node_ctx, node, ctx, ins, config); + if (ret != 0) { + flb_es_node_ctx_destroy(node_ctx); + return -1; + } +#endif + + flb_upstream_node_set_data(node_ctx, node); + } + + return 0; +} + /* * extract_cloud_host extracts the public hostname * of a deployment from a Cloud ID string. @@ -245,21 +581,48 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, } } - /* Prepare an upstream handler */ - upstream = flb_upstream_create(config, - ins->host.name, - ins->host.port, - io_flags, - ins->tls); - if (!upstream) { - flb_plg_error(ctx->ins, "cannot create Upstream context"); - flb_es_conf_destroy(ctx); - return NULL; + tmp = flb_output_get_property("upstream", ins); + if (tmp != NULL) { + ctx->ha = flb_upstream_ha_from_file(tmp, config); + if (ctx->ha == NULL) { + flb_plg_error(ctx->ins, "cannot load Upstream file"); + flb_es_conf_destroy(ctx); + return NULL; + } + + if (mk_list_is_empty(&ctx->ha->nodes) == 0) { + flb_plg_error(ctx->ins, "upstream file does not define any nodes"); + flb_es_conf_destroy(ctx); + return NULL; + } + + ctx->ha_mode = FLB_TRUE; + + flb_output_upstream_ha_set(ctx->ha, ins); + + ret = flb_es_node_ctx_create(ctx, ins, config); + if (ret != 0) { + flb_es_conf_destroy(ctx); + return NULL; + } } - ctx->u = upstream; + else { + /* Prepare an upstream handler */ + upstream = flb_upstream_create(config, + ins->host.name, + ins->host.port, + io_flags, + ins->tls); + if (!upstream) { + flb_plg_error(ctx->ins, "cannot create Upstream context"); + flb_es_conf_destroy(ctx); + return NULL; + } + ctx->u = upstream; - /* Set instance flags into upstream */ - flb_output_upstream_set(ctx->u, ins); + /* Set instance flags into upstream */ + flb_output_upstream_set(ctx->u, ins); + } /* Set manual Index and Type */ if (f_index) { @@ -487,11 +850,26 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, int flb_es_conf_destroy(struct flb_elasticsearch *ctx) { + struct mk_list *head; + struct flb_upstream_node *node; + struct flb_es_node_ctx *node_ctx; + if (!ctx) { return 0; } - if (ctx->u) { + if (ctx->ha_mode == FLB_TRUE && ctx->ha != NULL) { + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + node_ctx = flb_upstream_node_get_data(node); + if (node_ctx != NULL) { + flb_es_node_ctx_destroy(node_ctx); + flb_upstream_node_set_data(NULL, node); + } + } + flb_upstream_ha_destroy(ctx->ha); + } + else if (ctx->u) { flb_upstream_destroy(ctx->u); } if (ctx->ra_id_key) { diff --git a/src/flb_engine_dispatch.c b/src/flb_engine_dispatch.c index fb4b4dbbcca..701ada3edb7 100644 --- a/src/flb_engine_dispatch.c +++ b/src/flb_engine_dispatch.c @@ -101,18 +101,24 @@ int flb_engine_dispatch_retry(struct flb_task_retry *retry, static void test_run_formatter(struct flb_config *config, struct flb_input_instance *i_ins, struct flb_output_instance *o_ins, - struct flb_task *task, - void *flush_ctx) + struct flb_task *task) { int ret; void *out_buf = NULL; size_t out_size = 0; struct flb_test_out_formatter *otf; struct flb_event_chunk *evc; + void *flush_ctx; otf = &o_ins->test_formatter; evc = task->event_chunk; + flush_ctx = otf->flush_ctx; + if (otf->flush_ctx_callback) { + flush_ctx = otf->flush_ctx_callback(config, i_ins, o_ins->context, + flush_ctx); + } + /* Invoke the output plugin formatter test callback */ ret = otf->callback(config, i_ins, @@ -176,9 +182,7 @@ static int tasks_start(struct flb_input_instance *in, out->test_formatter.callback != NULL) { /* Run the formatter test */ - test_run_formatter(config, in, out, - task, - out->test_formatter.flush_ctx); + test_run_formatter(config, in, out, task); /* Remove the route */ mk_list_del(&route->_head); diff --git a/src/flb_lib.c b/src/flb_lib.c index 7257480b063..1801343d382 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -590,6 +590,21 @@ int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void (*out_callback) (void *, int, int, void *, size_t, void *), void *out_callback_data, void *test_ctx) +{ + return flb_output_set_test_with_ctx_callback(ctx, ffd, test_name, + out_callback, + out_callback_data, + test_ctx, NULL); +} + +int flb_output_set_test_with_ctx_callback(flb_ctx_t *ctx, int ffd, + char *test_name, + void (*out_callback) (void *, int, int, void *, size_t, void *), + void *out_callback_data, + void *test_ctx, + void *(*test_ctx_callback) (struct flb_config *, + struct flb_input_instance *, + void *, void *)) { struct flb_output_instance *o_ins; @@ -611,6 +626,7 @@ int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, o_ins->test_formatter.rt_out_callback = out_callback; o_ins->test_formatter.rt_data = out_callback_data; o_ins->test_formatter.flush_ctx = test_ctx; + o_ins->test_formatter.flush_ctx_callback = test_ctx_callback; } else { return -1; diff --git a/src/flb_sds.c b/src/flb_sds.c index f534f287522..d4beb02c2e3 100644 --- a/src/flb_sds.c +++ b/src/flb_sds.c @@ -89,6 +89,11 @@ flb_sds_t flb_sds_create(const char *str) return flb_sds_create_len(str, len); } +flb_sds_t flb_sds_create_from_view(flb_sds_view_t view) +{ + return flb_sds_create_len(view.buf, view.len); +} + flb_sds_t flb_sds_create_size(size_t size) { return sds_alloc(size); diff --git a/tests/internal/sds.c b/tests/internal/sds.c index d9d5db1027f..c2538d838ca 100644 --- a/tests/internal/sds.c +++ b/tests/internal/sds.c @@ -86,10 +86,35 @@ static void test_sds_cat_utf8() flb_sds_destroy(s); } +static void test_sds_view() +{ + flb_sds_t owned; + flb_sds_t copied; + flb_sds_view_t borrowed_view; + flb_sds_view_t owned_view; + char raw_buffer[] = "borrowed-value"; + + borrowed_view = flb_sds_view_create(raw_buffer, strlen(raw_buffer)); + TEST_CHECK(flb_sds_view_is_empty(borrowed_view) == FLB_FALSE); + copied = flb_sds_create_from_view(borrowed_view); + TEST_CHECK(copied != NULL); + TEST_CHECK(flb_sds_len(copied) == strlen(raw_buffer)); + TEST_CHECK(strncmp(copied, raw_buffer, borrowed_view.len) == 0); + flb_sds_destroy(copied); + + owned = flb_sds_create("owned-value"); + TEST_CHECK(owned != NULL); + owned_view = flb_sds_view_create_from_sds(owned); + TEST_CHECK(owned_view.buf == owned); + TEST_CHECK(owned_view.len == flb_sds_len(owned)); + flb_sds_destroy(owned); +} + TEST_LIST = { { "sds_usage" , test_sds_usage}, { "sds_printf", test_sds_printf}, { "sds_cat_utf8", test_sds_cat_utf8}, + { "sds_view", test_sds_view}, { "test_sds_printf_7143_off_by_1", test_sds_printf_7143_off_by_1}, { 0 } }; diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index bd34dc27412..64e54855fbf 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -2,9 +2,14 @@ #include #include "flb_tests_runtime.h" +#include "../include/flb_tests_tmpdir.h" +#include +#include +#include /* Test data */ #include "data/es/json_es.h" /* JSON_ES */ +#include "../../plugins/out_es/es.h" static void cb_check_http_api_key(void *ctx, int ffd, @@ -100,6 +105,29 @@ static void cb_check_index_type(void *ctx, int ffd, flb_free(res_data); } +static void cb_check_node_index_type(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *index_match; + char *type_match; + char *out_js = res_data; + char *index_line = "\"_index\":\"another_index_test\""; + char *type_line = "\"_type\":\"another_type_test\""; + + index_match = strstr(out_js, index_line); + if (!TEST_CHECK(index_match != NULL)) { + TEST_MSG("Got: %s", out_js); + } + + type_match = strstr(out_js, type_line); + if (!TEST_CHECK(type_match != NULL)) { + TEST_MSG("Got: %s", out_js); + } + + flb_free(res_data); +} + static void cb_check_logstash_format(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) @@ -1058,6 +1086,402 @@ void flb_test_response_partially_success() flb_destroy(ctx); } +static int create_upstream_file_va(char *path, size_t size, + const char *first_property, + va_list args) +{ +#ifndef _WIN32 + int fd; +#else + FILE *fp; +#endif + char *tmp_path; + size_t content_len; + const char *property; + const char *value; + size_t written; + int ret; + flb_sds_t content = NULL; + flb_sds_t tmp_content = NULL; + const char *base_content = + "[UPSTREAM]\n" + " name es_cluster\n" + "\n" + "[NODE]\n" + " name es_node_1\n" + " host 127.0.0.1\n" + " port 9200\n"; + + content = flb_sds_create(base_content); + if (content == NULL) { + return -1; + } + + property = first_property; + while (property != NULL) { + value = va_arg(args, const char *); + if (value == NULL) { + flb_sds_destroy(content); + return -1; + } + + tmp_content = flb_sds_printf(&content, " %s %s\n", property, value); + if (tmp_content == NULL) { + if (content != NULL) { + flb_sds_destroy(content); + } + return -1; + } + content = tmp_content; + + property = va_arg(args, const char *); + } + + tmp_path = flb_test_tmpdir_cat("/flb-es-upstream-XXXXXX"); + if (tmp_path == NULL) { + flb_sds_destroy(content); + return -1; + } + + if (strlen(tmp_path) + 1 > size) { + flb_free(tmp_path); + flb_sds_destroy(content); + return -1; + } + + strncpy(path, tmp_path, size); + flb_free(tmp_path); + +#ifndef _WIN32 + fd = mkstemp(path); + if (fd == -1) { + flb_sds_destroy(content); + return -1; + } + + content_len = flb_sds_len(content); + ret = write(fd, content, content_len); + close(fd); + flb_sds_destroy(content); + if (ret != (int) content_len) { + unlink(path); + return -1; + } +#else + if (_mktemp_s(path, size) != 0) { + flb_sds_destroy(content); + return -1; + } + + fp = fopen(path, "wb"); + if (fp == NULL) { + flb_sds_destroy(content); + return -1; + } + + content_len = flb_sds_len(content); + written = fwrite(content, 1, content_len, fp); + ret = fclose(fp); + flb_sds_destroy(content); + if (written != content_len || ret != 0) { + unlink(path); + return -1; + } +#endif + + return 0; +} + +static int create_upstream_file(char *path, size_t size, + const char *first_property, ...) +{ + int ret; + va_list args; + + va_start(args, first_property); + ret = create_upstream_file_va(path, size, first_property, args); + va_end(args); + + return ret; +} + +static flb_ctx_t *create_upstream_test_ctx(char *upstream_file, + size_t upstream_file_size, + int *in_ffd, + int *out_ffd, + const char *first_property, ...) +{ + int ret; + flb_ctx_t *ctx; + va_list args; + + va_start(args, first_property); + ret = create_upstream_file_va(upstream_file, upstream_file_size, + first_property, args); + va_end(args); + TEST_CHECK(ret == 0); + if (ret != 0) { + return NULL; + } + + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + *in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, *in_ffd, "tag", "test", NULL); + + *out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, *out_ffd, + "match", "test", + "upstream", upstream_file, + NULL); + + return ctx; +} + +static void *cb_upstream_flush_ctx(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *flush_ctx) +{ + struct flb_elasticsearch *ctx = plugin_context; + + (void) config; + (void) ins; + (void) flush_ctx; + + if (ctx->ha_mode != FLB_TRUE || ctx->ha == NULL) { + return NULL; + } + + return flb_upstream_ha_node_get(ctx->ha); +} + +void flb_test_upstream_write_operation() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + int in_ffd; + int out_ffd; + char upstream_file[256]; + flb_ctx_t *ctx; + + ctx = create_upstream_test_ctx(upstream_file, sizeof(upstream_file), + &in_ffd, &out_ffd, + "write_operation", "index", NULL); + if (!ctx) { + return; + } + (void) in_ffd; + + flb_output_set(ctx, out_ffd, + "write_operation", "index", + NULL); + + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_write_op_index, + NULL, NULL, + cb_upstream_flush_ctx); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + unlink(upstream_file); +} + +void flb_test_upstream_null_index() +{ + int ret; + int in_ffd; + int out_ffd; + char upstream_file[256]; + flb_ctx_t *ctx; + + ctx = create_upstream_test_ctx(upstream_file, sizeof(upstream_file), + &in_ffd, &out_ffd, + "generate_id", "off", NULL); + if (!ctx) { + return; + } + + flb_output_set(ctx, out_ffd, + "index", "", + "type", "type_test", + NULL); + + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_index_type, + NULL, NULL, + cb_upstream_flush_ctx); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == -1); + + flb_stop(ctx); + flb_destroy(ctx); + unlink(upstream_file); +} + +void flb_test_upstream_index_type() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + int in_ffd; + int out_ffd; + char upstream_file[256]; + flb_ctx_t *ctx; + + ctx = create_upstream_test_ctx(upstream_file, sizeof(upstream_file), + &in_ffd, &out_ffd, + "index", "another_index_test", + "type", "another_type_test", NULL); + if (!ctx) { + return; + } + + flb_output_set(ctx, out_ffd, + "index", "index_test", + "type", "type_test", + NULL); + + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_node_index_type, + NULL, NULL, + cb_upstream_flush_ctx); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + unlink(upstream_file); +} + +void flb_test_upstream_logstash_format() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + int in_ffd; + int out_ffd; + char upstream_file[256]; + flb_ctx_t *ctx; + + ctx = create_upstream_test_ctx(upstream_file, sizeof(upstream_file), + &in_ffd, &out_ffd, + "logstash_format", "on", + "logstash_prefix", "prefix", + "logstash_prefix_separator", "SEP", + "logstash_dateformat", "%Y-%m-%d", + NULL); + if (!ctx) { + return; + } + + flb_output_set(ctx, out_ffd, + "logstash_format", "off", + "logstash_prefix", "logstash", + "logstash_prefix_separator", "-", + "logstash_dateformat", "%Y.%m.%d", + NULL); + + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_logstash_prefix_separator, + NULL, NULL, + cb_upstream_flush_ctx); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + unlink(upstream_file); +} + +void flb_test_upstream_replace_dots() +{ + int ret; + int size = sizeof(JSON_DOTS) - 1; + int in_ffd; + int out_ffd; + char upstream_file[256]; + flb_ctx_t *ctx; + + ctx = create_upstream_test_ctx(upstream_file, sizeof(upstream_file), + &in_ffd, &out_ffd, + "replace_dots", "on", NULL); + if (!ctx) { + return; + } + + flb_output_set(ctx, out_ffd, + "replace_dots", "off", + NULL); + + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_replace_dots, + NULL, NULL, + cb_upstream_flush_ctx); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + flb_lib_push(ctx, in_ffd, (char *) JSON_DOTS, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + unlink(upstream_file); +} + +void flb_test_upstream_id_key() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + int in_ffd; + int out_ffd; + char upstream_file[256]; + flb_ctx_t *ctx; + + ctx = create_upstream_test_ctx(upstream_file, sizeof(upstream_file), + &in_ffd, &out_ffd, + "id_key", "key_2", NULL); + if (!ctx) { + return; + } + + flb_output_set(ctx, out_ffd, + "id_key", "key_2", + NULL); + + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_id_key, + NULL, NULL, + cb_upstream_flush_ctx); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + unlink(upstream_file); +} + /* Test list */ TEST_LIST = { {"long_index" , flb_test_long_index }, @@ -1077,5 +1501,11 @@ TEST_LIST = { {"response_success" , flb_test_response_success }, {"response_successes", flb_test_response_successes }, {"response_partially_success" , flb_test_response_partially_success }, + {"upstream_write_operation" , flb_test_upstream_write_operation }, + {"upstream_null_index" , flb_test_upstream_null_index }, + {"upstream_index_type" , flb_test_upstream_index_type }, + {"upstream_logstash_format" , flb_test_upstream_logstash_format }, + {"upstream_replace_dots" , flb_test_upstream_replace_dots }, + {"upstream_id_key" , flb_test_upstream_id_key }, {NULL, NULL} };