diff --git a/src/nxt_conn.h b/src/nxt_conn.h
index 5717d3c99..942b4869b 100644
--- a/src/nxt_conn.h
+++ b/src/nxt_conn.h
@@ -112,6 +112,13 @@ typedef struct {
     nxt_timer_t               timer;
 
     nxt_queue_link_t          link;
+
+    /*
+     * Two-phase close state. When non-zero, accept(2) has been disarmed
+     * and the FD will be released once all in-flight accepted connections
+     * release their refs via nxt_router_listen_event_release().
+     */
+    uint8_t                   draining;  /* 1 bit */
 } nxt_listen_event_t;
 
 
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index dda3900e3..74e9b61a6 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -275,10 +275,10 @@ nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
 
     joint = c->listen->socket.data;
 
-    if (nxt_slow_path(joint == NULL)) {
+    if (nxt_slow_path(joint == NULL || c->listen->draining)) {
         /*
-         * Listening socket had been closed while
-         * connection was in keep-alive state.
+         * Listening socket had been closed or is draining while
+         * connection is still idle.
          */
         c->read_state = &nxt_h1p_idle_close_state;
         return 0;
@@ -368,10 +368,10 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
 
     joint = c->listen->socket.data;
 
-    if (nxt_slow_path(joint == NULL)) {
+    if (nxt_slow_path(joint == NULL || c->listen->draining)) {
         /*
-         * Listening socket had been closed while
-         * connection was in keep-alive state.
+         * Listening socket had been closed or is draining while
+         * connection is still idle.
          */
         nxt_h1p_closing(task, c);
         return;
@@ -411,10 +411,10 @@ nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
 
     joint = c->listen->socket.data;
 
-    if (nxt_slow_path(joint == NULL)) {
+    if (nxt_slow_path(joint == NULL || c->listen->draining)) {
         /*
-         * Listening socket had been closed while
-         * connection was in keep-alive state.
+         * Listening socket had been closed or is draining while
+         * connection is still idle.
          */
         c->read_state = &nxt_h1p_idle_close_state;
         return 0;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index c994f3707..fef2461d1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -3894,35 +3894,133 @@ nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
 }
 
 
+/*
+ * Two-phase close of a router listen event.
+ *
+ *   Accepting -> Draining:  nxt_router_listen_socket_close() (phase 1)
+ *                           disarms accept(2), marks lev->draining = 1
+ *                           and posts the close_job to the config thread.
+ *   Draining  -> Closed:    nxt_router_listen_socket_close_finish() (phase 2)
+ *                           runs once lev->count == 1 (no in-flight accepted
+ *                           connections) and releases the FD.
+ *
+ * The intermediate state lets in-flight TLS handshakes and accepted-but-
+ * not-yet-handled connections complete cleanly instead of being RST when
+ * the listener is reconfigured under load (mirrors the engine->shutdown
+ * pattern in nxt_router_worker_thread_quit()).
+ */
+
+static void nxt_router_listen_socket_close_ready(
+    nxt_socket_conf_joint_t *joint);
+static void nxt_router_listen_socket_close_finish(nxt_task_t *task,
+    nxt_listen_event_t *lev);
+
+
 static void
 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
 {
     nxt_timer_t              *timer;
-    nxt_joint_job_t          *job;
     nxt_listen_event_t       *lev;
+    nxt_event_engine_t       *engine;
     nxt_socket_conf_joint_t  *joint;
 
     timer = obj;
     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
 
-    nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
-              lev->socket.fd);
+    engine = task->thread->engine;
+    joint = lev->socket.data;
 
-    nxt_queue_remove(&lev->link);
+    nxt_debug(task, "engine %p: listen socket close: %d draining:%d count:%D",
+              engine, lev->socket.fd, lev->draining, lev->count);
 
-    joint = lev->socket.data;
-    lev->socket.data = NULL;
+    /*
+     * Phase 1: disarm accept(2). nxt_router_listen_socket_delete() has
+     * already called nxt_fd_event_delete() on epoll/kqueue, but be
+     * defensive in case this entry point is reused or the event
+     * backend's delete is a no-op for events that were never armed
+     * (kqueue's behaviour differs from epoll's when an FD is closed
+     * with pending kevents, see nxt_kqueue_close()).
+     *
+     * Scope note: this drains *already-accepted* connections only.
+     * TCP connections completed by the kernel but still in the listen
+     * queue waiting for a userspace accept(2) are not preserved --
+     * they will be RST when the FD is released in phase 2 below.
+     * Draining the kernel accept queue is P5 territory (full
+     * connection drain with timeout escalation, see
+     * roadmap/plan-graceful-shutdown.md).
+     */
+    if (!lev->draining) {
+        lev->draining = 1;
 
-    /* 'task' refers to lev->task and we cannot use after nxt_free() */
-    task = &task->thread->engine->task;
+        if (nxt_fd_event_is_active(lev->socket.read)) {
+            nxt_fd_event_disable_read(engine, &lev->socket);
+        }
+    }
+
+    /*
+     * Configuration is ready once accept(2) has been disarmed. The
+     * listener FD and old socket configuration stay alive below until
+     * accepted connections release their listener refs, but the control
+     * request must not wait for idle client timing.
+     */
+    nxt_router_listen_socket_close_ready(joint);
+
+    /*
+     * Phase 2 gating: wait for accepted connections to release their
+     * refs. lev->count == 1 means only the original listener ref
+     * remains (see nxt_listen_event() and nxt_conn_accept()).
+     */
+    if (lev->count > 1) {
+        nxt_debug(task, "engine %p: listen socket %d drain pending, "
+                  "in-flight: %D", engine, lev->socket.fd, lev->count - 1);
+        return;
+    }
+
+    nxt_router_listen_socket_close_finish(task, lev);
+}
 
-    nxt_router_listen_socket_release(task, joint->socket_conf);
+
+static void
+nxt_router_listen_socket_close_ready(nxt_socket_conf_joint_t *joint)
+{
+    nxt_joint_job_t  *job;
 
     job = joint->close_job;
+    if (job == NULL) {
+        /* Already handed over (or never set): post at most once. */
+        return;
+    }
+
+    joint->close_job = NULL;
+
     job->work.next = NULL;
     job->work.handler = nxt_router_conf_wait;
 
     nxt_event_engine_post(job->tmcf->engine, &job->work);
+}
+
+
+static void
+nxt_router_listen_socket_close_finish(nxt_task_t *task,
+    nxt_listen_event_t *lev)
+{
+    nxt_socket_conf_joint_t  *joint;
+
+    nxt_debug(task, "engine %p: listen socket close finish: %d",
+              task->thread->engine, lev->socket.fd);
+
+    nxt_queue_remove(&lev->link);
+
+    joint = lev->socket.data;
+    lev->socket.data = NULL;
+
+    /* No-op when phase 1 has already posted the close_job. */
+    nxt_router_listen_socket_close_ready(joint);
+
+    /* 'task' refers to lev->task and we cannot use after nxt_free() */
+    task = &task->thread->engine->task;
+
+    nxt_router_listen_socket_release(task, joint->socket_conf);
 
     nxt_router_listen_event_release(task, lev, joint);
 }
@@ -3996,7 +4094,8 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
 {
     nxt_event_engine_t  *engine;
 
-    nxt_debug(task, "listen event count: %D", lev->count);
+    nxt_debug(task, "listen event count: %D draining:%d",
+              lev->count, lev->draining);
 
     engine = task->thread->engine;
 
@@ -4008,6 +4107,22 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
         }
 
         nxt_free(lev);
+
+    } else if (lev->draining && lev->count == 1) {
+        /*
+         * Phase 1 of the two-phase close marked this listener as
+         * draining and deferred FD release because in-flight accepted
+         * connections still held refs. The last such ref has just
+         * been dropped, so finalise the close now. finish() drops
+         * the listener's own ref (count: 1 -> 0) and frees lev via
+         * the nxt_router_listen_event_release() call at its tail.
+         */
+        if (joint != NULL) {
+            nxt_router_conf_release(task, joint);
+            joint = NULL;
+        }
+
+        nxt_router_listen_socket_close_finish(task, lev);
     }
 
     if (joint != NULL) {
diff --git a/test/test_listener_drain.py b/test/test_listener_drain.py
new file mode 100644
index 000000000..64966f126
--- /dev/null
+++ b/test/test_listener_drain.py
@@ -0,0 +1,240 @@
+"""
+Functional tests for the two-phase listener close (P2 of the graceful-
+shutdown plan, see roadmap/plan-graceful-shutdown.md).
+
+These tests exercise the new "draining" state on nxt_listen_event_t.
+The state machine they cover is:
+
+    Accepting -> Draining:  nxt_router_listen_socket_close() phase 1
+                            (accept(2) disarmed, draining = 1)
+    Draining  -> Closed:    nxt_router_listen_socket_close_finish() phase 2
+                            (FD released once lev->count == 1)
+
+The pre-P2 behaviour was to run phase 1 and phase 2 back-to-back from a
+zero-timeout timer, which RST'd in-flight TLS handshakes and any
+accepted-but-not-yet-handled connection on a busy listener.
+"""
+
+import errno
+import socket
+import ssl
+import subprocess
+import time
+
+import pytest
+
+from unit.applications.tls import ApplicationTLS
+
+prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
+
+client = ApplicationTLS()
+
+
+def _has_openssl():
+    try:
+        subprocess.check_output(
+            ['openssl', 'version'], stderr=subprocess.STDOUT
+        )
+        return True
+    except (subprocess.CalledProcessError, FileNotFoundError):
+        return False
+
+
+def _add_tls(application='empty', cert='default', port=8080):
+    assert 'success' in client.conf(
+        {
+            "pass": f"applications/{application}",
+            "tls": {"certificate": cert},
+        },
+        f'listeners/*:{port}',
+    )
+
+
+def _clear_config():
+    assert 'success' in client.conf({"listeners": {}, "applications": {}})
+
+
+def _port_listening(port):
+    """Return True if something is listening on `port` on localhost."""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.settimeout(1.0)
+        try:
+            s.connect(('127.0.0.1', port))
+        except OSError:
+            # Refused, timed out, unreachable, ...: nobody is listening.
+            return False
+        return True
+
+
+@pytest.mark.skipif(
+    not _has_openssl(), reason='openssl CLI not available for cert generation'
+)
+def test_listener_reconfigure_drains_inflight_tls_handshake():
+    """
+    Connect to a TLS listener without sending ClientHello, then PUT a
+    config that deletes the listener. The handshake must either
+    complete cleanly or fail with a clean TLS-level error, NOT with
+    ECONNRESET (which is what pre-P2 produced).
+    """
+    client.load('empty')
+    client.certificate()
+    _add_tls()
+
+    # Pre-warm: confirm TLS is up.
+    assert client.get_ssl()['status'] == 200, 'pre-warm TLS'
+
+    ctx = ssl.create_default_context()
+    ctx.check_hostname = False
+    ctx.verify_mode = ssl.CERT_NONE
+
+    # Open a raw TCP socket; do NOT begin handshake yet.
+    raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    raw.settimeout(5.0)
+    raw.connect(('127.0.0.1', 8080))
+
+    # Wait longer than TCP_DEFER_ACCEPT so the no-data connection is
+    # accepted by Unit before the reconfigure fires. Otherwise it can
+    # still be in the kernel listen queue when the FD closes, which P2
+    # explicitly does not cover.
+    time.sleep(1.5)
+
+    # Delete the listener while the connection is
+    # accepted but mid-handshake (we have not sent ClientHello yet).
+    _clear_config()
+
+    # Now drive the handshake. Acceptable outcomes:
+    # - clean TLS error (SSLError) — server tore down the TLS layer.
+    # - clean EOF / handshake failure.
+    # NOT acceptable: ECONNRESET on the bare socket.
+    reset = False
+    try:
+        wrapped = ctx.wrap_socket(raw, server_hostname='localhost')
+        try:
+            wrapped.send(b'GET / HTTP/1.0\r\n\r\n')
+            wrapped.recv(4096)
+        finally:
+            try:
+                wrapped.close()
+            except OSError:
+                pass
+    except ssl.SSLError:
+        pass
+    except ConnectionResetError:
+        reset = True
+    except OSError as exc:
+        # ECONNRESET is 104 on Linux but 54 on FreeBSD/macOS.
+        if exc.errno == errno.ECONNRESET:
+            reset = True
+    finally:
+        try:
+            raw.close()
+        except OSError:
+            pass
+
+    assert not reset, (
+        'in-flight TLS handshake was RST by listener reconfiguration; '
+        'two-phase drain regressed'
+    )
+
+
+@pytest.mark.skip(
+    reason=(
+        'Full per-connection drain on listener reconfiguration is P5 '
+        'territory (connection drain with timeout escalation, see '
+        'roadmap/plan-graceful-shutdown.md). P2 only guarantees the '
+        'listener-level FD release waits for accept refs to drop; the '
+        'individual accepted connections are still torn down by the '
+        'config swap path. This test is staged here so P5 can flip it '
+        'on by removing the skip marker.'
+    )
+)
+def test_listener_drain_no_dropped_accepted_connection():
+    """
+    Open a plain TCP connection, let it be accepted, then reconfigure
+    the listener. The accepted connection must still be able to
+    complete a request (it was accepted under the old listener and
+    the drain must keep its joint alive).
+    """
+    assert 'success' in client.conf(
+        {
+            "listeners": {"*:8080": {"pass": "routes"}},
+            "routes": [{"action": {"return": 200}}],
+            "applications": {},
+        }
+    ), 'initial listener'
+
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.settimeout(5.0)
+    s.connect(('127.0.0.1', 8080))
+
+    # Give the kernel + router a beat to actually accept(2).
+    time.sleep(0.05)
+
+    # Replace the listener with one bound to a different port.
+    assert 'success' in client.conf(
+        {
+            "listeners": {"*:8081": {"pass": "routes"}},
+            "routes": [{"action": {"return": 200}}],
+            "applications": {},
+        }
+    ), 'reconfigure to new port'
+
+    # The already-accepted connection must still be writable and must
+    # not have been RST.
+    data = b''
+    try:
+        s.sendall(b'GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
+        while True:
+            chunk = s.recv(4096)
+            if not chunk:
+                break
+            data += chunk
+            if len(data) > 65536:
+                break
+    except ConnectionResetError:
+        pytest.fail('accepted connection was RST during listener drain')
+    finally:
+        s.close()
+
+    # We don't assert exact response shape — just that we got *some*
+    # bytes back (no RST mid-flight). An empty reply would mean the
+    # router dropped the conn, which is the bug we're guarding against.
+    assert data, 'accepted connection produced no response after drain'
+
+
+def test_listener_close_releases_fd_eventually():
+    """
+    After phase 2 of the two-phase close runs, the old listener FD
+    must actually be closed: nothing should be listening on the
+    original port.
+    """
+    assert 'success' in client.conf(
+        {
+            "listeners": {"*:8080": {"pass": "routes"}},
+            "routes": [{"action": {"return": 200}}],
+            "applications": {},
+        }
+    ), 'initial listener on 8080'
+
+    assert _port_listening(8080), 'pre-condition: 8080 is up'
+
+    assert 'success' in client.conf(
+        {
+            "listeners": {"*:8081": {"pass": "routes"}},
+            "routes": [{"action": {"return": 200}}],
+            "applications": {},
+        }
+    ), 'move to 8081'
+
+    # Phase 2 runs as soon as no in-flight conns remain. Allow a
+    # short window for the work queue to drain.
+    deadline = time.monotonic() + 2.0
+    while time.monotonic() < deadline:
+        if not _port_listening(8080):
+            break
+        time.sleep(0.05)
+
+    assert not _port_listening(
+        8080
+    ), 'old listener FD was not released after drain'
+    assert _port_listening(8081), 'new listener FD did not come up'