runtime: fix a bug causing sockets to get stuck in CLOSE-WAIT.

This commit is contained in:
whitequark 2017-07-05 16:24:16 +00:00
parent 86c027e9c5
commit 1e2603572a
2 changed files with 25 additions and 13 deletions

View File

@ -83,7 +83,8 @@ pub fn thread(io: Io) {
io.spawn(4096, move |io| { io.spawn(4096, move |io| {
let mut stream = TcpStream::from_handle(&io, stream); let mut stream = TcpStream::from_handle(&io, stream);
match worker(&mut stream) { match worker(&mut stream) {
Ok(()) => {}, Ok(()) => (),
Err(ref err) if err.kind() == io::ErrorKind::UnexpectedEof => (),
Err(err) => error!("aborted: {}", err) Err(err) => error!("aborted: {}", err)
} }
}); });

View File

@ -466,16 +466,22 @@ impl<'a> TcpStream<'a> {
impl<'a> Read for TcpStream<'a> { impl<'a> Read for TcpStream<'a> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> { fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// fast path // Only borrow the underlying socket for the span of the next statement.
let result = self.as_lower().recv_slice(buf); let result = self.as_lower().recv_slice(buf);
match result { match result {
// Slow path: we need to block until buffer is non-empty.
Ok(0) | Err(()) => { Ok(0) | Err(()) => {
// slow path until!(self, TcpSocketLower, |s| s.can_recv() || !s.may_recv())?;
if !self.as_lower().may_recv() { return Ok(0) } let mut socket = self.as_lower();
until!(self, TcpSocketLower, |s| s.can_recv())?; if socket.may_recv() {
Ok(self.as_lower().recv_slice(buf) Ok(socket.recv_slice(buf)
.expect("may_recv implies that data was available")) .expect("can_recv implies that data was available"))
} else {
// This socket will never receive data again.
Ok(0)
}
} }
// Fast path: we had data in buffer.
Ok(length) => Ok(length) Ok(length) => Ok(length)
} }
} }
@ -483,16 +489,21 @@ impl<'a> Read for TcpStream<'a> {
impl<'a> Write for TcpStream<'a> { impl<'a> Write for TcpStream<'a> {
fn write(&mut self, buf: &[u8]) -> Result<usize> { fn write(&mut self, buf: &[u8]) -> Result<usize> {
// fast path // Only borrow the underlying socket for the span of the next statement.
let result = self.as_lower().send_slice(buf); let result = self.as_lower().send_slice(buf);
match result { match result {
// Slow path: we need to block until buffer is non-full.
Ok(0) | Err(()) => { Ok(0) | Err(()) => {
// slow path until!(self, TcpSocketLower, |s| s.can_send() || !s.may_send())?;
if !self.as_lower().may_send() { return Ok(0) } if self.as_lower().may_send() {
until!(self, TcpSocketLower, |s| s.can_send())?; Ok(self.as_lower().send_slice(buf)
Ok(self.as_lower().send_slice(buf) .expect("can_send implies that data was available"))
.expect("may_send implies that data was available")) } else {
// This socket will never send data again.
Ok(0)
}
} }
// Fast path: we had space in buffer.
Ok(length) => Ok(length) Ok(length) => Ok(length)
} }
} }