diff --git a/artiq/runtime.rs/Cargo.lock b/artiq/runtime.rs/Cargo.lock index 6324f7558..6e5a7538d 100644 --- a/artiq/runtime.rs/Cargo.lock +++ b/artiq/runtime.rs/Cargo.lock @@ -2,7 +2,7 @@ name = "runtime" version = "0.0.0" dependencies = [ - "fringe 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "fringe 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "lwip 0.0.0", "std_artiq 0.0.0", ] @@ -13,7 +13,7 @@ version = "0.0.0" [[package]] name = "fringe" -version = "1.0.5" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)", @@ -44,5 +44,5 @@ dependencies = [ ] [metadata] -"checksum fringe 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c5257685076edffc8f1a85a382135cdf27ba915ac74e47ae55aba19945c17955" +"checksum fringe 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "987689dcfad85eee8d76b477865641ec483e63fb86d52966bfc350c4a647d78a" "checksum libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "23e3757828fa702a20072c37ff47938e9dd331b92fac6e223d26d4b7a55f7ee2" diff --git a/artiq/runtime.rs/Cargo.toml b/artiq/runtime.rs/Cargo.toml index c9a0de84f..962e076cb 100644 --- a/artiq/runtime.rs/Cargo.toml +++ b/artiq/runtime.rs/Cargo.toml @@ -10,7 +10,7 @@ path = "src/lib.rs" [dependencies] std_artiq = { path = "libstd_artiq" } -fringe = { version = "1.0.5", default-features = false, features = ["alloc"] } +fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] } lwip = { path = "liblwip" } [profile.dev] diff --git a/artiq/runtime.rs/liblwip/lib.rs b/artiq/runtime.rs/liblwip/lib.rs index bfa332a1f..fb1903fa6 100644 --- a/artiq/runtime.rs/liblwip/lib.rs +++ b/artiq/runtime.rs/liblwip/lib.rs @@ -5,8 +5,10 @@ extern crate alloc; extern crate collections; extern crate libc; extern crate lwip_sys; +extern crate std_artiq as std; use core::marker::PhantomData; +use core::cell::RefCell; use alloc::boxed::Box; use collections::LinkedList; use libc::c_void; @@ -31,6 +33,47 @@ pub enum Error { IllegalArgument, } +impl Error { + fn as_str(&self) -> &str { + match *self { + Error::OutOfMemory => "out of memory error", + Error::Buffer => "buffer error", + Error::Timeout => "timeout", + Error::Routing => "routing error", + Error::InProgress => "operation in progress", + Error::IllegalValue => "illegal value", + Error::WouldBlock => "operation would block", + Error::AddressInUse => "address in use", + Error::AlreadyConnecting => "already connecting", + Error::AlreadyConnected => "already connected", + Error::NotConnected => "not connected", + Error::Interface => "low-level netif error", + Error::ConnectionAborted => "connection aborted", + Error::ConnectionReset => "connection reset", + Error::ConnectionClosed => "connection closed", + Error::IllegalArgument => "illegal argument", + } + } +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl std::error::Error for Error { + fn description(&self) -> &str { + self.as_str() + } +} + +impl From for std::io::Error { + fn from(lower: Error) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, lower) + } +} + pub type Result = core::result::Result; fn result_from(err: lwip_sys::err, f: F) -> Result @@ -165,6 +208,10 @@ impl<'payload> Pbuf<'payload> { Self::from_slice_with_type(slice, lwip_sys::PBUF_ROM) } + pub fn len(&self) -> usize { + unsafe { (*self.raw).len as usize } + } + pub fn as_slice(&self) -> &'payload [u8] { unsafe { core::slice::from_raw_parts((*self.raw).payload as *const u8, @@ -196,7 +243,7 @@ impl<'a> Drop for Pbuf<'a> { #[derive(Debug)] pub struct UdpSocketState { - recv_buffer: LinkedList<(SocketAddr, Pbuf<'static>)> + recv_buffer: LinkedList<(Pbuf<'static>, SocketAddr)> } impl UdpSocketState { @@ -208,7 +255,7 @@ impl UdpSocketState { #[derive(Debug)] pub struct UdpSocket { raw: *mut lwip_sys::udp_pcb, - state: Box + state: Box> } impl UdpSocket { @@ -217,9 +264,9 @@ impl UdpSocket { pbuf: *mut lwip_sys::pbuf, addr: *mut lwip_sys::ip_addr, port: u16) { unsafe { - let state = arg as *mut UdpSocketState; + let state = arg as *mut RefCell; let socket_addr = SocketAddr { ip: IpAddr::from_raw(addr), port: port }; - (*state).recv_buffer.push_back((socket_addr, Pbuf::from_raw(pbuf))); + (*state).borrow_mut().recv_buffer.push_back((Pbuf::from_raw(pbuf), socket_addr)); } } @@ -227,54 +274,52 @@ impl UdpSocket { let raw = lwip_sys::udp_new(); if raw.is_null() { return Err(Error::OutOfMemory) } - let mut state = Box::new(UdpSocketState { recv_buffer: LinkedList::new() }); - let arg = &mut *state as *mut UdpSocketState as *mut _; + let mut state = Box::new(RefCell::new(UdpSocketState { + recv_buffer: LinkedList::new() + })); + let arg = &mut *state as *mut RefCell as *mut _; lwip_sys::udp_recv(raw, recv, arg); Ok(UdpSocket { raw: raw, state: state }) } } - pub fn state(&self) -> *const UdpSocketState { + pub fn state(&self) -> *const RefCell { &*self.state } - pub fn bind(&mut self, addr: SocketAddr) -> Result<()> { + pub fn bind(&self, addr: SocketAddr) -> Result<()> { result_from(unsafe { lwip_sys::udp_bind(self.raw, &mut addr.ip.into_raw(), addr.port) }, || ()) } - pub fn connect(&mut self, addr: SocketAddr) -> Result<()> { + pub fn connect(&self, addr: SocketAddr) -> Result<()> { result_from(unsafe { lwip_sys::udp_connect(self.raw, &mut addr.ip.into_raw(), addr.port) }, || ()) } - pub fn disconnect(&mut self) -> Result<()> { + pub fn disconnect(&self) -> Result<()> { result_from(unsafe { lwip_sys::udp_disconnect(self.raw) }, || ()) } - pub fn send<'a>(&'a mut self, pbuf: Pbuf<'a>) -> Result<()> { + pub fn send<'a>(&'a self, pbuf: Pbuf<'a>) -> Result<()> { result_from(unsafe { lwip_sys::udp_send(self.raw, pbuf.as_raw()) }, || ()) } - pub fn send_to<'a>(&'a mut self, addr: SocketAddr, pbuf: Pbuf<'a>) -> Result<()> { + pub fn send_to<'a>(&'a self, pbuf: Pbuf<'a>, addr: SocketAddr) -> Result<()> { result_from(unsafe { lwip_sys::udp_sendto(self.raw, pbuf.as_raw(), &mut addr.ip.into_raw(), addr.port) }, || ()) } - pub fn try_recv(&mut self) -> Option<(SocketAddr, Pbuf<'static>)> { - self.state.recv_buffer.pop_front() - } - - pub fn close(self) { - // just drop + pub fn try_recv(&self) -> Option<(Pbuf<'static>, SocketAddr)> { + self.state.borrow_mut().recv_buffer.pop_front() } } @@ -298,7 +343,7 @@ impl TcpListenerState { #[derive(Debug)] pub struct TcpListener { raw: *mut lwip_sys::tcp_pcb, - state: Box + state: Box> } impl TcpListener { @@ -307,8 +352,8 @@ impl TcpListener { err: lwip_sys::err) -> lwip_sys::err { if err != lwip_sys::ERR_OK { return err } unsafe { - let state = arg as *mut TcpListenerState; - (*state).backlog.push_back(TcpStream::from_raw(newpcb)); + let state = arg as *mut RefCell; + (*state).borrow_mut().backlog.push_back(TcpStream::from_raw(newpcb)); } lwip_sys::ERR_OK } @@ -325,26 +370,22 @@ impl TcpListener { return Err(Error::OutOfMemory) } - let mut state = Box::new(TcpListenerState { + let mut state = Box::new(RefCell::new(TcpListenerState { backlog: LinkedList::new() - }); - let arg = &mut *state as *mut TcpListenerState as *mut _; + })); + let arg = &mut *state as *mut RefCell as *mut _; lwip_sys::tcp_arg(raw, arg); lwip_sys::tcp_accept(raw2, accept); Ok(TcpListener { raw: raw2, state: state }) } } - pub fn state(&self) -> *const TcpListenerState { + pub fn state(&self) -> *const RefCell { &*self.state } - pub fn try_accept(&mut self) -> Option { - self.state.backlog.pop_front() - } - - pub fn close(self) { - // just drop + pub fn try_accept(&self) -> Option { + self.state.borrow_mut().backlog.pop_front() } } @@ -383,7 +424,7 @@ impl TcpStreamState { #[derive(Debug)] pub struct TcpStream { raw: *mut lwip_sys::tcp_pcb, - state: Box + state: Box> } impl TcpStream { @@ -392,11 +433,11 @@ impl TcpStream { pbuf: *mut lwip_sys::pbuf, err: lwip_sys::err) -> lwip_sys::err { if err != lwip_sys::ERR_OK { return err } unsafe { - let state = arg as *mut TcpStreamState; + let state = arg as *mut RefCell; if pbuf.is_null() { - (*state).recv_buffer.push_back(Err(Error::ConnectionClosed)) + (*state).borrow_mut().recv_buffer.push_back(Err(Error::ConnectionClosed)) } else { - (*state).recv_buffer.push_back(Ok(Pbuf::from_raw(pbuf))) + (*state).borrow_mut().recv_buffer.push_back(Ok(Pbuf::from_raw(pbuf))) } } lwip_sys::ERR_OK @@ -405,25 +446,25 @@ impl TcpStream { extern fn sent(arg: *mut c_void, raw: *mut lwip_sys::tcp_pcb, _len: u16) -> lwip_sys::err { unsafe { - let state = arg as *mut TcpStreamState; - (*state).send_avail = lwip_sys::tcp_sndbuf_(raw) as usize; + let state = arg as *mut RefCell; + (*state).borrow_mut().send_avail = lwip_sys::tcp_sndbuf_(raw) as usize; } lwip_sys::ERR_OK } extern fn err(arg: *mut c_void, err: lwip_sys::err) { unsafe { - let state = arg as *mut TcpStreamState; - (*state).recv_buffer.push_back(result_from(err, || unreachable!())) + let state = arg as *mut RefCell; + (*state).borrow_mut().recv_buffer.push_back(result_from(err, || unreachable!())) } } unsafe { - let mut state = Box::new(TcpStreamState { + let mut state = Box::new(RefCell::new(TcpStreamState { recv_buffer: LinkedList::new(), send_avail: lwip_sys::tcp_sndbuf_(raw) as usize - }); - let arg = &mut *state as *mut TcpStreamState as *mut _; + })); + let arg = &mut *state as *mut RefCell as *mut _; lwip_sys::tcp_arg(raw, arg); lwip_sys::tcp_recv(raw, recv); lwip_sys::tcp_sent(raw, sent); @@ -432,34 +473,43 @@ impl TcpStream { } } - pub fn state(&self) -> *const TcpStreamState { + pub fn state(&self) -> *const RefCell { &*self.state } - pub fn write(&mut self, data: &[u8]) -> Result { + pub fn write(&self, data: &[u8]) -> Result { let sndbuf = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; let len = if data.len() < sndbuf { data.len() } else { sndbuf }; let result = result_from(unsafe { lwip_sys::tcp_write(self.raw, data as *const [u8] as *const _, len as u16, - lwip_sys::TCP_WRITE_FLAG_COPY) + lwip_sys::TCP_WRITE_FLAG_COPY | + lwip_sys::TCP_WRITE_FLAG_MORE) }, || len); - self.state.send_avail = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; + self.state.borrow_mut().send_avail = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; result } - pub fn try_read(&mut self) -> Result>> { - match self.state.recv_buffer.front() { + pub fn flush(&self) -> Result<()> { + const EMPTY_DATA: [u8; 0] = []; + result_from(unsafe { + lwip_sys::tcp_write(self.raw, &EMPTY_DATA as *const [u8] as *const _, 0, 0) + }, || ()) + } + + pub fn try_read(&self) -> Result>> { + let mut state = self.state.borrow_mut(); + match state.recv_buffer.front() { None => return Ok(None), Some(&Err(err)) => return Err(err), Some(_) => () } - match self.state.recv_buffer.pop_front() { + match state.recv_buffer.pop_front() { Some(Ok(pbuf)) => return Ok(Some(pbuf)), _ => unreachable!() } } - pub fn shutdown(&mut self, how: Shutdown) -> Result<()> { + pub fn shutdown(&self, how: Shutdown) -> Result<()> { let (shut_rx, shut_tx) = match how { Shutdown::Read => (1, 0), Shutdown::Write => (0, 1), diff --git a/artiq/runtime.rs/src/io.rs b/artiq/runtime.rs/src/io.rs index 48dee82c4..fc7e802ae 100644 --- a/artiq/runtime.rs/src/io.rs +++ b/artiq/runtime.rs/src/io.rs @@ -1,8 +1,10 @@ extern crate fringe; extern crate lwip; +use std::cell::RefCell; use std::vec::Vec; use std::time::{Instant, Duration}; +use std::io::{Read, Write, Result, Error, ErrorKind}; use self::fringe::OwnedStack; use self::fringe::generator::{Generator, Yielder}; @@ -37,7 +39,7 @@ impl Scheduler { Scheduler { threads: Vec::new(), index: 0 } } - pub unsafe fn spawn(&mut self, stack_size: usize, f: F) { + pub unsafe fn spawn(&mut self, stack_size: usize, f: F) { let stack = OwnedStack::new(stack_size); let thread = Thread { generator: Generator::unsafe_new(stack, move |yielder, _| { @@ -104,43 +106,34 @@ impl Scheduler { #[derive(Debug)] enum WaitEvent { - UdpReadable(*const lwip::UdpSocketState), - TcpAcceptable(*const lwip::TcpListenerState), - TcpWriteable(*const lwip::TcpStreamState), - TcpReadable(*const lwip::TcpStreamState), + UdpReadable(*const RefCell), + TcpAcceptable(*const RefCell), + TcpWriteable(*const RefCell), + TcpReadable(*const RefCell), } impl WaitEvent { fn completed(&self) -> bool { match *self { WaitEvent::UdpReadable(state) => - unsafe { (*state).readable() }, + unsafe { (*state).borrow().readable() }, WaitEvent::TcpAcceptable(state) => - unsafe { (*state).acceptable() }, + unsafe { (*state).borrow().acceptable() }, WaitEvent::TcpWriteable(state) => - unsafe { (*state).writeable() }, + unsafe { (*state).borrow().writeable() }, WaitEvent::TcpReadable(state) => - unsafe { (*state).readable() }, + unsafe { (*state).borrow().readable() }, } } } unsafe impl Send for WaitEvent {} -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub enum Error { - Lwip(lwip::Error), - TimedOut, - Interrupted -} - -pub type Result = ::std::result::Result; - -#[derive(Debug)] -pub struct Waiter<'a>(&'a mut Yielder); +#[derive(Debug, Clone, Copy)] +pub struct Waiter<'a>(&'a Yielder); impl<'a> Waiter<'a> { - pub fn sleep(&mut self, duration: Duration) -> Result<()> { + pub fn sleep(&self, duration: Duration) -> Result<()> { let request = WaitRequest { timeout: Some(Instant::now() + duration), event: None @@ -148,44 +141,174 @@ impl<'a> Waiter<'a> { match self.0.suspend(request) { WaitResult::TimedOut => Ok(()), - WaitResult::Interrupted => Err(Error::Interrupted), + WaitResult::Interrupted => Err(Error::new(ErrorKind::Interrupted, "")), _ => unreachable!() } } - fn suspend(&mut self, request: WaitRequest) -> Result<()> { + fn suspend(&self, request: WaitRequest) -> Result<()> { match self.0.suspend(request) { WaitResult::Completed => Ok(()), - WaitResult::TimedOut => Err(Error::TimedOut), - WaitResult::Interrupted => Err(Error::Interrupted) + WaitResult::TimedOut => Err(Error::new(ErrorKind::TimedOut, "")), + WaitResult::Interrupted => Err(Error::new(ErrorKind::Interrupted, "")) } } - pub fn udp_readable(&mut self, socket: &lwip::UdpSocket) -> Result<()> { + pub fn udp_readable(&self, socket: &lwip::UdpSocket) -> Result<()> { self.suspend(WaitRequest { timeout: None, event: Some(WaitEvent::UdpReadable(socket.state())) }) } - pub fn tcp_acceptable(&mut self, socket: &lwip::TcpListener) -> Result<()> { + pub fn tcp_acceptable(&self, socket: &lwip::TcpListener) -> Result<()> { self.suspend(WaitRequest { timeout: None, event: Some(WaitEvent::TcpAcceptable(socket.state())) }) } - pub fn tcp_writeable(&mut self, socket: &lwip::TcpStream) -> Result<()> { + pub fn tcp_writeable(&self, socket: &lwip::TcpStream) -> Result<()> { self.suspend(WaitRequest { timeout: None, event: Some(WaitEvent::TcpWriteable(socket.state())) }) } - pub fn tcp_readable(&mut self, socket: &lwip::TcpStream) -> Result<()> { + pub fn tcp_readable(&self, socket: &lwip::TcpStream) -> Result<()> { self.suspend(WaitRequest { timeout: None, event: Some(WaitEvent::TcpReadable(socket.state())) }) } } + +// Wrappers around lwip + +pub use self::lwip::{IpAddr, IP4_ANY, IP6_ANY, IP_ANY, SocketAddr}; + +#[derive(Debug)] +pub struct UdpSocket<'a> { + waiter: Waiter<'a>, + lower: lwip::UdpSocket +} + +impl<'a> UdpSocket<'a> { + pub fn new(waiter: Waiter<'a>) -> Result { + Ok(UdpSocket { + waiter: waiter, + lower: try!(lwip::UdpSocket::new()) + }) + } + + pub fn bind(&self, addr: SocketAddr) -> Result<()> { + Ok(try!(self.lower.bind(addr))) + } + + pub fn connect(&self, addr: SocketAddr) -> Result<()> { + Ok(try!(self.lower.connect(addr))) + } + + pub fn disconnect(&self) -> Result<()> { + Ok(try!(self.lower.disconnect())) + } + + pub fn send_to(&self, buf: &[u8], addr: SocketAddr) -> Result { + try!(self.lower.send_to(lwip::Pbuf::from_slice(buf), addr)); + Ok(buf.len()) + } + + pub fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> { + try!(self.waiter.udp_readable(&self.lower)); + let (pbuf, addr) = self.lower.try_recv().unwrap(); + let len = ::std::cmp::min(buf.len(), pbuf.len()); + (&mut buf[..len]).copy_from_slice(&pbuf.as_slice()[..len]); + Ok((len, addr)) + } + + pub fn send(&self, buf: &[u8]) -> Result { + try!(self.lower.send(lwip::Pbuf::from_slice(buf))); + Ok(buf.len()) + } + + pub fn recv(&self, buf: &mut [u8]) -> Result { + try!(self.waiter.udp_readable(&self.lower)); + let (pbuf, _addr) = self.lower.try_recv().unwrap(); + // lwip checks that addr matches the bind/connect call + let len = ::std::cmp::min(buf.len(), pbuf.len()); + (&mut buf[..len]).copy_from_slice(&pbuf.as_slice()[..len]); + Ok(len) + } +} + +#[derive(Debug)] +pub struct TcpListener<'a> { + waiter: Waiter<'a>, + lower: lwip::TcpListener +} + +impl<'a> TcpListener<'a> { + pub fn bind(waiter: Waiter<'a>, addr: SocketAddr) -> Result { + Ok(TcpListener { + waiter: waiter, + lower: try!(lwip::TcpListener::bind(addr)) + }) + } + + pub fn accept(&self) -> Result<(TcpStream, SocketAddr)> { + try!(self.waiter.tcp_acceptable(&self.lower)); + loop {} + let stream_lower = self.lower.try_accept().unwrap(); + let addr = SocketAddr::new(IP_ANY, 0); // FIXME: coax lwip into giving real addr here + Ok((TcpStream { + waiter: self.waiter, + lower: stream_lower, + buffer: None + }, addr)) + } +} + +pub use self::lwip::Shutdown; + +#[derive(Debug)] +pub struct TcpStream<'a> { + waiter: Waiter<'a>, + lower: lwip::TcpStream, + buffer: Option<(lwip::Pbuf<'static>, usize)> +} + +impl<'a> TcpStream<'a> { + pub fn shutdown(&self, how: Shutdown) -> Result<()> { + Ok(try!(self.lower.shutdown(how))) + } +} + +impl<'a> Read for TcpStream<'a> { + fn read(&mut self, buf: &mut [u8]) -> Result { + if self.buffer.is_none() { + try!(self.waiter.tcp_readable(&self.lower)); + let pbuf = try!(self.lower.try_read()).unwrap(); + self.buffer = Some((pbuf, 0)) + } + + let (pbuf, pos) = self.buffer.take().unwrap(); + let slice = &pbuf.as_slice()[pos..]; + let len = ::std::cmp::min(buf.len(), slice.len()); + buf.copy_from_slice(&slice[..len]); + if len < slice.len() { + self.buffer = Some((pbuf, pos + len)) + } + Ok(len) + } +} + +impl<'a> Write for TcpStream<'a> { + fn write(&mut self, buf: &[u8]) -> Result { + try!(self.waiter.tcp_writeable(&self.lower)); + Ok(try!(self.lower.write(buf))) + } + + fn flush(&mut self) -> Result<()> { + Ok(try!(self.lower.flush())) + } +} diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 71407129c..04e987129 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -2,8 +2,6 @@ #[macro_use] extern crate std_artiq as std; -extern crate fringe; -extern crate lwip; use std::prelude::v1::*; @@ -14,21 +12,23 @@ extern { fn lwip_service(); } -fn timer(mut waiter: io::Waiter) { +fn timer(waiter: io::Waiter) { loop { println!("tick"); waiter.sleep(std::time::Duration::from_millis(1000)).unwrap(); } } -fn echo(mut waiter: io::Waiter) { - let mut socket = lwip::UdpSocket::new().unwrap(); - socket.bind(lwip::SocketAddr::new(lwip::IP_ANY, 1234)).unwrap(); +fn echo(waiter: io::Waiter) { + let addr = io::SocketAddr::new(io::IP_ANY, 1234); + let listener = io::TcpListener::bind(waiter, addr).unwrap(); loop { - waiter.udp_readable(&socket).unwrap(); - let (addr, pbuf) = socket.try_recv().unwrap(); - println!("{:?}", core::str::from_utf8(pbuf.as_slice())); - socket.send_to(addr, pbuf).unwrap(); + let (mut stream, _addr) = listener.accept().unwrap(); + loop { + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + stream.write(&buf).unwrap(); + } } }