394: rj/stream rework r=ryan-summers a=jordens

- network: add some useful log messages
- stream: redo socket management logic
- derive Defgault for StreamTarget

* depends on #393
* tested on fls, makes it a bit faster but the big chunk will be #385 

Co-authored-by: Robert Jördens <rj@quartiq.de>
Co-authored-by: Ryan Summers <ryan.summers@vertigo-designs.com>
master
bors[bot] 2021-06-25 12:52:04 +00:00 committed by GitHub
commit a4f0746aa9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 55 deletions

View File

@ -30,21 +30,12 @@ const BLOCK_BUFFER_SIZE: usize = 30;
const SUBSAMPLE_RATE: usize = 1; const SUBSAMPLE_RATE: usize = 1;
/// Represents the destination for the UDP stream to send data to. /// Represents the destination for the UDP stream to send data to.
#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize)] #[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize, Default)]
pub struct StreamTarget { pub struct StreamTarget {
pub ip: [u8; 4], pub ip: [u8; 4],
pub port: u16, pub port: u16,
} }
impl Default for StreamTarget {
fn default() -> Self {
Self {
ip: [0; 4],
port: 0,
}
}
}
impl From<StreamTarget> for SocketAddr { impl From<StreamTarget> for SocketAddr {
fn from(target: StreamTarget) -> SocketAddr { fn from(target: StreamTarget) -> SocketAddr {
SocketAddr::new( SocketAddr::new(
@ -271,28 +262,28 @@ impl DataStream {
} }
fn close(&mut self) { fn close(&mut self) {
log::info!("Closing stream"); if let Some(socket) = self.socket.take() {
// Note(unwrap): We guarantee that the socket is available above. log::info!("Closing stream");
let socket = self.socket.take().unwrap(); // Note(unwrap): We guarantee that the socket is available above.
self.stack.close(socket).unwrap(); self.stack.close(socket).unwrap();
}
} }
fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { // Open new socket.
if self.socket.is_some() { fn open(&mut self) -> Result<(), ()> {
self.close(); // If there is already a socket of if remote address is unspecified,
} // do not open a new socket.
if self.socket.is_some() || self.remote.ip().is_unspecified() {
// If the remote address is unspecified, don't open a new socket.
if remote.ip().is_unspecified() {
return Err(()); return Err(());
} }
let mut socket = self.stack.socket().map_err(|_| ())?;
log::info!("Opening stream"); log::info!("Opening stream");
let mut socket = self.stack.socket().or(Err(()))?;
// Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be // Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be
// bound. // bound.
self.stack.connect(&mut socket, remote).unwrap(); self.stack.connect(&mut socket, self.remote).unwrap();
self.socket.replace(socket); self.socket.replace(socket);
@ -304,48 +295,43 @@ impl DataStream {
/// # Args /// # Args
/// * `remote` - The destination to send stream data to. /// * `remote` - The destination to send stream data to.
pub fn set_remote(&mut self, remote: SocketAddr) { pub fn set_remote(&mut self, remote: SocketAddr) {
// If the remote is identical to what we already have, do nothing. // Close socket to be reopened if the remote has changed.
if remote == self.remote { if remote != self.remote {
return; self.close();
} }
// Open the new remote connection.
self.open(remote).ok();
self.remote = remote; self.remote = remote;
} }
/// Process any data for transmission. /// Process any data for transmission.
pub fn process(&mut self) { pub fn process(&mut self) {
// If there's no socket available, try to connect to our remote. match self.socket.as_mut() {
if self.socket.is_none() { None => {
// If we can't open the socket (e.g. we do not have an IP address yet), clear data from // If there's no socket available, try to connect to our remote.
// the queue. if self.open().is_ok() {
if self.open(self.remote).is_err() { // If we just successfully opened the socket, flush old data from queue.
while self.queue.ready() { while self.queue.dequeue().is_some() {}
self.queue.dequeue();
} }
return;
} }
} Some(handle) => {
if self.queue.ready() {
// Dequeue data from the queue into a larger block structure.
let mut packet =
DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE);
while self
.queue
.peek()
.and_then(|batch| packet.add_batch(batch).ok())
.is_some()
{
// Dequeue the batch that we just added to the packet.
self.queue.dequeue();
}
if self.queue.ready() { // Transmit the data packet.
// Dequeue data from the queue into a larger block structure. let size = packet.finish();
let mut packet = DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); self.stack.send(handle, &self.buffer[..size]).ok();
while self.queue.ready() {
// Note(unwrap): We check above that the queue is ready before calling this.
if packet.add_batch(self.queue.peek().unwrap()).is_err() {
// If we cannot add another batch, break out of the loop and send the packet.
break;
} }
// Remove the batch that we just added.
self.queue.dequeue();
} }
// Transmit the data block.
let mut handle = self.socket.as_mut().unwrap();
let size = packet.finish();
self.stack.send(&mut handle, &self.buffer[..size]).ok();
} }
} }
} }