renet/src/storage/packet_buffer.rs

361 lines
12 KiB
Rust
Raw Normal View History

use managed::ManagedSlice;
2020-12-27 07:11:30 +08:00
use crate::storage::RingBuffer;
2021-06-27 15:31:59 +08:00
use crate::{Error, Result};
/// Size and header of a packet.
///
/// An entry whose `header` is `None` does not describe a real packet; it is
/// treated as padding (see `is_padding`).
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct PacketMetadata<H> {
    /// Size in bytes recorded for this entry.
    size: usize,
    /// Header of the packet, or `None` for a padding entry.
    header: Option<H>,
}

impl<H> PacketMetadata<H> {
    /// Empty packet description.
    ///
    /// Has size zero and no header, which makes it suitable as a filler value
    /// when allocating metadata storage.
    pub const EMPTY: PacketMetadata<H> = PacketMetadata {
        size: 0,
        header: None,
    };

    /// Build a header-less entry of `size` bytes.
    fn padding(size: usize) -> PacketMetadata<H> {
        PacketMetadata { size, header: None }
    }

    /// Build an entry describing a packet of `size` bytes carrying `header`.
    fn packet(size: usize, header: H) -> PacketMetadata<H> {
        PacketMetadata {
            size,
            header: Some(header),
        }
    }

    /// Return `true` if this entry carries no header, i.e. it is padding.
    fn is_padding(&self) -> bool {
        self.header.is_none()
    }
}
/// An UDP packet ring buffer.
#[derive(Debug)]
2021-06-27 15:31:59 +08:00
pub struct PacketBuffer<'a, H: 'a> {
metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
2021-06-27 15:31:59 +08:00
payload_ring: RingBuffer<'a, u8>,
}
2021-01-09 08:52:08 +08:00
impl<'a, H> PacketBuffer<'a, H> {
/// Create a new packet buffer with the provided metadata and payload storage.
///
/// Metadata storage limits the maximum _number_ of packets in the buffer and payload
/// storage limits the maximum _total size_ of packets.
2021-01-09 08:52:08 +08:00
pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
2021-06-27 15:31:59 +08:00
where
MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
PS: Into<ManagedSlice<'a, u8>>,
{
PacketBuffer {
metadata_ring: RingBuffer::new(metadata_storage),
2021-06-27 15:31:59 +08:00
payload_ring: RingBuffer::new(payload_storage),
}
}
/// Query whether the buffer is empty.
pub fn is_empty(&self) -> bool {
self.metadata_ring.is_empty()
}
/// Query whether the buffer is full.
pub fn is_full(&self) -> bool {
self.metadata_ring.is_full()
}
// There is currently no enqueue_with() because of the complexity of managing padding
// in case of failure.
/// Enqueue a single packet with the given header into the buffer, and
/// return a reference to its payload, or return `Err(Error::Exhausted)`
/// if the buffer is full, or return `Err(Error::Truncated)` if the buffer
/// does not have enough spare payload space.
pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8]> {
if self.payload_ring.capacity() < size {
2021-06-27 15:31:59 +08:00
return Err(Error::Truncated);
}
if self.metadata_ring.is_full() {
2021-06-27 15:31:59 +08:00
return Err(Error::Exhausted);
}
let window = self.payload_ring.window();
let contig_window = self.payload_ring.contiguous_window();
if window < size {
2021-06-27 15:31:59 +08:00
return Err(Error::Exhausted);
} else if contig_window < size {
if window - contig_window < size {
// The buffer length is larger than the current contiguous window
// and is larger than the contiguous window will be after adding
// the padding necessary to circle around to the beginning of the
// ring buffer.
2021-06-27 15:31:59 +08:00
return Err(Error::Exhausted);
} else {
// Add padding to the end of the ring buffer so that the
// contiguous window is at the beginning of the ring buffer.
*self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
2021-10-27 16:35:05 +08:00
// note(discard): function does not write to the result
// enqueued padding buffer location
let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
}
}
*self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
let payload_buf = self.payload_ring.enqueue_many(size);
debug_assert!(payload_buf.len() == size);
Ok(payload_buf)
}
fn dequeue_padding(&mut self) {
2021-06-27 15:31:59 +08:00
let Self {
ref mut metadata_ring,
ref mut payload_ring,
} = *self;
let _ = metadata_ring.dequeue_one_with(|metadata| {
if metadata.is_padding() {
// note(discard): function does not use value of dequeued padding bytes
let _buf_dequeued = payload_ring.dequeue_many(metadata.size);
Ok(()) // dequeue metadata
} else {
Err(Error::Exhausted) // don't dequeue metadata
}
});
}
/// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
/// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
pub fn dequeue_with<'c, R, F>(&'c mut self, f: F) -> Result<R>
2021-06-27 15:31:59 +08:00
where
F: FnOnce(&mut H, &'c mut [u8]) -> Result<R>,
{
self.dequeue_padding();
2021-06-27 15:31:59 +08:00
let Self {
ref mut metadata_ring,
ref mut payload_ring,
} = *self;
metadata_ring.dequeue_one_with(move |metadata| {
2021-06-27 15:31:59 +08:00
let PacketMetadata {
ref mut header,
size,
} = *metadata;
payload_ring
.dequeue_many_with(|payload_buf| {
debug_assert!(payload_buf.len() >= size);
match f(header.as_mut().unwrap(), &mut payload_buf[..size]) {
Ok(val) => (size, Ok(val)),
Err(err) => (0, Err(err)),
}
})
.1
})
}
/// Dequeue a single packet from the buffer, and return a reference to its payload
/// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
pub fn dequeue(&mut self) -> Result<(H, &mut [u8])> {
self.dequeue_padding();
2021-06-27 15:31:59 +08:00
let PacketMetadata {
ref mut header,
size,
} = *self.metadata_ring.dequeue_one()?;
let payload_buf = self.payload_ring.dequeue_many(size);
debug_assert!(payload_buf.len() == size);
Ok((header.take().unwrap(), payload_buf))
}
/// Peek at a single packet from the buffer without removing it, and return a reference to
/// its payload as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
///
/// This function otherwise behaves identically to [dequeue](#method.dequeue).
///
/// It takes `&mut self` because a padding entry at the front of the buffer is
/// discarded before looking at the packet.
pub fn peek(&mut self) -> Result<(&H, &[u8])> {
    self.dequeue_padding();

    if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
        Ok((
            metadata.header.as_ref().unwrap(),
            self.payload_ring.get_allocated(0, metadata.size),
        ))
    } else {
        Err(Error::Exhausted)
    }
}
2019-11-06 07:07:12 +08:00
/// Return the maximum number packets that can be stored.
pub fn packet_capacity(&self) -> usize {
self.metadata_ring.capacity()
}
/// Return the maximum number of bytes in the payload ring buffer.
pub fn payload_capacity(&self) -> usize {
self.payload_ring.capacity()
}
/// Reset the packet buffer and clear any staged.
2021-06-09 17:30:02 +08:00
#[allow(unused)]
pub(crate) fn reset(&mut self) {
self.payload_ring.clear();
self.metadata_ring.clear();
}
}
#[cfg(test)]
mod test {
use super::*;
2021-01-09 08:52:08 +08:00
fn buffer() -> PacketBuffer<'static, ()> {
2021-06-27 15:31:59 +08:00
PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16])
}
#[test]
fn test_simple() {
    let mut ring = buffer();

    // Store one six-byte packet.
    let slot = ring.enqueue(6, ()).unwrap();
    slot.copy_from_slice(b"abcdef");

    // With that packet queued, a 16-byte packet is rejected and nothing is added.
    assert_eq!(ring.enqueue(16, ()), Err(Error::Exhausted));
    assert_eq!(ring.metadata_ring.len(), 1);

    // The stored packet comes back intact, after which nothing is left.
    let (_, payload) = ring.dequeue().unwrap();
    assert_eq!(payload, &b"abcdef"[..]);
    assert_eq!(ring.dequeue(), Err(Error::Exhausted));
}
#[test]
fn test_peek() {
    let mut ring = buffer();

    // Nothing to look at in a fresh buffer.
    assert_eq!(ring.peek(), Err(Error::Exhausted));

    let slot = ring.enqueue(6, ()).unwrap();
    slot.copy_from_slice(b"abcdef");
    assert_eq!(ring.metadata_ring.len(), 1);

    // Peeking shows the packet but leaves it queued for the later dequeue.
    let (_, peeked) = ring.peek().unwrap();
    assert_eq!(peeked, &b"abcdef"[..]);
    let (_, taken) = ring.dequeue().unwrap();
    assert_eq!(taken, &b"abcdef"[..]);

    assert_eq!(ring.peek(), Err(Error::Exhausted));
}
#[test]
fn test_padding() {
    let mut ring = buffer();

    // Queue 6 + 8 bytes, then release the first packet.
    let first = ring.enqueue(6, ());
    assert!(first.is_ok());
    let second = ring.enqueue(8, ());
    assert!(second.is_ok());
    let released = ring.dequeue();
    assert!(released.is_ok());

    // The next packet adds two metadata entries (padding + packet) on top of
    // the one still queued.
    let slot = ring.enqueue(4, ()).unwrap();
    slot.copy_from_slice(b"abcd");
    assert_eq!(ring.metadata_ring.len(), 3);

    // Drain: the 8-byte packet first, then the padded 4-byte packet.
    let drained = ring.dequeue();
    assert!(drained.is_ok());
    let (_, payload) = ring.dequeue().unwrap();
    assert_eq!(payload, &b"abcd"[..]);
    assert_eq!(ring.metadata_ring.len(), 0);
}
#[test]
fn test_padding_with_large_payload() {
    let mut buffer = buffer();

    // After a 12-byte packet has been queued and removed, a second 12-byte
    // packet must still be accepted and expose a 12-byte payload slice.
    assert!(buffer.enqueue(12, ()).is_ok());
    assert!(buffer.dequeue().is_ok());
    buffer
        .enqueue(12, ())
        .unwrap()
        .copy_from_slice(b"abcdefghijkl");
}
#[test]
fn test_dequeue_with() {
    let mut buffer = buffer();

    // Same setup as `test_padding`: ends with one 8-byte packet, one padding
    // entry and one 4-byte packet queued.
    assert!(buffer.enqueue(6, ()).is_ok());
    assert!(buffer.enqueue(8, ()).is_ok());
    assert!(buffer.dequeue().is_ok());
    buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
    assert_eq!(buffer.metadata_ring.len(), 3);

    assert!(buffer.dequeue().is_ok());

    // A failing callback leaves the packet queued; only the padding entry in
    // front of it has been dropped.
    assert!(buffer
        .dequeue_with(|_, _| Err(Error::Unaddressable) as Result<()>)
        .is_err());
    assert_eq!(buffer.metadata_ring.len(), 1);

    // A succeeding callback sees the payload and removes the packet.
    assert!(buffer
        .dequeue_with(|&mut (), payload| {
            assert_eq!(payload, &b"abcd"[..]);
            Ok(())
        })
        .is_ok());
    assert_eq!(buffer.metadata_ring.len(), 0);
}
#[test]
fn test_metadata_full_empty() {
    let mut buffer = buffer();

    // Fresh buffer: empty and not full.
    assert!(buffer.is_empty());
    assert!(!buffer.is_full());

    assert!(buffer.enqueue(1, ()).is_ok());
    assert!(!buffer.is_empty());

    assert!(buffer.enqueue(1, ()).is_ok());
    assert!(buffer.enqueue(1, ()).is_ok());
    assert!(!buffer.is_full());
    assert!(!buffer.is_empty());

    // The fourth packet takes the last of the four metadata slots.
    assert!(buffer.enqueue(1, ()).is_ok());
    assert!(buffer.is_full());
    assert!(!buffer.is_empty());
    assert_eq!(buffer.metadata_ring.len(), 4);

    // No metadata slot is left, even though payload space remains.
    assert_eq!(buffer.enqueue(1, ()), Err(Error::Exhausted));
}
#[test]
fn test_window_too_small() {
    let mut ring = buffer();

    // Leave a single 8-byte packet queued.
    let first = ring.enqueue(4, ());
    assert!(first.is_ok());
    let second = ring.enqueue(8, ());
    assert!(second.is_ok());
    let released = ring.dequeue();
    assert!(released.is_ok());

    // A packet as large as the whole payload ring is rejected while data is
    // still queued, and the rejection adds no metadata entry.
    assert_eq!(ring.enqueue(16, ()), Err(Error::Exhausted));
    assert_eq!(ring.metadata_ring.len(), 1);
}
#[test]
fn test_contiguous_window_too_small() {
    let mut ring = buffer();

    // Leave a single 8-byte packet queued, with 4 bytes released in front of it.
    let first = ring.enqueue(4, ());
    assert!(first.is_ok());
    let second = ring.enqueue(8, ());
    assert!(second.is_ok());
    let released = ring.dequeue();
    assert!(released.is_ok());

    // An 8-byte packet is rejected here (presumably because the free space is
    // split around the end of the ring) and no metadata entry is added.
    assert_eq!(ring.enqueue(8, ()), Err(Error::Exhausted));
    assert_eq!(ring.metadata_ring.len(), 1);
}
#[test]
fn test_capacity_too_small() {
    let mut ring = buffer();

    // 32 bytes exceeds the 16-byte payload storage outright.
    let outcome = ring.enqueue(32, ());
    assert_eq!(outcome, Err(Error::Truncated));
}
#[test]
fn test_contig_window_prioritized() {
    let mut ring = buffer();

    // Queue and immediately release a 4-byte packet.
    let first = ring.enqueue(4, ());
    assert!(first.is_ok());
    let released = ring.dequeue();
    assert!(released.is_ok());

    // A 5-byte packet must then be accepted.
    let second = ring.enqueue(5, ());
    assert!(second.is_ok());
}
#[test]
fn clear() {
    let mut ring = buffer();

    // A fresh buffer starts out empty.
    assert!(ring.is_empty());

    // Enqueuing a packet makes it non-empty.
    let queued = ring.enqueue(6, ());
    assert!(queued.is_ok());
    assert!(!ring.is_empty());

    // Resetting the buffer makes it empty again.
    ring.reset();
    assert!(ring.is_empty());
}
}