Refactor the "random access" ring buffer interface.

whitequark 2017-09-07 23:47:42 +00:00
parent 5dc0353b2a
commit 1102bd94e7
2 changed files with 109 additions and 29 deletions

View File

@@ -565,7 +565,7 @@ impl<'a> TcpSocket<'a> {
// See recv() above.
if !self.may_recv() { return Err(Error::Illegal) }
let buffer = self.rx_buffer.peek(0, size);
let buffer = self.rx_buffer.get_allocated(0, size);
if buffer.len() > 0 {
#[cfg(any(test, feature = "verbose"))]
net_trace!("[{}]{}:{}: rx buffer: peeking at {} octets",
@@ -1074,7 +1074,7 @@ impl<'a> TcpSocket<'a> {
// from the transmit buffer.
let offset = self.remote_last_seq - self.local_seq_no;
let size = cmp::min(self.remote_win_len, self.remote_mss);
repr.payload = self.tx_buffer.peek(offset, size);
repr.payload = self.tx_buffer.get_allocated(offset, size);
// If we've sent everything we had in the buffer, follow it with the PSH or FIN
// flags, depending on whether the transmit half of the connection is open.
if offset + repr.payload.len() == self.tx_buffer.len() {

View File

@@ -5,6 +5,19 @@ use {Error, Result};
use super::Resettable;
/// A ring buffer.
///
/// This ring buffer implementation provides many ways to interact with it:
///
/// * Enqueueing or dequeueing one element from the corresponding side of the buffer;
/// * Enqueueing or dequeueing a slice of elements from the corresponding side of the buffer;
/// * Accessing allocated and unallocated areas directly.
///
/// It is also zero-copy; all methods provide references into the buffer's storage.
/// Note that all references are mutable; it is considered more important to allow
/// in-place processing than to protect from accidental mutation.
///
/// This implementation is suitable for both simple uses such as a FIFO queue
/// of UDP packets, and advanced ones such as a TCP reassembly buffer.
#[derive(Debug)]
pub struct RingBuffer<'a, T: 'a> {
storage: ManagedSlice<'a, T>,
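
The doc comment above stresses that the buffer is zero-copy: every accessor hands out a reference into `storage` itself. A minimal sketch of what that buys a caller, written as a hypothetical test in the style of the ones at the bottom of this diff (not part of the commit):

#[test]
fn sketch_in_place_processing() {
    let mut ring = RingBuffer::new(vec![0u8; 4]);
    assert_eq!(ring.enqueue_slice(b"abcd"), 4);

    // dequeue_many() returns a mutable window into the underlying storage,
    // so the consumer can rewrite the data in place instead of copying it out.
    let chunk = ring.dequeue_many(4);
    for byte in chunk.iter_mut() {
        byte.make_ascii_uppercase();
    }
    assert_eq!(chunk, b"ABCD");
}
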
@@ -78,8 +91,8 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
}
}
// This is the "discrete" ring buffer interface: it operates with single elements,
// and boundary conditions (empty/full) are errors.
/// This is the "discrete" ring buffer interface: it operates with single elements,
/// and boundary conditions (empty/full) are errors.
impl<'a, T: 'a> RingBuffer<'a, T> {
/// Call `f` with a single buffer element, and enqueue the element if `f`
/// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
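
A sketch of the discrete interface at its boundaries, as a hypothetical test (not part of the commit); it assumes `enqueue_one()` mirrors the `dequeue_one()` used in the tests below and returns `Result<&mut T>`:

#[test]
fn sketch_discrete_interface() {
    let mut ring = RingBuffer::new(vec![0u8; 2]);

    // Fill the buffer one element at a time; once it is full, the boundary
    // condition is reported as an error.
    *ring.enqueue_one().unwrap() = b'a';
    *ring.enqueue_one().unwrap() = b'b';
    assert_eq!(ring.enqueue_one(), Err(Error::Exhausted));

    // Drain it again; the empty buffer is likewise an error.
    assert_eq!(*ring.dequeue_one().unwrap(), b'a');
    assert_eq!(*ring.dequeue_one().unwrap(), b'b');
    assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));
}
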
@@ -131,8 +144,8 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
}
}
// This is the "continuous" ring buffer interface: it operates with element slices,
// and boundary conditions (empty/full) simply result in empty slices.
/// This is the "continuous" ring buffer interface: it operates with element slices,
/// and boundary conditions (empty/full) simply result in empty slices.
impl<'a, T: 'a> RingBuffer<'a, T> {
/// Call `f` with the largest contiguous slice of unallocated buffer elements,
/// and enqueue the amount of elements returned by `f`.
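
A sketch of the continuous interface, as a hypothetical test (not part of the commit); it assumes `enqueue_slice` copies across the wrap point in two passes while `dequeue_many` only ever returns a single contiguous run, in line with the doc comments above:

#[test]
fn sketch_continuous_interface() {
    let mut ring = RingBuffer::new(vec![b'.'; 6]);

    // enqueue_many() exposes the largest contiguous run of free storage and
    // commits the requested number of elements, clamped to that run.
    ring.enqueue_many(4).copy_from_slice(b"abcd");
    assert_eq!(ring.len(), 4);

    // dequeue_many() is the mirror image on the read side.
    assert_eq!(ring.dequeue_many(2), b"ab");
    assert_eq!(ring.len(), 2);

    // enqueue_slice() copies as much of the input as fits, wrapping around
    // the end of the storage, and reports how many elements it consumed.
    assert_eq!(ring.enqueue_slice(b"efgh"), 4);

    // Only contiguous runs are handed out, so draining a wrapped buffer
    // takes two calls: boundary conditions give short (or empty) slices.
    assert_eq!(ring.dequeue_many(6), b"cdef");
    assert_eq!(ring.dequeue_many(6), b"gh");
}
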
@@ -226,26 +239,42 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
}
}
// This is the "random access" ring buffer interface: it operates with element slices,
// and allows to access elements of the buffer that are not adjacent to its head or tail.
/// This is the "random access" ring buffer interface: it operates with element slices,
/// and allows accessing elements of the buffer that are not adjacent to its head or tail.
///
/// After calling these functions to inject or extract elements, one would normally
/// use the `enqueue_many` or `dequeue_many` methods to adjust the head or tail.
impl<'a, T: 'a> RingBuffer<'a, T> {
fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
let read_at = (self.read_at + offset) % self.capacity();
// We can't read past the end of the queued data.
if offset > self.length { return (read_at, 0) }
// We can't dequeue more than was queued.
/// Return the largest contiguous slice of unallocated buffer elements starting
/// at the given offset past the last allocated element, and up to the given size.
pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] {
let start_at = (self.read_at + self.length + offset) % self.capacity();
// We can't access past the end of unallocated data.
if offset > self.window() { return &mut [] }
// We can't enqueue more than there is free space.
let clamped_window = self.window() - offset;
if size > clamped_window { size = clamped_window }
// We can't contiguously enqueue past the end of the storage.
let until_end = self.capacity() - start_at;
if size > until_end { size = until_end }
&mut self.storage[start_at..start_at + size]
}
/// Return the largest contiguous slice of allocated buffer elements starting
/// at the given offset past the first allocated element, and up to the given size.
pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] {
let start_at = (self.read_at + offset) % self.capacity();
// We can't read past the end of the allocated data.
if offset > self.length { return &mut [] }
// We can't read more than we have allocated.
let clamped_length = self.length - offset;
if size > clamped_length { size = clamped_length }
// We can't contiguously dequeue past the end of the storage.
let until_end = self.capacity() - read_at;
let until_end = self.capacity() - start_at;
if size > until_end { size = until_end }
(read_at, size)
}
pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] {
let (read_at, size) = self.clamp_reader(offset, size);
&self.storage[read_at..read_at + size]
&self.storage[start_at..start_at + size]
}
}
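
The calling pattern described above (inject or extract first, then adjust the head or tail) looks roughly as follows; this is a hypothetical test, not part of the commit, relying only on the semantics of `get_unallocated`, `get_allocated`, `enqueue_many` and `dequeue_many` shown in this diff:

#[test]
fn sketch_random_access_interface() {
    let mut ring = RingBuffer::new(vec![b'.'; 8]);

    // An out-of-order chunk for offsets 4..8 arrives first: stash it past
    // the tail without moving the tail itself.
    ring.get_unallocated(4, 4).copy_from_slice(b"WXYZ");
    assert_eq!(ring.len(), 0);

    // The missing chunk for offsets 0..4 arrives: fill the hole, then commit
    // all eight elements at once by advancing the tail.
    ring.get_unallocated(0, 4).copy_from_slice(b"STUV");
    ring.enqueue_many(8);
    assert_eq!(ring.len(), 8);

    // A reader can peek into the allocated area at an arbitrary offset
    // without advancing the head...
    assert_eq!(ring.get_allocated(4, 4), b"WXYZ");
    // ...and only dequeueing actually frees the space.
    ring.dequeue_many(8);
    assert_eq!(ring.len(), 0);
}
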
@@ -260,7 +289,7 @@ mod test {
use super::*;
#[test]
pub fn test_buffer_length_changes() {
fn test_buffer_length_changes() {
let mut ring = RingBuffer::new(vec![0; 2]);
assert!(ring.empty());
assert!(!ring.full());
@@ -284,7 +313,7 @@ mod test {
}
#[test]
pub fn test_buffer_enqueue_dequeue_one_with() {
fn test_buffer_enqueue_dequeue_one_with() {
let mut ring = RingBuffer::new(vec![0; 5]);
assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>,
Err(Error::Exhausted));
@@ -311,7 +340,7 @@ mod test {
}
#[test]
pub fn test_buffer_enqueue_dequeue_one() {
fn test_buffer_enqueue_dequeue_one() {
let mut ring = RingBuffer::new(vec![0; 5]);
assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));
@@ -335,7 +364,7 @@ mod test {
}
#[test]
pub fn test_buffer_enqueue_many_with() {
fn test_buffer_enqueue_many_with() {
let mut ring = RingBuffer::new(vec![b'.'; 12]);
assert_eq!(ring.enqueue_many_with(|buf| {
@@ -392,7 +421,7 @@ mod test {
}
#[test]
pub fn test_buffer_enqueue_many() {
fn test_buffer_enqueue_many() {
let mut ring = RingBuffer::new(vec![b'.'; 12]);
ring.enqueue_many(8).copy_from_slice(b"abcdefgh");
@@ -405,7 +434,7 @@ mod test {
}
#[test]
pub fn test_buffer_enqueue_slice() {
fn test_buffer_enqueue_slice() {
let mut ring = RingBuffer::new(vec![b'.'; 12]);
assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8);
@@ -424,7 +453,7 @@ mod test {
}
#[test]
pub fn test_buffer_dequeue_many_with() {
fn test_buffer_dequeue_many_with() {
let mut ring = RingBuffer::new(vec![b'.'; 12]);
assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
@@ -464,7 +493,7 @@ mod test {
}
#[test]
pub fn test_buffer_dequeue_many() {
fn test_buffer_dequeue_many() {
let mut ring = RingBuffer::new(vec![b'.'; 12]);
assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
@@ -487,7 +516,7 @@ mod test {
}
#[test]
pub fn test_buffer_dequeue_slice() {
fn test_buffer_dequeue_slice() {
let mut ring = RingBuffer::new(vec![b'.'; 12]);
assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
@@ -508,4 +537,55 @@ mod test {
assert_eq!(ring.len(), 0);
}
}
#[test]
fn test_buffer_get_unallocated() {
let mut ring = RingBuffer::new(vec![b'.'; 12]);
assert_eq!(ring.get_unallocated(16, 4), b"");
{
let buf = ring.get_unallocated(0, 4);
buf.copy_from_slice(b"abcd");
}
assert_eq!(&ring.storage[..], b"abcd........");
ring.enqueue_many(4);
assert_eq!(ring.len(), 4);
{
let buf = ring.get_unallocated(4, 8);
buf.copy_from_slice(b"ijkl");
}
assert_eq!(&ring.storage[..], b"abcd....ijkl");
ring.enqueue_many(8).copy_from_slice(b"EFGHIJKL");
ring.dequeue_many(4).copy_from_slice(b"abcd");
assert_eq!(ring.len(), 8);
assert_eq!(&ring.storage[..], b"abcdEFGHIJKL");
{
let buf = ring.get_unallocated(0, 8);
buf.copy_from_slice(b"ABCD");
}
assert_eq!(&ring.storage[..], b"ABCDEFGHIJKL");
}
#[test]
fn test_buffer_get_allocated() {
let mut ring = RingBuffer::new(vec![b'.'; 12]);
assert_eq!(ring.get_allocated(16, 4), b"");
assert_eq!(ring.get_allocated(0, 4), b"");
ring.enqueue_slice(b"abcd");
assert_eq!(ring.get_allocated(0, 8), b"abcd");
ring.enqueue_slice(b"efghijkl");
ring.dequeue_many(4).copy_from_slice(b"....");
assert_eq!(ring.get_allocated(4, 8), b"ijkl");
ring.enqueue_slice(b"abcd");
assert_eq!(ring.get_allocated(4, 8), b"ijkl");
}
}