forked from M-Labs/artiq
1
0
Fork 0

Ensure that pending data is sent when closing sockets

This is only necessary if close hasn't been called on the socket
but that's not always done. e.g. by the core analyzer server.
This commit is contained in:
Michael Birtwell 2022-03-25 15:25:24 +00:00 committed by Sebastien Bourdeauducq
parent 596b9a265c
commit 73082d116f
1 changed files with 45 additions and 4 deletions

View File

@ -250,14 +250,26 @@ impl<'a> Io<'a> {
}) })
} }
pub fn until<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Error> { pub fn inner_until<F: FnMut() -> bool>(
&self,
timeout: Option<u64>,
mut f: F
) -> Result<(), Error> {
let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) }; let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) };
self.suspend(WaitRequest { self.suspend(WaitRequest {
timeout: None, timeout,
event: Some(f) event: Some(f),
}) })
} }
pub fn until<F: FnMut() -> bool>(&self, f: F) -> Result<(), Error> {
self.inner_until(None, f)
}
pub fn until_with_timeout<F: FnMut() -> bool>(&self, timeout: u64, f: F) -> Result<(), Error> {
self.inner_until(Some(timeout), f)
}
pub fn until_ok<T, E, F>(&self, mut f: F) -> Result<T, Error> pub fn until_ok<T, E, F>(&self, mut f: F) -> Result<T, Error>
where F: FnMut() -> result::Result<T, E> where F: FnMut() -> result::Result<T, E>
{ {
@ -313,7 +325,15 @@ macro_rules! until {
let $var = network.get_socket::<$ty>(handle); let $var = network.get_socket::<$ty>(handle);
$cond $cond
}) })
}) });
($socket:expr, $ty:ty, timeout=$timeout:expr, |$var:ident| $cond:expr) => ({
let (network, handle) = ($socket.io.network.clone(), $socket.handle);
$socket.io.until_with_timeout($timeout, move || {
let mut network = network.borrow_mut();
let $var = network.get_socket::<$ty>(handle);
$cond
})
});
} }
type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>; type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>;
@ -567,6 +587,27 @@ impl<'a> Write for TcpStream<'a> {
impl<'a> Drop for TcpStream<'a> { impl<'a> Drop for TcpStream<'a> {
fn drop(&mut self) { fn drop(&mut self) {
self.with_lower(|s| s.close()); self.with_lower(|s| s.close());
let result = until!(
self, TcpSocketLower, timeout=clock::get_ms() + 1000, |s| !s.is_active()
);
let unsent_bytes = self.with_lower(|s| s.send_queue());
match result{
Ok(()) => {
if unsent_bytes != 0 {
// This is normal if we received a reset whilst sending
debug!("Dropping socket with {} bytes unsent", unsent_bytes)
}
}
Err(Error::TimedOut) => {
warn!(
"Timed out whilst waiting for socket to close during drop, with {} unsent bytes",
unsent_bytes
);
}
Err(e) => {
error!("Unexpected error whilst waiting for socket to close during drop: {:?}", e)
}
}
self.io.network.borrow_mut().remove_socket(self.handle); self.io.network.borrow_mut().remove_socket(self.handle);
} }
} }