diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index fc8aaf7..da9a68b 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -271,28 +271,28 @@ impl DataStream { } fn close(&mut self) { - log::info!("Closing stream"); - // Note(unwrap): We guarantee that the socket is available above. - let socket = self.socket.take().unwrap(); - self.stack.close(socket).unwrap(); + if let Some(socket) = self.socket.take() { + log::info!("Closing stream"); + // Note(unwrap): We guarantee that the socket is available above. + self.stack.close(socket).unwrap(); + } } - fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { - if self.socket.is_some() { - self.close(); - } - - // If the remote address is unspecified, don't open a new socket. - if remote.ip().is_unspecified() { + // Open new socket. + fn open(&mut self) -> Result<(), ()> { + // If there is already a socket of if remote address is unspecified, + // do not open a new socket. + if self.socket.is_some() || self.remote.ip().is_unspecified() { return Err(()); } - let mut socket = self.stack.socket().map_err(|_| ())?; - log::info!("Opening stream"); + + let mut socket = self.stack.socket().or(Err(()))?; + // Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be // bound. - self.stack.connect(&mut socket, remote).unwrap(); + self.stack.connect(&mut socket, self.remote).unwrap(); self.socket.replace(socket); @@ -304,48 +304,43 @@ impl DataStream { /// # Args /// * `remote` - The destination to send stream data to. pub fn set_remote(&mut self, remote: SocketAddr) { - // If the remote is identical to what we already have, do nothing. - if remote == self.remote { - return; + // Close socket to be reopened if the remote has changed. + if remote != self.remote { + self.close(); } - - // Open the new remote connection. - self.open(remote).ok(); self.remote = remote; } /// Process any data for transmission. pub fn process(&mut self) { - // If there's no socket available, try to connect to our remote. - if self.socket.is_none() { - // If we can't open the socket (e.g. we do not have an IP address yet), clear data from - // the queue. - if self.open(self.remote).is_err() { - while self.queue.ready() { - self.queue.dequeue(); + match self.socket.as_mut() { + None => { + // If there's no socket available, try to connect to our remote. + if self.open().is_ok() { + // If we just successfully opened the socket, flush old data from queue. + while self.queue.dequeue().is_some() {} } - return; } - } + Some(handle) => { + if self.queue.ready() { + // Dequeue data from the queue into a larger block structure. + let mut packet = + DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); + while self + .queue + .peek() + .and_then(|batch| packet.add_batch(batch).ok()) + .is_some() + { + // Dequeue the batch that we just added to the packet. + self.queue.dequeue(); + } - if self.queue.ready() { - // Dequeue data from the queue into a larger block structure. - let mut packet = DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); - while self.queue.ready() { - // Note(unwrap): We check above that the queue is ready before calling this. - if packet.add_batch(self.queue.peek().unwrap()).is_err() { - // If we cannot add another batch, break out of the loop and send the packet. - break; + // Transmit the data packet. + let size = packet.finish(); + self.stack.send(handle, &self.buffer[..size]).ok(); } - - // Remove the batch that we just added. - self.queue.dequeue(); } - - // Transmit the data block. - let mut handle = self.socket.as_mut().unwrap(); - let size = packet.finish(); - self.stack.send(&mut handle, &self.buffer[..size]).ok(); } } }