Skip to content

Commit 25983cd

Browse files
committed
out_stackdriver: fix metric accounting for dropped records and decouple logic
This patch refactors the response handling in the stackdriver output plugin to resolve incorrect metric accounting and inconsistent behavior between builds with and without Prometheus metrics enabled. 1. Defer Metric Accounting to Core Engine: Removed manual increment of cmt_dropped_records on standard 4xx errors. The core engine automatically increments drop counters when the plugin returns FLB_ERROR. Manual increments caused double counting. This aligns with how other plugins (like out_cloudwatch_logs) defer core accounting to the engine. 2. Enable Partial Success Handling for All Builds: Moved parse_partial_success_response outside of #ifdef FLB_HAVE_METRICS. Previously, partial success was only checked if metrics were enabled. Now, all builds will correctly identify partial successes and return FLB_OK to clear the chunk, preventing unnecessary retries and ensuring consistent behavior across different build configurations. 3. Added Visibility Logging: Added a warning log for partial successes that reports the exact number of failed records, providing operator visibility into silent drops. 4. Style Guide Compliance: Ensured all variables are declared at the beginning of the function scope as required by Fluent Bit style guidelines. Moved ts outside of ifdef to support its use on all builds. 5. Fixed Double Counting on Partial Success: Guarded the batch success metrics update in if (ret_code == FLB_OK) to ensure we don't double count successful records that were already accounted for in the partial success block. Signed-off-by: pdewilde <pdewilde@google.com>
1 parent 6adf68b commit 25983cd

File tree

1 file changed

+42
-33
lines changed

1 file changed

+42
-33
lines changed

plugins/out_stackdriver/stackdriver.c

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2937,7 +2937,8 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
29372937
(void) config;
29382938
int ret;
29392939
int code;
2940-
int ret_partial_success;
2940+
int failed_records = 0;
2941+
int ret_partial_success = -1;
29412942
int ret_code = FLB_RETRY;
29422943
int formatted_records = 0;
29432944
int grpc_status_counts[GRPC_STATUS_CODES_SIZE] = {0};
@@ -2953,9 +2954,9 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
29532954
uint64_t write_entries_start = 0;
29542955
uint64_t write_entries_end = 0;
29552956
float write_entries_latency = 0.0;
2957+
uint64_t ts = cfl_time_now();
29562958
#ifdef FLB_HAVE_METRICS
29572959
char *name = (char *) flb_output_name(ctx->ins);
2958-
uint64_t ts = cfl_time_now();
29592960
#endif
29602961

29612962
/* Reformat msgpack to stackdriver JSON payload */
@@ -3069,55 +3070,60 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
30693070
ret_code = FLB_OK;
30703071
}
30713072
else {
3072-
#ifdef FLB_HAVE_METRICS
30733073
/* check partial success */
30743074
ret_partial_success =
30753075
parse_partial_success_response(c,
30763076
ctx,
30773077
ts,
30783078
grpc_status_counts);
30793079

3080-
int failed_records = 0;
3081-
if (ret_partial_success == 0) {
3082-
for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) {
3083-
if (grpc_status_counts[code] != 0) {
3084-
failed_records += grpc_status_counts[code];
3085-
}
3086-
}
3087-
cmt_counter_add(ctx->ins->cmt_dropped_records, ts,
3088-
failed_records, 1, (char* []) {name});
3089-
int successful_records =
3090-
formatted_records - failed_records;
3091-
if (successful_records != 0) {
3092-
add_record_metrics(ctx, ts, successful_records, 200, 0);
3080+
if (ret_partial_success == 0) {
3081+
ret_code = FLB_OK;
3082+
3083+
for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) {
3084+
if (grpc_status_counts[code] != 0) {
3085+
failed_records += grpc_status_counts[code];
30933086
}
30943087
}
3095-
else {
3096-
add_record_metrics(ctx, ts, formatted_records,
3097-
c->resp.status, -1);
3098-
}
3099-
#endif
3100-
if (c->resp.status >= 400 && c->resp.status < 500) {
3101-
ret_code = FLB_ERROR;
3088+
3089+
flb_plg_warn(ctx->ins, "partial success: %d of %d records dropped",
3090+
failed_records, formatted_records);
3091+
31023092
#ifdef FLB_HAVE_METRICS
31033093
cmt_counter_add(ctx->ins->cmt_dropped_records, ts,
3104-
formatted_records, 1,
3105-
(char* []) {name});
3094+
failed_records, 1, (char* []) {name});
3095+
int successful_records =
3096+
formatted_records - failed_records;
3097+
if (successful_records != 0) {
3098+
add_record_metrics(ctx, ts, successful_records, 200, 0);
3099+
}
31063100
#endif
3107-
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
3108-
c->resp.payload);
31093101
}
3102+
#ifdef FLB_HAVE_METRICS
31103103
else {
3111-
if (c->resp.payload_size > 0) {
3112-
/* we got an error */
3104+
add_record_metrics(ctx, ts, formatted_records,
3105+
c->resp.status, -1);
3106+
}
3107+
#endif
3108+
/* Partial success is set FLB_OK, with manual update of failure metrics */
3109+
if (ret_partial_success != 0) {
3110+
if (c->resp.status >= 400 && c->resp.status < 500) {
3111+
ret_code = FLB_ERROR;
31133112
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
31143113
c->resp.payload);
31153114
}
31163115
else {
3117-
flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag,
3118-
c->resp.payload);
3116+
if (c->resp.payload_size > 0) {
3117+
/* we got an error */
3118+
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
3119+
c->resp.payload);
3120+
}
3121+
else {
3122+
flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag,
3123+
c->resp.payload);
3124+
}
3125+
ret_code = FLB_RETRY;
31193126
}
3120-
ret_code = FLB_RETRY;
31213127
}
31223128
}
31233129
}
@@ -3129,7 +3135,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
31293135
if (write_entries_latency > 0.0) {
31303136
cmt_histogram_observe(ctx->cmt_write_entries_latency, ts, write_entries_latency, 1, (char *[]) {name});
31313137
}
3132-
add_record_metrics(ctx, ts, formatted_records, 200, 0);
3138+
/* Only add full batch metrics if this was NOT a partial success */
3139+
if (ret_partial_success != 0) {
3140+
add_record_metrics(ctx, ts, formatted_records, 200, 0);
3141+
}
31333142

31343143
/* OLD api */
31353144
flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);

0 commit comments

Comments
 (0)