Revert "Require explicitly closing TcpStreams"

This reverts commit 671453938b.
This commit is contained in:
Sebastien Bourdeauducq 2022-07-08 17:56:12 +08:00
parent f1b2a7041a
commit 542a5f934f
5 changed files with 44 additions and 25 deletions

View File

@ -79,7 +79,5 @@ pub fn thread(io: Io) {
Ok(()) => (), Ok(()) => (),
Err(err) => error!("analyzer aborted: {}", err) Err(err) => error!("analyzer aborted: {}", err)
} }
stream.close().expect("analyzer: close socket")
} }
} }

View File

@ -131,7 +131,6 @@ pub fn thread(io: Io) {
Err(Error::Io(IoError::UnexpectedEnd)) => (), Err(Error::Io(IoError::UnexpectedEnd)) => (),
Err(err) => error!("aborted: {}", err) Err(err) => error!("aborted: {}", err)
} }
stream.close().expect("mgmt: close socket");
}); });
} }
} }

View File

@ -214,7 +214,6 @@ pub fn thread(io: Io, aux_mutex: &Mutex, routing_table: &Urc<RefCell<drtio_routi
Ok(()) => {}, Ok(()) => {},
Err(err) => error!("moninj aborted: {}", err) Err(err) => error!("moninj aborted: {}", err)
} }
stream.close().expect("moninj: close socket");
}); });
} }
} }

View File

@ -250,14 +250,26 @@ impl<'a> Io<'a> {
}) })
} }
pub fn until<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Error> { pub fn inner_until<F: FnMut() -> bool>(
&self,
timeout: Option<u64>,
mut f: F
) -> Result<(), Error> {
let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) }; let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) };
self.suspend(WaitRequest { self.suspend(WaitRequest {
timeout: None, timeout,
event: Some(f) event: Some(f),
}) })
} }
pub fn until<F: FnMut() -> bool>(&self, f: F) -> Result<(), Error> {
self.inner_until(None, f)
}
pub fn until_with_timeout<F: FnMut() -> bool>(&self, timeout: u64, f: F) -> Result<(), Error> {
self.inner_until(Some(timeout), f)
}
pub fn until_ok<T, E, F>(&self, mut f: F) -> Result<T, Error> pub fn until_ok<T, E, F>(&self, mut f: F) -> Result<T, Error>
where F: FnMut() -> result::Result<T, E> where F: FnMut() -> result::Result<T, E>
{ {
@ -313,7 +325,15 @@ macro_rules! until {
let $var = network.get_socket::<$ty>(handle); let $var = network.get_socket::<$ty>(handle);
$cond $cond
}) })
});
($socket:expr, $ty:ty, timeout=$timeout:expr, |$var:ident| $cond:expr) => ({
let (network, handle) = ($socket.io.network.clone(), $socket.handle);
$socket.io.until_with_timeout($timeout, move || {
let mut network = network.borrow_mut();
let $var = network.get_socket::<$ty>(handle);
$cond
}) })
});
} }
type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>; type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>;
@ -377,10 +397,6 @@ impl<'a> TcpListener<'a> {
.map_err(|err| err.into()) .map_err(|err| err.into())
} }
/// Accept a TCP connection
///
/// When the returned TcpStream is dropped it is immediately forgotten about. In order to
/// ensure that pending data is sent and the far end is notified, `close` must be called.
pub fn accept(&self) -> Result<TcpStream<'a>, Error> { pub fn accept(&self) -> Result<TcpStream<'a>, Error> {
// We're waiting until at least one half of the connection becomes open. // We're waiting until at least one half of the connection becomes open.
// This handles the case where a remote socket immediately sends a FIN-- // This handles the case where a remote socket immediately sends a FIN--
@ -570,20 +586,28 @@ impl<'a> Write for TcpStream<'a> {
impl<'a> Drop for TcpStream<'a> { impl<'a> Drop for TcpStream<'a> {
fn drop(&mut self) { fn drop(&mut self) {
// There's no point calling the lower close here unless we also defer the removal of the self.with_lower(|s| s.close());
// socket from smoltcp until it's had a chance to process the event let result = until!(
let (unsent_bytes, is_open) = self.with_lower( self, TcpSocketLower, timeout=clock::get_ms() + 1000, |s| !s.is_active()
|s| (s.send_queue(), s.is_open())
); );
if is_open { let unsent_bytes = self.with_lower(|s| s.send_queue());
warn!( match result{
"Dropping open TcpStream in state {}, with {} unsent bytes", Ok(()) => {
self.with_lower(|s| s.state()), unsent_bytes if unsent_bytes != 0 {
) // This is normal if we received a reset whilst sending
} else if unsent_bytes != 0 {
debug!("Dropping socket with {} bytes unsent", unsent_bytes) debug!("Dropping socket with {} bytes unsent", unsent_bytes)
} }
}
Err(Error::TimedOut) => {
warn!(
"Timed out whilst waiting for socket to close during drop, with {} unsent bytes",
unsent_bytes
);
}
Err(e) => {
error!("Unexpected error whilst waiting for socket to close during drop: {:?}", e)
}
}
self.io.network.borrow_mut().remove_socket(self.handle); self.io.network.borrow_mut().remove_socket(self.handle);
} }
} }

View File

@ -650,7 +650,6 @@ pub fn thread(io: Io, aux_mutex: &Mutex,
error!("session aborted: {}", err); error!("session aborted: {}", err);
} }
} }
stream.close().expect("session: close socket");
}); });
} }