Skip to content

Commit 8c47a3a

Browse files
committed
test: add dedicated ListenConnections coverage
Add a separate listen_tests.cpp file with reusable UnixListener, ClientSetup, and ListenSetup helpers for exercising ListenConnections() with real Unix domain sockets. The new test covers the baseline behavior that ListenConnections() accepts an incoming connection and serves requests over it. Keeping this coverage separate from the existing general proxy tests makes the socket listener setup easier to review and provides a clearer place to extend listener-specific behavior in follow-up commits.
1 parent 3edbe8f commit 8c47a3a

3 files changed

Lines changed: 210 additions & 2 deletions

File tree

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ if(BUILD_TESTING AND TARGET CapnProto::kj-test)
2626
${MP_PROXY_HDRS}
2727
mp/test/foo-types.h
2828
mp/test/foo.h
29+
mp/test/listen_tests.cpp
2930
mp/test/spawn_tests.cpp
3031
mp/test/test.cpp
3132
)

test/mp/test/listen_tests.cpp

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// Copyright (c) The Bitcoin Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#include <mp/test/foo.capnp.h>
6+
#include <mp/test/foo.capnp.proxy.h>
7+
8+
#include <capnp/rpc.h>
9+
#include <chrono>
10+
#include <condition_variable>
11+
#include <cstdlib>
12+
#include <cstring>
13+
#include <future>
14+
#include <kj/test.h>
15+
#include <memory>
16+
#include <mp/proxy-io.h>
17+
#include <mutex>
18+
#include <ratio> // IWYU pragma: keep
19+
#include <stdexcept>
20+
#include <string>
21+
#include <sys/socket.h>
22+
#include <sys/un.h>
23+
#include <thread>
24+
#include <unistd.h>
25+
26+
#include "foo.h"
27+
28+
namespace mp {
29+
namespace test {
30+
namespace {
31+
32+
class UnixListener
33+
{
34+
public:
35+
UnixListener()
36+
{
37+
char dir_template[] = "/tmp/mptest-listener-XXXXXX";
38+
char* dir = mkdtemp(dir_template);
39+
KJ_REQUIRE(dir != nullptr);
40+
m_dir = dir;
41+
m_path = m_dir + "/socket";
42+
43+
m_fd = socket(AF_UNIX, SOCK_STREAM, 0);
44+
KJ_REQUIRE(m_fd >= 0);
45+
46+
sockaddr_un addr{};
47+
addr.sun_family = AF_UNIX;
48+
KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path));
49+
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
50+
KJ_REQUIRE(bind(m_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0);
51+
KJ_REQUIRE(listen(m_fd, SOMAXCONN) == 0);
52+
}
53+
54+
~UnixListener()
55+
{
56+
if (m_fd >= 0) close(m_fd);
57+
if (!m_path.empty()) unlink(m_path.c_str());
58+
if (!m_dir.empty()) rmdir(m_dir.c_str());
59+
}
60+
61+
int release()
62+
{
63+
int fd = m_fd;
64+
m_fd = -1;
65+
return fd;
66+
}
67+
68+
int Connect() const
69+
{
70+
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
71+
KJ_REQUIRE(fd >= 0);
72+
73+
sockaddr_un addr{};
74+
addr.sun_family = AF_UNIX;
75+
KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path));
76+
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
77+
KJ_REQUIRE(connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0);
78+
return fd;
79+
}
80+
81+
private:
82+
int m_fd{-1};
83+
std::string m_dir;
84+
std::string m_path;
85+
};
86+
87+
class ClientSetup
88+
{
89+
public:
90+
explicit ClientSetup(int fd)
91+
: thread([this, fd] {
92+
EventLoop loop("mptest-client", [](mp::LogMessage log) {
93+
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
94+
});
95+
client_promise.set_value(ConnectStream<messages::FooInterface>(loop, fd));
96+
loop.loop();
97+
})
98+
{
99+
client = client_promise.get_future().get();
100+
}
101+
102+
~ClientSetup()
103+
{
104+
client.reset();
105+
thread.join();
106+
}
107+
108+
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
109+
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
110+
std::thread thread;
111+
};
112+
113+
class ListenSetup
114+
{
115+
public:
116+
explicit ListenSetup(std::optional<size_t> max_connections = std::nullopt)
117+
: thread([this, max_connections] {
118+
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
119+
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
120+
if (log.message.find("IPC server: socket connected.") != std::string::npos) {
121+
std::lock_guard<std::mutex> lock(counter_mutex);
122+
++connected_count;
123+
counter_cv.notify_all();
124+
} else if (log.message.find("IPC server: socket disconnected.") != std::string::npos) {
125+
std::lock_guard<std::mutex> lock(counter_mutex);
126+
++disconnected_count;
127+
counter_cv.notify_all();
128+
}
129+
});
130+
FooImplementation foo;
131+
ListenConnections<messages::FooInterface>(loop, listener.release(), foo, max_connections);
132+
ready_promise.set_value();
133+
loop.loop();
134+
})
135+
{
136+
ready_promise.get_future().get();
137+
}
138+
139+
~ListenSetup()
140+
{
141+
thread.join();
142+
}
143+
144+
size_t ConnectedCount()
145+
{
146+
std::lock_guard<std::mutex> lock(counter_mutex);
147+
return connected_count;
148+
}
149+
150+
void WaitForConnectedCount(size_t expected_count)
151+
{
152+
std::unique_lock<std::mutex> lock(counter_mutex);
153+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
154+
const bool matched = counter_cv.wait_until(lock, deadline, [&] {
155+
return connected_count >= expected_count;
156+
});
157+
KJ_REQUIRE(matched);
158+
}
159+
160+
void WaitForDisconnectedCount(size_t expected_count)
161+
{
162+
std::unique_lock<std::mutex> lock(counter_mutex);
163+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
164+
const bool matched = counter_cv.wait_until(lock, deadline, [&] {
165+
return disconnected_count >= expected_count;
166+
});
167+
KJ_REQUIRE(matched);
168+
}
169+
170+
UnixListener listener;
171+
std::promise<void> ready_promise;
172+
std::thread thread;
173+
std::mutex counter_mutex;
174+
std::condition_variable counter_cv;
175+
size_t connected_count{0};
176+
size_t disconnected_count{0};
177+
};
178+
179+
KJ_TEST("ListenConnections accepts incoming connections")
180+
{
181+
ListenSetup setup;
182+
auto client = std::make_unique<ClientSetup>(setup.listener.Connect());
183+
184+
setup.WaitForConnectedCount(1);
185+
KJ_EXPECT(client->client->add(1, 2) == 3);
186+
}
187+
188+
KJ_TEST("ListenConnections enforces a local connection limit")
189+
{
190+
ListenSetup setup(/*max_connections=*/1);
191+
192+
auto client1 = std::make_unique<ClientSetup>(setup.listener.Connect());
193+
setup.WaitForConnectedCount(1);
194+
KJ_EXPECT(client1->client->add(1, 2) == 3);
195+
196+
auto client2 = std::make_unique<ClientSetup>(setup.listener.Connect());
197+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
198+
KJ_EXPECT(setup.ConnectedCount() == 1);
199+
200+
client1.reset();
201+
setup.WaitForDisconnectedCount(1);
202+
setup.WaitForConnectedCount(2);
203+
204+
KJ_EXPECT(client2->client->add(2, 3) == 5);
205+
}
206+
207+
} // namespace
208+
} // namespace test
209+
} // namespace mp

test/mp/test/test.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@
99
#include <capnp/capability.h>
1010
#include <capnp/rpc.h>
1111
#include <cassert>
12-
#include <chrono>
1312
#include <condition_variable>
1413
#include <cstdint>
15-
#include <cstring>
1614
#include <functional>
1715
#include <future>
1816
#include <kj/async.h>

0 commit comments

Comments
 (0)