Compare commits

...

1 Commits

Author SHA1 Message Date
Simon Renblad 83dc9ef609 add tcp listener 2024-09-05 14:24:03 +08:00
1 changed files with 82 additions and 0 deletions

View File

@ -19,6 +19,7 @@ use smoltcp::{
use crate::task; use crate::task;
use super::Sockets; use super::Sockets;
/// References a smoltcp TcpSocket /// References a smoltcp TcpSocket
pub struct TcpStream { pub struct TcpStream {
handle: SocketHandle, handle: SocketHandle,
@ -281,6 +282,87 @@ impl Drop for TcpStream {
} }
} }
// copied from artiq/runtime/sched.rs with modifications for async polling
pub struct TcpListener {
handle: SocketHandle,
buffer_size: usize,
endpoint: IpEndpoint,
}
impl TcpListener {
fn new_socket_handle(buffer_size: usize) -> SocketHandle {
fn uninit_vec<T>(size: usize) -> Vec<T> {
let mut result = Vec::with_capacity(size);
unsafe {
result.set_len(size);
}
result
}
let rx_buffer = TcpSocketBuffer::new(uninit_vec(rx_bufsize));
let tx_buffer = TcpSocketBuffer::new(uninit_vec(tx_bufsize));
let socket = TcpSocket::new(rx_buffer, tx_buffer);
let handle = Sockets::instance().sockets.borrow_mut()
.add(socket);
handle
}
pub fn new(buffer_size: usize) -> TcpListener {
TcpListener {
handle: Self::new_socket_handle(buffer_size),
buffer_size,
endpoint: IpEndpoint::default(),
}
}
fn with_lower<F, R>(&self, f: F) -> R
where
F: FnOnce(SocketRef<TcpSocket>) -> R,
{
let mut sockets = Sockets::instance().sockets.borrow_mut();
let mut socket_ref = sockets.get::<TcpSocket>(self.handle);
f(socket_ref)
}
pub fn can_accept(&self) -> bool {
self.with_lower(|s| s.is_active())
}
pub fn listen<T: Into<IpEndpoint>>(&self, endpoint: T) -> Result<(), Error> {
let endpoint = endpoint.into();
self.with_lower(|s| s.listen(endpoint))
.map(|| {
self.endpoint.set(endpoint)
()
})
.map_err(|err| err.into())
}
pub async fn accept(&self) -> Result<TcpStream, Error> {
let handle = self.handle.get();
let stream = TcpStream { handle };
poll_stream!(&stream, (), |socket| {
if socket.state() != TcpState::Listen {
Poll::Ready(())
} else {
Poll::Pending
}
}).await;
self.handle.set(Self::new_lower(self.buffer_size.get()));
match self.listen(self.endpoint.get()) {
Ok(()) => (),
_ => unreachable!()
}
Ok(stream)
}
pub fn close(&self) {
self.with_lower(|mut s| s.close())
}
}
fn socket_is_handhshaking(socket: &SocketRef<TcpSocket>) -> bool { fn socket_is_handhshaking(socket: &SocketRef<TcpSocket>) -> bool {
match socket.state() { match socket.state() {
TcpState::SynSent | TcpState::SynReceived => TcpState::SynSent | TcpState::SynReceived =>