Significantly simplify buffering.

v0.7.x
whitequark 2016-12-15 17:07:56 +00:00
parent 3fb5f04b07
commit d1d910b46d
5 changed files with 171 additions and 149 deletions

View File

@ -5,7 +5,7 @@ use std::env;
use smoltcp::phy::{Tracer, TapInterface};
use smoltcp::wire::{EthernetFrame, EthernetAddress, InternetAddress, InternetEndpoint};
use smoltcp::iface::{SliceArpCache, EthernetInterface};
use smoltcp::socket::{Socket, UdpSocket, UdpUnitaryBuffer};
use smoltcp::socket::{Socket, UdpSocket, UdpBuffer, UdpBufferElem};
fn main() {
let ifname = env::args().nth(1).unwrap();
@ -20,8 +20,8 @@ fn main() {
let listen_address = InternetAddress::ipv4([0, 0, 0, 0]);
let endpoint = InternetEndpoint::new(listen_address, 6969);
let udp_rx_buffer = UdpUnitaryBuffer::new(vec![0; 2048]);
let udp_tx_buffer = UdpUnitaryBuffer::new(vec![0; 2048]);
let udp_rx_buffer = UdpBuffer::new([UdpBufferElem::new(vec![0; 2048])]);
let udp_tx_buffer = UdpBuffer::new([UdpBufferElem::new(vec![0; 2048])]);
let mut udp_socket = UdpSocket::new(endpoint, udp_rx_buffer, udp_tx_buffer);
let mut sockets: [&mut Socket; 1] = [&mut udp_socket];

View File

@ -17,7 +17,7 @@ pub mod iface;
pub mod socket;
/// The error type for the networking stack.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Error {
/// An incoming packet could not be parsed, or an outgoing packet could not be emitted
/// because a field was out of bounds for the underlying buffer.

View File

@ -9,10 +9,6 @@
//! the operating system decides on the good size for a buffer and manages it.
//! The interface implemented by this module uses explicit buffering: you decide on the good
//! size for a buffer, allocate it, and let the networking stack use it.
//!
//! Every socket implementation allows selecting transmit and receive buffers separately;
//! this means that, for example, a socket that never receives data does not have to allocate
//! any storage to receive buffers.
use Error;
use wire::{InternetAddress as Address, InternetProtocolType as ProtocolType};
@ -20,8 +16,7 @@ use wire::{InternetAddress as Address, InternetProtocolType as ProtocolType};
mod udp;
pub use self::udp::Buffer as UdpBuffer;
pub use self::udp::NullBuffer as UdpNullBuffer;
pub use self::udp::UnitaryBuffer as UdpUnitaryBuffer;
pub use self::udp::BufferElem as UdpBufferElem;
pub use self::udp::UdpSocket as UdpSocket;
/// A packet representation.

View File

@ -1,3 +1,4 @@
use core::marker::PhantomData;
use core::borrow::BorrowMut;
use Error;
@ -6,104 +7,95 @@ use wire::{InternetEndpoint as Endpoint};
use wire::{UdpPacket, UdpRepr};
use socket::{Socket, PacketRepr};
/// A packet buffer.
///
/// The packet buffer interface allows enqueueing and dequeueing separate packets.
/// A packet is a sequence of octets and its associated endpoint.
pub trait Buffer {
/// Enqueue a packet.
///
/// This function allocates a sequence of octets the given size and associates
/// the given endpoint with it, then calls `f`; if the buffer is full, it
/// returns `Err(Error::Exhausted)` instead.
fn enqueue<R, F>(&mut self, endpoint: Endpoint, size: usize, f: F) -> Result<R, Error>
where F: FnOnce(&mut [u8]) -> Result<R, Error>;
/// Dequeue a packet.
///
/// This function calls `f` with the oldest enqueued packet; if the buffer is empty,
/// it returns `Err(Error::Exhausted)` instead.
fn dequeue<R, F>(&mut self, f: F) -> Result<R, Error>
where F: FnOnce(Endpoint, &[u8]) -> Result<R, Error>;
}
/// A packet buffer that does not have any storage.
///
/// The null buffer rejects enqueue and dequeue operations with `Error::Exhausted`.
pub struct NullBuffer(());
impl NullBuffer {
/// Create a null packet buffer.
pub fn new() -> NullBuffer {
NullBuffer(())
}
}
impl Buffer for NullBuffer {
fn enqueue<R, F>(&mut self, _endpoint: Endpoint, _size: usize, _f: F) -> Result<R, Error>
where F: FnOnce(&mut [u8]) -> Result<R, Error> {
Err(Error::Exhausted)
}
fn dequeue<R, F>(&mut self, _f: F) -> Result<R, Error>
where F: FnOnce(Endpoint, &[u8]) -> Result<R, Error> {
Err(Error::Exhausted)
}
}
/// A packet buffer that only stores, at most, a single packet.
///
/// The unitary buffer uses a provided slice to store no more than one packet at any time.
/// If there is an enqueued packet, or if the requested size is larger than the storage size,
/// the unitary rejects the enqueue operation with `Error::Exhausted`.
pub struct UnitaryBuffer<T: BorrowMut<[u8]>> {
/// A buffered UDP packet.
#[derive(Debug, Default)]
pub struct BufferElem<T: BorrowMut<[u8]>> {
endpoint: Endpoint,
storage: T,
size: usize
size: usize,
payload: T
}
impl<T: BorrowMut<[u8]>> UnitaryBuffer<T> {
/// Create an unitary packet buffer, using the given storage.
pub fn new(storage: T) -> UnitaryBuffer<T> {
UnitaryBuffer {
endpoint: Default::default(),
storage: storage,
size: 0
impl<T: BorrowMut<[u8]>> BufferElem<T> {
/// Create a buffered packet.
pub fn new(payload: T) -> BufferElem<T> {
BufferElem {
endpoint: Endpoint::INVALID,
size: 0,
payload: payload
}
}
}
impl<T: BorrowMut<[u8]>> Buffer for UnitaryBuffer<T> {
fn enqueue<R, F>(&mut self, endpoint: Endpoint, size: usize, f: F) -> Result<R, Error>
where F: FnOnce(&mut [u8]) -> Result<R, Error> {
let mut storage = self.storage.borrow_mut();
match self.endpoint {
Endpoint { addr: Address::Invalid, .. }
if size <= storage.len() => {
// If `f` fails, don't enqueue the packet.
let result = try!(f(&mut storage[..size]));
self.endpoint = endpoint;
Ok(result)
},
_ => {
Err(Error::Exhausted)
}
/// An UDP packet buffer.
#[derive(Debug)]
pub struct Buffer<
T: BorrowMut<[u8]>,
U: BorrowMut<[BufferElem<T>]>
> {
storage: U,
read_at: usize,
length: usize,
phantom: PhantomData<T>
}
impl<
T: BorrowMut<[u8]>,
U: BorrowMut<[BufferElem<T>]>
> Buffer<T, U> {
/// Create a packet buffer with the given storage.
pub fn new(mut storage: U) -> Buffer<T, U> {
for elem in storage.borrow_mut() {
elem.endpoint = Default::default();
elem.size = 0;
}
Buffer {
storage: storage,
read_at: 0,
length: 0,
phantom: PhantomData
}
}
fn dequeue<R, F>(&mut self, f: F) -> Result<R, Error>
where F: FnOnce(Endpoint, &[u8]) -> Result<R, Error> {
let storage = self.storage.borrow_mut();
match self.endpoint {
Endpoint { addr: Address::Invalid, .. } => {
Err(Error::Exhausted)
},
_ => {
// If `f` fails, still dequeue the packet.
let result = f(self.endpoint, &storage[..self.size]);
self.endpoint = Default::default();
result
}
fn mask(&self, index: usize) -> usize {
index % self.storage.borrow().len()
}
fn incr(&self, index: usize) -> usize {
self.mask(index + 1)
}
fn empty(&self) -> bool {
self.length == 0
}
fn full(&self) -> bool {
self.length == self.storage.borrow().len()
}
/// Enqueue an element into the buffer, and return a pointer to it, or return
/// `Err(Error::Exhausted)` if the buffer is full.
pub fn enqueue(&mut self) -> Result<&mut BufferElem<T>, Error> {
if self.full() {
Err(Error::Exhausted)
} else {
let index = self.mask(self.read_at + self.length);
let result = &mut self.storage.borrow_mut()[index];
self.length += 1;
Ok(result)
}
}
/// Dequeue an element from the buffer, and return a pointer to it, or return
/// `Err(Error::Exhausted)` if the buffer is empty.
pub fn dequeue(&mut self) -> Result<&BufferElem<T>, Error> {
if self.empty() {
Err(Error::Exhausted)
} else {
self.length -= 1;
let result = &self.storage.borrow()[self.read_at];
self.read_at = self.incr(self.read_at);
Ok(result)
}
}
}
@ -112,17 +104,22 @@ impl<T: BorrowMut<[u8]>> Buffer for UnitaryBuffer<T> {
///
/// An UDP socket is bound to a specific endpoint, and owns transmit and receive
/// packet buffers.
pub struct UdpSocket<RxBufferT: Buffer, TxBufferT: Buffer> {
pub struct UdpSocket<
T: BorrowMut<[u8]>,
U: BorrowMut<[BufferElem<T>]>
> {
endpoint: Endpoint,
rx_buffer: RxBufferT,
tx_buffer: TxBufferT
rx_buffer: Buffer<T, U>,
tx_buffer: Buffer<T, U>
}
impl<RxBufferT: Buffer, TxBufferT: Buffer> UdpSocket<RxBufferT, TxBufferT> {
impl<
T: BorrowMut<[u8]>,
U: BorrowMut<[BufferElem<T>]>
> UdpSocket<T, U> {
/// Create an UDP socket with the given buffers.
pub fn new(endpoint: Endpoint,
rx_buffer: RxBufferT,
tx_buffer: TxBufferT) -> UdpSocket<RxBufferT, TxBufferT> {
pub fn new(endpoint: Endpoint, rx_buffer: Buffer<T, U>, tx_buffer: Buffer<T, U>)
-> UdpSocket<T, U> {
UdpSocket {
endpoint: endpoint,
rx_buffer: rx_buffer,
@ -130,42 +127,32 @@ impl<RxBufferT: Buffer, TxBufferT: Buffer> UdpSocket<RxBufferT, TxBufferT> {
}
}
/// Send a packet to a remote endpoint, without copying.
pub fn send<R, F>(&mut self, endpoint: Endpoint, size: usize, f: F) -> Result<R, Error>
where F: FnOnce(&mut [u8]) -> Result<R, Error> {
self.tx_buffer.enqueue(endpoint, size, f)
}
/// Send a packet to remote endpoint, copying the given slice to the internal buffer.
/// Enqueue a packet to be sent to a given remote endpoint, and return a pointer
/// to its payload.
///
/// This function returns `Err(Error::Exhausted)` if the slice is larger than the internal
/// buffer can accomodate.
pub fn send_slice(&mut self, endpoint: Endpoint, data: &[u8]) -> Result<(), Error> {
self.tx_buffer.enqueue(endpoint, data.len(), |buffer| {
Ok(buffer.copy_from_slice(data))
})
/// This function returns `Err(Error::Exhausted)` if the size is greater than what
/// the transmit buffer can accomodate.
pub fn send(&mut self, endpoint: Endpoint, size: usize) -> Result<&mut [u8], Error> {
let packet_buf = try!(self.tx_buffer.enqueue());
packet_buf.endpoint = endpoint;
packet_buf.size = size;
Ok(&mut packet_buf.payload.borrow_mut()[..size])
}
/// Receive a packet from a remote endpoint, without copying.
pub fn recv<R, F>(&mut self, f: F) -> Result<R, Error>
where F: FnOnce(Endpoint, &[u8]) -> Result<R, Error> {
self.rx_buffer.dequeue(f)
}
/// Receive a packet from a remote endpoint, copying the given slice to the internal buffer.
/// Dequeue a packet received from a remote endpoint, and return the endpoint as well
/// as a pointer to the payload.
///
/// This function returns `Err(Error::Exhausted)` if the slice is smaller than the packet
/// queued in the internal buffer.
pub fn recv_slice(&mut self, data: &mut [u8]) -> Result<(usize, Endpoint), Error> {
self.rx_buffer.dequeue(|endpoint, buffer| {
if data.len() < buffer.len() { return Err(Error::Exhausted) }
data[..buffer.len()].copy_from_slice(buffer);
Ok((buffer.len(), endpoint))
})
/// This function returns `Err(Error::Exhausted)` if the receive buffer is empty.
pub fn recv<R, F>(&mut self) -> Result<(Endpoint, &[u8]), Error> {
let packet_buf = try!(self.rx_buffer.dequeue());
Ok((packet_buf.endpoint, &packet_buf.payload.borrow()[..packet_buf.size]))
}
}
impl<RxBufferT: Buffer, TxBufferT: Buffer> Socket for UdpSocket<RxBufferT, TxBufferT> {
impl<
T: BorrowMut<[u8]>,
U: BorrowMut<[BufferElem<T>]>
> Socket for UdpSocket<T, U> {
fn collect(&mut self, src_addr: &Address, dst_addr: &Address,
protocol: ProtocolType, payload: &[u8])
-> Result<(), Error> {
@ -179,26 +166,25 @@ impl<RxBufferT: Buffer, TxBufferT: Buffer> Socket for UdpSocket<RxBufferT, TxBuf
if self.endpoint.addr != *dst_addr { return Err(Error::Rejected) }
}
let endpoint = Endpoint { addr: *src_addr, port: repr.src_port };
self.rx_buffer.enqueue(endpoint, repr.payload.len(), |buffer| {
Ok(buffer.copy_from_slice(repr.payload))
})
let packet_buf = try!(self.rx_buffer.enqueue());
packet_buf.endpoint = Endpoint { addr: *src_addr, port: repr.src_port };
packet_buf.size = repr.payload.len();
packet_buf.payload.borrow_mut()[..repr.payload.len()].copy_from_slice(repr.payload);
Ok(())
}
fn dispatch(&mut self, f: &mut FnMut(&Address, &Address,
ProtocolType, &PacketRepr) -> Result<(), Error>)
-> Result<(), Error> {
let src_endpoint = self.endpoint;
self.tx_buffer.dequeue(|dst_endpoint, buffer| {
f(&src_endpoint.addr,
&dst_endpoint.addr,
ProtocolType::Udp,
&UdpRepr {
src_port: src_endpoint.port,
dst_port: dst_endpoint.port,
payload: buffer
})
})
let packet_buf = try!(self.tx_buffer.dequeue());
f(&self.endpoint.addr,
&packet_buf.endpoint.addr,
ProtocolType::Udp,
&UdpRepr {
src_port: self.endpoint.port,
dst_port: packet_buf.endpoint.port,
payload: packet_buf.payload.borrow()
})
}
}
@ -212,3 +198,42 @@ impl<'a> PacketRepr for UdpRepr<'a> {
self.emit(&mut packet, src_addr, dst_addr)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
pub fn test_buffer() {
let mut storage = vec![];
for _ in 0..5 {
storage.push(BufferElem::new(vec![0]))
}
let mut buffer = Buffer::new(&mut storage[..]);
assert_eq!(buffer.empty(), true);
assert_eq!(buffer.full(), false);
buffer.enqueue().unwrap().size = 1;
assert_eq!(buffer.empty(), false);
assert_eq!(buffer.full(), false);
buffer.enqueue().unwrap().size = 2;
buffer.enqueue().unwrap().size = 3;
assert_eq!(buffer.dequeue().unwrap().size, 1);
assert_eq!(buffer.dequeue().unwrap().size, 2);
buffer.enqueue().unwrap().size = 4;
buffer.enqueue().unwrap().size = 5;
buffer.enqueue().unwrap().size = 6;
buffer.enqueue().unwrap().size = 7;
assert_eq!(buffer.enqueue().unwrap_err(), Error::Exhausted);
assert_eq!(buffer.empty(), false);
assert_eq!(buffer.full(), true);
assert_eq!(buffer.dequeue().unwrap().size, 3);
assert_eq!(buffer.dequeue().unwrap().size, 4);
assert_eq!(buffer.dequeue().unwrap().size, 5);
assert_eq!(buffer.dequeue().unwrap().size, 6);
assert_eq!(buffer.dequeue().unwrap().size, 7);
assert_eq!(buffer.dequeue().unwrap_err(), Error::Exhausted);
assert_eq!(buffer.empty(), true);
assert_eq!(buffer.full(), false);
}
}

View File

@ -84,6 +84,8 @@ pub struct Endpoint {
}
impl Endpoint {
pub const INVALID: Endpoint = Endpoint { addr: Address::Invalid, port: 0 };
/// Create an internet endpoint address.
pub fn new(addr: Address, port: u16) -> Endpoint {
Endpoint { addr: addr, port: port }