diff --git a/.github/workflows/bitcoin-core-ci.yml b/.github/workflows/bitcoin-core-ci.yml index e6ac83f0..89380ac4 100644 --- a/.github/workflows/bitcoin-core-ci.yml +++ b/.github/workflows/bitcoin-core-ci.yml @@ -18,6 +18,8 @@ concurrency: env: BITCOIN_REPO: bitcoin/bitcoin + # Temporary: use PR #35084 until it merges; revert to refs/heads/master after + BITCOIN_CORE_REF: refs/pull/35084/merge LLVM_VERSION: 22 LIBCXX_DIR: /tmp/libcxx-build/ @@ -79,6 +81,7 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ env.BITCOIN_REPO }} + ref: ${{ env.BITCOIN_CORE_REF }} fetch-depth: 1 - name: Checkout libmultiprocess @@ -195,6 +198,7 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ env.BITCOIN_REPO }} + ref: ${{ env.BITCOIN_CORE_REF }} fetch-depth: 1 - name: Checkout libmultiprocess diff --git a/CMakeLists.txt b/CMakeLists.txt index a36023b1..56f77b62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,7 @@ endif() include("cmake/compat_find.cmake") find_package(Threads REQUIRED) -find_package(CapnProto 0.7 QUIET NO_MODULE) +find_package(CapnProto 0.9 QUIET NO_MODULE) if(NOT CapnProto_FOUND) message(FATAL_ERROR "Cap'n Proto is required but was not found.\n" @@ -203,6 +203,10 @@ target_link_libraries(mpgen PRIVATE CapnProto::capnp-rpc) target_link_libraries(mpgen PRIVATE CapnProto::capnpc) target_link_libraries(mpgen PRIVATE CapnProto::kj) target_link_libraries(mpgen PRIVATE Threads::Threads) +target_compile_definitions(mpgen PRIVATE + "CAPNP_EXECUTABLE=\"$<TARGET_FILE:CapnProto::capnp_tool>\"" + "CAPNPC_CXX_EXECUTABLE=\"$<TARGET_FILE:CapnProto::capnpc_cpp>\"" + "CAPNP_INCLUDE_DIRS=\"${CAPNP_INCLUDE_DIRS}\"") set_target_properties(mpgen PROPERTIES INSTALL_RPATH_USE_LINK_PATH TRUE) set_target_properties(mpgen PROPERTIES diff --git a/ci/configs/olddeps.bash b/ci/configs/olddeps.bash index 95f44128..1a363b1b 100644 --- a/ci/configs/olddeps.bash +++ b/ci/configs/olddeps.bash @@ -1,5 +1,5 @@ CI_DESC="CI job using old Cap'n Proto and cmake versions" CI_DIR=build-olddeps export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic
-Wno-unused-parameter -Wno-error=array-bounds" -NIX_ARGS=(--argstr capnprotoVersion "0.7.1" --argstr cmakeVersion "3.12.4") +NIX_ARGS=(--argstr capnprotoVersion "0.9.2" --argstr cmakeVersion "3.12.4") BUILD_ARGS=(-k) diff --git a/cmake/compat_config.cmake b/cmake/compat_config.cmake index f9d3004f..51bda36b 100644 --- a/cmake/compat_config.cmake +++ b/cmake/compat_config.cmake @@ -12,6 +12,75 @@ if (NOT DEFINED capnp_PREFIX AND DEFINED CAPNP_INCLUDE_DIRS) get_filename_component(capnp_PREFIX "${CAPNP_INCLUDE_DIRS}" DIRECTORY) endif() +if (NOT DEFINED CAPNP_INCLUDE_DIRS AND DEFINED capnp_PREFIX) + set(CAPNP_INCLUDE_DIRS "${capnp_PREFIX}/include") +endif() + +if (NOT TARGET CapnProto::capnp_tool) + if (DEFINED CAPNP_EXECUTABLE) + add_executable(CapnProto::capnp_tool IMPORTED GLOBAL) + set_target_properties(CapnProto::capnp_tool PROPERTIES IMPORTED_LOCATION "${CAPNP_EXECUTABLE}") + elseif (DEFINED capnp_PREFIX) + add_executable(CapnProto::capnp_tool IMPORTED GLOBAL) + set_target_properties(CapnProto::capnp_tool PROPERTIES IMPORTED_LOCATION "${capnp_PREFIX}/bin/capnp") + endif() +endif() + +if (NOT TARGET CapnProto::capnpc_cpp) + if (DEFINED CAPNPC_CXX_EXECUTABLE) + add_executable(CapnProto::capnpc_cpp IMPORTED GLOBAL) + set_target_properties(CapnProto::capnpc_cpp PROPERTIES IMPORTED_LOCATION "${CAPNPC_CXX_EXECUTABLE}") + elseif (DEFINED capnp_PREFIX) + add_executable(CapnProto::capnpc_cpp IMPORTED GLOBAL) + set_target_properties(CapnProto::capnpc_cpp PROPERTIES IMPORTED_LOCATION "${capnp_PREFIX}/bin/capnpc-c++") + endif() +endif() + +# Validate CapnProto tool target locations and fix if broken. +# Some packaged capnproto versions (e.g., Ubuntu Noble libcapnp-dev 1.0.1) +# have incorrect IMPORTED_LOCATION paths due to a packaging bug where the cmake +# config file is installed under /usr/lib/.../cmake/ but the _IMPORT_PREFIX +# calculation goes up too few directory levels, yielding /usr/lib/bin/capnp +# instead of the correct /usr/bin/capnp. 
+foreach(_mp_tool IN ITEMS capnp_tool capnpc_cpp) + if (TARGET "CapnProto::${_mp_tool}") + get_target_property(_mp_configs "CapnProto::${_mp_tool}" IMPORTED_CONFIGURATIONS) + set(_mp_valid FALSE) + foreach(_mp_cfg IN LISTS _mp_configs) + get_target_property(_mp_loc "CapnProto::${_mp_tool}" "IMPORTED_LOCATION_${_mp_cfg}") + if (EXISTS "${_mp_loc}") + set(_mp_valid TRUE) + break() + endif() + endforeach() + if (NOT _mp_valid) + get_target_property(_mp_loc "CapnProto::${_mp_tool}" IMPORTED_LOCATION) + if (EXISTS "${_mp_loc}") + set(_mp_valid TRUE) + endif() + endif() + if (NOT _mp_valid) + if ("${_mp_tool}" STREQUAL "capnp_tool") + find_program(_mp_fixed capnp HINTS "${capnp_PREFIX}/bin") + else() + find_program(_mp_fixed capnpc-c++ HINTS "${capnp_PREFIX}/bin") + endif() + if (_mp_fixed) + foreach(_mp_cfg IN LISTS _mp_configs) + set_target_properties("CapnProto::${_mp_tool}" PROPERTIES "IMPORTED_LOCATION_${_mp_cfg}" "${_mp_fixed}") + endforeach() + set_target_properties("CapnProto::${_mp_tool}" PROPERTIES IMPORTED_LOCATION "${_mp_fixed}") + endif() + unset(_mp_fixed CACHE) + endif() + endif() +endforeach() +unset(_mp_tool) +unset(_mp_configs) +unset(_mp_valid) +unset(_mp_cfg) +unset(_mp_loc) + if (NOT DEFINED CAPNPC_OUTPUT_DIR) set(CAPNPC_OUTPUT_DIR "${CMAKE_CURRENT_BINARY_DIR}") endif() diff --git a/doc/design.md b/doc/design.md index 113cafc4..094602e9 100644 --- a/doc/design.md +++ b/doc/design.md @@ -120,7 +120,7 @@ sequenceDiagram participant PMT as ProxyMethodTraits participant Impl as Actual C++ Method - serverInvoke->>SF1: SF1::invoke + serverInvoke->>SF1: SF1::invoke SF1->>SF2: SF2::invoke SF2->>SR: SR::invoke SR->>SC: SC::invoke @@ -165,7 +165,7 @@ Thread mapping enables each client thread to have a dedicated server thread proc Thread mapping is initialized by defining an interface method with a `ThreadMap` parameter and/or response. 
The example below adds `ThreadMap` to the `construct` method because libmultiprocess calls the `construct` method automatically. ```capnp -interface InitInterface $Proxy.wrap("Init") { +interface InitInterface $Proxy.wrap("Init") { construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); } ``` diff --git a/doc/versions.md b/doc/versions.md index 3cfa28e3..14bd8ad8 100644 --- a/doc/versions.md +++ b/doc/versions.md @@ -7,8 +7,12 @@ Library versions are tracked with simple Versioning policy is described in the [version.h](../include/mp/version.h) include. -## v11 +## v12 - Current unstable version. +- Adds support for nonunix platforms, making API changes that are not backwards compatible. + +## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0) +- Improves debug output if EventLoop::post callback fails. ## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0) - Increases spawn test timeout to avoid spurious failures. diff --git a/example/calculator.cpp b/example/calculator.cpp index 86ce388b..6ed2df5f 100644 --- a/example/calculator.cpp +++ b/example/calculator.cpp @@ -6,8 +6,7 @@ #include #include // NOLINT(misc-include-cleaner) // IWYU pragma: keep -#include -#include +#include // IWYU pragma: keep #include #include #include @@ -16,9 +15,9 @@ #include #include #include +#include #include #include -#include #include class CalculatorImpl : public Calculator @@ -51,14 +50,10 @@ int main(int argc, char** argv) std::cout << "Usage: mpcalculator \n"; return 1; } - int fd; - if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) { - std::cerr << argv[1] << " is not a number or is larger than an int\n"; - return 1; - } + mp::SocketId socket{mp::StartSpawned(argv[1])}; mp::EventLoop loop("mpcalculator", LogPrint); std::unique_ptr init = std::make_unique(); - mp::ServeStream(loop, fd, *init); + mp::ServeStream(loop, mp::MakeStream(loop.m_io_context, socket), *init); loop.loop(); return 0; } diff 
--git a/example/example.cpp b/example/example.cpp index 38313977..68bce888 100644 --- a/example/example.cpp +++ b/example/example.cpp @@ -19,20 +19,20 @@ #include #include #include +#include #include namespace fs = std::filesystem; static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const std::string& new_exe_name) { - int pid; - const int fd = mp::SpawnProcess(pid, [&](int fd) -> std::vector { + const auto [pid, socket] = mp::SpawnProcess([&](mp::ConnectInfo info) -> std::vector { fs::path path = process_argv0; path.remove_filename(); path.append(new_exe_name); - return {path.string(), std::to_string(fd)}; + return {path.string(), std::move(info)}; }); - return std::make_tuple(mp::ConnectStream(loop, fd), pid); + return std::make_tuple(mp::ConnectStream(loop, mp::MakeStream(loop.m_io_context, socket)), pid); } static void LogPrint(mp::LogMessage log_data) diff --git a/example/printer.cpp b/example/printer.cpp index 9150d59b..9b456d9c 100644 --- a/example/printer.cpp +++ b/example/printer.cpp @@ -7,8 +7,7 @@ #include #include // NOLINT(misc-include-cleaner) // IWYU pragma: keep -#include -#include +#include // IWYU pragma: keep #include #include #include @@ -16,9 +15,9 @@ #include #include #include +#include #include #include -#include class PrinterImpl : public Printer { @@ -44,14 +43,10 @@ int main(int argc, char** argv) std::cout << "Usage: mpprinter \n"; return 1; } - int fd; - if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) { - std::cerr << argv[1] << " is not a number or is larger than an int\n"; - return 1; - } + mp::SocketId socket{mp::StartSpawned(argv[1])}; mp::EventLoop loop("mpprinter", LogPrint); std::unique_ptr init = std::make_unique(); - mp::ServeStream(loop, fd, *init); + mp::ServeStream(loop, mp::MakeStream(loop.m_io_context, socket), *init); loop.loop(); return 0; } diff --git a/include/mp/config.h.in b/include/mp/config.h.in index 9d3c6240..4a8c9168 100644 --- a/include/mp/config.h.in +++ 
b/include/mp/config.h.in @@ -6,7 +6,6 @@ #define MP_CONFIG_H #cmakedefine CMAKE_INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@" -#cmakedefine capnp_PREFIX "@capnp_PREFIX@" #cmakedefine HAVE_KJ_FILESYSTEM #cmakedefine HAVE_PTHREAD_GETNAME_NP @HAVE_PTHREAD_GETNAME_NP@ diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index d7b9f0e5..4f629963 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -210,6 +211,21 @@ class Logger std::string LongThreadName(const char* exe_name); +inline SocketId StreamSocketId(const Stream& stream) +{ + if (stream) KJ_IF_MAYBE(socket, stream->getFd()) return *socket; +#ifdef WIN32 + if (stream) KJ_IF_MAYBE(handle, stream->getWin32Handle()) return reinterpret_cast(*handle); +#endif + throw std::logic_error("Stream socket unset"); +} + +//! Wrap a socket file descriptor as an async stream, taking ownership of the fd. +inline Stream MakeStream(kj::AsyncIoContext& io_context, SocketId socket) +{ + return io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); +} + //! Event loop implementation. //! //! Cap'n Proto threading model is very simple: all I/O operations are @@ -308,11 +324,12 @@ class EventLoop //! Callback functions to run on async thread. std::optional m_async_fns MP_GUARDED_BY(m_mutex); - //! Pipe read handle used to wake up the event loop thread. - int m_wait_fd = -1; + //! Socket pair used to post and wait for wakeups to the event loop thread. + kj::Own m_wait_stream; + kj::Own m_post_stream; - //! Pipe write handle used to wake up the event loop thread. - int m_post_fd = -1; + //! Synchronous writer used to write to m_post_stream. + kj::Own m_post_writer; //! Number of clients holding references to ProxyServerBase objects that //! reference this event loop. @@ -793,17 +810,15 @@ kj::Promise ProxyServer::post(Fn&& fn) return ret; } -//! 
Given stream file descriptor, make a new ProxyClient object to send requests -//! over the stream. Also create a new Connection object embedded in the -//! client that is freed when the client is closed. +//! Given a stream, make a new ProxyClient object to send requests over it. +//! Also create a new Connection object embedded in the client that is freed +//! when the client is closed. template -std::unique_ptr> ConnectStream(EventLoop& loop, int fd) +std::unique_ptr> ConnectStream(EventLoop& loop, kj::Own stream) { typename InitInterface::Client init_client(nullptr); std::unique_ptr connection; loop.sync([&] { - auto stream = - loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); connection = std::make_unique(loop, kj::mv(stream)); init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs(); Connection* connection_ptr = connection.get(); @@ -851,13 +866,12 @@ void _Listen(EventLoop& loop, kj::Own&& listener, InitIm })); } -//! Given stream file descriptor and an init object, handle requests on the -//! stream by calling methods on the Init object. +//! Given a stream and an init object, handle requests on the stream by calling +//! methods on the Init object. template -void ServeStream(EventLoop& loop, int fd, InitImpl& init) +void ServeStream(EventLoop& loop, kj::Own stream, InitImpl& init) { - _Serve( - loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init); + _Serve(loop, kj::mv(stream), init); } //! 
Given listening socket file descriptor and an init object, handle incoming diff --git a/include/mp/proxy.h b/include/mp/proxy.h index c55380c1..b63eaa5b 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -314,11 +314,11 @@ static constexpr int FIELD_BOXED = 16; template struct Accessor : public Field { - static const bool in = flags & FIELD_IN; - static const bool out = flags & FIELD_OUT; - static const bool optional = flags & FIELD_OPTIONAL; - static const bool requested = flags & FIELD_REQUESTED; - static const bool boxed = flags & FIELD_BOXED; + static constexpr bool in = (flags & FIELD_IN) != 0; + static constexpr bool out = (flags & FIELD_OUT) != 0; + static constexpr bool optional = (flags & FIELD_OPTIONAL) != 0; + static constexpr bool requested = (flags & FIELD_REQUESTED) != 0; + static constexpr bool boxed = (flags & FIELD_BOXED) != 0; }; //! Wrapper around std::function for passing std::function objects between client and servers. diff --git a/include/mp/type-interface.h b/include/mp/type-interface.h index a32c53d2..f685a623 100644 --- a/include/mp/type-interface.h +++ b/include/mp/type-interface.h @@ -54,12 +54,12 @@ void CustomBuildField(TypeList, InvokeContext& invoke_context, Impl& value, Output&& output, - typename decltype(output.get())::Calls* enable = nullptr) + typename Decay::Calls* enable = nullptr) { // Disable deleter so proxy server object doesn't attempt to delete the // wrapped implementation when the proxy client is destroyed or // disconnected. 
- using Interface = typename decltype(output.get())::Calls; + using Interface = typename Decay::Calls; output.set(CustomMakeProxyServer(invoke_context, std::shared_ptr(&value, [](Impl*){}))); } diff --git a/include/mp/util.h b/include/mp/util.h index a3db1282..5b5daa8c 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -5,12 +5,15 @@ #ifndef MP_UTIL_H #define MP_UTIL_H +#include #include #include #include #include #include #include +#include +#include #include #include #include @@ -20,6 +23,10 @@ #include #include +#ifdef WIN32 +#include +#endif + namespace mp { //! Generic utility functions used by capnp code. @@ -136,7 +143,10 @@ struct PtrOrValue { std::variant data; template - PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant{std::in_place_type, std::forward(args)...}) {} + PtrOrValue(T* ptr, Args&&... args) : data(std::in_place_type, ptr) + { + if (!ptr) data.template emplace(std::forward(args)...); + } T& operator*() { return data.index() ? std::get(data) : *std::get(data); } T* operator->() { return &**this; } @@ -249,25 +259,51 @@ std::string ThreadName(const char* exe_name); //! errors in python unit tests. std::string LogEscape(const kj::StringTree& string, size_t max_size); +using Stream = kj::Own; + +#ifdef WIN32 +using ProcessId = uintptr_t; +using SocketId = uintptr_t; +constexpr SocketId SocketError{INVALID_SOCKET}; +#else +using ProcessId = int; +using SocketId = int; +constexpr SocketId SocketError{-1}; +#endif + +//! Information about parent process passed to child process as a command-line +//! argument. On unix this is the child socket fd number formatted as a string. +//! On windows, this is a path to a named pipe the parent process will write +//! WSADuplicateSocket info to. +using ConnectInfo = std::string; + //! Callback type used by SpawnProcess below. -using FdToArgsFn = std::function(int fd)>; +using ConnectInfoToArgsFn = std::function(const ConnectInfo&)>; //! 
Spawn a new process that communicates with the current process over a socket -//! pair. Returns pid through an output argument, and file descriptor for the -//! local side of the socket. -//! The fd_to_args callback is invoked in the parent process before fork(). -//! It must not rely on child pid/state, and must return the command line -//! arguments that should be used to execute the process. Embed the remote file -//! descriptor number in whatever format the child process expects. -int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args); - -//! Call execvp with vector args. -//! Not safe to call in a post-fork child of a multi-threaded process. -//! Currently only used by mpgen at build time. -void ExecProcess(const std::vector& args); +//! pair. Calls connect_info_to_args callback with a connection string that +//! needs to be passed to the child process, and executes the argv command line +//! it returns. Returns child process id and socket id. +std::tuple SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args); + +//! Spawn a process and return its process id. Caller should call WaitProcess +//! on the returned id. +ProcessId SpawnProcess(const std::vector& args); + +//! Initialize spawned child process using the ConnectInfo string passed to it, +//! returning a socket id for communicating with the parent process. +SocketId StartSpawned(const ConnectInfo& connect_info); + +//! Create a socket pair that can be used to communicate within a process or +//! between parent and child processes. +std::array SocketPair(); + +//! Start a process and return its process id. Caller should call WaitProcess +//! on the returned id. +ProcessId ExecProcess(const std::vector& args); //! Wait for a process to exit and return its exit code. 
-int WaitProcess(int pid); +int WaitProcess(ProcessId pid); inline char* CharCast(char* c) { return c; } inline char* CharCast(unsigned char* c) { return (char*)c; } diff --git a/include/mp/version.h b/include/mp/version.h index 423ed460..4587a288 100644 --- a/include/mp/version.h +++ b/include/mp/version.h @@ -24,7 +24,7 @@ //! pointing at the prior merge commit. The /doc/versions.md file should also be //! updated, noting any significant or incompatible changes made since the //! previous version. -#define MP_MAJOR_VERSION 11 +#define MP_MAJOR_VERSION 12 //! Minor version number. Should be incremented in stable branches after //! backporting changes. The /doc/versions.md file should also be updated to diff --git a/src/mp/gen.cpp b/src/mp/gen.cpp index 603f9ccb..07a41a1f 100644 --- a/src/mp/gen.cpp +++ b/src/mp/gen.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -20,14 +19,13 @@ #include #include #include +#include #include #include #include #include #include #include -#include -#include #include #include @@ -170,7 +168,7 @@ static void Generate(kj::StringPtr src_prefix, if (p != std::string::npos) include_base.erase(p); std::vector args; - args.emplace_back(capnp_PREFIX "/bin/capnp"); + args.emplace_back(CAPNP_EXECUTABLE); args.emplace_back("compile"); args.emplace_back("--src-prefix="); args.back().append(src_prefix.cStr(), src_prefix.size()); @@ -178,18 +176,11 @@ static void Generate(kj::StringPtr src_prefix, args.emplace_back("--import-path="); args.back().append(import_path.cStr(), import_path.size()); } - args.emplace_back("--output=" capnp_PREFIX "/bin/capnpc-c++"); + args.emplace_back("--output=" CAPNPC_CXX_EXECUTABLE); args.emplace_back(src_file); - const int pid = fork(); - if (pid == -1) { - throw std::system_error(errno, std::system_category(), "fork"); - } - if (!pid) { - mp::ExecProcess(args); - } - const int status = mp::WaitProcess(pid); + const int status = mp::WaitProcess(mp::ExecProcess(args)); if (status) 
{ - throw std::runtime_error("Invoking " capnp_PREFIX "/bin/capnp failed"); + throw std::runtime_error("Invoking " CAPNP_EXECUTABLE " failed"); } const capnp::SchemaParser parser; @@ -677,35 +668,41 @@ static void Generate(kj::StringPtr src_prefix, int main(int argc, char** argv) { - if (argc < 3) { - std::cerr << "Usage: " << PROXY_BIN << " SRC_PREFIX INCLUDE_PREFIX SRC_FILE [IMPORT_PATH...]\n"; - exit(1); - } - std::vector import_paths; - std::vector> import_dirs; - auto fs = kj::newDiskFilesystem(); - auto cwd = fs->getCurrentPath(); - kj::Own src_dir; - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[1]))) { - src_dir = kj::mv(*dir); - } else { - throw std::runtime_error(std::string("Failed to open src_prefix prefix directory: ") + argv[1]); - } - for (int i = 4; i < argc; ++i) { - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[i]))) { - import_paths.emplace_back(argv[i]); - import_dirs.emplace_back(kj::mv(*dir)); + int ret = 1; + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { + if (argc < 3) { + std::cerr << "Usage: " << PROXY_BIN << " SRC_PREFIX INCLUDE_PREFIX SRC_FILE [IMPORT_PATH...]\n"; + exit(1); + } + std::vector import_paths; + std::vector> import_dirs; + auto fs = kj::newDiskFilesystem(); + auto cwd = fs->getCurrentPath(); + kj::Own src_dir; + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[1]))) { + src_dir = kj::mv(*dir); } else { - throw std::runtime_error(std::string("Failed to open import directory: ") + argv[i]); + throw std::runtime_error(std::string("Failed to open src_prefix prefix directory: ") + argv[1]); } - } - for (const char* path : {CMAKE_INSTALL_PREFIX "/include", capnp_PREFIX "/include"}) { - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(path))) { - import_paths.emplace_back(path); - import_dirs.emplace_back(kj::mv(*dir)); + for (int i = 4; i < argc; ++i) { + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[i]))) { + 
import_paths.emplace_back(argv[i]); + import_dirs.emplace_back(kj::mv(*dir)); + } else { + throw std::runtime_error(std::string("Failed to open import directory: ") + argv[i]); + } + } + for (const char* path : {CMAKE_INSTALL_PREFIX "/include", CAPNP_INCLUDE_DIRS}) { + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(path))) { + import_paths.emplace_back(path); + import_dirs.emplace_back(kj::mv(*dir)); + } + // No exception thrown if _PREFIX directories do not exist } - // No exception thrown if _PREFIX directories do not exist + Generate(argv[1], argv[2], argv[3], import_paths, *src_dir, import_dirs); + ret = 0; + })) { + std::cerr << "mpgen error: " << kj::str(*exception).cStr() << '\n'; } - Generate(argv[1], argv[2], argv[3], import_paths, *src_dir, import_dirs); - return 0; + return ret; } diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 963050c3..64f5693a 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -31,10 +32,8 @@ #include #include #include -#include #include #include -#include #include namespace mp { @@ -67,10 +66,9 @@ void EventLoopRef::reset(bool relock) MP_NO_TSA loop->m_num_clients -= 1; if (loop->done()) { loop->m_cv.notify_all(); - int post_fd{loop->m_post_fd}; loop_lock->unlock(); char buffer = 0; - KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon) + loop->m_post_writer->write(&buffer, 1); // By default, do not try to relock `loop_lock` after writing, // because the event loop could wake up and destroy itself and the // mutex might no longer exist. @@ -101,6 +99,35 @@ Connection::~Connection() // after the calls finish. m_rpc_system.reset(); + // shutdownWrite is needed on Windows so pending data in the m_stream socket + // will be sent instead of discarded when m_stream is destroyed. On unix, + // this doesn't seem to be needed because data is sent more reliably. 
+ // + // Sending pending data is important if the connection is a socketpair + // because when one side of the socketpair is closed, the other side doesn't + // seem to receive any onDisconnect event. So it is important for the other + // side to instead receive Cap'n Proto "release" messages (see `struct + // Release` in capnp/rpc.capnp) from local Client objects being destroyed so + // the remote side can free resources and shut down cleanly. Without this, + // when one side of a socket pair is closed the other side may not receive + // these messages, preventing the remote side from freeing ProxyServer + // resources and shutting down cleanly. + // Use kj::runCatchingExceptions instead of try/catch because on macOS with + // dynamic libraries, kj::Exception typeinfo differs between libcapnp and + // the calling binary, so catch (const kj::Exception&) silently fails to + // match. kj::runCatchingExceptions uses KJ's own interception mechanism + // which works correctly across dynamic library boundaries. + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { + m_stream->shutdownWrite(); + })) { + // Ignore ENOTCONN: on macOS/FreeBSD (unlike Linux), shutdown(SHUT_WR) + // returns ENOTCONN if the peer already closed the connection. This is + // expected when the destructor is triggered by a remote disconnect. + if (exception->getType() != kj::Exception::Type::DISCONNECTED) { + kj::throwRecoverableException(kj::mv(*exception)); + } + } + // ProxyClient cleanup handlers are in sync list, and ProxyServer cleanup // handlers are in the async list. // @@ -174,6 +201,40 @@ void Connection::removeSyncCleanup(CleanupIt it) m_sync_cleanup_fns.erase(it); } +#ifdef WIN32 +//! Synchronous socket output stream. Cap'n Proto library only provides limited +//! support for synchronous IO. It provides `FdOutputStream` which wraps unix +//! file descriptors and calls write() internally, and `HandleOutputStream` which +//! wraps windows HANDLE values and calls WriteFile() internally.
This class +//! just provides analogous functionality wrapping SOCKET values and calls +//! send() internally. +class SocketOutputStream : public kj::OutputStream { +public: + explicit SocketOutputStream(SOCKET socket) : m_socket(socket) {} + + void write(const void* buffer, size_t size) override; + +private: + SOCKET m_socket; +}; + +static constexpr size_t WRITE_CLAMP_SIZE = 1u << 30; // 1GB clamp for Windows, like FdOutputStream + +void SocketOutputStream::write(const void* buffer, size_t size) { + const char* pos = reinterpret_cast(buffer); + + while (size > 0) { + int n = send(m_socket, pos, static_cast(kj::min(size, WRITE_CLAMP_SIZE)), 0); + + KJ_WIN32(n != SOCKET_ERROR, "send() failed"); + KJ_ASSERT(n > 0, "send() returned zero."); + + pos += n; + size -= n; + } +} +#endif + void EventLoop::addAsyncCleanup(std::function fn) { const Lock lock(m_mutex); @@ -204,10 +265,18 @@ EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context) m_log_opts(std::move(log_opts)), m_context(context) { - int fds[2]; - KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); - m_wait_fd = fds[0]; - m_post_fd = fds[1]; + auto pipe = m_io_context.provider->newTwoWayPipe(); + m_wait_stream = kj::mv(pipe.ends[0]); + m_post_stream = kj::mv(pipe.ends[1]); + KJ_IF_MAYBE(fd, m_post_stream->getFd()) { + m_post_writer = kj::heap(*fd); +#ifdef WIN32 + } else KJ_IF_MAYBE(handle, m_post_stream->getWin32Handle()) { + m_post_writer = kj::heap(reinterpret_cast(*handle)); +#endif + } else { + throw std::logic_error("Could not get file descriptor for new pipe."); + } } EventLoop::~EventLoop() @@ -216,8 +285,8 @@ EventLoop::~EventLoop() const Lock lock(m_mutex); KJ_ASSERT(m_post_fn == nullptr); KJ_ASSERT(!m_async_fns); - KJ_ASSERT(m_wait_fd == -1); - KJ_ASSERT(m_post_fd == -1); + KJ_ASSERT(!m_wait_stream); + KJ_ASSERT(!m_post_stream); KJ_ASSERT(m_num_clients == 0); // Spin event loop. wait for any promises triggered by RPC shutdown. 
@@ -237,9 +306,7 @@ void EventLoop::loop() m_async_fns.emplace(); } - kj::Own wait_stream{ - m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)}; - int post_fd{m_post_fd}; + kj::Own& wait_stream{m_wait_stream}; char buffer = 0; for (;;) { const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope); @@ -256,7 +323,7 @@ void EventLoop::loop() m_cv.notify_all(); } else if (done()) { // Intentionally do not break if m_post_fn was set, even if done() - // would return true, to ensure that the EventLoopRef write(post_fd) + // would return true, to ensure that the post() m_post_writer->write() // call always succeeds and the loop does not exit between the time // that the done condition is set and the write call is made. break; @@ -266,10 +333,9 @@ void EventLoop::loop() m_task_set.reset(); MP_LOG(*this, Log::Info) << "EventLoop::loop bye."; wait_stream = nullptr; - KJ_SYSCALL(::close(post_fd)); const Lock lock(m_mutex); - m_wait_fd = -1; - m_post_fd = -1; + m_wait_stream = nullptr; + m_post_stream = nullptr; m_async_fns.reset(); m_cv.notify_all(); } @@ -284,10 +350,9 @@ void EventLoop::post(kj::Function fn) EventLoopRef ref(*this, &lock); m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; }); m_post_fn = &fn; - int post_fd{m_post_fd}; Unlock(lock, [&] { char buffer = 0; - KJ_SYSCALL(write(post_fd, &buffer, 1)); + m_post_writer->write(&buffer, 1); }); m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; }); } diff --git a/src/mp/util.cpp b/src/mp/util.cpp index 463947b9..a1255e07 100644 --- a/src/mp/util.cpp +++ b/src/mp/util.cpp @@ -10,20 +10,31 @@ #include #include #include +#include #include -#include #include #include -#include -#include -#include -#include #include #include // NOLINT(misc-include-cleaner) // IWYU pragma: keep -#include #include #include +#ifdef WIN32 +#include +#include +#include +#include +#else +#include 
+#include +#include +#include +#include +#include +#include +#define _getpid getpid +#endif + #ifdef __linux__ #include #endif @@ -32,11 +43,17 @@ #include #endif // HAVE_PTHREAD_GETTHREADID_NP +#ifdef WIN32 +// Forward-declare internal capnp function. +namespace kj { namespace _ { int win32Socketpair(SOCKET socks[2]); } } +#endif + namespace fs = std::filesystem; namespace mp { namespace { +#ifndef WIN32 std::vector MakeArgv(const std::vector& args) { std::vector argv; @@ -58,18 +75,19 @@ size_t MaxFd() return 1023; } } +#endif } // namespace std::string ThreadName(const char* exe_name) { char thread_name[16] = {0}; -#ifdef HAVE_PTHREAD_GETNAME_NP +#if defined(HAVE_PTHREAD_GETNAME_NP) && !defined(WIN32) pthread_getname_np(pthread_self(), thread_name, sizeof(thread_name)); #endif // HAVE_PTHREAD_GETNAME_NP std::ostringstream buffer; - buffer << (exe_name ? exe_name : "") << "-" << getpid() << "/"; + buffer << (exe_name ? exe_name : "") << "-" << _getpid() << "/"; if (thread_name[0] != '\0') { buffer << thread_name << "-"; @@ -79,6 +97,8 @@ std::string ThreadName(const char* exe_name) // the former are shorter and are the same as what gdb prints "LWP ...". #ifdef __linux__ buffer << syscall(SYS_gettid); +#elif defined(WIN32) + buffer << GetCurrentThreadId(); #elif defined(HAVE_PTHREAD_THREADID_NP) uint64_t tid = 0; pthread_threadid_np(nullptr, &tid); @@ -116,23 +136,66 @@ std::string LogEscape(const kj::StringTree& string, size_t max_size) return result; } -int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) +//! Generate command line that the executable being invoked will split up using +//! the CommandLineToArgvW function, which expects arguments with spaces to be +//! quoted, quote characters to be backslash-escaped, and backslashes to also be +//! backslash-escaped, but only if they precede a quote character. 
+std::string CommandLineFromArgv(const std::vector& argv) { - int fds[2]; - if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) { - throw std::system_error(errno, std::system_category(), "socketpair"); + std::string out; + for (const auto& arg : argv) { + if (!out.empty()) out += " "; + if (!arg.empty() && arg.find_first_of(" \t\"") == std::string::npos) { + // Argument has no quotes or spaces so escaping not necessary. + out += arg; + } else { + out += '"'; // Start with a quote + for (size_t i = 0; i < arg.size(); ++i) { + if (arg[i] == '\\') { + // Count consecutive backslashes + size_t backslash_count = 0; + while (i < arg.size() && arg[i] == '\\') { + ++backslash_count; + ++i; + } + if (i < arg.size() && arg[i] == '"') { + // Backslashes before a quote need to be doubled + out.append(backslash_count * 2 + 1, '\\'); + out.push_back('"'); + } else { + // Otherwise, backslashes remain as-is + out.append(backslash_count, '\\'); + --i; // Compensate for the outer loop's increment + } + } else if (arg[i] == '"') { + // Escape double quotes with a backslash + out.push_back('\\'); + out.push_back('"'); + } else { + out.push_back(arg[i]); + } + } + out += '"'; // End with a quote + } } + return out; +} + +std::tuple SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args) +{ + auto fds{SocketPair()}; +#ifndef WIN32 // Evaluate the callback and build the argv array before forking. // // The parent process may be multi-threaded and holding internal library // locks at fork time. In that case, running code that allocates memory or // takes locks in the child between fork() and exec() can deadlock // indefinitely. Precomputing arguments in the parent avoids this. 
- const std::vector args{fd_to_args(fds[0])}; + const std::vector args{connect_info_to_args(std::to_string(fds[0]))}; const std::vector argv{MakeArgv(args)}; - pid = fork(); + ProcessId pid = fork(); if (pid == -1) { throw std::system_error(errno, std::system_category(), "fork"); } @@ -160,6 +223,16 @@ int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) } } + // Explicitly clear FD_CLOEXEC on the child's socket before calling + // exec, so the fd survives into the spawned process regardless of how + // the socket was created. + int flags = fcntl(fds[0], F_GETFD); + if (flags == -1) throw std::system_error(errno, std::system_category(), "fcntl F_GETFD"); + if (flags & FD_CLOEXEC) { + flags &= ~FD_CLOEXEC; + if (fcntl(fds[0], F_SETFD, flags) == -1) throw std::system_error(errno, std::system_category(), "fcntl F_SETFD"); + } + execvp(argv[0], argv.data()); // NOTE: perror() is not async-signal-safe; calling it here in a // post-fork child may deadlock in multithreaded parents. @@ -168,12 +241,77 @@ int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) perror("execvp failed"); _exit(127); } - return fds[1]; + return {pid, fds[1]}; +#else + // Create windows pipe to send socket over to child process. 
+ static std::atomic counter{1}; + ConnectInfo pipe_path{"\\\\.\\pipe\\mp-" + std::to_string(GetCurrentProcessId()) + "-" + std::to_string(counter.fetch_add(1))}; + HANDLE pipe{CreateNamedPipeA(pipe_path.c_str(), PIPE_ACCESS_OUTBOUND, PIPE_TYPE_MESSAGE | PIPE_WAIT, /*nMaxInstances=*/1, /*nOutBufferSize=*/0, /*nInBufferSize=*/0, /*nDefaultTimeOut=*/0, /*lpSecurityAttributes=*/nullptr)}; + KJ_WIN32(pipe != INVALID_HANDLE_VALUE, "CreateNamedPipe failed"); + + // Start child process + std::string cmd{CommandLineFromArgv(connect_info_to_args(pipe_path))}; + STARTUPINFOA si{}; + si.cb = sizeof(si); + PROCESS_INFORMATION pi{}; + KJ_WIN32(CreateProcessA(/*lpApplicationName=*/nullptr, const_cast(cmd.c_str()), /*lpProcessAttributes=*/nullptr, /*lpThreadAttributes=*/nullptr, /*bInheritHandles=*/TRUE, /*dwCreationFlags=*/0, /*lpEnvironment=*/nullptr, /*lpCurrentDirectory=*/nullptr, &si, &pi), "CreateProcess failed"); + KJ_WIN32(CloseHandle(pi.hThread), "CloseHandle(hThread)"); + + // Duplicate socket for the child (now that we know its PID) + WSAPROTOCOL_INFO info{}; + KJ_WINSOCK(WSADuplicateSocket(fds[0], pi.dwProcessId, &info), "WSADuplicateSocket failed"); + + // Send socket to the child via the pipe + KJ_WIN32(ConnectNamedPipe(pipe, nullptr) || GetLastError() == ERROR_PIPE_CONNECTED, "ConnectNamedPipe failed"); + DWORD wr; + KJ_WIN32(WriteFile(pipe, &info, sizeof(info), &wr, nullptr) && wr == sizeof(info), "WriteFile(pipe) failed"); + KJ_WIN32(CloseHandle(pipe), "CloseHandle(pipe)"); + + return {reinterpret_cast(pi.hProcess), fds[1]}; +#endif +} + +SocketId StartSpawned(const ConnectInfo& connect_info) +{ +#ifndef WIN32 + return std::stoi(connect_info); +#else + HANDLE pipe = CreateFileA(connect_info.c_str(), /*dwDesiredAccess=*/GENERIC_READ, /*dwShareMode=*/0, /*lpSecurityAttributes=*/nullptr, /*dwCreationDisposition=*/OPEN_EXISTING, /*dwFlagsAndAttributes=*/0, /*hTemplateFile=*/nullptr); + KJ_WIN32(pipe != INVALID_HANDLE_VALUE, "CreateFile(pipe) failed"); + + 
WSAPROTOCOL_INFO info{}; + DWORD rd; + KJ_WIN32(ReadFile(pipe, &info, sizeof(info), &rd, nullptr) && rd == sizeof(info), "ReadFile(pipe) failed"); + KJ_WIN32(CloseHandle(pipe), "CloseHandle(pipe)"); + + WSADATA dontcare; + if (int wsaErr = WSAStartup(MAKEWORD(2, 2), &dontcare)) KJ_FAIL_WIN32("WSAStartup()", wsaErr); + + SOCKET socket{WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, &info, 0, WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT)}; + KJ_WINSOCK(socket, "WSASocket(FROM_PROTOCOL_INFO) failed"); + return socket; +#endif +} + +std::array SocketPair() +{ +#ifdef WIN32 + SOCKET pair[2]; + KJ_WINSOCK(kj::_::win32Socketpair(pair)); +#else + int pair[2]; + KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, pair)); +#endif + return {pair[0], pair[1]}; } -void ExecProcess(const std::vector& args) +ProcessId ExecProcess(const std::vector& args) { +#ifndef WIN32 const std::vector argv{MakeArgv(args)}; + ProcessId pid; + KJ_SYSCALL(pid = fork()); + if (pid) return pid; if (execvp(argv[0], argv.data()) != 0) { perror("execvp failed"); if (errno == ENOENT && !args.empty()) { @@ -181,15 +319,34 @@ void ExecProcess(const std::vector& args) } _exit(1); } + KJ_UNREACHABLE; +#else + std::string cmd{CommandLineFromArgv(args)}; + STARTUPINFOA si{}; + si.cb = sizeof(si); + PROCESS_INFORMATION pi{}; + KJ_WIN32(CreateProcessA(/*lpApplicationName=*/nullptr, const_cast(cmd.c_str()), /*lpProcessAttributes=*/nullptr, /*lpThreadAttributes=*/nullptr, /*bInheritHandles=*/FALSE, /*dwCreationFlags=*/0, /*lpEnvironment=*/nullptr, /*lpCurrentDirectory=*/nullptr, &si, &pi), "CreateProcess"); + KJ_WIN32(CloseHandle(pi.hThread), "CloseHandle(hThread)"); + return reinterpret_cast(pi.hProcess); +#endif } -int WaitProcess(int pid) +int WaitProcess(ProcessId pid) { +#ifndef WIN32 int status; if (::waitpid(pid, &status, /*options=*/0) != pid) { throw std::system_error(errno, std::system_category(), "waitpid"); } return status; +#else + HANDLE handle{reinterpret_cast(pid)}; + DWORD 
result{WaitForSingleObject(handle, /*dwMilliseconds=*/INFINITE)}; + if (result != WAIT_OBJECT_0) KJ_FAIL_WIN32("WaitForSingleObject(child)", GetLastError()); + KJ_WIN32(GetExitCodeProcess(handle, &result), "GetExitCodeProcess"); + KJ_WIN32(CloseHandle(handle), "CloseHandle(process)"); + return result; +#endif } } // namespace mp diff --git a/test/mp/test/spawn_tests.cpp b/test/mp/test/spawn_tests.cpp index a14e50e2..5184667b 100644 --- a/test/mp/test/spawn_tests.cpp +++ b/test/mp/test/spawn_tests.cpp @@ -9,23 +9,31 @@ #include #include #include -#include #include #include #include -#include #include -#include +#include +#include #include +#ifndef WIN32 +#include +#include +#include +#endif + +namespace mp { +namespace test { namespace { +#ifndef WIN32 constexpr auto FAILURE_TIMEOUT = std::chrono::seconds{30}; // Poll for child process exit using waitpid(..., WNOHANG) until the child exits // or timeout expires. Returns true if the child exited and status_out was set. // Returns false on timeout or error. -static bool WaitPidWithTimeout(int pid, std::chrono::milliseconds timeout, int& status_out) +static bool WaitPidWithTimeout(ProcessId pid, std::chrono::milliseconds timeout, int& status_out) { const auto deadline = std::chrono::steady_clock::now() + timeout; while (std::chrono::steady_clock::now() < deadline) { @@ -40,14 +48,19 @@ static bool WaitPidWithTimeout(int pid, std::chrono::milliseconds timeout, int& } return false; } +#endif // !WIN32 } // namespace +#ifndef WIN32 KJ_TEST("SpawnProcess does not run callback in child") { // This test is designed to fail deterministically if fd_to_args is invoked // in the post-fork child: a mutex held by another parent thread at fork // time appears locked forever in the child. + // + // This test is Unix-only: Windows uses CreateProcess (not fork), so the + // inherited-locked-mutex hazard does not apply there. 
std::mutex target_mutex; std::mutex control_mutex; std::condition_variable control_cv; @@ -86,14 +99,13 @@ KJ_TEST("SpawnProcess does not run callback in child") control_cv.notify_one(); }); - int pid{-1}; - const int fd{mp::SpawnProcess(pid, [&](int child_fd) -> std::vector { + const auto [pid, socket]{SpawnProcess([&](ConnectInfo connect_info) -> std::vector { // If this callback runs in the post-fork child, target_mutex appears // locked forever (the owning thread does not exist), so this deadlocks. std::lock_guard g(target_mutex); - return {"true", std::to_string(child_fd)}; + return {"true", std::move(connect_info)}; })}; - ::close(fd); + ::close(socket); int status{0}; // Give the child some time to exit. If it does not, terminate it and @@ -110,3 +122,6 @@ KJ_TEST("SpawnProcess does not run callback in child") KJ_EXPECT(exited, "Timeout waiting for child process to exit"); KJ_EXPECT(WIFEXITED(status) && WEXITSTATUS(status) == 0); } +#endif // !WIN32 +} // namespace test +} // namespace mp