Split `poll_at`/`poll_delay` out of `poll`.

The previous model was flawed. Consider the following case:
  * The main loop looks as follows (pseudocode):
      loop {
        let _ = (tcp:1234).read_all()
        wait(iface.poll())
      }
  * The remote end is continuously transmitting data and at some
    point fills the window of (tcp:1234), stopping the transmission
    afterwards.
  * The local end processes the packets and, as a part of egress
    routine, emits an ACK. That also updates the window, and
    the socket's poll_at() routine returns None, since there is
    nothing to transmit or retransmit.
  * The local end now waits indefinitely even though it can start
    processing the data in the socket buffers right now.
v0.7.x
whitequark 2017-12-22 12:04:35 +00:00
parent fec3bb32eb
commit f1a7fbe973
9 changed files with 93 additions and 50 deletions

View File

@ -67,7 +67,7 @@ required-features = ["std", "phy-tap_interface", "socket-tcp", "socket-udp"]
[[example]]
name = "loopback"
required-features = ["alloc", "proto-ipv4"]
required-features = ["log", "socket-tcp"]
# This is really a test, but it requires root privileges for setup (the tap interface)
# so it is built as an example.

View File

@ -59,6 +59,9 @@ fn main() {
let mut tcp_active = false;
loop {
let timestamp = utils::millis_since(startup_time);
iface.poll(&mut sockets, timestamp).expect("poll error");
{
let mut socket = sockets.get::<TcpSocket>(tcp_handle);
if socket.is_active() && !tcp_active {
@ -92,8 +95,6 @@ fn main() {
}
}
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
phy_wait(fd, poll_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
phy_wait(fd, iface.poll_delay(&sockets, timestamp)).expect("wait error");
}
}

View File

@ -59,6 +59,9 @@ fn main() {
let mut state = State::Connect;
loop {
let timestamp = utils::millis_since(startup_time);
iface.poll(&mut sockets, timestamp).expect("poll error");
{
let mut socket = sockets.get::<TcpSocket>(tcp_handle);
@ -94,8 +97,6 @@ fn main() {
}
}
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
phy_wait(fd, poll_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
phy_wait(fd, iface.poll_delay(&sockets, timestamp)).expect("wait error");
}
}

View File

@ -124,6 +124,8 @@ fn main() {
let mut did_connect = false;
let mut done = false;
while !done && clock.elapsed() < 10_000 {
iface.poll(&mut socket_set, clock.elapsed()).expect("poll error");
{
let mut socket = socket_set.get::<TcpSocket>(server_handle);
if !socket.is_active() && !socket.is_listening() {
@ -161,16 +163,14 @@ fn main() {
}
}
match iface.poll(&mut socket_set, clock.elapsed()) {
Ok(Some(poll_at)) if poll_at < clock.elapsed() =>
match iface.poll_delay(&socket_set, clock.elapsed()) {
Some(0) =>
debug!("resuming"),
Ok(Some(poll_at)) => {
let delay = poll_at - clock.elapsed();
Some(delay) => {
debug!("sleeping for {} ms", delay);
clock.advance(delay)
}
Ok(None) => clock.advance(1),
Err(e) => debug!("poll error: {}", e)
None => clock.advance(1)
}
}

View File

@ -77,6 +77,9 @@ fn main() {
let endpoint = IpAddress::Ipv4(remote_addr);
loop {
let timestamp = utils::millis_since(startup_time);
iface.poll(&mut sockets, timestamp).expect("poll error");
{
let mut socket = sockets.get::<IcmpSocket>(icmp_handle);
if !socket.is_open() {
@ -141,7 +144,7 @@ fn main() {
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
let poll_at = iface.poll_at(&sockets, timestamp);
let resume_at = [poll_at, Some(send_at)].iter().flat_map(|x| *x).min();
phy_wait(fd, resume_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
}

View File

@ -71,6 +71,9 @@ fn main() {
let mut tcp_6970_active = false;
loop {
let timestamp = utils::millis_since(startup_time);
iface.poll(&mut sockets, timestamp).expect("poll error");
// udp:6969: respond "hello"
{
let mut socket = sockets.get::<UdpSocket>(udp_handle);
@ -187,8 +190,6 @@ fn main() {
}
}
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
phy_wait(fd, poll_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
phy_wait(fd, iface.poll_delay(&sockets, timestamp)).expect("wait error");
}
}

View File

@ -41,7 +41,7 @@ fn client(kind: Client) {
match result {
Ok(0) => break,
Ok(result) => {
print!("(P:{})", result);
// print!("(P:{})", result);
processed += result
}
Err(err) => panic!("cannot process: {}", err)
@ -54,6 +54,9 @@ fn client(kind: Client) {
static CLIENT_DONE: AtomicBool = ATOMIC_BOOL_INIT;
fn main() {
#[cfg(feature = "log")]
utils::setup_logging("info");
let (mut opts, mut free) = utils::create_options();
utils::add_tap_options(&mut opts, &mut free);
utils::add_middleware_options(&mut opts, &mut free);
@ -97,6 +100,9 @@ fn main() {
let mut processed = 0;
while !CLIENT_DONE.load(Ordering::SeqCst) {
let timestamp = utils::millis_since(startup_time);
iface.poll(&mut sockets, timestamp).expect("poll error");
// tcp:1234: emit data
{
let mut socket = sockets.get::<TcpSocket>(tcp1_handle);
@ -133,8 +139,6 @@ fn main() {
}
}
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
phy_wait(fd, poll_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
phy_wait(fd, iface.poll_delay(&sockets, timestamp)).expect("wait error");
}
}

View File

@ -14,7 +14,9 @@ use log::{LogLevel, LogLevelFilter, LogRecord};
use env_logger::LogBuilder;
use getopts::{Options, Matches};
use smoltcp::phy::{Device, EthernetTracer, FaultInjector, TapInterface};
use smoltcp::phy::{Device, EthernetTracer, FaultInjector};
#[cfg(feature = "phy-tap_interface")]
use smoltcp::phy::TapInterface;
use smoltcp::phy::{PcapWriter, PcapSink, PcapMode, PcapLinkType};
#[cfg(feature = "log")]

View File

@ -261,10 +261,9 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
/// The timestamp must be a number of milliseconds, monotonically increasing
/// since an arbitrary moment in time, such as system startup.
///
/// This function returns a _soft deadline_ for calling it the next time.
/// That is, if `iface.poll(&mut sockets, 1000)` returns `Ok(Some(2000))`,
/// it harmless (but wastes energy) to call it 500 ms later, and potentially
/// harmful (impacting quality of service) to call it 1500 ms later.
/// This function returns a boolean value indicating whether any packets were
/// processed or emitted, and thus, whether the readiness of any socket might
/// have changed.
///
/// # Errors
/// This method will routinely return errors in response to normal network
@ -276,18 +275,50 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
/// packets containing any unsupported protocol, option, or form, which is
/// a very common occurrence and on a production system it should not even
/// be logged.
pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<Option<u64>> {
self.socket_egress(sockets, timestamp)?;
if self.socket_ingress(sockets, timestamp)? {
Ok(Some(0))
} else {
Ok(sockets.iter().filter_map(|socket| {
let socket_poll_at = socket.poll_at();
socket.meta().poll_at(socket_poll_at, |ip_addr|
self.inner.has_neighbor(&ip_addr, timestamp))
}).min())
pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
let mut readiness_may_have_changed = false;
loop {
let processed_any = self.socket_ingress(sockets, timestamp)?;
let emitted_any = self.socket_egress(sockets, timestamp)?;
if processed_any || emitted_any {
readiness_may_have_changed = true;
} else {
break
}
}
Ok(readiness_may_have_changed)
}
/// Return a _soft deadline_ for calling [poll] the next time.
/// That is, if `iface.poll_at(&sockets, 1000)` returns `Ok(Some(2000))`,
/// you should call call [poll] in 1000 ms; it is harmless (but wastes energy)
/// to call it 500 ms later, and potentially harmful (impacting quality of service)
/// to call it 1500 ms later.
///
/// The timestamp argument is the same as for [poll].
///
/// [poll]: #method.poll
pub fn poll_at(&self, sockets: &SocketSet, timestamp: u64) -> Option<u64> {
sockets.iter().filter_map(|socket| {
let socket_poll_at = socket.poll_at();
socket.meta().poll_at(socket_poll_at, |ip_addr|
self.inner.has_neighbor(&ip_addr, timestamp))
}).min()
}
/// Return an _advisory wait time_ for calling [poll] the next time.
/// That is, if `iface.poll_at(&sockets, 1000)` returns `Ok(Some(1000))`,
/// you should call call [poll] in 1000 ms; it is harmless (but wastes energy)
/// to call it 500 ms later, and potentially harmful (impacting quality of service)
/// to call it 1500 ms later.
///
/// This is a shortcut for `poll_at(..., timestamp).map(|at| at.saturating_sub(timestamp))`.
///
/// The timestamp argument is the same as for [poll].
///
/// [poll]: #method.poll
pub fn poll_delay(&self, sockets: &SocketSet, timestamp: u64) -> Option<u64> {
self.poll_at(sockets, timestamp).map(|at| at.saturating_sub(timestamp))
}
fn socket_ingress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
@ -298,29 +329,29 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
None => break,
Some(tokens) => tokens,
};
let dispatch_result = rx_token.consume(timestamp, |frame| {
let response = inner.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
rx_token.consume(timestamp, |frame| {
inner.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
net_debug!("cannot process ingress packet: {}", err);
net_debug!("packet dump follows:\n{}",
PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &frame));
err
})?;
processed_any = true;
inner.dispatch(tx_token, timestamp, response)
});
dispatch_result.map_err(|err| {
net_debug!("cannot dispatch response packet: {}", err);
err
}).and_then(|response| {
processed_any = true;
inner.dispatch(tx_token, timestamp, response).map_err(|err| {
net_debug!("cannot dispatch response packet: {}", err);
err
})
})
})?;
}
Ok(processed_any)
}
fn socket_egress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<()> {
fn socket_egress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
let mut caps = self.device.capabilities();
caps.max_transmission_unit -= EthernetFrame::<&[u8]>::header_len();
let mut emitted_any = false;
for mut socket in sockets.iter_mut() {
if !socket.meta_mut().egress_permitted(|ip_addr|
self.inner.has_neighbor(&ip_addr, timestamp)) {
@ -392,10 +423,10 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
socket.meta().handle, err);
return Err(err)
}
(Ok(()), Ok(())) => ()
(Ok(()), Ok(())) => emitted_any = true
}
}
Ok(())
Ok(emitted_any)
}
}