From 73082d116f639e5985451d157a5f23e72531f2b4 Mon Sep 17 00:00:00 2001 From: Michael Birtwell Date: Fri, 25 Mar 2022 15:25:24 +0000 Subject: [PATCH] Ensure that pending data is sent when closing sockets This is only necessary if close hasn't been called on the socket but that's not always done. e.g. by the core analyzer server. --- artiq/firmware/runtime/sched.rs | 49 ++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/artiq/firmware/runtime/sched.rs b/artiq/firmware/runtime/sched.rs index a5903c2d1..fc3c7ac04 100644 --- a/artiq/firmware/runtime/sched.rs +++ b/artiq/firmware/runtime/sched.rs @@ -250,14 +250,26 @@ impl<'a> Io<'a> { }) } - pub fn until bool>(&self, mut f: F) -> Result<(), Error> { + pub fn inner_until bool>( + &self, + timeout: Option, + mut f: F + ) -> Result<(), Error> { let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) }; self.suspend(WaitRequest { - timeout: None, - event: Some(f) + timeout, + event: Some(f), }) } + pub fn until bool>(&self, f: F) -> Result<(), Error> { + self.inner_until(None, f) + } + + pub fn until_with_timeout bool>(&self, timeout: u64, f: F) -> Result<(), Error> { + self.inner_until(Some(timeout), f) + } + pub fn until_ok(&self, mut f: F) -> Result where F: FnMut() -> result::Result { @@ -313,7 +325,15 @@ macro_rules! until { let $var = network.get_socket::<$ty>(handle); $cond }) - }) + }); + ($socket:expr, $ty:ty, timeout=$timeout:expr, |$var:ident| $cond:expr) => ({ + let (network, handle) = ($socket.io.network.clone(), $socket.handle); + $socket.io.until_with_timeout($timeout, move || { + let mut network = network.borrow_mut(); + let $var = network.get_socket::<$ty>(handle); + $cond + }) + }); } type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>; @@ -567,6 +587,27 @@ impl<'a> Write for TcpStream<'a> { impl<'a> Drop for TcpStream<'a> { fn drop(&mut self) { self.with_lower(|s| s.close()); + let result = until!( + self, TcpSocketLower, timeout=clock::get_ms() + 1000, |s| !s.is_active() + ); + let unsent_bytes = self.with_lower(|s| s.send_queue()); + match result{ + Ok(()) => { + if unsent_bytes != 0 { + // This is normal if we received a reset whilst sending + debug!("Dropping socket with {} bytes unsent", unsent_bytes) + } + } + Err(Error::TimedOut) => { + warn!( + "Timed out whilst waiting for socket to close during drop, with {} unsent bytes", + unsent_bytes + ); + } + Err(e) => { + error!("Unexpected error whilst waiting for socket to close during drop: {:?}", e) + } + } self.io.network.borrow_mut().remove_socket(self.handle); } }