Merge the TCP ring buffer and generic ring buffer.
This adds a few methods to RingBuffer that don't quite fit into its interface (the slice ones), but we can fix that later.v0.7.x
parent
27a23ed3c3
commit
a9719f4a13
|
@ -8,122 +8,9 @@ use {Error, Result};
|
|||
use phy::DeviceLimits;
|
||||
use wire::{IpProtocol, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl};
|
||||
use socket::{Socket, IpRepr};
|
||||
use storage::RingBuffer;
|
||||
|
||||
/// A TCP stream ring buffer.
|
||||
#[derive(Debug)]
|
||||
pub struct SocketBuffer<'a> {
|
||||
storage: Managed<'a, [u8]>,
|
||||
read_at: usize,
|
||||
length: usize
|
||||
}
|
||||
|
||||
impl<'a> SocketBuffer<'a> {
|
||||
/// Create a packet buffer with the given storage.
|
||||
pub fn new<T>(storage: T) -> SocketBuffer<'a>
|
||||
where T: Into<Managed<'a, [u8]>> {
|
||||
SocketBuffer {
|
||||
storage: storage.into(),
|
||||
read_at: 0,
|
||||
length: 0
|
||||
}
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.read_at = 0;
|
||||
self.length = 0;
|
||||
}
|
||||
|
||||
fn capacity(&self) -> usize {
|
||||
self.storage.len()
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.length
|
||||
}
|
||||
|
||||
fn window(&self) -> usize {
|
||||
self.capacity() - self.len()
|
||||
}
|
||||
|
||||
fn empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
fn full(&self) -> bool {
|
||||
self.window() == 0
|
||||
}
|
||||
|
||||
fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
|
||||
let write_at = (self.read_at + self.length) % self.storage.len();
|
||||
// We can't enqueue more than there is free space.
|
||||
let free = self.storage.len() - self.length;
|
||||
if size > free { size = free }
|
||||
// We can't contiguously enqueue past the beginning of the storage.
|
||||
let until_end = self.storage.len() - write_at;
|
||||
if size > until_end { size = until_end }
|
||||
|
||||
(write_at, size)
|
||||
}
|
||||
|
||||
fn enqueue(&mut self, size: usize) -> &mut [u8] {
|
||||
let (write_at, size) = self.clamp_writer(size);
|
||||
self.length += size;
|
||||
&mut self.storage[write_at..write_at + size]
|
||||
}
|
||||
|
||||
fn enqueue_slice(&mut self, data: &[u8]) {
|
||||
let data = {
|
||||
let mut dest = self.enqueue(data.len());
|
||||
let (data, rest) = data.split_at(dest.len());
|
||||
dest.copy_from_slice(data);
|
||||
rest
|
||||
};
|
||||
// Retry, in case we had a wraparound.
|
||||
let mut dest = self.enqueue(data.len());
|
||||
let (data, _) = data.split_at(dest.len());
|
||||
dest.copy_from_slice(data);
|
||||
}
|
||||
|
||||
fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
|
||||
let read_at = (self.read_at + offset) % self.storage.len();
|
||||
// We can't read past the end of the queued data.
|
||||
if offset > self.length { return (read_at, 0) }
|
||||
// We can't dequeue more than was queued.
|
||||
let clamped_length = self.length - offset;
|
||||
if size > clamped_length { size = clamped_length }
|
||||
// We can't contiguously dequeue past the end of the storage.
|
||||
let until_end = self.storage.len() - read_at;
|
||||
if size > until_end { size = until_end }
|
||||
|
||||
(read_at, size)
|
||||
}
|
||||
|
||||
fn dequeue(&mut self, size: usize) -> &[u8] {
|
||||
let (read_at, size) = self.clamp_reader(0, size);
|
||||
self.read_at = (self.read_at + size) % self.storage.len();
|
||||
self.length -= size;
|
||||
&self.storage[read_at..read_at + size]
|
||||
}
|
||||
|
||||
fn peek(&self, offset: usize, size: usize) -> &[u8] {
|
||||
let (read_at, size) = self.clamp_reader(offset, size);
|
||||
&self.storage[read_at..read_at + size]
|
||||
}
|
||||
|
||||
fn advance(&mut self, size: usize) {
|
||||
if size > self.length {
|
||||
panic!("advancing {} octets into free space", size - self.length)
|
||||
}
|
||||
self.read_at = (self.read_at + size) % self.storage.len();
|
||||
self.length -= size;
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Into<SocketBuffer<'a>> for Managed<'a, [u8]> {
|
||||
fn into(self) -> SocketBuffer<'a> {
|
||||
SocketBuffer::new(self)
|
||||
}
|
||||
}
|
||||
pub type SocketBuffer<'a> = RingBuffer<'a, u8>;
|
||||
|
||||
/// The state of a TCP socket, according to [RFC 793][rfc793].
|
||||
/// [rfc793]: https://tools.ietf.org/html/rfc793
|
||||
|
@ -590,7 +477,7 @@ impl<'a> TcpSocket<'a> {
|
|||
|
||||
#[cfg(any(test, feature = "verbose"))]
|
||||
let old_length = self.tx_buffer.len();
|
||||
let buffer = self.tx_buffer.enqueue(size);
|
||||
let buffer = self.tx_buffer.enqueue_slice(size);
|
||||
if buffer.len() > 0 {
|
||||
#[cfg(any(test, feature = "verbose"))]
|
||||
net_trace!("[{}]{}:{}: tx buffer: enqueueing {} octets (now {})",
|
||||
|
@ -630,7 +517,7 @@ impl<'a> TcpSocket<'a> {
|
|||
|
||||
#[cfg(any(test, feature = "verbose"))]
|
||||
let old_length = self.rx_buffer.len();
|
||||
let buffer = self.rx_buffer.dequeue(size);
|
||||
let buffer = self.rx_buffer.dequeue_slice(size);
|
||||
self.remote_seq_no += buffer.len();
|
||||
if buffer.len() > 0 {
|
||||
#[cfg(any(test, feature = "verbose"))]
|
||||
|
@ -1085,7 +972,8 @@ impl<'a> TcpSocket<'a> {
|
|||
net_trace!("[{}]{}:{}: tx buffer: dequeueing {} octets (now {})",
|
||||
self.debug_id, self.local_endpoint, self.remote_endpoint,
|
||||
ack_len, self.tx_buffer.len() - ack_len);
|
||||
self.tx_buffer.advance(ack_len);
|
||||
let acked = self.tx_buffer.dequeue_slice(ack_len);
|
||||
debug_assert!(acked.len() == ack_len);
|
||||
}
|
||||
|
||||
// We've processed everything in the incoming segment, so advance the local
|
||||
|
@ -1099,7 +987,7 @@ impl<'a> TcpSocket<'a> {
|
|||
net_trace!("[{}]{}:{}: rx buffer: enqueueing {} octets (now {})",
|
||||
self.debug_id, self.local_endpoint, self.remote_endpoint,
|
||||
repr.payload.len(), self.rx_buffer.len() + repr.payload.len());
|
||||
self.rx_buffer.enqueue_slice(repr.payload);
|
||||
self.rx_buffer.enqueue_slice_all(repr.payload);
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
|
@ -1317,34 +1205,6 @@ mod test {
|
|||
use wire::{IpAddress, Ipv4Address};
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_buffer() {
|
||||
let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
|
||||
buffer.enqueue(6).copy_from_slice(b"foobar"); // foobar..
|
||||
assert_eq!(buffer.dequeue(3), b"foo"); // ...bar..
|
||||
buffer.enqueue(6).copy_from_slice(b"ba"); // ...barba
|
||||
buffer.enqueue(4).copy_from_slice(b"zho"); // zhobarba
|
||||
assert_eq!(buffer.dequeue(6), b"barba"); // zho.....
|
||||
assert_eq!(buffer.dequeue(8), b"zho"); // ........
|
||||
buffer.enqueue(8).copy_from_slice(b"gefug"); // ...gefug
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_buffer_wraparound() {
|
||||
let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
|
||||
buffer.enqueue_slice(&b"foobar"[..]); // foobar..
|
||||
assert_eq!(buffer.dequeue(3), b"foo"); // ...bar..
|
||||
buffer.enqueue_slice(&b"bazhoge"[..]); // zhobarba
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_buffer_peek() {
|
||||
let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
|
||||
buffer.enqueue_slice(&b"foobar"[..]); // foobar..
|
||||
assert_eq!(buffer.peek(0, 8), &b"foobar"[..]);
|
||||
assert_eq!(buffer.peek(3, 8), &b"bar"[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timer_retransmit() {
|
||||
let mut r = Timer::Idle;
|
||||
|
@ -1890,7 +1750,7 @@ mod test {
|
|||
window_len: 58,
|
||||
..RECV_TEMPL
|
||||
}]);
|
||||
assert_eq!(s.rx_buffer.dequeue(6), &b"abcdef"[..]);
|
||||
assert_eq!(s.rx_buffer.dequeue_slice(6), &b"abcdef"[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use managed::Managed;
|
||||
use managed::{Managed, ManagedSlice};
|
||||
|
||||
use {Error, Result};
|
||||
use super::Resettable;
|
||||
|
@ -6,9 +6,9 @@ use super::Resettable;
|
|||
/// A ring buffer.
|
||||
#[derive(Debug)]
|
||||
pub struct RingBuffer<'a, T: 'a> {
|
||||
storage: Managed<'a, [T]>,
|
||||
storage: ManagedSlice<'a, T>,
|
||||
read_at: usize,
|
||||
length: usize,
|
||||
length: usize,
|
||||
}
|
||||
|
||||
impl<'a, T: 'a> RingBuffer<'a, T> {
|
||||
|
@ -16,55 +16,66 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
|
|||
///
|
||||
/// During creation, every element in `storage` is reset.
|
||||
pub fn new<S>(storage: S) -> RingBuffer<'a, T>
|
||||
where S: Into<Managed<'a, [T]>>, T: Resettable,
|
||||
where S: Into<ManagedSlice<'a, T>>,
|
||||
{
|
||||
let mut storage = storage.into();
|
||||
for elem in storage.iter_mut() {
|
||||
elem.reset();
|
||||
}
|
||||
|
||||
RingBuffer {
|
||||
storage: storage,
|
||||
storage: storage.into(),
|
||||
read_at: 0,
|
||||
length: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn mask(&self, index: usize) -> usize {
|
||||
index % self.storage.len()
|
||||
/// Clear the ring buffer.
|
||||
pub fn clear(&mut self) {
|
||||
self.read_at = 0;
|
||||
self.length = 0;
|
||||
}
|
||||
|
||||
fn incr(&self, index: usize) -> usize {
|
||||
self.mask(index + 1)
|
||||
/// Clear the ring buffer, and reset every element.
|
||||
pub fn reset(&mut self)
|
||||
where T: Resettable {
|
||||
self.clear();
|
||||
for elem in self.storage.iter_mut() {
|
||||
elem.reset();
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the current number of elements in the ring buffer.
|
||||
pub fn len(&self) -> usize {
|
||||
self.length
|
||||
}
|
||||
|
||||
/// Return the maximum number of elements in the ring buffer.
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.storage.len()
|
||||
}
|
||||
|
||||
/// Return the number of elements that can be added to the ring buffer.
|
||||
pub fn window(&self) -> usize {
|
||||
self.capacity() - self.len()
|
||||
}
|
||||
|
||||
/// Query whether the buffer is empty.
|
||||
pub fn empty(&self) -> bool {
|
||||
self.length == 0
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
/// Query whether the buffer is full.
|
||||
pub fn full(&self) -> bool {
|
||||
self.length == self.storage.len()
|
||||
self.window() == 0
|
||||
}
|
||||
}
|
||||
|
||||
/// Enqueue an element into the buffer, and return a pointer to it, or return
|
||||
/// `Err(Error::Exhausted)` if the buffer is full.
|
||||
pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
|
||||
if self.full() { return Err(Error::Exhausted) }
|
||||
|
||||
let index = self.mask(self.read_at + self.length);
|
||||
self.length += 1;
|
||||
Ok(&mut self.storage[index])
|
||||
}
|
||||
|
||||
/// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
|
||||
/// return `Err(Error::Exhausted)` if the buffer is full.
|
||||
// This is the "discrete" ring buffer interface: it operates with single elements,
|
||||
// and boundary conditions (empty/full) are errors.
|
||||
impl<'a, T: 'a> RingBuffer<'a, T> {
|
||||
/// Call `f` with a single buffer element, and enqueue the element if `f`
|
||||
/// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
|
||||
pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
|
||||
where F: FnOnce(&'b mut T) -> Result<R> {
|
||||
if self.full() { return Err(Error::Exhausted) }
|
||||
|
||||
let index = self.mask(self.read_at + self.length);
|
||||
let index = (self.read_at + self.length) % self.capacity();
|
||||
match f(&mut self.storage[index]) {
|
||||
Ok(result) => {
|
||||
self.length += 1;
|
||||
|
@ -74,15 +85,10 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Dequeue an element from the buffer, and return a mutable reference to it, or return
|
||||
/// `Err(Error::Exhausted)` if the buffer is empty.
|
||||
pub fn dequeue(&mut self) -> Result<&mut T> {
|
||||
if self.empty() { return Err(Error::Exhausted) }
|
||||
|
||||
let read_at = self.read_at;
|
||||
self.length -= 1;
|
||||
self.read_at = self.incr(self.read_at);
|
||||
Ok(&mut self.storage[read_at])
|
||||
/// Enqueue a single element into the buffer, and return a pointer to it,
|
||||
/// or return `Err(Error::Exhausted)` if the buffer is full.
|
||||
pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
|
||||
self.try_enqueue(Ok)
|
||||
}
|
||||
|
||||
/// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
|
||||
|
@ -91,7 +97,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
|
|||
where F: FnOnce(&'b mut T) -> Result<R> {
|
||||
if self.empty() { return Err(Error::Exhausted) }
|
||||
|
||||
let next_at = self.incr(self.read_at);
|
||||
let next_at = (self.read_at + 1) % self.capacity();
|
||||
match f(&mut self.storage[self.read_at]) {
|
||||
Ok(result) => {
|
||||
self.length -= 1;
|
||||
|
@ -101,32 +107,91 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
|
|||
Err(error) => Err(error)
|
||||
}
|
||||
}
|
||||
|
||||
/// Dequeue an element from the buffer, and return a mutable reference to it, or return
|
||||
/// `Err(Error::Exhausted)` if the buffer is empty.
|
||||
pub fn dequeue(&mut self) -> Result<&mut T> {
|
||||
self.try_dequeue(Ok)
|
||||
}
|
||||
}
|
||||
|
||||
// This is the "continuous" ring buffer interface: it operates with element slices,
|
||||
// and boundary conditions (empty/full) simply result in empty slices.
|
||||
impl<'a, T: 'a> RingBuffer<'a, T> {
|
||||
fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
|
||||
let write_at = (self.read_at + self.length) % self.capacity();
|
||||
// We can't enqueue more than there is free space.
|
||||
let free = self.capacity() - self.length;
|
||||
if size > free { size = free }
|
||||
// We can't contiguously enqueue past the beginning of the storage.
|
||||
let until_end = self.capacity() - write_at;
|
||||
if size > until_end { size = until_end }
|
||||
|
||||
(write_at, size)
|
||||
}
|
||||
|
||||
pub(crate) fn enqueue_slice<'b>(&'b mut self, size: usize) -> &'b mut [T] {
|
||||
let (write_at, size) = self.clamp_writer(size);
|
||||
self.length += size;
|
||||
&mut self.storage[write_at..write_at + size]
|
||||
}
|
||||
|
||||
pub(crate) fn enqueue_slice_all(&mut self, data: &[T])
|
||||
where T: Copy {
|
||||
let data = {
|
||||
let mut dest = self.enqueue_slice(data.len());
|
||||
let (data, rest) = data.split_at(dest.len());
|
||||
dest.copy_from_slice(data);
|
||||
rest
|
||||
};
|
||||
// Retry, in case we had a wraparound.
|
||||
let mut dest = self.enqueue_slice(data.len());
|
||||
let (data, _) = data.split_at(dest.len());
|
||||
dest.copy_from_slice(data);
|
||||
}
|
||||
|
||||
fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
|
||||
let read_at = (self.read_at + offset) % self.capacity();
|
||||
// We can't read past the end of the queued data.
|
||||
if offset > self.length { return (read_at, 0) }
|
||||
// We can't dequeue more than was queued.
|
||||
let clamped_length = self.length - offset;
|
||||
if size > clamped_length { size = clamped_length }
|
||||
// We can't contiguously dequeue past the end of the storage.
|
||||
let until_end = self.capacity() - read_at;
|
||||
if size > until_end { size = until_end }
|
||||
|
||||
(read_at, size)
|
||||
}
|
||||
|
||||
pub(crate) fn dequeue_slice(&mut self, size: usize) -> &[T] {
|
||||
let (read_at, size) = self.clamp_reader(0, size);
|
||||
self.read_at = (self.read_at + size) % self.capacity();
|
||||
self.length -= size;
|
||||
&self.storage[read_at..read_at + size]
|
||||
}
|
||||
|
||||
pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] {
|
||||
let (read_at, size) = self.clamp_reader(offset, size);
|
||||
&self.storage[read_at..read_at + size]
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
|
||||
fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
|
||||
RingBuffer::new(slice)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
impl Resettable for usize {
|
||||
fn reset(&mut self) {
|
||||
*self = 0;
|
||||
}
|
||||
}
|
||||
|
||||
const SIZE: usize = 5;
|
||||
|
||||
fn buffer() -> RingBuffer<'static, usize> {
|
||||
let mut storage = vec![];
|
||||
for i in 0..SIZE {
|
||||
storage.push(i + 10);
|
||||
}
|
||||
|
||||
RingBuffer::new(storage)
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_buffer() {
|
||||
let mut buf = buffer();
|
||||
let mut buf = RingBuffer::new(vec![0; SIZE]);
|
||||
assert!(buf.empty());
|
||||
assert!(!buf.full());
|
||||
assert_eq!(buf.dequeue(), Err(Error::Exhausted));
|
||||
|
@ -152,7 +217,7 @@ mod test {
|
|||
|
||||
#[test]
|
||||
pub fn test_buffer_try() {
|
||||
let mut buf = buffer();
|
||||
let mut buf = RingBuffer::new(vec![0; SIZE]);
|
||||
assert!(buf.empty());
|
||||
assert!(!buf.full());
|
||||
assert_eq!(buf.try_dequeue(|_| unreachable!()) as Result<()>,
|
||||
|
|
Loading…
Reference in New Issue