Implement TCP data transmission.

v0.7.x
whitequark 2016-12-26 14:24:17 +00:00
parent 0bf822c77e
commit 0e20ea9205
2 changed files with 88 additions and 45 deletions

View File

@ -46,7 +46,8 @@ The UDP protocol is supported over IPv4.
The TCP protocol is supported over IPv4.
* TCP header checksum is supported.
* TCP options are **not** supported.
* TCP options are **not** supported. In particular, the MSS of the remote endpoint
is hardcoded at the default value, 536.
* Reassembly of out-of-order segments is **not** supported.
Installation

View File

@ -90,6 +90,7 @@ impl<'a> SocketBuffer<'a> {
self.length -= size;
}
#[allow(dead_code)] // only used in tests
fn dequeue(&mut self, size: usize) -> &[u8] {
let (read_at, size) = self.clamp_reader(size);
self.read_at = (self.read_at + size) % self.storage.len();
@ -422,54 +423,75 @@ impl<'a> TcpSocket<'a> {
/// See [Socket::dispatch](enum.Socket.html#method.dispatch).
pub fn dispatch<F, R>(&mut self, emit: &mut F) -> Result<R, Error>
where F: FnMut(&IpRepr, &IpPayload) -> Result<R, Error> {
let mut repr = TcpRepr {
src_port: self.local_endpoint.port,
dst_port: self.remote_endpoint.port,
control: TcpControl::None,
seq_number: 0,
ack_number: None,
window_len: self.rx_buffer.window() as u16,
payload: &[]
};
// FIXME: process
match self.state {
State::Closed |
State::Listen => {
return Err(Error::Exhausted)
}
State::SynReceived => {
if !self.retransmit.check() { return Err(Error::Exhausted) }
repr.control = TcpControl::Syn;
repr.seq_number = self.local_seq_no;
repr.ack_number = Some(self.remote_seq_no);
net_trace!("tcp:{}:{}: SYN|ACK sent",
self.local_endpoint, self.remote_endpoint);
self.remote_last_ack = self.remote_seq_no;
}
State::Established => {
let ack_number = self.remote_seq_no + self.rx_buffer.len() as i32;
if self.remote_last_ack == ack_number { return Err(Error::Exhausted) }
repr.seq_number = self.local_seq_no;
repr.ack_number = Some(ack_number);
net_trace!("tcp:{}:{}: ACK sent",
self.local_endpoint, self.remote_endpoint);
self.remote_last_ack = ack_number;
}
_ => unreachable!()
}
let ip_repr = IpRepr::Unspecified {
src_addr: self.local_endpoint.addr,
dst_addr: self.remote_endpoint.addr,
protocol: IpProtocol::Tcp,
};
let mut repr = TcpRepr {
src_port: self.local_endpoint.port,
dst_port: self.remote_endpoint.port,
control: TcpControl::None,
seq_number: self.local_seq_no,
ack_number: None,
window_len: self.rx_buffer.window() as u16,
payload: &[]
};
let ack_number = self.remote_seq_no + self.rx_buffer.len() as i32;
match self.state {
State::Closed | State::Listen => return Err(Error::Exhausted),
State::SynReceived => {
if !self.retransmit.check() { return Err(Error::Exhausted) }
repr.control = TcpControl::Syn;
net_trace!("tcp:{}:{}: sending SYN|ACK",
self.local_endpoint, self.remote_endpoint);
self.remote_last_ack = self.remote_seq_no;
}
State::Established => {
if self.tx_buffer.len() > 0 && self.remote_win_len > 0 {
if !self.retransmit.check() { return Err(Error::Exhausted) }
// We can send something, so let's do that.
let mut size = self.remote_win_len;
// Clamp to MSS. Currently we only support the default MSS value.
if size > 536 { size = 536 }
// Extract data from the buffer. This may return less than what we want,
// in case it's not possible to extract a contiguous slice.
let data = self.tx_buffer.peek(size);
net_trace!("tcp:{}:{}: sending {} octets",
self.local_endpoint, self.remote_endpoint, data.len());
repr.payload = data;
} else if self.remote_last_ack != ack_number {
// We don't have anything to send, or can't because the remote end does not
// have any space to accept it, but we haven't yet acknowledged everything
// we have received. So, do it.
net_trace!("tcp:{}:{}: sending ACK",
self.local_endpoint, self.remote_endpoint);
} else {
// We don't have anything to send and we've already acknowledged everything.
return Err(Error::Exhausted)
}
}
_ => unreachable!()
}
match self.state {
// We don't have anything to acknowledge yet.
State::Closed | State::Listen | State::SynSent => (),
// Acknowledge all data we have received, since it is all in order.
_ => {
self.remote_last_ack = ack_number;
repr.ack_number = Some(ack_number);
}
}
emit(&ip_repr, &repr)
}
}
@ -749,11 +771,12 @@ mod test {
s.remote_endpoint = REMOTE_END;
s.local_seq_no = LOCAL_SEQ + 1;
s.remote_seq_no = REMOTE_SEQ + 1;
s.remote_win_len = 128;
s
}
#[test]
fn test_established_data() {
fn test_established_receive() {
let mut s = socket_established();
send!(s, [TcpRepr {
seq_number: REMOTE_SEQ + 1,
@ -770,6 +793,25 @@ mod test {
assert_eq!(s.rx_buffer.dequeue(6), &b"abcdef"[..]);
}
#[test]
fn test_established_send() {
let mut s = socket_established();
s.tx_buffer.enqueue_slice(b"abcdef");
recv!(s, [TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1),
payload: &b"abcdef"[..],
..RECV_TEMPL
}]);
assert_eq!(s.tx_buffer.len(), 6);
send!(s, [TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1 + 6),
..SEND_TEMPL
}]);
assert_eq!(s.tx_buffer.len(), 0);
}
#[test]
fn test_established_no_ack() {
let mut s = socket_established();