Redesign the phy::Device trait to avoid Drop impls.

This commit is contained in:
Philipp Oppermann 2017-11-03 19:15:07 -04:00 committed by whitequark
parent b64257812b
commit 6a8e21cec0
10 changed files with 616 additions and 468 deletions

View File

@ -1,14 +1,17 @@
extern crate smoltcp;
use std::env;
use smoltcp::phy::{Device, RawSocket};
use smoltcp::phy::{Device, RxToken, RawSocket};
use smoltcp::wire::{PrettyPrinter, EthernetFrame};
fn main() {
let ifname = env::args().nth(1).unwrap();
let mut socket = RawSocket::new(ifname.as_ref()).unwrap();
loop {
let buffer = socket.receive(/*timestamp=*/0).unwrap();
print!("{}", PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer))
let (rx_token, _) = socket.receive().unwrap();
rx_token.consume(/*timestamp = */ 0, |buffer| {
print!("{}", PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer));
Ok(())
}).unwrap();
}
}

View File

@ -92,7 +92,7 @@ pub fn add_middleware_options(opts: &mut Options, _free: &mut Vec<&str>) {
opts.optopt("", "shaping-interval", "Sets the interval for rate limiting (ms)", "RATE");
}
pub fn parse_middleware_options<D: Device>(matches: &mut Matches, device: D, loopback: bool)
pub fn parse_middleware_options<D: for<'a> Device<'a>>(matches: &mut Matches, device: D, loopback: bool)
-> FaultInjector<EthernetTracer<PcapWriter<D, Rc<PcapSink>>>> {
let drop_chance = matches.opt_str("drop-chance").map(|s| u8::from_str(&s).unwrap())
.unwrap_or(0);

View File

@ -5,7 +5,7 @@ use core::cmp;
use managed::{Managed, ManagedSlice};
use {Error, Result};
use phy::Device;
use phy::{Device, DeviceCapabilities, RxToken, TxToken};
use wire::{EthernetAddress, EthernetProtocol, EthernetFrame};
use wire::{Ipv4Address};
use wire::{IpAddress, IpProtocol, IpRepr, IpCidr};
@ -25,12 +25,24 @@ use super::ArpCache;
/// The network interface logically owns a number of other data structures; to avoid
/// a dependency on heap allocation, it instead owns a `BorrowMut<[T]>`, which can be
/// a `&mut [T]`, or `Vec<T>` if a heap is available.
pub struct Interface<'a, 'b, 'c, DeviceT: Device + 'a> {
device: Managed<'a, DeviceT>,
arp_cache: Managed<'b, ArpCache>,
ethernet_addr: EthernetAddress,
ip_addrs: ManagedSlice<'c, IpCidr>,
ipv4_gateway: Option<Ipv4Address>,
pub struct Interface<'a, 'b, 'c, DeviceT: for<'d> Device<'d> + 'a> {
device: Managed<'a, DeviceT>,
inner: InterfaceInner<'b, 'c>,
}
/// The device independent part of an Ethernet network interface.
///
/// Separating the device from the data required for prorcessing and dispatching makes
/// it possible to borrow them independently. For example, the tx and rx tokens borrow
/// the `device` mutably until they're used, which makes it impossible to call other
/// methods on the `Interface` in this time (since its `device` field is borrowed
/// exclusively). However, it is still possible to call methods on its `inner` field.
struct InterfaceInner<'b, 'c> {
arp_cache: Managed<'b, ArpCache>,
ethernet_addr: EthernetAddress,
ip_addrs: ManagedSlice<'c, IpCidr>,
ipv4_gateway: Option<Ipv4Address>,
device_capabilities: DeviceCapabilities,
}
#[derive(Debug, PartialEq)]
@ -46,7 +58,8 @@ enum Packet<'a> {
Tcp((IpRepr, TcpRepr<'a>))
}
impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
impl<'a, 'b, 'c, DeviceT> Interface<'a, 'b, 'c, DeviceT>
where DeviceT: for<'d> Device<'d> + 'a {
/// Create a network interface using the provided network device.
///
/// # Panics
@ -63,24 +76,24 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
ProtocolAddrsMT: Into<ManagedSlice<'c, IpCidr>>,
Ipv4GatewayAddrT: Into<Option<Ipv4Address>>, {
let device = device.into();
let arp_cache = arp_cache.into();
let ip_addrs = ip_addrs.into();
let ipv4_gateway = ipv4_gateway.into();
InterfaceInner::check_ethernet_addr(&ethernet_addr);
InterfaceInner::check_ip_addrs(&ip_addrs);
Self::check_ethernet_addr(&ethernet_addr);
Self::check_ip_addrs(&ip_addrs);
Interface { device, arp_cache, ethernet_addr, ip_addrs, ipv4_gateway }
}
let inner = InterfaceInner {
ethernet_addr,
ip_addrs,
arp_cache: arp_cache.into(),
ipv4_gateway: ipv4_gateway.into(),
device_capabilities: device.capabilities(),
};
fn check_ethernet_addr(addr: &EthernetAddress) {
if addr.is_multicast() {
panic!("Ethernet address {} is not unicast", addr)
}
Interface { device, inner }
}
/// Get the Ethernet address of the interface.
pub fn ethernet_addr(&self) -> EthernetAddress {
self.ethernet_addr
self.inner.ethernet_addr
}
/// Set the Ethernet address of the interface.
@ -88,21 +101,13 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
/// # Panics
/// This function panics if the address is not unicast.
pub fn set_ethernet_addr(&mut self, addr: EthernetAddress) {
self.ethernet_addr = addr;
Self::check_ethernet_addr(&self.ethernet_addr);
}
fn check_ip_addrs(addrs: &[IpCidr]) {
for cidr in addrs {
if !cidr.address().is_unicast() {
panic!("IP address {} is not unicast", cidr.address())
}
}
self.inner.ethernet_addr = addr;
InterfaceInner::check_ethernet_addr(&self.inner.ethernet_addr);
}
/// Get the IP addresses of the interface.
pub fn ip_addrs(&self) -> &[IpCidr] {
self.ip_addrs.as_ref()
self.inner.ip_addrs.as_ref()
}
/// Update the IP addresses of the interface.
@ -110,25 +115,24 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
/// # Panics
/// This function panics if any of the addresses is not unicast.
pub fn update_ip_addrs<F: FnOnce(&mut ManagedSlice<'c, IpCidr>)>(&mut self, f: F) {
f(&mut self.ip_addrs);
Self::check_ip_addrs(&self.ip_addrs)
f(&mut self.inner.ip_addrs);
InterfaceInner::check_ip_addrs(&self.inner.ip_addrs)
}
/// Check whether the interface has the given IP address assigned.
pub fn has_ip_addr<T: Into<IpAddress>>(&self, addr: T) -> bool {
let addr = addr.into();
self.ip_addrs.iter().any(|probe| probe.address() == addr)
self.inner.has_ip_addr(addr)
}
/// Get the IPv4 gateway of the interface.
pub fn ipv4_gateway(&self) -> Option<Ipv4Address> {
self.ipv4_gateway
self.inner.ipv4_gateway
}
/// Set the IPv4 gateway of the interface.
pub fn set_ipv4_gateway<GatewayAddrT>(&mut self, gateway: GatewayAddrT)
where GatewayAddrT: Into<Option<Ipv4Address>> {
self.ipv4_gateway = gateway.into();
self.inner.ipv4_gateway = gateway.into();
}
/// Transmit packets queued in the given sockets, and receive packets queued
@ -162,38 +166,16 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
}
}
fn icmpv4_reply<'frame, 'icmp: 'frame>(&self,
ipv4_repr: Ipv4Repr,
icmp_repr: Icmpv4Repr<'icmp>)
-> Packet<'frame> {
if ipv4_repr.dst_addr.is_unicast() {
let ipv4_reply_repr = Ipv4Repr {
src_addr: ipv4_repr.dst_addr,
dst_addr: ipv4_repr.src_addr,
protocol: IpProtocol::Icmp,
payload_len: icmp_repr.buffer_len(),
ttl: 64
};
Packet::Icmpv4(ipv4_reply_repr, icmp_repr)
} else {
// Do not send Protocol Unreachable ICMPv4 error responses to datagrams
// with a broadcast destination address.
Packet::None
}
}
fn socket_ingress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
let mut processed_any = false;
loop {
let frame =
match self.device.receive(timestamp) {
Ok(frame) => frame,
Err(Error::Exhausted) => break, // nothing to receive
Err(err) => return Err(err)
};
let response =
self.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
let &mut Self { ref mut device, ref mut inner } = self;
let (rx_token, tx_token) = match device.receive() {
None => break,
Some(tokens) => tokens,
};
let dispatch_result = rx_token.consume(timestamp, |frame| {
let response = inner.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
net_debug!("cannot process ingress packet: {}", err);
if net_log_enabled!(debug) {
match EthernetFrame::new_checked(frame.as_ref()) {
@ -207,9 +189,11 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
}
err
})?;
processed_any = true;
processed_any = true;
self.dispatch(timestamp, response).map_err(|err| {
inner.dispatch(tx_token, timestamp, response)
});
dispatch_result.map_err(|err| {
net_debug!("cannot dispatch response packet: {}", err);
err
})?;
@ -223,24 +207,28 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
for mut socket in sockets.iter_mut() {
let mut device_result = Ok(());
let &mut Self { ref mut device, ref mut inner } = self;
let socket_result =
match *socket {
#[cfg(feature = "proto-raw")]
Socket::Raw(ref mut socket) =>
socket.dispatch(|response| {
device_result = self.dispatch(timestamp, Packet::Raw(response));
let tx_token = device.transmit().ok_or(Error::Exhausted)?;
device_result = inner.dispatch(tx_token, timestamp, Packet::Raw(response));
device_result
}, &caps.checksum),
#[cfg(feature = "proto-udp")]
Socket::Udp(ref mut socket) =>
socket.dispatch(|response| {
device_result = self.dispatch(timestamp, Packet::Udp(response));
let tx_token = device.transmit().ok_or(Error::Exhausted)?;
device_result = inner.dispatch(tx_token, timestamp, Packet::Udp(response));
device_result
}),
#[cfg(feature = "proto-tcp")]
Socket::Tcp(ref mut socket) =>
socket.dispatch(timestamp, &caps, |response| {
device_result = self.dispatch(timestamp, Packet::Tcp(response));
let tx_token = device.transmit().ok_or(Error::Exhausted)?;
device_result = inner.dispatch(tx_token, timestamp, Packet::Tcp(response));
device_result
}),
Socket::__Nonexhaustive(_) => unreachable!()
@ -256,14 +244,35 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
(Ok(()), Ok(())) => ()
}
}
Ok(())
}
}
impl<'b, 'c> InterfaceInner<'b, 'c> {
fn check_ethernet_addr(addr: &EthernetAddress) {
if addr.is_multicast() {
panic!("Ethernet address {} is not unicast", addr)
}
}
fn check_ip_addrs(addrs: &[IpCidr]) {
for cidr in addrs {
if !cidr.address().is_unicast() {
panic!("IP address {} is not unicast", cidr.address())
}
}
}
/// Check whether the interface has the given IP address assigned.
fn has_ip_addr<T: Into<IpAddress>>(&self, addr: T) -> bool {
let addr = addr.into();
self.ip_addrs.iter().any(|probe| probe.address() == addr)
}
fn process_ethernet<'frame, T: AsRef<[u8]>>
(&mut self, sockets: &mut SocketSet, timestamp: u64,
frame: &'frame T) ->
Result<Packet<'frame>> {
(&mut self, sockets: &mut SocketSet, timestamp: u64, frame: &'frame T) ->
Result<Packet<'frame>>
{
let eth_frame = EthernetFrame::new_checked(frame)?;
// Ignore any packets not directed to our hardware address.
@ -284,7 +293,8 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
fn process_arp<'frame, T: AsRef<[u8]>>
(&mut self, eth_frame: &EthernetFrame<&'frame T>) ->
Result<Packet<'frame>> {
Result<Packet<'frame>>
{
let arp_packet = ArpPacket::new_checked(eth_frame.payload())?;
let arp_repr = ArpRepr::parse(&arp_packet)?;
@ -324,9 +334,10 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
fn process_ipv4<'frame, T: AsRef<[u8]>>
(&mut self, sockets: &mut SocketSet, _timestamp: u64,
eth_frame: &EthernetFrame<&'frame T>) ->
Result<Packet<'frame>> {
Result<Packet<'frame>>
{
let ipv4_packet = Ipv4Packet::new_checked(eth_frame.payload())?;
let checksum_caps = self.device.capabilities().checksum;
let checksum_caps = self.device_capabilities.checksum.clone();
let ipv4_repr = Ipv4Repr::parse(&ipv4_packet, &checksum_caps)?;
if !ipv4_repr.src_addr.is_unicast() {
@ -386,7 +397,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
_ => {
// Send back as much of the original payload as we can
let payload_len = cmp::min(
ip_payload.len(), self.device.capabilities().max_transmission_unit);
ip_payload.len(), self.device_capabilities.max_transmission_unit);
let icmp_reply_repr = Icmpv4Repr::DstUnreachable {
reason: Icmpv4DstUnreachable::ProtoUnreachable,
header: ipv4_repr,
@ -398,9 +409,10 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
}
fn process_icmpv4<'frame>(&self, ipv4_repr: Ipv4Repr, ip_payload: &'frame [u8]) ->
Result<Packet<'frame>> {
Result<Packet<'frame>>
{
let icmp_packet = Icmpv4Packet::new_checked(ip_payload)?;
let checksum_caps = self.device.capabilities().checksum;
let checksum_caps = self.device_capabilities.checksum.clone();
let icmp_repr = Icmpv4Repr::parse(&icmp_packet, &checksum_caps)?;
match icmp_repr {
@ -422,13 +434,33 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
}
}
fn icmpv4_reply<'frame, 'icmp: 'frame>
(&self, ipv4_repr: Ipv4Repr, icmp_repr: Icmpv4Repr<'icmp>) ->
Packet<'frame>
{
if ipv4_repr.dst_addr.is_unicast() {
let ipv4_reply_repr = Ipv4Repr {
src_addr: ipv4_repr.dst_addr,
dst_addr: ipv4_repr.src_addr,
protocol: IpProtocol::Icmp,
payload_len: icmp_repr.buffer_len(),
ttl: 64
};
Packet::Icmpv4(ipv4_reply_repr, icmp_repr)
} else {
// Do not send any ICMP replies to a broadcast destination address.
Packet::None
}
}
#[cfg(feature = "proto-udp")]
fn process_udp<'frame>(&self, sockets: &mut SocketSet,
ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
Result<Packet<'frame>> {
Result<Packet<'frame>>
{
let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
let udp_packet = UdpPacket::new_checked(ip_payload)?;
let checksum_caps = self.device.capabilities().checksum;
let checksum_caps = self.device_capabilities.checksum.clone();
let udp_repr = UdpRepr::parse(&udp_packet, &src_addr, &dst_addr, &checksum_caps)?;
for mut udp_socket in sockets.iter_mut().filter_map(UdpSocket::downcast) {
@ -447,7 +479,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
IpRepr::Ipv4(ipv4_repr) => {
// Send back as much of the original payload as we can
let payload_len = cmp::min(
ip_payload.len(), self.device.capabilities().max_transmission_unit);
ip_payload.len(), self.device_capabilities.max_transmission_unit);
let icmpv4_reply_repr = Icmpv4Repr::DstUnreachable {
reason: Icmpv4DstUnreachable::PortUnreachable,
header: ipv4_repr,
@ -464,10 +496,11 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
#[cfg(feature = "proto-tcp")]
fn process_tcp<'frame>(&self, sockets: &mut SocketSet, timestamp: u64,
ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
Result<Packet<'frame>> {
Result<Packet<'frame>>
{
let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
let tcp_packet = TcpPacket::new_checked(ip_payload)?;
let checksum_caps = self.device.capabilities().checksum;
let checksum_caps = self.device_capabilities.checksum.clone();
let tcp_repr = TcpRepr::parse(&tcp_packet, &src_addr, &dst_addr, &checksum_caps)?;
for mut tcp_socket in sockets.iter_mut().filter_map(TcpSocket::downcast) {
@ -491,8 +524,11 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
}
}
fn dispatch(&mut self, timestamp: u64, packet: Packet) -> Result<()> {
let checksum_caps = self.device.capabilities().checksum;
fn dispatch<Tx>(&mut self, tx_token: Tx, timestamp: u64,
packet: Packet) -> Result<()>
where Tx: TxToken
{
let checksum_caps = self.device_capabilities.checksum.clone();
match packet {
Packet::Arp(arp_repr) => {
let dst_hardware_addr =
@ -501,7 +537,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
_ => unreachable!()
};
self.dispatch_ethernet(timestamp, arp_repr.buffer_len(), |mut frame| {
self.dispatch_ethernet(tx_token, timestamp, arp_repr.buffer_len(), |mut frame| {
frame.set_dst_addr(dst_hardware_addr);
frame.set_ethertype(EthernetProtocol::Arp);
@ -510,19 +546,19 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
})
},
Packet::Icmpv4(ipv4_repr, icmpv4_repr) => {
self.dispatch_ip(timestamp, IpRepr::Ipv4(ipv4_repr), |_ip_repr, payload| {
self.dispatch_ip(tx_token, timestamp, IpRepr::Ipv4(ipv4_repr), |_ip_repr, payload| {
icmpv4_repr.emit(&mut Icmpv4Packet::new(payload), &checksum_caps);
})
}
#[cfg(feature = "proto-raw")]
Packet::Raw((ip_repr, raw_packet)) => {
self.dispatch_ip(timestamp, ip_repr, |_ip_repr, payload| {
self.dispatch_ip(tx_token, timestamp, ip_repr, |_ip_repr, payload| {
payload.copy_from_slice(raw_packet);
})
}
#[cfg(feature = "proto-udp")]
Packet::Udp((ip_repr, udp_repr)) => {
self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
self.dispatch_ip(tx_token, timestamp, ip_repr, |ip_repr, payload| {
udp_repr.emit(&mut UdpPacket::new(payload),
&ip_repr.src_addr(), &ip_repr.dst_addr(),
&checksum_caps);
@ -530,8 +566,8 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
}
#[cfg(feature = "proto-tcp")]
Packet::Tcp((ip_repr, mut tcp_repr)) => {
let caps = self.device.capabilities();
self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
let caps = self.device_capabilities.clone();
self.dispatch_ip(tx_token, timestamp, ip_repr, |ip_repr, payload| {
// This is a terrible hack to make TCP performance more acceptable on systems
// where the TCP buffers are significantly larger than network buffers,
// e.g. a 64 kB TCP receive buffer (and so, when empty, a 64k window)
@ -560,18 +596,20 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
}
}
fn dispatch_ethernet<F>(&mut self, timestamp: u64, buffer_len: usize, f: F) -> Result<()>
where F: FnOnce(EthernetFrame<&mut [u8]>) {
fn dispatch_ethernet<Tx, F>(&mut self, tx_token: Tx, timestamp: u64,
buffer_len: usize, f: F) -> Result<()>
where Tx: TxToken, F: FnOnce(EthernetFrame<&mut [u8]>)
{
let tx_len = EthernetFrame::<&[u8]>::buffer_len(buffer_len);
let mut tx_buffer = self.device.transmit(timestamp, tx_len)?;
debug_assert!(tx_buffer.as_ref().len() == tx_len);
tx_token.consume(timestamp, tx_len, |tx_buffer| {
debug_assert!(tx_buffer.as_ref().len() == tx_len);
let mut frame = EthernetFrame::new(tx_buffer.as_mut());
frame.set_src_addr(self.ethernet_addr);
let mut frame = EthernetFrame::new(tx_buffer.as_mut());
frame.set_src_addr(self.ethernet_addr);
f(frame);
f(frame);
Ok(())
Ok(())
})
}
fn route(&self, addr: &IpAddress) -> Result<IpAddress> {
@ -590,17 +628,19 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
})
}
fn lookup_hardware_addr(&mut self, timestamp: u64,
src_addr: &IpAddress, dst_addr: &IpAddress) ->
Result<EthernetAddress> {
fn lookup_hardware_addr<Tx>(&mut self, tx_token: Tx, timestamp: u64,
src_addr: &IpAddress, dst_addr: &IpAddress) ->
Result<(EthernetAddress, Tx)>
where Tx: TxToken
{
let dst_addr = self.route(dst_addr)?;
if let Some(hardware_addr) = self.arp_cache.lookup(&dst_addr) {
return Ok(hardware_addr)
return Ok((hardware_addr,tx_token))
}
if dst_addr.is_broadcast() {
return Ok(EthernetAddress::BROADCAST)
return Ok((EthernetAddress::BROADCAST, tx_token))
}
match (src_addr, dst_addr) {
@ -616,7 +656,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
target_protocol_addr: dst_addr,
};
self.dispatch_ethernet(timestamp, arp_repr.buffer_len(), |mut frame| {
self.dispatch_ethernet(tx_token, timestamp, arp_repr.buffer_len(), |mut frame| {
frame.set_dst_addr(EthernetAddress::BROADCAST);
frame.set_ethertype(EthernetProtocol::Arp);
@ -629,15 +669,18 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
}
}
fn dispatch_ip<F>(&mut self, timestamp: u64, ip_repr: IpRepr, f: F) -> Result<()>
where F: FnOnce(IpRepr, &mut [u8]) {
fn dispatch_ip<Tx, F>(&mut self, tx_token: Tx, timestamp: u64,
ip_repr: IpRepr, f: F) -> Result<()>
where Tx: TxToken, F: FnOnce(IpRepr, &mut [u8])
{
let ip_repr = ip_repr.lower(&self.ip_addrs)?;
let checksum_caps = self.device.capabilities().checksum;
let checksum_caps = self.device_capabilities.checksum.clone();
let dst_hardware_addr =
self.lookup_hardware_addr(timestamp, &ip_repr.src_addr(), &ip_repr.dst_addr())?;
let (dst_hardware_addr, tx_token) =
self.lookup_hardware_addr(tx_token, timestamp,
&ip_repr.src_addr(), &ip_repr.dst_addr())?;
self.dispatch_ethernet(timestamp, ip_repr.total_len(), |mut frame| {
self.dispatch_ethernet(tx_token, timestamp, ip_repr.total_len(), |mut frame| {
frame.set_dst_addr(dst_hardware_addr);
match ip_repr {
IpRepr::Ipv4(_) => frame.set_ethertype(EthernetProtocol::Ipv4),
@ -656,7 +699,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
mod test {
use std::boxed::Box;
use super::Packet;
use phy::{Loopback, ChecksumCapabilities};
use phy::{self, Loopback, ChecksumCapabilities};
use wire::{ArpOperation, ArpPacket, ArpRepr};
use wire::{EthernetAddress, EthernetFrame, EthernetProtocol};
use wire::{IpAddress, IpCidr, IpProtocol, IpRepr};
@ -665,6 +708,7 @@ mod test {
use wire::{UdpPacket, UdpRepr};
use iface::{ArpCache, SliceArpCache, EthernetInterface};
use socket::SocketSet;
use {Result, Error};
fn create_loopback<'a, 'b>() ->
(EthernetInterface<'a, 'static, 'b, Loopback>, SocketSet<'static, 'a, 'b>) {
@ -679,6 +723,16 @@ mod test {
EthernetAddress::default(), [ip_addr], None), SocketSet::new(vec![]))
}
#[derive(Debug, PartialEq)]
struct MockTxToken;
impl phy::TxToken for MockTxToken {
fn consume<R, F>(self, _: u64, _: usize, _: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R> {
Err(Error::__Nonexhaustive)
}
}
#[test]
fn no_icmp_to_broadcast() {
let (mut iface, mut socket_set) = create_loopback();
@ -710,7 +764,7 @@ mod test {
// Ensure that the unknown protocol frame does not trigger an
// ICMP error response when the destination address is a
// broadcast address
assert_eq!(iface.process_ipv4(&mut socket_set, 0, &frame),
assert_eq!(iface.inner.process_ipv4(&mut socket_set, 0, &frame),
Ok(Packet::None));
}
@ -767,7 +821,7 @@ mod test {
// Ensure that the unknown protocol triggers an error response.
// And we correctly handle no payload.
assert_eq!(iface.process_ipv4(&mut socket_set, 0, &frame),
assert_eq!(iface.inner.process_ipv4(&mut socket_set, 0, &frame),
Ok(expected_repr));
}
@ -832,7 +886,7 @@ mod test {
// Ensure that the unknown protocol triggers an error response.
// And we correctly handle no payload.
assert_eq!(iface.process_udp(&mut socket_set, ip_repr, data),
assert_eq!(iface.inner.process_udp(&mut socket_set, ip_repr, data),
Ok(expected_repr));
let ip_repr = IpRepr::Ipv4(Ipv4Repr {
@ -851,8 +905,8 @@ mod test {
// Ensure that the port unreachable error does not trigger an
// ICMP error response when the destination address is a
// broadcast address
assert_eq!(iface.process_udp(&mut socket_set, ip_repr, packet_broadcast.into_inner()),
Ok(Packet::None));
assert_eq!(iface.inner.process_udp(&mut socket_set, ip_repr,
packet_broadcast.into_inner()), Ok(Packet::None));
}
@ -885,7 +939,7 @@ mod test {
}
// Ensure an ARP Request for us triggers an ARP Reply
assert_eq!(iface.process_ethernet(&mut socket_set, 0, frame.into_inner()),
assert_eq!(iface.inner.process_ethernet(&mut socket_set, 0, frame.into_inner()),
Ok(Packet::Arp(ArpRepr::EthernetIpv4 {
operation: ArpOperation::Reply,
source_hardware_addr: local_hw_addr,
@ -895,10 +949,9 @@ mod test {
})));
// Ensure the address of the requestor was entered in the cache
assert_eq!(iface.lookup_hardware_addr(0,
&IpAddress::Ipv4(local_ip_addr),
&IpAddress::Ipv4(remote_ip_addr)),
Ok(remote_hw_addr));
assert_eq!(iface.inner.lookup_hardware_addr(MockTxToken, 0,
&IpAddress::Ipv4(local_ip_addr), &IpAddress::Ipv4(remote_ip_addr)),
Ok((remote_hw_addr, MockTxToken)));
}
#[test]
@ -928,13 +981,13 @@ mod test {
}
// Ensure an ARP Request for someone else does not trigger an ARP Reply
assert_eq!(iface.process_ethernet(&mut socket_set, 0, frame.into_inner()),
assert_eq!(iface.inner.process_ethernet(&mut socket_set, 0, frame.into_inner()),
Ok(Packet::None));
// Ensure the address of the requestor was entered in the cache
assert_eq!(iface.lookup_hardware_addr(0,
&IpAddress::Ipv4(Ipv4Address([0x7f, 0x00, 0x00, 0x01])),
&IpAddress::Ipv4(remote_ip_addr)),
Ok(remote_hw_addr));
assert_eq!(iface.inner.lookup_hardware_addr(MockTxToken, 0,
&IpAddress::Ipv4(Ipv4Address([0x7f, 0x00, 0x00, 0x01])),
&IpAddress::Ipv4(remote_ip_addr)),
Ok((remote_hw_addr, MockTxToken)));
}
}

View File

@ -1,5 +1,7 @@
use core::cell::RefCell;
use {Error, Result};
use super::{DeviceCapabilities, Device};
use phy::{self, DeviceCapabilities, Device};
// We use our own RNG to stay compatible with #![no_std].
// The use of the RNG below has a slight bias, but it doesn't matter.
@ -26,7 +28,7 @@ struct Config {
interval: u64,
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
struct State {
rng_seed: u32,
refilled_at: u64,
@ -86,13 +88,13 @@ impl State {
/// adverse network conditions (such as random packet loss or corruption), or software
/// or hardware limitations (such as a limited number or size of usable network buffers).
#[derive(Debug)]
pub struct FaultInjector<D: Device> {
inner: D,
state: State,
config: Config
pub struct FaultInjector<D: for<'a> Device<'a>> {
inner: D,
state: RefCell<State>,
config: Config,
}
impl<D: Device> FaultInjector<D> {
impl<D: for<'a> Device<'a>> FaultInjector<D> {
/// Create a fault injector device, using the given random number generator seed.
pub fn new(inner: D, seed: u32) -> FaultInjector<D> {
let state = State {
@ -103,8 +105,8 @@ impl<D: Device> FaultInjector<D> {
};
FaultInjector {
inner: inner,
state: state,
config: Config::default()
state: RefCell::new(state),
config: Config::default(),
}
}
@ -178,15 +180,16 @@ impl<D: Device> FaultInjector<D> {
/// Set the interval for packet rate limiting, in milliseconds.
pub fn set_bucket_interval(&mut self, interval: u64) {
self.state.refilled_at = 0;
self.state.borrow_mut().refilled_at = 0;
self.config.interval = interval
}
}
impl<D: Device> Device for FaultInjector<D>
where D::RxBuffer: AsMut<[u8]> {
type RxBuffer = D::RxBuffer;
type TxBuffer = TxBuffer<D::TxBuffer>;
impl<'a, D> Device<'a> for FaultInjector<D>
where D: for<'b> Device<'b>,
{
type RxToken = RxToken<'a, <D as Device<'a>>::RxToken>;
type TxToken = TxToken<'a, <D as Device<'a>>::TxToken>;
fn capabilities(&self) -> DeviceCapabilities {
let mut caps = self.inner.capabilities();
@ -196,88 +199,109 @@ impl<D: Device> Device for FaultInjector<D>
caps
}
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
let mut buffer = self.inner.receive(timestamp)?;
if self.state.maybe(self.config.drop_pct) {
net_trace!("rx: randomly dropping a packet");
return Err(Error::Exhausted)
}
if self.state.maybe(self.config.corrupt_pct) {
net_trace!("rx: randomly corrupting a packet");
self.state.corrupt(&mut buffer)
}
if self.config.max_size > 0 && buffer.as_ref().len() > self.config.max_size {
net_trace!("rx: dropping a packet that is too large");
return Err(Error::Exhausted)
}
if !self.state.maybe_receive(&self.config, timestamp) {
net_trace!("rx: dropping a packet because of rate limiting");
return Err(Error::Exhausted)
}
Ok(buffer)
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let &mut Self { ref mut inner, ref state, config } = self;
inner.receive().map(|(rx_token, tx_token)| {
let rx = RxToken {
state: &state,
config: config,
token: rx_token,
corrupt: [0; MTU],
};
let tx = TxToken {
state: &state,
config: config,
token: tx_token,
junk: [0; MTU],
};
(rx, tx)
})
}
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
let buffer;
if self.state.maybe(self.config.drop_pct) {
net_trace!("tx: randomly dropping a packet");
buffer = None;
} else if self.config.max_size > 0 && length > self.config.max_size {
net_trace!("tx: dropping a packet that is too large");
buffer = None;
} else if !self.state.maybe_transmit(&self.config, timestamp) {
net_trace!("tx: dropping a packet because of rate limiting");
buffer = None;
} else {
buffer = Some(self.inner.transmit(timestamp, length)?);
}
Ok(TxBuffer {
buffer: buffer,
state: self.state.clone(),
config: self.config,
fn transmit(&'a mut self) -> Option<Self::TxToken> {
let &mut Self { ref mut inner, ref state, config } = self;
inner.transmit().map(|token| TxToken {
state: &state,
config: config,
token: token,
junk: [0; MTU],
length: length
})
}
}
#[doc(hidden)]
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>> {
state: State,
pub struct RxToken<'a, Rx: phy::RxToken> {
state: &'a RefCell<State>,
config: Config,
token: Rx,
corrupt: [u8; MTU],
}
impl<'a, Rx: phy::RxToken> phy::RxToken for RxToken<'a, Rx> {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, timestamp: u64, f: F) -> Result<R> {
if self.state.borrow_mut().maybe(self.config.drop_pct) {
net_trace!("rx: randomly dropping a packet");
return Err(Error::Exhausted)
}
if !self.state.borrow_mut().maybe_receive(&self.config, timestamp) {
net_trace!("rx: dropping a packet because of rate limiting");
return Err(Error::Exhausted)
}
let Self { token, config, state, mut corrupt } = self;
token.consume(timestamp, |buffer| {
if config.max_size > 0 && buffer.as_ref().len() > config.max_size {
net_trace!("rx: dropping a packet that is too large");
return Err(Error::Exhausted)
}
if state.borrow_mut().maybe(config.corrupt_pct) {
net_trace!("rx: randomly corrupting a packet");
let mut corrupt = &mut corrupt[..buffer.len()];
corrupt.copy_from_slice(buffer);
state.borrow_mut().corrupt(&mut corrupt);
f(&mut corrupt)
} else {
f(buffer)
}
})
}
}
#[doc(hidden)]
pub struct TxToken<'a, Tx: phy::TxToken> {
state: &'a RefCell<State>,
config: Config,
buffer: Option<B>,
token: Tx,
junk: [u8; MTU],
length: usize
}
impl<B: AsRef<[u8]> + AsMut<[u8]>> AsRef<[u8]> for TxBuffer<B> {
fn as_ref(&self) -> &[u8] {
match self.buffer {
Some(ref buf) => buf.as_ref(),
None => &self.junk[..self.length]
}
}
}
impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> {
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(mut self, timestamp: u64, len: usize, f: F)
-> Result<R>
{
let drop = if self.state.borrow_mut().maybe(self.config.drop_pct) {
net_trace!("tx: randomly dropping a packet");
true
} else if self.config.max_size > 0 && len > self.config.max_size {
net_trace!("tx: dropping a packet that is too large");
true
} else if !self.state.borrow_mut().maybe_transmit(&self.config, timestamp) {
net_trace!("tx: dropping a packet because of rate limiting");
true
} else {
false
};
impl<B: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for TxBuffer<B> {
fn as_mut(&mut self) -> &mut [u8] {
match self.buffer {
Some(ref mut buf) => buf.as_mut(),
None => &mut self.junk[..self.length]
if drop {
return f(&mut self.junk);
}
}
}
impl<B: AsRef<[u8]> + AsMut<[u8]>> Drop for TxBuffer<B> {
fn drop(&mut self) {
match self.buffer {
Some(ref mut buf) => {
if self.state.maybe(self.config.corrupt_pct) {
net_trace!("tx: corrupting a packet");
self.state.corrupt(buf)
}
},
None => ()
}
let Self { token, state, config, .. } = self;
token.consume(timestamp, len, |mut buf| {
if state.borrow_mut().maybe(config.corrupt_pct) {
net_trace!("tx: corrupting a packet");
state.borrow_mut().corrupt(&mut buf)
}
f(buf)
})
}
}

View File

@ -1,9 +1,3 @@
use core::mem::swap;
use core::cell::RefCell;
#[cfg(feature = "std")]
use std::rc::Rc;
#[cfg(feature = "alloc")]
use alloc::rc::Rc;
#[cfg(feature = "std")]
use std::vec::Vec;
#[cfg(feature = "std")]
@ -11,12 +5,14 @@ use std::collections::VecDeque;
#[cfg(feature = "alloc")]
use alloc::{Vec, VecDeque};
use {Error, Result};
use super::{Device, DeviceCapabilities};
use Result;
use phy::{self, Device, DeviceCapabilities};
/// A loopback device.
#[derive(Debug)]
pub struct Loopback(Rc<RefCell<VecDeque<Vec<u8>>>>);
pub struct Loopback {
queue: VecDeque<Vec<u8>>,
}
impl Loopback {
/// Creates a loopback device.
@ -24,13 +20,15 @@ impl Loopback {
/// Every packet transmitted through this device will be received through it
/// in FIFO order.
pub fn new() -> Loopback {
Loopback(Rc::new(RefCell::new(VecDeque::new())))
Loopback {
queue: VecDeque::new(),
}
}
}
impl Device for Loopback {
type RxBuffer = Vec<u8>;
type TxBuffer = TxBuffer;
impl<'a> Device<'a> for Loopback {
type RxToken = RxToken;
type TxToken = TxToken<'a>;
fn capabilities(&self) -> DeviceCapabilities {
DeviceCapabilities {
@ -39,41 +37,45 @@ impl Device for Loopback {
}
}
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
match self.0.borrow_mut().pop_front() {
Some(packet) => Ok(packet),
None => Err(Error::Exhausted)
}
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
self.queue.pop_front().map(move |buffer| {
let rx = RxToken { buffer: buffer };
let tx = TxToken { queue: &mut self.queue };
(rx, tx)
})
}
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
let mut buffer = Vec::new();
buffer.resize(length, 0);
Ok(TxBuffer {
queue: self.0.clone(),
buffer: buffer
fn transmit(&'a mut self) -> Option<Self::TxToken> {
Some(TxToken {
queue: &mut self.queue,
})
}
}
#[doc(hidden)]
pub struct TxBuffer {
queue: Rc<RefCell<VecDeque<Vec<u8>>>>,
buffer: Vec<u8>
pub struct RxToken {
buffer: Vec<u8>,
}
impl AsRef<[u8]> for TxBuffer {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
}
impl AsMut<[u8]> for TxBuffer {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
}
impl Drop for TxBuffer {
fn drop(&mut self) {
let mut buffer = Vec::new();
swap(&mut buffer, &mut self.buffer);
self.queue.borrow_mut().push_back(buffer)
impl phy::RxToken for RxToken {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
f(&self.buffer)
}
}
#[doc(hidden)]
pub struct TxToken<'a> {
queue: &'a mut VecDeque<Vec<u8>>,
}
impl<'a> phy::TxToken for TxToken<'a> {
fn consume<R, F>(self, _timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
{
let mut buffer = Vec::new();
buffer.resize(len, 0);
let result = f(&mut buffer);
self.queue.push_back(buffer);
result
}
}

View File

@ -19,87 +19,66 @@
//!
/*!
```rust
use std::slice;
use smoltcp::{Error, Result};
use smoltcp::phy::{DeviceCapabilities, Device};
use smoltcp::Result;
use smoltcp::phy::{self, DeviceCapabilities, Device};
const TX_BUFFERS: [*mut u8; 2] = [0x10000000 as *mut u8, 0x10001000 as *mut u8];
const RX_BUFFERS: [*mut u8; 2] = [0x10002000 as *mut u8, 0x10003000 as *mut u8];
fn rx_full() -> bool {
/* platform-specific code to check if an incoming packet has arrived */
false
struct StmPhy {
rx_buffer: [u8; 1536],
tx_buffer: [u8; 1536],
}
fn rx_setup(_buf: *mut u8, _length: &mut usize) {
/* platform-specific code to receive a packet into a buffer */
impl<'a> StmPhy {
fn new() -> StmPhy {
StmPhy {
rx_buffer: [0; 1536],
tx_buffer: [0; 1536],
}
}
}
fn tx_empty() -> bool {
/* platform-specific code to check if an outgoing packet can be sent */
false
}
impl<'a> phy::Device<'a> for StmPhy {
type RxToken = StmPhyRxToken<'a>;
type TxToken = StmPhyTxToken<'a>;
fn tx_setup(_buf: *const u8, _length: usize) {
/* platform-specific code to send a buffer with a packet */
}
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
Some((StmPhyRxToken(&mut self.rx_buffer[..]),
StmPhyTxToken(&mut self.tx_buffer[..])))
}
# #[allow(dead_code)]
pub struct EthernetDevice {
tx_next: usize,
rx_next: usize
}
impl Device for EthernetDevice {
type RxBuffer = &'static [u8];
type TxBuffer = EthernetTxBuffer;
fn transmit(&'a mut self) -> Option<Self::TxToken> {
Some(StmPhyTxToken(&mut self.tx_buffer[..]))
}
fn capabilities(&self) -> DeviceCapabilities {
let mut caps = DeviceCapabilities::default();
caps.max_transmission_unit = 1536;
caps.max_burst_size = Some(2);
caps.max_burst_size = Some(1);
caps
}
}
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
if rx_full() {
let index = self.rx_next;
self.rx_next = (self.rx_next + 1) % RX_BUFFERS.len();
let mut length = 0;
rx_setup(RX_BUFFERS[self.rx_next], &mut length);
Ok(unsafe {
slice::from_raw_parts(RX_BUFFERS[index], length)
})
} else {
Err(Error::Exhausted)
}
}
struct StmPhyRxToken<'a>(&'a [u8]);
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
if tx_empty() {
let index = self.tx_next;
self.tx_next = (self.tx_next + 1) % TX_BUFFERS.len();
Ok(EthernetTxBuffer(unsafe {
slice::from_raw_parts_mut(TX_BUFFERS[index], length)
}))
} else {
Err(Error::Exhausted)
}
impl<'a> phy::RxToken for StmPhyRxToken<'a> {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
// TODO: receive packet into buffer
let result = f(self.0);
println!("rx called");
result
}
}
pub struct EthernetTxBuffer(&'static mut [u8]);
struct StmPhyTxToken<'a>(&'a mut [u8]);
impl AsRef<[u8]> for EthernetTxBuffer {
fn as_ref(&self) -> &[u8] { self.0 }
}
impl AsMut<[u8]> for EthernetTxBuffer {
fn as_mut(&mut self) -> &mut [u8] { self.0 }
}
impl Drop for EthernetTxBuffer {
fn drop(&mut self) { tx_setup(self.0.as_ptr(), self.0.len()) }
impl<'a> phy::TxToken for StmPhyTxToken<'a> {
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, _timestamp: u64, len: usize, f: F)
-> Result<R>
{
let result = f(&mut self.0[..len]);
println!("tx called {}", len);
// TODO: send packet out
result
}
}
```
*/
@ -229,27 +208,52 @@ pub struct DeviceCapabilities {
/// An interface for sending and receiving raw network frames.
///
/// It is expected that a `Device` implementation would allocate memory for both sending
/// and receiving packets from memory pools; hence, the stack borrows the buffer for a packet
/// that it is about to receive, as well for a packet that it is about to send, from the device.
pub trait Device {
type RxBuffer: AsRef<[u8]>;
type TxBuffer: AsRef<[u8]> + AsMut<[u8]>;
/// The interface is based on _tokens_, which are types that allow to receive/transmit a
/// single packet. The `receive` and `transmit` functions only construct such tokens, the
/// real sending/receiving operation are performed when the tokens are consumed.
pub trait Device<'a> {
type RxToken: RxToken + 'a;
type TxToken: TxToken + 'a;
/// Construct a token pair consisting of one receive token and one transmit token.
///
/// The additional transmit token makes it possible to generate a reply packet based
/// on the contents of the received packet. For example, this makes it possible to
/// handle arbitrarily large ICMP echo ("ping") requests, where the all received bytes
/// need to be sent back, without heap allocation.
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)>;
/// Construct a transmit token.
fn transmit(&'a mut self) -> Option<Self::TxToken>;
/// Get a description of device capabilities.
fn capabilities(&self) -> DeviceCapabilities;
/// Receive a frame.
///
/// It is expected that a `receive` implementation, once a packet is written to memory
/// through DMA, would gain ownership of the underlying buffer, provide it for parsing,
/// and return it to the network device once it is dropped.
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer>;
/// Transmit a frame.
///
/// It is expected that a `transmit` implementation would gain ownership of a buffer with
/// the requested length, provide it for emission, and schedule it to be read from
/// memory by the network device once it is dropped.
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer>;
}
/// A token to receive a single network packet.
pub trait RxToken {
/// Consumes the token to receive a single network packet.
///
/// This method receives a packet and then calls the given closure `f` with the raw
/// packet bytes as argument.
///
/// The timestamp must be a number of milliseconds, monotonically increasing since an
/// arbitrary moment in time, such as system startup.
fn consume<R, F>(self, timestamp: u64, f: F) -> Result<R>
where F: FnOnce(&[u8]) -> Result<R>;
}
/// A token to transmit a single network packet.
pub trait TxToken {
/// Consumes the token to send a single network packet.
///
/// This method constructs a transmit buffer of size `len` and calls the passed
/// closure `f` with a mutable reference to that buffer. The closure should construct
/// a valid network packet (e.g. an ethernet packet) in the buffer. When the closure
/// returns, the transmit buffer is sent out.
///
/// The timestamp must be a number of milliseconds, monotonically increasing since an
/// arbitrary moment in time, such as system startup.
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>;
}

View File

@ -5,7 +5,7 @@ use std::io::Write;
use byteorder::{ByteOrder, NativeEndian};
use Result;
use super::{DeviceCapabilities, Device};
use phy::{self, DeviceCapabilities, Device};
enum_with_unknown! {
/// Captured packet header type.
@ -114,13 +114,16 @@ impl<T: AsMut<Write>> PcapSink for RefCell<T> {
/// [libpcap]: https://wiki.wireshark.org/Development/LibpcapFileFormat
/// [sink]: trait.PcapSink.html
#[derive(Debug)]
pub struct PcapWriter<D: Device, S: PcapSink + Clone> {
pub struct PcapWriter<D, S>
where D: for<'a> Device<'a>,
S: PcapSink + Clone,
{
lower: D,
sink: S,
mode: PcapMode
mode: PcapMode,
}
impl<D: Device, S: PcapSink + Clone> PcapWriter<D, S> {
impl<D: for<'a> Device<'a>, S: PcapSink + Clone> PcapWriter<D, S> {
/// Creates a packet capture writer.
pub fn new(lower: D, sink: S, mode: PcapMode, link_type: PcapLinkType) -> PcapWriter<D, S> {
sink.global_header(link_type);
@ -128,53 +131,73 @@ impl<D: Device, S: PcapSink + Clone> PcapWriter<D, S> {
}
}
impl<D: Device, S: PcapSink + Clone> Device for PcapWriter<D, S> {
type RxBuffer = D::RxBuffer;
type TxBuffer = TxBuffer<D::TxBuffer, S>;
impl<'a, D, S> Device<'a> for PcapWriter<D, S>
where D: for<'b> Device<'b>,
S: PcapSink + Clone + 'a,
{
type RxToken = RxToken<<D as Device<'a>>::RxToken, S>;
type TxToken = TxToken<<D as Device<'a>>::TxToken, S>;
fn capabilities(&self) -> DeviceCapabilities { self.lower.capabilities() }
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
let buffer = self.lower.receive(timestamp)?;
match self.mode {
PcapMode::Both | PcapMode::RxOnly =>
self.sink.packet(timestamp, buffer.as_ref()),
PcapMode::TxOnly => ()
}
Ok(buffer)
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let &mut Self { ref mut lower, ref sink, mode, .. } = self;
lower.receive().map(|(rx_token, tx_token)| {
let rx = RxToken { token: rx_token, sink: sink.clone(), mode: mode };
let tx = TxToken { token: tx_token, sink: sink.clone(), mode: mode };
(rx, tx)
})
}
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
let buffer = self.lower.transmit(timestamp, length)?;
Ok(TxBuffer { buffer, timestamp, sink: self.sink.clone(), mode: self.mode })
fn transmit(&'a mut self) -> Option<Self::TxToken> {
let &mut Self { ref mut lower, ref sink, mode } = self;
lower.transmit().map(|token| {
TxToken { token, sink: sink.clone(), mode: mode }
})
}
}
#[doc(hidden)]
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink> {
buffer: B,
timestamp: u64,
sink: S,
mode: PcapMode
pub struct RxToken<Rx: phy::RxToken, S: PcapSink> {
token: Rx,
sink: S,
mode: PcapMode,
}
impl<B, S> AsRef<[u8]> for TxBuffer<B, S>
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
}
impl<B, S> AsMut<[u8]> for TxBuffer<B, S>
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
}
impl<B, S> Drop for TxBuffer<B, S>
where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
fn drop(&mut self) {
match self.mode {
PcapMode::Both | PcapMode::TxOnly =>
self.sink.packet(self.timestamp, self.as_ref()),
PcapMode::RxOnly => ()
}
impl<Rx: phy::RxToken, S: PcapSink> phy::RxToken for RxToken<Rx, S> {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, timestamp: u64, f: F) -> Result<R> {
let Self { token, sink, mode } = self;
token.consume(timestamp, |buffer| {
match mode {
PcapMode::Both | PcapMode::RxOnly =>
sink.packet(timestamp, buffer.as_ref()),
PcapMode::TxOnly => ()
}
f(buffer)
})
}
}
#[doc(hidden)]
pub struct TxToken<Tx: phy::TxToken, S: PcapSink> {
token: Tx,
sink: S,
mode: PcapMode
}
impl<Tx: phy::TxToken, S: PcapSink> phy::TxToken for TxToken<Tx, S> {
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
{
let Self { token, sink, mode } = self;
token.consume(timestamp, len, |buffer| {
let result = f(buffer);
match mode {
PcapMode::Both | PcapMode::TxOnly =>
sink.packet(timestamp, &buffer),
PcapMode::RxOnly => ()
};
result
})
}
}

View File

@ -1,11 +1,10 @@
use std::cell::RefCell;
use std::vec::Vec;
use std::rc::Rc;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};
use Result;
use super::{sys, DeviceCapabilities, Device};
use phy::{self, sys, DeviceCapabilities, Device};
/// A socket that captures or transmits the complete frame.
#[derive(Debug)]
@ -36,9 +35,9 @@ impl RawSocket {
}
}
impl Device for RawSocket {
type RxBuffer = Vec<u8>;
type TxBuffer = TxBuffer;
impl<'a> Device<'a> for RawSocket {
type RxToken = RxToken;
type TxToken = TxToken;
fn capabilities(&self) -> DeviceCapabilities {
DeviceCapabilities {
@ -47,39 +46,48 @@ impl Device for RawSocket {
}
}
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
let mut lower = self.lower.borrow_mut();
let mut buffer = vec![0; self.mtu];
let size = lower.recv(&mut buffer[..]).unwrap();
buffer.resize(size, 0);
Ok(buffer)
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let rx = RxToken { lower: self.lower.clone(), mtu: self.mtu };
let tx = TxToken { lower: self.lower.clone() };
Some((rx, tx))
}
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
Ok(TxBuffer {
lower: self.lower.clone(),
buffer: vec![0; length]
fn transmit(&'a mut self) -> Option<Self::TxToken> {
Some(TxToken {
lower: self.lower.clone(),
})
}
}
#[doc(hidden)]
pub struct TxBuffer {
pub struct RxToken {
lower: Rc<RefCell<sys::RawSocketDesc>>,
buffer: Vec<u8>
mtu: usize,
}
impl AsRef<[u8]> for TxBuffer {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
}
impl AsMut<[u8]> for TxBuffer {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
}
impl Drop for TxBuffer {
fn drop(&mut self) {
impl phy::RxToken for RxToken {
fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
let mut lower = self.lower.borrow_mut();
lower.send(&mut self.buffer[..]).unwrap();
let mut buffer = vec![0; self.mtu];
let size = lower.recv(&mut buffer[..]).unwrap();
buffer.resize(size, 0);
f(&mut buffer)
}
}
#[doc(hidden)]
pub struct TxToken {
lower: Rc<RefCell<sys::RawSocketDesc>>,
}
impl phy::TxToken for TxToken {
fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, _timestamp: u64, len: usize, f: F)
-> Result<R>
{
let mut lower = self.lower.borrow_mut();
let mut buffer = vec![0; len];
let result = f(&mut buffer);
lower.send(&mut buffer[..]).unwrap();
result
}
}

View File

@ -1,11 +1,10 @@
use std::cell::RefCell;
use std::vec::Vec;
use std::rc::Rc;
use std::io;
use std::os::unix::io::{RawFd, AsRawFd};
use {Error, Result};
use super::{sys, DeviceCapabilities, Device};
use phy::{self, sys, DeviceCapabilities, Device};
/// A virtual Ethernet interface.
#[derive(Debug)]
@ -37,9 +36,9 @@ impl TapInterface {
}
}
impl Device for TapInterface {
type RxBuffer = Vec<u8>;
type TxBuffer = TxBuffer;
impl<'a> Device<'a> for TapInterface {
type RxToken = RxToken;
type TxToken = TxToken;
fn capabilities(&self) -> DeviceCapabilities {
DeviceCapabilities {
@ -48,13 +47,35 @@ impl Device for TapInterface {
}
}
fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let rx = RxToken { lower: self.lower.clone(), mtu: self.mtu };
let tx = TxToken { lower: self.lower.clone(), };
Some((rx, tx))
}
fn transmit(&'a mut self) -> Option<Self::TxToken> {
Some(TxToken {
lower: self.lower.clone(),
})
}
}
#[doc(hidden)]
pub struct RxToken {
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
mtu: usize,
}
impl phy::RxToken for RxToken {
fn consume<R, F>(self, _timestamp: u64, f: F) -> Result<R>
where F: FnOnce(&[u8]) -> Result<R>
{
let mut lower = self.lower.borrow_mut();
let mut buffer = vec![0; self.mtu];
match lower.recv(&mut buffer[..]) {
Ok(size) => {
buffer.resize(size, 0);
Ok(buffer)
f(&buffer)
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
Err(Error::Exhausted)
@ -62,32 +83,21 @@ impl Device for TapInterface {
Err(err) => panic!("{}", err)
}
}
fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
Ok(TxBuffer {
lower: self.lower.clone(),
buffer: vec![0; length]
})
}
}
#[doc(hidden)]
pub struct TxBuffer {
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
buffer: Vec<u8>
pub struct TxToken {
lower: Rc<RefCell<sys::TapInterfaceDesc>>,
}
impl AsRef<[u8]> for TxBuffer {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
}
impl AsMut<[u8]> for TxBuffer {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
}
impl Drop for TxBuffer {
fn drop(&mut self) {
impl phy::TxToken for TxToken {
fn consume<R, F>(self, _timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
{
let mut lower = self.lower.borrow_mut();
lower.send(&mut self.buffer[..]).unwrap();
let mut buffer = vec![0; len];
let result = f(&mut buffer);
lower.send(&mut buffer[..]).unwrap();
result
}
}

View File

@ -1,24 +1,21 @@
use Result;
use wire::pretty_print::{PrettyPrint, PrettyPrinter};
use super::{DeviceCapabilities, Device};
use phy::{self, DeviceCapabilities, Device};
/// A tracer device.
///
/// A tracer is a device that pretty prints all packets traversing it
/// using the provided writer function, and then passes them to another
/// device.
pub struct Tracer<D: Device, P: PrettyPrint> {
inner: D,
writer: fn(u64, PrettyPrinter<P>)
pub struct Tracer<D: for<'a> Device<'a>, P: PrettyPrint> {
inner: D,
writer: fn(u64, PrettyPrinter<P>),
}
impl<D: Device, P: PrettyPrint> Tracer<D, P> {
impl<D: for<'a> Device<'a>, P: PrettyPrint> Tracer<D, P> {
/// Create a tracer device.
pub fn new(inner: D, writer: fn(timestamp: u64, printer: PrettyPrinter<P>)) -> Tracer<D, P> {
Tracer {
inner: inner,
writer: writer
}
Tracer { inner, writer }
}
/// Return the underlying device, consuming the tracer.
@ -27,41 +24,65 @@ impl<D: Device, P: PrettyPrint> Tracer<D, P> {
}
}
impl<D: Device, P: PrettyPrint> Device for Tracer<D, P> {
type RxBuffer = D::RxBuffer;
type TxBuffer = TxBuffer<D::TxBuffer, P>;
impl<'a, D, P> Device<'a> for Tracer<D, P>
where D: for<'b> Device<'b>,
P: PrettyPrint + 'a,
{
type RxToken = RxToken<<D as Device<'a>>::RxToken, P>;
type TxToken = TxToken<<D as Device<'a>>::TxToken, P>;
fn capabilities(&self) -> DeviceCapabilities { self.inner.capabilities() }
fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
let buffer = self.inner.receive(timestamp)?;
(self.writer)(timestamp, PrettyPrinter::<P>::new("<- ", &buffer));
Ok(buffer)
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let &mut Self { ref mut inner, writer, .. } = self;
inner.receive().map(|(rx_token, tx_token)| {
let rx = RxToken { token: rx_token, writer: writer };
let tx = TxToken { token: tx_token, writer: writer };
(rx, tx)
})
}
fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
let buffer = self.inner.transmit(timestamp, length)?;
Ok(TxBuffer { buffer, timestamp, writer: self.writer })
fn transmit(&'a mut self) -> Option<Self::TxToken> {
let &mut Self { ref mut inner, writer } = self;
inner.transmit().map(|tx_token| {
TxToken { token: tx_token, writer: writer }
})
}
}
#[doc(hidden)]
pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> {
buffer: B,
timestamp: u64,
pub struct RxToken<Rx: phy::RxToken, P: PrettyPrint> {
token: Rx,
writer: fn(u64, PrettyPrinter<P>)
}
impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> AsRef<[u8]> for TxBuffer<B, P> {
fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
}
impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> AsMut<[u8]> for TxBuffer<B, P> {
fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
}
impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> Drop for TxBuffer<B, P> {
fn drop(&mut self) {
(self.writer)(self.timestamp, PrettyPrinter::<P>::new("-> ", &self.buffer));
impl<Rx: phy::RxToken, P: PrettyPrint> phy::RxToken for RxToken<Rx, P> {
fn consume<R, F>(self, timestamp: u64, f: F) -> Result<R>
where F: FnOnce(&[u8]) -> Result<R>
{
let Self { token, writer } = self;
token.consume(timestamp, |buffer| {
writer(timestamp, PrettyPrinter::<P>::new("<- ", &buffer));
f(buffer)
})
}
}
#[doc(hidden)]
pub struct TxToken<Tx: phy::TxToken, P: PrettyPrint> {
token: Tx,
writer: fn(u64, PrettyPrinter<P>)
}
impl<Tx: phy::TxToken, P: PrettyPrint> phy::TxToken for TxToken<Tx, P> {
fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
where F: FnOnce(&mut [u8]) -> Result<R>
{
let Self { token, writer } = self;
token.consume(timestamp, len, |buffer| {
let result = f(buffer);
writer(timestamp, PrettyPrinter::<P>::new("-> ", &buffer));
result
})
}
}