diff --git a/.github/workflows/cmake-multi-platform.yml b/.github/workflows/cmake-multi-platform.yml index 76f6d02..88db2bf 100644 --- a/.github/workflows/cmake-multi-platform.yml +++ b/.github/workflows/cmake-multi-platform.yml @@ -45,6 +45,12 @@ jobs: sudo apt-get update sudo apt-get install -y clang + - name: Install Linux dependencies + if: matrix.os == 'ubuntu-latest' + run: | + sudo apt-get update + sudo apt-get install -y liburing-dev + - name: Ensure CMake on macOS if: matrix.os == 'macos-latest' run: | @@ -134,7 +140,7 @@ jobs: - name: Install sanitizer toolchain run: | sudo apt-get update - sudo apt-get install -y clang + sudo apt-get install -y clang liburing-dev - name: Configure CMake with sanitizers run: > diff --git a/CMakeLists.txt b/CMakeLists.txt index f59f257..cbe95a7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,8 @@ find_package(Threads REQUIRED) option(FKVS_ENABLE_WARNINGS "Enable strict compiler warnings" ON) option(FKVS_WARNINGS_AS_ERRORS "Treat compiler warnings as errors" OFF) option(FKVS_ENABLE_SANITIZERS "Enable AddressSanitizer and UndefinedBehaviorSanitizer" OFF) +option(FKVS_ENABLE_IO_URING "Enable io_uring server backend when liburing is available" ON) +set(FKVS_HAVE_IO_URING FALSE) function(fkvs_configure_target target_name) target_compile_definitions(${target_name} PRIVATE _POSIX_C_SOURCE=200809L) @@ -42,13 +44,18 @@ if(APPLE) target_compile_definitions(fkvs-benchmark PRIVATE CLI) fkvs_configure_target(fkvs-benchmark) elseif(LINUX) - find_library(LIBURING liburing) + if(FKVS_ENABLE_IO_URING) + find_path(LIBURING_INCLUDE_DIR NAMES liburing.h) + find_library(LIBURING_LIBRARY NAMES uring) + endif() - if(LIBURING) + if(FKVS_ENABLE_IO_URING AND LIBURING_INCLUDE_DIR AND LIBURING_LIBRARY) add_executable(fkvs-server src/memory.c src/counter.c src/client.c src/core/list.c src/config.c src/networking/networking.c src/server.c src/server_lifecycle.c src/server_limits.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/server/server_command_handlers.c src/io/event_dispatcher_io_uring.c src/ttl.c src/numeric_parse.c) - target_link_libraries(fkvs-server PRIVATE LIBURING) + target_include_directories(fkvs-server PRIVATE ${LIBURING_INCLUDE_DIR}) + target_link_libraries(fkvs-server PRIVATE ${LIBURING_LIBRARY}) target_compile_definitions(fkvs-server PRIVATE SERVER IO_URING_ENABLED) fkvs_configure_target(fkvs-server) + set(FKVS_HAVE_IO_URING TRUE) else() add_executable(fkvs-server src/memory.c src/counter.c src/client.c src/core/list.c src/config.c src/networking/networking.c src/server.c src/server_lifecycle.c src/server_limits.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/server/server_command_handlers.c src/io/event_dispatcher_epoll.c src/ttl.c src/numeric_parse.c) target_compile_definitions(fkvs-server PRIVATE SERVER) @@ -106,6 +113,14 @@ add_test(NAME ServerLifecycleTest COMMAND test_server_lifecycle) add_test(NAME ServerConfigTest COMMAND test_server_config) add_test(NAME ServerLimitsTest COMMAND test_server_limits) add_test(NAME IntegrationTest COMMAND test_integration) +if(FKVS_HAVE_IO_URING) + find_package(Python3 COMPONENTS Interpreter REQUIRED) + add_test( + NAME IoUringSmokeTest + COMMAND ${Python3_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/tests/io_uring_smoke.py $ + ) + set_tests_properties(IoUringSmokeTest PROPERTIES TIMEOUT 30) +endif() add_executable(fkvs-cli src/string_utils.c src/fkvs-cli.c src/config.c src/commands/common/command_parser.c src/commands/client/client_command_handlers.c) target_link_libraries(fkvs-benchmark PRIVATE Threads::Threads) target_link_libraries(fkvs-cli PUBLIC linenoise) diff --git a/src/io/event_dispatcher_io_uring.c b/src/io/event_dispatcher_io_uring.c index 3d5d7b6..55144c2 100644 --- a/src/io/event_dispatcher_io_uring.c +++ b/src/io/event_dispatcher_io_uring.c @@ -13,14 +13,40 @@ #include #include +#include +#include +#include #include #include #include +#include #include #include -#define QUEUE_DEPTH 256 -#define BATCH_SUBMIT_THRESHOLD 32 +#define IO_URING_MIN_QUEUE_DEPTH 256U +#define IO_URING_MAX_QUEUE_DEPTH 32768U +#define IO_URING_QUEUE_HEADROOM 8U +#define TTL_SWEEP_BATCH 20U + +typedef enum { + URING_ACCEPT_READY, + URING_CLIENT_READ_READY, + URING_CLIENT_WRITE_READY, + URING_TIMER_READY, +} uring_op_kind_t; + +typedef struct uring_op { + uring_op_kind_t kind; + client_t *client; + struct uring_op *next; +} uring_op_t; + +typedef struct { + struct io_uring ring; + uring_op_t *ops; + unsigned int outstanding_ops; + int timer_fd; +} uring_dispatcher_t; static bool reject_if_server_at_capacity(const int cfd) { @@ -38,185 +64,478 @@ static bool reject_if_server_at_capacity(const int cfd) return true; } -static void close_and_drop_client(struct io_uring *ring, client_t *c) +static unsigned int queue_depth_for_server(void) { - (void)ring; - if (!c) - return; + uint32_t max_clients = server.max_clients; + if (max_clients == 0) + max_clients = FKVS_DEFAULT_MAX_CLIENTS; + + if (max_clients > IO_URING_MAX_QUEUE_DEPTH - + IO_URING_QUEUE_HEADROOM) { + return IO_URING_MAX_QUEUE_DEPTH; + } - server_drop_client(&server, c); + const unsigned int needed = + (unsigned int)max_clients + IO_URING_QUEUE_HEADROOM; + return needed < IO_URING_MIN_QUEUE_DEPTH + ? IO_URING_MIN_QUEUE_DEPTH + : needed; } -int run_event_loop() +static void track_op(uring_dispatcher_t *dispatcher, + uring_op_t *op) { - struct io_uring ring; - int res = io_uring_queue_init(QUEUE_DEPTH, &ring, 0); + op->next = dispatcher->ops; + dispatcher->ops = op; + dispatcher->outstanding_ops += 1; +} + +static void untrack_op(uring_dispatcher_t *dispatcher, + uring_op_t *op) +{ + uring_op_t **cursor = &dispatcher->ops; + while (*cursor) { + if (*cursor == op) { + *cursor = op->next; + if (dispatcher->outstanding_ops > 0) + dispatcher->outstanding_ops -= 1; + return; + } + cursor = &(*cursor)->next; + } +} + +static void free_tracked_ops(uring_dispatcher_t *dispatcher) +{ + uring_op_t *op = dispatcher->ops; + while (op) { + uring_op_t *next = op->next; + free(op); + op = next; + } + dispatcher->ops = NULL; + dispatcher->outstanding_ops = 0; +} + +static struct io_uring_sqe *get_sqe(uring_dispatcher_t *dispatcher) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&dispatcher->ring); + if (sqe) + return sqe; + + const int res = io_uring_submit(&dispatcher->ring); if (res < 0) { - fprintf(stderr, "Unable to setup io_uring: %s\n", strerror(-res)); + fprintf(stderr, "io_uring submit failed: %s\n", strerror(-res)); + return NULL; + } + + return io_uring_get_sqe(&dispatcher->ring); +} + +static int submit_poll_op(uring_dispatcher_t *dispatcher, + const uring_op_kind_t kind, client_t *client, + const int fd, const unsigned int poll_mask) +{ + if (fd < 0) + return -1; + + uring_op_t *op = calloc(1, sizeof(*op)); + if (!op) { + perror("calloc io_uring op"); return -1; } + op->kind = kind; + op->client = client; - // Create a timerfd for active key expiration sweep (100ms) - int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); - static uint64_t timer_buf; // buffer for timerfd reads - static client_t timer_sentinel = {.fd = -1}; // sentinel to identify timer CQEs - if (tfd >= 0) { - struct itimerspec its = { - .it_interval = {0, 100000000}, - .it_value = {0, 100000000} - }; - timerfd_settime(tfd, 0, &its, NULL); - timer_sentinel.fd = tfd; + struct io_uring_sqe *sqe = get_sqe(dispatcher); + if (!sqe) { + free(op); + return -1; + } + + io_uring_prep_poll_add(sqe, fd, poll_mask); + io_uring_sqe_set_data(sqe, op); + track_op(dispatcher, op); - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); - io_uring_prep_read(sqe, tfd, &timer_buf, sizeof(timer_buf), 0); - io_uring_sqe_set_data(sqe, &timer_sentinel); - io_uring_submit(&ring); + const int res = io_uring_submit(&dispatcher->ring); + if (res <= 0) { + if (res == 0) { + fprintf(stderr, "io_uring submit queued no operations\n"); + return -1; + } + fprintf(stderr, "io_uring submit failed: %s\n", strerror(-res)); + return -1; } - unsigned int sqe_count = 0; + return 0; +} + +static int submit_accept_ready(uring_dispatcher_t *dispatcher) +{ + return submit_poll_op(dispatcher, URING_ACCEPT_READY, NULL, server.fd, + POLLIN); +} - while (!server_shutdown_requested()) { - struct io_uring_cqe *cqe; - res = io_uring_wait_cqe(&ring, &cqe); - if (res < 0) { - if (res == -EINTR && server_shutdown_requested()) +static int submit_client_read_ready(uring_dispatcher_t *dispatcher, + client_t *client) +{ + return submit_poll_op(dispatcher, URING_CLIENT_READ_READY, client, + client ? client->fd : -1, POLLIN); +} + +static int submit_client_write_ready(uring_dispatcher_t *dispatcher, + client_t *client) +{ + const int res = + submit_poll_op(dispatcher, URING_CLIENT_WRITE_READY, client, + client ? client->fd : -1, POLLOUT); + if (res == 0) + client->write_registered = true; + return res; +} + +static int submit_timer_ready(uring_dispatcher_t *dispatcher) +{ + if (dispatcher->timer_fd < 0) + return 0; + + return submit_poll_op(dispatcher, URING_TIMER_READY, NULL, + dispatcher->timer_fd, POLLIN); +} + +static void close_untracked_client(client_t *client) +{ + if (!client) + return; + + if (client->fd >= 0) { + close(client->fd); + client->fd = -1; + } + free_client(client); +} + +static void close_and_drop_client(client_t *client) +{ + if (!client) + return; + + server_drop_client(&server, client); +} + +static bool client_has_oversized_partial_frame(const client_t *client) +{ + return client->buf_used == sizeof(client->buffer) && + client->frame_need > 0 && + (ssize_t)client->buf_used < client->frame_need; +} + +static int setup_timer(uring_dispatcher_t *dispatcher) +{ + dispatcher->timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + if (dispatcher->timer_fd < 0) { + perror("timerfd_create"); + return 0; + } + + const struct itimerspec its = { + .it_interval = {0, 100000000}, + .it_value = {0, 100000000}, + }; + + if (timerfd_settime(dispatcher->timer_fd, 0, &its, NULL) == -1) { + perror("timerfd_settime"); + close(dispatcher->timer_fd); + dispatcher->timer_fd = -1; + } + + return 0; +} + +static int handle_accept_ready(uring_dispatcher_t *dispatcher, + const int cqe_res) +{ + if (cqe_res < 0 && !server_shutdown_requested()) { + fprintf(stderr, "io_uring accept readiness failed: %s\n", + strerror(-cqe_res)); + return -1; + } + + for (;;) { + struct sockaddr_storage ss; + socklen_t slen = sizeof(ss); + const int cfd = accept(server.fd, (struct sockaddr *)&ss, &slen); + + if (cfd < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; - if (res == -EINTR) + if (errno == EINTR) continue; - fprintf(stderr, "Wait for completion queue entry failed: %s\n", - strerror(-res)); + perror("accept"); break; } - client_t *c = io_uring_cqe_get_data(cqe); + if (reject_if_server_at_capacity(cfd)) + continue; - // Handle timer expiration for active sweep - if (c == &timer_sentinel) { - if (cqe->res > 0) { - expire_sweep(server.database->store, - server.database->expires, 20); - } - // Re-arm the timer read - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); - io_uring_prep_read(sqe, tfd, &timer_buf, sizeof(timer_buf), 0); - io_uring_sqe_set_data(sqe, &timer_sentinel); - io_uring_submit(&ring); - io_uring_cqe_seen(&ring, cqe); + set_nonblocking(cfd); + if (server.socket_domain == TCP_IP) + set_tcp_no_delay(cfd); + + client_t *client = init_client(cfd, ss, server.socket_domain); + if (!client) + continue; + + list_t *updated_clients = listAddNodeToTail(server.clients, client); + if (!updated_clients) { + fprintf(stderr, "Unable to allocate client list node\n"); + close_untracked_client(client); continue; } + server.clients = updated_clients; + server.num_clients += 1; - if (cqe->res < 0) { - if (server.verbose) { - printf("Client fd=%d operation failed (res=%d)\n", - c ? c->fd : -1, cqe->res); - } - close_and_drop_client(&ring, c); - io_uring_cqe_seen(&ring, cqe); + if (server.verbose) { + printf("Client connected fd=%d %s:%d (total=%d)\n", client->fd, + client->ip_str, client->port, (int)server.num_clients); + } + + if (submit_client_read_ready(dispatcher, client) == -1) { + close_and_drop_client(client); + return -1; + } + } + + if (!server_shutdown_requested() && submit_accept_ready(dispatcher) == -1) + return -1; + + return 0; +} + +static int handle_timer_ready(uring_dispatcher_t *dispatcher, + const int cqe_res) +{ + if (cqe_res < 0 && !server_shutdown_requested()) { + fprintf(stderr, "io_uring timer readiness failed: %s\n", + strerror(-cqe_res)); + return -1; + } + + for (;;) { + uint64_t expirations = 0; + const ssize_t nread = + read(dispatcher->timer_fd, &expirations, sizeof(expirations)); + if (nread == (ssize_t)sizeof(expirations)) { + expire_sweep(server.database->store, server.database->expires, + TTL_SWEEP_BATCH); continue; } + if (nread < 0 && errno == EINTR) + continue; + if (nread < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + break; + if (nread < 0) + perror("timerfd read"); + break; + } - if (cqe->res == 0) { - // New connection handling. - for (;;) { - struct sockaddr_storage ss; - socklen_t slen = sizeof(ss); - const int cfd = accept(c->fd, (struct sockaddr *)&ss, &slen); - - if (cfd < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - break; - } - perror("accept"); - break; - } - - if (reject_if_server_at_capacity(cfd)) - continue; - - set_nonblocking(cfd); - - if (server.socket_domain == TCP_IP) { - set_tcp_no_delay(cfd); - } - - client_t *new_client = - init_client(cfd, ss, server.socket_domain); - if (!new_client) { - continue; // Already closed on failure. - } - - server.clients = listAddNodeToTail(server.clients, new_client); - server.num_clients++; - - if (server.verbose) { - printf("Client connected fd=%d %s:%d (total=%d)\n", - new_client->fd, new_client->ip_str, new_client->port, - (int)server.num_clients); - } - - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); - io_uring_prep_recv(sqe, new_client->fd, new_client->buffer, - sizeof(new_client->buffer), 0); - io_uring_sqe_set_data(sqe, new_client); - sqe_count++; - - // Check if batch submission threshold is met - if (sqe_count >= BATCH_SUBMIT_THRESHOLD) { - io_uring_submit(&ring); - sqe_count = 0; - } - } - } else { - // Data read for existing connections. - c->buf_used += cqe->res; + if (!server_shutdown_requested() && submit_timer_ready(dispatcher) == -1) + return -1; + + return 0; +} +static int rearm_client_after_read(uring_dispatcher_t *dispatcher, + client_t *client) +{ + if (client->wbuf_used > 0) + return submit_client_write_ready(dispatcher, client); + + return submit_client_read_ready(dispatcher, client); +} + +static int handle_client_read_ready(uring_dispatcher_t *dispatcher, + client_t *client, const int cqe_res) +{ + if (!client) + return 0; + + if (cqe_res < 0) { + if (server.verbose) { + printf("Client fd=%d read readiness failed (%s)\n", client->fd, + strerror(-cqe_res)); + } + close_and_drop_client(client); + return 0; + } + + for (;;) { + const size_t available = sizeof(client->buffer) - client->buf_used; + if (available == 0) { + fprintf(stderr, + "fd=%d read buffer full before frame completion; dropping " + "client\n", + client->fd); + close_and_drop_client(client); + return 0; + } + + const ssize_t nread = + recv(client->fd, client->buffer + client->buf_used, available, 0); + if (nread > 0) { + client->buf_used += (size_t)nread; if (server.verbose) { - printf("fd=%d read %d bytes (buf_used=%zu)\n", c->fd, cqe->res, - c->buf_used); + printf("fd=%d read %zd bytes (buf_used=%zu)\n", client->fd, + nread, client->buf_used); } - // Process as many complete frames as possible. - if (try_process_frames(c) < 0) { - close_and_drop_client(&ring, c); - io_uring_cqe_seen(&ring, cqe); - continue; + if (try_process_frames(client) < 0) { + close_and_drop_client(client); + return 0; } - if (c->buf_used == sizeof(c->buffer) && c->frame_need > 0 && - (ssize_t)c->buf_used < c->frame_need) { - fprintf( - stderr, - "fd=%d frame exceeds buffer capacity; dropping client\n", - c->fd); - close_and_drop_client(&ring, c); - } else { - // Re-add to the ring for further reads. - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); - io_uring_prep_recv(sqe, c->fd, c->buffer + c->buf_used, - sizeof(c->buffer) - c->buf_used, 0); - io_uring_sqe_set_data(sqe, c); - sqe_count++; - - // Check if batch submission threshold is met - if (sqe_count >= BATCH_SUBMIT_THRESHOLD) { - io_uring_submit(&ring); - sqe_count = 0; - } + if (client_has_oversized_partial_frame(client)) { + fprintf(stderr, + "fd=%d frame exceeds buffer capacity; dropping " + "client\n", + client->fd); + close_and_drop_client(client); + return 0; } + + continue; + } + + if (nread == 0) { + if (server.verbose) + printf("Client fd=%d closed (recv=0)\n", client->fd); + close_and_drop_client(client); + return 0; + } + + if (errno == EINTR) + continue; + + if (errno == EAGAIN || errno == EWOULDBLOCK) + return rearm_client_after_read(dispatcher, client); + + perror("recv"); + close_and_drop_client(client); + return 0; + } +} + +static int handle_client_write_ready(uring_dispatcher_t *dispatcher, + client_t *client, const int cqe_res) +{ + if (!client) + return 0; + + client->write_registered = false; + + if (cqe_res < 0) { + if (server.verbose) { + printf("Client fd=%d write readiness failed (%s)\n", client->fd, + strerror(-cqe_res)); } + close_and_drop_client(client); + return 0; + } - io_uring_cqe_seen(&ring, cqe); + wbuf_flush(client); + if (client->write_failed) { + close_and_drop_client(client); + return 0; } - // Ensure any remaining SQEs are submitted - if (sqe_count > 0) { - io_uring_submit(&ring); + if (client->wbuf_used > 0) + return submit_client_write_ready(dispatcher, client); + + return submit_client_read_ready(dispatcher, client); +} + +static int handle_cqe(uring_dispatcher_t *dispatcher, + struct io_uring_cqe *cqe) +{ + uring_op_t *op = io_uring_cqe_get_data(cqe); + const int cqe_res = cqe->res; + io_uring_cqe_seen(&dispatcher->ring, cqe); + + if (!op) + return 0; + + const uring_op_kind_t kind = op->kind; + client_t *client = op->client; + untrack_op(dispatcher, op); + free(op); + + switch (kind) { + case URING_ACCEPT_READY: + return handle_accept_ready(dispatcher, cqe_res); + case URING_CLIENT_READ_READY: + return handle_client_read_ready(dispatcher, client, cqe_res); + case URING_CLIENT_WRITE_READY: + return handle_client_write_ready(dispatcher, client, cqe_res); + case URING_TIMER_READY: + return handle_timer_ready(dispatcher, cqe_res); } - if (tfd >= 0) - close(tfd); - io_uring_queue_exit(&ring); return 0; } + +static void cleanup_dispatcher(uring_dispatcher_t *dispatcher) +{ + if (dispatcher->timer_fd >= 0) { + close(dispatcher->timer_fd); + dispatcher->timer_fd = -1; + } + + io_uring_queue_exit(&dispatcher->ring); + free_tracked_ops(dispatcher); +} + +int run_event_loop() +{ + uring_dispatcher_t dispatcher = {.timer_fd = -1}; + set_nonblocking(server.fd); + + const unsigned int queue_depth = queue_depth_for_server(); + int res = io_uring_queue_init(queue_depth, &dispatcher.ring, 0); + if (res < 0) { + fprintf(stderr, "Unable to setup io_uring: %s\n", strerror(-res)); + return -1; + } + + setup_timer(&dispatcher); + + if (submit_accept_ready(&dispatcher) == -1 || + submit_timer_ready(&dispatcher) == -1) { + cleanup_dispatcher(&dispatcher); + return -1; + } + + int status = 0; + while (!server_shutdown_requested()) { + struct io_uring_cqe *cqe = NULL; + res = io_uring_wait_cqe(&dispatcher.ring, &cqe); + if (res < 0) { + if (res == -EINTR && server_shutdown_requested()) + break; + if (res == -EINTR) + continue; + fprintf(stderr, "Wait for completion queue entry failed: %s\n", + strerror(-res)); + status = -1; + break; + } + + if (handle_cqe(&dispatcher, cqe) == -1) { + status = -1; + break; + } + } + + cleanup_dispatcher(&dispatcher); + return status; +} #endif diff --git a/tests/io_uring_smoke.py b/tests/io_uring_smoke.py new file mode 100644 index 0000000..bf8df52 --- /dev/null +++ b/tests/io_uring_smoke.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 + +import os +import socket +import struct +import subprocess +import sys +import tempfile +import time + + +CMD_SET = 0x01 +CMD_GET = 0x02 +CMD_PING = 0x05 +STATUS_FAILURE = 0x00 +STATUS_SUCCESS = 0x01 + + +def build_set(key: bytes, value: bytes) -> bytes: + core = bytes([CMD_SET]) + struct.pack(">H", len(key)) + key + core += struct.pack(">H", len(value)) + value + return struct.pack(">H", len(core)) + core + + +def build_get(key: bytes) -> bytes: + core = bytes([CMD_GET]) + struct.pack(">H", len(key)) + key + return struct.pack(">H", len(core)) + core + + +def build_ping(value: bytes) -> bytes: + core = bytes([CMD_PING]) + struct.pack(">H", len(value)) + value + return struct.pack(">H", len(core)) + core + + +def read_exact(sock: socket.socket, size: int) -> bytes: + chunks = [] + remaining = size + while remaining > 0: + chunk = sock.recv(remaining) + if not chunk: + raise AssertionError("connection closed while reading response") + chunks.append(chunk) + remaining -= len(chunk) + return b"".join(chunks) + + +def read_frame(sock: socket.socket) -> bytes: + header = read_exact(sock, 2) + (core_len,) = struct.unpack(">H", header) + return read_exact(sock, core_len) + + +def expect_success_value(sock: socket.socket, expected: bytes) -> None: + frame = read_frame(sock) + assert frame[0] == STATUS_SUCCESS, frame + assert len(frame) >= 3, frame + (value_len,) = struct.unpack(">H", frame[1:3]) + assert frame[3:] == expected, frame + assert value_len == len(expected), frame + + +def expect_ping(sock: socket.socket, expected: bytes) -> None: + frame = read_frame(sock) + assert frame[0] == CMD_PING, frame + assert len(frame) >= 3, frame + (value_len,) = struct.unpack(">H", frame[1:3]) + assert frame[3:] == expected, frame + assert value_len == len(expected), frame + + +def expect_disconnect(sock: socket.socket) -> None: + deadline = time.monotonic() + 2 + while time.monotonic() < deadline: + try: + data = sock.recv(1) + except (BrokenPipeError, ConnectionResetError): + return + except socket.timeout: + continue + + if data == b"": + return + + raise AssertionError(f"unexpected response from malformed frame: {data!r}") + + raise AssertionError("server did not close malformed connection") + + +def unused_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return sock.getsockname()[1] + + +def wait_for_server(proc: subprocess.Popen, port: int) -> None: + deadline = time.monotonic() + 8 + last_error = None + while time.monotonic() < deadline: + if proc.poll() is not None: + stdout, stderr = proc.communicate(timeout=1) + raise AssertionError( + f"server exited early with {proc.returncode}\n" + f"stdout={stdout.decode(errors='replace')}\n" + f"stderr={stderr.decode(errors='replace')}" + ) + try: + with socket.create_connection(("127.0.0.1", port), timeout=0.25): + return + except OSError as exc: + last_error = exc + time.sleep(0.05) + raise AssertionError(f"server did not accept connections: {last_error}") + + +def dump_server_output(stdout: bytes, stderr: bytes) -> None: + if stdout: + print("server stdout:", file=sys.stderr) + print(stdout.decode(errors="replace"), file=sys.stderr) + if stderr: + print("server stderr:", file=sys.stderr) + print(stderr.decode(errors="replace"), file=sys.stderr) + + +def stop_server(proc: subprocess.Popen, print_output: bool = False) -> None: + if proc.poll() is not None: + stdout, stderr = proc.communicate(timeout=1) + if print_output: + dump_server_output(stdout, stderr) + return + + proc.terminate() + try: + proc.wait(timeout=8) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=2) + raise + + stdout, stderr = proc.communicate(timeout=1) + if print_output: + dump_server_output(stdout, stderr) + + +def main() -> int: + if len(sys.argv) != 2: + print("usage: io_uring_smoke.py /path/to/fkvs-server", file=sys.stderr) + return 2 + + server_path = sys.argv[1] + port = unused_port() + config = ( + f"port {port}\n" + "bind 127.0.0.1\n" + "max-clients 16\n" + "event-loop-max-events 128\n" + "show-logo false\n" + "verbose false\n" + "daemonize false\n" + "use-io-uring true\n" + ) + + with tempfile.NamedTemporaryFile("w", delete=False) as cfg: + cfg.write(config) + cfg_path = cfg.name + + proc = subprocess.Popen( + [server_path, "-c", cfg_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + start_new_session=True, + ) + + failed = False + try: + wait_for_server(proc, port) + + with socket.create_connection(("127.0.0.1", port), timeout=2) as sock: + sock.settimeout(2) + sock.sendall( + build_set(b"alpha", b"one") + + build_get(b"alpha") + + build_ping(b"probe") + ) + expect_success_value(sock, b"one") + expect_success_value(sock, b"one") + expect_ping(sock, b"probe") + + with socket.create_connection(("127.0.0.1", port), timeout=2) as bad: + bad.settimeout(2) + try: + bad.sendall(b"\xff\xff" + (b"\x01" * (65536 - 2))) + except (BrokenPipeError, ConnectionResetError): + pass + expect_disconnect(bad) + + with socket.create_connection(("127.0.0.1", port), timeout=2) as sock: + sock.settimeout(2) + sock.sendall(build_get(b"alpha")) + expect_success_value(sock, b"one") + + except Exception: + failed = True + raise + finally: + stop_server(proc, print_output=failed) + os.unlink(cfg_path) + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())