diff --git a/artiq/firmware/runtime/analyzer.rs b/artiq/firmware/runtime/analyzer.rs index 5b090f732..2e9d74c14 100644 --- a/artiq/firmware/runtime/analyzer.rs +++ b/artiq/firmware/runtime/analyzer.rs @@ -79,7 +79,5 @@ pub fn thread(io: Io) { Ok(()) => (), Err(err) => error!("analyzer aborted: {}", err) } - - stream.close().expect("analyzer: close socket") } } diff --git a/artiq/firmware/runtime/mgmt.rs b/artiq/firmware/runtime/mgmt.rs index 449ed91be..c100df05f 100644 --- a/artiq/firmware/runtime/mgmt.rs +++ b/artiq/firmware/runtime/mgmt.rs @@ -131,7 +131,6 @@ pub fn thread(io: Io) { Err(Error::Io(IoError::UnexpectedEnd)) => (), Err(err) => error!("aborted: {}", err) } - stream.close().expect("mgmt: close socket"); }); } } diff --git a/artiq/firmware/runtime/moninj.rs b/artiq/firmware/runtime/moninj.rs index 36d4bbb47..6feb7ec55 100644 --- a/artiq/firmware/runtime/moninj.rs +++ b/artiq/firmware/runtime/moninj.rs @@ -214,7 +214,6 @@ pub fn thread(io: Io, aux_mutex: &Mutex, routing_table: &Urc {}, Err(err) => error!("moninj aborted: {}", err) } - stream.close().expect("moninj: close socket"); }); } } diff --git a/artiq/firmware/runtime/sched.rs b/artiq/firmware/runtime/sched.rs index 860ec5347..fc3c7ac04 100644 --- a/artiq/firmware/runtime/sched.rs +++ b/artiq/firmware/runtime/sched.rs @@ -250,14 +250,26 @@ impl<'a> Io<'a> { }) } - pub fn until bool>(&self, mut f: F) -> Result<(), Error> { + pub fn inner_until bool>( + &self, + timeout: Option, + mut f: F + ) -> Result<(), Error> { let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) }; self.suspend(WaitRequest { - timeout: None, - event: Some(f) + timeout, + event: Some(f), }) } + pub fn until bool>(&self, f: F) -> Result<(), Error> { + self.inner_until(None, f) + } + + pub fn until_with_timeout bool>(&self, timeout: u64, f: F) -> Result<(), Error> { + self.inner_until(Some(timeout), f) + } + pub fn until_ok(&self, mut f: F) -> Result where F: FnMut() -> result::Result { @@ -313,7 +325,15 @@ macro_rules! until { let $var = network.get_socket::<$ty>(handle); $cond }) - }) + }); + ($socket:expr, $ty:ty, timeout=$timeout:expr, |$var:ident| $cond:expr) => ({ + let (network, handle) = ($socket.io.network.clone(), $socket.handle); + $socket.io.until_with_timeout($timeout, move || { + let mut network = network.borrow_mut(); + let $var = network.get_socket::<$ty>(handle); + $cond + }) + }); } type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>; @@ -377,10 +397,6 @@ impl<'a> TcpListener<'a> { .map_err(|err| err.into()) } - /// Accept a TCP connection - /// - /// When the returned TcpStream is dropped it is immediately forgotten about. In order to - /// ensure that pending data is sent and the far end is notified, `close` must be called. pub fn accept(&self) -> Result, Error> { // We're waiting until at least one half of the connection becomes open. // This handles the case where a remote socket immediately sends a FIN-- @@ -570,20 +586,28 @@ impl<'a> Write for TcpStream<'a> { impl<'a> Drop for TcpStream<'a> { fn drop(&mut self) { - // There's no point calling the lower close here unless we also defer the removal of the - // socket from smoltcp until it's had a chance to process the event - let (unsent_bytes, is_open) = self.with_lower( - |s| (s.send_queue(), s.is_open()) + self.with_lower(|s| s.close()); + let result = until!( + self, TcpSocketLower, timeout=clock::get_ms() + 1000, |s| !s.is_active() ); - if is_open { - warn!( - "Dropping open TcpStream in state {}, with {} unsent bytes", - self.with_lower(|s| s.state()), unsent_bytes - ) - } else if unsent_bytes != 0 { - debug!("Dropping socket with {} bytes unsent", unsent_bytes) + let unsent_bytes = self.with_lower(|s| s.send_queue()); + match result{ + Ok(()) => { + if unsent_bytes != 0 { + // This is normal if we received a reset whilst sending + debug!("Dropping socket with {} bytes unsent", unsent_bytes) + } + } + Err(Error::TimedOut) => { + warn!( + "Timed out whilst waiting for socket to close during drop, with {} unsent bytes", + unsent_bytes + ); + } + Err(e) => { + error!("Unexpected error whilst waiting for socket to close during drop: {:?}", e) + } } - self.io.network.borrow_mut().remove_socket(self.handle); } } diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index c6d745545..b7bb39f4e 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -650,7 +650,6 @@ pub fn thread(io: Io, aux_mutex: &Mutex, error!("session aborted: {}", err); } } - stream.close().expect("session: close socket"); }); }