Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function(fkvs_configure_target target_name)
endfunction()

if(APPLE)
add_executable(fkvs-server src/memory.c src/counter.c src/client.c src/core/list.c src/config.c src/networking/networking.c src/server.c src/server_lifecycle.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/server/server_command_handlers.c src/io/event_dispatcher_kqueue.c src/ttl.c src/numeric_parse.c)
add_executable(fkvs-server src/memory.c src/counter.c src/client.c src/core/list.c src/config.c src/networking/networking.c src/server.c src/server_lifecycle.c src/server_limits.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/server/server_command_handlers.c src/io/event_dispatcher_kqueue.c src/ttl.c src/numeric_parse.c)
target_compile_definitions(fkvs-server PRIVATE SERVER)
fkvs_configure_target(fkvs-server)

Expand All @@ -45,12 +45,12 @@ elseif(LINUX)
find_library(LIBURING liburing)

if(LIBURING)
add_executable(fkvs-server src/memory.c src/counter.c src/client.c src/core/list.c src/config.c src/networking/networking.c src/server.c src/server_lifecycle.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/server/server_command_handlers.c src/io/event_dispatcher_io_uring.c src/ttl.c src/numeric_parse.c)
add_executable(fkvs-server src/memory.c src/counter.c src/client.c src/core/list.c src/config.c src/networking/networking.c src/server.c src/server_lifecycle.c src/server_limits.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/server/server_command_handlers.c src/io/event_dispatcher_io_uring.c src/ttl.c src/numeric_parse.c)
target_link_libraries(fkvs-server PRIVATE LIBURING)
target_compile_definitions(fkvs-server PRIVATE SERVER IO_URING_ENABLED)
fkvs_configure_target(fkvs-server)
else()
add_executable(fkvs-server src/memory.c src/counter.c src/client.c src/core/list.c src/config.c src/networking/networking.c src/server.c src/server_lifecycle.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/server/server_command_handlers.c src/io/event_dispatcher_epoll.c src/ttl.c src/numeric_parse.c)
add_executable(fkvs-server src/memory.c src/counter.c src/client.c src/core/list.c src/config.c src/networking/networking.c src/server.c src/server_lifecycle.c src/server_limits.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/server/server_command_handlers.c src/io/event_dispatcher_epoll.c src/ttl.c src/numeric_parse.c)
target_compile_definitions(fkvs-server PRIVATE SERVER)
fkvs_configure_target(fkvs-server)
endif()
Expand All @@ -65,22 +65,31 @@ add_executable(test_counter tests/test_counter.c src/counter.c)
add_executable(test_string_utils tests/test_string_utils.c src/string_utils.c)
add_executable(test_response_writer tests/test_response_writer.c src/client.c src/commands/common/command_registry.c)
add_executable(test_server_lifecycle tests/test_server_lifecycle.c src/server_lifecycle.c src/client.c src/core/list.c src/core/hashtable.c)
add_executable(test_server_config tests/test_server_config.c src/config.c src/numeric_parse.c)
add_executable(test_server_limits tests/test_server_limits.c src/server_limits.c)
add_executable(test_integration tests/test_integration.c src/client.c src/core/hashtable.c src/commands/common/command_registry.c src/commands/common/command_parser.c src/commands/server/server_command_handlers.c src/counter.c src/ttl.c src/numeric_parse.c)
target_compile_definitions(test_server_config PRIVATE SERVER)
target_compile_definitions(test_integration PRIVATE SERVER)
fkvs_configure_target(test_counter)
fkvs_configure_target(test_string_utils)
fkvs_configure_target(test_response_writer)
fkvs_configure_target(test_server_lifecycle)
fkvs_configure_target(test_server_config)
fkvs_configure_target(test_server_limits)
fkvs_configure_target(test_integration)
target_compile_options(test_counter PRIVATE -UNDEBUG)
target_compile_options(test_string_utils PRIVATE -UNDEBUG)
target_compile_options(test_response_writer PRIVATE -UNDEBUG)
target_compile_options(test_server_lifecycle PRIVATE -UNDEBUG)
target_compile_options(test_server_config PRIVATE -UNDEBUG)
target_compile_options(test_server_limits PRIVATE -UNDEBUG)
target_compile_options(test_integration PRIVATE -UNDEBUG)
target_link_libraries(test_counter)
target_link_libraries(test_string_utils)
target_link_libraries(test_response_writer)
target_link_libraries(test_server_lifecycle)
target_link_libraries(test_server_config)
target_link_libraries(test_server_limits)
target_link_libraries(test_integration)

# Enable testing
Expand All @@ -89,6 +98,8 @@ add_test(NAME CounterTest COMMAND test_counter)
add_test(NAME StringUtilsTest COMMAND test_string_utils)
add_test(NAME ResponseWriterTest COMMAND test_response_writer)
add_test(NAME ServerLifecycleTest COMMAND test_server_lifecycle)
add_test(NAME ServerConfigTest COMMAND test_server_config)
add_test(NAME ServerLimitsTest COMMAND test_server_limits)
add_test(NAME IntegrationTest COMMAND test_integration)
add_executable(fkvs-cli src/string_utils.c src/fkvs-cli.c src/config.c src/commands/common/command_parser.c src/commands/client/client_command_handlers.c)
target_link_libraries(fkvs-benchmark PRIVATE Threads::Threads)
Expand Down
5 changes: 5 additions & 0 deletions server.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Server configuration
port 5995
# Bind to localhost by default. Use 0.0.0.0 only when intentionally exposing
# FKVS beyond the local machine, for example behind a firewall or proxy.
bind 127.0.0.1
# Maximum number of concurrently connected clients.
max-clients 128
logs-enabled false
verbose false
daemonize false
Expand Down
61 changes: 59 additions & 2 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,49 @@
#include "client.h"
#include "io/event_dispatcher.h"
#include "networking/networking.h"
#include "numeric_parse.h"
#include "utils.h"

#include <inttypes.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#ifdef SERVER
static int64_t parse_config_i64(const char *key, const char *value,
const int64_t min_value,
const int64_t max_value)
{
int64_t parsed = 0;
if (!value ||
!fkvs_parse_i64_decimal((const unsigned char *)value, strlen(value),
min_value, max_value, &parsed)) {
fprintf(stderr,
"Invalid config value for '%s': '%s' (expected integer in "
"range %" PRId64 "..%" PRId64 ")\n",
key, value ? value : "(null)", min_value, max_value);
exit(EXIT_FAILURE);
}

return parsed;
}

static void set_server_bind_address(const char *value)
{
char *copy = strdup(value);
if (!copy) {
ERROR_AND_EXIT("Failed to allocate bind address");
}

if (server.owns_bind_address) {
free(server.bind_address);
}

server.bind_address = copy;
server.owns_bind_address = true;
}

server_t load_server_config(const char *path)
{
FILE *config =
Expand All @@ -18,9 +54,19 @@ server_t load_server_config(const char *path)
ERROR_AND_EXIT("Failed to open server config file: ");
}

if (server.owns_bind_address) {
free(server.bind_address);
}
if (server.owns_uds_socket_path) {
free(server.uds_socket_path);
}

server.num_clients = 0;
server.fd = -1;
server.event_loop_fd = -1;
server.bind_address = (char *)FKVS_DEFAULT_BIND_ADDRESS;
server.owns_bind_address = false;
server.max_clients = FKVS_DEFAULT_MAX_CLIENTS;
server.uds_socket_path = NULL;
server.owns_uds_socket_path = false;
server.socket_domain = TCP_IP;
Expand All @@ -44,11 +90,22 @@ server_t load_server_config(const char *path)
continue;

if (strcmp(key, "port") == 0) {
server.port = atoi(value);
server.port =
(int)parse_config_i64(key, value, 1, UINT16_MAX);
}

if (strcmp(key, "bind") == 0) {
set_server_bind_address(value);
}

if (strcmp(key, "event-loop-max-events") == 0) {
server.event_loop_max_events = atoi(value);
server.event_loop_max_events =
(int)parse_config_i64(key, value, 1, INT_MAX);
}

if (strcmp(key, "max-clients") == 0) {
server.max_clients =
(uint32_t)parse_config_i64(key, value, 1, UINT32_MAX);
}

if (strcmp(key, "unixsocket") == 0) {
Expand Down
20 changes: 20 additions & 0 deletions src/io/event_dispatcher_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "../networking/networking.h"
#include "../server.h"
#include "../server_lifecycle.h"
#include "../server_limits.h"
#include "../ttl.h"
#include "../utils.h"
#include "event_dispatcher.h"
Expand All @@ -24,6 +25,22 @@
#include <sys/types.h>
#include <unistd.h>

static bool reject_if_server_at_capacity(const int cfd)
{
if (fkvs_server_can_accept_client(&server))
return false;

if (server.verbose) {
fprintf(stderr,
"Rejecting client fd=%d: max-clients limit reached (%u)\n",
cfd, server.max_clients);
}

close(cfd);
fkvs_server_record_rejected_client(&server);
return true;
}

static void close_and_drop_client(const int epfd, client_t *c)
{
if (!c)
Expand Down Expand Up @@ -161,6 +178,9 @@ int run_event_loop()
break;
}

if (reject_if_server_at_capacity(cfd))
continue;

set_nonblocking(cfd);
if (server.socket_domain == TCP_IP) {
set_tcp_no_delay(cfd);
Expand Down
20 changes: 20 additions & 0 deletions src/io/event_dispatcher_io_uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "../networking/networking.h"
#include "../server.h"
#include "../server_lifecycle.h"
#include "../server_limits.h"
#include "../ttl.h"
#include "../utils.h"
#include "event_dispatcher.h"
Expand All @@ -21,6 +22,22 @@
#define QUEUE_DEPTH 256
#define BATCH_SUBMIT_THRESHOLD 32

static bool reject_if_server_at_capacity(const int cfd)
{
if (fkvs_server_can_accept_client(&server))
return false;

if (server.verbose) {
fprintf(stderr,
"Rejecting client fd=%d: max-clients limit reached (%u)\n",
cfd, server.max_clients);
}

close(cfd);
fkvs_server_record_rejected_client(&server);
return true;
}

static void close_and_drop_client(struct io_uring *ring, client_t *c)
{
(void)ring;
Expand Down Expand Up @@ -128,6 +145,9 @@ int run_event_loop()
break;
}

if (reject_if_server_at_capacity(cfd))
continue;

set_nonblocking(cfd);

if (server.socket_domain == TCP_IP) {
Expand Down
20 changes: 20 additions & 0 deletions src/io/event_dispatcher_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "../networking/networking.h"
#include "../server.h"
#include "../server_lifecycle.h"
#include "../server_limits.h"
#include "../ttl.h"
#include "../utils.h"
#include "event_dispatcher.h"
Expand All @@ -21,6 +22,22 @@
#include <sys/types.h>
#include <unistd.h>

static bool reject_if_server_at_capacity(const int cfd)
{
if (fkvs_server_can_accept_client(&server))
return false;

if (server.verbose) {
fprintf(stderr,
"Rejecting client fd=%d: max-clients limit reached (%u)\n",
cfd, server.max_clients);
}

close(cfd);
fkvs_server_record_rejected_client(&server);
return true;
}

static void close_and_drop_client(const int kq, client_t *c)
{
if (!c)
Expand Down Expand Up @@ -137,6 +154,9 @@ int run_event_loop()
perror("accept");
break;
}
if (reject_if_server_at_capacity(cfd))
continue;

set_nonblocking(cfd);

if (server.socket_domain == TCP_IP) {
Expand Down
32 changes: 27 additions & 5 deletions src/networking/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
#include "../utils.h"
#include <arpa/inet.h>
#include <fcntl.h>
#include <limits.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>

#define BACKLOG 2000000 // Number of allowed connections

#ifdef SERVER

#include "../commands/common/command_registry.h"
Expand All @@ -21,6 +20,15 @@
#include <sys/stat.h>
#include <sys/types.h>

static int server_listen_backlog(void)
{
if (server.max_clients == 0)
return (int)FKVS_DEFAULT_MAX_CLIENTS;
if (server.max_clients > (uint32_t)INT_MAX)
return INT_MAX;
return (int)server.max_clients;
}

int start_server()
{
int server_fd;
Expand All @@ -37,26 +45,40 @@ int start_server()
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));

memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(server.port);

if (server_addr.sin_port == 0) {
fprintf(stderr, "Invalid port 0\n");
close(server_fd);
server.fd = -1;
return -1;
}

const char *bind_address =
server.bind_address ? server.bind_address : FKVS_DEFAULT_BIND_ADDRESS;
if (inet_pton(AF_INET, bind_address, &server_addr.sin_addr) != 1) {
fprintf(stderr,
"Invalid bind address '%s' (expected an IPv4 address)\n",
bind_address);
close(server_fd);
server.fd = -1;
return -1;
}

if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) <
0) {
perror("bind failed");
close(server_fd);
server.fd = -1;
return -1;
}

if (listen(server_fd, BACKLOG) < 0) {
if (listen(server_fd, server_listen_backlog()) < 0) {
perror("listen");
close(server_fd);
server.fd = -1;
return -1;
}

Expand Down Expand Up @@ -112,7 +134,7 @@ int start_uds_server()
return -1;
}

if (listen(server_fd, BACKLOG) < 0) {
if (listen(server_fd, server_listen_backlog()) < 0) {
perror("listen");
close(server_fd);
return -1;
Expand Down
Loading
Loading