forked from M-Labs/artiq
Require explicitly closing TcpStreams
Instead of automatically closing and draining the TcpStream in the Drop implementation instead expect the user to call TcpStream::close. Add close called to all users of TcpStream. Document the requirement to call close on TcpListener::accept, this seems to be the only way to get a new TcpStream at the moment.
This commit is contained in:
parent
1fe59d27dc
commit
671453938b
|
@ -79,5 +79,7 @@ pub fn thread(io: Io) {
|
||||||
Ok(()) => (),
|
Ok(()) => (),
|
||||||
Err(err) => error!("analyzer aborted: {}", err)
|
Err(err) => error!("analyzer aborted: {}", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stream.close().expect("analyzer: close socket")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,6 +131,7 @@ pub fn thread(io: Io) {
|
||||||
Err(Error::Io(IoError::UnexpectedEnd)) => (),
|
Err(Error::Io(IoError::UnexpectedEnd)) => (),
|
||||||
Err(err) => error!("aborted: {}", err)
|
Err(err) => error!("aborted: {}", err)
|
||||||
}
|
}
|
||||||
|
stream.close().expect("mgmt: close socket");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,6 +214,7 @@ pub fn thread(io: Io, aux_mutex: &Mutex, routing_table: &Urc<RefCell<drtio_routi
|
||||||
Ok(()) => {},
|
Ok(()) => {},
|
||||||
Err(err) => error!("moninj aborted: {}", err)
|
Err(err) => error!("moninj aborted: {}", err)
|
||||||
}
|
}
|
||||||
|
stream.close().expect("moninj: close socket");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -250,26 +250,14 @@ impl<'a> Io<'a> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn inner_until<F: FnMut() -> bool>(
|
pub fn until<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Error> {
|
||||||
&self,
|
|
||||||
timeout: Option<u64>,
|
|
||||||
mut f: F
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) };
|
let f = unsafe { mem::transmute::<&mut dyn FnMut() -> bool, *mut dyn FnMut() -> bool>(&mut f) };
|
||||||
self.suspend(WaitRequest {
|
self.suspend(WaitRequest {
|
||||||
timeout,
|
timeout: None,
|
||||||
event: Some(f),
|
event: Some(f)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn until<F: FnMut() -> bool>(&self, f: F) -> Result<(), Error> {
|
|
||||||
self.inner_until(None, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn until_with_timeout<F: FnMut() -> bool>(&self, timeout: u64, f: F) -> Result<(), Error> {
|
|
||||||
self.inner_until(Some(timeout), f)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn until_ok<T, E, F>(&self, mut f: F) -> Result<T, Error>
|
pub fn until_ok<T, E, F>(&self, mut f: F) -> Result<T, Error>
|
||||||
where F: FnMut() -> result::Result<T, E>
|
where F: FnMut() -> result::Result<T, E>
|
||||||
{
|
{
|
||||||
|
@ -325,15 +313,7 @@ macro_rules! until {
|
||||||
let $var = network.get_socket::<$ty>(handle);
|
let $var = network.get_socket::<$ty>(handle);
|
||||||
$cond
|
$cond
|
||||||
})
|
})
|
||||||
});
|
|
||||||
($socket:expr, $ty:ty, timeout=$timeout:expr, |$var:ident| $cond:expr) => ({
|
|
||||||
let (network, handle) = ($socket.io.network.clone(), $socket.handle);
|
|
||||||
$socket.io.until_with_timeout($timeout, move || {
|
|
||||||
let mut network = network.borrow_mut();
|
|
||||||
let $var = network.get_socket::<$ty>(handle);
|
|
||||||
$cond
|
|
||||||
})
|
})
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>;
|
type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>;
|
||||||
|
@ -397,6 +377,10 @@ impl<'a> TcpListener<'a> {
|
||||||
.map_err(|err| err.into())
|
.map_err(|err| err.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Accept a TCP connection
|
||||||
|
///
|
||||||
|
/// When the returned TcpStream is dropped it is immediately forgotten about. In order to
|
||||||
|
/// ensure that pending data is sent and the far end is notified, `close` must be called.
|
||||||
pub fn accept(&self) -> Result<TcpStream<'a>, Error> {
|
pub fn accept(&self) -> Result<TcpStream<'a>, Error> {
|
||||||
// We're waiting until at least one half of the connection becomes open.
|
// We're waiting until at least one half of the connection becomes open.
|
||||||
// This handles the case where a remote socket immediately sends a FIN--
|
// This handles the case where a remote socket immediately sends a FIN--
|
||||||
|
@ -586,28 +570,20 @@ impl<'a> Write for TcpStream<'a> {
|
||||||
|
|
||||||
impl<'a> Drop for TcpStream<'a> {
|
impl<'a> Drop for TcpStream<'a> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.with_lower(|s| s.close());
|
// There's no point calling the lower close here unless we also defer the removal of the
|
||||||
let result = until!(
|
// socket from smoltcp until it's had a chance to process the event
|
||||||
self, TcpSocketLower, timeout=clock::get_ms() + 1000, |s| !s.is_active()
|
let (unsent_bytes, is_open) = self.with_lower(
|
||||||
|
|s| (s.send_queue(), s.is_open())
|
||||||
);
|
);
|
||||||
let unsent_bytes = self.with_lower(|s| s.send_queue());
|
if is_open {
|
||||||
match result{
|
warn!(
|
||||||
Ok(()) => {
|
"Dropping open TcpStream in state {}, with {} unsent bytes",
|
||||||
if unsent_bytes != 0 {
|
self.with_lower(|s| s.state()), unsent_bytes
|
||||||
// This is normal if we received a reset whilst sending
|
)
|
||||||
|
} else if unsent_bytes != 0 {
|
||||||
debug!("Dropping socket with {} bytes unsent", unsent_bytes)
|
debug!("Dropping socket with {} bytes unsent", unsent_bytes)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
Err(Error::TimedOut) => {
|
|
||||||
warn!(
|
|
||||||
"Timed out whilst waiting for socket to close during drop, with {} unsent bytes",
|
|
||||||
unsent_bytes
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Unexpected error whilst waiting for socket to close during drop: {:?}", e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.io.network.borrow_mut().remove_socket(self.handle);
|
self.io.network.borrow_mut().remove_socket(self.handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -650,6 +650,7 @@ pub fn thread(io: Io, aux_mutex: &Mutex,
|
||||||
error!("session aborted: {}", err);
|
error!("session aborted: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
stream.close().expect("session: close socket");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue