From a36e56b59c357e00bff6a417ccf52276db453a7c Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 30 Mar 2026 10:05:29 +0000 Subject: [PATCH 1/2] New trait: UdpSplitMulticast --- edge-http/src/io/server.rs | 3 +- edge-http/src/lib.rs | 12 +- edge-mdns/src/io.rs | 15 +-- edge-nal-embassy/CHANGELOG.md | 1 + edge-nal-embassy/src/udp.rs | 215 ++++++++++++++++++++++++---------- edge-nal-std/src/lib.rs | 27 ++++- edge-nal/src/stack/udp.rs | 50 +++++++- 7 files changed, 238 insertions(+), 85 deletions(-) diff --git a/edge-http/src/io/server.rs b/edge-http/src/io/server.rs index b04eec1..1046f79 100644 --- a/edge-http/src/io/server.rs +++ b/edge-http/src/io/server.rs @@ -849,10 +849,9 @@ impl Server { Q, P, Q ); - for acceptor_id in 0..Q { + for (acceptor_id, signal) in accept_signals.iter().enumerate() { let acceptor = &acceptor; let socket_queue = &socket_queue; - let signal = &accept_signals[acceptor_id]; unwrap!(acceptor_tasks .push(async move { diff --git a/edge-http/src/lib.rs b/edge-http/src/lib.rs index 5ea1a47..58f773a 100644 --- a/edge-http/src/lib.rs +++ b/edge-http/src/lib.rs @@ -645,13 +645,11 @@ impl BodyType { Err(HeadersMismatchError::BodyTypeError("Raw body response with a Keep-Alive connection. This is not allowed."))?; } } - BodyType::Chunked => { - if !http11 { - warn!("Chunked body with an HTTP/1.0 connection. This is not allowed."); - Err(HeadersMismatchError::BodyTypeError( - "Chunked body with an HTTP/1.0 connection. This is not allowed.", - ))?; - } + BodyType::Chunked if !http11 => { + warn!("Chunked body with an HTTP/1.0 connection. This is not allowed."); + Err(HeadersMismatchError::BodyTypeError( + "Chunked body with an HTTP/1.0 connection. This is not allowed.", + ))?; } _ => {} } diff --git a/edge-mdns/src/io.rs b/edge-mdns/src/io.rs index 9745f7b..4eaff7d 100644 --- a/edge-mdns/src/io.rs +++ b/edge-mdns/src/io.rs @@ -387,18 +387,9 @@ where for remote_addr in core::iter::once(SocketAddr::V4(SocketAddrV4::new(IP_BROADCAST_ADDR, PORT))) .filter(|_| self.ipv4_interface.is_some()) - .chain( - self.ipv6_interface - .map(|interface| { - SocketAddr::V6(SocketAddrV6::new( - IPV6_BROADCAST_ADDR, - PORT, - 0, - interface, - )) - }) - .into_iter(), - ) + .chain(self.ipv6_interface.map(|interface| { + SocketAddr::V6(SocketAddrV6::new(IPV6_BROADCAST_ADDR, PORT, 0, interface)) + })) { if !data.is_empty() { debug!("Broadcasting mDNS entry to {}", remote_addr); diff --git a/edge-nal-embassy/CHANGELOG.md b/edge-nal-embassy/CHANGELOG.md index c401379..395e77b 100644 --- a/edge-nal-embassy/CHANGELOG.md +++ b/edge-nal-embassy/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## Unreleased +* Breaking: new trait: `UdpSplitMulticast` which is now required on the socket provided by `UdpBind` and `UdpConnect` * Update to `embassy-net` 0.9 * Remove the unused `heapless` dependency * Raise MSRV to 1.91 to compile `smoltcp` diff --git a/edge-nal-embassy/src/udp.rs b/edge-nal-embassy/src/udp.rs index 9fd241b..3b1fbe0 100644 --- a/edge-nal-embassy/src/udp.rs +++ b/edge-nal-embassy/src/udp.rs @@ -2,7 +2,9 @@ use core::fmt::Display; use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use core::ptr::NonNull; -use edge_nal::{MulticastV4, MulticastV6, Readable, UdpBind, UdpReceive, UdpSend, UdpSplit}; +use edge_nal::{ + MulticastV4, MulticastV6, Readable, UdpBind, UdpReceive, UdpSend, UdpSplit, UdpSplitMulticast, +}; use embassy_net::udp::{BindError, PacketMetadata, RecvError, SendError}; use embassy_net::Stack; @@ -111,6 +113,90 @@ impl<'d> UdpSocket<'d> { buffer_token: socket_buffers.token, }) } + + async fn join_v4( + &self, + #[allow(unused)] multicast_addr: Ipv4Addr, + _interface: Ipv4Addr, + ) -> Result<(), UdpError> { + #[cfg(feature = "multicast")] + { + self.stack.join_multicast_group( + crate::to_emb_addr(core::net::IpAddr::V4(multicast_addr)) + .ok_or(UdpError::UnsupportedProto)?, + )?; + } + + #[cfg(not(feature = "multicast"))] + { + Err(UdpError::UnsupportedProto)?; + } + + Ok(()) + } + + async fn leave_v4( + &self, + #[allow(unused)] multicast_addr: Ipv4Addr, + _interface: Ipv4Addr, + ) -> Result<(), UdpError> { + #[cfg(feature = "multicast")] + { + self.stack.leave_multicast_group( + crate::to_emb_addr(core::net::IpAddr::V4(multicast_addr)) + .ok_or(UdpError::UnsupportedProto)?, + )?; + } + + #[cfg(not(feature = "multicast"))] + { + Err(UdpError::UnsupportedProto)?; + } + + Ok(()) + } + + async fn join_v6( + &self, + #[allow(unused)] multicast_addr: Ipv6Addr, + _interface: u32, + ) -> Result<(), UdpError> { + #[cfg(feature = "multicast")] + { + self.stack.join_multicast_group( + crate::to_emb_addr(core::net::IpAddr::V6(multicast_addr)) + .ok_or(UdpError::UnsupportedProto)?, + )?; + } + + #[cfg(not(feature = "multicast"))] + { + Err(UdpError::UnsupportedProto)?; + } + + Ok(()) + } + + async fn leave_v6( + &self, + #[allow(unused)] multicast_addr: Ipv6Addr, + _interface: u32, + ) -> Result<(), UdpError> { + #[cfg(feature = "multicast")] + { + self.stack.leave_multicast_group( + crate::to_emb_addr(core::net::IpAddr::V6(multicast_addr)) + .ok_or(UdpError::UnsupportedProto)?, + )?; + } + + #[cfg(not(feature = "multicast"))] + { + Err(UdpError::UnsupportedProto)?; + } + + Ok(()) + } } impl Drop for UdpSocket<'_> { @@ -195,91 +281,98 @@ impl UdpSplit for UdpSocket<'_> { } } +impl<'d> UdpSplitMulticast for UdpSocket<'d> { + type MulticastV4<'a> + = &'a Self + where + Self: 'a; + + type MulticastV6<'a> + = &'a Self + where + Self: 'a; + + fn split_multicast( + &mut self, + ) -> ( + Self::Receive<'_>, + Self::Send<'_>, + Self::MulticastV4<'_>, + Self::MulticastV6<'_>, + ) { + (&*self, &*self, &*self, &*self) + } +} + impl MulticastV4 for UdpSocket<'_> { async fn join_v4( &mut self, - #[allow(unused)] multicast_addr: Ipv4Addr, - _interface: Ipv4Addr, + multicast_addr: Ipv4Addr, + interface: Ipv4Addr, ) -> Result<(), Self::Error> { - #[cfg(feature = "multicast")] - { - self.stack.join_multicast_group( - crate::to_emb_addr(core::net::IpAddr::V4(multicast_addr)) - .ok_or(UdpError::UnsupportedProto)?, - )?; - } - - #[cfg(not(feature = "multicast"))] - { - Err(UdpError::UnsupportedProto)?; - } - - Ok(()) + Self::join_v4(self, multicast_addr, interface).await } async fn leave_v4( &mut self, - #[allow(unused)] multicast_addr: Ipv4Addr, - _interface: Ipv4Addr, + multicast_addr: Ipv4Addr, + interface: Ipv4Addr, ) -> Result<(), Self::Error> { - #[cfg(feature = "multicast")] - { - self.stack.leave_multicast_group( - crate::to_emb_addr(core::net::IpAddr::V4(multicast_addr)) - .ok_or(UdpError::UnsupportedProto)?, - )?; - } + Self::leave_v4(self, multicast_addr, interface).await + } +} - #[cfg(not(feature = "multicast"))] - { - Err(UdpError::UnsupportedProto)?; - } +impl MulticastV4 for &UdpSocket<'_> { + async fn join_v4( + &mut self, + multicast_addr: Ipv4Addr, + interface: Ipv4Addr, + ) -> Result<(), Self::Error> { + UdpSocket::join_v4(self, multicast_addr, interface).await + } - Ok(()) + async fn leave_v4( + &mut self, + multicast_addr: Ipv4Addr, + interface: Ipv4Addr, + ) -> Result<(), Self::Error> { + UdpSocket::leave_v4(self, multicast_addr, interface).await } } impl MulticastV6 for UdpSocket<'_> { async fn join_v6( &mut self, - #[allow(unused)] multicast_addr: Ipv6Addr, - _interface: u32, + multicast_addr: Ipv6Addr, + interface: u32, ) -> Result<(), Self::Error> { - #[cfg(feature = "multicast")] - { - self.stack.join_multicast_group( - crate::to_emb_addr(core::net::IpAddr::V6(multicast_addr)) - .ok_or(UdpError::UnsupportedProto)?, - )?; - } - - #[cfg(not(feature = "multicast"))] - { - Err(UdpError::UnsupportedProto)?; - } - - Ok(()) + Self::join_v6(self, multicast_addr, interface).await } async fn leave_v6( &mut self, - #[allow(unused)] multicast_addr: Ipv6Addr, - _interface: u32, + multicast_addr: Ipv6Addr, + interface: u32, ) -> Result<(), Self::Error> { - #[cfg(feature = "multicast")] - { - self.stack.leave_multicast_group( - crate::to_emb_addr(core::net::IpAddr::V6(multicast_addr)) - .ok_or(UdpError::UnsupportedProto)?, - )?; - } + Self::leave_v6(self, multicast_addr, interface).await + } +} - #[cfg(not(feature = "multicast"))] - { - Err(UdpError::UnsupportedProto)?; - } +impl MulticastV6 for &UdpSocket<'_> { + async fn join_v6( + &mut self, + multicast_addr: Ipv6Addr, + interface: u32, + ) -> Result<(), Self::Error> { + UdpSocket::join_v6(self, multicast_addr, interface).await + } - Ok(()) + async fn leave_v6( + &mut self, + multicast_addr: Ipv6Addr, + interface: u32, + ) -> Result<(), Self::Error> { + UdpSocket::leave_v6(self, multicast_addr, interface).await } } diff --git a/edge-nal-std/src/lib.rs b/edge-nal-std/src/lib.rs index 1a7333b..99b5221 100644 --- a/edge-nal-std/src/lib.rs +++ b/edge-nal-std/src/lib.rs @@ -21,7 +21,7 @@ use embedded_io_async::{ErrorType, Read, Write}; use edge_nal::{ AddrType, Dns, MulticastV4, MulticastV6, Readable, TcpAccept, TcpBind, TcpConnect, TcpShutdown, - TcpSplit, UdpBind, UdpConnect, UdpReceive, UdpSend, UdpSplit, + TcpSplit, UdpBind, UdpConnect, UdpReceive, UdpSend, UdpSplit, UdpSplitMulticast, }; #[cfg(any(target_os = "linux", target_os = "android"))] @@ -576,6 +576,31 @@ impl UdpSplit for UdpSocket { } } +impl UdpSplitMulticast for UdpSocket { + type MulticastV4<'a> + = &'a Self + where + Self: 'a; + + type MulticastV6<'a> + = &'a Self + where + Self: 'a; + + fn split_multicast( + &mut self, + ) -> ( + Self::Receive<'_>, + Self::Send<'_>, + Self::MulticastV4<'_>, + Self::MulticastV6<'_>, + ) { + let socket = &*self; + + (socket, socket, socket, socket) + } +} + impl Dns for Stack { type Error = io::Error; diff --git a/edge-nal/src/stack/udp.rs b/edge-nal/src/stack/udp.rs index 6fdc38d..71310ff 100644 --- a/edge-nal/src/stack/udp.rs +++ b/edge-nal/src/stack/udp.rs @@ -38,6 +38,52 @@ where } } +/// This trait is implemented by UDP sockets that can be split into separate +/// `send`, `receive`, multicastv4, and multicastv6 parts that can operate independently from each other +/// (i.e., a full-duplex connection with multicasting capabilities) +pub trait UdpSplitMulticast: UdpSplit { + type MulticastV4<'a>: MulticastV4 + where + Self: 'a; + type MulticastV6<'a>: MulticastV6 + where + Self: 'a; + + fn split_multicast( + &mut self, + ) -> ( + Self::Receive<'_>, + Self::Send<'_>, + Self::MulticastV4<'_>, + Self::MulticastV6<'_>, + ); +} + +impl UdpSplitMulticast for &mut T +where + T: UdpSplitMulticast, +{ + type MulticastV4<'a> + = T::MulticastV4<'a> + where + Self: 'a; + type MulticastV6<'a> + = T::MulticastV6<'a> + where + Self: 'a; + + fn split_multicast( + &mut self, + ) -> ( + Self::Receive<'_>, + Self::Send<'_>, + Self::MulticastV4<'_>, + Self::MulticastV6<'_>, + ) { + (**self).split_multicast() + } +} + /// This is a factory trait for creating connected UDP sockets pub trait UdpConnect { /// Error type returned on socket creation failure @@ -46,7 +92,7 @@ pub trait UdpConnect { /// The socket type returned by the factory type Socket<'a>: UdpReceive + UdpSend - + UdpSplit + + UdpSplitMulticast + MulticastV4 + MulticastV6 + Readable @@ -69,7 +115,7 @@ pub trait UdpBind { /// The socket type returned by the stack type Socket<'a>: UdpReceive + UdpSend - + UdpSplit + + UdpSplitMulticast + MulticastV4 + MulticastV6 + Readable From 00e917808f402d88b306d2d426c54bf051647701 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 30 Mar 2026 13:55:38 +0300 Subject: [PATCH 2/2] Update edge-nal/src/stack/udp.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- edge-nal/src/stack/udp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/edge-nal/src/stack/udp.rs b/edge-nal/src/stack/udp.rs index 71310ff..4fbd35f 100644 --- a/edge-nal/src/stack/udp.rs +++ b/edge-nal/src/stack/udp.rs @@ -39,7 +39,7 @@ where } /// This trait is implemented by UDP sockets that can be split into separate -/// `send`, `receive`, multicastv4, and multicastv6 parts that can operate independently from each other +/// `receive`, `send`, `MulticastV4`, and `MulticastV6` parts that can operate independently from each other /// (i.e., a full-duplex connection with multicasting capabilities) pub trait UdpSplitMulticast: UdpSplit { type MulticastV4<'a>: MulticastV4