diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 95d14af..af5c6f1 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -565,7 +565,7 @@ impl<'a> TcpSocket<'a> { // See recv() above. if !self.may_recv() { return Err(Error::Illegal) } - let buffer = self.rx_buffer.peek(0, size); + let buffer = self.rx_buffer.get_allocated(0, size); if buffer.len() > 0 { #[cfg(any(test, feature = "verbose"))] net_trace!("[{}]{}:{}: rx buffer: peeking at {} octets", @@ -1074,7 +1074,7 @@ impl<'a> TcpSocket<'a> { // from the transmit buffer. let offset = self.remote_last_seq - self.local_seq_no; let size = cmp::min(self.remote_win_len, self.remote_mss); - repr.payload = self.tx_buffer.peek(offset, size); + repr.payload = self.tx_buffer.get_allocated(offset, size); // If we've sent everything we had in the buffer, follow it with the PSH or FIN // flags, depending on whether the transmit half of the connection is open. if offset + repr.payload.len() == self.tx_buffer.len() { diff --git a/src/storage/ring_buffer.rs b/src/storage/ring_buffer.rs index 3006f90..30b8302 100644 --- a/src/storage/ring_buffer.rs +++ b/src/storage/ring_buffer.rs @@ -5,6 +5,19 @@ use {Error, Result}; use super::Resettable; /// A ring buffer. +/// +/// This ring buffer implementation provides many ways to interact with it: +/// +/// * Enqueueing or dequeueing one element from corresponding side of the buffer; +/// * Enqueueing or dequeueing a slice of elements from corresponding side of the buffer; +/// * Accessing allocated and unallocated areas directly. +/// +/// It is also zero-copy; all methods provide references into the buffer's storage. +/// Note that all references are mutable; it is considered more important to allow +/// in-place processing than to protect from accidental mutation. +/// +/// This implementation is suitable for both simple uses such as a FIFO queue +/// of UDP packets, and advanced ones such as a TCP reassembly buffer. #[derive(Debug)] pub struct RingBuffer<'a, T: 'a> { storage: ManagedSlice<'a, T>, @@ -78,8 +91,8 @@ impl<'a, T: 'a> RingBuffer<'a, T> { } } -// This is the "discrete" ring buffer interface: it operates with single elements, -// and boundary conditions (empty/full) are errors. +/// This is the "discrete" ring buffer interface: it operates with single elements, +/// and boundary conditions (empty/full) are errors. impl<'a, T: 'a> RingBuffer<'a, T> { /// Call `f` with a single buffer element, and enqueue the element if `f` /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full. @@ -131,8 +144,8 @@ impl<'a, T: 'a> RingBuffer<'a, T> { } } -// This is the "continuous" ring buffer interface: it operates with element slices, -// and boundary conditions (empty/full) simply result in empty slices. +/// This is the "continuous" ring buffer interface: it operates with element slices, +/// and boundary conditions (empty/full) simply result in empty slices. impl<'a, T: 'a> RingBuffer<'a, T> { /// Call `f` with the largest contiguous slice of unallocated buffer elements, /// and enqueue the amount of elements returned by `f`. @@ -226,26 +239,42 @@ impl<'a, T: 'a> RingBuffer<'a, T> { } } -// This is the "random access" ring buffer interface: it operates with element slices, -// and allows to access elements of the buffer that are not adjacent to its head or tail. +/// This is the "random access" ring buffer interface: it operates with element slices, +/// and allows to access elements of the buffer that are not adjacent to its head or tail. +/// +/// After calling these functions to inject or extract elements, one would normally +/// use the `enqueue_many` or `dequeue_many` methods to adjust the head or tail. impl<'a, T: 'a> RingBuffer<'a, T> { - fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) { - let read_at = (self.read_at + offset) % self.capacity(); - // We can't read past the end of the queued data. - if offset > self.length { return (read_at, 0) } - // We can't dequeue more than was queued. + /// Return the largest contiguous slice of unallocated buffer elements starting + /// at the given offset past the last allocated element, and up to the given size. + pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] { + let start_at = (self.read_at + self.length + offset) % self.capacity(); + // We can't access past the end of unallocated data. + if offset > self.window() { return &mut [] } + // We can't enqueue more than there is free space. + let clamped_window = self.window() - offset; + if size > clamped_window { size = clamped_window } + // We can't contiguously enqueue past the end of the storage. + let until_end = self.capacity() - start_at; + if size > until_end { size = until_end } + + &mut self.storage[start_at..start_at + size] + } + + /// Return the largest contiguous slice of allocated buffer elements starting + /// at the given offset past the first allocated element, and up to the given size. + pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] { + let start_at = (self.read_at + offset) % self.capacity(); + // We can't read past the end of the allocated data. + if offset > self.length { return &mut [] } + // We can't read more than we have allocated. let clamped_length = self.length - offset; if size > clamped_length { size = clamped_length } // We can't contiguously dequeue past the end of the storage. - let until_end = self.capacity() - read_at; + let until_end = self.capacity() - start_at; if size > until_end { size = until_end } - (read_at, size) - } - - pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] { - let (read_at, size) = self.clamp_reader(offset, size); - &self.storage[read_at..read_at + size] + &self.storage[start_at..start_at + size] } } @@ -260,7 +289,7 @@ mod test { use super::*; #[test] - pub fn test_buffer_length_changes() { + fn test_buffer_length_changes() { let mut ring = RingBuffer::new(vec![0; 2]); assert!(ring.empty()); assert!(!ring.full()); @@ -284,7 +313,7 @@ mod test { } #[test] - pub fn test_buffer_enqueue_dequeue_one_with() { + fn test_buffer_enqueue_dequeue_one_with() { let mut ring = RingBuffer::new(vec![0; 5]); assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>, Err(Error::Exhausted)); @@ -311,7 +340,7 @@ mod test { } #[test] - pub fn test_buffer_enqueue_dequeue_one() { + fn test_buffer_enqueue_dequeue_one() { let mut ring = RingBuffer::new(vec![0; 5]); assert_eq!(ring.dequeue_one(), Err(Error::Exhausted)); @@ -335,7 +364,7 @@ mod test { } #[test] - pub fn test_buffer_enqueue_many_with() { + fn test_buffer_enqueue_many_with() { let mut ring = RingBuffer::new(vec![b'.'; 12]); assert_eq!(ring.enqueue_many_with(|buf| { @@ -392,7 +421,7 @@ mod test { } #[test] - pub fn test_buffer_enqueue_many() { + fn test_buffer_enqueue_many() { let mut ring = RingBuffer::new(vec![b'.'; 12]); ring.enqueue_many(8).copy_from_slice(b"abcdefgh"); @@ -405,7 +434,7 @@ mod test { } #[test] - pub fn test_buffer_enqueue_slice() { + fn test_buffer_enqueue_slice() { let mut ring = RingBuffer::new(vec![b'.'; 12]); assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8); @@ -424,7 +453,7 @@ mod test { } #[test] - pub fn test_buffer_dequeue_many_with() { + fn test_buffer_dequeue_many_with() { let mut ring = RingBuffer::new(vec![b'.'; 12]); assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12); @@ -464,7 +493,7 @@ mod test { } #[test] - pub fn test_buffer_dequeue_many() { + fn test_buffer_dequeue_many() { let mut ring = RingBuffer::new(vec![b'.'; 12]); assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12); @@ -487,7 +516,7 @@ mod test { } #[test] - pub fn test_buffer_dequeue_slice() { + fn test_buffer_dequeue_slice() { let mut ring = RingBuffer::new(vec![b'.'; 12]); assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12); @@ -508,4 +537,55 @@ mod test { assert_eq!(ring.len(), 0); } } + + #[test] + fn test_buffer_get_unallocated() { + let mut ring = RingBuffer::new(vec![b'.'; 12]);; + + assert_eq!(ring.get_unallocated(16, 4), b""); + + { + let buf = ring.get_unallocated(0, 4); + buf.copy_from_slice(b"abcd"); + } + assert_eq!(&ring.storage[..], b"abcd........"); + + ring.enqueue_many(4); + assert_eq!(ring.len(), 4); + + { + let buf = ring.get_unallocated(4, 8); + buf.copy_from_slice(b"ijkl"); + } + assert_eq!(&ring.storage[..], b"abcd....ijkl"); + + ring.enqueue_many(8).copy_from_slice(b"EFGHIJKL"); + ring.dequeue_many(4).copy_from_slice(b"abcd"); + assert_eq!(ring.len(), 8); + assert_eq!(&ring.storage[..], b"abcdEFGHIJKL"); + + { + let buf = ring.get_unallocated(0, 8); + buf.copy_from_slice(b"ABCD"); + } + assert_eq!(&ring.storage[..], b"ABCDEFGHIJKL"); + } + + #[test] + fn test_buffer_get_allocated() { + let mut ring = RingBuffer::new(vec![b'.'; 12]);; + + assert_eq!(ring.get_allocated(16, 4), b""); + assert_eq!(ring.get_allocated(0, 4), b""); + + ring.enqueue_slice(b"abcd"); + assert_eq!(ring.get_allocated(0, 8), b"abcd"); + + ring.enqueue_slice(b"efghijkl"); + ring.dequeue_many(4).copy_from_slice(b"...."); + assert_eq!(ring.get_allocated(4, 8), b"ijkl"); + + ring.enqueue_slice(b"abcd"); + assert_eq!(ring.get_allocated(4, 8), b"ijkl"); + } }