diff --git a/Cargo.lock b/Cargo.lock index e2c7a049c..74b8420d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -749,6 +749,7 @@ dependencies = [ "log", "nix 0.30.1", "vmm-sys-util 0.14.0", + "windows-sys", ] [[package]] diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 67475ad23..48f7d878f 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -47,7 +47,7 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" dependencies = [ - "windows-sys", + "windows-sys 0.59.0", ] [[package]] @@ -58,7 +58,7 @@ checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys", + "windows-sys 0.59.0", ] [[package]] @@ -420,7 +420,7 @@ dependencies = [ "gobject-sys", "libc", "system-deps", - "windows-sys", + "windows-sys 0.59.0", ] [[package]] @@ -713,6 +713,7 @@ dependencies = [ "log", "nix", "vmm-sys-util", + "windows-sys 0.61.2", ] [[package]] @@ -1106,7 +1107,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys", + "windows-sys 0.59.0", ] [[package]] @@ -1115,6 +1116,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-sys" version = "0.59.0" @@ -1124,6 +1131,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" 
+dependencies = [ + "windows-link", +] + [[package]] name = "windows-targets" version = "0.52.6" diff --git a/src/polly/Cargo.toml b/src/polly/Cargo.toml index 93c9d57bc..b5e0f5bfe 100644 --- a/src/polly/Cargo.toml +++ b/src/polly/Cargo.toml @@ -8,5 +8,7 @@ license = "Apache-2.0" repository = "https://github.com/containers/libkrun" [dependencies] -libc = ">=0.2.39" utils = { package = "krun-utils", version = "=0.1.0-1.18.0", path="../utils" } + +[target.'cfg(unix)'.dependencies] +libc = ">=0.2.39" \ No newline at end of file diff --git a/src/polly/src/event_manager.rs b/src/polly/src/event_manager.rs index 55306459a..0a42df0d8 100644 --- a/src/polly/src/event_manager.rs +++ b/src/polly/src/event_manager.rs @@ -4,9 +4,13 @@ use std::collections::HashMap; use std::fmt::Formatter; use std::io; -use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::{Arc, Mutex}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(target_os = "windows")] +use utils::windows::{AsRawFd, RawFd}; + use utils::epoll::{self, Epoll, EpollEvent}; pub type Result = std::result::Result; @@ -33,11 +37,13 @@ impl std::fmt::Debug for Error { Poll(err) => write!(f, "Error during epoll call: {err}"), AlreadyExists(pollable) => write!( f, - "A handler for the specified pollable {pollable} already exists." + "A handler for the specified pollable {:?} already exists.", + pollable ), NotFound(pollable) => write!( f, - "A handler for the specified pollable {pollable} was not found." + "A handler for the specified pollable {:?} was not found.", + pollable ), } } @@ -110,7 +116,7 @@ impl EventManager { let interest_list = subscriber.lock().unwrap().interest_list(); for event in interest_list { - self.register(event.data() as i32, event, subscriber.clone())? + self.register(event.data() as Pollable, event, subscriber.clone())? 
} Ok(()) @@ -202,6 +208,7 @@ impl EventManager { &mut self.ready_events[..], ) { Ok(event_count) => event_count, + #[cfg(unix)] Err(e) if e.raw_os_error() == Some(libc::EINTR) => 0, Err(e) => return Err(Error::Poll(e)), }; @@ -254,9 +261,19 @@ mod tests { impl DummySubscriber { fn new() -> Self { + let event_fd_1 = EventFd::new(0).unwrap(); + let event_fd_2 = EventFd::new(0).unwrap(); + // On Windows the IOCP only delivers a packet when the underlying + // Event object is signaled. Linux eventfds are always writable + // (EPOLLOUT fires instantly) so this isn't needed there. + #[cfg(target_os = "windows")] + { + event_fd_1.write(1).unwrap(); + event_fd_2.write(1).unwrap(); + } DummySubscriber { - event_fd_1: EventFd::new(0).unwrap(), - event_fd_2: EventFd::new(0).unwrap(), + event_fd_1, + event_fd_2, processed_ev1_in: false, processed_ev2_in: false, register_ev2: false, @@ -339,7 +356,7 @@ mod tests { impl Subscriber for DummySubscriber { fn process(&mut self, event: &EpollEvent, event_manager: &mut EventManager) { - let source = event.data() as i32; + let source = event.data() as Pollable; let event_set = EventSet::from_bits(event.events()).unwrap(); // We only know how to treat EPOLLOUT and EPOLLIN. 
@@ -521,6 +538,11 @@ mod tests { let dummy_fd = dummy_subscriber.lock().unwrap().event_fd_1.as_raw_fd(); assert!(event_manager.subscriber(dummy_fd).is_ok()); - assert!(event_manager.subscriber(-1).is_err()); + + #[cfg(unix)] + let bad_fd: Pollable = -1; + #[cfg(windows)] + let bad_fd: Pollable = std::ptr::null_mut(); + assert!(event_manager.subscriber(bad_fd).is_err()); } } diff --git a/src/utils/Cargo.toml b/src/utils/Cargo.toml index 9a8c4937e..f24921bc4 100644 --- a/src/utils/Cargo.toml +++ b/src/utils/Cargo.toml @@ -9,8 +9,10 @@ repository = "https://github.com/containers/libkrun" [dependencies] bitflags = "1.2.0" -libc = ">=0.2.85" log = "0.4.0" + +[target.'cfg(unix)'.dependencies] +libc = ">=0.2.85" nix = "0.30.1" vmm-sys-util = "0.14" crossbeam-channel = ">=0.5.15" @@ -20,3 +22,14 @@ kvm-bindings = { version = "0.12", features = ["fam-wrappers"] } [target.'cfg(target_os = "macos")'.dependencies] nix = { version = "0.30.1", features = ["fs"] } + +[target.'cfg(target_os = "windows")'.dependencies] +windows-sys = { version = "0.61.2", features = [ + "Win32_Foundation", + "Win32_Security", + "Win32_System_IO", + "Win32_System_Performance", + "Win32_System_SystemInformation", + "Win32_System_Threading", + "Win32_System_Time", +] } diff --git a/src/utils/src/lib.rs b/src/utils/src/lib.rs index f3b22a37b..64749b460 100644 --- a/src/utils/src/lib.rs +++ b/src/utils/src/lib.rs @@ -1,6 +1,7 @@ // Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: Apache-2.0 +#[cfg(unix)] pub use vmm_sys_util::{errno, tempdir, tempfile, terminal}; #[cfg(target_os = "linux")] pub use vmm_sys_util::{eventfd, ioctl}; @@ -16,6 +17,12 @@ pub mod macos; pub use macos::epoll; #[cfg(target_os = "macos")] pub use macos::eventfd; +#[cfg(target_os = "windows")] +pub mod windows; +#[cfg(target_os = "windows")] +pub use windows::epoll; +#[cfg(target_os = "windows")] +pub use windows::eventfd; pub mod pollable_channel; #[cfg(target_arch = "x86_64")] pub mod rand; diff --git a/src/utils/src/pollable_channel.rs b/src/utils/src/pollable_channel.rs index aca76f4a4..bc9ad8883 100644 --- a/src/utils/src/pollable_channel.rs +++ b/src/utils/src/pollable_channel.rs @@ -2,9 +2,13 @@ use crate::eventfd::{EventFd, EFD_NONBLOCK, EFD_SEMAPHORE}; use std::collections::VecDeque; use std::io; use std::io::ErrorKind; -use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}; use std::sync::{Arc, Mutex}; +#[cfg(target_os = "windows")] +use crate::windows::{AsRawFd, RawFd}; +#[cfg(unix)] +use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}; + /// A multiple producer single consumer channel that can be listened to by a file descriptor pub fn pollable_channel( ) -> io::Result<(PollableChannelSender, PollableChannelReciever)> { @@ -82,6 +86,7 @@ impl AsRawFd for PollableChannelReciever { } } +#[cfg(unix)] impl AsFd for PollableChannelReciever { fn as_fd(&self) -> BorrowedFd<'_> { // SAFETY: The lifetime of the fd is the same as the lifetime of self.inner.eventfd which diff --git a/src/utils/src/time.rs b/src/utils/src/time.rs index 74604702c..892abf755 100644 --- a/src/utils/src/time.rs +++ b/src/utils/src/time.rs @@ -3,6 +3,27 @@ use std::fmt; +#[cfg(target_os = "windows")] +use std::mem::MaybeUninit; +#[cfg(target_os = "windows")] +use std::sync::OnceLock; +#[cfg(target_os = "windows")] +use std::time::{SystemTime, UNIX_EPOCH}; +#[cfg(target_os = "windows")] +use windows_sys::Win32::Foundation::{FILETIME, SYSTEMTIME}; +#[cfg(target_os = 
"windows")] +use windows_sys::Win32::System::Performance::{QueryPerformanceCounter, QueryPerformanceFrequency}; +#[cfg(target_os = "windows")] +use windows_sys::Win32::System::Threading::{ + GetCurrentProcess, GetCurrentThread, GetProcessTimes, GetThreadTimes, +}; +#[cfg(target_os = "windows")] +use windows_sys::Win32::System::Time::{FileTimeToSystemTime, SystemTimeToTzSpecificLocalTime}; +#[cfg(target_os = "windows")] +use windows_sys::Win32::{ + Foundation::HANDLE, System::SystemInformation::GetSystemTimePreciseAsFileTime, +}; + /// Constant to convert seconds to nanoseconds. pub const NANOS_PER_SECOND: u64 = 1_000_000_000; @@ -18,6 +39,7 @@ pub enum ClockType { ThreadCpu, } +#[cfg(unix)] impl From for libc::clockid_t { fn from(ctype: ClockType) -> libc::clockid_t { match ctype { @@ -47,6 +69,7 @@ pub struct LocalTime { nsec: i64, } +#[cfg(unix)] impl LocalTime { /// Returns the [LocalTime](struct.LocalTime.html) structure for the calling moment. pub fn now() -> LocalTime { @@ -89,6 +112,46 @@ impl LocalTime { } } +#[cfg(target_os = "windows")] +impl LocalTime { + pub fn now() -> LocalTime { + unsafe { + // Get high-precision UTC time (FILETIME) + let mut ft_utc = MaybeUninit::::uninit(); + GetSystemTimePreciseAsFileTime(ft_utc.as_mut_ptr()); + let ft_utc = ft_utc.assume_init(); + + // Convert directly to UTC SYSTEMTIME + let mut st_utc = MaybeUninit::::uninit(); + FileTimeToSystemTime(&ft_utc, st_utc.as_mut_ptr()); + let st_utc = st_utc.assume_init(); + + // Convert UTC SYSTEMTIME to Local SYSTEMTIME (handles DST perfectly) + let mut st_local = MaybeUninit::::uninit(); + SystemTimeToTzSpecificLocalTime( + std::ptr::null(), // Uses the active system time zone + &st_utc, + st_local.as_mut_ptr(), + ); + let st_local = st_local.assume_init(); + + // Extract nanoseconds from the original FILETIME (100ns ticks) + let ticks = ((ft_utc.dwHighDateTime as u64) << 32) | (ft_utc.dwLowDateTime as u64); + let nsec = (ticks % 10_000_000) * 100; + + LocalTime { + sec: 
st_local.wSecond as i32, + min: st_local.wMinute as i32, + hour: st_local.wHour as i32, + mday: st_local.wDay as i32, + mon: (st_local.wMonth as i32) - 1, + year: (st_local.wYear as i32) - 1900, + nsec: nsec as i64, + } + } + } +} + impl fmt::Display for LocalTime { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( @@ -143,6 +206,7 @@ pub fn timestamp_cycles() -> u64 { /// # Arguments /// /// * `clock_type` - Identifier of the Linux Kernel clock on which to act. +#[cfg(unix)] pub fn get_time(clock_type: ClockType) -> u64 { let mut time_struct = libc::timespec { tv_sec: 0, @@ -153,6 +217,66 @@ pub fn get_time(clock_type: ClockType) -> u64 { seconds_to_nanoseconds(time_struct.tv_sec).unwrap() as u64 + (time_struct.tv_nsec as u64) } +/// Returns a timestamp in nanoseconds based on the provided clock type. +#[cfg(target_os = "windows")] +pub fn get_time(clock_type: ClockType) -> u64 { + match clock_type { + ClockType::Monotonic => { + static FREQ: OnceLock = OnceLock::new(); + let freq = *FREQ.get_or_init(|| { + let mut f = 0; + unsafe { QueryPerformanceFrequency(&mut f) }; + f + }); + + let mut counter: i64 = 0; + unsafe { QueryPerformanceCounter(&mut counter) }; + ((counter as u128 * NANOS_PER_SECOND as u128) / freq as u128) as u64 + } + ClockType::Real => SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64, + ClockType::ProcessCpu => unsafe { get_handle_cpu_time(GetCurrentProcess(), true) }, + ClockType::ThreadCpu => unsafe { get_handle_cpu_time(GetCurrentThread(), false) }, + } +} + +#[cfg(target_os = "windows")] +unsafe fn get_handle_cpu_time(handle: HANDLE, is_process: bool) -> u64 { + let mut creation = MaybeUninit::::uninit(); + let mut exit = MaybeUninit::::uninit(); + let mut kernel = MaybeUninit::::uninit(); + let mut user = MaybeUninit::::uninit(); + + if is_process { + let _ = GetProcessTimes( + handle, + creation.as_mut_ptr(), + exit.as_mut_ptr(), + kernel.as_mut_ptr(), + user.as_mut_ptr(), + ); + } 
else { + let _ = GetThreadTimes( + handle, + creation.as_mut_ptr(), + exit.as_mut_ptr(), + kernel.as_mut_ptr(), + user.as_mut_ptr(), + ); + } + + let (kernel, user) = (kernel.assume_init(), user.assume_init()); + filetime_to_nanos(&kernel) + filetime_to_nanos(&user) +} + +#[cfg(target_os = "windows")] +fn filetime_to_nanos(ft: &FILETIME) -> u64 { + let ticks = ((ft.dwHighDateTime as u64) << 32) | (ft.dwLowDateTime as u64); + ticks * 100 // FILETIME ticks are 100ns intervals +} + /// Converts a timestamp in seconds to an equivalent one in nanoseconds. /// Returns `None` if the conversion overflows. /// diff --git a/src/utils/src/windows/bindings.rs b/src/utils/src/windows/bindings.rs new file mode 100644 index 000000000..c61857a2e --- /dev/null +++ b/src/utils/src/windows/bindings.rs @@ -0,0 +1,58 @@ +//! Windows FFI bindings used by the epoll and eventfd implementations. +//! +//! Documented Win32 APIs come from the [`windows_sys`] crate. The +//! undocumented NT native APIs (`NtCreateWaitCompletionPacket`, etc.) are +//! declared manually since they are not included in any official bindings +//! crate. 
+ +use std::io; + +use windows_sys::Win32::Foundation::HANDLE; + +#[allow(non_camel_case_types, clippy::upper_case_acronyms)] +pub type NTSTATUS = i32; + +#[link(name = "ntdll")] +extern "system" { + pub fn NtCreateWaitCompletionPacket( + WaitCompletionPacketHandle: *mut HANDLE, + DesiredAccess: u32, + ObjectAttributes: *const std::ffi::c_void, + ) -> NTSTATUS; + + pub fn NtAssociateWaitCompletionPacket( + WaitCompletionPacketHandle: HANDLE, + IoCompletionHandle: HANDLE, + TargetObjectHandle: HANDLE, + KeyContext: *mut std::ffi::c_void, + ApcContext: *mut std::ffi::c_void, + IoStatus: NTSTATUS, + IoStatusInformation: usize, + AlreadySignaled: *mut u8, + ) -> NTSTATUS; + + pub fn NtCancelWaitCompletionPacket( + WaitCompletionPacketHandle: HANDLE, + RemoveSignaledPacket: u8, + ) -> NTSTATUS; + + pub fn RtlNtStatusToDosError(Status: NTSTATUS) -> u32; +} + +/// Equivalent of the `NT_SUCCESS` macro: returns `true` when `status` is in +/// the success (0x0000_0000–0x3FFF_FFFF) or informational +/// (0x4000_0000–0x7FFF_FFFF) range. +/// https://learn.microsoft.com/en-us/windows-hardware/drivers/kernel/using-ntstatus-values +#[inline] +pub fn nt_success(status: NTSTATUS) -> bool { + status >= 0 +} + +/// Convert a failing `NTSTATUS` to an [`io::Error`] via the corresponding +/// Win32 error code. +/// A mapping of NTSTATUS values to Win32 error codes can be found at +/// https://www.osr.com/blog/2020/04/23/ntstatus-to-win32-error-code-mappings/ +pub fn nt_status_err(status: NTSTATUS) -> io::Error { + let win_err = unsafe { RtlNtStatusToDosError(status) }; + io::Error::from_raw_os_error(win_err as i32) +} diff --git a/src/utils/src/windows/epoll.rs b/src/utils/src/windows/epoll.rs new file mode 100644 index 000000000..45bcd30b5 --- /dev/null +++ b/src/utils/src/windows/epoll.rs @@ -0,0 +1,710 @@ +//! Epoll-like I/O event polling for Windows, backed by I/O Completion Ports. +//! +//! Uses an IOCP as the central event multiplexer with +//! 
`NtAssociateWaitCompletionPacket` to bridge waitable kernel handles (like +//! Windows Event objects used by `EventFd`) into the completion port. This +//! gives true O(1) wake-up with no handle-count limitations. +//! +//! ## How it works +//! +//! 1. [`Epoll::new`] creates an I/O Completion Port. +//! 2. [`Epoll::ctl`] with [`ControlOperation::Add`] heap-allocates a [`Watch`] +//! struct, creates a *Wait Completion Packet* (WCP) via the NT native API, +//! and associates the caller's waitable handle with the IOCP. The raw +//! `Watch` pointer is used as the completion key. +//! 3. [`Epoll::wait`] calls `GetQueuedCompletionStatusEx` which blocks until +//! one or more packets arrive (or timeout). The returned completion key is +//! cast back to a `Watch` pointer, and event metadata is read through +//! atomics -- **no locks are acquired on this path**. +//! 4. After delivering each event, `wait` **re-associates** (if level-triggered) +//! the WCP so the next signal produces another packet. +//! 5. [`Epoll::ctl`] with [`ControlOperation::Delete`] marks the watch as +//! inactive, closes the WCP handle, and moves the `Watch` allocation to a +//! zombie list. A time-based GC sweep in the same method frees zombies +//! older than 5 seconds, ensuring any in-flight packets referencing the +//! `Watch` memory have been drained. 
+ +use log::debug; +use std::collections::HashMap; +use std::io; +use std::ptr; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use bitflags::bitflags; + +use super::bindings::*; +use super::{AsRawFd, RawFd}; + +use windows_sys::Win32::Foundation::{ + CloseHandle, HANDLE, INVALID_HANDLE_VALUE, WAIT_TIMEOUT as WAIT_TIMEOUT_CODE, +}; +use windows_sys::Win32::System::Threading::INFINITE; +use windows_sys::Win32::System::IO::{ + CreateIoCompletionPort, GetQueuedCompletionStatusEx, OVERLAPPED_ENTRY, +}; + +// Generic access mask requesting all permissions the caller is allowed. +// https://learn.microsoft.com/en-us/windows/win32/secauthz/access-mask +const MAXIMUM_ALLOWED: u32 = 0x0200_0000; +const GC_THRESHOLD: Duration = Duration::from_secs(5); + +#[repr(i32)] +pub enum ControlOperation { + Add, + Modify, + Delete, +} + +bitflags! { + /// Bitmask of I/O readiness event types. + /// + /// Bit values are intentionally identical to the macOS implementation so + /// that device code using these constants is portable across all three + /// supported platforms. + pub struct EventSet: u32 { + /// The handle is ready for reading. + const IN = 0b00000001; + /// The handle is ready for writing. + const OUT = 0b00000010; + /// Hang-up (peer closed its end). + const HANG_UP = 0b00000100; + /// Read hang-up (peer shut down its write side). + const READ_HANG_UP = 0b00001000; + /// Request edge-triggered notification. The WCP is not re-armed + /// after delivery; use [`ControlOperation::Modify`] to re-register. + const EDGE_TRIGGERED = 0b00010000; + } +} + +/// Carrier for a readiness event, mirroring `libc::epoll_event` on Linux. 
+#[derive(Clone, Copy, Default)] +pub struct EpollEvent { + pub events: u32, + u64: u64, +} + +impl std::fmt::Debug for EpollEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{{ events: {}, data: {} }}", self.events(), self.data()) + } +} + +impl EpollEvent { + /// Create a new event with the given readiness mask and user data. + pub fn new(events: EventSet, data: u64) -> Self { + EpollEvent { + events: events.bits(), + u64: data, + } + } + + pub fn events(&self) -> u32 { + self.events + } + + pub fn event_set(&self) -> EventSet { + // This unwrap is safe because `epoll_events` can only be user created or + // initialized by the kernel. We trust the kernel to only send us valid + // events. The user can only initialize `epoll_events` using valid events. + EventSet::from_bits(self.events()).unwrap() + } + + pub fn data(&self) -> u64 { + debug!("EpollEvent data: {}", self.u64); + self.u64 + } + + pub fn fd(&self) -> RawFd { + self.u64 as RawFd + } +} + +/// A watched handle and its associated metadata. +/// +/// Heap-allocated so its address can serve as the IOCP completion key. +/// This lets [`Epoll::wait`] access `Watch` fields through raw +/// pointers and atomics with no locking. +struct Watch { + fd: HANDLE, + wcp: HANDLE, + events: AtomicU32, + data: AtomicU64, + /// Cleared by [`ControlOperation::Delete`] so that in-flight completion + /// packets already queued by the kernel are silently ignored. + is_active: AtomicBool, +} + +/// The I/O Completion Port and the set of handles it is watching. +struct CompletionPort { + handle: HANDLE, + watches: Mutex>, + /// When you call ctl(Delete), we tell the kernel to cancel the WCP. + /// However, there is a possible race condition where + /// the kernel already signaled the handle and put the completion packet in the IOCP queue, + /// but the vCPU thread hasn't popped it out. 
+ /// If we drop Watch immediately during ctl(Delete), + /// the VCPU will have a pointer to a freed resource resulting in a segfault. + /// Instead, we use this vector to maintain a list of Watch pointers and when + /// they were added so we can drop the Watch safely. + zombies: Mutex>, +} + +// All raw `*mut Watch` pointers are either behind a `Mutex` (in +// `watches` / `zombies`) or read-only in the lock-free `wait` path where +// the pointed-to fields are atomics. Windows `HANDLE` values are valid +// across threads. +unsafe impl Send for CompletionPort {} +unsafe impl Sync for CompletionPort {} + +impl Drop for CompletionPort { + fn drop(&mut self) { + for (_, ptr) in self.watches.get_mut().unwrap().drain() { + unsafe { + let w = Box::from_raw(ptr); + let _ = NtCancelWaitCompletionPacket(w.wcp, 1); + CloseHandle(w.wcp); + } + } + for (_, ptr) in self.zombies.get_mut().unwrap().drain(..) { + unsafe { + let _ = Box::from_raw(ptr); + } + } + unsafe { + CloseHandle(self.handle); + } + } +} + +/// Associate a Wait Completion Packet with the given IOCP and target handle. +/// +/// When `fd` becomes signaled, the kernel pushes a completion packet to +/// `iocp` carrying `key` as the completion key (a raw pointer to the +/// corresponding [`Watch`]). +fn associate_wcp( + wcp: HANDLE, + iocp: HANDLE, + fd: HANDLE, + key: *mut std::ffi::c_void, +) -> io::Result<()> { + let mut already_signaled: u8 = 0; + let status = unsafe { + NtAssociateWaitCompletionPacket( + wcp, + iocp, + fd, + key, + ptr::null_mut(), + 0, + 0, + &mut already_signaled, + ) + }; + if !nt_success(status) { + return Err(nt_status_err(status)); + } + Ok(()) +} + +/// Epoll-compatible polling abstraction backed by an I/O Completion Port. +pub struct Epoll { + iocp: Arc, + entries: Vec, +} + +impl Clone for Epoll { + fn clone(&self) -> Self { + Epoll { + iocp: self.iocp.clone(), + entries: Vec::new(), + } + } +} + +impl Epoll { + /// Create a new polling instance backed by a fresh I/O Completion Port. 
+ pub fn new() -> io::Result { + let handle = + unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, std::ptr::null_mut(), 0, 0) }; + if handle.is_null() { + return Err(io::Error::last_os_error()); + } + Ok(Epoll { + iocp: Arc::new(CompletionPort { + handle, + watches: Mutex::new(HashMap::new()), + zombies: Mutex::new(Vec::new()), + }), + entries: Vec::with_capacity(32), + }) + } + + /// Add, modify, or remove a handle in the interest set. + /// + /// * `fd` – the waitable handle (as [`RawFd`] / `HANDLE`). + /// * `event` – carries the desired [`EventSet`] mask and a `u64` data + /// payload that will be returned by [`wait`](Self::wait) when this + /// handle becomes ready. + pub fn ctl( + &self, + operation: ControlOperation, + fd: RawFd, + event: &EpollEvent, + ) -> io::Result<()> { + let mut watches = self.iocp.watches.lock().unwrap(); + match operation { + ControlOperation::Add => { + if watches.contains_key(&fd) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "handle already registered", + )); + } + + let mut wcp: HANDLE = ptr::null_mut(); + let status = + unsafe { NtCreateWaitCompletionPacket(&mut wcp, MAXIMUM_ALLOWED, ptr::null()) }; + if !nt_success(status) { + return Err(nt_status_err(status)); + } + + let watch_ptr = Box::into_raw(Box::new(Watch { + fd, + wcp, + events: AtomicU32::new(event.events()), + data: AtomicU64::new(event.data()), + is_active: AtomicBool::new(true), + })); + + if let Err(e) = associate_wcp(wcp, self.iocp.handle, fd, watch_ptr as *mut _) { + unsafe { + CloseHandle(wcp); + let _ = Box::from_raw(watch_ptr); + } + return Err(e); + } + + watches.insert(fd, watch_ptr); + } + ControlOperation::Modify => { + let &watch_ptr = watches.get(&fd).ok_or_else(|| { + io::Error::new(io::ErrorKind::NotFound, "handle not registered") + })?; + + let watch = unsafe { &*watch_ptr }; + watch.events.store(event.events(), Ordering::Release); + watch.data.store(event.data(), Ordering::Release); + + unsafe { + let _ = 
NtCancelWaitCompletionPacket(watch.wcp, 1); + } + associate_wcp(watch.wcp, self.iocp.handle, fd, watch_ptr as *mut _)?; + } + ControlOperation::Delete => { + let watch_ptr = watches.remove(&fd).ok_or_else(|| { + io::Error::new(io::ErrorKind::NotFound, "handle not registered") + })?; + + let watch = unsafe { &*watch_ptr }; + watch.is_active.store(false, Ordering::Release); + unsafe { + let _ = NtCancelWaitCompletionPacket(watch.wcp, 1); + CloseHandle(watch.wcp); + } + + // Add the Watch to the zombies list with the current time + // so we can drop it safely after some delay. + let mut zombies = self.iocp.zombies.lock().unwrap(); + zombies.push((Instant::now(), watch_ptr)); + + zombies.retain(|(deleted_at, ptr)| { + if deleted_at.elapsed() > GC_THRESHOLD { + // Free the Watch from the heap + unsafe { + let _ = Box::from_raw(*ptr); + } + false // Remove from the vector + } else { + true // Keep in the vector + } + }); + } + } + Ok(()) + } + + /// Block until at least one registered handle is signaled, or until + /// `timeout` milliseconds have elapsed. Pass `-1` to wait indefinitely. + /// + /// This is the lock-free hot path: no `Mutex` is acquired and no heap + /// allocation is performed. The completion key returned by the kernel + /// is the raw `Watch` pointer set during [`ctl`](Self::ctl), so we can + /// read event metadata through atomics without any table lookup. 
+ pub fn wait( + &mut self, + max_events: usize, + timeout: i32, + events: &mut [EpollEvent], + ) -> io::Result { + let iocp_handle = self.iocp.handle; + + let capacity = events.len().min(max_events).min(i32::MAX as usize); + if capacity == 0 { + return Ok(0); + } + + self.entries.clear(); + if self.entries.capacity() < capacity { + self.entries.reserve_exact(capacity); + } + + let mut count: u32 = 0; + let win_timeout: u32 = if timeout < 0 { + INFINITE + } else { + timeout as u32 + }; + + let ok = unsafe { + GetQueuedCompletionStatusEx( + iocp_handle, + self.entries.spare_capacity_mut().as_mut_ptr() as *mut _, + capacity as u32, + &mut count, + win_timeout, + 0, + ) + }; + + if ok == 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(WAIT_TIMEOUT_CODE as i32) { + return Ok(0); + } + return Err(err); + } + + // Update vector length based on how many packets the kernel actually wrote. + unsafe { self.entries.set_len(count as usize) } + let mut result_count = 0; + + for entry in &self.entries { + let watch_ptr = entry.lpCompletionKey as *const Watch; + if watch_ptr.is_null() { + continue; + } + + let watch = unsafe { &*watch_ptr }; + + if !watch.is_active.load(Ordering::Acquire) { + continue; + } + + let current_events = watch.events.load(Ordering::Acquire); + let event_set = EventSet::from_bits_truncate(current_events); + + events[result_count] = EpollEvent { + events: (event_set & (EventSet::IN | EventSet::OUT)).bits(), + u64: watch.data.load(Ordering::Acquire), + }; + result_count += 1; + + if !event_set.contains(EventSet::EDGE_TRIGGERED) { + // Level-triggered: re-associate the WCP so the next signal + // on this handle produces another completion packet. + // + // KNOWN RACE: there is a race between this call and + // `CloseHandle(watch.wcp)` in `Epoll::ctl(Delete)`. Another + // thread may delete the watch (marking it inactive and closing + // the WCP handle) between our `is_active` check above and the + // `associate_wcp` call here. 
In that case: + // + // 1. The WCP handle is already closed and + // `NtAssociateWaitCompletionPacket` returns + // `STATUS_INVALID_HANDLE` -- harmless, we ignore the error. + // 2. Windows recycles the handle value for a non-WCP object -- + // `NtAssociateWaitCompletionPacket` returns + // `STATUS_OBJECT_TYPE_MISMATCH` -- also harmless. + // 3. Windows recycles the handle value for a *new* WCP created + // by a third thread. The associate succeeds on the wrong + // WCP. When that third thread later tries to associate its + // own WCP it will fail and delete its handle, leaving the + // kernel to drop the WCP once its refcount reaches zero + // (the original delete already closed the userspace handle + // and the event was never queued to the IOCP). The only + // consequence is a lost event for the third thread, which + // should be re-queued on the next iteration. + // + // We should try to remove the GC mechanism but for now + // this is acceptable. + let _ = associate_wcp(watch.wcp, iocp_handle, watch.fd, watch_ptr as *mut _); + } + } + + Ok(result_count) + } +} + +impl AsRawFd for Epoll { + fn as_raw_fd(&self) -> RawFd { + self.iocp.handle + } +} + +#[cfg(test)] +mod tests { + use super::*; + use windows_sys::Win32::System::Threading::{CreateEventW, ResetEvent, SetEvent}; + + /// Create a manual-reset, initially non-signaled Windows Event object. 
+ fn create_event() -> HANDLE { + let h = unsafe { CreateEventW(ptr::null(), 1, 0, ptr::null()) }; + assert!(h != std::ptr::null_mut(), "CreateEventW failed"); + h + } + + fn signal(handle: HANDLE) { + assert_ne!(unsafe { SetEvent(handle) }, 0, "SetEvent failed"); + } + + fn reset(handle: HANDLE) { + assert_ne!(unsafe { ResetEvent(handle) }, 0, "ResetEvent failed"); + } + + fn close(handle: HANDLE) { + unsafe { + CloseHandle(handle); + } + } + + #[test] + fn test_event_ops() { + let mut event = EpollEvent::default(); + assert_eq!(event.events(), 0); + assert_eq!(event.data(), 0); + + event = EpollEvent::new(EventSet::IN, 42); + assert_eq!(event.events(), EventSet::IN.bits()); + assert_eq!(event.event_set(), EventSet::IN); + assert_eq!(event.data(), 42); + assert_eq!(event.fd(), 42 as RawFd); + } + + #[test] + fn test_events_debug() { + let event = EpollEvent::new(EventSet::IN, 42); + assert_eq!(format!("{:?}", event), "{ events: 1, data: 42 }"); + } + + #[test] + fn test_ctl_add_modify_delete() { + let epoll = Epoll::new().unwrap(); + let ev = create_event(); + let event = EpollEvent::new(EventSet::IN, ev as u64); + + epoll.ctl(ControlOperation::Add, ev, &event).unwrap(); + assert!(epoll.ctl(ControlOperation::Add, ev, &event).is_err()); + + let event2 = EpollEvent::new(EventSet::OUT, ev as u64); + epoll.ctl(ControlOperation::Modify, ev, &event2).unwrap(); + + epoll + .ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .unwrap(); + assert!(epoll + .ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .is_err()); + assert!(epoll + .ctl(ControlOperation::Modify, ev, &EpollEvent::default()) + .is_err()); + + close(ev); + } + + #[test] + fn test_clone_shares_state() { + let epoll = Epoll::new().unwrap(); + let epoll2 = epoll.clone(); + let ev = create_event(); + let event = EpollEvent::new(EventSet::IN, ev as u64); + + epoll.ctl(ControlOperation::Add, ev, &event).unwrap(); + assert!(epoll2.ctl(ControlOperation::Add, ev, &event).is_err()); + + epoll2 + 
.ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .unwrap(); + assert!(epoll + .ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .is_err()); + + close(ev); + } + + #[test] + fn test_wait_returns_signaled_event() { + let mut epoll = Epoll::new().unwrap(); + let ev = create_event(); + let event = EpollEvent::new(EventSet::IN, ev as u64); + epoll.ctl(ControlOperation::Add, ev, &event).unwrap(); + + signal(ev); + + let mut ready = vec![EpollEvent::default(); 8]; + let n = epoll.wait(8, 1000, &mut ready).unwrap(); + assert_eq!(n, 1); + assert_eq!(ready[0].fd(), ev); + assert_eq!(ready[0].event_set(), EventSet::IN); + + epoll + .ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .unwrap(); + close(ev); + } + + #[test] + fn test_wait_timeout_no_signal() { + let mut epoll = Epoll::new().unwrap(); + let ev = create_event(); + let event = EpollEvent::new(EventSet::IN, ev as u64); + epoll.ctl(ControlOperation::Add, ev, &event).unwrap(); + + let mut ready = vec![EpollEvent::default(); 8]; + let n = epoll.wait(8, 50, &mut ready).unwrap(); + assert_eq!(n, 0); + + epoll + .ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .unwrap(); + close(ev); + } + + #[test] + fn test_wait_multiple_handles() { + let mut epoll = Epoll::new().unwrap(); + let ev1 = create_event(); + let ev2 = create_event(); + + epoll + .ctl( + ControlOperation::Add, + ev1, + &EpollEvent::new(EventSet::IN, ev1 as u64), + ) + .unwrap(); + epoll + .ctl( + ControlOperation::Add, + ev2, + &EpollEvent::new(EventSet::IN, ev2 as u64), + ) + .unwrap(); + + signal(ev1); + signal(ev2); + + let mut ready = vec![EpollEvent::default(); 8]; + let n = epoll.wait(8, 1000, &mut ready).unwrap(); + assert_eq!(n, 2); + + let handles: Vec = ready[..n].iter().map(|e| e.fd()).collect(); + assert!(handles.contains(&ev1)); + assert!(handles.contains(&ev2)); + + epoll + .ctl(ControlOperation::Delete, ev1, &EpollEvent::default()) + .unwrap(); + epoll + .ctl(ControlOperation::Delete, ev2, 
&EpollEvent::default()) + .unwrap(); + close(ev1); + close(ev2); + } + + #[test] + fn test_level_triggered_redelivers() { + let mut epoll = Epoll::new().unwrap(); + let ev = create_event(); + let event = EpollEvent::new(EventSet::IN, ev as u64); + epoll.ctl(ControlOperation::Add, ev, &event).unwrap(); + + signal(ev); + + let mut ready = vec![EpollEvent::default(); 8]; + let n = epoll.wait(8, 1000, &mut ready).unwrap(); + assert_eq!(n, 1); + + // Handle is still signaled (manual-reset event), so a second wait + // should deliver it again (level-triggered semantics). + let n = epoll.wait(8, 1000, &mut ready).unwrap(); + assert_eq!(n, 1); + assert_eq!(ready[0].fd(), ev); + + epoll + .ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .unwrap(); + close(ev); + } + + // -- Edge-triggered tests ----------------------------------------------- + + #[test] + fn test_edge_triggered_no_redelivery() { + let mut epoll = Epoll::new().unwrap(); + let ev = create_event(); + let event = EpollEvent::new(EventSet::IN | EventSet::EDGE_TRIGGERED, ev as u64); + epoll.ctl(ControlOperation::Add, ev, &event).unwrap(); + + signal(ev); + + let mut ready = vec![EpollEvent::default(); 8]; + let n = epoll.wait(8, 1000, &mut ready).unwrap(); + assert_eq!(n, 1); + assert_eq!(ready[0].fd(), ev); + + // Even though the handle is still signaled, edge-triggered mode + // should NOT re-deliver because the WCP was not re-armed. 
+ let n = epoll.wait(8, 100, &mut ready).unwrap(); + assert_eq!(n, 0); + + epoll + .ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .unwrap(); + close(ev); + } + + #[test] + fn test_edge_triggered_rearm_via_modify() { + let mut epoll = Epoll::new().unwrap(); + let ev = create_event(); + let event = EpollEvent::new(EventSet::IN | EventSet::EDGE_TRIGGERED, ev as u64); + epoll.ctl(ControlOperation::Add, ev, &event).unwrap(); + + signal(ev); + + let mut ready = vec![EpollEvent::default(); 8]; + let n = epoll.wait(8, 1000, &mut ready).unwrap(); + assert_eq!(n, 1); + + // Simulate the caller draining data, then re-registering interest. + reset(ev); + epoll.ctl(ControlOperation::Modify, ev, &event).unwrap(); + + // No signal yet — should time out. + let n = epoll.wait(8, 100, &mut ready).unwrap(); + assert_eq!(n, 0); + + // Signal again — now we should get the event. + signal(ev); + let n = epoll.wait(8, 1000, &mut ready).unwrap(); + assert_eq!(n, 1); + assert_eq!(ready[0].fd(), ev); + + epoll + .ctl(ControlOperation::Delete, ev, &EpollEvent::default()) + .unwrap(); + close(ev); + } +} diff --git a/src/utils/src/windows/eventfd.rs b/src/utils/src/windows/eventfd.rs new file mode 100644 index 000000000..eb87c47d0 --- /dev/null +++ b/src/utils/src/windows/eventfd.rs @@ -0,0 +1,268 @@ +//! Structure and wrapper functions emulating eventfd using a Windows manual-reset Event object. + +use std::sync::{Arc, Mutex}; +use std::{io, result}; + +use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, WAIT_OBJECT_0}; +use windows_sys::Win32::System::Threading::{ + CreateEventW, ResetEvent, SetEvent, WaitForSingleObject, INFINITE, +}; + +use super::{AsRawFd, RawFd}; + +pub const EFD_NONBLOCK: i32 = 1; +pub const EFD_SEMAPHORE: i32 = 2; + +#[derive(Debug)] +struct Inner { + event: HANDLE, + counter: Mutex<u64>, + nonblock: bool, + semaphore: bool, +} + +// The HANDLE is a Windows kernel object usable from any thread.
+unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} + +impl Drop for Inner { + fn drop(&mut self) { + unsafe { + CloseHandle(self.event); + } + } +} + +#[derive(Clone, Debug)] +pub struct EventFd { + inner: Arc<Inner>, +} + +impl EventFd { + pub fn new(flag: i32) -> result::Result<EventFd, io::Error> { + let event = unsafe { + CreateEventW( + std::ptr::null(), + 1, // bManualReset = TRUE + 0, // bInitialState = FALSE (non-signaled) + std::ptr::null(), + ) + }; + if event.is_null() { + return Err(io::Error::last_os_error()); + } + + Ok(EventFd { + inner: Arc::new(Inner { + event, + counter: Mutex::new(0), + nonblock: (flag & EFD_NONBLOCK) != 0, + semaphore: (flag & EFD_SEMAPHORE) != 0, + }), + }) + } + + pub fn write(&self, v: u64) -> result::Result<(), io::Error> { + let mut counter = self.inner.counter.lock().unwrap(); + + let was_zero = *counter == 0; + *counter = counter.saturating_add(v); + + // Only signal the event if it was not already signaled. + if was_zero && unsafe { SetEvent(self.inner.event) } == 0 { + return Err(io::Error::last_os_error()); + } + Ok(()) + } + + pub fn read(&self) -> result::Result<u64, io::Error> { + loop { + { + let mut counter = self.inner.counter.lock().unwrap(); + if *counter > 0 { + let result = if self.inner.semaphore { + // Semaphore mode: Decrement by 1 + *counter -= 1; + 1 + } else { + // Standard mode: Drain the whole counter + let val = *counter; + *counter = 0; + val + }; + + if *counter == 0 { + unsafe { + ResetEvent(self.inner.event); + } + } + return Ok(result); + } + if self.inner.nonblock { + return Err(io::ErrorKind::WouldBlock.into()); + } + } // Lock is dropped here before blocking so writers can make progress! + + let ret = unsafe { WaitForSingleObject(self.inner.event, INFINITE) }; + if ret != WAIT_OBJECT_0 { + return Err(io::Error::last_os_error()); + } + } + } + + pub fn try_clone(&self) -> result::Result<EventFd, io::Error> { + Ok(EventFd { + inner: Arc::clone(&self.inner), + }) + } + + /// Waits up to `ms` milliseconds for the event to be signaled.
+ /// + /// Returns `true` if the event was signaled, `false` on timeout. + /// On signal, consumes one unit (semaphore mode) or drains the counter + /// (standard mode). The kernel event is only reset when the counter + /// reaches zero. + pub fn wait_timeout(&self, ms: u32) -> bool { + let result = unsafe { WaitForSingleObject(self.inner.event, ms) }; + if result == WAIT_OBJECT_0 { + let mut counter = self.inner.counter.lock().unwrap(); + if *counter > 0 { + if self.inner.semaphore { + *counter -= 1; + } else { + *counter = 0; + } + if *counter == 0 { + unsafe { + ResetEvent(self.inner.event); + } + } + } + true + } else { + false + } + } +} + +impl AsRawFd for EventFd { + fn as_raw_fd(&self) -> RawFd { + self.inner.event + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_read_write() { + let evt = EventFd::new(EFD_NONBLOCK).unwrap(); + evt.write(55).unwrap(); + assert_eq!(evt.read().unwrap(), 55); + } + + #[test] + fn test_read_nothing_nonblock() { + let evt = EventFd::new(EFD_NONBLOCK).unwrap(); + let res = evt.read(); + assert!(matches!(res, Err(err) if err.kind() == io::ErrorKind::WouldBlock)); + } + + #[test] + fn test_multiple_writes_accumulate() { + let evt = EventFd::new(EFD_NONBLOCK).unwrap(); + evt.write(3).unwrap(); + evt.write(5).unwrap(); + assert_eq!(evt.read().unwrap(), 8); + } + + /// After read() drains the counter to 0, the kernel event must be + /// unsignaled. If ResetEvent is missing, wait_timeout(0) would + /// return true forever — the "infinite wakeup" bug. + #[test] + fn test_event_reset_after_read() { + let evt = EventFd::new(EFD_NONBLOCK).unwrap(); + evt.write(1).unwrap(); + assert_eq!(evt.read().unwrap(), 1); + assert!( + !evt.wait_timeout(0), + "kernel event should be unsignaled after drain" + ); + } + + /// Verify that writing after a full drain re-signals the event. 
+ #[test] + fn test_write_read_cycle() { + let evt = EventFd::new(EFD_NONBLOCK).unwrap(); + + evt.write(10).unwrap(); + assert_eq!(evt.read().unwrap(), 10); + assert!(!evt.wait_timeout(0)); + + evt.write(20).unwrap(); + assert_eq!(evt.read().unwrap(), 20); + assert!(!evt.wait_timeout(0)); + } + + #[test] + fn test_semaphore_mode() { + let evt = EventFd::new(EFD_NONBLOCK | EFD_SEMAPHORE).unwrap(); + evt.write(3).unwrap(); + + assert_eq!(evt.read().unwrap(), 1); + assert_eq!(evt.read().unwrap(), 1); + assert_eq!(evt.read().unwrap(), 1); + + let res = evt.read(); + assert!(matches!(res, Err(err) if err.kind() == io::ErrorKind::WouldBlock)); + } + + /// In semaphore mode, the kernel event must stay signaled as long as + /// the counter is > 0, and only unsignal on the final decrement. + #[test] + fn test_semaphore_event_stays_signaled() { + let evt = EventFd::new(EFD_NONBLOCK | EFD_SEMAPHORE).unwrap(); + evt.write(3).unwrap(); + + assert_eq!(evt.read().unwrap(), 1); // counter: 3 -> 2 + assert!( + evt.wait_timeout(0), + "event should still be signaled with counter=2" + ); + + // wait_timeout consumed one (counter: 2 -> 1) + assert!( + evt.wait_timeout(0), + "event should still be signaled with counter=1" + ); + + // wait_timeout consumed one (counter: 1 -> 0, ResetEvent) + assert!( + !evt.wait_timeout(0), + "event should be unsignaled after full drain" + ); + } + + #[test] + fn test_wait_timeout_not_signaled() { + let evt = EventFd::new(EFD_NONBLOCK).unwrap(); + assert!(!evt.wait_timeout(0)); + } + + #[test] + fn test_wait_timeout_signaled() { + let evt = EventFd::new(EFD_NONBLOCK).unwrap(); + evt.write(42).unwrap(); + assert!(evt.wait_timeout(0)); + } + + #[test] + fn test_clone() { + let evt = EventFd::new(EFD_NONBLOCK).unwrap(); + let evt_clone = evt.try_clone().unwrap(); + + evt.write(923).unwrap(); + assert_eq!(evt_clone.read().unwrap(), 923); + } +} diff --git a/src/utils/src/windows/mod.rs b/src/utils/src/windows/mod.rs new file mode 100644 index 
000000000..90a8544b4 --- /dev/null +++ b/src/utils/src/windows/mod.rs @@ -0,0 +1,14 @@ +use windows_sys::Win32::Foundation::HANDLE; + +pub(crate) mod bindings; +pub mod epoll; +pub mod eventfd; + +/// Cross-platform alias used by the rest of the codebase. On Windows this +/// is just [`HANDLE`] — the two names are interchangeable. +pub type RawFd = HANDLE; + +/// Windows equivalent of [`std::os::unix::io::AsRawFd`]. +pub trait AsRawFd { + fn as_raw_fd(&self) -> RawFd; +} diff --git a/src/utils/src/worker_message.rs b/src/utils/src/worker_message.rs index 50a4fcf9c..202ea8bbc 100644 --- a/src/utils/src/worker_message.rs +++ b/src/utils/src/worker_message.rs @@ -7,16 +7,17 @@ pub struct MemoryProperties { #[derive(Debug)] pub enum WorkerMessage { - #[cfg(target_arch = "x86_64")] + #[cfg(all(target_arch = "x86_64", not(target_os = "windows")))] GsiRoute( crossbeam_channel::Sender, Vec, ), - #[cfg(target_arch = "x86_64")] + #[cfg(all(target_arch = "x86_64", not(target_os = "windows")))] IrqLine(crossbeam_channel::Sender, u32, bool), #[cfg(target_os = "macos")] GpuAddMapping(crossbeam_channel::Sender, u64, u64, u64), #[cfg(target_os = "macos")] GpuRemoveMapping(crossbeam_channel::Sender, u64, u64), + #[cfg(not(target_os = "windows"))] ConvertMemory(crossbeam_channel::Sender, MemoryProperties), }