Rust: integrate lwip with the I/O scheduler.

This commit is contained in:
whitequark 2016-09-06 22:52:16 +00:00
parent b7b6e7b9db
commit 269cedd782
3 changed files with 192 additions and 57 deletions

View File

@ -59,11 +59,13 @@ fn result_from<T, F>(err: lwip_sys::err, f: F) -> Result<T>
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum IpAddr { pub enum IpAddr {
Ip4([u8; 4]), Ip4([u8; 4]),
Ip6([u16; 8]) Ip6([u16; 8]),
IpAny
} }
pub const IP4_ANY: IpAddr = IpAddr::Ip4([0, 0, 0, 0]); pub const IP4_ANY: IpAddr = IpAddr::Ip4([0, 0, 0, 0]);
pub const IP6_ANY: IpAddr = IpAddr::Ip6([0, 0, 0, 0, 0, 0, 0, 0]); pub const IP6_ANY: IpAddr = IpAddr::Ip6([0, 0, 0, 0, 0, 0, 0, 0]);
pub const IP_ANY: IpAddr = IpAddr::IpAny;
impl IpAddr { impl IpAddr {
fn into_raw(self) -> lwip_sys::ip_addr { fn into_raw(self) -> lwip_sys::ip_addr {
@ -84,6 +86,11 @@ impl IpAddr {
(segments[4] as u32) << 16 | (segments[5] as u32), (segments[4] as u32) << 16 | (segments[5] as u32),
(segments[6] as u32) << 16 | (segments[7] as u32)], (segments[6] as u32) << 16 | (segments[7] as u32)],
type_: lwip_sys::IPADDR_TYPE_V6 type_: lwip_sys::IPADDR_TYPE_V6
},
IpAddr::IpAny =>
lwip_sys::ip_addr {
data: [0; 4],
type_: lwip_sys::IPADDR_TYPE_ANY
} }
} }
} }
@ -100,7 +107,8 @@ impl IpAddr {
(data[1] >> 16) as u16, data[1] as u16, (data[1] >> 16) as u16, data[1] as u16,
(data[2] >> 16) as u16, data[2] as u16, (data[2] >> 16) as u16, data[2] as u16,
(data[3] >> 16) as u16, data[3] as u16]), (data[3] >> 16) as u16, data[3] as u16]),
_ => panic!("unknown IP address type") lwip_sys::ip_addr { type_: lwip_sys::IPADDR_TYPE_ANY, .. } =>
IpAddr::IpAny
} }
} }
} }
@ -186,10 +194,21 @@ impl<'a> Drop for Pbuf<'a> {
} }
} }
#[derive(Debug)]
pub struct UdpSocketState {
recv_buffer: LinkedList<(SocketAddr, Pbuf<'static>)>
}
impl UdpSocketState {
pub fn readable(&self) -> bool {
!self.recv_buffer.is_empty()
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct UdpSocket { pub struct UdpSocket {
raw: *mut lwip_sys::udp_pcb, raw: *mut lwip_sys::udp_pcb,
buffer: Box<LinkedList<(SocketAddr, Pbuf<'static>)>> state: Box<UdpSocketState>
} }
impl UdpSocket { impl UdpSocket {
@ -198,9 +217,9 @@ impl UdpSocket {
pbuf: *mut lwip_sys::pbuf, pbuf: *mut lwip_sys::pbuf,
addr: *mut lwip_sys::ip_addr, port: u16) { addr: *mut lwip_sys::ip_addr, port: u16) {
unsafe { unsafe {
let buffer = arg as *mut LinkedList<(SocketAddr, Pbuf)>; let state = arg as *mut UdpSocketState;
let socket_addr = SocketAddr { ip: IpAddr::from_raw(addr), port: port }; let socket_addr = SocketAddr { ip: IpAddr::from_raw(addr), port: port };
(*buffer).push_back((socket_addr, Pbuf::from_raw(pbuf))); (*state).recv_buffer.push_back((socket_addr, Pbuf::from_raw(pbuf)));
} }
} }
@ -208,13 +227,17 @@ impl UdpSocket {
let raw = lwip_sys::udp_new(); let raw = lwip_sys::udp_new();
if raw.is_null() { return Err(Error::OutOfMemory) } if raw.is_null() { return Err(Error::OutOfMemory) }
let mut buffer = Box::new(LinkedList::new()); let mut state = Box::new(UdpSocketState { recv_buffer: LinkedList::new() });
let arg = &mut *buffer as *mut LinkedList<(SocketAddr, Pbuf)> as *mut _; let arg = &mut *state as *mut UdpSocketState as *mut _;
lwip_sys::udp_recv(raw, recv, arg); lwip_sys::udp_recv(raw, recv, arg);
Ok(UdpSocket { raw: raw, buffer: buffer }) Ok(UdpSocket { raw: raw, state: state })
} }
} }
pub fn state(&self) -> *const UdpSocketState {
&*self.state
}
pub fn bind(&mut self, addr: SocketAddr) -> Result<()> { pub fn bind(&mut self, addr: SocketAddr) -> Result<()> {
result_from(unsafe { result_from(unsafe {
lwip_sys::udp_bind(self.raw, &mut addr.ip.into_raw(), addr.port) lwip_sys::udp_bind(self.raw, &mut addr.ip.into_raw(), addr.port)
@ -247,7 +270,11 @@ impl UdpSocket {
} }
pub fn try_recv(&mut self) -> Option<(SocketAddr, Pbuf<'static>)> { pub fn try_recv(&mut self) -> Option<(SocketAddr, Pbuf<'static>)> {
self.buffer.pop_front() self.state.recv_buffer.pop_front()
}
pub fn close(self) {
// just drop
} }
} }
@ -257,10 +284,21 @@ impl Drop for UdpSocket {
} }
} }
#[derive(Debug)]
pub struct TcpListenerState {
backlog: LinkedList<TcpStream>
}
impl TcpListenerState {
pub fn acceptable(&self) -> bool {
!self.backlog.is_empty()
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct TcpListener { pub struct TcpListener {
raw: *mut lwip_sys::tcp_pcb, raw: *mut lwip_sys::tcp_pcb,
backlog: Box<LinkedList<TcpStream>> state: Box<TcpListenerState>
} }
impl TcpListener { impl TcpListener {
@ -269,8 +307,8 @@ impl TcpListener {
err: lwip_sys::err) -> lwip_sys::err { err: lwip_sys::err) -> lwip_sys::err {
if err != lwip_sys::ERR_OK { return err } if err != lwip_sys::ERR_OK { return err }
unsafe { unsafe {
let backlog = arg as *mut LinkedList<TcpStream>; let state = arg as *mut TcpListenerState;
(*backlog).push_back(TcpStream::from_raw(newpcb)); (*state).backlog.push_back(TcpStream::from_raw(newpcb));
} }
lwip_sys::ERR_OK lwip_sys::ERR_OK
} }
@ -278,10 +316,6 @@ impl TcpListener {
unsafe { unsafe {
let raw = lwip_sys::tcp_new(); let raw = lwip_sys::tcp_new();
if raw.is_null() { return Err(Error::OutOfMemory) } if raw.is_null() { return Err(Error::OutOfMemory) }
let mut backlog = Box::new(LinkedList::new());
let arg = &mut *backlog as *mut LinkedList<TcpStream> as *mut _;
lwip_sys::tcp_arg(raw, arg);
try!(result_from(lwip_sys::tcp_bind(raw, &mut addr.ip.into_raw(), addr.port), try!(result_from(lwip_sys::tcp_bind(raw, &mut addr.ip.into_raw(), addr.port),
|| ())); || ()));
@ -290,13 +324,23 @@ impl TcpListener {
lwip_sys::tcp_abort(raw); lwip_sys::tcp_abort(raw);
return Err(Error::OutOfMemory) return Err(Error::OutOfMemory)
} }
let mut state = Box::new(TcpListenerState {
backlog: LinkedList::new()
});
let arg = &mut *state as *mut TcpListenerState as *mut _;
lwip_sys::tcp_arg(raw, arg);
lwip_sys::tcp_accept(raw2, accept); lwip_sys::tcp_accept(raw2, accept);
Ok(TcpListener { raw: raw2, backlog: backlog }) Ok(TcpListener { raw: raw2, state: state })
} }
} }
pub fn state(&self) -> *const TcpListenerState {
&*self.state
}
pub fn try_accept(&mut self) -> Option<TcpStream> { pub fn try_accept(&mut self) -> Option<TcpStream> {
self.backlog.pop_front() self.state.backlog.pop_front()
} }
pub fn close(self) { pub fn close(self) {
@ -320,61 +364,96 @@ pub enum Shutdown {
Both, Both,
} }
#[derive(Debug)]
pub struct TcpStreamState {
recv_buffer: LinkedList<Result<Pbuf<'static>>>,
send_avail: usize
}
impl TcpStreamState {
pub fn readable(&self) -> bool {
!self.recv_buffer.is_empty()
}
pub fn writeable(&self) -> bool {
!(self.send_avail == 0)
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct TcpStream { pub struct TcpStream {
raw: *mut lwip_sys::tcp_pcb, raw: *mut lwip_sys::tcp_pcb,
buffer: Box<LinkedList<Result<Pbuf<'static>>>> state: Box<TcpStreamState>
} }
impl TcpStream { impl TcpStream {
fn from_raw(raw: *mut lwip_sys::tcp_pcb) -> TcpStream { fn from_raw(raw: *mut lwip_sys::tcp_pcb) -> TcpStream {
extern fn recv(arg: *mut c_void, _tcb: *mut lwip_sys::tcp_pcb, extern fn recv(arg: *mut c_void, _raw: *mut lwip_sys::tcp_pcb,
pbuf: *mut lwip_sys::pbuf, err: lwip_sys::err) -> lwip_sys::err { pbuf: *mut lwip_sys::pbuf, err: lwip_sys::err) -> lwip_sys::err {
if err != lwip_sys::ERR_OK { return err } if err != lwip_sys::ERR_OK { return err }
unsafe { unsafe {
let buffer = arg as *mut LinkedList<Result<Pbuf<'static>>>; let state = arg as *mut TcpStreamState;
if pbuf.is_null() { if pbuf.is_null() {
(*buffer).push_back(Err(Error::ConnectionClosed)) (*state).recv_buffer.push_back(Err(Error::ConnectionClosed))
} else { } else {
(*buffer).push_back(Ok(Pbuf::from_raw(pbuf))) (*state).recv_buffer.push_back(Ok(Pbuf::from_raw(pbuf)))
} }
} }
lwip_sys::ERR_OK lwip_sys::ERR_OK
} }
extern fn sent(arg: *mut c_void, raw: *mut lwip_sys::tcp_pcb,
_len: u16) -> lwip_sys::err {
unsafe {
let state = arg as *mut TcpStreamState;
(*state).send_avail = lwip_sys::tcp_sndbuf_(raw) as usize;
}
lwip_sys::ERR_OK
}
extern fn err(arg: *mut c_void, err: lwip_sys::err) { extern fn err(arg: *mut c_void, err: lwip_sys::err) {
unsafe { unsafe {
let buffer = arg as *mut LinkedList<Result<Pbuf<'static>>>; let state = arg as *mut TcpStreamState;
(*buffer).push_back(result_from(err, || unreachable!())) (*state).recv_buffer.push_back(result_from(err, || unreachable!()))
} }
} }
unsafe { unsafe {
let mut buffer = Box::new(LinkedList::new()); let mut state = Box::new(TcpStreamState {
let arg = &mut *buffer as *mut LinkedList<Result<Pbuf<'static>>> as *mut _; recv_buffer: LinkedList::new(),
send_avail: lwip_sys::tcp_sndbuf_(raw) as usize
});
let arg = &mut *state as *mut TcpStreamState as *mut _;
lwip_sys::tcp_arg(raw, arg); lwip_sys::tcp_arg(raw, arg);
lwip_sys::tcp_recv(raw, recv); lwip_sys::tcp_recv(raw, recv);
lwip_sys::tcp_sent(raw, sent);
lwip_sys::tcp_err(raw, err); lwip_sys::tcp_err(raw, err);
TcpStream { raw: raw, buffer: buffer } TcpStream { raw: raw, state: state }
} }
} }
pub fn state(&self) -> *const TcpStreamState {
&*self.state
}
pub fn write(&mut self, data: &[u8]) -> Result<usize> { pub fn write(&mut self, data: &[u8]) -> Result<usize> {
let sndbuf = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; let sndbuf = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize;
let len = if data.len() < sndbuf { data.len() } else { sndbuf }; let len = if data.len() < sndbuf { data.len() } else { sndbuf };
result_from(unsafe { let result = result_from(unsafe {
lwip_sys::tcp_write(self.raw, data as *const [u8] as *const _, len as u16, lwip_sys::tcp_write(self.raw, data as *const [u8] as *const _, len as u16,
lwip_sys::TCP_WRITE_FLAG_COPY) lwip_sys::TCP_WRITE_FLAG_COPY)
}, || len) }, || len);
self.state.send_avail = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize;
result
} }
pub fn try_read(&mut self) -> Result<Option<Pbuf<'static>>> { pub fn try_read(&mut self) -> Result<Option<Pbuf<'static>>> {
match self.buffer.front() { match self.state.recv_buffer.front() {
None => return Ok(None), None => return Ok(None),
Some(&Err(err)) => return Err(err), Some(&Err(err)) => return Err(err),
Some(_) => () Some(_) => ()
} }
match self.buffer.pop_front() { match self.state.recv_buffer.pop_front() {
Some(Ok(pbuf)) => return Ok(Some(pbuf)), Some(Ok(pbuf)) => return Ok(Some(pbuf)),
_ => unreachable!() _ => unreachable!()
} }

View File

@ -1,4 +1,5 @@
extern crate fringe; extern crate fringe;
extern crate lwip;
use std::vec::Vec; use std::vec::Vec;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
@ -102,15 +103,38 @@ impl Scheduler {
} }
#[derive(Debug)] #[derive(Debug)]
enum WaitEvent {} enum WaitEvent {
UdpReadable(*const lwip::UdpSocketState),
TcpAcceptable(*const lwip::TcpListenerState),
TcpWriteable(*const lwip::TcpStreamState),
TcpReadable(*const lwip::TcpStreamState),
}
impl WaitEvent { impl WaitEvent {
fn completed(&self) -> bool { fn completed(&self) -> bool {
match *self {} match *self {
WaitEvent::UdpReadable(state) =>
unsafe { (*state).readable() },
WaitEvent::TcpAcceptable(state) =>
unsafe { (*state).acceptable() },
WaitEvent::TcpWriteable(state) =>
unsafe { (*state).writeable() },
WaitEvent::TcpReadable(state) =>
unsafe { (*state).readable() },
}
} }
} }
pub type Result<T> = ::std::result::Result<T, ()>; unsafe impl Send for WaitEvent {}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Error {
Lwip(lwip::Error),
TimedOut,
Interrupted
}
pub type Result<T> = ::std::result::Result<T, Error>;
#[derive(Debug)] #[derive(Debug)]
pub struct Waiter<'a>(&'a mut Yielder<WaitResult, WaitRequest, OwnedStack>); pub struct Waiter<'a>(&'a mut Yielder<WaitResult, WaitRequest, OwnedStack>);
@ -124,8 +148,44 @@ impl<'a> Waiter<'a> {
match self.0.suspend(request) { match self.0.suspend(request) {
WaitResult::TimedOut => Ok(()), WaitResult::TimedOut => Ok(()),
WaitResult::Interrupted => Err(()), WaitResult::Interrupted => Err(Error::Interrupted),
_ => unreachable!() _ => unreachable!()
} }
} }
fn suspend(&mut self, request: WaitRequest) -> Result<()> {
match self.0.suspend(request) {
WaitResult::Completed => Ok(()),
WaitResult::TimedOut => Err(Error::TimedOut),
WaitResult::Interrupted => Err(Error::Interrupted)
}
}
pub fn udp_readable(&mut self, socket: &lwip::UdpSocket) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
event: Some(WaitEvent::UdpReadable(socket.state()))
})
}
pub fn tcp_acceptable(&mut self, socket: &lwip::TcpListener) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
event: Some(WaitEvent::TcpAcceptable(socket.state()))
})
}
pub fn tcp_writeable(&mut self, socket: &lwip::TcpStream) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
event: Some(WaitEvent::TcpWriteable(socket.state()))
})
}
pub fn tcp_readable(&mut self, socket: &lwip::TcpStream) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
event: Some(WaitEvent::TcpReadable(socket.state()))
})
}
} }

View File

@ -14,17 +14,21 @@ extern {
fn lwip_service(); fn lwip_service();
} }
fn test1(mut waiter: io::Waiter) { fn timer(mut waiter: io::Waiter) {
loop { loop {
println!("A"); println!("tick");
waiter.sleep(std::time::Duration::from_millis(1000)); waiter.sleep(std::time::Duration::from_millis(1000)).unwrap();
} }
} }
fn test2(mut waiter: io::Waiter) { fn echo(mut waiter: io::Waiter) {
let mut socket = lwip::UdpSocket::new().unwrap();
socket.bind(lwip::SocketAddr::new(lwip::IP_ANY, 1234)).unwrap();
loop { loop {
println!("B"); waiter.udp_readable(&socket).unwrap();
waiter.sleep(std::time::Duration::from_millis(500)); let (addr, pbuf) = socket.try_recv().unwrap();
println!("{:?}", core::str::from_utf8(pbuf.as_slice()));
socket.send_to(addr, pbuf).unwrap();
} }
} }
@ -33,19 +37,11 @@ pub unsafe extern fn rust_main() {
println!("Accepting network sessions in Rust."); println!("Accepting network sessions in Rust.");
network_init(); network_init();
let addr = lwip::SocketAddr::new(lwip::IP4_ANY, 1234); let mut scheduler = io::Scheduler::new();
let mut listener = lwip::TcpListener::bind(addr).unwrap(); scheduler.spawn(4096, timer);
let mut stream = None; scheduler.spawn(4096, echo);
loop { loop {
lwip_service(); lwip_service();
if let Some(new_stream) = listener.try_accept() { scheduler.run()
stream = Some(new_stream)
}
if let Some(ref mut stream) = stream {
if let Some(pbuf) = stream.try_read().expect("read") {
println!("{:?}", pbuf.as_slice());
stream.write(pbuf.as_slice()).expect("write");
}
}
} }
} }