diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a821187..498e606 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,7 +34,7 @@ jobs: - std ethernet proto-ipv6 socket-icmp socket-tcp # Test features chosen to be as aggressive as possible. - - std ethernet proto-ipv4 proto-ipv6 socket-raw socket-udp socket-tcp socket-icmp + - std ethernet proto-ipv4 proto-ipv6 socket-raw socket-udp socket-tcp socket-icmp async include: # Test alloc feature which requires nightly. @@ -66,7 +66,7 @@ jobs: features: # These feature sets cannot run tests, so we only check they build. - - ethernet proto-ipv6 proto-ipv6 proto-igmp proto-dhcpv4 socket-raw socket-udp socket-tcp socket-icmp + - ethernet proto-ipv6 proto-ipv6 proto-igmp proto-dhcpv4 socket-raw socket-udp socket-tcp socket-icmp async steps: - uses: actions/checkout@v2 diff --git a/Cargo.toml b/Cargo.toml index 1f8f2e8..63a9c04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,12 +42,14 @@ ethernet = [] "socket-udp" = [] "socket-tcp" = [] "socket-icmp" = [] +"async" = [] default = [ "std", "log", # needed for `cargo test --no-default-features --features default` :/ "ethernet", "phy-raw_socket", "phy-tap_interface", "proto-ipv4", "proto-igmp", "proto-dhcpv4", "proto-ipv6", - "socket-raw", "socket-icmp", "socket-udp", "socket-tcp" + "socket-raw", "socket-icmp", "socket-udp", "socket-tcp", + "async" ] # experimental; do not use; no guarantees provided that this feature will be kept diff --git a/src/socket/icmp.rs b/src/socket/icmp.rs index c4364a2..72bb8ab 100644 --- a/src/socket/icmp.rs +++ b/src/socket/icmp.rs @@ -5,6 +5,10 @@ use phy::{ChecksumCapabilities, DeviceCapabilities}; use socket::{Socket, SocketMeta, SocketHandle, PollAt}; use storage::{PacketBuffer, PacketMetadata}; use wire::{IpAddress, IpEndpoint, IpProtocol, IpRepr}; +#[cfg(feature = "async")] +use socket::WakerRegistration; +#[cfg(feature = "async")] +use core::task::Waker; #[cfg(feature = "proto-ipv4")] use wire::{Ipv4Address, Ipv4Repr, Icmpv4Packet, Icmpv4Repr}; @@ -61,7 +65,11 @@ pub struct IcmpSocket<'a, 'b: 'a> { /// The endpoint this socket is communicating with endpoint: Endpoint, /// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets. - hop_limit: Option + hop_limit: Option, + #[cfg(feature = "async")] + rx_waker: WakerRegistration, + #[cfg(feature = "async")] + tx_waker: WakerRegistration, } impl<'a, 'b> IcmpSocket<'a, 'b> { @@ -73,10 +81,49 @@ impl<'a, 'b> IcmpSocket<'a, 'b> { rx_buffer: rx_buffer, tx_buffer: tx_buffer, endpoint: Endpoint::default(), - hop_limit: None + hop_limit: None, + #[cfg(feature = "async")] + rx_waker: WakerRegistration::new(), + #[cfg(feature = "async")] + tx_waker: WakerRegistration::new(), } } + /// Register a waker for receive operations. + /// + /// The waker is woken on state changes that might affect the return value + /// of `recv` method calls, such as receiving data, or the socket closing. + /// + /// Notes: + /// + /// - Only one waker can be registered at a time. If another waker was previously registered, + /// it is overwritten and will no longer be woken. + /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. + /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has + /// necessarily changed. + #[cfg(feature = "async")] + pub fn register_recv_waker(&mut self, waker: &Waker) { + self.rx_waker.register(waker) + } + + /// Register a waker for send operations. + /// + /// The waker is woken on state changes that might affect the return value + /// of `send` method calls, such as space becoming available in the transmit + /// buffer, or the socket closing. + /// + /// Notes: + /// + /// - Only one waker can be registered at a time. If another waker was previously registered, + /// it is overwritten and will no longer be woken. + /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. + /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has + /// necessarily changed. + #[cfg(feature = "async")] + pub fn register_send_waker(&mut self, waker: &Waker) { + self.tx_waker.register(waker) + } + /// Return the socket handle. #[inline] pub fn handle(&self) -> SocketHandle { @@ -174,6 +221,13 @@ impl<'a, 'b> IcmpSocket<'a, 'b> { if self.is_open() { return Err(Error::Illegal) } self.endpoint = endpoint; + + #[cfg(feature = "async")] + { + self.rx_waker.wake(); + self.tx_waker.wake(); + } + Ok(()) } @@ -339,6 +393,10 @@ impl<'a, 'b> IcmpSocket<'a, 'b> { self.meta.handle, icmp_repr.buffer_len(), packet_buf.len()); }, } + + #[cfg(feature = "async")] + self.rx_waker.wake(); + Ok(()) } @@ -380,7 +438,12 @@ impl<'a, 'b> IcmpSocket<'a, 'b> { }, _ => Err(Error::Unaddressable) } - }) + })?; + + #[cfg(feature = "async")] + self.tx_waker.wake(); + + Ok(()) } pub(crate) fn poll_at(&self) -> PollAt { diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 531a062..d9bf454 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -26,7 +26,12 @@ mod tcp; mod set; mod ref_; +#[cfg(feature = "async")] +mod waker; + pub(crate) use self::meta::Meta as SocketMeta; +#[cfg(feature = "async")] +pub(crate) use self::waker::WakerRegistration; #[cfg(feature = "socket-raw")] pub use self::raw::{RawPacketMetadata, diff --git a/src/socket/raw.rs b/src/socket/raw.rs index 15df211..6eee47c 100644 --- a/src/socket/raw.rs +++ b/src/socket/raw.rs @@ -9,6 +9,10 @@ use wire::{IpVersion, IpRepr, IpProtocol}; use wire::{Ipv4Repr, Ipv4Packet}; #[cfg(feature = "proto-ipv6")] use wire::{Ipv6Repr, Ipv6Packet}; +#[cfg(feature = "async")] +use socket::WakerRegistration; +#[cfg(feature = "async")] +use core::task::Waker; /// A UDP packet metadata. pub type RawPacketMetadata = PacketMetadata<()>; @@ -27,6 +31,10 @@ pub struct RawSocket<'a, 'b: 'a> { ip_protocol: IpProtocol, rx_buffer: RawSocketBuffer<'a, 'b>, tx_buffer: RawSocketBuffer<'a, 'b>, + #[cfg(feature = "async")] + rx_waker: WakerRegistration, + #[cfg(feature = "async")] + tx_waker: WakerRegistration, } impl<'a, 'b> RawSocket<'a, 'b> { @@ -41,9 +49,48 @@ impl<'a, 'b> RawSocket<'a, 'b> { ip_protocol, rx_buffer, tx_buffer, + #[cfg(feature = "async")] + rx_waker: WakerRegistration::new(), + #[cfg(feature = "async")] + tx_waker: WakerRegistration::new(), } } + /// Register a waker for receive operations. + /// + /// The waker is woken on state changes that might affect the return value + /// of `recv` method calls, such as receiving data, or the socket closing. + /// + /// Notes: + /// + /// - Only one waker can be registered at a time. If another waker was previously registered, + /// it is overwritten and will no longer be woken. + /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. + /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has + /// necessarily changed. + #[cfg(feature = "async")] + pub fn register_recv_waker(&mut self, waker: &Waker) { + self.rx_waker.register(waker) + } + + /// Register a waker for send operations. + /// + /// The waker is woken on state changes that might affect the return value + /// of `send` method calls, such as space becoming available in the transmit + /// buffer, or the socket closing. + /// + /// Notes: + /// + /// - Only one waker can be registered at a time. If another waker was previously registered, + /// it is overwritten and will no longer be woken. + /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. + /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has + /// necessarily changed. + #[cfg(feature = "async")] + pub fn register_send_waker(&mut self, waker: &Waker) { + self.tx_waker.register(waker) + } + /// Return the socket handle. #[inline] pub fn handle(&self) -> SocketHandle { @@ -171,6 +218,10 @@ impl<'a, 'b> RawSocket<'a, 'b> { net_trace!("{}:{}:{}: receiving {} octets", self.meta.handle, self.ip_version, self.ip_protocol, packet_buf.len()); + + #[cfg(feature = "async")] + self.rx_waker.wake(); + Ok(()) } @@ -228,7 +279,12 @@ impl<'a, 'b> RawSocket<'a, 'b> { Ok(()) } } - }) + })?; + + #[cfg(feature = "async")] + self.tx_waker.wake(); + + Ok(()) } pub(crate) fn poll_at(&self) -> PollAt { diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 23961d4..7b66ca0 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -10,6 +10,10 @@ use time::{Duration, Instant}; use socket::{Socket, SocketMeta, SocketHandle, PollAt}; use storage::{Assembler, RingBuffer}; use wire::{IpProtocol, IpRepr, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl}; +#[cfg(feature = "async")] +use socket::WakerRegistration; +#[cfg(feature = "async")] +use core::task::Waker; /// A TCP socket ring buffer. pub type SocketBuffer<'a> = RingBuffer<'a, u8>; @@ -248,6 +252,12 @@ pub struct TcpSocket<'a> { /// The number of packets recived directly after /// each other which have the same ACK number. local_rx_dup_acks: u8, + + #[cfg(feature = "async")] + rx_waker: WakerRegistration, + #[cfg(feature = "async")] + tx_waker: WakerRegistration, + } const DEFAULT_MSS: usize = 536; @@ -298,9 +308,49 @@ impl<'a> TcpSocket<'a> { local_rx_last_ack: None, local_rx_last_seq: None, local_rx_dup_acks: 0, + + #[cfg(feature = "async")] + rx_waker: WakerRegistration::new(), + #[cfg(feature = "async")] + tx_waker: WakerRegistration::new(), } } + /// Register a waker for receive operations. + /// + /// The waker is woken on state changes that might affect the return value + /// of `recv` method calls, such as receiving data, or the socket closing. + /// + /// Notes: + /// + /// - Only one waker can be registered at a time. If another waker was previously registered, + /// it is overwritten and will no longer be woken. + /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. + /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has + /// necessarily changed. + #[cfg(feature = "async")] + pub fn register_recv_waker(&mut self, waker: &Waker) { + self.rx_waker.register(waker) + } + + /// Register a waker for send operations. + /// + /// The waker is woken on state changes that might affect the return value + /// of `send` method calls, such as space becoming available in the transmit + /// buffer, or the socket closing. + /// + /// Notes: + /// + /// - Only one waker can be registered at a time. If another waker was previously registered, + /// it is overwritten and will no longer be woken. + /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. + /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has + /// necessarily changed. + #[cfg(feature = "async")] + pub fn register_send_waker(&mut self, waker: &Waker) { + self.tx_waker.register(waker) + } + /// Return the socket handle. #[inline] pub fn handle(&self) -> SocketHandle { @@ -438,6 +488,12 @@ impl<'a> TcpSocket<'a> { self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8; self.remote_mss = DEFAULT_MSS; self.remote_last_ts = None; + + #[cfg(feature = "async")] + { + self.rx_waker.wake(); + self.tx_waker.wake(); + } } /// Start listening on the given endpoint. @@ -825,7 +881,17 @@ impl<'a> TcpSocket<'a> { self.state, state); } } - self.state = state + + self.state = state; + + #[cfg(feature = "async")] + { + // Wake all tasks waiting. Even if we haven't received/sent data, this + // is needed because return values of functions may change depending on the state. + // For example, a pending read has to fail with an error if the socket is closed. + self.rx_waker.wake(); + self.tx_waker.wake(); + } } pub(crate) fn reply(ip_repr: &IpRepr, repr: &TcpRepr) -> (IpRepr, TcpRepr<'static>) { @@ -1283,6 +1349,10 @@ impl<'a> TcpSocket<'a> { self.meta.handle, self.local_endpoint, self.remote_endpoint, ack_len, self.tx_buffer.len() - ack_len); self.tx_buffer.dequeue_allocated(ack_len); + + // There's new room available in tx_buffer, wake the waiting task if any. + #[cfg(feature = "async")] + self.tx_waker.wake(); } if let Some(ack_number) = repr.ack_number { @@ -1367,6 +1437,10 @@ impl<'a> TcpSocket<'a> { self.meta.handle, self.local_endpoint, self.remote_endpoint, contig_len, self.rx_buffer.len() + contig_len); self.rx_buffer.enqueue_unallocated(contig_len); + + // There's new data in rx_buffer, notify waiting task if any. + #[cfg(feature = "async")] + self.rx_waker.wake(); } if !self.assembler.is_empty() { diff --git a/src/socket/udp.rs b/src/socket/udp.rs index 7818c0f..1099692 100644 --- a/src/socket/udp.rs +++ b/src/socket/udp.rs @@ -4,6 +4,10 @@ use {Error, Result}; use socket::{Socket, SocketMeta, SocketHandle, PollAt}; use storage::{PacketBuffer, PacketMetadata}; use wire::{IpProtocol, IpRepr, IpEndpoint, UdpRepr}; +#[cfg(feature = "async")] +use socket::WakerRegistration; +#[cfg(feature = "async")] +use core::task::Waker; /// A UDP packet metadata. pub type UdpPacketMetadata = PacketMetadata; @@ -22,7 +26,11 @@ pub struct UdpSocket<'a, 'b: 'a> { rx_buffer: UdpSocketBuffer<'a, 'b>, tx_buffer: UdpSocketBuffer<'a, 'b>, /// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets. - hop_limit: Option + hop_limit: Option, + #[cfg(feature = "async")] + rx_waker: WakerRegistration, + #[cfg(feature = "async")] + tx_waker: WakerRegistration, } impl<'a, 'b> UdpSocket<'a, 'b> { @@ -34,10 +42,49 @@ impl<'a, 'b> UdpSocket<'a, 'b> { endpoint: IpEndpoint::default(), rx_buffer: rx_buffer, tx_buffer: tx_buffer, - hop_limit: None + hop_limit: None, + #[cfg(feature = "async")] + rx_waker: WakerRegistration::new(), + #[cfg(feature = "async")] + tx_waker: WakerRegistration::new(), } } + /// Register a waker for receive operations. + /// + /// The waker is woken on state changes that might affect the return value + /// of `recv` method calls, such as receiving data, or the socket closing. + /// + /// Notes: + /// + /// - Only one waker can be registered at a time. If another waker was previously registered, + /// it is overwritten and will no longer be woken. + /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. + /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has + /// necessarily changed. + #[cfg(feature = "async")] + pub fn register_recv_waker(&mut self, waker: &Waker) { + self.rx_waker.register(waker) + } + + /// Register a waker for send operations. + /// + /// The waker is woken on state changes that might affect the return value + /// of `send` method calls, such as space becoming available in the transmit + /// buffer, or the socket closing. + /// + /// Notes: + /// + /// - Only one waker can be registered at a time. If another waker was previously registered, + /// it is overwritten and will no longer be woken. + /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. + /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has + /// necessarily changed. + #[cfg(feature = "async")] + pub fn register_send_waker(&mut self, waker: &Waker) { + self.tx_waker.register(waker) + } + /// Return the socket handle. #[inline] pub fn handle(&self) -> SocketHandle { @@ -89,6 +136,13 @@ impl<'a, 'b> UdpSocket<'a, 'b> { if self.is_open() { return Err(Error::Illegal) } self.endpoint = endpoint; + + #[cfg(feature = "async")] + { + self.rx_waker.wake(); + self.tx_waker.wake(); + } + Ok(()) } @@ -234,6 +288,10 @@ impl<'a, 'b> UdpSocket<'a, 'b> { net_trace!("{}:{}:{}: receiving {} octets", self.meta.handle, self.endpoint, endpoint, size); + + #[cfg(feature = "async")] + self.rx_waker.wake(); + Ok(()) } @@ -261,7 +319,12 @@ impl<'a, 'b> UdpSocket<'a, 'b> { hop_limit: hop_limit, }; emit((ip_repr, repr)) - }) + })?; + + #[cfg(feature = "async")] + self.tx_waker.wake(); + + Ok(()) } pub(crate) fn poll_at(&self) -> PollAt { diff --git a/src/socket/waker.rs b/src/socket/waker.rs new file mode 100644 index 0000000..4dfb44b --- /dev/null +++ b/src/socket/waker.rs @@ -0,0 +1,33 @@ +use core::task::Waker; + +/// Utility struct to register and wake a waker. +#[derive(Debug)] +pub struct WakerRegistration { + waker: Option, +} + +impl WakerRegistration { + pub const fn new() -> Self { + Self { waker: None } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. (In most executor implementations, + // cloning a waker is somewhat expensive, comparable to cloning an Arc). + Some(ref w2) if (w2.will_wake(w)) => {} + // In all other cases + // - we have no waker registered + // - we have a waker registered but it's for a different task. + // then clone the new waker and store it + _ => self.waker = Some(w.clone()), + } + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + self.waker.take().map(|w| w.wake()); + } +} \ No newline at end of file