Merge pull request #394 from smoltcp-rs/async

Async/await waker support.
v0.7.x
Dario Nieuwenhuis 2020-12-27 17:33:22 +01:00 committed by GitHub
commit 1d5a329155
8 changed files with 307 additions and 11 deletions

View File

@ -34,7 +34,7 @@ jobs:
- std ethernet proto-ipv6 socket-icmp socket-tcp
# Test features chosen to be as aggressive as possible.
- std ethernet proto-ipv4 proto-ipv6 socket-raw socket-udp socket-tcp socket-icmp
- std ethernet proto-ipv4 proto-ipv6 socket-raw socket-udp socket-tcp socket-icmp async
include:
# Test alloc feature which requires nightly.
@ -66,7 +66,7 @@ jobs:
features:
# These feature sets cannot run tests, so we only check they build.
- ethernet proto-ipv6 proto-ipv6 proto-igmp proto-dhcpv4 socket-raw socket-udp socket-tcp socket-icmp
- ethernet proto-ipv6 proto-ipv6 proto-igmp proto-dhcpv4 socket-raw socket-udp socket-tcp socket-icmp async
steps:
- uses: actions/checkout@v2

View File

@ -42,12 +42,14 @@ ethernet = []
"socket-udp" = []
"socket-tcp" = []
"socket-icmp" = []
"async" = []
default = [
"std", "log", # needed for `cargo test --no-default-features --features default` :/
"ethernet",
"phy-raw_socket", "phy-tap_interface",
"proto-ipv4", "proto-igmp", "proto-dhcpv4", "proto-ipv6",
"socket-raw", "socket-icmp", "socket-udp", "socket-tcp"
"socket-raw", "socket-icmp", "socket-udp", "socket-tcp",
"async"
]
# experimental; do not use; no guarantees provided that this feature will be kept

View File

@ -5,6 +5,10 @@ use phy::{ChecksumCapabilities, DeviceCapabilities};
use socket::{Socket, SocketMeta, SocketHandle, PollAt};
use storage::{PacketBuffer, PacketMetadata};
use wire::{IpAddress, IpEndpoint, IpProtocol, IpRepr};
#[cfg(feature = "async")]
use socket::WakerRegistration;
#[cfg(feature = "async")]
use core::task::Waker;
#[cfg(feature = "proto-ipv4")]
use wire::{Ipv4Address, Ipv4Repr, Icmpv4Packet, Icmpv4Repr};
@ -61,7 +65,11 @@ pub struct IcmpSocket<'a, 'b: 'a> {
/// The endpoint this socket is communicating with
endpoint: Endpoint,
/// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets.
hop_limit: Option<u8>
hop_limit: Option<u8>,
#[cfg(feature = "async")]
rx_waker: WakerRegistration,
#[cfg(feature = "async")]
tx_waker: WakerRegistration,
}
impl<'a, 'b> IcmpSocket<'a, 'b> {
@ -73,10 +81,49 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
rx_buffer: rx_buffer,
tx_buffer: tx_buffer,
endpoint: Endpoint::default(),
hop_limit: None
hop_limit: None,
#[cfg(feature = "async")]
rx_waker: WakerRegistration::new(),
#[cfg(feature = "async")]
tx_waker: WakerRegistration::new(),
}
}
/// Register a waker for receive operations.
///
/// The waker is woken on state changes that might affect the return value
/// of `recv` method calls, such as receiving data, or the socket closing.
///
/// Notes:
///
/// - Only one waker can be registered at a time. If another waker was previously registered,
/// it is overwritten and will no longer be woken.
/// - The Waker is woken only once. Once woken, you must register it again to receive more wakes.
/// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has
/// necessarily changed.
#[cfg(feature = "async")]
pub fn register_recv_waker(&mut self, waker: &Waker) {
self.rx_waker.register(waker)
}
/// Register a waker for send operations.
///
/// The waker is woken on state changes that might affect the return value
/// of `send` method calls, such as space becoming available in the transmit
/// buffer, or the socket closing.
///
/// Notes:
///
/// - Only one waker can be registered at a time. If another waker was previously registered,
/// it is overwritten and will no longer be woken.
/// - The Waker is woken only once. Once woken, you must register it again to receive more wakes.
/// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has
/// necessarily changed.
#[cfg(feature = "async")]
pub fn register_send_waker(&mut self, waker: &Waker) {
self.tx_waker.register(waker)
}
/// Return the socket handle.
#[inline]
pub fn handle(&self) -> SocketHandle {
@ -174,6 +221,13 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
if self.is_open() { return Err(Error::Illegal) }
self.endpoint = endpoint;
#[cfg(feature = "async")]
{
self.rx_waker.wake();
self.tx_waker.wake();
}
Ok(())
}
@ -339,6 +393,10 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
self.meta.handle, icmp_repr.buffer_len(), packet_buf.len());
},
}
#[cfg(feature = "async")]
self.rx_waker.wake();
Ok(())
}
@ -380,7 +438,12 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
},
_ => Err(Error::Unaddressable)
}
})
})?;
#[cfg(feature = "async")]
self.tx_waker.wake();
Ok(())
}
pub(crate) fn poll_at(&self) -> PollAt {

View File

@ -26,7 +26,12 @@ mod tcp;
mod set;
mod ref_;
#[cfg(feature = "async")]
mod waker;
pub(crate) use self::meta::Meta as SocketMeta;
#[cfg(feature = "async")]
pub(crate) use self::waker::WakerRegistration;
#[cfg(feature = "socket-raw")]
pub use self::raw::{RawPacketMetadata,

View File

@ -9,6 +9,10 @@ use wire::{IpVersion, IpRepr, IpProtocol};
use wire::{Ipv4Repr, Ipv4Packet};
#[cfg(feature = "proto-ipv6")]
use wire::{Ipv6Repr, Ipv6Packet};
#[cfg(feature = "async")]
use socket::WakerRegistration;
#[cfg(feature = "async")]
use core::task::Waker;
/// A UDP packet metadata.
pub type RawPacketMetadata = PacketMetadata<()>;
@ -27,6 +31,10 @@ pub struct RawSocket<'a, 'b: 'a> {
ip_protocol: IpProtocol,
rx_buffer: RawSocketBuffer<'a, 'b>,
tx_buffer: RawSocketBuffer<'a, 'b>,
#[cfg(feature = "async")]
rx_waker: WakerRegistration,
#[cfg(feature = "async")]
tx_waker: WakerRegistration,
}
impl<'a, 'b> RawSocket<'a, 'b> {
@ -41,9 +49,48 @@ impl<'a, 'b> RawSocket<'a, 'b> {
ip_protocol,
rx_buffer,
tx_buffer,
#[cfg(feature = "async")]
rx_waker: WakerRegistration::new(),
#[cfg(feature = "async")]
tx_waker: WakerRegistration::new(),
}
}
/// Register a waker for receive operations.
///
/// The waker is woken on state changes that might affect the return value
/// of `recv` method calls, such as receiving data, or the socket closing.
///
/// Notes:
///
/// - Only one waker can be registered at a time. If another waker was previously registered,
/// it is overwritten and will no longer be woken.
/// - The Waker is woken only once. Once woken, you must register it again to receive more wakes.
/// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has
/// necessarily changed.
#[cfg(feature = "async")]
pub fn register_recv_waker(&mut self, waker: &Waker) {
self.rx_waker.register(waker)
}
/// Register a waker for send operations.
///
/// The waker is woken on state changes that might affect the return value
/// of `send` method calls, such as space becoming available in the transmit
/// buffer, or the socket closing.
///
/// Notes:
///
/// - Only one waker can be registered at a time. If another waker was previously registered,
/// it is overwritten and will no longer be woken.
/// - The Waker is woken only once. Once woken, you must register it again to receive more wakes.
/// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has
/// necessarily changed.
#[cfg(feature = "async")]
pub fn register_send_waker(&mut self, waker: &Waker) {
self.tx_waker.register(waker)
}
/// Return the socket handle.
#[inline]
pub fn handle(&self) -> SocketHandle {
@ -171,6 +218,10 @@ impl<'a, 'b> RawSocket<'a, 'b> {
net_trace!("{}:{}:{}: receiving {} octets",
self.meta.handle, self.ip_version, self.ip_protocol,
packet_buf.len());
#[cfg(feature = "async")]
self.rx_waker.wake();
Ok(())
}
@ -228,7 +279,12 @@ impl<'a, 'b> RawSocket<'a, 'b> {
Ok(())
}
}
})
})?;
#[cfg(feature = "async")]
self.tx_waker.wake();
Ok(())
}
pub(crate) fn poll_at(&self) -> PollAt {

View File

@ -10,6 +10,10 @@ use time::{Duration, Instant};
use socket::{Socket, SocketMeta, SocketHandle, PollAt};
use storage::{Assembler, RingBuffer};
use wire::{IpProtocol, IpRepr, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl};
#[cfg(feature = "async")]
use socket::WakerRegistration;
#[cfg(feature = "async")]
use core::task::Waker;
/// A TCP socket ring buffer.
pub type SocketBuffer<'a> = RingBuffer<'a, u8>;
@ -248,6 +252,12 @@ pub struct TcpSocket<'a> {
/// The number of packets recived directly after
/// each other which have the same ACK number.
local_rx_dup_acks: u8,
#[cfg(feature = "async")]
rx_waker: WakerRegistration,
#[cfg(feature = "async")]
tx_waker: WakerRegistration,
}
const DEFAULT_MSS: usize = 536;
@ -298,9 +308,49 @@ impl<'a> TcpSocket<'a> {
local_rx_last_ack: None,
local_rx_last_seq: None,
local_rx_dup_acks: 0,
#[cfg(feature = "async")]
rx_waker: WakerRegistration::new(),
#[cfg(feature = "async")]
tx_waker: WakerRegistration::new(),
}
}
/// Register a waker for receive operations.
///
/// The waker is woken on state changes that might affect the return value
/// of `recv` method calls, such as receiving data, or the socket closing.
///
/// Notes:
///
/// - Only one waker can be registered at a time. If another waker was previously registered,
/// it is overwritten and will no longer be woken.
/// - The Waker is woken only once. Once woken, you must register it again to receive more wakes.
/// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has
/// necessarily changed.
#[cfg(feature = "async")]
pub fn register_recv_waker(&mut self, waker: &Waker) {
self.rx_waker.register(waker)
}
/// Register a waker for send operations.
///
/// The waker is woken on state changes that might affect the return value
/// of `send` method calls, such as space becoming available in the transmit
/// buffer, or the socket closing.
///
/// Notes:
///
/// - Only one waker can be registered at a time. If another waker was previously registered,
/// it is overwritten and will no longer be woken.
/// - The Waker is woken only once. Once woken, you must register it again to receive more wakes.
/// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has
/// necessarily changed.
#[cfg(feature = "async")]
pub fn register_send_waker(&mut self, waker: &Waker) {
self.tx_waker.register(waker)
}
/// Return the socket handle.
#[inline]
pub fn handle(&self) -> SocketHandle {
@ -438,6 +488,12 @@ impl<'a> TcpSocket<'a> {
self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8;
self.remote_mss = DEFAULT_MSS;
self.remote_last_ts = None;
#[cfg(feature = "async")]
{
self.rx_waker.wake();
self.tx_waker.wake();
}
}
/// Start listening on the given endpoint.
@ -825,7 +881,17 @@ impl<'a> TcpSocket<'a> {
self.state, state);
}
}
self.state = state
self.state = state;
#[cfg(feature = "async")]
{
// Wake all tasks waiting. Even if we haven't received/sent data, this
// is needed because return values of functions may change depending on the state.
// For example, a pending read has to fail with an error if the socket is closed.
self.rx_waker.wake();
self.tx_waker.wake();
}
}
pub(crate) fn reply(ip_repr: &IpRepr, repr: &TcpRepr) -> (IpRepr, TcpRepr<'static>) {
@ -1283,6 +1349,10 @@ impl<'a> TcpSocket<'a> {
self.meta.handle, self.local_endpoint, self.remote_endpoint,
ack_len, self.tx_buffer.len() - ack_len);
self.tx_buffer.dequeue_allocated(ack_len);
// There's new room available in tx_buffer, wake the waiting task if any.
#[cfg(feature = "async")]
self.tx_waker.wake();
}
if let Some(ack_number) = repr.ack_number {
@ -1367,6 +1437,10 @@ impl<'a> TcpSocket<'a> {
self.meta.handle, self.local_endpoint, self.remote_endpoint,
contig_len, self.rx_buffer.len() + contig_len);
self.rx_buffer.enqueue_unallocated(contig_len);
// There's new data in rx_buffer, notify waiting task if any.
#[cfg(feature = "async")]
self.rx_waker.wake();
}
if !self.assembler.is_empty() {

View File

@ -4,6 +4,10 @@ use {Error, Result};
use socket::{Socket, SocketMeta, SocketHandle, PollAt};
use storage::{PacketBuffer, PacketMetadata};
use wire::{IpProtocol, IpRepr, IpEndpoint, UdpRepr};
#[cfg(feature = "async")]
use socket::WakerRegistration;
#[cfg(feature = "async")]
use core::task::Waker;
/// A UDP packet metadata.
pub type UdpPacketMetadata = PacketMetadata<IpEndpoint>;
@ -22,7 +26,11 @@ pub struct UdpSocket<'a, 'b: 'a> {
rx_buffer: UdpSocketBuffer<'a, 'b>,
tx_buffer: UdpSocketBuffer<'a, 'b>,
/// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets.
hop_limit: Option<u8>
hop_limit: Option<u8>,
#[cfg(feature = "async")]
rx_waker: WakerRegistration,
#[cfg(feature = "async")]
tx_waker: WakerRegistration,
}
impl<'a, 'b> UdpSocket<'a, 'b> {
@ -34,10 +42,49 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
endpoint: IpEndpoint::default(),
rx_buffer: rx_buffer,
tx_buffer: tx_buffer,
hop_limit: None
hop_limit: None,
#[cfg(feature = "async")]
rx_waker: WakerRegistration::new(),
#[cfg(feature = "async")]
tx_waker: WakerRegistration::new(),
}
}
/// Register a waker for receive operations.
///
/// The waker is woken on state changes that might affect the return value
/// of `recv` method calls, such as receiving data, or the socket closing.
///
/// Notes:
///
/// - Only one waker can be registered at a time. If another waker was previously registered,
/// it is overwritten and will no longer be woken.
/// - The Waker is woken only once. Once woken, you must register it again to receive more wakes.
/// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has
/// necessarily changed.
#[cfg(feature = "async")]
pub fn register_recv_waker(&mut self, waker: &Waker) {
self.rx_waker.register(waker)
}
/// Register a waker for send operations.
///
/// The waker is woken on state changes that might affect the return value
/// of `send` method calls, such as space becoming available in the transmit
/// buffer, or the socket closing.
///
/// Notes:
///
/// - Only one waker can be registered at a time. If another waker was previously registered,
/// it is overwritten and will no longer be woken.
/// - The Waker is woken only once. Once woken, you must register it again to receive more wakes.
/// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has
/// necessarily changed.
#[cfg(feature = "async")]
pub fn register_send_waker(&mut self, waker: &Waker) {
self.tx_waker.register(waker)
}
/// Return the socket handle.
#[inline]
pub fn handle(&self) -> SocketHandle {
@ -89,6 +136,13 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
if self.is_open() { return Err(Error::Illegal) }
self.endpoint = endpoint;
#[cfg(feature = "async")]
{
self.rx_waker.wake();
self.tx_waker.wake();
}
Ok(())
}
@ -234,6 +288,10 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
net_trace!("{}:{}:{}: receiving {} octets",
self.meta.handle, self.endpoint,
endpoint, size);
#[cfg(feature = "async")]
self.rx_waker.wake();
Ok(())
}
@ -261,7 +319,12 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
hop_limit: hop_limit,
};
emit((ip_repr, repr))
})
})?;
#[cfg(feature = "async")]
self.tx_waker.wake();
Ok(())
}
pub(crate) fn poll_at(&self) -> PollAt {

33
src/socket/waker.rs Normal file
View File

@ -0,0 +1,33 @@
use core::task::Waker;
/// Utility struct to register and wake a waker.
#[derive(Debug)]
pub struct WakerRegistration {
waker: Option<Waker>,
}
impl WakerRegistration {
pub const fn new() -> Self {
Self { waker: None }
}
/// Register a waker. Overwrites the previous waker, if any.
pub fn register(&mut self, w: &Waker) {
match self.waker {
// Optimization: If both the old and new Wakers wake the same task, we can simply
// keep the old waker, skipping the clone. (In most executor implementations,
// cloning a waker is somewhat expensive, comparable to cloning an Arc).
Some(ref w2) if (w2.will_wake(w)) => {}
// In all other cases
// - we have no waker registered
// - we have a waker registered but it's for a different task.
// then clone the new waker and store it
_ => self.waker = Some(w.clone()),
}
}
/// Wake the registered waker, if any.
pub fn wake(&mut self) {
self.waker.take().map(|w| w.wake());
}
}