Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/nxt_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ typedef struct {
nxt_timer_t timer;

nxt_queue_link_t link;

/*
* Two-phase close state. When non-zero, accept(2) has been disarmed
* and the FD will be released once all in-flight accepted connections
* release their refs via nxt_router_listen_event_release().
*/
uint8_t draining; /* 1 bit */
} nxt_listen_event_t;


Expand Down
18 changes: 9 additions & 9 deletions src/nxt_h1proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)

joint = c->listen->socket.data;

if (nxt_slow_path(joint == NULL)) {
if (nxt_slow_path(joint == NULL || c->listen->draining)) {
/*
* Listening socket had been closed while
* connection was in keep-alive state.
* Listening socket had been closed or is draining while
* connection is still idle.
*/
c->read_state = &nxt_h1p_idle_close_state;
return 0;
Expand Down Expand Up @@ -368,10 +368,10 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)

joint = c->listen->socket.data;

if (nxt_slow_path(joint == NULL)) {
if (nxt_slow_path(joint == NULL || c->listen->draining)) {
/*
* Listening socket had been closed while
* connection was in keep-alive state.
* Listening socket had been closed or is draining while
* connection is still idle.
*/
nxt_h1p_closing(task, c);
return;
Expand Down Expand Up @@ -411,10 +411,10 @@ nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)

joint = c->listen->socket.data;

if (nxt_slow_path(joint == NULL)) {
if (nxt_slow_path(joint == NULL || c->listen->draining)) {
/*
* Listening socket had been closed while
* connection was in keep-alive state.
* Listening socket had been closed or is draining while
* connection is still idle.
*/
c->read_state = &nxt_h1p_idle_close_state;
return 0;
Expand Down
132 changes: 122 additions & 10 deletions src/nxt_router.c
Original file line number Diff line number Diff line change
Expand Up @@ -3894,35 +3894,130 @@ nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
}


/*
 * Two-phase close of a router listen event.
 *
 * Accepting -> Draining: nxt_router_listen_socket_close() (phase 1)
 *     disarms accept(2), marks lev->draining = 1 and posts the
 *     pending close_job back to the configuration thread, so the
 *     control request does not have to wait for the drain.
 * Draining -> Closed: nxt_router_listen_socket_close_finish() (phase 2)
 *     runs once lev->count == 1 (no in-flight accepted
 *     connections) and releases the FD; it posts the close_job
 *     only if phase 1 has not already done so.
 *
 * The intermediate state lets in-flight TLS handshakes and accepted but
 * not yet handled connections complete cleanly instead of being reset
 * (RST) when the listener is reconfigured under load.  This mirrors the
 * engine->shutdown pattern in nxt_router_worker_thread_quit().
 */

/* Forward declarations of the two-phase close helpers defined below. */
static void nxt_router_listen_socket_close_ready(
    nxt_socket_conf_joint_t *joint);
static void nxt_router_listen_socket_close_finish(nxt_task_t *task,
    nxt_listen_event_t *lev);


/*
 * Phase 1 of the two-phase listen socket close.
 *
 * obj is the nxt_timer_t embedded in the listen event being closed; data
 * is not used.  The handler disarms accept(2) on the listener, hands any
 * pending close job back to the configuration engine, and then either
 * returns while accepted connections still hold references to the listen
 * event or finishes the close immediately.
 *
 * The listen event stays linked in its queue and lev->socket.data stays
 * set until nxt_router_listen_socket_close_finish() runs: that function
 * unlinks the event and reads the joint from lev->socket.data itself, so
 * doing either here as well would unlink the event twice and leave phase 2
 * with a NULL joint.
 */
static void
nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t              *timer;
    nxt_listen_event_t       *lev;
    nxt_event_engine_t       *engine;
    nxt_socket_conf_joint_t  *joint;

    timer = obj;
    lev = nxt_timer_data(timer, nxt_listen_event_t, timer);

    engine = task->thread->engine;
    joint = lev->socket.data;

    nxt_debug(task, "engine %p: listen socket close: %d draining:%d count:%D",
              engine, lev->socket.fd, lev->draining, lev->count);

    /*
     * Phase 1: disarm accept(2).  nxt_router_listen_socket_delete() has
     * already called nxt_fd_event_delete() on epoll/kqueue, but be
     * defensive in case this entry point is reused or the event
     * backend's delete is a no-op for events that were never armed
     * (kqueue's behaviour differs from epoll's when an FD is closed
     * with pending kevents, see nxt_kqueue_close()).
     *
     * Scope note: this drains *already-accepted* connections only.
     * TCP connections completed by the kernel but still in the listen
     * queue waiting for a userspace accept(2) are not preserved --
     * they will be RST when the FD is released in phase 2.  Draining
     * the kernel accept queue belongs to the full connection drain
     * with timeout escalation (P5, see
     * roadmap/plan-graceful-shutdown.md).
     */
    if (!lev->draining) {
        lev->draining = 1;

        if (nxt_fd_event_is_active(lev->socket.read)) {
            nxt_fd_event_disable_read(engine, &lev->socket);
        }
    }

    /*
     * Configuration is ready once accept(2) has been disarmed.  The
     * listener FD and old socket configuration stay alive below until
     * accepted connections release their listener refs, but the control
     * request must not wait for idle client timing.
     */
    nxt_router_listen_socket_close_ready(joint);

    /*
     * Phase 2 gating: wait for accepted connections to release their
     * refs.  lev->count == 1 means only the original listener ref
     * remains (see nxt_listen_event() and nxt_conn_accept()).
     */
    if (lev->count > 1) {
        nxt_debug(task, "engine %p: listen socket %d drain pending, "
                  "in-flight: %D", engine, lev->socket.fd, lev->count - 1);
        return;
    }

    nxt_router_listen_socket_close_finish(task, lev);
}


/*
 * Posts the joint's pending close job, if there is one, to the engine
 * referenced by job->tmcf->engine with nxt_router_conf_wait() as the work
 * handler.  joint->close_job is cleared before posting, so calling this
 * again for the same joint does nothing until a new job is attached.
 */
static void
nxt_router_listen_socket_close_ready(nxt_socket_conf_joint_t *joint)
{
    nxt_joint_job_t  *pending;

    pending = joint->close_job;

    if (pending != NULL) {
        /* Detach the job first so that a repeated call is a no-op. */
        joint->close_job = NULL;

        pending->work.next = NULL;
        pending->work.handler = nxt_router_conf_wait;

        nxt_event_engine_post(pending->tmcf->engine, &pending->work);
    }
}


/*
 * Phase 2 of the two-phase listen socket close.
 *
 * Unlinks lev from its queue, detaches the socket configuration joint
 * from lev->socket.data, posts the close job if it is still pending,
 * and then releases the socket configuration and the listen event
 * reference.
 */
static void
nxt_router_listen_socket_close_finish(nxt_task_t *task,
    nxt_listen_event_t *lev)
{
    nxt_event_engine_t       *engine;
    nxt_socket_conf_joint_t  *joint;

    engine = task->thread->engine;

    nxt_debug(task, "engine %p: listen socket close finish: %d",
              engine, lev->socket.fd);

    joint = lev->socket.data;
    lev->socket.data = NULL;

    nxt_queue_remove(&lev->link);

    nxt_router_listen_socket_close_ready(joint);

    /*
     * The incoming 'task' refers to lev->task, which cannot be used
     * after nxt_free(); switch to the engine's own task before the
     * release calls below.
     */
    task = &engine->task;

    nxt_router_listen_socket_release(task, joint->socket_conf);
    nxt_router_listen_event_release(task, lev, joint);
}
Expand Down Expand Up @@ -3996,7 +4091,8 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
{
nxt_event_engine_t *engine;

nxt_debug(task, "listen event count: %D", lev->count);
nxt_debug(task, "listen event count: %D draining:%d",
lev->count, lev->draining);

engine = task->thread->engine;

Expand All @@ -4008,6 +4104,22 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
}

nxt_free(lev);

} else if (lev->draining && lev->count == 1) {
/*
* Phase 1 of the two-phase close marked this listener as
* draining and deferred FD release because in-flight accepted
* connections still held refs. The last such ref has just
* been dropped, so finalise the close now. finish() drops
* the listener's own ref (count: 1 -> 0) and frees lev via
* the nxt_router_listen_event_release() call at its tail.
*/
if (joint != NULL) {
nxt_router_conf_release(task, joint);
joint = NULL;
}

nxt_router_listen_socket_close_finish(task, lev);
}

if (joint != NULL) {
Expand Down
Loading
Loading