Improve the user-facing TCP socket API.

This commit is contained in:
whitequark 2016-12-27 17:49:40 +00:00
parent 09040f3e91
commit a8fc4fd832
4 changed files with 150 additions and 89 deletions

View File

@ -111,7 +111,7 @@ impl<'a, 'b: 'a,
&mut self.sockets
}
/// Receive and process a packet, if available.
/// Receive and process a packet, if available, and then transmit a packet, if necessary.
pub fn poll(&mut self) -> Result<(), Error> {
enum Response<'a> {
Nop,
@ -214,7 +214,7 @@ impl<'a, 'b: 'a,
let mut handled = false;
for socket in self.sockets.borrow_mut() {
let ip_repr = IpRepr::Ipv4(ipv4_repr);
match socket.collect(&ip_repr, ipv4_packet.payload()) {
match socket.process(&ip_repr, ipv4_packet.payload()) {
Ok(()) => {
// The packet was valid and handled by socket.
handled = true;
@ -355,7 +355,7 @@ impl<'a, 'b: 'a,
}
}
pub fn emit(&mut self) -> Result<bool, Error> {
fn emit(&mut self) -> Result<bool, Error> {
// Borrow checker is being overly careful around closures, so we have
// to hack around that.
let src_hardware_addr = self.hardware_addr;

View File

@ -30,10 +30,10 @@ pub use self::tcp::TcpSocket;
/// To downcast a `Socket` value down to a concrete socket, use
/// the [AsSocket](trait.AsSocket.html) trait, and call e.g. `socket.as_socket::<UdpSocket<_>>()`.
///
/// The `collect` and `dispatch` functions are fundamentally asymmetric and thus differ in
/// their use of the [trait PacketRepr](trait.PacketRepr.html). When `collect` is called,
/// The `process` and `dispatch` functions are fundamentally asymmetric and thus differ in
/// their use of the [trait PacketRepr](trait.PacketRepr.html). When `process` is called,
/// the packet length is already known and no allocation is required; on the other hand,
/// `collect` would have to downcast a `&PacketRepr` to e.g. an `&UdpRepr` through `Any`,
/// `process` would have to downcast a `&PacketRepr` to e.g. an `&UdpRepr` through `Any`,
/// which is rather inelegant. Conversely, when `dispatch` is called, the packet length is
/// not yet known and the packet storage has to be allocated; but the `&PacketRepr` is sufficient
/// since the lower layers treat the packet as an opaque octet sequence.
@ -52,12 +52,12 @@ impl<'a, 'b> Socket<'a, 'b> {
/// is returned.
///
/// This function is used internally by the networking stack.
pub fn collect(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> {
pub fn process(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> {
match self {
&mut Socket::Udp(ref mut socket) =>
socket.collect(ip_repr, payload),
socket.process(ip_repr, payload),
&mut Socket::Tcp(ref mut socket) =>
socket.collect(ip_repr, payload),
socket.process(ip_repr, payload),
&mut Socket::__Nonexhaustive => unreachable!()
}
}

View File

@ -106,6 +106,8 @@ impl<'a> Into<SocketBuffer<'a>> for Managed<'a, [u8]> {
}
}
/// The state of a TCP socket, according to [RFC 793][rfc793].
/// [rfc793]: https://tools.ietf.org/html/rfc793
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum State {
Closed,
@ -160,7 +162,12 @@ impl Retransmit {
}
}
/// A Transmission Control Protocol data stream.
/// A Transmission Control Protocol socket.
///
/// A TCP socket may passively listen for connections or actively connect to another endpoint.
/// Note that, for listening sockets, there is no "backlog"; to be able to simultaneously
/// accept several connections, as many sockets must be allocated, or any new connection
/// attempts will be reset.
#[derive(Debug)]
pub struct TcpSocket<'a> {
/// State of the socket.
@ -222,12 +229,6 @@ impl<'a> TcpSocket<'a> {
})
}
/// Return the connection state.
#[inline(always)]
pub fn state(&self) -> State {
self.state
}
/// Return the local endpoint.
#[inline(always)]
pub fn local_endpoint(&self) -> IpEndpoint {
@ -240,6 +241,134 @@ impl<'a> TcpSocket<'a> {
self.remote_endpoint
}
/// Start listening on the given endpoint.
///
/// This function returns an error if the socket was open; see [is_open](#method.is_open).
pub fn listen(&mut self, endpoint: IpEndpoint) -> Result<(), ()> {
if self.is_open() { return Err(()) }
self.listen_address = endpoint.addr;
self.local_endpoint = endpoint;
self.remote_endpoint = IpEndpoint::default();
self.set_state(State::Listen);
Ok(())
}
/// Return whether the connection is open.
///
/// This function returns true if the socket will process incoming or dispatch outgoing
/// packets. Note that this does not mean that it is possible to send or receive data through
/// the socket; for that, use [can_send](#method.can_send) or [can_recv](#method.can_recv).
pub fn is_open(&self) -> bool {
match self.state {
State::Closed => false,
State::TimeWait => false,
_ => true
}
}
/// Return whether the transmit half of the full-duplex connection is open.
///
/// This function returns true if it's possible to send data and have it arrive
/// to the remote endpoint. However, it does not make any guarantees about the state
/// of the transmit buffer, and even if it returns true, [send](#method.send) may
/// not be able to enqueue any octets.
pub fn can_send(&self) -> bool {
match self.state {
State::Established => true,
// In CLOSE_WAIT, the remote endpoint has closed our receive half of the connection
// but we still can transmit indefinitely.
State::CloseWait => true,
_ => false
}
}
/// Return whether the receive half of the full-duplex connection is open.
///
/// This function returns true if it's possible to receive data from the remote endpoint.
/// It will return true while there is data in the receive buffer, and if there isn't,
/// as long as the remote endpoint has not closed the connection.
pub fn can_recv(&self) -> bool {
match self.state {
State::Established => true,
// In FIN_WAIT_1/2, we have closed our transmit half of the connection but
// we still can receive indefinitely.
State::FinWait1 | State::FinWait2 => true,
_ => false
}
}
/// Enqueue a sequence of octets to be sent, and return a pointer to it.
///
/// This function may return a slice smaller than the requested size in case
/// there is not enough contiguous free space in the transmit buffer, down to
/// an empty slice.
///
/// This function returns an error if the transmit half of the connection is not open;
/// see [can_send](#method.can_send).
pub fn send(&mut self, size: usize) -> Result<&mut [u8], ()> {
if !self.can_send() { return Err(()) }
let buffer = self.tx_buffer.enqueue(size);
if buffer.len() > 0 {
net_trace!("tcp:{}:{}: tx buffer: enqueueing {} octets",
self.local_endpoint, self.remote_endpoint, buffer.len());
}
Ok(buffer)
}
/// Enqueue a sequence of octets to be sent, and fill it from a slice.
///
/// This function returns the amount of bytes actually enqueued, which is limited
/// by the amount of free space in the transmit buffer; down to zero.
///
/// See also [send](#method.send).
pub fn send_slice(&mut self, data: &[u8]) -> Result<usize, ()> {
let buffer = try!(self.send(data.len()));
let data = &data[..buffer.len()];
buffer.copy_from_slice(data);
Ok(buffer.len())
}
/// Dequeue a sequence of received octets, and return a pointer to it.
///
/// This function may return a slice smaller than the requested size in case
/// there are not enough octets queued in the receive buffer, down to
/// an empty slice.
pub fn recv(&mut self, size: usize) -> Result<&[u8], ()> {
// We may have received some data inside the initial SYN ("TCP Fast Open"),
// but until the connection is fully open we refuse to dequeue any data.
if !self.can_recv() { return Err(()) }
let buffer = self.rx_buffer.dequeue(size);
self.remote_seq_no += buffer.len() as i32;
if buffer.len() > 0 {
net_trace!("tcp:{}:{}: rx buffer: dequeueing {} octets",
self.local_endpoint, self.remote_endpoint, buffer.len());
}
Ok(buffer)
}
/// Dequeue a sequence of received octets, and fill a slice from it.
///
/// This function returns the amount of bytes actually dequeued, which is limited
/// by the amount of free space in the transmit buffer; down to zero.
///
/// See also [recv](#method.recv).
pub fn recv_slice(&mut self, data: &mut [u8]) -> Result<usize, ()> {
let buffer = try!(self.recv(data.len()));
let data = &mut data[..buffer.len()];
data.copy_from_slice(buffer);
Ok(buffer.len())
}
/// Return the connection state.
///
/// This function is provided for debugging.
pub fn state(&self) -> State {
self.state
}
fn set_state(&mut self, state: State) {
if self.state != state {
if self.remote_endpoint.addr.is_unspecified() {
@ -253,76 +382,8 @@ impl<'a> TcpSocket<'a> {
self.state = state
}
/// Start listening on the given endpoint.
///
/// # Panics
/// This function will panic if the socket is not in the CLOSED state.
pub fn listen(&mut self, endpoint: IpEndpoint) {
assert!(self.state == State::Closed);
self.listen_address = endpoint.addr;
self.local_endpoint = endpoint;
self.remote_endpoint = IpEndpoint::default();
self.set_state(State::Listen);
}
/// Enqueue a sequence of octets to be sent, and return a pointer to it.
///
/// This function may return a slice smaller than the requested size in case
/// there is not enough contiguous free space in the transmit buffer, down to
/// an empty slice.
pub fn send(&mut self, size: usize) -> &mut [u8] {
let buffer = self.tx_buffer.enqueue(size);
if buffer.len() > 0 {
net_trace!("tcp:{}:{}: tx buffer: enqueueing {} octets",
self.local_endpoint, self.remote_endpoint, buffer.len());
}
buffer
}
/// Enqueue a sequence of octets to be sent, and fill it from a slice.
///
/// This function returns the amount of bytes actually enqueued, which is limited
/// by the amount of free space in the transmit buffer; down to zero.
///
/// See also [send](#method.send).
pub fn send_slice(&mut self, data: &[u8]) -> usize {
let buffer = self.send(data.len());
let data = &data[..buffer.len()];
buffer.copy_from_slice(data);
buffer.len()
}
/// Dequeue a sequence of received octets, and return a pointer to it.
///
/// This function may return a slice smaller than the requested size in case
/// there are not enough octets queued in the receive buffer, down to
/// an empty slice.
pub fn recv(&mut self, size: usize) -> &[u8] {
let buffer = self.rx_buffer.dequeue(size);
self.remote_seq_no += buffer.len() as i32;
if buffer.len() > 0 {
net_trace!("tcp:{}:{}: rx buffer: dequeueing {} octets",
self.local_endpoint, self.remote_endpoint, buffer.len());
}
buffer
}
/// Dequeue a sequence of received octets, and fill a slice from it.
///
/// This function returns the amount of bytes actually dequeued, which is limited
/// by the amount of free space in the transmit buffer; down to zero.
///
/// See also [recv](#method.recv).
pub fn recv_slice(&mut self, data: &mut [u8]) -> usize {
let buffer = self.recv(data.len());
let data = &mut data[..buffer.len()];
data.copy_from_slice(buffer);
buffer.len()
}
/// See [Socket::collect](enum.Socket.html#method.collect).
pub fn collect(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> {
/// See [Socket::process](enum.Socket.html#method.process).
pub fn process(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> {
if ip_repr.protocol() != IpProtocol::Tcp { return Err(Error::Rejected) }
let packet = try!(TcpPacket::new(payload));
@ -663,7 +724,7 @@ mod test {
dst_addr: LOCAL_IP,
protocol: IpProtocol::Tcp
};
socket.collect(&ip_repr, &packet.into_inner()[..])
socket.process(&ip_repr, &packet.into_inner()[..])
}
fn recv<F>(socket: &mut TcpSocket, mut f: F)

View File

@ -167,8 +167,8 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
Ok((endpoint, buffer.len()))
}
/// See [Socket::collect](enum.Socket.html#method.collect).
pub fn collect(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> {
/// See [Socket::process](enum.Socket.html#method.process).
pub fn process(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> {
if ip_repr.protocol() != IpProtocol::Udp { return Err(Error::Rejected) }
let packet = try!(UdpPacket::new(payload));