From 71fc81b7c51acbe744b07000ddab2aa83acefe0c Mon Sep 17 00:00:00 2001 From: whitequark Date: Sun, 18 Dec 2016 19:40:11 +0000 Subject: [PATCH] Implement TCP stream ring buffers. --- src/socket/mod.rs | 3 ++ src/socket/tcp.rs | 74 +++++++++++++++++++++++++++++++++++++++++++++++ src/socket/udp.rs | 2 +- 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 src/socket/tcp.rs diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 8b16342..0cf3da1 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -14,11 +14,14 @@ use Error; use wire::{InternetAddress as Address, InternetProtocolType as ProtocolType}; mod udp; +mod tcp; pub use self::udp::Packet as UdpPacket; pub use self::udp::Buffer as UdpBuffer; pub use self::udp::UdpSocket as UdpSocket; +pub use self::tcp::Buffer as TcpBuffer; + /// A packet representation. /// /// This interface abstracts the various types of packets layered under the IP protocol, diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs new file mode 100644 index 0000000..09df781 --- /dev/null +++ b/src/socket/tcp.rs @@ -0,0 +1,74 @@ +use Managed; + +/// A TCP stream ring buffer. +#[derive(Debug)] +pub struct Buffer<'a> { + storage: Managed<'a, [u8]>, + read_at: usize, + length: usize +} + +impl<'a> Buffer<'a> { + /// Create a packet buffer with the given storage. + pub fn new(storage: T) -> Buffer<'a> + where T: Into> { + Buffer { + storage: storage.into(), + read_at: 0, + length: 0 + } + } + + /// Enqueue a slice of octets up to the given size into the buffer, and return a pointer + /// to the slice. + /// + /// The returned slice may be shorter than requested, as short as an empty slice, + /// if there is not enough contiguous free space in the buffer. + pub fn enqueue(&mut self, mut size: usize) -> &mut [u8] { + let write_at = (self.read_at + self.length) % self.storage.len(); + // We can't enqueue more than there is free space. + let free = self.storage.len() - self.length; + if size > free { size = free } + // We can't contiguously enqueue past the beginning of the storage. + let until_end = self.storage.len() - write_at; + if size > until_end { size = until_end } + + self.length += size; + &mut self.storage[write_at..write_at + size] + } + + /// Dequeue a slice of octets up to the given size from the buffer, and return a pointer + /// to the slice. + /// + /// The returned slice may be shorter than requested, as short as an empty slice, + /// if there is not enough contiguous filled space in the buffer. + pub fn dequeue(&mut self, mut size: usize) -> &[u8] { + let read_at = self.read_at; + // We can't dequeue more than was queued. + if size > self.length { size = self.length } + // We can't contiguously dequeue past the end of the storage. + let until_end = self.storage.len() - self.read_at; + if size > until_end { size = until_end } + + self.read_at = (self.read_at + size) % self.storage.len(); + self.length -= size; + &self.storage[read_at..read_at + size] + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_buffer() { + let mut buffer = Buffer::new(vec![0; 8]); // ........ + buffer.enqueue(6).copy_from_slice(b"foobar"); // foobar.. + assert_eq!(buffer.dequeue(3), b"foo"); // ...bar.. + buffer.enqueue(6).copy_from_slice(b"ba"); // ...barba + buffer.enqueue(4).copy_from_slice(b"zho"); // zhobarba + assert_eq!(buffer.dequeue(6), b"barba"); // zho..... + assert_eq!(buffer.dequeue(8), b"zho"); // ........ + buffer.enqueue(8).copy_from_slice(b"gefug"); // ...gefug + } +} diff --git a/src/socket/udp.rs b/src/socket/udp.rs index 73f80b5..7bd2cc1 100644 --- a/src/socket/udp.rs +++ b/src/socket/udp.rs @@ -33,7 +33,7 @@ impl<'a> Packet<'a> { } } -/// An UDP packet buffer. +/// An UDP packet ring buffer. #[derive(Debug)] pub struct Buffer<'a, 'b: 'a> { storage: Managed<'a, [Packet<'b>]>,