diff --git a/include/fluent-bit/flb_http_client.h b/include/fluent-bit/flb_http_client.h index 17140f698c1..44583159d36 100644 --- a/include/fluent-bit/flb_http_client.h +++ b/include/fluent-bit/flb_http_client.h @@ -116,8 +116,11 @@ struct flb_http_client_response { int content_length; /* Content length set by headers */ int chunked_encoding; /* Chunked transfer encoding ? */ int connection_close; /* connection: close ? */ + int chunked_trailer_pending; /* terminal chunk parsed */ char *chunk_processed_end; /* Position to mark last chunk */ char *headers_end; /* Headers end (\r\n\r\n) */ + char *trailer_buf; /* Raw trailer header block */ + size_t trailer_size; /* Trailer block length */ /* Payload: body response: reference to 'data' */ char *payload; @@ -217,6 +220,8 @@ struct flb_test_http_response { struct flb_http_client { /* Upstream connection */ struct flb_connection *u_conn; + struct flb_net_setup request_net_setup; + struct flb_net_setup *original_net_setup; /* Request data */ int method; @@ -374,6 +379,11 @@ int flb_http_add_header(struct flb_http_client *c, const char *val, size_t val_len); flb_sds_t flb_http_get_header(struct flb_http_client *c, const char *key, size_t key_len); + +flb_sds_t flb_http_get_response_header(struct flb_http_client *c, + const char *key, size_t key_len); + +int flb_http_client_process_response_buffer(struct flb_http_client *c); int flb_http_basic_auth(struct flb_http_client *c, const char *user, const char *passwd); int flb_http_proxy_auth(struct flb_http_client *c, @@ -405,6 +415,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes); int flb_http_do_with_oauth2(struct flb_http_client *c, size_t *bytes, struct flb_oauth2 *oauth2); int flb_http_client_proxy_connect(struct flb_connection *u_conn); +void flb_http_client_detach_connection(struct flb_http_client *c); void flb_http_client_destroy(struct flb_http_client *c); int flb_http_buffer_size(struct flb_http_client *c, size_t size); size_t 
flb_http_buffer_available(struct flb_http_client *c); diff --git a/include/fluent-bit/flb_network.h b/include/fluent-bit/flb_network.h index fa1624b268b..6cec5bf3a87 100644 --- a/include/fluent-bit/flb_network.h +++ b/include/fluent-bit/flb_network.h @@ -144,6 +144,7 @@ int flb_net_socket_reset(flb_sockfd_t fd); int flb_net_socket_tcp_nodelay(flb_sockfd_t fd); int flb_net_socket_blocking(flb_sockfd_t fd); int flb_net_socket_nonblocking(flb_sockfd_t fd); +int flb_net_socket_set_rcvtimeout(flb_sockfd_t fd, int timeout_in_seconds); int flb_net_socket_rcv_buffer(flb_sockfd_t fd, int rcvbuf); int flb_net_socket_tcp_fastopen(flb_sockfd_t sockfd); int flb_net_socket_tcp_keepalive(flb_sockfd_t fd, struct flb_net_setup *net); diff --git a/plugins/filter_ecs/ecs.c b/plugins/filter_ecs/ecs.c index 4ba30af9b0b..93387d2d04c 100644 --- a/plugins/filter_ecs/ecs.c +++ b/plugins/filter_ecs/ecs.c @@ -435,10 +435,6 @@ static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx) return -1; } - if (free_conn == FLB_TRUE) { - flb_upstream_conn_release(u_conn); - } - ret = flb_pack_json(c->resp.payload, c->resp.payload_size, &buffer, &size, &root_type, NULL); @@ -446,6 +442,9 @@ static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx) flb_plg_warn(ctx->ins, "Could not parse response from %s; response=\n%s", FLB_ECS_FILTER_CLUSTER_PATH, c->resp.payload); flb_http_client_destroy(c); + if (free_conn == FLB_TRUE) { + flb_upstream_conn_release(u_conn); + } return -1; } @@ -458,10 +457,16 @@ static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx) flb_free(buffer); msgpack_unpacked_destroy(&result); flb_http_client_destroy(c); + if (free_conn == FLB_TRUE) { + flb_upstream_conn_release(u_conn); + } return -1; } flb_http_client_destroy(c); + if (free_conn == FLB_TRUE) { + flb_upstream_conn_release(u_conn); + } root = result.data; if (root.type != MSGPACK_OBJECT_MAP) { @@ -1036,10 +1041,6 @@ static int get_task_metadata(struct flb_filter_ecs *ctx, char* short_id) return -1; } - 
if (free_conn == FLB_TRUE) { - flb_upstream_conn_release(u_conn); - } - ret = flb_pack_json(c->resp.payload, c->resp.payload_size, &buffer, &size, &root_type, NULL); @@ -1048,6 +1049,9 @@ static int get_task_metadata(struct flb_filter_ecs *ctx, char* short_id) http_path, c->resp.payload); flb_sds_destroy(http_path); flb_http_client_destroy(c); + if (free_conn == FLB_TRUE) { + flb_upstream_conn_release(u_conn); + } return -1; } @@ -1061,10 +1065,16 @@ static int get_task_metadata(struct flb_filter_ecs *ctx, char* short_id) msgpack_unpacked_destroy(&result); flb_sds_destroy(http_path); flb_http_client_destroy(c); + if (free_conn == FLB_TRUE) { + flb_upstream_conn_release(u_conn); + } return -1; } flb_http_client_destroy(c); + if (free_conn == FLB_TRUE) { + flb_upstream_conn_release(u_conn); + } root = result.data; if (root.type != MSGPACK_OBJECT_MAP) { diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index ca506031f90..ba9eb76aa95 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -1028,6 +1028,7 @@ static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config goto http_do_error; } + flb_http_client_detach_connection(client); flb_upstream_conn_release(u_conn); return client; diff --git a/plugins/out_azure_blob/azure_blob.c b/plugins/out_azure_blob/azure_blob.c index 712da85c779..5b30d8bae4a 100644 --- a/plugins/out_azure_blob/azure_blob.c +++ b/plugins/out_azure_blob/azure_blob.c @@ -1011,17 +1011,18 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx, flb_free(payload_buf); } - flb_upstream_conn_release(u_conn); - /* Validate HTTP status */ if (ret == -1) { flb_plg_error(ctx->ins, "error sending append_blob for %s", ref_name); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); return FLB_RETRY; } if (c->resp.status == 201) { flb_plg_info(ctx->ins, "content uploaded successfully: 
%s", ref_name); flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); return FLB_OK; } else if (c->resp.status == 404) { @@ -1033,6 +1034,7 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx, flb_plg_info(ctx->ins, "blob not found: %s", c->uri); flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); return CREATE_BLOB; } else if (c->resp.payload_size > 0) { @@ -1040,6 +1042,7 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx, c->resp.status, c->resp.payload); if (strstr(c->resp.payload, "must be 0 for Create Append")) { flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); return CREATE_BLOB; } } @@ -1048,6 +1051,7 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx, ref_name, c->resp.status); } flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); return FLB_RETRY; } diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index 27de65ec5a2..5642939df3c 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -1058,12 +1058,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, } #endif /* FLB_HAVE_CHUNK_TRACE */ - flb_upstream_conn_release(u_conn); - if (c) { flb_http_client_destroy(c); } + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(ret); } diff --git a/plugins/out_slack/slack.c b/plugins/out_slack/slack.c index 655a7615eb9..85b606c6646 100644 --- a/plugins/out_slack/slack.c +++ b/plugins/out_slack/slack.c @@ -282,8 +282,8 @@ static void cb_slack_flush(struct flb_event_chunk *event_chunk, out_ret = FLB_RETRY; } - flb_upstream_conn_release(u_conn); flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); flb_sds_destroy(out_buf); FLB_OUTPUT_RETURN(out_ret); } diff --git a/plugins/out_td/td.c b/plugins/out_td/td.c index 87f6354832b..3b3be592e5d 100644 --- a/plugins/out_td/td.c +++ b/plugins/out_td/td.c @@ -207,14 +207,14 @@ static void 
cb_td_flush(struct flb_event_chunk *event_chunk, } /* release */ - flb_upstream_conn_release(u_conn); flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_OK); retry: - flb_upstream_conn_release(u_conn); flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); } diff --git a/plugins/out_websocket/websocket.c b/plugins/out_websocket/websocket.c index cd78bfa5b85..5631a02c635 100644 --- a/plugins/out_websocket/websocket.c +++ b/plugins/out_websocket/websocket.c @@ -85,9 +85,10 @@ static int flb_ws_handshake(struct flb_connection *u_conn, flb_debug("[output_ws] Websocket Server Response\n%s", c->resp.payload); } + flb_debug("[out_ws] Http Get Operation ret = %i, http resp = %i", + ret, c->resp.status); flb_http_client_destroy(c); flb_upstream_conn_release(u_conn); - flb_debug("[out_ws] Http Get Operation ret = %i, http resp = %i", ret, c->resp.status); return -1; } flb_http_client_destroy(c); diff --git a/src/aws/flb_aws_util.c b/src/aws/flb_aws_util.c index ef6db9ec375..1846233da16 100644 --- a/src/aws/flb_aws_util.c +++ b/src/aws/flb_aws_util.c @@ -544,20 +544,24 @@ struct flb_http_client *request_do(struct flb_aws_client *aws_client, c = NULL; } + if (c != NULL) { + flb_http_client_detach_connection(c); + } + flb_upstream_conn_release(u_conn); flb_sds_destroy(signature); return c; error: - if (u_conn) { - flb_upstream_conn_release(u_conn); - } if (signature) { flb_sds_destroy(signature); } if (c) { flb_http_client_destroy(c); } + if (u_conn) { + flb_upstream_conn_release(u_conn); + } return NULL; } diff --git a/src/flb_http_client.c b/src/flb_http_client.c index 7a267d750f6..7cbb56ebe17 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -31,6 +31,7 @@ */ #define _GNU_SOURCE +#include #include #ifdef FLB_SYSTEM_WINDOWS @@ -48,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -280,130 +282,349 @@ static inline void http_client_response_reset(struct 
flb_http_client *c) c->resp.content_length = -1; c->resp.chunked_encoding = FLB_FALSE; c->resp.connection_close = -1; + c->resp.chunked_trailer_pending = FLB_FALSE; c->resp.headers_end = NULL; c->resp.payload = NULL; c->resp.payload_size = 0; c->resp.chunk_processed_end = NULL; + if (c->resp.trailer_buf != NULL) { + flb_free(c->resp.trailer_buf); + c->resp.trailer_buf = NULL; + } + c->resp.trailer_size = 0; } -static int process_chunked_data(struct flb_http_client *c) +static char *chunked_line_end(char *buf, size_t length) { - long len; - long drop; - long val; - char *p; - char tmp[32]; - int found_full_chunk = FLB_FALSE; - struct flb_http_client_response *r = &c->resp; + size_t index; + if (length < 2) { + return NULL; + } - chunk_start: - p = strstr(r->chunk_processed_end, "\r\n"); - if (!p) { + for (index = 0; index + 1 < length; index++) { + if (buf[index] == '\r' && buf[index + 1] == '\n') { + return &buf[index]; + } + } + + return NULL; +} + +static int chunked_trailer_store(struct flb_http_client *c, + const char *buf, size_t size) +{ + if (c->resp.trailer_buf != NULL) { + flb_free(c->resp.trailer_buf); + c->resp.trailer_buf = NULL; + c->resp.trailer_size = 0; + } + + if (size == 0) { + return 0; + } + + c->resp.trailer_buf = flb_malloc(size + 1); + if (c->resp.trailer_buf == NULL) { + flb_errno(); + return -1; + } + + memcpy(c->resp.trailer_buf, buf, size); + c->resp.trailer_buf[size] = '\0'; + c->resp.trailer_size = size; + + return 0; +} + +static int chunked_trailer_block_size(char *buf, size_t length, + size_t *out_size, + size_t *out_trailer_size) +{ + char *line; + char *cursor; + size_t line_length; + size_t trailer_length; + + cursor = buf; + trailer_length = 0; + + while (1) { + line = chunked_line_end(cursor, length - (cursor - buf)); + if (line == NULL) { + return FLB_HTTP_MORE; + } + + line_length = line - cursor; + + if (line_length == 0) { + *out_size = (line + 2) - buf; + *out_trailer_size = trailer_length; + return FLB_HTTP_OK; + } + + if 
(memchr(cursor, ':', line_length) == NULL) { + return FLB_HTTP_ERROR; + } + + trailer_length += line_length + 2; + cursor = line + 2; + } +} + +static int chunked_data_size(char *buf, size_t length, + size_t *out_size) +{ + char *cursor; + char *line_end; + int extension_started; + size_t digit; + size_t digit_count; + size_t line_length; + size_t total_size; + size_t value; + + line_end = chunked_line_end(buf, length); + if (line_end == NULL) { return FLB_HTTP_MORE; } - /* Hexa string length */ - len = (p - r->chunk_processed_end); - if ((len > sizeof(tmp) - 1) || len == 0) { + line_length = line_end - buf; + if (line_length == 0) { return FLB_HTTP_ERROR; } - p += 2; - /* Copy hexa string to temporary buffer */ - memcpy(tmp, r->chunk_processed_end, len); - tmp[len] = '\0'; + cursor = buf; + while (line_length > 0 && (*cursor == ' ' || *cursor == '\t')) { + cursor++; + line_length--; + } - /* Convert hexa string to decimal */ errno = 0; - val = strtol(tmp, NULL, 16); - if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) - || (errno != 0 && val == 0)) { - flb_errno(); + digit_count = 0; + extension_started = FLB_FALSE; + value = 0; + + while (digit_count < line_length) { + if (*cursor >= '0' && *cursor <= '9') { + digit = *cursor - '0'; + } + else if (*cursor >= 'a' && *cursor <= 'f') { + digit = (*cursor - 'a') + 10; + } + else if (*cursor >= 'A' && *cursor <= 'F') { + digit = (*cursor - 'A') + 10; + } + else { + break; + } + + if (value > ((SIZE_MAX - digit) / 16)) { + return FLB_HTTP_ERROR; + } + + value = (value * 16) + digit; + digit_count++; + cursor++; + } + + if (digit_count == 0) { return FLB_HTTP_ERROR; } - if (val < 0) { + + while (digit_count < line_length && (*cursor == ' ' || *cursor == '\t')) { + digit_count++; + cursor++; + } + + if (digit_count < line_length && *cursor == ';') { + cursor++; + digit_count++; + extension_started = FLB_TRUE; + } + + while (digit_count < line_length) { + if (extension_started == FLB_FALSE) { + if (*cursor != ' 
' && *cursor != '\t') { + return FLB_HTTP_ERROR; + } + } + else if (*cursor != ' ' && *cursor != '\t' && *cursor != ';' && + *cursor != '=' && *cursor != '"' && *cursor != '\\' && + *cursor != '/' && *cursor != ',' && *cursor != '_' && + *cursor != '-' && *cursor != '.' && *cursor != ':' && + *cursor != '(' && *cursor != ')' && + !(*cursor >= '0' && *cursor <= '9') && + !(*cursor >= 'a' && *cursor <= 'z') && + !(*cursor >= 'A' && *cursor <= 'Z')) { + return FLB_HTTP_ERROR; + } + + digit_count++; + cursor++; + } + + total_size = (line_end + 2) - buf; + if (value == 0) { + *out_size = total_size; + return FLB_HTTP_OK; + } + + if (value > (SIZE_MAX - total_size - 2)) { return FLB_HTTP_ERROR; } - /* - * 'val' contains the expected number of bytes, check current lengths - * and do buffer adjustments. - * - * we do val + 2 because the chunk always ends with \r\n - */ - val += 2; - /* Number of bytes after the Chunk header */ - len = r->data_len - (p - r->data); - if (len < val) { + total_size += value + 2; + + if (length < total_size) { + return FLB_HTTP_MORE; + } + + if (value > (length - ((line_end + 2) - buf) - 2)) { return FLB_HTTP_MORE; } - /* From the current chunk we expect it ends with \r\n */ - if (p[val -2] != '\r' || p[val - 1] != '\n') { + if (line_end[2 + value] != '\r' || line_end[3 + value] != '\n') { return FLB_HTTP_ERROR; } - /* - * At this point we are just fine, the chunk is valid, next steps: - * - * 1. check possible last chunk - * 2. drop chunk header from the buffer - * 3. remove chunk ending \r\n - */ + *out_size = total_size; - found_full_chunk = FLB_TRUE; - /* 1. 
Validate ending chunk */ - if (val - 2 == 0) { - /* - * For an ending chunk we expect: - * - * 0\r\n - * \r\n - * - * so at least we need 5 bytes in the buffer - */ - len = r->data_len - (r->chunk_processed_end - r->data); - if (len < 5) { - return FLB_HTTP_MORE; + return FLB_HTTP_OK; +} + +static flb_sds_t raw_header_lookup(const char *buf, size_t len, + const char *key, size_t key_len) +{ + const char *line_end; + const char *cursor; + const char *value; + size_t line_len; + size_t offset; + + cursor = buf; + offset = 0; + + while (offset + 1 < len) { + line_end = NULL; + + while (offset + 1 < len) { + if (buf[offset] == '\r' && buf[offset + 1] == '\n') { + line_end = &buf[offset]; + break; + } + + offset++; } - if (r->chunk_processed_end[3] != '\r' || - r->chunk_processed_end[4] != '\n') { - return FLB_HTTP_ERROR; + if (line_end == NULL) { + break; } - } - /* 2. Drop chunk header */ - drop = (p - r->chunk_processed_end); - len = r->data_len - (r->chunk_processed_end - r->data); - consume_bytes(r->chunk_processed_end, drop, len); - r->data_len -= drop; - r->data[r->data_len] = '\0'; + line_len = line_end - cursor; + if (line_len == 0) { + break; + } - /* 3. Remove chunk ending \r\n */ - drop = 2; - r->chunk_processed_end += labs(val - 2); - len = r->data_len - (r->chunk_processed_end - r->data); - consume_bytes(r->chunk_processed_end, drop, len); - r->data_len -= drop; + if (line_len > key_len && + cursor[key_len] == ':' && + strncasecmp(cursor, key, key_len) == 0) { + value = cursor + key_len + 1; - /* Always append a NULL byte */ - r->data[r->data_len] = '\0'; + while (value < line_end && (*value == ' ' || *value == '\t')) { + value++; + } - /* Always update payload size after full chunk */ - r->payload_size = r->data_len - (r->headers_end - r->data); + return flb_sds_create_len(value, line_end - value); + } - /* Is this the last chunk ? 
*/ - if ((val - 2 == 0)) { - /* Update payload size */ - return FLB_HTTP_OK; + cursor = line_end + 2; + offset = cursor - buf; } - /* If we have some remaining bytes, start over */ - len = r->data_len - (r->chunk_processed_end - r->data); - if (len > 0) { - goto chunk_start; + return NULL; +} + +static int process_chunked_data(struct flb_http_client *c) +{ + char *cursor; + char *payload_end; + long available; + size_t chunk_data_size; + size_t chunk_header_size; + size_t chunk_size_line_length; + size_t trailer_bytes; + size_t trailer_raw_size; + int found_full_chunk; + int ret; + struct flb_http_client_response *r; + + r = &c->resp; + found_full_chunk = FLB_FALSE; + + while (1) { + cursor = r->chunk_processed_end; + available = r->data_len - (cursor - r->data); + if (available <= 0) { + break; + } + + if (r->chunked_trailer_pending == FLB_TRUE) { + ret = chunked_trailer_block_size(cursor, available, + &trailer_bytes, + &trailer_raw_size); + if (ret != FLB_HTTP_OK) { + if (found_full_chunk == FLB_TRUE && ret == FLB_HTTP_MORE) { + return FLB_HTTP_CHUNK_AVAILABLE; + } + return ret; + } + + if (chunked_trailer_store(c, cursor, trailer_raw_size) != 0) { + return FLB_HTTP_ERROR; + } + + consume_bytes(cursor, trailer_bytes, available); + r->data_len -= trailer_bytes; + r->data[r->data_len] = '\0'; + r->payload_size = r->data_len - (r->headers_end - r->data); + r->chunked_trailer_pending = FLB_FALSE; + + return FLB_HTTP_OK; + } + + ret = chunked_data_size(cursor, available, &chunk_header_size); + if (ret != FLB_HTTP_OK) { + if (found_full_chunk == FLB_TRUE && ret == FLB_HTTP_MORE) { + return FLB_HTTP_CHUNK_AVAILABLE; + } + return ret; + } + + payload_end = chunked_line_end(cursor, available) + 2; + chunk_size_line_length = payload_end - cursor; + + consume_bytes(cursor, payload_end - cursor, available); + r->data_len -= (payload_end - cursor); + r->data[r->data_len] = '\0'; + + available = r->data_len - (cursor - r->data); + if (chunk_header_size == 
chunk_size_line_length) { + r->chunked_trailer_pending = FLB_TRUE; + if (r->chunk_processed_end != NULL) { + r->payload_size = r->chunk_processed_end - r->headers_end; + } + continue; + } + + chunk_data_size = chunk_header_size - chunk_size_line_length - 2; + + consume_bytes(cursor + chunk_data_size, 2, available - chunk_data_size); + r->data_len -= 2; + r->data[r->data_len] = '\0'; + r->chunk_processed_end = cursor + chunk_data_size; + r->payload_size = r->data_len - (r->headers_end - r->data); + found_full_chunk = FLB_TRUE; } if (found_full_chunk == FLB_TRUE) { @@ -703,10 +924,10 @@ static int add_host_and_content_length(struct flb_http_client *c) } struct flb_http_client *create_http_client(struct flb_connection *u_conn, - int method, const char *uri, - const char *body, size_t body_len, - const char *host, int port, - const char *proxy, int flags) + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags) { int ret; char *p; @@ -796,6 +1017,17 @@ struct flb_http_client *create_http_client(struct flb_connection *u_conn, } c->u_conn = u_conn; + c->original_net_setup = u_conn->net; + if (u_conn->net != NULL) { + c->request_net_setup = *u_conn->net; + } + else if (u_conn->upstream != NULL) { + c->request_net_setup = u_conn->upstream->base.net; + c->original_net_setup = &u_conn->upstream->base.net; + } + if (c->original_net_setup != NULL) { + c->u_conn->net = &c->request_net_setup; + } c->method = method; c->uri = uri; c->host = host; @@ -1089,6 +1321,39 @@ flb_sds_t flb_http_get_header(struct flb_http_client *c, return NULL; } +flb_sds_t flb_http_get_response_header(struct flb_http_client *c, + const char *key, size_t key_len) +{ + flb_sds_t value; + size_t header_size; + + if (c == NULL || c->resp.data == NULL || c->resp.headers_end == NULL) { + return NULL; + } + + header_size = c->resp.headers_end - c->resp.data; + value = raw_header_lookup(c->resp.data, header_size, key, key_len); + if 
(value != NULL) { + return value; + } + + if (c->resp.trailer_buf == NULL || c->resp.trailer_size == 0) { + return NULL; + } + + return raw_header_lookup(c->resp.trailer_buf, c->resp.trailer_size, + key, key_len); +} + +int flb_http_client_process_response_buffer(struct flb_http_client *c) +{ + if (c == NULL) { + return FLB_HTTP_ERROR; + } + + return process_data(c); +} + static int http_header_push(struct flb_http_client *c, struct flb_kv *header) { char *tmp; @@ -1224,15 +1489,137 @@ int flb_http_set_content_encoding_snappy(struct flb_http_client *c) return ret; } +static int http_client_clamp_connection_io_timeout(struct flb_http_client *c, + int timeout) +{ + struct flb_net_setup *net_setup; + + if (c == NULL || c->u_conn == NULL || timeout <= 0) { + return timeout; + } + + net_setup = c->original_net_setup; + if (net_setup == NULL) { + net_setup = c->u_conn->net; + } + + if (net_setup == NULL) { + return timeout; + } + + if (net_setup->io_timeout > 0 && net_setup->io_timeout < timeout) { + return net_setup->io_timeout; + } + + return timeout; +} + +static void http_client_update_connection_io_timeout(struct flb_http_client *c) +{ + struct flb_net_setup *net_setup; + int effective_timeout; + + if (c == NULL || c->u_conn == NULL) { + return; + } + + net_setup = c->original_net_setup; + if (net_setup == NULL) { + net_setup = c->u_conn->net; + } + if (net_setup == NULL && c->u_conn->upstream != NULL) { + net_setup = &c->u_conn->upstream->base.net; + } + if (net_setup == NULL) { + return; + } + + effective_timeout = net_setup->io_timeout; + + if (c->response_timeout > 0) { + effective_timeout = http_client_clamp_connection_io_timeout(c, + c->response_timeout); + } + + if (c->read_idle_timeout > 0) { + if (effective_timeout > 0) { + effective_timeout = http_client_clamp_connection_io_timeout(c, + c->read_idle_timeout < effective_timeout ? 
+ c->read_idle_timeout : + effective_timeout); + } + else { + effective_timeout = http_client_clamp_connection_io_timeout(c, + c->read_idle_timeout); + } + } + + c->request_net_setup.io_timeout = effective_timeout; + + if (c->u_conn->fd > 0) { + flb_net_socket_set_rcvtimeout(c->u_conn->fd, + c->request_net_setup.io_timeout); + } +} + +static void http_client_bind_connection(struct flb_http_client *c, + struct flb_connection *u_conn) +{ + c->u_conn = u_conn; + c->original_net_setup = NULL; + + if (u_conn == NULL) { + return; + } + + c->original_net_setup = u_conn->net; + + if (c->original_net_setup == NULL && u_conn->upstream != NULL) { + c->original_net_setup = &u_conn->upstream->base.net; + } + + if (c->original_net_setup != NULL) { + c->request_net_setup = *c->original_net_setup; + c->u_conn->net = &c->request_net_setup; + http_client_update_connection_io_timeout(c); + } +} + +static void http_client_unbind_connection(struct flb_http_client *c) +{ + if (c == NULL || c->u_conn == NULL) { + return; + } + + if (c->original_net_setup != NULL && c->u_conn->net == &c->request_net_setup) { + c->u_conn->net = c->original_net_setup; + + if (c->u_conn->fd > 0) { + flb_net_socket_set_rcvtimeout(c->u_conn->fd, + c->original_net_setup->io_timeout); + } + } + + c->u_conn = NULL; + c->original_net_setup = NULL; +} + +void flb_http_client_detach_connection(struct flb_http_client *c) +{ + http_client_unbind_connection(c); +} + int flb_http_set_read_idle_timeout(struct flb_http_client *c, int timeout) { c->read_idle_timeout = timeout; + http_client_update_connection_io_timeout(c); return 0; } int flb_http_set_response_timeout(struct flb_http_client *c, int timeout) { c->response_timeout = timeout; + http_client_update_connection_io_timeout(c); return 0; } @@ -1527,6 +1914,32 @@ int flb_http_do_request(struct flb_http_client *c, size_t *bytes) return FLB_HTTP_MORE; } +static int http_client_response_timeout_reached(struct flb_http_client *c, + time_t now) +{ + if 
(c->response_timeout > 0 && (now - c->ts_start) >= c->response_timeout) { + flb_error("[http_client] response timeout reached (elapsed=%lds, limit=%ds)", + (long) (now - c->ts_start), c->response_timeout); + flb_upstream_conn_recycle(c->u_conn, FLB_FALSE); + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int http_client_read_idle_timeout_reached(struct flb_http_client *c, + time_t now) +{ + if (c->read_idle_timeout > 0 && (now - c->last_read_ts) >= c->read_idle_timeout) { + flb_error("[http_client] read idle timeout reached (idle=%lds, limit=%ds)", + (long) (now - c->last_read_ts), c->read_idle_timeout); + flb_upstream_conn_recycle(c->u_conn, FLB_FALSE); + return FLB_TRUE; + } + + return FLB_FALSE; +} + int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed) { /* returns @@ -1589,17 +2002,11 @@ int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed) } now = time(NULL); - if (c->response_timeout > 0 && (now - c->ts_start) > c->response_timeout) { - flb_error("[http_client] response timeout reached (elapsed=%lds, limit=%ds)", - (long)(now - c->ts_start), c->response_timeout); - flb_upstream_conn_recycle(c->u_conn, FLB_FALSE); + if (http_client_response_timeout_reached(c, now) == FLB_TRUE) { return FLB_HTTP_ERROR; } - if (c->read_idle_timeout > 0 && (now - c->last_read_ts) > c->read_idle_timeout) { - flb_error("[http_client] read idle timeout reached (idle=%lds, limit=%ds)", - (long)(now - c->last_read_ts), c->read_idle_timeout); - flb_upstream_conn_recycle(c->u_conn, FLB_FALSE); + if (http_client_read_idle_timeout_reached(c, now) == FLB_TRUE) { return FLB_HTTP_ERROR; } @@ -1607,9 +2014,28 @@ int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed) c->resp.data + c->resp.data_len, available); if (r_bytes <= 0) { + now = time(NULL); + if (c->flags & FLB_HTTP_10) { return FLB_HTTP_OK; } + + if (http_client_response_timeout_reached(c, now) == FLB_TRUE) { + return FLB_HTTP_ERROR; + } + + if 
(http_client_read_idle_timeout_reached(c, now) == FLB_TRUE) { + return FLB_HTTP_ERROR; + } + + if (c->u_conn != NULL && c->u_conn->net_error == ETIMEDOUT) { + flb_error("[http_client] upstream I/O timeout reached while " + "waiting for response from %s:%i", + c->u_conn->upstream->tcp_host, + c->u_conn->upstream->tcp_port); + flb_upstream_conn_recycle(c->u_conn, FLB_FALSE); + return FLB_HTTP_ERROR; + } } /* Always append a NULL byte */ @@ -1709,6 +2135,7 @@ int flb_http_do_with_oauth2(struct flb_http_client *c, size_t *bytes, { int ret; flb_sds_t token = NULL; + struct flb_connection *old_conn; struct flb_upstream *u; if (!oauth2 || oauth2->cfg.enabled == FLB_FALSE) { @@ -1740,8 +2167,10 @@ int flb_http_do_with_oauth2(struct flb_http_client *c, size_t *bytes, /* If connection was closed, get a new one */ if (c->resp.connection_close == FLB_TRUE && c->u_conn) { u = c->u_conn->upstream; - flb_upstream_conn_release(c->u_conn); - c->u_conn = flb_upstream_conn_get(u); + old_conn = c->u_conn; + http_client_unbind_connection(c); + flb_upstream_conn_release(old_conn); + http_client_bind_connection(c, flb_upstream_conn_get(u)); if (!c->u_conn) { return -1; } @@ -1820,8 +2249,10 @@ int flb_http_client_proxy_connect(struct flb_connection *u_conn) void flb_http_client_destroy(struct flb_http_client *c) { + http_client_unbind_connection(c); http_headers_destroy(c); flb_free(c->resp.data); + flb_free(c->resp.trailer_buf); flb_free(c->header_buf); flb_free((void *)c->proxy.host); flb_free(c); diff --git a/tests/integration/scenarios/out_http/config/out_http_chunked_basic.yaml b/tests/integration/scenarios/out_http/config/out_http_chunked_basic.yaml new file mode 100644 index 00000000000..949164954ce --- /dev/null +++ b/tests/integration/scenarios/out_http/config/out_http_chunked_basic.yaml @@ -0,0 +1,21 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_http_chunked + dummy: 
'{"message":"hello from chunked out_http","source":"dummy"}' + samples: 1 + + outputs: + - name: http + match: out_http_chunked + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + uri: /data + format: json + json_date_key: false diff --git a/tests/integration/scenarios/out_http/config/out_http_chunked_read_idle_timeout.yaml b/tests/integration/scenarios/out_http/config/out_http_chunked_read_idle_timeout.yaml new file mode 100644 index 00000000000..e9643c7b5ea --- /dev/null +++ b/tests/integration/scenarios/out_http/config/out_http_chunked_read_idle_timeout.yaml @@ -0,0 +1,24 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_http_chunked_read_idle_timeout + dummy: '{"message":"retry chunked response","source":"dummy"}' + samples: 1 + + outputs: + - name: http + match: out_http_chunked_read_idle_timeout + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + uri: /data + format: json + json_date_key: false + http.response_timeout: 10s + http.read_idle_timeout: 2s + retry_limit: 1 diff --git a/tests/integration/scenarios/out_http/config/out_http_chunked_response_timeout.yaml b/tests/integration/scenarios/out_http/config/out_http_chunked_response_timeout.yaml new file mode 100644 index 00000000000..68c6b0e3a90 --- /dev/null +++ b/tests/integration/scenarios/out_http/config/out_http_chunked_response_timeout.yaml @@ -0,0 +1,23 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_http_chunked_response_timeout + dummy: '{"message":"retry chunked response","source":"dummy"}' + samples: 1 + + outputs: + - name: http + match: out_http_chunked_response_timeout + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + uri: /data + format: json + json_date_key: false + http.response_timeout: 2s + retry_limit: 1 diff --git 
a/tests/integration/scenarios/out_http/config/out_http_chunked_retry.yaml b/tests/integration/scenarios/out_http/config/out_http_chunked_retry.yaml new file mode 100644 index 00000000000..cc8db5cbe9c --- /dev/null +++ b/tests/integration/scenarios/out_http/config/out_http_chunked_retry.yaml @@ -0,0 +1,22 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_http_chunked_retry + dummy: '{"message":"retry chunked response","source":"dummy"}' + samples: 1 + + outputs: + - name: http + match: out_http_chunked_retry + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + uri: /data + format: json + json_date_key: false + retry_limit: 1 diff --git a/tests/integration/scenarios/out_http/config/out_http_oauth2_timeout.yaml b/tests/integration/scenarios/out_http/config/out_http_oauth2_timeout.yaml new file mode 100644 index 00000000000..97418dcfdd9 --- /dev/null +++ b/tests/integration/scenarios/out_http/config/out_http_oauth2_timeout.yaml @@ -0,0 +1,29 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_http + dummy: '{"message":"hello from oauth2 timeout","source":"dummy"}' + samples: 1 + + outputs: + - name: http + match: out_http + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + uri: /data + format: json + json_date_key: false + retry_limit: 1 + http.response_timeout: 10s + oauth2.enable: true + oauth2.token_url: http://127.0.0.1:${TEST_SUITE_HTTP_PORT}/oauth/token + oauth2.client_id: client1 + oauth2.client_secret: secret1 + oauth2.scope: logs.write + oauth2.timeout: 2s diff --git a/tests/integration/scenarios/out_http/config/out_http_tls_read_idle_timeout.yaml b/tests/integration/scenarios/out_http/config/out_http_tls_read_idle_timeout.yaml new file mode 100644 index 00000000000..a029d9c2eef --- /dev/null +++ 
b/tests/integration/scenarios/out_http/config/out_http_tls_read_idle_timeout.yaml @@ -0,0 +1,29 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_http_tls + dummy: '{"message":"hello over tls timeout","source":"dummy"}' + samples: 1 + + outputs: + - name: http + match: out_http_tls + host: localhost + port: ${TEST_SUITE_HTTP_PORT} + uri: /data + format: json + json_date_key: false + retry_limit: 1 + http.response_timeout: 10s + http.read_idle_timeout: 2s + tls: on + tls.verify: on + tls.verify_hostname: on + tls.vhost: localhost + tls.ca_file: ${CERTIFICATE_TEST} diff --git a/tests/integration/scenarios/out_http/config/out_http_tls_response_timeout.yaml b/tests/integration/scenarios/out_http/config/out_http_tls_response_timeout.yaml new file mode 100644 index 00000000000..4128c4ce754 --- /dev/null +++ b/tests/integration/scenarios/out_http/config/out_http_tls_response_timeout.yaml @@ -0,0 +1,28 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_http_tls + dummy: '{"message":"hello over tls timeout","source":"dummy"}' + samples: 1 + + outputs: + - name: http + match: out_http_tls + host: localhost + port: ${TEST_SUITE_HTTP_PORT} + uri: /data + format: json + json_date_key: false + retry_limit: 1 + http.response_timeout: 2s + tls: on + tls.verify: on + tls.verify_hostname: on + tls.vhost: localhost + tls.ca_file: ${CERTIFICATE_TEST} diff --git a/tests/integration/scenarios/out_http/tests/test_out_http_001.py b/tests/integration/scenarios/out_http/tests/test_out_http_001.py index 52f444b9765..01dea075922 100644 --- a/tests/integration/scenarios/out_http/tests/test_out_http_001.py +++ b/tests/integration/scenarios/out_http/tests/test_out_http_001.py @@ -1,6 +1,7 @@ import json import logging import os +import time import requests @@ -9,19 +10,49 @@ 
configure_oauth_token_response, data_storage, http_server_run, + server_instances, ) from utils.test_service import FluentBitTestService logger = logging.getLogger(__name__) +def _wait_for_http_server(port, timeout=5): + deadline = time.time() + timeout + + while time.time() < deadline: + try: + response = requests.get(f"http://127.0.0.1:{port}/ping", timeout=1) + if response.status_code == 200: + return + except requests.RequestException: + pass + + time.sleep(0.1) + + raise TimeoutError(f"Timed out waiting for HTTP server on port {port}") + + +def _wait_for_http_server_port(timeout=5): + deadline = time.time() + timeout + + while time.time() < deadline: + if server_instances: + return server_instances[-1].server_port + + time.sleep(0.1) + + raise TimeoutError("Timed out waiting for HTTP server port assignment") + class Service: - def __init__(self, config_file): + def __init__(self, config_file, *, response_setup=None, use_tls=False): self.config_file = os.path.abspath(os.path.join(os.path.dirname(__file__), "../config", config_file)) test_path = os.path.dirname(os.path.abspath(__file__)) cert_dir = os.path.abspath(os.path.join(test_path, "../../in_splunk/certificate")) self.tls_crt_file = os.path.join(cert_dir, "certificate.pem") self.tls_key_file = os.path.join(cert_dir, "private_key.pem") + self.response_setup = response_setup + self.use_tls = use_tls self.service = FluentBitTestService( self.config_file, data_storage=data_storage, @@ -35,16 +66,53 @@ def __init__(self, config_file): ) def _start_receiver(self, service): - http_server_run(service.test_suite_http_port) - self.service.wait_for_http_endpoint( - f"http://127.0.0.1:{service.test_suite_http_port}/ping", - timeout=10, - interval=0.5, + http_server_run( + service.test_suite_http_port, + use_tls=self.use_tls, + tls_crt_file=self.tls_crt_file, + tls_key_file=self.tls_key_file, ) + if self.response_setup is not None: + self.response_setup() + + if self.use_tls: + def _https_ready(): + try: + response = 
requests.get( + f"https://localhost:{service.test_suite_http_port}/ping", + timeout=1, + verify=self.tls_crt_file, + ) + return response.status_code == 200 + except requests.RequestException: + return False + + self.service.wait_for_condition( + _https_ready, + timeout=10, + interval=0.5, + description="HTTPS out_http receiver readiness", + ) + else: + self.service.wait_for_http_endpoint( + f"http://127.0.0.1:{service.test_suite_http_port}/ping", + timeout=10, + interval=0.5, + ) def _stop_receiver(self, service): try: - requests.post(f"http://127.0.0.1:{service.test_suite_http_port}/shutdown", timeout=2) + if self.use_tls: + requests.post( + f"https://localhost:{service.test_suite_http_port}/shutdown", + timeout=2, + verify=self.tls_crt_file, + ) + else: + requests.post( + f"http://127.0.0.1:{service.test_suite_http_port}/shutdown", + timeout=2, + ) except requests.RequestException: pass @@ -65,6 +133,26 @@ def wait_for_requests(self, minimum_count, timeout=10): description=f"{minimum_count} outbound HTTP requests", ) + def wait_for_log_message(self, pattern, timeout=10): + def _read_log(): + if not os.path.exists(self.flb.log_file): + return None + + with open(self.flb.log_file, encoding="utf-8", errors="replace") as log_file: + contents = log_file.read() + + if pattern in contents: + return contents + + return None + + return self.service.wait_for_condition( + _read_log, + timeout=timeout, + interval=0.25, + description=f"log message '{pattern}'", + ) + def test_out_http_sends_json_payload(): service = Service("out_http_basic.yaml") @@ -138,3 +226,177 @@ def test_out_http_oauth2_private_key_jwt_adds_bearer_token(): assert "client_assertion=" in token_request["raw_data"] assert "client_id=client1" in token_request["raw_data"] assert data_request["headers"].get("Authorization") == "Bearer oauth-access-token" + + +def test_out_http_oauth2_timeout_retries_hung_token_endpoint(): + service = Service( + "out_http_oauth2_timeout.yaml", + response_setup=lambda: 
configure_oauth_token_response( + hang_before_response=True, + ), + ) + service.start() + + requests_seen = service.wait_for_requests(2, timeout=15) + log_text = service.wait_for_log_message("response timeout reached", timeout=15) + service.stop() + + token_requests = [request for request in requests_seen if request["path"] == "/oauth/token"] + data_requests = [request for request in requests_seen if request["path"] == "/data"] + + assert len(token_requests) >= 2 + assert len(data_requests) == 0 + assert "response timeout reached" in log_text + + +def test_out_http_oauth2_read_idle_timeout_retries_partial_token_response(): + service = Service( + "out_http_oauth2_timeout.yaml", + response_setup=lambda: configure_oauth_token_response( + stream_fragments=[ + '{"access_token":"partial', + ], + hang_after_fragment_index=0, + ), + ) + service.start() + + requests_seen = service.wait_for_requests(2, timeout=15) + log_text = service.wait_for_log_message("response timeout reached", timeout=15) + service.stop() + + token_requests = [request for request in requests_seen if request["path"] == "/oauth/token"] + data_requests = [request for request in requests_seen if request["path"] == "/data"] + + assert len(token_requests) >= 2 + assert len(data_requests) == 0 + assert "response timeout reached" in log_text + + +def test_out_http_tls_response_timeout_retries_hung_server(): + service = Service( + "out_http_tls_response_timeout.yaml", + response_setup=lambda: configure_http_response( + hang_before_response=True, + ), + use_tls=True, + ) + service.start() + + requests_seen = service.wait_for_requests(2, timeout=15) + log_text = service.wait_for_log_message("response timeout reached", timeout=15) + service.stop() + + assert len(requests_seen) >= 2 + assert "response timeout reached" in log_text + + +def test_out_http_tls_read_idle_timeout_retries_partial_response(): + service = Service( + "out_http_tls_read_idle_timeout.yaml", + response_setup=lambda: configure_http_response( + 
stream_fragments=[ + '{"status":"par', + ], + hang_after_fragment_index=0, + ), + use_tls=True, + ) + service.start() + + requests_seen = service.wait_for_requests(2, timeout=15) + log_text = service.wait_for_log_message("read idle timeout reached", timeout=15) + service.stop() + + assert len(requests_seen) >= 2 + assert "read idle timeout reached" in log_text + + +def test_http_server_configure_helpers_allow_clearing_nullable_fields(): + http_server_run(0, reset_state=True) + port = _wait_for_http_server_port() + _wait_for_http_server(port) + + try: + configure_http_response( + stream_fragments=["part"], + hang_after_fragment_index=0, + ) + configure_http_response( + stream_fragments=None, + hang_after_fragment_index=None, + ) + + response = requests.post( + f"http://127.0.0.1:{port}/data", + json={"test": "clear"}, + timeout=5, + ) + + assert response.status_code == 200 + assert response.json() == {"status": "received"} + finally: + try: + requests.post(f"http://127.0.0.1:{port}/shutdown", timeout=2) + except requests.RequestException: + pass + + +def test_http_server_oauth_token_honors_explicit_content_type_and_raw_body(): + http_server_run(0, reset_state=True) + port = _wait_for_http_server_port() + _wait_for_http_server(port) + + try: + configure_oauth_token_response( + stream_fragments=["partial-token"], + hang_after_fragment_index=0, + ) + configure_oauth_token_response( + body="not-json", + content_type="text/plain", + stream_fragments=None, + hang_after_fragment_index=None, + ) + + response = requests.post( + f"http://127.0.0.1:{port}/oauth/token", + data="grant_type=client_credentials", + timeout=5, + ) + + assert response.status_code == 200 + assert response.text == "not-json" + assert response.headers["Content-Type"].startswith("text/plain") + finally: + try: + requests.post(f"http://127.0.0.1:{port}/shutdown", timeout=2) + except requests.RequestException: + pass + + +def test_http_server_oauth_token_honors_explicit_json_content_type(): + 
http_server_run(0, reset_state=True) + port = _wait_for_http_server_port() + _wait_for_http_server(port) + + try: + configure_oauth_token_response( + body={"access_token": "json-token", "token_type": "Bearer"}, + content_type="application/json; charset=utf-8", + ) + + response = requests.post( + f"http://127.0.0.1:{port}/oauth/token", + data="grant_type=client_credentials", + timeout=5, + ) + + assert response.status_code == 200 + assert response.json()["access_token"] == "json-token" + assert "application/json" in response.headers["Content-Type"] + finally: + try: + requests.post(f"http://127.0.0.1:{port}/shutdown", timeout=2) + except requests.RequestException: + pass diff --git a/tests/integration/scenarios/out_http/tests/test_out_http_chunked_response_001.py b/tests/integration/scenarios/out_http/tests/test_out_http_chunked_response_001.py new file mode 100644 index 00000000000..9e4a6504423 --- /dev/null +++ b/tests/integration/scenarios/out_http/tests/test_out_http_chunked_response_001.py @@ -0,0 +1,293 @@ +import json +import logging +import os + +from server.chunked_http_server import ( + chunked_http_server_run, + chunked_response_storage, + configure_chunked_http_response, + shutdown_chunked_http_server, +) +from utils.test_service import FluentBitTestService + + +logger = logging.getLogger(__name__) + + +class Service: + def __init__(self, config_file, *, response_setup=None): + self.config_file = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../config", config_file) + ) + self.response_setup = response_setup + self.service = FluentBitTestService( + self.config_file, + data_storage=chunked_response_storage, + data_keys=["payloads", "requests"], + pre_start=self._start_receiver, + post_stop=self._stop_receiver, + ) + + def _start_receiver(self, service): + chunked_http_server_run(service.test_suite_http_port) + if self.response_setup is not None: + self.response_setup() + self.service.wait_for_http_endpoint( + 
f"http://127.0.0.1:{service.test_suite_http_port}/ping", + timeout=10, + interval=0.5, + ) + + def _stop_receiver(self, service): + shutdown_chunked_http_server() + + def start(self): + self.service.start() + self.flb = self.service.flb + + def stop(self): + self.service.stop() + + def wait_for_requests(self, minimum_count, timeout=10): + return self.service.wait_for_condition( + lambda: chunked_response_storage["requests"] + if len(chunked_response_storage["requests"]) >= minimum_count + else None, + timeout=timeout, + interval=0.5, + description=f"{minimum_count} outbound HTTP requests", + ) + + def wait_for_log_message(self, pattern, timeout=10): + def _read_log(): + if not os.path.exists(self.flb.log_file): + return None + + with open(self.flb.log_file, encoding="utf-8", errors="replace") as log_file: + contents = log_file.read() + + if pattern in contents: + return contents + + return None + + return self.service.wait_for_condition( + _read_log, + timeout=timeout, + interval=0.25, + description=f"log message '{pattern}'", + ) + + +def _assert_payload(request_record): + assert request_record["path"] == "/data" + assert request_record["method"] == "POST" + assert "application/json" in request_record["headers"].get("Content-Type", "") + + payload = json.loads(request_record["raw_data"]) + assert isinstance(payload, list) + assert payload[0]["message"] in { + "hello from chunked out_http", + "retry chunked response", + } + assert payload[0]["source"] == "dummy" + + +def test_out_http_accepts_basic_chunked_response(): + service = Service( + "out_http_chunked_basic.yaml", + response_setup=lambda: configure_chunked_http_response( + fragments=[ + "2\r\nOK\r\n", + "0\r\n\r\n", + ] + ), + ) + service.start() + + requests_seen = service.wait_for_requests(1) + service.stop() + + assert len(requests_seen) == 1 + _assert_payload(requests_seen[0]) + + +def test_out_http_accepts_chunked_response_with_trailers_and_extensions(): + service = Service( + 
"out_http_chunked_basic.yaml", + response_setup=lambda: configure_chunked_http_response( + headers=[ + ("Transfer-Encoding", "chunked"), + ("Trailer", "Expires, X-Trace"), + ("Connection", "close"), + ], + fragments=[ + "4;foo=bar\r\nWiki\r\n", + "5\r\npedia\r\n", + "0;done=yes\r\nExpires: tomorrow\r\nX-Trace: abc\r\n\r\n", + ], + ), + ) + service.start() + + requests_seen = service.wait_for_requests(1) + service.stop() + + assert len(requests_seen) == 1 + _assert_payload(requests_seen[0]) + + +def test_out_http_accepts_fragmented_chunked_terminal_sequence(): + service = Service( + "out_http_chunked_basic.yaml", + response_setup=lambda: configure_chunked_http_response( + headers=[ + ("Transfer-Encoding", "chunked"), + ("Trailer", "X-Trace"), + ("Connection", "close"), + ], + fragments=[ + "4\r\nWi", + "ki\r\n", + "5\r\npedia\r\n", + "0\r\n", + "X-Trace: stream\r\n", + "\r\n", + ], + ), + ) + service.start() + + requests_seen = service.wait_for_requests(1) + service.stop() + + assert len(requests_seen) == 1 + _assert_payload(requests_seen[0]) + + +def test_out_http_retries_when_chunked_trailer_block_is_invalid(): + service = Service( + "out_http_chunked_retry.yaml", + response_setup=lambda: configure_chunked_http_response( + fragments=[ + "4\r\nWiki\r\n", + "0\r\nBroken-Trailer\r\n\r\n", + ] + ), + ) + service.start() + + requests_seen = service.wait_for_requests(2, timeout=15) + service.stop() + + assert len(requests_seen) >= 2 + _assert_payload(requests_seen[0]) + + +def test_out_http_accepts_uppercase_hex_and_whitespace_chunk_size(): + service = Service( + "out_http_chunked_basic.yaml", + response_setup=lambda: configure_chunked_http_response( + fragments=[ + " 2 ;foo=bar\r\nOK\r\n", + "0\r\n\r\n", + ] + ), + ) + service.start() + + requests_seen = service.wait_for_requests(1) + service.stop() + + assert len(requests_seen) == 1 + _assert_payload(requests_seen[0]) + + +def test_out_http_accepts_empty_terminal_chunk_split_from_final_crlf(): + service = Service( + 
"out_http_chunked_basic.yaml", + response_setup=lambda: configure_chunked_http_response( + fragments=[ + "2\r\nOK\r\n", + "0\r\n", + "\r\n", + ] + ), + ) + service.start() + + requests_seen = service.wait_for_requests(1) + service.stop() + + assert len(requests_seen) == 1 + _assert_payload(requests_seen[0]) + + +def test_out_http_accepts_multi_stage_trailer_delivery(): + service = Service( + "out_http_chunked_basic.yaml", + response_setup=lambda: configure_chunked_http_response( + headers=[ + ("Transfer-Encoding", "chunked"), + ("Trailer", "X-One, X-Two"), + ("Connection", "close"), + ], + fragments=[ + "2\r\nOK\r\n", + "0\r\n", + "X-One: 1\r\n", + "X-Two: 2\r\n", + "\r\n", + ], + ), + ) + service.start() + + requests_seen = service.wait_for_requests(1) + service.stop() + + assert len(requests_seen) == 1 + _assert_payload(requests_seen[0]) + + +def test_out_http_response_timeout_retries_hung_server(): + service = Service( + "out_http_chunked_response_timeout.yaml", + response_setup=lambda: configure_chunked_http_response( + hang_before_headers=True, + ), + ) + service.start() + + requests_seen = service.wait_for_requests(2, timeout=15) + log_text = service.wait_for_log_message("response timeout reached", timeout=15) + service.stop() + + assert len(requests_seen) >= 2 + assert "response timeout reached" in log_text + _assert_payload(requests_seen[0]) + + +def test_out_http_read_idle_timeout_retries_stalled_chunked_response(): + service = Service( + "out_http_chunked_read_idle_timeout.yaml", + response_setup=lambda: configure_chunked_http_response( + headers=[ + ("Transfer-Encoding", "chunked"), + ("Connection", "close"), + ], + fragments=[ + "4\r\nWiki\r\n", + ], + hang_after_fragment_index=0, + ), + ) + service.start() + + requests_seen = service.wait_for_requests(2, timeout=15) + log_text = service.wait_for_log_message("read idle timeout reached", timeout=15) + service.stop() + + assert len(requests_seen) >= 2 + assert "read idle timeout reached" in log_text + 
_assert_payload(requests_seen[0]) diff --git a/tests/integration/src/server/chunked_http_server.py b/tests/integration/src/server/chunked_http_server.py new file mode 100644 index 00000000000..c7b6357459c --- /dev/null +++ b/tests/integration/src/server/chunked_http_server.py @@ -0,0 +1,244 @@ +# Fluent Bit +# ========== +# Copyright (C) 2015-2026 The Fluent Bit Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import json +import logging +import threading +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + + +logger = logging.getLogger(__name__) + +chunked_response_storage = {"payloads": [], "requests": []} +chunked_response_config = { + "status_code": 200, + "reason": "OK", + "headers": [ + ("Transfer-Encoding", "chunked"), + ("Connection", "close"), + ], + "fragments": [ + "2\r\nOK\r\n", + "0\r\n\r\n", + ], + "delay_seconds": 0.0, + "fragment_delay_seconds": 0.01, + "hang_before_headers": False, + "hang_after_fragment_index": None, +} + +server_thread = None +server_instance = None +shutdown_event = threading.Event() + + +class ChunkedThreadingHTTPServer(ThreadingHTTPServer): + daemon_threads = True + allow_reuse_address = True + + +def _sleep_interruptible(seconds): + if seconds <= 0: + return + + deadline = time.time() + seconds + + while time.time() < deadline: + if shutdown_event.wait(timeout=min(0.1, deadline - time.time())): + break + + +def _wait_for_shutdown(): + 
shutdown_event.wait(timeout=30) + + +def reset_chunked_http_server_state(): + shutdown_event.clear() + chunked_response_storage["payloads"] = [] + chunked_response_storage["requests"] = [] + chunked_response_config.update( + { + "status_code": 200, + "reason": "OK", + "headers": [ + ("Transfer-Encoding", "chunked"), + ("Connection", "close"), + ], + "fragments": [ + "2\r\nOK\r\n", + "0\r\n\r\n", + ], + "delay_seconds": 0.0, + "fragment_delay_seconds": 0.01, + "hang_before_headers": False, + "hang_after_fragment_index": None, + } + ) + + +def configure_chunked_http_response(*, status_code=None, reason=None, + headers=None, fragments=None, + delay_seconds=None, + fragment_delay_seconds=None, + hang_before_headers=None, + hang_after_fragment_index=None): + if status_code is not None: + chunked_response_config["status_code"] = status_code + if reason is not None: + chunked_response_config["reason"] = reason + if headers is not None: + chunked_response_config["headers"] = list(headers) + if fragments is not None: + chunked_response_config["fragments"] = list(fragments) + if delay_seconds is not None: + chunked_response_config["delay_seconds"] = delay_seconds + if fragment_delay_seconds is not None: + chunked_response_config["fragment_delay_seconds"] = fragment_delay_seconds + if hang_before_headers is not None: + chunked_response_config["hang_before_headers"] = hang_before_headers + if hang_after_fragment_index is not None: + chunked_response_config["hang_after_fragment_index"] = hang_after_fragment_index + + +def _decode_json_payload(decoded_payload): + if not decoded_payload: + return None + + try: + return json.loads(decoded_payload.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError): + return None + + +def _record_request(handler, payload): + raw_data = payload.decode("utf-8", errors="replace") + chunked_response_storage["payloads"].append(_decode_json_payload(payload)) + chunked_response_storage["requests"].append( + { + "path": handler.path, + 
"method": handler.command, + "headers": dict(handler.headers), + "raw_data": raw_data, + "decoded_data": raw_data, + "json": _decode_json_payload(payload), + } + ) + + +class ChunkedHTTPHandler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + + def log_message(self, fmt, *args): + logger.debug("chunked_http_server: " + fmt, *args) + + def do_GET(self): + if self.path == "/ping": + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len('{"status":"pong"}'))) + self.end_headers() + self.wfile.write(b'{"status":"pong"}') + self.wfile.flush() + return + + if self.path == "/shutdown": + self.send_response(200) + self.send_header("Content-Length", "0") + self.end_headers() + self.wfile.flush() + threading.Thread(target=self.server.shutdown, daemon=True).start() + return + + self.send_response(404) + self.send_header("Content-Length", "0") + self.end_headers() + + def do_POST(self): + content_length = int(self.headers.get("Content-Length", "0")) + payload = self.rfile.read(content_length) if content_length > 0 else b"" + + _record_request(self, payload) + + if chunked_response_config["delay_seconds"] > 0: + _sleep_interruptible(chunked_response_config["delay_seconds"]) + + if chunked_response_config["hang_before_headers"]: + _wait_for_shutdown() + self.close_connection = True + return + + status_code = chunked_response_config["status_code"] + reason = chunked_response_config["reason"] + + self.connection.sendall( + f"HTTP/1.1 {status_code} {reason}\r\n".encode("utf-8") + ) + + for key, value in chunked_response_config["headers"]: + self.connection.sendall(f"{key}: {value}\r\n".encode("utf-8")) + + self.connection.sendall(b"\r\n") + + for fragment in chunked_response_config["fragments"]: + if isinstance(fragment, str): + fragment = fragment.encode("utf-8") + + self.connection.sendall(fragment) + if chunked_response_config["hang_after_fragment_index"] == 0: + _wait_for_shutdown() + 
self.close_connection = True + return + + if chunked_response_config["hang_after_fragment_index"] is not None: + chunked_response_config["hang_after_fragment_index"] -= 1 + + _sleep_interruptible(chunked_response_config["fragment_delay_seconds"]) + + self.close_connection = True + + +def _serve(port): + global server_instance + + server_instance = ChunkedThreadingHTTPServer(("127.0.0.1", port), + ChunkedHTTPHandler) + server_instance.serve_forever() + + +def chunked_http_server_run(port=60000, *, reset_state=True): + global server_thread + + if reset_state: + reset_chunked_http_server_state() + + logger.info("Starting chunked HTTP server on port %s", port) + server_thread = threading.Thread(target=_serve, args=(port,), daemon=True) + server_thread.start() + return server_thread + + +def shutdown_chunked_http_server(): + global server_instance + + shutdown_event.set() + + if server_instance is not None: + server_instance.shutdown() + server_instance.server_close() + server_instance = None diff --git a/tests/integration/src/server/http_server.py b/tests/integration/src/server/http_server.py index de3d7972256..7295fffe12d 100644 --- a/tests/integration/src/server/http_server.py +++ b/tests/integration/src/server/http_server.py @@ -40,9 +40,19 @@ "body": {"status": "received"}, "content_type": "application/json", "delay_seconds": 0, + "stream_fragments": None, + "fragment_delay_seconds": 0, + "hang_before_response": False, + "hang_after_fragment_index": None, } oauth_token_response = { "status_code": 200, + "content_type": "application/json", + "delay_seconds": 0, + "hang_before_response": False, + "stream_fragments": None, + "fragment_delay_seconds": 0, + "hang_after_fragment_index": None, "body": { "access_token": "oauth-access-token", "token_type": "Bearer", @@ -52,9 +62,27 @@ logger = logging.getLogger(__name__) server_thread = None server_instances = [] +shutdown_event = threading.Event() +UNSET = object() + + +def _sleep_interruptible(seconds): + if seconds <= 0: 
+ return + + deadline = time.time() + seconds + + while time.time() < deadline: + if shutdown_event.wait(timeout=min(0.1, deadline - time.time())): + break + + +def _wait_for_shutdown(): + shutdown_event.wait(timeout=30) def reset_http_server_state(): + shutdown_event.clear() data_storage["payloads"] = [] data_storage["requests"] = [] server_instances.clear() @@ -64,11 +92,21 @@ def reset_http_server_state(): "body": {"status": "received"}, "content_type": "application/json", "delay_seconds": 0, + "stream_fragments": None, + "fragment_delay_seconds": 0, + "hang_before_response": False, + "hang_after_fragment_index": None, } ) oauth_token_response.update( { "status_code": 200, + "content_type": "application/json", + "delay_seconds": 0, + "hang_before_response": False, + "stream_fragments": None, + "fragment_delay_seconds": 0, + "hang_after_fragment_index": None, "body": { "access_token": "oauth-access-token", "token_type": "Bearer", @@ -78,27 +116,93 @@ def reset_http_server_state(): ) -def configure_http_response(*, status_code=None, body=None, content_type=None, delay_seconds=None): - if status_code is not None: +def configure_http_response(*, status_code=UNSET, body=UNSET, content_type=UNSET, + delay_seconds=UNSET, stream_fragments=UNSET, + fragment_delay_seconds=UNSET, + hang_before_response=UNSET, + hang_after_fragment_index=UNSET): + if status_code is not UNSET: response_config["status_code"] = status_code - if body is not None: + if body is not UNSET: response_config["body"] = body - if content_type is not None: + if content_type is not UNSET: response_config["content_type"] = content_type - if delay_seconds is not None: + if delay_seconds is not UNSET: response_config["delay_seconds"] = delay_seconds - - -def configure_oauth_token_response(*, status_code=None, body=None): - if status_code is not None: + if stream_fragments is not UNSET: + response_config["stream_fragments"] = None if stream_fragments is None else list(stream_fragments) + if 
fragment_delay_seconds is not UNSET: + response_config["fragment_delay_seconds"] = fragment_delay_seconds + if hang_before_response is not UNSET: + response_config["hang_before_response"] = hang_before_response + if hang_after_fragment_index is not UNSET: + response_config["hang_after_fragment_index"] = hang_after_fragment_index + + +def configure_oauth_token_response(*, status_code=UNSET, body=UNSET, + content_type=UNSET, + delay_seconds=UNSET, + hang_before_response=UNSET, + stream_fragments=UNSET, + fragment_delay_seconds=UNSET, + hang_after_fragment_index=UNSET): + if status_code is not UNSET: oauth_token_response["status_code"] = status_code - if body is not None: + if body is not UNSET: oauth_token_response["body"] = body + if content_type is not UNSET: + oauth_token_response["content_type"] = content_type + if delay_seconds is not UNSET: + oauth_token_response["delay_seconds"] = delay_seconds + if hang_before_response is not UNSET: + oauth_token_response["hang_before_response"] = hang_before_response + if stream_fragments is not UNSET: + oauth_token_response["stream_fragments"] = None if stream_fragments is None else list(stream_fragments) + if fragment_delay_seconds is not UNSET: + oauth_token_response["fragment_delay_seconds"] = fragment_delay_seconds + if hang_after_fragment_index is not UNSET: + oauth_token_response["hang_after_fragment_index"] = hang_after_fragment_index + + +def _stream_fragments(config): + hang_after_fragment_index = config["hang_after_fragment_index"] + + for fragment in config["stream_fragments"]: + if isinstance(fragment, str): + fragment = fragment.encode("utf-8") + + yield fragment + + if hang_after_fragment_index == 0: + _wait_for_shutdown() + return + + if hang_after_fragment_index is not None: + hang_after_fragment_index -= 1 + + if config["fragment_delay_seconds"]: + _sleep_interruptible(config["fragment_delay_seconds"]) + + +def _build_streaming_response(config): + return Response( + _stream_fragments(config), + 
status=config["status_code"], + content_type=config["content_type"], + direct_passthrough=True, + ) def _build_response(): if response_config["delay_seconds"]: - time.sleep(response_config["delay_seconds"]) + _sleep_interruptible(response_config["delay_seconds"]) + + if response_config["hang_before_response"]: + _wait_for_shutdown() + return Response(status=503) + + if response_config["stream_fragments"] is not None: + return _build_streaming_response(response_config) body = response_config["body"] if isinstance(body, (dict, list)): @@ -174,7 +278,33 @@ def jwks(): @app.route('/oauth/token', methods=['POST']) def oauth_token(): _record_request() - return jsonify(oauth_token_response["body"]), oauth_token_response["status_code"] + if oauth_token_response["delay_seconds"]: + _sleep_interruptible(oauth_token_response["delay_seconds"]) + if oauth_token_response["hang_before_response"]: + _wait_for_shutdown() + return Response(status=503) + if oauth_token_response["stream_fragments"] is not None: + return _build_streaming_response(oauth_token_response) + + body = oauth_token_response["body"] + content_type = oauth_token_response.get("content_type") + normalized_content_type = None + + if content_type is not None: + normalized_content_type = content_type.split(";", 1)[0].strip().lower() + + if isinstance(body, (dict, list)) and ( + normalized_content_type is None or + normalized_content_type == "application/json" or + normalized_content_type.endswith("+json") + ): + return jsonify(body), oauth_token_response["status_code"] + + return Response( + body, + status=oauth_token_response["status_code"], + content_type=content_type, + ) @app.route('/ping', methods=['GET']) @@ -185,6 +315,7 @@ def ping(): @app.route('/shutdown', methods=['POST']) def shutdown(): logger.info("HTTP server shutdown requested") + shutdown_event.set() for server_instance in list(server_instances): threading.Thread(target=server_instance.shutdown, daemon=True).start() return jsonify({"status": 
"shutting down"}), 200 @@ -195,7 +326,9 @@ def run_server(port=60000, *, use_tls=False, tls_crt_file=None, tls_key_file=Non if use_tls: ssl_context = (tls_crt_file, tls_key_file) - server_instance = make_server("0.0.0.0", port, app, ssl_context=ssl_context) + server_instance = make_server("0.0.0.0", port, app, + threaded=True, + ssl_context=ssl_context) server_instances.append(server_instance) server_instance.serve_forever() diff --git a/tests/internal/http_client.c b/tests/internal/http_client.c index 6e04150c201..780446f87cb 100644 --- a/tests/internal/http_client.c +++ b/tests/internal/http_client.c @@ -638,6 +638,481 @@ void test_https_ipv6_zone_id_non_standard_port_host_header() test_tls_host_header_format("fe80::1%eth0", 8443, "[fe80::1]:8443"); } +static void append_response_fragment(struct flb_http_client *c, + const char *fragment) +{ + size_t length; + + length = strlen(fragment); + memcpy(c->resp.data + c->resp.data_len, fragment, length); + c->resp.data_len += length; + c->resp.data[c->resp.data_len] = '\0'; +} + +void test_http_response_header_lookup() +{ + int ret; + flb_sds_t value; + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 204 No Content\r\n" + "X-Trace: abc123\r\n" + "\r\n"); + + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_OK); + + value = flb_http_get_response_header(c, "X-Trace", 7); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "abc123") == 0); + flb_sds_destroy(value); + } + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_trailers() +{ + int ret; + flb_sds_t value; + 
struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "Trailer: Expires, X-Trace\r\n" + "\r\n" + "4;foo=bar\r\n" + "Wiki\r\n" + "5\r\n" + "pedia\r\n" + "0;done=yes\r\n" + "Expires: tomorrow\r\n" + "X-Trace: abc\r\n" + "\r\n"); + + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_OK); + TEST_CHECK(c->resp.payload_size == strlen("Wikipedia")); + TEST_CHECK(strncmp(c->resp.payload, "Wikipedia", strlen("Wikipedia")) == 0); + + value = flb_http_get_response_header(c, "Transfer-Encoding", 17); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "chunked") == 0); + flb_sds_destroy(value); + } + + value = flb_http_get_response_header(c, "Expires", 7); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "tomorrow") == 0); + flb_sds_destroy(value); + } + + value = flb_http_get_response_header(c, "X-Trace", 7); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "abc") == 0); + flb_sds_destroy(value); + } + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_incremental() +{ + int ret; + flb_sds_t value; + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + 
"Transfer-Encoding: chunked\r\n" + "\r\n" + "4\r\n" + "Wi"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_MORE); + + append_response_fragment(c, "ki\r\n5\r\npedia\r\n"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_CHUNK_AVAILABLE); + TEST_CHECK(c->resp.payload_size == strlen("Wikipedia")); + TEST_CHECK(strncmp(c->resp.payload, "Wikipedia", strlen("Wikipedia")) == 0); + + append_response_fragment(c, "0\r\n"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_MORE); + + append_response_fragment(c, "X-Trace: stream\r\n\r\n"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_OK); + + value = flb_http_get_response_header(c, "X-Trace", 7); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "stream") == 0); + flb_sds_destroy(value); + } + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_invalid_trailer() +{ + int ret; + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "4\r\n" + "Wiki\r\n" + "0\r\n" + "Broken-Trailer\r\n" + "\r\n"); + + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_ERROR); + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_empty_terminal_split() +{ + int ret; + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + 
"127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "4\r\n" + "Wiki\r\n" + "0\r\n"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE); + + append_response_fragment(c, "\r\n"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_OK); + TEST_CHECK(c->resp.payload_size == strlen("Wiki")); + TEST_CHECK(strncmp(c->resp.payload, "Wiki", strlen("Wiki")) == 0); + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_uppercase_hex_whitespace() +{ + int ret; + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + " A ;foo=bar\r\n" + "0123456789\r\n" + "0\r\n\r\n"); + + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_OK); + TEST_CHECK(c->resp.payload_size == strlen("0123456789")); + TEST_CHECK(strncmp(c->resp.payload, "0123456789", strlen("0123456789")) == 0); + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_multi_stage_trailers() +{ + int ret; + flb_sds_t value; + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + 
TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "4\r\n" + "Wiki\r\n" + "0\r\n" + "X-One: 1\r\n"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_CHUNK_AVAILABLE); + + append_response_fragment(c, "X-Two: 2\r\n"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_MORE); + + append_response_fragment(c, "\r\n"); + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_OK); + + value = flb_http_get_response_header(c, "X-One", 5); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "1") == 0); + flb_sds_destroy(value); + } + + value = flb_http_get_response_header(c, "X-Two", 5); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "2") == 0); + flb_sds_destroy(value); + } + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_partial_trailer_preserves_chunk_available() +{ + int ret; + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "4\r\n" + "Wiki\r\n" + "0\r\n" + "X-Trace: partial"); + + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_CHUNK_AVAILABLE); + TEST_CHECK(c->resp.payload_size == strlen("Wiki")); + TEST_CHECK(strncmp(c->resp.payload, "Wiki", strlen("Wiki")) == 0); + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_invalid_size_suffix() +{ + int ret; + struct 
test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "1g\r\n" + "A\r\n" + "0\r\n\r\n"); + + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_ERROR); + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_response_chunked_oversized_length() +{ + int ret; + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + append_response_fragment(c, + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "ffffffffffffffffffffffffffffffff\r\n"); + + ret = flb_http_client_process_response_buffer(c); + TEST_CHECK(ret == FLB_HTTP_ERROR); + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + +void test_http_timeout_setters_preserve_upstream_io_timeout() +{ + struct test_ctx *ctx; + struct flb_http_client *c; + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + exit(EXIT_FAILURE); + } + + ctx->u->base.net.io_timeout = 10; + + c = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", 80, NULL, FLB_HTTP_11); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("flb_http_client failed"); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + TEST_CHECK(flb_http_set_response_timeout(c, 5) == 0); + TEST_CHECK(ctx->u->base.net.io_timeout == 10); + TEST_CHECK(c->u_conn->net->io_timeout == 
5); + + TEST_CHECK(flb_http_set_read_idle_timeout(c, 3) == 0); + TEST_CHECK(ctx->u->base.net.io_timeout == 10); + TEST_CHECK(c->u_conn->net->io_timeout == 3); + + TEST_CHECK(flb_http_set_response_timeout(c, 8) == 0); + TEST_CHECK(ctx->u->base.net.io_timeout == 10); + TEST_CHECK(c->u_conn->net->io_timeout == 3); + + flb_http_client_destroy(c); + test_ctx_destroy(ctx); +} + TEST_LIST = { { "http_buffer_increase" , test_http_buffer_increase}, { "add_get_header" , test_http_add_get_header}, @@ -661,5 +1136,16 @@ TEST_LIST = { { "https_ipv6_non_standard_port_host_header", test_https_ipv6_non_standard_port_host_header}, { "https_ipv6_zone_id_default_port_host_header", test_https_ipv6_zone_id_default_port_host_header}, { "https_ipv6_zone_id_non_standard_port_host_header", test_https_ipv6_zone_id_non_standard_port_host_header}, + { "response_header_lookup", test_http_response_header_lookup}, + { "response_chunked_trailers", test_http_response_chunked_trailers}, + { "response_chunked_incremental", test_http_response_chunked_incremental}, + { "response_chunked_invalid_trailer", test_http_response_chunked_invalid_trailer}, + { "response_chunked_empty_terminal_split", test_http_response_chunked_empty_terminal_split}, + { "response_chunked_uppercase_hex_whitespace", test_http_response_chunked_uppercase_hex_whitespace}, + { "response_chunked_multi_stage_trailers", test_http_response_chunked_multi_stage_trailers}, + { "response_chunked_partial_trailer_preserves_chunk_available", test_http_response_chunked_partial_trailer_preserves_chunk_available}, + { "response_chunked_invalid_size_suffix", test_http_response_chunked_invalid_size_suffix}, + { "response_chunked_oversized_length", test_http_response_chunked_oversized_length}, + { "timeout_setters_preserve_upstream_io_timeout", test_http_timeout_setters_preserve_upstream_io_timeout}, { 0 } }; diff --git a/tests/internal/signv4.c b/tests/internal/signv4.c index da696b11c96..2c50ccdda88 100644 --- a/tests/internal/signv4.c +++ 
b/tests/internal/signv4.c
@@ -358,6 +358,9 @@ static flb_sds_t file_to_buffer(char *path, char *context, char *ext)
 
 static void aws_test_destroy(struct aws_test *awt)
 {
+    struct flb_connection *u_conn;
+    struct flb_upstream *upstream;
+
     if (awt->name) {
         flb_sds_destroy(awt->name);
     }
@@ -378,9 +381,11 @@ static void aws_test_destroy(struct aws_test *awt)
     }
 
     if (awt->c) {
-        flb_upstream_destroy(awt->c->u_conn->upstream);
-        flb_free(awt->c->u_conn);
+        u_conn = awt->c->u_conn;
+        upstream = u_conn->upstream;
         flb_http_client_destroy(awt->c);
+        flb_free(u_conn);
+        flb_upstream_destroy(upstream);
     }
 
     http_request_destroy(awt->r);
diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt
index 132faa84d57..e29620e3000 100644
--- a/tests/runtime/CMakeLists.txt
+++ b/tests/runtime/CMakeLists.txt
@@ -27,6 +27,7 @@ endmacro()
 
 FLB_RT_CORE_TEST(FLB_COROUTINE_TIMEOUT "core-timeout.c")
 FLB_RT_CORE_TEST(FLB_INTERNAL_LOGGER "core_internal_logger.c")
 FLB_RT_CORE_TEST(FLB_DOWNSTREAM_ACCEPT_TIMEOUT "core_accept_timeout.c")
+FLB_RT_CORE_TEST(1 "http_client_chunked.c")
 
 FLB_RT_TEST(FLB_CHUNK_TRACE "core_chunk_trace.c")
diff --git a/tests/runtime/http_client_chunked.c b/tests/runtime/http_client_chunked.c
new file mode 100644
index 00000000000..e2687692ac6
--- /dev/null
+++ b/tests/runtime/http_client_chunked.c
@@ -0,0 +1,322 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_engine.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_http_client.h>
+
+#include <monkey/mk_core.h>
+
+#include "flb_tests_runtime.h"
+
+struct runtime_http_client_ctx {
+    struct flb_upstream *u;
+    struct flb_connection *u_conn;
+    struct flb_config *config;
+    struct mk_event_loop *evl;
+};
+
+struct chunked_server_ctx {
+    int listen_fd;
+    int port;
+    pthread_t thread;
+};
+
+static int socket_write_all(int fd, const char *buffer, size_t length)
+{
+    ssize_t bytes;
+    size_t offset;
+
+    offset = 0;
+
+    while (offset <
length) { + bytes = write(fd, buffer + offset, length - offset); + if (bytes == -1) { + if (errno == EINTR) { + continue; + } + + return -1; + } + + if (bytes == 0) { + return -1; + } + + offset += bytes; + } + + return 0; +} + +static int create_listen_socket(int *out_port) +{ + int fd; + socklen_t length; + struct sockaddr_in address; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd == -1) { + return -1; + } + + memset(&address, 0, sizeof(address)); + address.sin_family = AF_INET; + address.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + address.sin_port = htons(0); + + if (bind(fd, (struct sockaddr *) &address, sizeof(address)) == -1) { + close(fd); + return -1; + } + + if (listen(fd, 4) == -1) { + close(fd); + return -1; + } + + length = sizeof(address); + if (getsockname(fd, (struct sockaddr *) &address, &length) == -1) { + close(fd); + return -1; + } + + *out_port = ntohs(address.sin_port); + + return fd; +} + +static void *chunked_server_thread(void *data) +{ + int conn_fd; + ssize_t bytes; + char request[2048]; + size_t fragment_length; + struct chunked_server_ctx *ctx; + const char *fragments[] = { + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "Trailer: Expires, X-Trace\r\n" + "Connection: close\r\n" + "\r\n" + "4;foo=bar\r\n" + "Wi", + "ki\r\n" + "5\r\n" + "pedia\r\n", + "0;done=yes\r\n" + "Expires: tomorrow\r\n" + "X-Trace: abc\r\n" + "\r\n", + NULL + }; + int index; + + ctx = data; + + conn_fd = accept(ctx->listen_fd, NULL, NULL); + if (conn_fd == -1) { + return NULL; + } + + bytes = read(conn_fd, request, sizeof(request)); + (void) bytes; + + for (index = 0; fragments[index] != NULL; index++) { + fragment_length = strlen(fragments[index]); + + if (socket_write_all(conn_fd, fragments[index], fragment_length) != 0) { + break; + } + + usleep(10000); + } + + close(conn_fd); + + return NULL; +} + +static struct runtime_http_client_ctx *runtime_http_client_ctx_create(int port) +{ + struct runtime_http_client_ctx *ctx; + + ctx = flb_calloc(1, 
sizeof(struct runtime_http_client_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + flb_errno(); + return NULL; + } + + ctx->evl = mk_event_loop_create(16); + if (!TEST_CHECK(ctx->evl != NULL)) { + flb_free(ctx); + return NULL; + } + + flb_engine_evl_init(); + flb_engine_evl_set(ctx->evl); + + ctx->config = flb_config_init(); + if (!TEST_CHECK(ctx->config != NULL)) { + mk_event_loop_destroy(ctx->evl); + flb_free(ctx); + return NULL; + } + + ctx->u = flb_upstream_create(ctx->config, "127.0.0.1", port, 0, NULL); + if (!TEST_CHECK(ctx->u != NULL)) { + flb_config_exit(ctx->config); + mk_event_loop_destroy(ctx->evl); + flb_free(ctx); + return NULL; + } + + ctx->u_conn = flb_upstream_conn_get(ctx->u); + if (!TEST_CHECK(ctx->u_conn != NULL)) { + flb_upstream_destroy(ctx->u); + flb_config_exit(ctx->config); + mk_event_loop_destroy(ctx->evl); + flb_free(ctx); + return NULL; + } + + ctx->u_conn->upstream = ctx->u; + + return ctx; +} + +static void runtime_http_client_ctx_destroy(struct runtime_http_client_ctx *ctx) +{ + if (ctx == NULL) { + return; + } + + if (ctx->u != NULL) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->config != NULL) { + flb_config_exit(ctx->config); + } + + if (ctx->evl != NULL) { + mk_event_loop_destroy(ctx->evl); + } + + flb_free(ctx); +} + +void test_http_client_chunked_runtime() +{ + int ret; + int thread_started; + size_t bytes_sent; + flb_sds_t value; + struct flb_http_client *client; + struct chunked_server_ctx server; + struct runtime_http_client_ctx *ctx; + int payload_ready; + + memset(&server, 0, sizeof(server)); + server.listen_fd = -1; + client = NULL; + ctx = NULL; + value = NULL; + payload_ready = FLB_FALSE; + thread_started = FLB_FALSE; + + server.listen_fd = create_listen_socket(&server.port); + TEST_CHECK(server.listen_fd != -1); + if (server.listen_fd == -1) { + return; + } + + ret = pthread_create(&server.thread, NULL, chunked_server_thread, &server); + TEST_CHECK(ret == 0); + if (ret != 0) { + close(server.listen_fd); + return; + } + 
thread_started = FLB_TRUE; + + ctx = runtime_http_client_ctx_create(server.port); + if (!TEST_CHECK(ctx != NULL)) { + close(server.listen_fd); + pthread_join(server.thread, NULL); + return; + } + + client = flb_http_client(ctx->u_conn, FLB_HTTP_GET, "/", NULL, 0, + "127.0.0.1", server.port, NULL, FLB_HTTP_11); + TEST_CHECK(client != NULL); + if (client == NULL) { + goto cleanup; + } + + ret = flb_http_do(client, &bytes_sent); + if (!TEST_CHECK(ret == 0)) { + goto cleanup; + } + + if (!TEST_CHECK(client->resp.status == 200)) { + goto cleanup; + } + + payload_ready = TEST_CHECK(client->resp.payload != NULL); + payload_ready &= TEST_CHECK(client->resp.payload_size == strlen("Wikipedia")); + + if (payload_ready) { + TEST_CHECK(strncmp(client->resp.payload, "Wikipedia", + strlen("Wikipedia")) == 0); + } + else { + goto cleanup; + } + + value = flb_http_get_response_header(client, "X-Trace", 7); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "abc") == 0); + flb_sds_destroy(value); + } + + value = flb_http_get_response_header(client, "Expires", 7); + TEST_CHECK(value != NULL); + if (value != NULL) { + TEST_CHECK(strcmp(value, "tomorrow") == 0); + flb_sds_destroy(value); + } + +cleanup: + if (client != NULL) { + flb_http_client_destroy(client); + } + + if (ctx != NULL) { + runtime_http_client_ctx_destroy(ctx); + } + + if (server.listen_fd != -1) { + close(server.listen_fd); + } + + if (thread_started == FLB_TRUE) { + pthread_join(server.thread, NULL); + } +} + +TEST_LIST = { + {"http_client_chunked_runtime", test_http_client_chunked_runtime}, + {0} +};