Compare commits
No commits in common. "bcedd02ad95f6afd241e0759f5c3a33a1b26d32d" and "be35be8d381449149040938f24c124861da9860e" have entirely different histories.
bcedd02ad9
...
be35be8d38
|
@ -241,27 +241,17 @@ pub fn main_core0() {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut counter = alloc::rc::Rc::new(core::cell::RefCell::new(0));
|
TcpStream::listen(TCP_PORT, 2048, 2048, 8, |stream| async {
|
||||||
task::spawn(async move {
|
|
||||||
while let stream = TcpStream::accept(TCP_PORT, 2048, 2408).await.unwrap() {
|
|
||||||
let counter = counter.clone();
|
|
||||||
task::spawn(async move {
|
|
||||||
*counter.borrow_mut() += 1;
|
|
||||||
println!("Serving {} connections", *counter.borrow());
|
|
||||||
handle_connection(stream)
|
handle_connection(stream)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| println!("Connection: {:?}", e));
|
.map_err(|e| println!("Connection: {:?}", e));
|
||||||
*counter.borrow_mut() -= 1;
|
|
||||||
println!("Now serving {} connections", *counter.borrow());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut time = 0u32;
|
let mut time = 0u32;
|
||||||
Sockets::run(&mut iface, || {
|
Sockets::run(&mut iface, || {
|
||||||
time += 1;
|
time += 1;
|
||||||
Instant::from_millis(time)
|
Instant::from_millis(time)
|
||||||
})
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
static CORE1_REQ: Mutex<Option<sync_channel::Receiver<usize>>> = Mutex::new(None);
|
static CORE1_REQ: Mutex<Option<sync_channel::Receiver<usize>>> = Mutex::new(None);
|
||||||
|
|
|
@ -44,14 +44,14 @@ impl Sockets {
|
||||||
pub fn run<'b, 'c, 'e, D: for<'d> Device<'d>>(
|
pub fn run<'b, 'c, 'e, D: for<'d> Device<'d>>(
|
||||||
iface: &mut EthernetInterface<'b, 'c, 'e, D>,
|
iface: &mut EthernetInterface<'b, 'c, 'e, D>,
|
||||||
mut get_time: impl FnMut() -> Instant,
|
mut get_time: impl FnMut() -> Instant,
|
||||||
) -> ! {
|
) {
|
||||||
task::block_on(async {
|
task::block_on(async {
|
||||||
loop {
|
loop {
|
||||||
let instant = get_time();
|
let instant = get_time();
|
||||||
Self::instance().poll(iface, instant);
|
Self::instance().poll(iface, instant);
|
||||||
task::r#yield().await;
|
task::r#yield().await;
|
||||||
}
|
}
|
||||||
})
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn instance() -> &'static Self {
|
pub(crate) fn instance() -> &'static Self {
|
||||||
|
|
|
@ -9,12 +9,10 @@ use core::{
|
||||||
};
|
};
|
||||||
use alloc::vec::Vec;
|
use alloc::vec::Vec;
|
||||||
use smoltcp::{
|
use smoltcp::{
|
||||||
Error, Result,
|
|
||||||
socket::{
|
socket::{
|
||||||
SocketHandle, SocketRef,
|
SocketHandle, SocketRef,
|
||||||
TcpSocketBuffer, TcpSocket, TcpState,
|
TcpSocketBuffer, TcpSocket,
|
||||||
},
|
},
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
use crate::task;
|
use crate::task;
|
||||||
use super::Sockets;
|
use super::Sockets;
|
||||||
|
@ -79,26 +77,47 @@ impl TcpStream {
|
||||||
f(socket_ref)
|
f(socket_ref)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawns `backlog` tasks with listening sockets so that more
|
||||||
|
/// connections can be accepted while some are still
|
||||||
|
/// handshaking. Spawns additional tasks for each connection.
|
||||||
|
pub fn listen<F, R, T>(port: u16, rx_bufsize: usize, tx_bufsize: usize, backlog: usize, f: F)
|
||||||
|
where
|
||||||
|
F: Fn(Self) -> R + Clone + 'static,
|
||||||
|
R: Future<Output = T> + 'static,
|
||||||
|
{
|
||||||
|
for _ in 0..backlog {
|
||||||
|
let f = f.clone();
|
||||||
|
task::spawn(async move {
|
||||||
|
loop {
|
||||||
|
// Wait for new connection
|
||||||
|
let stream = TcpStream::accept(port, rx_bufsize, tx_bufsize).await;
|
||||||
|
// Spawn async task for new connection
|
||||||
|
task::spawn(f(stream));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Listen for the next incoming connection on a TCP
|
/// Listen for the next incoming connection on a TCP
|
||||||
/// port. Succeeds on connection attempt.
|
/// port. Succeeds on connection attempt.
|
||||||
///
|
///
|
||||||
/// Calling this serially in a loop will cause slow/botched
|
/// Calling this serially in a loop will cause slow/botched
|
||||||
/// connection attempts stall any more new connections. Use
|
/// connection attempts stall any more new connections. Use
|
||||||
/// `listen()` with a backlog instead.
|
/// `listen()` with a backlog instead.
|
||||||
pub async fn accept(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Result<Self> {
|
pub async fn accept(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self {
|
||||||
let stream = Self::new(rx_bufsize, tx_bufsize);
|
let stream = Self::new(rx_bufsize, tx_bufsize);
|
||||||
// Set socket to listen
|
// Set socket to listen
|
||||||
stream.with_socket(|mut s| s.listen(port))?;
|
stream.with_socket(|mut s| s.listen(port))
|
||||||
|
.expect("listen");
|
||||||
// Wait for a connection
|
// Wait for a connection
|
||||||
poll_stream!(&stream, (), |socket| {
|
poll_stream!(&stream, (), |socket| {
|
||||||
if socket.state() != TcpState::Listen {
|
if socket.is_active() {
|
||||||
Poll::Ready(())
|
Poll::Ready(())
|
||||||
} else {
|
} else {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}).await;
|
}).await;
|
||||||
|
stream
|
||||||
Ok(stream)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Probe the receive buffer
|
/// Probe the receive buffer
|
||||||
|
@ -108,7 +127,7 @@ impl TcpStream {
|
||||||
/// just return `Poll::Pending` if there is not enough data
|
/// just return `Poll::Pending` if there is not enough data
|
||||||
/// yet. Likewise, return the amount of bytes consumed from the
|
/// yet. Likewise, return the amount of bytes consumed from the
|
||||||
/// buffer in the `Poll::Ready` result.
|
/// buffer in the `Poll::Ready` result.
|
||||||
pub async fn recv<F, R>(&self, f: F) -> Result<R>
|
pub async fn recv<F, R>(&self, f: F) -> smoltcp::Result<R>
|
||||||
where
|
where
|
||||||
F: Fn(&[u8]) -> Poll<(usize, R)>,
|
F: Fn(&[u8]) -> Poll<(usize, R)>,
|
||||||
{
|
{
|
||||||
|
@ -118,14 +137,10 @@ impl TcpStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> {
|
impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> {
|
||||||
type Output = Result<R>;
|
type Output = smoltcp::Result<R>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let result = self.stream.with_socket(|mut socket| {
|
let result = self.stream.with_socket(|mut socket| {
|
||||||
if socket_is_handhshaking(&socket) {
|
|
||||||
return Ok(Poll::Pending);
|
|
||||||
}
|
|
||||||
|
|
||||||
socket.recv(|buf| {
|
socket.recv(|buf| {
|
||||||
if buf.len() > 0 {
|
if buf.len() > 0 {
|
||||||
match (self.f)(buf) {
|
match (self.f)(buf) {
|
||||||
|
@ -141,11 +156,10 @@ impl TcpStream {
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
match result {
|
match result {
|
||||||
Ok(Poll::Pending) => {
|
|
||||||
Sockets::register_waker(cx.waker().clone());
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
|
if !result.is_ready() {
|
||||||
|
Sockets::register_waker(cx.waker().clone());
|
||||||
|
}
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
Err(e) =>
|
Err(e) =>
|
||||||
|
@ -161,14 +175,12 @@ impl TcpStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wait until there is any space in the socket's send queue
|
/// Wait until there is any space in the socket's send queue
|
||||||
async fn wait_can_send(&self) -> Result<()> {
|
async fn wait_can_send(&self) -> smoltcp::Result<()> {
|
||||||
poll_stream!(self, Result<()>, |socket| {
|
poll_stream!(self, smoltcp::Result<()>, |socket| {
|
||||||
if socket_is_handhshaking(&socket) {
|
if !socket.is_active() {
|
||||||
Poll::Pending
|
Poll::Ready(Err(smoltcp::Error::Illegal))
|
||||||
} else if socket.can_send() {
|
} else if socket.can_send() {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
} else if ! socket.may_send() {
|
|
||||||
Poll::Ready(Err(Error::Truncated))
|
|
||||||
} else {
|
} else {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
|
@ -176,7 +188,7 @@ impl TcpStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Yields to wait for more buffer space
|
/// Yields to wait for more buffer space
|
||||||
pub async fn send<I: IntoIterator<Item = u8>>(&self, data: I) -> Result<()> {
|
pub async fn send<I: IntoIterator<Item = u8>>(&self, data: I) -> Result<(), smoltcp::Error> {
|
||||||
let mut data = data.into_iter();
|
let mut data = data.into_iter();
|
||||||
let mut done = false;
|
let mut done = false;
|
||||||
while !done {
|
while !done {
|
||||||
|
@ -205,51 +217,15 @@ impl TcpStream {
|
||||||
/// **Warning:** this may not work as immediately as expected! The
|
/// **Warning:** this may not work as immediately as expected! The
|
||||||
/// other side may wait until it sends packets to you for
|
/// other side may wait until it sends packets to you for
|
||||||
/// piggybacking the ACKs.
|
/// piggybacking the ACKs.
|
||||||
pub async fn flush(&self) -> Result<()> {
|
pub async fn flush(&self) {
|
||||||
poll_stream!(self, Result<()>, |socket| {
|
poll_stream!(self, (), |socket| {
|
||||||
if socket_is_handhshaking(&socket) {
|
if socket.may_send() && socket.send_queue() > 0 {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
} else if socket.may_send() && socket.send_queue() > 0 {
|
|
||||||
Poll::Pending
|
|
||||||
} else if socket.may_send() {
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
} else {
|
} else {
|
||||||
Poll::Ready(Err(Error::Truncated))
|
Poll::Ready(())
|
||||||
}
|
}
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Close the transmit half of the connection
|
|
||||||
pub async fn close(&self) {
|
|
||||||
self.with_socket(|mut socket| socket.close());
|
|
||||||
|
|
||||||
// Yield for one iface.poll() to send the packet
|
|
||||||
task::r#yield().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Destroy the socket, sending the RST
|
|
||||||
pub async fn abort(self) {
|
|
||||||
self.with_socket(|mut socket| socket.abort());
|
|
||||||
|
|
||||||
// Yield for one iface.poll() to send the packet
|
|
||||||
task::r#yield().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn keep_alive(&self) -> Option<Duration> {
|
|
||||||
self.with_socket(|socket| socket.keep_alive())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_keep_alive(&mut self, interval: Option<Duration>) {
|
|
||||||
self.with_socket(|mut socket| socket.set_keep_alive(interval));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn timeout(&self) -> Option<Duration> {
|
|
||||||
self.with_socket(|socket| socket.timeout())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_timeout(&mut self, duration: Option<Duration>) {
|
|
||||||
self.with_socket(|mut socket| socket.set_timeout(duration));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for TcpStream {
|
impl Drop for TcpStream {
|
||||||
|
@ -260,12 +236,3 @@ impl Drop for TcpStream {
|
||||||
.remove(self.handle);
|
.remove(self.handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn socket_is_handhshaking(socket: &SocketRef<TcpSocket>) -> bool {
|
|
||||||
match socket.state() {
|
|
||||||
TcpState::SynSent | TcpState::SynReceived =>
|
|
||||||
true,
|
|
||||||
_ =>
|
|
||||||
false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue