Adding prototype fix for fail-free ingress

This commit is contained in:
Ryan Summers 2021-05-31 17:25:04 +02:00
parent 03429a8b4e
commit 8d4e255090
17 changed files with 147 additions and 123 deletions

View File

@ -537,13 +537,13 @@ impl<'a, DeviceT> Interface<'a, DeviceT>
pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: Instant) -> Result<bool> {
let mut readiness_may_have_changed = false;
loop {
let processed_any = self.socket_ingress(sockets, timestamp)?;
let emitted_any = self.socket_egress(sockets, timestamp)?;
let processed_any = self.socket_ingress(sockets, timestamp);
let emitted_any = self.socket_egress(sockets, timestamp);
#[cfg(feature = "proto-igmp")]
self.igmp_egress(timestamp)?;
self.igmp_egress(timestamp);
if processed_any || emitted_any {
if processed_any || emitted_any? {
readiness_may_have_changed = true;
} else {
break
@ -592,7 +592,7 @@ impl<'a, DeviceT> Interface<'a, DeviceT>
}
}
fn socket_ingress(&mut self, sockets: &mut SocketSet, timestamp: Instant) -> Result<bool> {
fn socket_ingress(&mut self, sockets: &mut SocketSet, timestamp: Instant) -> bool {
let mut processed_any = false;
loop {
let &mut Self { ref mut device, ref mut inner } = self;
@ -600,53 +600,47 @@ impl<'a, DeviceT> Interface<'a, DeviceT>
None => break,
Some(tokens) => tokens,
};
rx_token.consume(timestamp, |frame| {
match rx_token.consume(timestamp, |frame| {
match inner.device_capabilities.medium {
#[cfg(feature = "medium-ethernet")]
Medium::Ethernet => {
inner.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
net_debug!("cannot process ingress packet: {}", err);
#[cfg(not(feature = "defmt"))]
net_debug!("packet dump follows:\n{}",
PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &frame));
err
}).and_then(|response| {
processed_any = true;
match response {
Some(packet) => {
inner.dispatch(tx_token, timestamp, packet).map_err(|err| {
net_debug!("cannot dispatch response packet: {}", err);
err
})
match inner.process_ethernet(sockets, timestamp, &frame) {
Ok(response) => {
processed_any = true;
if let Some(packet) = response {
inner.dispatch(tx_token, timestamp, packet);
}
None => Ok(())
}
})
Err(err) => {
net_debug!("cannot process ingress packet: {}", err);
#[cfg(not(feature = "defmt"))]
net_debug!("packet dump follows:\n{}",
PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &frame));
}
}
}
#[cfg(feature = "medium-ip")]
Medium::Ip => {
inner.process_ip(sockets, timestamp, &frame).map_err(|err| {
net_debug!("cannot process ingress packet: {}", err);
//net_debug!("packet dump follows:\n{}",
// PrettyPrinter::<IpFrame<&[u8]>>::new("", &frame));
err
}).and_then(|response| {
processed_any = true;
match response {
Some(packet) => {
inner.dispatch_ip(tx_token, timestamp, packet).map_err(|err| {
net_debug!("cannot dispatch response packet: {}", err);
err
})
match inner.process_ip(sockets, timestamp, &frame) {
Ok(response) => {
processed_any = true;
if let Some(packet) = response {
match inner.dispatch_ip(tx_token, timestamp, packet) {
Err(err) => net_debug!("Failed to send response: {}", err),
_ => {},
};
}
None => Ok(())
}
})
Err(err) => net_debug!("cannot process ingress packet: {}", err),
}
}
}
})?;
}) {
Err(err) => net_debug!("Failed to consume RX token: {}", err),
Ok(_) => {},
};
}
Ok(processed_any)
processed_any
}
fn socket_egress(&mut self, sockets: &mut SocketSet, timestamp: Instant) -> Result<bool> {
@ -663,13 +657,16 @@ impl<'a, DeviceT> Interface<'a, DeviceT>
let mut device_result = Ok(());
let &mut Self { ref mut device, ref mut inner } = self;
let tx_token = match device.transmit() {
Some(token) => token,
None => break,
};
macro_rules! respond {
($response:expr) => ({
let response = $response;
neighbor_addr = Some(response.ip_repr().dst_addr());
let tx_token = device.transmit().ok_or(Error::Exhausted)?;
device_result = inner.dispatch_ip(tx_token, timestamp, response);
device_result
})
}
@ -697,7 +694,7 @@ impl<'a, DeviceT> Interface<'a, DeviceT>
#[cfg(feature = "proto-ipv6")]
(IpRepr::Ipv6(ipv6_repr), IcmpRepr::Ipv6(icmpv6_repr)) =>
respond!(IpPacket::Icmpv6((ipv6_repr, icmpv6_repr))),
_ => Err(Error::Unaddressable)
_ => panic!("Invalid ICMP represnetation"),
}
}),
#[cfg(feature = "socket-udp")]
@ -742,18 +739,27 @@ impl<'a, DeviceT> Interface<'a, DeviceT>
/// Depending on `igmp_report_state` and the therein contained
/// timeouts, send IGMP membership reports.
#[cfg(feature = "proto-igmp")]
fn igmp_egress(&mut self, timestamp: Instant) -> Result<bool> {
fn igmp_egress(&mut self, timestamp: Instant) -> bool {
match self.inner.igmp_report_state {
IgmpReportState::ToSpecificQuery { version, timeout, group }
if timestamp >= timeout => {
if let Some(pkt) = self.inner.igmp_report_packet(version, group) {
// Send initial membership report
let tx_token = self.device.transmit().ok_or(Error::Exhausted)?;
self.inner.dispatch_ip(tx_token, timestamp, pkt)?;
let tx_token = match self.device.transmit() {
Some(token) => token,
None => {
net_debug!("IGMP egress failure: Exhausted");
return false
}
};
match self.inner.dispatch_ip(tx_token, timestamp, pkt) {
Err(err) => net_debug!("Failed to egress IGMP: {}", err),
_ => {},
};
}
self.inner.igmp_report_state = IgmpReportState::Inactive;
Ok(true)
true
}
IgmpReportState::ToGeneralQuery { version, timeout, interval, next_index }
if timestamp >= timeout => {
@ -766,24 +772,34 @@ impl<'a, DeviceT> Interface<'a, DeviceT>
Some(addr) => {
if let Some(pkt) = self.inner.igmp_report_packet(version, addr) {
// Send initial membership report
let tx_token = self.device.transmit().ok_or(Error::Exhausted)?;
self.inner.dispatch_ip(tx_token, timestamp, pkt)?;
let tx_token = match self.device.transmit() {
Some(token) => token,
None => {
net_debug!("IGMP egress failure: Exhausted");
return false
}
};
match self.inner.dispatch_ip(tx_token, timestamp, pkt) {
Err(err) => net_debug!("Failed to egress IGMP: {}", err),
_ => {},
};
}
let next_timeout = (timeout + interval).max(timestamp);
self.inner.igmp_report_state = IgmpReportState::ToGeneralQuery {
version, timeout: next_timeout, interval, next_index: next_index + 1
};
Ok(true)
true
}
None => {
self.inner.igmp_report_state = IgmpReportState::Inactive;
Ok(false)
false
}
}
}
_ => Ok(false)
_ => false
}
}
}
@ -1567,7 +1583,7 @@ impl<'a> InterfaceInner<'a> {
#[cfg(feature = "medium-ethernet")]
fn dispatch<Tx>(&mut self, tx_token: Tx, timestamp: Instant,
packet: EthernetPacket) -> Result<()>
packet: EthernetPacket)
where Tx: TxToken
{
match packet {
@ -1584,29 +1600,33 @@ impl<'a> InterfaceInner<'a> {
let mut packet = ArpPacket::new_unchecked(frame.payload_mut());
arp_repr.emit(&mut packet);
})
});
},
EthernetPacket::Ip(packet) => {
self.dispatch_ip(tx_token, timestamp, packet)
match self.dispatch_ip(tx_token, timestamp, packet) {
Err(err) => net_debug!("Failed to egress: {}", err),
_ => {},
};
},
}
}
#[cfg(feature = "medium-ethernet")]
fn dispatch_ethernet<Tx, F>(&mut self, tx_token: Tx, timestamp: Instant,
buffer_len: usize, f: F) -> Result<()>
buffer_len: usize, f: F) -> ()
where Tx: TxToken, F: FnOnce(EthernetFrame<&mut [u8]>)
{
let tx_len = EthernetFrame::<&[u8]>::buffer_len(buffer_len);
tx_token.consume(timestamp, tx_len, |tx_buffer| {
match tx_token.consume(timestamp, tx_len, |tx_buffer| {
debug_assert!(tx_buffer.as_ref().len() == tx_len);
let mut frame = EthernetFrame::new_unchecked(tx_buffer);
frame.set_src_addr(self.ethernet_addr.unwrap());
f(frame);
Ok(())
})
}) {
Err(err) => net_debug!("Failed to consume TX token: {}", err),
Ok(_) => {},
};
}
fn in_same_network(&self, addr: &IpAddress) -> bool {
@ -1705,7 +1725,7 @@ impl<'a> InterfaceInner<'a> {
frame.set_ethertype(EthernetProtocol::Arp);
arp_repr.emit(&mut ArpPacket::new_unchecked(frame.payload_mut()))
})?;
});
}
#[cfg(feature = "proto-ipv6")]
@ -1765,7 +1785,8 @@ impl<'a> InterfaceInner<'a> {
let payload = &mut frame.payload_mut()[ip_repr.buffer_len()..];
packet.emit_payload(ip_repr, payload, &caps);
})
});
Ok(())
}
#[cfg(feature = "medium-ip")]
Medium::Ip => {
@ -1777,8 +1798,6 @@ impl<'a> InterfaceInner<'a> {
let payload = &mut tx_buffer[ip_repr.buffer_len()..];
packet.emit_payload(ip_repr, payload, &caps);
Ok(())
})
}
}
@ -2746,7 +2765,7 @@ mod test {
// loopback have been processed, including responses to
// GENERAL_QUERY_BYTES. Therefore `recv_all()` would return 0
// pkts that could be checked.
iface.socket_ingress(&mut socket_set, timestamp).unwrap();
iface.socket_ingress(&mut socket_set, timestamp);
// Leave multicast groups
let timestamp = Instant::now();

View File

@ -242,7 +242,7 @@ pub struct RxToken<'a, Rx: phy::RxToken> {
impl<'a, Rx: phy::RxToken> phy::RxToken for RxToken<'a, Rx> {
fn consume<R, F>(self, timestamp: Instant, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
if self.state.borrow_mut().maybe(self.config.drop_pct) {
net_trace!("rx: randomly dropping a packet");
@ -254,10 +254,11 @@ impl<'a, Rx: phy::RxToken> phy::RxToken for RxToken<'a, Rx> {
}
let Self { token, config, state, mut corrupt } = self;
token.consume(timestamp, |buffer| {
if config.max_size > 0 && buffer.as_ref().len() > config.max_size {
net_trace!("rx: dropping a packet that is too large");
return Err(Error::Exhausted)
}
// TODO: Implement this in a new mechanism. Token consumption is infallible.
//if config.max_size > 0 && buffer.as_ref().len() > config.max_size {
// net_trace!("rx: dropping a packet that is too large");
// return Error::Exhausted
//}
if state.borrow_mut().maybe(config.corrupt_pct) {
net_trace!("rx: randomly corrupting a packet");
let mut corrupt = &mut corrupt[..buffer.len()];
@ -281,7 +282,7 @@ pub struct TxToken<'a, Tx: phy::TxToken> {
impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> {
fn consume<R, F>(mut self, timestamp: Instant, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let drop = if self.state.borrow_mut().maybe(self.config.drop_pct) {
net_trace!("tx: randomly dropping a packet");
@ -297,7 +298,7 @@ impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> {
};
if drop {
return f(&mut self.junk[..len]);
return Ok(f(&mut self.junk[..len]));
}
let Self { token, state, config, .. } = self;

View File

@ -86,7 +86,7 @@ pub struct RxToken<'a, Rx: phy::RxToken, F: Fuzzer + 'a>{
impl<'a, Rx: phy::RxToken, FRx: Fuzzer> phy::RxToken for RxToken<'a, Rx, FRx> {
fn consume<R, F>(self, timestamp: Instant, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let Self { fuzzer, token } = self;
token.consume(timestamp, |buffer| {
@ -104,7 +104,7 @@ pub struct TxToken<'a, Tx: phy::TxToken, F: Fuzzer + 'a> {
impl<'a, Tx: phy::TxToken, FTx: Fuzzer> phy::TxToken for TxToken<'a, Tx, FTx> {
fn consume<R, F>(self, timestamp: Instant, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let Self { fuzzer, token } = self;
token.consume(timestamp, len, |mut buf| {

View File

@ -68,9 +68,9 @@ pub struct RxToken {
impl phy::RxToken for RxToken {
fn consume<R, F>(mut self, _timestamp: Instant, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
f(&mut self.buffer)
Ok(f(&mut self.buffer))
}
}
@ -81,12 +81,12 @@ pub struct TxToken<'a> {
impl<'a> phy::TxToken for TxToken<'a> {
fn consume<R, F>(self, _timestamp: Instant, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let mut buffer = Vec::new();
buffer.resize(len, 0);
let result = f(&mut buffer);
self.queue.push_back(buffer);
result
Ok(result)
}
}

View File

@ -295,7 +295,7 @@ pub trait RxToken {
/// The timestamp must be a number of milliseconds, monotonically increasing since an
/// arbitrary moment in time, such as system startup.
fn consume<R, F>(self, timestamp: Instant, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>;
where F: FnOnce(&mut [u8]) -> R;
}
/// A token to transmit a single network packet.
@ -310,5 +310,5 @@ pub trait TxToken {
/// The timestamp must be a number of milliseconds, monotonically increasing since an
/// arbitrary moment in time, such as system startup.
fn consume<R, F>(self, timestamp: Instant, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>;
where F: FnOnce(&mut [u8]) -> R;
}

View File

@ -178,7 +178,7 @@ pub struct RxToken<Rx: phy::RxToken, S: PcapSink> {
}
impl<Rx: phy::RxToken, S: PcapSink> phy::RxToken for RxToken<Rx, S> {
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, timestamp: Instant, f: F) -> Result<R> {
fn consume<R, F: FnOnce(&mut [u8]) -> R>(self, timestamp: Instant, f: F) -> Result<R> {
let Self { token, sink, mode } = self;
token.consume(timestamp, |buffer| {
match mode {
@ -200,7 +200,7 @@ pub struct TxToken<Tx: phy::TxToken, S: PcapSink> {
impl<Tx: phy::TxToken, S: PcapSink> phy::TxToken for TxToken<Tx, S> {
fn consume<R, F>(self, timestamp: Instant, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let Self { token, sink, mode } = self;
token.consume(timestamp, len, |buffer| {

View File

@ -80,9 +80,9 @@ pub struct RxToken {
impl phy::RxToken for RxToken {
fn consume<R, F>(mut self, _timestamp: Instant, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
f(&mut self.buffer[..])
Ok(f(&mut self.buffer[..]))
}
}
@ -93,12 +93,12 @@ pub struct TxToken {
impl phy::TxToken for TxToken {
fn consume<R, F>(self, _timestamp: Instant, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let mut lower = self.lower.borrow_mut();
let mut buffer = vec![0; len];
let result = f(&mut buffer);
lower.send(&buffer[..]).unwrap();
result
Ok(result)
}
}

View File

@ -77,7 +77,7 @@ pub struct RxToken<Rx: phy::RxToken> {
impl<Rx: phy::RxToken> phy::RxToken for RxToken<Rx> {
fn consume<R, F>(self, timestamp: Instant, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let Self { token, writer, medium } = self;
token.consume(timestamp, |buffer| {
@ -100,7 +100,7 @@ pub struct TxToken<Tx: phy::TxToken> {
impl<Tx: phy::TxToken> phy::TxToken for TxToken<Tx> {
fn consume<R, F>(self, timestamp: Instant, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let Self { token, writer, medium } = self;
token.consume(timestamp, len, |buffer| {
@ -137,4 +137,4 @@ impl<'a> fmt::Display for Packet<'a> {
}
}
}
}
}

View File

@ -83,9 +83,9 @@ pub struct RxToken {
impl phy::RxToken for RxToken {
fn consume<R, F>(mut self, _timestamp: Instant, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
f(&mut self.buffer[..])
Ok(f(&mut self.buffer[..]))
}
}
@ -96,12 +96,12 @@ pub struct TxToken {
impl phy::TxToken for TxToken {
fn consume<R, F>(self, _timestamp: Instant, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
where F: FnOnce(&mut [u8]) -> R
{
let mut lower = self.lower.borrow_mut();
let mut buffer = vec![0; len];
let result = f(&mut buffer);
lower.send(&buffer[..]).unwrap();
result
Ok(result)
}
}

View File

@ -298,7 +298,7 @@ impl Dhcpv4Socket {
}
pub(crate) fn dispatch<F>(&mut self, now: Instant, ethernet_addr: EthernetAddress, ip_mtu: usize, emit: F) -> Result<()>
where F: FnOnce((Ipv4Repr, UdpRepr, DhcpRepr)) -> Result<()> {
where F: FnOnce((Ipv4Repr, UdpRepr, DhcpRepr)) {
// Worst case biggest IPv4 header length.
// 0x0f * 4 = 60 bytes.
@ -350,7 +350,7 @@ impl Dhcpv4Socket {
// send packet
net_debug!("DHCP send DISCOVER to {}: {:?}", ipv4_repr.dst_addr, dhcp_repr);
ipv4_repr.payload_len = udp_repr.header_len() + dhcp_repr.buffer_len();
emit((ipv4_repr, udp_repr, dhcp_repr))?;
emit((ipv4_repr, udp_repr, dhcp_repr));
// Update state AFTER the packet has been successfully sent.
state.retry_at = now + DISCOVER_TIMEOUT;
@ -376,7 +376,7 @@ impl Dhcpv4Socket {
net_debug!("DHCP send request to {}: {:?}", ipv4_repr.dst_addr, dhcp_repr);
ipv4_repr.payload_len = udp_repr.header_len() + dhcp_repr.buffer_len();
emit((ipv4_repr, udp_repr, dhcp_repr))?;
emit((ipv4_repr, udp_repr, dhcp_repr));
// Exponential backoff: Double every 2 retries.
state.retry_at = now + (REQUEST_TIMEOUT << (state.retry as u32 / 2));
@ -405,7 +405,7 @@ impl Dhcpv4Socket {
net_debug!("DHCP send renew to {}: {:?}", ipv4_repr.dst_addr, dhcp_repr);
ipv4_repr.payload_len = udp_repr.header_len() + dhcp_repr.buffer_len();
emit((ipv4_repr, udp_repr, dhcp_repr))?;
emit((ipv4_repr, udp_repr, dhcp_repr));
// In both RENEWING and REBINDING states, if the client receives no
// response to its DHCPREQUEST message, the client SHOULD wait one-half

View File

@ -402,7 +402,7 @@ impl<'a> IcmpSocket<'a> {
}
pub(crate) fn dispatch<F>(&mut self, emit: F) -> Result<()>
where F: FnOnce((IpRepr, IcmpRepr)) -> Result<()>
where F: FnOnce((IpRepr, IcmpRepr))
{
let handle = self.meta.handle;
let hop_limit = self.hop_limit.unwrap_or(64);
@ -413,7 +413,13 @@ impl<'a> IcmpSocket<'a> {
#[cfg(feature = "proto-ipv4")]
IpAddress::Ipv4(ipv4_addr) => {
let packet = Icmpv4Packet::new_unchecked(&*packet_buf);
let repr = Icmpv4Repr::parse(&packet, &ChecksumCapabilities::ignored())?;
let repr = match Icmpv4Repr::parse(&packet, &ChecksumCapabilities::ignored()) {
Ok(repr) => repr,
Err(err) => {
net_debug!("ICMPv4 represnetation invalid: {}", err);
return
},
};
let ip_repr = IpRepr::Ipv4(Ipv4Repr {
src_addr: Ipv4Address::default(),
dst_addr: ipv4_addr,
@ -427,7 +433,14 @@ impl<'a> IcmpSocket<'a> {
IpAddress::Ipv6(ipv6_addr) => {
let packet = Icmpv6Packet::new_unchecked(&*packet_buf);
let src_addr = Ipv6Address::default();
let repr = Icmpv6Repr::parse(&src_addr.into(), &ipv6_addr.into(), &packet, &ChecksumCapabilities::ignored())?;
let repr = match Icmpv6Repr::parse(&src_addr.into(), &ipv6_addr.into(), &packet,
&ChecksumCapabilities::ignored()) {
Ok(repr) => repr,
Err(err) => {
net_debug!("ICMPv6 represnetation invalid: {}", err);
return
},
};
let ip_repr = IpRepr::Ipv6(Ipv6Repr {
src_addr: src_addr,
dst_addr: ipv6_addr,
@ -437,7 +450,7 @@ impl<'a> IcmpSocket<'a> {
});
emit((ip_repr, IcmpRepr::Ipv6(repr)))
},
_ => Err(Error::Unaddressable)
_ => net_debug!("ICMP destination unaddressable"),
}
})?;

View File

@ -228,7 +228,7 @@ impl<'a> RawSocket<'a> {
pub(crate) fn dispatch<F>(&mut self, checksum_caps: &ChecksumCapabilities, emit: F) ->
Result<()>
where F: FnOnce((IpRepr, &[u8])) -> Result<()> {
where F: FnOnce((IpRepr, &[u8])) {
fn prepare<'a>(protocol: IpProtocol, buffer: &'a mut [u8],
_checksum_caps: &ChecksumCapabilities) -> Result<(IpRepr, &'a [u8])> {
match IpVersion::of_packet(buffer)? {
@ -275,8 +275,6 @@ impl<'a> RawSocket<'a> {
net_debug!("{}:{}:{}: dropping outgoing packet ({})",
handle, ip_version, ip_protocol,
error);
// Return Ok(()) so the packet is dequeued.
Ok(())
}
}
})?;

View File

@ -1669,7 +1669,7 @@ impl<'a> TcpSocket<'a> {
pub(crate) fn dispatch<F>(&mut self, timestamp: Instant, ip_mtu: usize,
emit: F) -> Result<()>
where F: FnOnce((IpRepr, TcpRepr)) -> Result<()> {
where F: FnOnce((IpRepr, TcpRepr)) {
if !self.remote_endpoint.is_specified() { return Err(Error::Exhausted) }
if self.remote_last_ts.is_none() {
@ -1867,7 +1867,7 @@ impl<'a> TcpSocket<'a> {
// to not waste time waiting for the retransmit timer on packets that we know
// for sure will not be successfully transmitted.
ip_repr.set_payload_len(repr.buffer_len());
emit((ip_repr, repr))?;
emit((ip_repr, repr));
// We've sent something, whether useful data or a keep-alive packet, so rewind
// the keep-alive timer.

View File

@ -296,7 +296,7 @@ impl<'a> UdpSocket<'a> {
}
pub(crate) fn dispatch<F>(&mut self, emit: F) -> Result<()>
where F: FnOnce((IpRepr, UdpRepr, &[u8])) -> Result<()> {
where F: FnOnce((IpRepr, UdpRepr, &[u8])) {
let handle = self.handle();
let endpoint = self.endpoint;
let hop_limit = self.hop_limit.unwrap_or(64);

View File

@ -30,4 +30,4 @@ impl WakerRegistration {
pub fn wake(&mut self) {
self.waker.take().map(|w| w.wake());
}
}
}

View File

@ -125,7 +125,7 @@ impl<'a, H> PacketBuffer<'a, H> {
/// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
/// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
pub fn dequeue_with<'c, R, F>(&'c mut self, f: F) -> Result<R>
where F: FnOnce(&mut H, &'c mut [u8]) -> Result<R> {
where F: FnOnce(&mut H, &'c mut [u8]) -> R {
self.dequeue_padding();
let Self { ref mut metadata_ring, ref mut payload_ring } = *self;
@ -136,12 +136,9 @@ impl<'a, H> PacketBuffer<'a, H> {
payload_ring.dequeue_many_with(|payload_buf| {
debug_assert!(payload_buf.len() >= size);
match f(header.as_mut().unwrap(), &mut payload_buf[..size]) {
Ok(val) => (size, Ok(val)),
Err(err) => (0, Err(err)),
}
(size, Ok(f(header.as_mut().unwrap(), &mut payload_buf[..size])))
}).1
})
})?
}
/// Dequeue a single packet from the buffer, and return a reference to its payload

View File

@ -136,18 +136,14 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
/// Call `f` with a single buffer element, and dequeue the element if `f`
/// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
pub fn dequeue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
where F: FnOnce(&'b mut T) -> Result<R> {
where F: FnOnce(&'b mut T) -> R {
if self.is_empty() { return Err(Error::Exhausted) }
let next_at = self.get_idx_unchecked(1);
match f(&mut self.storage[self.read_at]) {
Ok(result) => {
self.length -= 1;
self.read_at = next_at;
Ok(result)
}
Err(error) => Err(error)
}
let result = f(&mut self.storage[self.read_at]);
self.length -= 1;
self.read_at = next_at;
Ok(result)
}
/// Dequeue an element from the buffer, and return a reference to it,
@ -155,7 +151,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
///
/// This function is a shortcut for `ring_buf.dequeue_one_with(Ok)`.
pub fn dequeue_one(&mut self) -> Result<&mut T> {
self.dequeue_one_with(Ok)
self.dequeue_one_with(|buf| buf)
}
}