From 1e2603572a4c05a489e9b28c0f31346ecbb278cd Mon Sep 17 00:00:00 2001 From: whitequark Date: Wed, 5 Jul 2017 16:24:16 +0000 Subject: [PATCH] runtime: fix a bug causing sockets to get stuck in CLOSE-WAIT. --- artiq/firmware/runtime/mgmt.rs | 3 ++- artiq/firmware/runtime/sched.rs | 35 ++++++++++++++++++++++----------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/artiq/firmware/runtime/mgmt.rs b/artiq/firmware/runtime/mgmt.rs index f1eff23c5..2d60fddfc 100644 --- a/artiq/firmware/runtime/mgmt.rs +++ b/artiq/firmware/runtime/mgmt.rs @@ -83,7 +83,8 @@ pub fn thread(io: Io) { io.spawn(4096, move |io| { let mut stream = TcpStream::from_handle(&io, stream); match worker(&mut stream) { - Ok(()) => {}, + Ok(()) => (), + Err(ref err) if err.kind() == io::ErrorKind::UnexpectedEof => (), Err(err) => error!("aborted: {}", err) } }); diff --git a/artiq/firmware/runtime/sched.rs b/artiq/firmware/runtime/sched.rs index 409cf69a8..8de5f5de0 100644 --- a/artiq/firmware/runtime/sched.rs +++ b/artiq/firmware/runtime/sched.rs @@ -466,16 +466,22 @@ impl<'a> TcpStream<'a> { impl<'a> Read for TcpStream<'a> { fn read(&mut self, buf: &mut [u8]) -> Result { - // fast path + // Only borrow the underlying socket for the span of the next statement. let result = self.as_lower().recv_slice(buf); match result { + // Slow path: we need to block until buffer is non-empty. Ok(0) | Err(()) => { - // slow path - if !self.as_lower().may_recv() { return Ok(0) } - until!(self, TcpSocketLower, |s| s.can_recv())?; - Ok(self.as_lower().recv_slice(buf) - .expect("may_recv implies that data was available")) + until!(self, TcpSocketLower, |s| s.can_recv() || !s.may_recv())?; + let mut socket = self.as_lower(); + if socket.may_recv() { + Ok(socket.recv_slice(buf) + .expect("can_recv implies that data was available")) + } else { + // This socket will never receive data again. + Ok(0) + } } + // Fast path: we had data in buffer. Ok(length) => Ok(length) } } @@ -483,16 +489,21 @@ impl<'a> Read for TcpStream<'a> { impl<'a> Write for TcpStream<'a> { fn write(&mut self, buf: &[u8]) -> Result { - // fast path + // Only borrow the underlying socket for the span of the next statement. let result = self.as_lower().send_slice(buf); match result { + // Slow path: we need to block until buffer is non-full. Ok(0) | Err(()) => { - // slow path - if !self.as_lower().may_send() { return Ok(0) } - until!(self, TcpSocketLower, |s| s.can_send())?; - Ok(self.as_lower().send_slice(buf) - .expect("may_send implies that data was available")) + until!(self, TcpSocketLower, |s| s.can_send() || !s.may_send())?; + if self.as_lower().may_send() { + Ok(self.as_lower().send_slice(buf) + .expect("can_send implies that data was available")) + } else { + // This socket will never send data again. + Ok(0) + } } + // Fast path: we had space in buffer. Ok(length) => Ok(length) } }