From 269cedd782a2feecc8e811ea49f9c6a2066633ef Mon Sep 17 00:00:00 2001 From: whitequark Date: Tue, 6 Sep 2016 22:52:16 +0000 Subject: [PATCH] Rust: integrate lwip with the I/O scheduler. --- artiq/runtime.rs/liblwip/lib.rs | 149 ++++++++++++++++++++++++-------- artiq/runtime.rs/src/io.rs | 68 ++++++++++++++- artiq/runtime.rs/src/lib.rs | 32 +++---- 3 files changed, 192 insertions(+), 57 deletions(-) diff --git a/artiq/runtime.rs/liblwip/lib.rs b/artiq/runtime.rs/liblwip/lib.rs index 27d9a716e..bfa332a1f 100644 --- a/artiq/runtime.rs/liblwip/lib.rs +++ b/artiq/runtime.rs/liblwip/lib.rs @@ -59,11 +59,13 @@ fn result_from(err: lwip_sys::err, f: F) -> Result #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub enum IpAddr { Ip4([u8; 4]), - Ip6([u16; 8]) + Ip6([u16; 8]), + IpAny } pub const IP4_ANY: IpAddr = IpAddr::Ip4([0, 0, 0, 0]); pub const IP6_ANY: IpAddr = IpAddr::Ip6([0, 0, 0, 0, 0, 0, 0, 0]); +pub const IP_ANY: IpAddr = IpAddr::IpAny; impl IpAddr { fn into_raw(self) -> lwip_sys::ip_addr { @@ -84,6 +86,11 @@ impl IpAddr { (segments[4] as u32) << 16 | (segments[5] as u32), (segments[6] as u32) << 16 | (segments[7] as u32)], type_: lwip_sys::IPADDR_TYPE_V6 + }, + IpAddr::IpAny => + lwip_sys::ip_addr { + data: [0; 4], + type_: lwip_sys::IPADDR_TYPE_ANY } } } @@ -100,7 +107,8 @@ impl IpAddr { (data[1] >> 16) as u16, data[1] as u16, (data[2] >> 16) as u16, data[2] as u16, (data[3] >> 16) as u16, data[3] as u16]), - _ => panic!("unknown IP address type") + lwip_sys::ip_addr { type_: lwip_sys::IPADDR_TYPE_ANY, .. } => + IpAddr::IpAny } } } @@ -186,10 +194,21 @@ impl<'a> Drop for Pbuf<'a> { } } +#[derive(Debug)] +pub struct UdpSocketState { + recv_buffer: LinkedList<(SocketAddr, Pbuf<'static>)> +} + +impl UdpSocketState { + pub fn readable(&self) -> bool { + !self.recv_buffer.is_empty() + } +} + #[derive(Debug)] pub struct UdpSocket { - raw: *mut lwip_sys::udp_pcb, - buffer: Box)>> + raw: *mut lwip_sys::udp_pcb, + state: Box } impl UdpSocket { @@ -198,9 +217,9 @@ impl UdpSocket { pbuf: *mut lwip_sys::pbuf, addr: *mut lwip_sys::ip_addr, port: u16) { unsafe { - let buffer = arg as *mut LinkedList<(SocketAddr, Pbuf)>; + let state = arg as *mut UdpSocketState; let socket_addr = SocketAddr { ip: IpAddr::from_raw(addr), port: port }; - (*buffer).push_back((socket_addr, Pbuf::from_raw(pbuf))); + (*state).recv_buffer.push_back((socket_addr, Pbuf::from_raw(pbuf))); } } @@ -208,13 +227,17 @@ impl UdpSocket { let raw = lwip_sys::udp_new(); if raw.is_null() { return Err(Error::OutOfMemory) } - let mut buffer = Box::new(LinkedList::new()); - let arg = &mut *buffer as *mut LinkedList<(SocketAddr, Pbuf)> as *mut _; + let mut state = Box::new(UdpSocketState { recv_buffer: LinkedList::new() }); + let arg = &mut *state as *mut UdpSocketState as *mut _; lwip_sys::udp_recv(raw, recv, arg); - Ok(UdpSocket { raw: raw, buffer: buffer }) + Ok(UdpSocket { raw: raw, state: state }) } } + pub fn state(&self) -> *const UdpSocketState { + &*self.state + } + pub fn bind(&mut self, addr: SocketAddr) -> Result<()> { result_from(unsafe { lwip_sys::udp_bind(self.raw, &mut addr.ip.into_raw(), addr.port) @@ -247,7 +270,11 @@ impl UdpSocket { } pub fn try_recv(&mut self) -> Option<(SocketAddr, Pbuf<'static>)> { - self.buffer.pop_front() + self.state.recv_buffer.pop_front() + } + + pub fn close(self) { + // just drop } } @@ -257,10 +284,21 @@ impl Drop for UdpSocket { } } +#[derive(Debug)] +pub struct TcpListenerState { + backlog: LinkedList +} + +impl TcpListenerState { + pub fn acceptable(&self) -> bool { + !self.backlog.is_empty() + } +} + #[derive(Debug)] pub struct TcpListener { - raw: *mut lwip_sys::tcp_pcb, - backlog: Box> + raw: *mut lwip_sys::tcp_pcb, + state: Box } impl TcpListener { @@ -269,8 +307,8 @@ impl TcpListener { err: lwip_sys::err) -> lwip_sys::err { if err != lwip_sys::ERR_OK { return err } unsafe { - let backlog = arg as *mut LinkedList; - (*backlog).push_back(TcpStream::from_raw(newpcb)); + let state = arg as *mut TcpListenerState; + (*state).backlog.push_back(TcpStream::from_raw(newpcb)); } lwip_sys::ERR_OK } @@ -278,10 +316,6 @@ impl TcpListener { unsafe { let raw = lwip_sys::tcp_new(); if raw.is_null() { return Err(Error::OutOfMemory) } - - let mut backlog = Box::new(LinkedList::new()); - let arg = &mut *backlog as *mut LinkedList as *mut _; - lwip_sys::tcp_arg(raw, arg); try!(result_from(lwip_sys::tcp_bind(raw, &mut addr.ip.into_raw(), addr.port), || ())); @@ -290,13 +324,23 @@ impl TcpListener { lwip_sys::tcp_abort(raw); return Err(Error::OutOfMemory) } + + let mut state = Box::new(TcpListenerState { + backlog: LinkedList::new() + }); + let arg = &mut *state as *mut TcpListenerState as *mut _; + lwip_sys::tcp_arg(raw, arg); lwip_sys::tcp_accept(raw2, accept); - Ok(TcpListener { raw: raw2, backlog: backlog }) + Ok(TcpListener { raw: raw2, state: state }) } } + pub fn state(&self) -> *const TcpListenerState { + &*self.state + } + pub fn try_accept(&mut self) -> Option { - self.backlog.pop_front() + self.state.backlog.pop_front() } pub fn close(self) { @@ -320,61 +364,96 @@ pub enum Shutdown { Both, } +#[derive(Debug)] +pub struct TcpStreamState { + recv_buffer: LinkedList>>, + send_avail: usize +} + +impl TcpStreamState { + pub fn readable(&self) -> bool { + !self.recv_buffer.is_empty() + } + + pub fn writeable(&self) -> bool { + !(self.send_avail == 0) + } +} + #[derive(Debug)] pub struct TcpStream { - raw: *mut lwip_sys::tcp_pcb, - buffer: Box>>> + raw: *mut lwip_sys::tcp_pcb, + state: Box } impl TcpStream { fn from_raw(raw: *mut lwip_sys::tcp_pcb) -> TcpStream { - extern fn recv(arg: *mut c_void, _tcb: *mut lwip_sys::tcp_pcb, + extern fn recv(arg: *mut c_void, _raw: *mut lwip_sys::tcp_pcb, pbuf: *mut lwip_sys::pbuf, err: lwip_sys::err) -> lwip_sys::err { if err != lwip_sys::ERR_OK { return err } unsafe { - let buffer = arg as *mut LinkedList>>; + let state = arg as *mut TcpStreamState; if pbuf.is_null() { - (*buffer).push_back(Err(Error::ConnectionClosed)) + (*state).recv_buffer.push_back(Err(Error::ConnectionClosed)) } else { - (*buffer).push_back(Ok(Pbuf::from_raw(pbuf))) + (*state).recv_buffer.push_back(Ok(Pbuf::from_raw(pbuf))) } } lwip_sys::ERR_OK } + extern fn sent(arg: *mut c_void, raw: *mut lwip_sys::tcp_pcb, + _len: u16) -> lwip_sys::err { + unsafe { + let state = arg as *mut TcpStreamState; + (*state).send_avail = lwip_sys::tcp_sndbuf_(raw) as usize; + } + lwip_sys::ERR_OK + } + extern fn err(arg: *mut c_void, err: lwip_sys::err) { unsafe { - let buffer = arg as *mut LinkedList>>; - (*buffer).push_back(result_from(err, || unreachable!())) + let state = arg as *mut TcpStreamState; + (*state).recv_buffer.push_back(result_from(err, || unreachable!())) } } unsafe { - let mut buffer = Box::new(LinkedList::new()); - let arg = &mut *buffer as *mut LinkedList>> as *mut _; + let mut state = Box::new(TcpStreamState { + recv_buffer: LinkedList::new(), + send_avail: lwip_sys::tcp_sndbuf_(raw) as usize + }); + let arg = &mut *state as *mut TcpStreamState as *mut _; lwip_sys::tcp_arg(raw, arg); lwip_sys::tcp_recv(raw, recv); + lwip_sys::tcp_sent(raw, sent); lwip_sys::tcp_err(raw, err); - TcpStream { raw: raw, buffer: buffer } + TcpStream { raw: raw, state: state } } } + pub fn state(&self) -> *const TcpStreamState { + &*self.state + } + pub fn write(&mut self, data: &[u8]) -> Result { let sndbuf = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; let len = if data.len() < sndbuf { data.len() } else { sndbuf }; - result_from(unsafe { + let result = result_from(unsafe { lwip_sys::tcp_write(self.raw, data as *const [u8] as *const _, len as u16, lwip_sys::TCP_WRITE_FLAG_COPY) - }, || len) + }, || len); + self.state.send_avail = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; + result } pub fn try_read(&mut self) -> Result>> { - match self.buffer.front() { + match self.state.recv_buffer.front() { None => return Ok(None), Some(&Err(err)) => return Err(err), Some(_) => () } - match self.buffer.pop_front() { + match self.state.recv_buffer.pop_front() { Some(Ok(pbuf)) => return Ok(Some(pbuf)), _ => unreachable!() } diff --git a/artiq/runtime.rs/src/io.rs b/artiq/runtime.rs/src/io.rs index 43394c928..48dee82c4 100644 --- a/artiq/runtime.rs/src/io.rs +++ b/artiq/runtime.rs/src/io.rs @@ -1,4 +1,5 @@ extern crate fringe; +extern crate lwip; use std::vec::Vec; use std::time::{Instant, Duration}; @@ -102,15 +103,38 @@ impl Scheduler { } #[derive(Debug)] -enum WaitEvent {} +enum WaitEvent { + UdpReadable(*const lwip::UdpSocketState), + TcpAcceptable(*const lwip::TcpListenerState), + TcpWriteable(*const lwip::TcpStreamState), + TcpReadable(*const lwip::TcpStreamState), +} impl WaitEvent { fn completed(&self) -> bool { - match *self {} + match *self { + WaitEvent::UdpReadable(state) => + unsafe { (*state).readable() }, + WaitEvent::TcpAcceptable(state) => + unsafe { (*state).acceptable() }, + WaitEvent::TcpWriteable(state) => + unsafe { (*state).writeable() }, + WaitEvent::TcpReadable(state) => + unsafe { (*state).readable() }, + } } } -pub type Result = ::std::result::Result; +unsafe impl Send for WaitEvent {} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum Error { + Lwip(lwip::Error), + TimedOut, + Interrupted +} + +pub type Result = ::std::result::Result; #[derive(Debug)] pub struct Waiter<'a>(&'a mut Yielder); @@ -124,8 +148,44 @@ impl<'a> Waiter<'a> { match self.0.suspend(request) { WaitResult::TimedOut => Ok(()), - WaitResult::Interrupted => Err(()), + WaitResult::Interrupted => Err(Error::Interrupted), _ => unreachable!() } } + + fn suspend(&mut self, request: WaitRequest) -> Result<()> { + match self.0.suspend(request) { + WaitResult::Completed => Ok(()), + WaitResult::TimedOut => Err(Error::TimedOut), + WaitResult::Interrupted => Err(Error::Interrupted) + } + } + + pub fn udp_readable(&mut self, socket: &lwip::UdpSocket) -> Result<()> { + self.suspend(WaitRequest { + timeout: None, + event: Some(WaitEvent::UdpReadable(socket.state())) + }) + } + + pub fn tcp_acceptable(&mut self, socket: &lwip::TcpListener) -> Result<()> { + self.suspend(WaitRequest { + timeout: None, + event: Some(WaitEvent::TcpAcceptable(socket.state())) + }) + } + + pub fn tcp_writeable(&mut self, socket: &lwip::TcpStream) -> Result<()> { + self.suspend(WaitRequest { + timeout: None, + event: Some(WaitEvent::TcpWriteable(socket.state())) + }) + } + + pub fn tcp_readable(&mut self, socket: &lwip::TcpStream) -> Result<()> { + self.suspend(WaitRequest { + timeout: None, + event: Some(WaitEvent::TcpReadable(socket.state())) + }) + } } diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 39ecf44e1..71407129c 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -14,17 +14,21 @@ extern { fn lwip_service(); } -fn test1(mut waiter: io::Waiter) { +fn timer(mut waiter: io::Waiter) { loop { - println!("A"); - waiter.sleep(std::time::Duration::from_millis(1000)); + println!("tick"); + waiter.sleep(std::time::Duration::from_millis(1000)).unwrap(); } } -fn test2(mut waiter: io::Waiter) { +fn echo(mut waiter: io::Waiter) { + let mut socket = lwip::UdpSocket::new().unwrap(); + socket.bind(lwip::SocketAddr::new(lwip::IP_ANY, 1234)).unwrap(); loop { - println!("B"); - waiter.sleep(std::time::Duration::from_millis(500)); + waiter.udp_readable(&socket).unwrap(); + let (addr, pbuf) = socket.try_recv().unwrap(); + println!("{:?}", core::str::from_utf8(pbuf.as_slice())); + socket.send_to(addr, pbuf).unwrap(); } } @@ -33,19 +37,11 @@ pub unsafe extern fn rust_main() { println!("Accepting network sessions in Rust."); network_init(); - let addr = lwip::SocketAddr::new(lwip::IP4_ANY, 1234); - let mut listener = lwip::TcpListener::bind(addr).unwrap(); - let mut stream = None; + let mut scheduler = io::Scheduler::new(); + scheduler.spawn(4096, timer); + scheduler.spawn(4096, echo); loop { lwip_service(); - if let Some(new_stream) = listener.try_accept() { - stream = Some(new_stream) - } - if let Some(ref mut stream) = stream { - if let Some(pbuf) = stream.try_read().expect("read") { - println!("{:?}", pbuf.as_slice()); - stream.write(pbuf.as_slice()).expect("write"); - } - } + scheduler.run() } }