Compute soft deadline in poll() and use nonblocking sockets.

Before this commit, anything that touched RawSocket or TapInterface
worked partly by accident and partly because of a horrible crutch
that resulted in massive latencies as well as inevitable packet loss
every time an ARP request had to be issued. Also, there was no way
to use poll() other than by continuously calling it in a busy loop.

After this commit, poll() indicates when the earliest timer expires,
and so the caller can sleep until that moment (or until packets
arrive).

Note that there is a subtle problem remaining: every time poll()
is called, every socket with a pending outbound packet whose
IP address doesn't correspond to a MAC address will send a new
ARP request, resulting in potentially a whole lot of such requests.
ARP rate limiting is a separate topic though.
v0.7.x
whitequark 2017-08-29 19:35:09 +00:00
parent 7cd7bd4683
commit 996389d653
18 changed files with 312 additions and 195 deletions

View File

@ -8,7 +8,8 @@ mod utils;
use std::str::{self, FromStr};
use std::time::Instant;
use smoltcp::Error;
use std::os::unix::io::AsRawFd;
use smoltcp::phy::wait as phy_wait;
use smoltcp::wire::{EthernetAddress, IpAddress};
use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
use smoltcp::socket::{AsSocket, SocketSet};
@ -25,6 +26,7 @@ fn main() {
let mut matches = utils::parse_options(&opts, free);
let device = utils::parse_tap_options(&mut matches);
let fd = device.as_raw_fd();
let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
let address = IpAddress::from_str(&matches.free[0]).expect("invalid address format");
let port = u16::from_str(&matches.free[1]).expect("invalid port format");
@ -86,12 +88,8 @@ fn main() {
}
}
let timestamp = Instant::now().duration_since(startup_time);
let timestamp_ms = (timestamp.as_secs() * 1000) +
(timestamp.subsec_nanos() / 1000000) as u64;
match iface.poll(&mut sockets, timestamp_ms) {
Ok(()) | Err(Error::Exhausted) => (),
Err(e) => debug!("poll error: {}", e)
}
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
phy_wait(fd, poll_at).expect("wait error");
}
}

View File

@ -16,7 +16,6 @@ extern crate getopts;
mod utils;
use core::str;
use smoltcp::Error;
use smoltcp::phy::Loopback;
use smoltcp::wire::{EthernetAddress, IpAddress};
use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
@ -161,11 +160,14 @@ fn main() {
}
match iface.poll(&mut socket_set, clock.elapsed()) {
Ok(()) | Err(Error::Exhausted) => (),
Ok(Some(poll_at)) => {
let delay = poll_at - clock.elapsed();
debug!("sleeping for {} ms", delay);
clock.advance(delay)
}
Ok(None) => clock.advance(1),
Err(e) => debug!("poll error: {}", e)
}
clock.advance(1);
}
if done {

View File

@ -8,8 +8,10 @@ extern crate byteorder;
mod utils;
use std::str::{self, FromStr};
use std::time::{Duration, Instant};
use smoltcp::Error;
use std::cmp;
use std::time::Instant;
use std::os::unix::io::AsRawFd;
use smoltcp::phy::wait as phy_wait;
use smoltcp::wire::{EthernetAddress, IpVersion, IpProtocol, IpAddress,
Ipv4Address, Ipv4Packet, Ipv4Repr,
Icmpv4Repr, Icmpv4Packet};
@ -35,6 +37,7 @@ fn main() {
let mut matches = utils::parse_options(&opts, free);
let device = utils::parse_tap_options(&mut matches);
let fd = device.as_raw_fd();
let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
let address = Ipv4Address::from_str(&matches.free[0]).expect("invalid address format");
let count = matches.opt_str("count").map(|s| usize::from_str(&s).unwrap()).unwrap_or(4);
@ -61,7 +64,7 @@ fn main() {
let mut sockets = SocketSet::new(vec![]);
let raw_handle = sockets.add(raw_socket);
let mut send_next = Duration::default();
let mut send_at = 0;
let mut seq_no = 0;
let mut received = 0;
let mut echo_payload = [0xffu8; 40];
@ -75,11 +78,8 @@ fn main() {
let timestamp_us = (timestamp.as_secs() * 1000000) +
(timestamp.subsec_nanos() / 1000) as u64;
if seq_no == count as u16 && waiting_queue.is_empty() {
break;
}
if socket.can_send() && seq_no < count as u16 && send_next <= timestamp {
if socket.can_send() && seq_no < count as u16 &&
send_at <= utils::millis_since(startup_time) {
NetworkEndian::write_u64(&mut echo_payload, timestamp_us);
let icmp_repr = Icmpv4Repr::EchoRequest {
ident: 1,
@ -105,7 +105,7 @@ fn main() {
waiting_queue.insert(seq_no, timestamp);
seq_no += 1;
send_next += Duration::new(interval, 0);
send_at += interval * 1000;
}
if socket.can_recv() {
@ -137,16 +137,23 @@ fn main() {
println!("From {} icmp_seq={} timeout", remote_addr, seq);
false
}
})
});
if seq_no == count as u16 && waiting_queue.is_empty() {
break
}
}
let timestamp = Instant::now().duration_since(startup_time);
let timestamp_ms = (timestamp.as_secs() * 1000) +
(timestamp.subsec_nanos() / 1000000) as u64;
match iface.poll(&mut sockets, timestamp_ms) {
Ok(()) | Err(Error::Exhausted) => (),
Err(e) => debug!("poll error: {}", e),
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
let mut resume_at = Some(send_at);
if let Some(poll_at) = poll_at {
resume_at = resume_at.map(|at| cmp::min(at, poll_at))
}
debug!("waiting until {:?} ms", resume_at);
phy_wait(fd, resume_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
}
println!("--- {} ping statistics ---", remote_addr);

View File

@ -9,7 +9,8 @@ mod utils;
use std::str;
use std::fmt::Write;
use std::time::Instant;
use smoltcp::Error;
use std::os::unix::io::AsRawFd;
use smoltcp::phy::wait as phy_wait;
use smoltcp::wire::{EthernetAddress, IpAddress};
use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
use smoltcp::socket::{AsSocket, SocketSet};
@ -25,6 +26,7 @@ fn main() {
let mut matches = utils::parse_options(&opts, free);
let device = utils::parse_tap_options(&mut matches);
let fd = device.as_raw_fd();
let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
let startup_time = Instant::now();
@ -154,12 +156,8 @@ fn main() {
}
}
let timestamp = Instant::now().duration_since(startup_time);
let timestamp_ms = (timestamp.as_secs() * 1000) +
(timestamp.subsec_nanos() / 1000000) as u64;
match iface.poll(&mut sockets, timestamp_ms) {
Ok(()) | Err(Error::Exhausted) => (),
Err(e) => debug!("poll error: {}", e)
}
let timestamp = utils::millis_since(startup_time);
let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
phy_wait(fd, poll_at).expect("wait error");
}
}

View File

@ -129,3 +129,10 @@ pub fn parse_middleware_options<D: Device>(matches: &mut Matches, device: D, loo
device.set_bucket_interval(shaping_interval);
device
}
pub fn millis_since(startup_time: Instant) -> u64 {
let duration = Instant::now().duration_since(startup_time);
let duration_ms = (duration.as_secs() * 1000) +
(duration.subsec_nanos() / 1000000) as u64;
duration_ms
}

View File

@ -23,8 +23,8 @@ pub struct Interface<'a, 'b, 'c, DeviceT: Device + 'a> {
protocol_addrs: ManagedSlice<'c, IpAddress>,
}
enum Response<'a> {
Nop,
enum Packet<'a> {
None,
Arp(ArpRepr),
Icmpv4(Ipv4Repr, Icmpv4Repr<'a>),
Raw((IpRepr, &'a [u8])),
@ -107,38 +107,103 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
self.protocol_addrs.iter().any(|&probe| probe == addr)
}
/// Receive and process a packet, if available, and then transmit a packet, if necessary,
/// handling the given set of sockets.
/// Transmit packets queued in the given sockets, and receive packets queued
/// in the device.
///
/// The timestamp is a monotonically increasing number of milliseconds.
pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<()> {
// First, transmit any outgoing packets.
loop {
if self.dispatch(sockets, timestamp)? { break }
}
/// The timestamp must be a number of milliseconds, monotonically increasing
/// since an arbitrary moment in time, such as system startup.
///
/// This function returns a _soft deadline_ for calling it the next time.
/// That is, if `iface.poll(&mut sockets, 1000)` returns `Ok(Some(2000))`,
/// it harmless (but wastes energy) to call it 500 ms later, and potentially
/// harmful (impacting quality of service) to call it 1500 ms later.
pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<Option<u64>> {
self.socket_egress(sockets, timestamp)?;
// Now, receive any incoming packets.
self.process(sockets, timestamp)
if self.socket_ingress(sockets, timestamp)? {
Ok(Some(0))
} else {
Ok(sockets.iter().filter_map(|socket| socket.poll_at()).min())
}
}
fn process(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<()> {
fn socket_ingress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
let mut processed_any = false;
loop {
let frame = self.device.receive(timestamp)?;
let response = self.process_ethernet(sockets, timestamp, &frame)?;
self.dispatch_response(timestamp, response)?;
let frame =
match self.device.receive(timestamp) {
Ok(frame) => frame,
Err(Error::Exhausted) => break, // nothing to receive
Err(err) => return Err(err)
};
let response =
match self.process_ethernet(sockets, timestamp, &frame) {
Ok(response) => response,
Err(err) => {
net_debug!("cannot process ingress packet: {}", err);
continue
}
};
processed_any = true;
match self.dispatch(timestamp, response) {
Ok(()) => (),
Err(err) => {
net_debug!("cannot dispatch response packet: {}", err);
continue
}
}
}
Ok(processed_any)
}
fn socket_egress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<()> {
let mut limits = self.device.limits();
limits.max_transmission_unit -= EthernetFrame::<&[u8]>::header_len();
for socket in sockets.iter_mut() {
let mut device_result = Ok(());
let socket_result =
match socket {
&mut Socket::Raw(ref mut socket) =>
socket.dispatch(|response| {
device_result = self.dispatch(timestamp, Packet::Raw(response));
device_result
}),
&mut Socket::Udp(ref mut socket) =>
socket.dispatch(|response| {
device_result = self.dispatch(timestamp, Packet::Udp(response));
device_result
}),
&mut Socket::Tcp(ref mut socket) =>
socket.dispatch(timestamp, &limits, |response| {
device_result = self.dispatch(timestamp, Packet::Tcp(response));
device_result
}),
&mut Socket::__Nonexhaustive => unreachable!()
};
match (device_result, socket_result) {
(Ok(()), Err(Error::Exhausted)) => (), // nothing to transmit
(Err(err), _) | (_, Err(err)) =>
net_debug!("cannot dispatch egress packet: {}", err),
(Ok(()), Ok(())) => ()
}
}
Ok(())
}
fn process_ethernet<'frame, T: AsRef<[u8]>>
(&mut self, sockets: &mut SocketSet, timestamp: u64,
frame: &'frame T) ->
Result<Response<'frame>> {
Result<Packet<'frame>> {
let eth_frame = EthernetFrame::new_checked(frame)?;
// Ignore any packets not directed to our hardware address.
if !eth_frame.dst_addr().is_broadcast() &&
eth_frame.dst_addr() != self.hardware_addr {
return Ok(Response::Nop)
return Ok(Packet::None)
}
match eth_frame.ethertype() {
@ -153,7 +218,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
fn process_arp<'frame, T: AsRef<[u8]>>
(&mut self, eth_frame: &EthernetFrame<&'frame T>) ->
Result<Response<'frame>> {
Result<Packet<'frame>> {
let arp_packet = ArpPacket::new_checked(eth_frame.payload())?;
let arp_repr = ArpRepr::parse(&arp_packet)?;
@ -175,7 +240,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
if operation == ArpOperation::Request &&
self.has_protocol_addr(target_protocol_addr) {
Ok(Response::Arp(ArpRepr::EthernetIpv4 {
Ok(Packet::Arp(ArpRepr::EthernetIpv4 {
operation: ArpOperation::Reply,
source_hardware_addr: self.hardware_addr,
source_protocol_addr: target_protocol_addr,
@ -183,7 +248,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
target_protocol_addr: source_protocol_addr
}))
} else {
Ok(Response::Nop)
Ok(Packet::None)
}
}
@ -194,7 +259,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
fn process_ipv4<'frame, T: AsRef<[u8]>>
(&mut self, sockets: &mut SocketSet, timestamp: u64,
eth_frame: &EthernetFrame<&'frame T>) ->
Result<Response<'frame>> {
Result<Packet<'frame>> {
let ipv4_packet = Ipv4Packet::new_checked(eth_frame.payload())?;
let ipv4_repr = Ipv4Repr::parse(&ipv4_packet)?;
@ -229,7 +294,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
if !self.has_protocol_addr(ipv4_repr.dst_addr) {
// Ignore IP packets not directed at us.
return Ok(Response::Nop)
return Ok(Packet::None)
}
match ipv4_repr.protocol {
@ -240,7 +305,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
IpProtocol::Tcp =>
Self::process_tcp(sockets, timestamp, ip_repr, ip_payload),
_ if handled_by_raw_socket =>
Ok(Response::Nop),
Ok(Packet::None),
_ => {
let icmp_reply_repr = Icmpv4Repr::DstUnreachable {
reason: Icmpv4DstUnreachable::ProtoUnreachable,
@ -253,13 +318,13 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
protocol: IpProtocol::Icmp,
payload_len: icmp_reply_repr.buffer_len()
};
Ok(Response::Icmpv4(ipv4_reply_repr, icmp_reply_repr))
Ok(Packet::Icmpv4(ipv4_reply_repr, icmp_reply_repr))
}
}
}
fn process_icmpv4<'frame>(ipv4_repr: Ipv4Repr, ip_payload: &'frame [u8]) ->
Result<Response<'frame>> {
Result<Packet<'frame>> {
let icmp_packet = Icmpv4Packet::new_checked(ip_payload)?;
let icmp_repr = Icmpv4Repr::parse(&icmp_packet)?;
@ -277,11 +342,11 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
protocol: IpProtocol::Icmp,
payload_len: icmp_reply_repr.buffer_len()
};
Ok(Response::Icmpv4(ipv4_reply_repr, icmp_reply_repr))
Ok(Packet::Icmpv4(ipv4_reply_repr, icmp_reply_repr))
}
// Ignore any echo replies.
Icmpv4Repr::EchoReply { .. } => Ok(Response::Nop),
Icmpv4Repr::EchoReply { .. } => Ok(Packet::None),
// FIXME: do something correct here?
_ => Err(Error::Unrecognized),
@ -290,7 +355,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
fn process_udp<'frame>(sockets: &mut SocketSet,
ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
Result<Response<'frame>> {
Result<Packet<'frame>> {
let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
let udp_packet = UdpPacket::new_checked(ip_payload)?;
let udp_repr = UdpRepr::parse(&udp_packet, &src_addr, &dst_addr)?;
@ -299,7 +364,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
<Socket as AsSocket<UdpSocket>>::try_as_socket) {
match udp_socket.process(&ip_repr, &udp_repr) {
// The packet is valid and handled by socket.
Ok(()) => return Ok(Response::Nop),
Ok(()) => return Ok(Packet::None),
// The packet isn't addressed to the socket.
Err(Error::Rejected) => continue,
// The packet is malformed, or addressed to the socket but cannot be accepted.
@ -321,7 +386,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
protocol: IpProtocol::Icmp,
payload_len: icmpv4_reply_repr.buffer_len()
};
Ok(Response::Icmpv4(ipv4_reply_repr, icmpv4_reply_repr))
Ok(Packet::Icmpv4(ipv4_reply_repr, icmpv4_reply_repr))
},
IpRepr::Unspecified { .. } |
IpRepr::__Nonexhaustive =>
@ -331,7 +396,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
fn process_tcp<'frame>(sockets: &mut SocketSet, timestamp: u64,
ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
Result<Response<'frame>> {
Result<Packet<'frame>> {
let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
let tcp_packet = TcpPacket::new_checked(ip_payload)?;
let tcp_repr = TcpRepr::parse(&tcp_packet, &src_addr, &dst_addr)?;
@ -340,7 +405,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
<Socket as AsSocket<TcpSocket>>::try_as_socket) {
match tcp_socket.process(timestamp, &ip_repr, &tcp_repr) {
// The packet is valid and handled by socket.
Ok(reply) => return Ok(reply.map_or(Response::Nop, Response::Tcp)),
Ok(reply) => return Ok(reply.map_or(Packet::None, Packet::Tcp)),
// The packet isn't addressed to the socket.
// Send RST only if no other socket accepts the packet.
Err(Error::Rejected) => continue,
@ -351,48 +416,16 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
if tcp_repr.control == TcpControl::Rst {
// Never reply to a TCP RST packet with another TCP RST packet.
Ok(Response::Nop)
Ok(Packet::None)
} else {
// The packet wasn't handled by a socket, send a TCP RST packet.
Ok(Response::Tcp(TcpSocket::rst_reply(&ip_repr, &tcp_repr)))
Ok(Packet::Tcp(TcpSocket::rst_reply(&ip_repr, &tcp_repr)))
}
}
fn dispatch(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
let mut limits = self.device.limits();
limits.max_transmission_unit -= EthernetFrame::<&[u8]>::header_len();
let mut nothing_to_transmit = true;
for socket in sockets.iter_mut() {
let result = match socket {
&mut Socket::Raw(ref mut socket) =>
socket.dispatch(|response|
self.dispatch_response(timestamp, Response::Raw(response))),
&mut Socket::Udp(ref mut socket) =>
socket.dispatch(|response|
self.dispatch_response(timestamp, Response::Udp(response))),
&mut Socket::Tcp(ref mut socket) =>
socket.dispatch(timestamp, &limits, |response|
self.dispatch_response(timestamp, Response::Tcp(response))),
&mut Socket::__Nonexhaustive => unreachable!()
};
match result {
Ok(()) => {
nothing_to_transmit = false;
break
}
Err(Error::Exhausted) => continue,
Err(e) => return Err(e)
}
}
Ok(nothing_to_transmit)
}
fn dispatch_response(&mut self, timestamp: u64, response: Response) -> Result<()> {
match response {
Response::Arp(arp_repr) => {
fn dispatch(&mut self, timestamp: u64, packet: Packet) -> Result<()> {
match packet {
Packet::Arp(arp_repr) => {
let dst_hardware_addr =
match arp_repr {
ArpRepr::EthernetIpv4 { target_hardware_addr, .. } => target_hardware_addr,
@ -407,29 +440,29 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
arp_repr.emit(&mut packet);
})
},
Response::Icmpv4(ipv4_repr, icmpv4_repr) => {
Packet::Icmpv4(ipv4_repr, icmpv4_repr) => {
self.dispatch_ip(timestamp, IpRepr::Ipv4(ipv4_repr), |_ip_repr, payload| {
icmpv4_repr.emit(&mut Icmpv4Packet::new(payload));
})
}
Response::Raw((ip_repr, raw_packet)) => {
Packet::Raw((ip_repr, raw_packet)) => {
self.dispatch_ip(timestamp, ip_repr, |_ip_repr, payload| {
payload.copy_from_slice(raw_packet);
})
}
Response::Udp((ip_repr, udp_repr)) => {
Packet::Udp((ip_repr, udp_repr)) => {
self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
udp_repr.emit(&mut UdpPacket::new(payload),
&ip_repr.src_addr(), &ip_repr.dst_addr());
})
}
Response::Tcp((ip_repr, tcp_repr)) => {
Packet::Tcp((ip_repr, tcp_repr)) => {
self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
tcp_repr.emit(&mut TcpPacket::new(payload),
&ip_repr.src_addr(), &ip_repr.dst_addr());
})
}
Response::Nop => Ok(())
Packet::None => Ok(())
}
}
@ -488,13 +521,8 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
where F: FnOnce(IpRepr, &mut [u8]) {
let ip_repr = ip_repr.lower(&self.protocol_addrs)?;
// FIXME: use plain try! here once we don't have the horrible nothing_to_transmit hack.
let dst_hardware_addr =
self.lookup_hardware_addr(timestamp, &ip_repr.src_addr(), &ip_repr.dst_addr());
if let Err(Error::Unaddressable) = dst_hardware_addr {
return Ok(())
}
let dst_hardware_addr = dst_hardware_addr?;
self.lookup_hardware_addr(timestamp, &ip_repr.src_addr(), &ip_repr.dst_addr())?;
self.dispatch_ethernet(timestamp, ip_repr.total_len(), |mut frame| {
frame.set_dst_addr(dst_hardware_addr);

View File

@ -11,21 +11,21 @@ macro_rules! net_log_enabled {
}
macro_rules! net_trace {
($($arg:expr),*) => {
($($arg:expr),*) => {{
#[cfg(feature = "log")]
trace!($($arg),*);
#[cfg(not(feature = "log"))]
$( let _ = $arg );*; // suppress unused variable warnings
}
}}
}
macro_rules! net_debug {
($($arg:expr),*) => {
($($arg:expr),*) => {{
#[cfg(feature = "log")]
debug!($($arg),*);
#[cfg(not(feature = "log"))]
$( let _ = $arg );*; // suppress unused variable warnings
}
}}
}
macro_rules! enum_with_unknown {

View File

@ -119,6 +119,9 @@ mod raw_socket;
#[cfg(all(feature = "tap_interface", target_os = "linux"))]
mod tap_interface;
#[cfg(any(feature = "raw_socket", feature = "tap_interface"))]
pub use self::sys::wait;
pub use self::tracer::Tracer;
pub use self::fault_injector::FaultInjector;
pub use self::pcap_writer::{PcapLinkType, PcapMode, PcapSink, PcapWriter};

View File

@ -2,6 +2,7 @@ use std::cell::RefCell;
use std::vec::Vec;
use std::rc::Rc;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};
use Result;
use super::{sys, DeviceLimits, Device};
@ -13,6 +14,12 @@ pub struct RawSocket {
mtu: usize
}
impl AsRawFd for RawSocket {
fn as_raw_fd(&self) -> RawFd {
self.lower.borrow().as_raw_fd()
}
}
impl RawSocket {
/// Creates a raw socket, bound to the interface called `name`.
///

View File

@ -1,5 +1,6 @@
use libc;
use std::io;
use std::{mem, ptr, io};
use std::os::unix::io::RawFd;
#[cfg(target_os = "linux")]
#[path = "linux.rs"]
@ -15,6 +16,34 @@ pub use self::raw_socket::RawSocketDesc;
#[cfg(all(feature = "tap_interface", target_os = "linux"))]
pub use self::tap_interface::TapInterfaceDesc;
/// Wait until given file descriptor becomes readable, but no longer than given timeout.
pub fn wait(fd: RawFd, millis: Option<u64>) -> io::Result<()> {
unsafe {
let mut readfds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut readfds);
libc::FD_SET(fd, &mut readfds);
let mut writefds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut writefds);
let mut exceptfds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut exceptfds);
let mut timeout = libc::timeval { tv_sec: 0, tv_usec: 0 };
let timeout_ptr =
if let Some(millis) = millis {
timeout.tv_usec = (millis * 1_000) as libc::suseconds_t;
&mut timeout as *mut _
} else {
ptr::null_mut()
};
let res = libc::select(fd + 1, &mut readfds, &mut writefds, &mut exceptfds, timeout_ptr);
if res == -1 { return Err(io::Error::last_os_error()) }
Ok(())
}
}
#[repr(C)]
#[derive(Debug)]
struct ifreq {

View File

@ -1,5 +1,6 @@
use libc;
use std::{mem, io};
use std::os::unix::io::{RawFd, AsRawFd};
use libc;
use super::*;
#[derive(Debug)]
@ -8,10 +9,16 @@ pub struct RawSocketDesc {
ifreq: ifreq
}
impl AsRawFd for RawSocketDesc {
fn as_raw_fd(&self) -> RawFd {
self.lower
}
}
impl RawSocketDesc {
pub fn new(name: &str) -> io::Result<RawSocketDesc> {
let lower = unsafe {
let lower = libc::socket(libc::AF_PACKET, libc::SOCK_RAW,
let lower = libc::socket(libc::AF_PACKET, libc::SOCK_RAW | libc::SOCK_NONBLOCK,
imp::ETH_P_ALL.to_be() as i32);
if lower == -1 { return Err(io::Error::last_os_error()) }
lower

View File

@ -1,20 +1,25 @@
use std::mem;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};
use libc;
use super::*;
#[cfg(target_os = "linux")]
#[derive(Debug)]
pub struct TapInterfaceDesc {
lower: libc::c_int,
ifreq: ifreq
}
impl AsRawFd for TapInterfaceDesc {
fn as_raw_fd(&self) -> RawFd {
self.lower
}
}
impl TapInterfaceDesc {
pub fn new(name: &str) -> io::Result<TapInterfaceDesc> {
let lower = unsafe {
let lower = libc::open("/dev/net/tun\0".as_ptr() as *const libc::c_char,
libc::O_RDWR);
libc::O_RDWR | libc::O_NONBLOCK);
if lower == -1 { return Err(io::Error::last_os_error()) }
lower
};
@ -44,28 +49,7 @@ impl TapInterfaceDesc {
mtu
}
fn wait(&mut self, ms: u32) -> io::Result<bool> {
unsafe {
let mut readfds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut readfds);
libc::FD_SET(self.lower, &mut readfds);
let mut writefds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut writefds);
let mut exceptfds = mem::uninitialized::<libc::fd_set>();
libc::FD_ZERO(&mut exceptfds);
let mut timeout = libc::timeval { tv_sec: 0, tv_usec: (ms * 1_000) as libc::suseconds_t };
let res = libc::select(self.lower + 1, &mut readfds, &mut writefds, &mut exceptfds,
&mut timeout);
if res == -1 { return Err(io::Error::last_os_error()) }
Ok(res == 0)
}
}
pub fn recv(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
// FIXME: here we don't wait forever, in case we need to send several packets in a row
// ideally this would be implemented by going full nonblocking
if self.wait(100)? { return Err(io::ErrorKind::TimedOut)? }
unsafe {
let len = libc::read(self.lower, buffer.as_mut_ptr() as *mut libc::c_void,
buffer.len());
@ -75,8 +59,6 @@ impl TapInterfaceDesc {
}
pub fn send(&mut self, buffer: &[u8]) -> io::Result<usize> {
self.wait(100)?;
unsafe {
let len = libc::write(self.lower, buffer.as_ptr() as *const libc::c_void,
buffer.len());

View File

@ -2,6 +2,7 @@ use std::cell::RefCell;
use std::vec::Vec;
use std::rc::Rc;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};
use {Error, Result};
use super::{sys, DeviceLimits, Device};
@ -13,6 +14,12 @@ pub struct TapInterface {
mtu: usize
}
impl AsRawFd for TapInterface {
fn as_raw_fd(&self) -> RawFd {
self.lower.borrow().as_raw_fd()
}
}
impl TapInterface {
/// Attaches to a TAP interface called `name`, or creates it if it does not exist.
///
@ -49,10 +56,10 @@ impl Device for TapInterface {
buffer.resize(size, 0);
Ok(buffer)
}
Err(ref err) if err.kind() == io::ErrorKind::TimedOut => {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
Err(Error::Exhausted)
}
Err(err) => panic!(err)
Err(err) => panic!("{}", err)
}
}

View File

@ -78,6 +78,10 @@ impl<'a, 'b> Socket<'a, 'b> {
pub fn set_debug_id(&mut self, id: usize) {
dispatch_socket!(self, |socket [mut]| socket.set_debug_id(id))
}
pub(crate) fn poll_at(&self) -> Option<u64> {
dispatch_socket!(self, |socket []| socket.poll_at())
}
}
/// A conversion trait for network sockets.

View File

@ -199,24 +199,33 @@ impl<'a, 'b> RawSocket<'a, 'b> {
}
}
let mut packet_buf = self.tx_buffer.dequeue()?;
match prepare(self.ip_protocol, packet_buf.as_mut()) {
Ok((ip_repr, raw_packet)) => {
net_trace!("[{}]:{}:{}: sending {} octets",
self.debug_id, self.ip_version, self.ip_protocol,
ip_repr.buffer_len() + raw_packet.len());
emit((ip_repr, raw_packet))
}
Err(error) => {
net_debug!("[{}]:{}:{}: dropping outgoing packet ({})",
self.debug_id, self.ip_version, self.ip_protocol,
error);
// This case is a bit special because in every other socket, no matter what data
// is put into the socket, it can be sent, but it's possible to put data into
// a raw socket that may not be, and we're generic over the result type, so
// we can't possibly return Ok(()) here.
Err(Error::Rejected)
let debug_id = self.debug_id;
let ip_protocol = self.ip_protocol;
let ip_version = self.ip_version;
self.tx_buffer.try_dequeue(|packet_buf| {
match prepare(ip_protocol, packet_buf.as_mut()) {
Ok((ip_repr, raw_packet)) => {
net_trace!("[{}]:{}:{}: sending {} octets",
debug_id, ip_version, ip_protocol,
ip_repr.buffer_len() + raw_packet.len());
emit((ip_repr, raw_packet))
}
Err(error) => {
net_debug!("[{}]:{}:{}: dropping outgoing packet ({})",
debug_id, ip_version, ip_protocol,
error);
// Return Ok(()) so the packet is dequeued.
Ok(())
}
}
})
}
pub(crate) fn poll_at(&self) -> Option<u64> {
if self.tx_buffer.empty() {
None
} else {
Some(0)
}
}
}
@ -285,13 +294,13 @@ mod test {
assert_eq!(ip_payload, &PACKET_PAYLOAD);
Err(Error::Unaddressable)
}), Err(Error::Unaddressable));
/*assert!(!socket.can_send());*/
assert!(!socket.can_send());
assert_eq!(socket.dispatch(|(ip_repr, ip_payload)| {
assert_eq!(ip_repr, HEADER_REPR);
assert_eq!(ip_payload, &PACKET_PAYLOAD);
Ok(())
}), /*Ok(())*/ Err(Error::Exhausted));
}), Ok(()));
assert!(socket.can_send());
}
@ -304,14 +313,14 @@ mod test {
assert_eq!(socket.send_slice(&wrong_version[..]), Ok(()));
assert_eq!(socket.dispatch(|_| unreachable!()),
Err(Error::Rejected));
Ok(()));
let mut wrong_protocol = PACKET_BYTES.clone();
Ipv4Packet::new(&mut wrong_protocol).set_protocol(IpProtocol::Tcp);
assert_eq!(socket.send_slice(&wrong_protocol[..]), Ok(()));
assert_eq!(socket.dispatch(|_| unreachable!()),
Err(Error::Rejected));
Ok(()));
}
#[test]

View File

@ -200,6 +200,14 @@ impl Timer {
}
}
fn poll_at(&self) -> Option<u64> {
match *self {
Timer::Idle => None,
Timer::Retransmit { expires_at, .. } => Some(expires_at),
Timer::Close { expires_at, .. } => Some(expires_at),
}
}
fn reset(&mut self) {
*self = Timer::Idle
}
@ -1256,6 +1264,16 @@ impl<'a> TcpSocket<'a> {
Ok(())
}
pub(crate) fn poll_at(&self) -> Option<u64> {
self.timer.poll_at().or_else(|| {
if self.tx_buffer.empty() {
None
} else {
Some(0)
}
})
}
}
impl<'a> fmt::Write for TcpSocket<'a> {
@ -2835,14 +2853,14 @@ mod test {
limits.max_burst_size = None;
s.send_slice(b"abcdef").unwrap();
s.dispatch(0, &limits, |(ip_repr, tcp_repr)| {
s.dispatch(0, &limits, |(_ip_repr, tcp_repr)| {
assert_eq!(tcp_repr.window_len, 32767);
Ok(())
}).unwrap();
limits.max_burst_size = Some(4);
s.send_slice(b"abcdef").unwrap();
s.dispatch(0, &limits, |(ip_repr, tcp_repr)| {
s.dispatch(0, &limits, |(_ip_repr, tcp_repr)| {
assert_eq!(tcp_repr.window_len, 5920);
Ok(())
}).unwrap();

View File

@ -196,23 +196,34 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
pub(crate) fn dispatch<F>(&mut self, emit: F) -> Result<()>
where F: FnOnce((IpRepr, UdpRepr)) -> Result<()> {
let packet_buf = self.tx_buffer.dequeue()?;
net_trace!("[{}]{}:{}: sending {} octets",
self.debug_id, self.endpoint,
packet_buf.endpoint, packet_buf.size);
let debug_id = self.debug_id;
let endpoint = self.endpoint;
self.tx_buffer.try_dequeue(|packet_buf| {
net_trace!("[{}]{}:{}: sending {} octets",
debug_id, endpoint,
packet_buf.endpoint, packet_buf.size);
let repr = UdpRepr {
src_port: self.endpoint.port,
dst_port: packet_buf.endpoint.port,
payload: &packet_buf.as_ref()[..]
};
let ip_repr = IpRepr::Unspecified {
src_addr: self.endpoint.addr,
dst_addr: packet_buf.endpoint.addr,
protocol: IpProtocol::Udp,
payload_len: repr.buffer_len()
};
emit((ip_repr, repr))
let repr = UdpRepr {
src_port: endpoint.port,
dst_port: packet_buf.endpoint.port,
payload: &packet_buf.as_ref()[..]
};
let ip_repr = IpRepr::Unspecified {
src_addr: endpoint.addr,
dst_addr: packet_buf.endpoint.addr,
protocol: IpProtocol::Udp,
payload_len: repr.buffer_len()
};
emit((ip_repr, repr))
})
}
pub(crate) fn poll_at(&self) -> Option<u64> {
if self.tx_buffer.empty() {
None
} else {
Some(0)
}
}
}
@ -310,13 +321,13 @@ mod test {
assert_eq!(udp_repr, LOCAL_UDP_REPR);
Err(Error::Unaddressable)
}), Err(Error::Unaddressable));
/*assert!(!socket.can_send());*/
assert!(!socket.can_send());
assert_eq!(socket.dispatch(|(ip_repr, udp_repr)| {
assert_eq!(ip_repr, LOCAL_IP_REPR);
assert_eq!(udp_repr, LOCAL_UDP_REPR);
Ok(())
}), /*Ok(())*/ Err(Error::Exhausted));
}), Ok(()));
assert!(socket.can_send());
}

View File

@ -61,7 +61,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
/// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
/// return `Err(Error::Exhausted)` if the buffer is full.
pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
where F: Fn(&'b mut T) -> Result<R> {
where F: FnOnce(&'b mut T) -> Result<R> {
if self.full() { return Err(Error::Exhausted) }
let index = self.mask(self.read_at + self.length);
@ -88,7 +88,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
/// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
/// return `Err(Error::Exhausted)` if the buffer is empty.
pub fn try_dequeue<'b, R, F>(&'b mut self, f: F) -> Result<R>
where F: Fn(&'b mut T) -> Result<R> {
where F: FnOnce(&'b mut T) -> Result<R> {
if self.empty() { return Err(Error::Exhausted) }
let next_at = self.incr(self.read_at);