From c474d0c32e0dabcf543c79b9afc2dbc3a46fe183 Mon Sep 17 00:00:00 2001 From: whitequark Date: Thu, 22 Feb 2018 11:34:58 +0000 Subject: [PATCH] Factor out storage::PacketBuffer from socket::UdpSocket. --- examples/server.rs | 4 +- src/iface/ethernet.rs | 6 +- src/socket/mod.rs | 4 +- src/socket/udp.rs | 277 +++++------------------------------ src/storage/mod.rs | 2 + src/storage/packet_buffer.rs | 235 +++++++++++++++++++++++++++++ 6 files changed, 278 insertions(+), 250 deletions(-) create mode 100644 src/storage/packet_buffer.rs diff --git a/examples/server.rs b/examples/server.rs index 0f7229a..4c1a3d1 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -32,8 +32,8 @@ fn main() { let neighbor_cache = NeighborCache::new(BTreeMap::new()); - let udp_rx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::default()], vec![0; 64]); - let udp_tx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::default()], vec![0; 128]); + let udp_rx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty()], vec![0; 64]); + let udp_tx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty()], vec![0; 128]); let udp_socket = UdpSocket::new(udp_rx_buffer, udp_tx_buffer); let tcp1_rx_buffer = TcpSocketBuffer::new(vec![0; 64]); diff --git a/src/iface/ethernet.rs b/src/iface/ethernet.rs index 09dc13a..411067a 100644 --- a/src/iface/ethernet.rs +++ b/src/iface/ethernet.rs @@ -1365,15 +1365,15 @@ mod test { #[test] #[cfg(all(feature = "socket-udp", feature = "proto-ipv4"))] fn test_handle_udp_broadcast() { - use socket::{UdpSocket, UdpSocketBuffer}; + use socket::{UdpSocket, UdpSocketBuffer, UdpPacketMetadata}; use wire::IpEndpoint; static UDP_PAYLOAD: [u8; 5] = [0x48, 0x65, 0x6c, 0x6c, 0x6f]; let (iface, mut socket_set) = create_loopback(); - let rx_buffer = UdpSocketBuffer::new(vec![Default::default()], vec![0; 15]); - let tx_buffer = UdpSocketBuffer::new(vec![Default::default()], vec![0; 15]); + let rx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty()], vec![0; 15]); + let tx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty()], vec![0; 15]); let udp_socket = UdpSocket::new(rx_buffer, tx_buffer); diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 32a3e74..ed2035f 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -40,8 +40,8 @@ pub use self::icmp::{PacketBuffer as IcmpPacketBuffer, IcmpSocket}; #[cfg(feature = "socket-udp")] -pub use self::udp::{PacketMetadata as UdpPacketMetadata, - SocketBuffer as UdpSocketBuffer, +pub use self::udp::{UdpPacketMetadata, + UdpSocketBuffer, UdpSocket}; #[cfg(feature = "socket-tcp")] diff --git a/src/socket/udp.rs b/src/socket/udp.rs index f3657e4..29561fa 100644 --- a/src/socket/udp.rs +++ b/src/socket/udp.rs @@ -1,79 +1,14 @@ use core::cmp::min; -use managed::ManagedSlice; use {Error, Result}; use socket::{Socket, SocketMeta, SocketHandle}; -use storage::RingBuffer; +use storage::{PacketBuffer, PacketMetadata}; use time::Instant; use wire::{IpProtocol, IpRepr, IpEndpoint, UdpRepr}; -/// Endpoint and size of an UDP packet. -#[derive(Debug, Clone, Copy, Default)] -pub struct PacketMetadata { - endpoint: IpEndpoint, - size: usize, - /// Padding packets can be used to avoid wrap-arounds of packets in the payload buffer - padding: bool, -} +pub type UdpPacketMetadata = PacketMetadata; -/// An UDP packet ring buffer. -#[derive(Debug)] -pub struct SocketBuffer<'a, 'b> { - metadata_buffer: RingBuffer<'a, PacketMetadata>, - payload_buffer: RingBuffer<'b, u8>, -} - -impl<'a, 'b> SocketBuffer<'a, 'b> { - /// Create a new socket buffer with the provided metadata and payload storage. - /// - /// Metadata storage limits the maximum _number_ of UDP packets in the buffer and payload - /// storage limits the maximum _cumulated size_ of UDP packets. - pub fn new(metadata_storage: MS, payload_storage: PS) -> SocketBuffer<'a, 'b> - where MS: Into>, PS: Into>, - { - SocketBuffer { - metadata_buffer: RingBuffer::new(metadata_storage), - payload_buffer: RingBuffer::new(payload_storage), - } - } - - fn is_full(&self) -> bool { - self.metadata_buffer.is_full() || self.payload_buffer.is_full() - } - - fn is_empty(&self) -> bool { - self.metadata_buffer.is_empty() - } - - fn enqueue(&mut self, required_size: usize, endpoint: IpEndpoint) -> Result<&mut [u8]> { - let window = self.payload_buffer.window(); - let contig_window = self.payload_buffer.contiguous_window(); - - if self.metadata_buffer.is_full() || self.payload_buffer.window() < required_size { - return Err(Error::Exhausted); - } - - if contig_window < required_size { - // we reached the end of buffer, so the data does not fit without wrap-around - // -> insert padding and try again - self.payload_buffer.enqueue_many(required_size); - let metadata_buf = self.metadata_buffer.enqueue_one()?; - metadata_buf.padding = true; - metadata_buf.size = required_size; - metadata_buf.endpoint = IpEndpoint::default(); - if window - contig_window < required_size { - return Err(Error::Exhausted); - } - } - - let metadata_buf = self.metadata_buffer.enqueue_one()?; - metadata_buf.endpoint = endpoint; - metadata_buf.size = required_size; - metadata_buf.padding = false; - - Ok(self.payload_buffer.enqueue_many(required_size)) - } -} +pub type UdpSocketBuffer<'a, 'b> = PacketBuffer<'a, 'b, IpEndpoint>; /// An User Datagram Protocol socket. /// @@ -83,16 +18,16 @@ impl<'a, 'b> SocketBuffer<'a, 'b> { pub struct UdpSocket<'a, 'b: 'a> { pub(crate) meta: SocketMeta, endpoint: IpEndpoint, - rx_buffer: SocketBuffer<'a, 'b>, - tx_buffer: SocketBuffer<'a, 'b>, + rx_buffer: UdpSocketBuffer<'a, 'b>, + tx_buffer: UdpSocketBuffer<'a, 'b>, /// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets. hop_limit: Option } impl<'a, 'b> UdpSocket<'a, 'b> { /// Create an UDP socket with the given buffers. - pub fn new(rx_buffer: SocketBuffer<'a, 'b>, - tx_buffer: SocketBuffer<'a, 'b>) -> UdpSocket<'a, 'b> { + pub fn new(rx_buffer: UdpSocketBuffer<'a, 'b>, + tx_buffer: UdpSocketBuffer<'a, 'b>) -> UdpSocket<'a, 'b> { UdpSocket { meta: SocketMeta::default(), endpoint: IpEndpoint::default(), @@ -185,8 +120,6 @@ impl<'a, 'b> UdpSocket<'a, 'b> { let payload_buf = self.tx_buffer.enqueue(size, endpoint)?; - debug_assert_eq!(payload_buf.len(), size); - net_trace!("{}:{}:{}: buffer to send {} octets", self.meta.handle, self.endpoint, endpoint, size); Ok(payload_buf) @@ -205,21 +138,12 @@ impl<'a, 'b> UdpSocket<'a, 'b> { /// /// This function returns `Err(Error::Exhausted)` if the receive buffer is empty. pub fn recv(&mut self) -> Result<(&[u8], IpEndpoint)> { - let mut metadata_buf = *self.rx_buffer.metadata_buffer.dequeue_one()?; - if metadata_buf.padding { - // packet is padding packet -> drop it and try again - self.rx_buffer.payload_buffer.dequeue_many(metadata_buf.size); - metadata_buf = *self.rx_buffer.metadata_buffer.dequeue_one()?; - } - - debug_assert!(!metadata_buf.padding); - let payload_buf = self.rx_buffer.payload_buffer.dequeue_many(metadata_buf.size); - debug_assert_eq!(metadata_buf.size, payload_buf.len()); // ensured by inserting logic + let (endpoint, payload_buf) = self.rx_buffer.dequeue()?; net_trace!("{}:{}:{}: receive {} buffered octets", self.meta.handle, self.endpoint, - metadata_buf.endpoint, metadata_buf.size); - Ok((payload_buf, metadata_buf.endpoint)) + endpoint, payload_buf.len()); + Ok((payload_buf, endpoint)) } /// Dequeue a packet received from a remote endpoint, copy the payload into the given slice, @@ -259,50 +183,28 @@ impl<'a, 'b> UdpSocket<'a, 'b> { pub(crate) fn dispatch(&mut self, emit: F) -> Result<()> where F: FnOnce((IpRepr, UdpRepr)) -> Result<()> { - let handle = self.handle(); - let endpoint = self.endpoint; + let handle = self.handle(); + let endpoint = self.endpoint; let hop_limit = self.hop_limit.unwrap_or(64); - let SocketBuffer { ref mut metadata_buffer, ref mut payload_buffer } = self.tx_buffer; + self.tx_buffer.dequeue_with(|remote_endpoint, payload_buf| { + net_trace!("{}:{}:{}: sending {} octets", + handle, endpoint, + endpoint, payload_buf.len()); - // dequeue potential padding packet - let result = metadata_buffer.dequeue_one_with(|metadata_buf| { - if metadata_buf.padding { - Ok(metadata_buf.size) // dequeue metadata - } else { - Err(Error::Exhausted) // don't dequeue metadata - } - }); - if let Ok(size) = result { - payload_buffer.dequeue_many(size); // dequeue padding payload - } - - metadata_buffer.dequeue_one_with(move |metadata_buf| { - debug_assert!(!metadata_buf.padding); - payload_buffer.dequeue_many_with(|payload_buf| { - let payload_buf = &payload_buf[..metadata_buf.size]; - - net_trace!("{}:{}:{}: sending {} octets", - handle, endpoint, - metadata_buf.endpoint, metadata_buf.size); - - let repr = UdpRepr { - src_port: endpoint.port, - dst_port: metadata_buf.endpoint.port, - payload: payload_buf, - }; - let ip_repr = IpRepr::Unspecified { - src_addr: endpoint.addr, - dst_addr: metadata_buf.endpoint.addr, - protocol: IpProtocol::Udp, - payload_len: repr.buffer_len(), - hop_limit: hop_limit, - }; - match emit((ip_repr, repr)) { - Ok(ret) => (metadata_buf.size, Ok(ret)), - Err(ret) => (0, Err(ret)), - } - }).1 + let repr = UdpRepr { + src_port: endpoint.port, + dst_port: remote_endpoint.port, + payload: payload_buf, + }; + let ip_repr = IpRepr::Unspecified { + src_addr: endpoint.addr, + dst_addr: remote_endpoint.addr, + protocol: IpProtocol::Udp, + payload_len: repr.buffer_len(), + hop_limit: hop_limit, + }; + emit((ip_repr, repr)) }) } @@ -331,12 +233,12 @@ mod test { use wire::ip::test::{MOCK_IP_ADDR_1, MOCK_IP_ADDR_2, MOCK_IP_ADDR_3}; use super::*; - fn buffer(packets: usize) -> SocketBuffer<'static, 'static> { - SocketBuffer::new(vec![Default::default(); packets], vec![0; 16 * packets]) + fn buffer(packets: usize) -> UdpSocketBuffer<'static, 'static> { + UdpSocketBuffer::new(vec![UdpPacketMetadata::empty(); packets], vec![0; 16 * packets]) } - fn socket(rx_buffer: SocketBuffer<'static, 'static>, - tx_buffer: SocketBuffer<'static, 'static>) + fn socket(rx_buffer: UdpSocketBuffer<'static, 'static>, + tx_buffer: UdpSocketBuffer<'static, 'static>) -> UdpSocket<'static, 'static> { UdpSocket::new(rx_buffer, tx_buffer) } @@ -559,108 +461,9 @@ mod test { assert_eq!(socket.send_slice(&too_large[..16*4], REMOTE_END), Ok(())); } - #[test] - fn test_send_wraparound_1() { - let mut socket = socket(buffer(0), buffer(3)); - assert_eq!(socket.bind(LOCAL_END), Ok(())); - - let large = b"0123456789abcdef0123456789abcdef0123456789abcdef"; - - assert_eq!(socket.send_slice(&large[..15], REMOTE_END), Ok(())); - assert_eq!(socket.send_slice(&large[..16*2], REMOTE_END), Ok(())); - // no padding should be inserted because capacity does not suffice - assert_eq!(socket.send_slice(b"12", REMOTE_END), Err(Error::Exhausted)); - assert_eq!(socket.tx_buffer.metadata_buffer.len(), 2); - assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3-1); - - assert_eq!(socket.dispatch(|_| Ok(())), Ok(())); - // insert padding - assert_eq!(socket.send_slice(&large[..16], REMOTE_END), Err(Error::Exhausted)); - assert_eq!(socket.tx_buffer.metadata_buffer.len(), 2); - assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3-15); - - assert_eq!(socket.dispatch(|_| Ok(())), Ok(())); - // packet dequed, but padding is still there - assert_eq!(socket.tx_buffer.metadata_buffer.len(), 1); - assert_eq!(socket.tx_buffer.payload_buffer.len(), 1); - - assert_eq!(socket.dispatch(|_| Ok(())), Err(Error::Exhausted)); - assert_eq!(socket.tx_buffer.metadata_buffer.len(), 0); - assert_eq!(socket.tx_buffer.payload_buffer.len(), 0); - } - - #[test] - fn test_send_wraparound_2() { - let mut socket = socket(buffer(0), buffer(3)); - assert_eq!(socket.bind(LOCAL_END), Ok(())); - - let large = b"0123456789abcdef0123456789abcdef0123456789abcdef"; - - assert_eq!(socket.send_slice(&large[..16*2], REMOTE_END), Ok(())); - assert_eq!(socket.send_slice(&large[..15], REMOTE_END), Ok(())); - // no padding should be inserted because capacity does not suffice - assert_eq!(socket.send_slice(b"12", REMOTE_END), Err(Error::Exhausted)); - assert_eq!(socket.tx_buffer.metadata_buffer.len(), 2); - assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3-1); - - assert_eq!(socket.dispatch(|_| Ok(())), Ok(())); - // insert padding and slice - assert_eq!(socket.send_slice(&large[..16*2], REMOTE_END), Ok(())); - assert_eq!(socket.tx_buffer.metadata_buffer.len(), 3); - assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3); - - assert_eq!(socket.dispatch(|_| Ok(())), Ok(())); - // packet dequed, but padding is still there - assert_eq!(socket.tx_buffer.metadata_buffer.len(), 2); - assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3-15); - - assert_eq!(socket.dispatch(|_| Ok(())), Ok(())); - // padding and packet dequeued - assert_eq!(socket.tx_buffer.metadata_buffer.len(), 0); - assert_eq!(socket.tx_buffer.payload_buffer.len(), 0); - } - - #[test] - fn test_process_wraparound() { - // every packet will be 6 bytes - let recv_buffer = SocketBuffer::new(vec![Default::default(); 4], vec![0; 6*3 + 2]); - let mut socket = socket(recv_buffer, buffer(0)); - assert_eq!(socket.bind(LOCAL_PORT), Ok(())); - - assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), Ok(())); - assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), Ok(())); - assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), Ok(())); - assert_eq!(socket.rx_buffer.metadata_buffer.len(), 3); - assert_eq!(socket.rx_buffer.payload_buffer.len(), 6*3); - - assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), - Err(Error::Exhausted)); - // no padding inserted because capacity does not suffice - assert_eq!(socket.rx_buffer.metadata_buffer.len(), 3); - assert_eq!(socket.rx_buffer.payload_buffer.len(), 6*3); - - assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END))); - assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), Ok(())); - // padding inserted - assert_eq!(socket.rx_buffer.metadata_buffer.len(), 4); - assert_eq!(socket.rx_buffer.payload_buffer.len(), 6*3 + 2); - - assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END))); - assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END))); - // two packets dequed, last packet and padding still there - assert_eq!(socket.rx_buffer.metadata_buffer.len(), 2); - assert_eq!(socket.rx_buffer.payload_buffer.len(), 6 + 2); - - assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END))); - // everything dequed - assert_eq!(socket.rx_buffer.metadata_buffer.len(), 0); - assert_eq!(socket.rx_buffer.payload_buffer.len(), 0); - } - #[test] fn test_process_empty_payload() { - // every packet will be 6 bytes - let recv_buffer = SocketBuffer::new(vec![Default::default(); 1], vec![]); + let recv_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty(); 1], vec![]); let mut socket = socket(recv_buffer, buffer(0)); assert_eq!(socket.bind(LOCAL_PORT), Ok(())); @@ -669,18 +472,6 @@ mod test { dst_port: LOCAL_PORT, payload: &[] }; - - assert_eq!(socket.process(&remote_ip_repr(), &repr), Ok(())); - assert_eq!(socket.rx_buffer.metadata_buffer.len(), 1); - assert_eq!(socket.rx_buffer.payload_buffer.len(), 0); - - // The metatdata has been queued into the metadata buffer - assert!(!socket.rx_buffer.metadata_buffer.is_empty()); - // The no payload data has been queued into the payload buffer - assert!(socket.rx_buffer.payload_buffer.is_empty()); - // The received packets buffer is not empty and we can recv - assert!(socket.can_recv()); - assert_eq!(socket.recv(), Ok((&[][..], REMOTE_END))); assert_eq!(socket.process(&remote_ip_repr(), &repr), Ok(())); assert_eq!(socket.recv(), Ok((&[][..], REMOTE_END))); } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index d89fd29..8b9fbe6 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -7,9 +7,11 @@ or `alloc` crates being available, and heap-allocated memory. mod assembler; mod ring_buffer; +mod packet_buffer; pub use self::assembler::Assembler; pub use self::ring_buffer::RingBuffer; +pub use self::packet_buffer::{PacketBuffer, PacketMetadata}; /// A trait for setting a value to a known state. /// diff --git a/src/storage/packet_buffer.rs b/src/storage/packet_buffer.rs new file mode 100644 index 0000000..4b8c00d --- /dev/null +++ b/src/storage/packet_buffer.rs @@ -0,0 +1,235 @@ +use managed::ManagedSlice; + +use {Error, Result}; +use super::RingBuffer; + +/// Size and header of a packet. +#[derive(Debug, Clone, Copy)] +pub struct PacketMetadata { + size: usize, + header: Option +} + +impl PacketMetadata { + /// Create an empty packet description. + pub fn empty() -> PacketMetadata { + Self::padding(0) + } + + fn padding(size: usize) -> PacketMetadata { + PacketMetadata { + size: size, + header: None + } + } + + fn packet(size: usize, header: H) -> PacketMetadata { + PacketMetadata { + size: size, + header: Some(header) + } + } + + fn is_padding(&self) -> bool { + self.header.is_none() + } +} + +/// An UDP packet ring buffer. +#[derive(Debug)] +pub struct PacketBuffer<'a, 'b, H: 'a> { + metadata_ring: RingBuffer<'a, PacketMetadata>, + payload_ring: RingBuffer<'b, u8>, +} + +impl<'a, 'b, H> PacketBuffer<'a, 'b, H> { + /// Create a new packet buffer with the provided metadata and payload storage. + /// + /// Metadata storage limits the maximum _number_ of packets in the buffer and payload + /// storage limits the maximum _total size_ of packets. + pub fn new(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, 'b, H> + where MS: Into>>, + PS: Into>, + { + PacketBuffer { + metadata_ring: RingBuffer::new(metadata_storage), + payload_ring: RingBuffer::new(payload_storage), + } + } + + /// Query whether the buffer is empty. + pub fn is_empty(&self) -> bool { + self.metadata_ring.is_empty() + } + + /// Query whether the buffer is full. + pub fn is_full(&self) -> bool { + self.metadata_ring.is_full() + } + + // There is currently no enqueue_with() because of the complexity of managing padding + // in case of failure. + + /// Enqueue a single packet with the given header into the buffer, and + /// return a reference to its payload, or return `Err(Error::Exhausted)` + /// if the buffer is full or does not have enough spare payload space. + pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8]> { + let window = self.payload_ring.window(); + let contig_window = self.payload_ring.contiguous_window(); + + if self.metadata_ring.is_full() || window < size || + (window != contig_window && window - contig_window < size) { + return Err(Error::Exhausted) + } + + if contig_window < size { + *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(size); + self.payload_ring.enqueue_many(size); + } + + *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header); + + let payload_buf = self.payload_ring.enqueue_many(size); + debug_assert!(payload_buf.len() == size); + Ok(payload_buf) + } + + fn dequeue_padding(&mut self) { + let Self { ref mut metadata_ring, ref mut payload_ring } = *self; + + let _ = metadata_ring.dequeue_one_with(|metadata| { + if metadata.is_padding() { + payload_ring.dequeue_many(metadata.size); + Ok(()) // dequeue metadata + } else { + Err(Error::Exhausted) // don't dequeue metadata + } + }); + } + + /// Call `f` with a single packet from the buffer, and dequeue the packet if `f` + /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty. + pub fn dequeue_with<'c, R, F>(&'c mut self, f: F) -> Result + where F: FnOnce(&mut H, &'c mut [u8]) -> Result { + self.dequeue_padding(); + + let Self { ref mut metadata_ring, ref mut payload_ring } = *self; + + metadata_ring.dequeue_one_with(move |metadata| { + let PacketMetadata { ref mut header, size } = *metadata; + + payload_ring.dequeue_many_with(|payload_buf| { + debug_assert!(payload_buf.len() >= size); + + match f(header.as_mut().unwrap(), &mut payload_buf[..size]) { + Ok(val) => (size, Ok(val)), + Err(err) => (0, Err(err)), + } + }).1 + }) + } + + /// Dequeue a single packet from the buffer, and return a reference to its payload + /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty. + pub fn dequeue(&mut self) -> Result<(H, &mut [u8])> { + self.dequeue_padding(); + + let PacketMetadata { ref mut header, size } = *self.metadata_ring.dequeue_one()?; + + let payload_buf = self.payload_ring.dequeue_many(size); + debug_assert!(payload_buf.len() == size); + Ok((header.take().unwrap(), payload_buf)) + } +} + +#[cfg(test)] +mod test { + use super::*; + + fn buffer() -> PacketBuffer<'static, 'static, ()> { + PacketBuffer::new(vec![PacketMetadata::empty(); 4], + vec![0u8; 16]) + } + + #[test] + fn test_simple() { + let mut buffer = buffer(); + buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef"); + assert_eq!(buffer.enqueue(32, ()), Err(Error::Exhausted)); + assert_eq!(buffer.metadata_ring.len(), 1); + assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]); + assert_eq!(buffer.dequeue(), Err(Error::Exhausted)); + } + + #[test] + fn test_padding() { + let mut buffer = buffer(); + assert!(buffer.enqueue(6, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd"); + assert_eq!(buffer.metadata_ring.len(), 3); + assert!(buffer.dequeue().is_ok()); + + assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]); + assert_eq!(buffer.metadata_ring.len(), 0); + } + + #[test] + fn test_dequeue_with() { + let mut buffer = buffer(); + assert!(buffer.enqueue(6, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd"); + assert_eq!(buffer.metadata_ring.len(), 3); + assert!(buffer.dequeue().is_ok()); + + assert!(buffer.dequeue_with(|_, _| Err(Error::Unaddressable) as Result<()>).is_err()); + assert_eq!(buffer.metadata_ring.len(), 1); + + assert!(buffer.dequeue_with(|&mut (), payload| { + assert_eq!(payload, &b"abcd"[..]); + Ok(()) + }).is_ok()); + assert_eq!(buffer.metadata_ring.len(), 0); + } + + #[test] + fn test_metadata_full_empty() { + let mut buffer = buffer(); + assert_eq!(buffer.is_empty(), true); + assert_eq!(buffer.is_full(), false); + assert!(buffer.enqueue(1, ()).is_ok()); + assert_eq!(buffer.is_empty(), false); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert_eq!(buffer.is_full(), false); + assert_eq!(buffer.is_empty(), false); + assert!(buffer.enqueue(1, ()).is_ok()); + assert_eq!(buffer.is_full(), true); + assert_eq!(buffer.is_empty(), false); + assert_eq!(buffer.metadata_ring.len(), 4); + assert_eq!(buffer.enqueue(1, ()), Err(Error::Exhausted)); + } + + #[test] + fn test_window_too_small() { + let mut buffer = buffer(); + assert!(buffer.enqueue(4, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert_eq!(buffer.enqueue(16, ()), Err(Error::Exhausted)); + assert_eq!(buffer.metadata_ring.len(), 1); + } + + #[test] + fn test_contiguous_window_too_small() { + let mut buffer = buffer(); + assert!(buffer.enqueue(4, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert_eq!(buffer.enqueue(8, ()), Err(Error::Exhausted)); + assert_eq!(buffer.metadata_ring.len(), 1); + } +}