From 671453938b43824f6f13d19ac4639a18bdf1a6e6 Mon Sep 17 00:00:00 2001 From: Michael Birtwell Date: Tue, 12 Apr 2022 12:08:01 +0100 Subject: [PATCH] Require explicitly closing TcpStreams Instead of automatically closing and draining the TcpStream in the Drop implementation instead expect the user to call TcpStream::close. Add close called to all users of TcpStream. Document the requirement to call close on TcpListener::accept, this seems to be the only way to get a new TcpStream at the moment. --- artiq/firmware/runtime/analyzer.rs | 2 + artiq/firmware/runtime/mgmt.rs | 1 + artiq/firmware/runtime/moninj.rs | 1 + artiq/firmware/runtime/sched.rs | 64 ++++++++++-------------------- artiq/firmware/runtime/session.rs | 1 + 5 files changed, 25 insertions(+), 44 deletions(-) diff --git a/artiq/firmware/runtime/analyzer.rs b/artiq/firmware/runtime/analyzer.rs index 2e9d74c14..5b090f732 100644 --- a/artiq/firmware/runtime/analyzer.rs +++ b/artiq/firmware/runtime/analyzer.rs @@ -79,5 +79,7 @@ pub fn thread(io: Io) { Ok(()) => (), Err(err) => error!("analyzer aborted: {}", err) } + + stream.close().expect("analyzer: close socket") } } diff --git a/artiq/firmware/runtime/mgmt.rs b/artiq/firmware/runtime/mgmt.rs index c100df05f..449ed91be 100644 --- a/artiq/firmware/runtime/mgmt.rs +++ b/artiq/firmware/runtime/mgmt.rs @@ -131,6 +131,7 @@ pub fn thread(io: Io) { Err(Error::Io(IoError::UnexpectedEnd)) => (), Err(err) => error!("aborted: {}", err) } + stream.close().expect("mgmt: close socket"); }); } } diff --git a/artiq/firmware/runtime/moninj.rs b/artiq/firmware/runtime/moninj.rs index 6feb7ec55..36d4bbb47 100644 --- a/artiq/firmware/runtime/moninj.rs +++ b/artiq/firmware/runtime/moninj.rs @@ -214,6 +214,7 @@ pub fn thread(io: Io, aux_mutex: &Mutex, routing_table: &Urc {}, Err(err) => error!("moninj aborted: {}", err) } + stream.close().expect("moninj: close socket"); }); } } diff --git a/artiq/firmware/runtime/sched.rs b/artiq/firmware/runtime/sched.rs index fc3c7ac04..860ec5347 100644 --- a/artiq/firmware/runtime/sched.rs +++ b/artiq/firmware/runtime/sched.rs @@ -250,26 +250,14 @@ impl<'a> Io<'a> { }) } - pub fn inner_until bool>( - &self, - timeout: Option, - mut f: F - ) -> Result<(), Error> { + pub fn until bool>(&self, mut f: F) -> Result<(), Error> { let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) }; self.suspend(WaitRequest { - timeout, - event: Some(f), + timeout: None, + event: Some(f) }) } - pub fn until bool>(&self, f: F) -> Result<(), Error> { - self.inner_until(None, f) - } - - pub fn until_with_timeout bool>(&self, timeout: u64, f: F) -> Result<(), Error> { - self.inner_until(Some(timeout), f) - } - pub fn until_ok(&self, mut f: F) -> Result where F: FnMut() -> result::Result { @@ -325,15 +313,7 @@ macro_rules! until { let $var = network.get_socket::<$ty>(handle); $cond }) - }); - ($socket:expr, $ty:ty, timeout=$timeout:expr, |$var:ident| $cond:expr) => ({ - let (network, handle) = ($socket.io.network.clone(), $socket.handle); - $socket.io.until_with_timeout($timeout, move || { - let mut network = network.borrow_mut(); - let $var = network.get_socket::<$ty>(handle); - $cond - }) - }); + }) } type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>; @@ -397,6 +377,10 @@ impl<'a> TcpListener<'a> { .map_err(|err| err.into()) } + /// Accept a TCP connection + /// + /// When the returned TcpStream is dropped it is immediately forgotten about. In order to + /// ensure that pending data is sent and the far end is notified, `close` must be called. pub fn accept(&self) -> Result, Error> { // We're waiting until at least one half of the connection becomes open. // This handles the case where a remote socket immediately sends a FIN-- @@ -586,28 +570,20 @@ impl<'a> Write for TcpStream<'a> { impl<'a> Drop for TcpStream<'a> { fn drop(&mut self) { - self.with_lower(|s| s.close()); - let result = until!( - self, TcpSocketLower, timeout=clock::get_ms() + 1000, |s| !s.is_active() + // There's no point calling the lower close here unless we also defer the removal of the + // socket from smoltcp until it's had a chance to process the event + let (unsent_bytes, is_open) = self.with_lower( + |s| (s.send_queue(), s.is_open()) ); - let unsent_bytes = self.with_lower(|s| s.send_queue()); - match result{ - Ok(()) => { - if unsent_bytes != 0 { - // This is normal if we received a reset whilst sending - debug!("Dropping socket with {} bytes unsent", unsent_bytes) - } - } - Err(Error::TimedOut) => { - warn!( - "Timed out whilst waiting for socket to close during drop, with {} unsent bytes", - unsent_bytes - ); - } - Err(e) => { - error!("Unexpected error whilst waiting for socket to close during drop: {:?}", e) - } + if is_open { + warn!( + "Dropping open TcpStream in state {}, with {} unsent bytes", + self.with_lower(|s| s.state()), unsent_bytes + ) + } else if unsent_bytes != 0 { + debug!("Dropping socket with {} bytes unsent", unsent_bytes) } + self.io.network.borrow_mut().remove_socket(self.handle); } } diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index aec47afe1..bd1562269 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -650,6 +650,7 @@ pub fn thread(io: Io, aux_mutex: &Mutex, error!("session aborted: {}", err); } } + stream.close().expect("session: close socket"); }); }