Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/fluent-bit/flb_http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ struct flb_http_client_response {
int content_length; /* Content length set by headers */
int chunked_encoding; /* Chunked transfer encoding ? */
int connection_close; /* connection: close ? */
int chunked_trailer_pending; /* terminal chunk parsed */
char *chunk_processed_end; /* Position to mark last chunk */
char *headers_end; /* Headers end (\r\n\r\n) */
char *trailer_buf; /* Raw trailer header block */
size_t trailer_size; /* Trailer block length */

/* Payload: body response: reference to 'data' */
char *payload;
Expand Down Expand Up @@ -217,6 +220,8 @@ struct flb_test_http_response {
struct flb_http_client {
/* Upstream connection */
struct flb_connection *u_conn;
struct flb_net_setup request_net_setup;
struct flb_net_setup *original_net_setup;

/* Request data */
int method;
Expand Down Expand Up @@ -374,6 +379,11 @@ int flb_http_add_header(struct flb_http_client *c,
const char *val, size_t val_len);
flb_sds_t flb_http_get_header(struct flb_http_client *c,
const char *key, size_t key_len);

flb_sds_t flb_http_get_response_header(struct flb_http_client *c,
const char *key, size_t key_len);

int flb_http_client_process_response_buffer(struct flb_http_client *c);
int flb_http_basic_auth(struct flb_http_client *c,
const char *user, const char *passwd);
int flb_http_proxy_auth(struct flb_http_client *c,
Expand Down Expand Up @@ -405,6 +415,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes);
int flb_http_do_with_oauth2(struct flb_http_client *c, size_t *bytes,
struct flb_oauth2 *oauth2);
int flb_http_client_proxy_connect(struct flb_connection *u_conn);
void flb_http_client_detach_connection(struct flb_http_client *c);
void flb_http_client_destroy(struct flb_http_client *c);
int flb_http_buffer_size(struct flb_http_client *c, size_t size);
size_t flb_http_buffer_available(struct flb_http_client *c);
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_network.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ int flb_net_socket_reset(flb_sockfd_t fd);
int flb_net_socket_tcp_nodelay(flb_sockfd_t fd);
int flb_net_socket_blocking(flb_sockfd_t fd);
int flb_net_socket_nonblocking(flb_sockfd_t fd);
int flb_net_socket_set_rcvtimeout(flb_sockfd_t fd, int timeout_in_seconds);
int flb_net_socket_rcv_buffer(flb_sockfd_t fd, int rcvbuf);
int flb_net_socket_tcp_fastopen(flb_sockfd_t sockfd);
int flb_net_socket_tcp_keepalive(flb_sockfd_t fd, struct flb_net_setup *net);
Expand Down
26 changes: 18 additions & 8 deletions plugins/filter_ecs/ecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -435,17 +435,16 @@ static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx)
return -1;
}

if (free_conn == FLB_TRUE) {
flb_upstream_conn_release(u_conn);
}

ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
&buffer, &size, &root_type, NULL);

if (ret < 0) {
flb_plg_warn(ctx->ins, "Could not parse response from %s; response=\n%s",
FLB_ECS_FILTER_CLUSTER_PATH, c->resp.payload);
flb_http_client_destroy(c);
if (free_conn == FLB_TRUE) {
flb_upstream_conn_release(u_conn);
}
return -1;
}

Expand All @@ -458,10 +457,16 @@ static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx)
flb_free(buffer);
msgpack_unpacked_destroy(&result);
flb_http_client_destroy(c);
if (free_conn == FLB_TRUE) {
flb_upstream_conn_release(u_conn);
}
return -1;
}

flb_http_client_destroy(c);
if (free_conn == FLB_TRUE) {
flb_upstream_conn_release(u_conn);
}

root = result.data;
if (root.type != MSGPACK_OBJECT_MAP) {
Expand Down Expand Up @@ -1036,10 +1041,6 @@ static int get_task_metadata(struct flb_filter_ecs *ctx, char* short_id)
return -1;
}

if (free_conn == FLB_TRUE) {
flb_upstream_conn_release(u_conn);
}

ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
&buffer, &size, &root_type, NULL);

Expand All @@ -1048,6 +1049,9 @@ static int get_task_metadata(struct flb_filter_ecs *ctx, char* short_id)
http_path, c->resp.payload);
flb_sds_destroy(http_path);
flb_http_client_destroy(c);
if (free_conn == FLB_TRUE) {
flb_upstream_conn_release(u_conn);
}
return -1;
}

Expand All @@ -1061,10 +1065,16 @@ static int get_task_metadata(struct flb_filter_ecs *ctx, char* short_id)
msgpack_unpacked_destroy(&result);
flb_sds_destroy(http_path);
flb_http_client_destroy(c);
if (free_conn == FLB_TRUE) {
flb_upstream_conn_release(u_conn);
}
return -1;
}

flb_http_client_destroy(c);
if (free_conn == FLB_TRUE) {
flb_upstream_conn_release(u_conn);
}

root = result.data;
if (root.type != MSGPACK_OBJECT_MAP) {
Expand Down
1 change: 1 addition & 0 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,7 @@ static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config
goto http_do_error;
}

flb_http_client_detach_connection(client);
flb_upstream_conn_release(u_conn);
return client;

Expand Down
8 changes: 6 additions & 2 deletions plugins/out_azure_blob/azure_blob.c
Original file line number Diff line number Diff line change
Expand Up @@ -1011,17 +1011,18 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
flb_free(payload_buf);
}

flb_upstream_conn_release(u_conn);

/* Validate HTTP status */
if (ret == -1) {
flb_plg_error(ctx->ins, "error sending append_blob for %s", ref_name);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
return FLB_RETRY;
}

if (c->resp.status == 201) {
flb_plg_info(ctx->ins, "content uploaded successfully: %s", ref_name);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
return FLB_OK;
}
else if (c->resp.status == 404) {
Expand All @@ -1033,13 +1034,15 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,

flb_plg_info(ctx->ins, "blob not found: %s", c->uri);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
return CREATE_BLOB;
}
else if (c->resp.payload_size > 0) {
flb_plg_error(ctx->ins, "http_status=%i cannot append content to blob\n%s",
c->resp.status, c->resp.payload);
if (strstr(c->resp.payload, "must be 0 for Create Append")) {
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
return CREATE_BLOB;
}
}
Expand All @@ -1048,6 +1051,7 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
ref_name, c->resp.status);
}
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);

return FLB_RETRY;
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -1058,12 +1058,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
}
#endif /* FLB_HAVE_CHUNK_TRACE */

flb_upstream_conn_release(u_conn);

if (c) {
flb_http_client_destroy(c);
}

flb_upstream_conn_release(u_conn);

FLB_OUTPUT_RETURN(ret);
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/out_slack/slack.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ static void cb_slack_flush(struct flb_event_chunk *event_chunk,
out_ret = FLB_RETRY;
}

flb_upstream_conn_release(u_conn);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
flb_sds_destroy(out_buf);
FLB_OUTPUT_RETURN(out_ret);
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/out_td/td.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,14 @@ static void cb_td_flush(struct flb_event_chunk *event_chunk,
}

/* release */
flb_upstream_conn_release(u_conn);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);

FLB_OUTPUT_RETURN(FLB_OK);

retry:
flb_upstream_conn_release(u_conn);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);

FLB_OUTPUT_RETURN(FLB_RETRY);
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/out_websocket/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ static int flb_ws_handshake(struct flb_connection *u_conn,
flb_debug("[output_ws] Websocket Server Response\n%s",
c->resp.payload);
}
flb_debug("[out_ws] Http Get Operation ret = %i, http resp = %i",
ret, c->resp.status);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
flb_debug("[out_ws] Http Get Operation ret = %i, http resp = %i", ret, c->resp.status);
return -1;
}
flb_http_client_destroy(c);
Expand Down
10 changes: 7 additions & 3 deletions src/aws/flb_aws_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -544,20 +544,24 @@ struct flb_http_client *request_do(struct flb_aws_client *aws_client,
c = NULL;
}

if (c != NULL) {
flb_http_client_detach_connection(c);
}

flb_upstream_conn_release(u_conn);
flb_sds_destroy(signature);
return c;

error:
if (u_conn) {
flb_upstream_conn_release(u_conn);
}
if (signature) {
flb_sds_destroy(signature);
}
if (c) {
flb_http_client_destroy(c);
}
if (u_conn) {
flb_upstream_conn_release(u_conn);
}
return NULL;
}

Expand Down
Loading
Loading