
512 lines
15 KiB
Raw Normal View History

2016-10-02 02:24:53 +08:00
use std::mem;
use std::result;
2017-10-25 10:31:27 +08:00
use std::cell::{Cell, RefCell};
use std::vec::Vec;
use std::io::{Read, Write, Result, Error, ErrorKind};
use fringe::OwnedStack;
use fringe::generator::{Generator, Yielder, State as GeneratorState};
use smoltcp::wire::IpEndpoint;
2017-10-25 10:31:27 +08:00
use smoltcp::socket::{SocketHandle, SocketRef};
2017-01-02 01:23:27 +08:00
use board;
use urc::Urc;
2016-08-30 19:20:04 +08:00
2017-10-25 10:31:27 +08:00
type SocketSet = ::smoltcp::socket::SocketSet<'static, 'static, 'static>;
2016-08-30 19:20:04 +08:00
struct WaitRequest {
event: Option<*mut FnMut() -> bool>,
timeout: Option<u64>
2016-08-30 19:20:04 +08:00
unsafe impl Send for WaitRequest {}
2016-08-30 19:20:04 +08:00
enum WaitResult {
2016-08-30 19:20:04 +08:00
struct Thread {
generator: Generator<WaitResult, WaitRequest, OwnedStack>,
2016-08-30 19:20:04 +08:00
waiting_for: WaitRequest,
interrupted: bool
impl Thread {
unsafe fn new<F>(io: &Io, stack_size: usize, f: F) -> ThreadHandle
where F: 'static + FnOnce(Io) + Send {
let spawned = io.spawned.clone();
let sockets = io.sockets.clone();
let stack = OwnedStack::new(stack_size);
ThreadHandle::new(Thread {
generator: Generator::unsafe_new(stack, |yielder, _| {
f(Io {
yielder: Some(yielder),
spawned: spawned,
sockets: sockets
waiting_for: WaitRequest {
event: None,
timeout: None
interrupted: false
pub fn terminated(&self) -> bool {
match self.generator.state() {
GeneratorState::Unavailable => true,
GeneratorState::Runnable => false
pub fn interrupt(&mut self) {
self.interrupted = true
pub struct ThreadHandle(Urc<RefCell<Thread>>);
impl ThreadHandle {
fn new(thread: Thread) -> ThreadHandle {
pub fn terminated(&self) -> bool {
2016-10-04 11:26:53 +08:00
match self.0.try_borrow() {
Ok(thread) => thread.terminated(),
Err(_) => false // the running thread hasn't terminated
pub fn interrupt(&self) {
2016-10-04 11:26:53 +08:00
match self.0.try_borrow_mut() {
Ok(mut thread) => thread.interrupt(),
Err(_) => panic!("cannot interrupt the running thread")
2016-08-30 19:20:04 +08:00
pub struct Scheduler {
threads: Vec<ThreadHandle>,
spawned: Urc<RefCell<Vec<ThreadHandle>>>,
sockets: Urc<RefCell<SocketSet>>,
run_idx: usize,
2016-08-30 19:20:04 +08:00
impl Scheduler {
pub fn new() -> Scheduler {
Scheduler {
threads: Vec::new(),
spawned: Urc::new(RefCell::new(Vec::new())),
sockets: Urc::new(RefCell::new(SocketSet::new(Vec::new()))),
run_idx: 0,
2016-08-30 19:20:04 +08:00
pub fn io(&self) -> Io<'static> {
Io {
yielder: None,
spawned: self.spawned.clone(),
sockets: self.sockets.clone()
2016-08-30 19:20:04 +08:00
pub fn run(&mut self) {
self.threads.append(&mut *self.spawned.borrow_mut());
2016-08-30 19:20:04 +08:00
if self.threads.len() == 0 { return }
2017-01-02 01:23:27 +08:00
let now = board::clock::get_ms();
let start_idx = self.run_idx;
2016-08-30 19:20:04 +08:00
loop {
self.run_idx = (self.run_idx + 1) % self.threads.len();
2016-08-30 19:20:04 +08:00
let result = {
let &mut Thread { ref mut generator, ref mut interrupted, ref waiting_for } =
&mut *self.threads[self.run_idx].0.borrow_mut();
if *interrupted {
*interrupted = false;
} else if waiting_for.event.is_none() && waiting_for.timeout.is_none() {
} else if|instant| now >= instant).unwrap_or(false) {
} else if|event| unsafe { (*event)() }).unwrap_or(false) {
} else if self.run_idx == start_idx {
// We've checked every thread and none of them are runnable.
} else {
2016-08-30 19:20:04 +08:00
match result {
None => {
// The thread has terminated.
self.run_idx = 0
2016-08-30 19:20:04 +08:00
Some(wait_request) => {
// The thread has suspended itself.
let mut thread = self.threads[self.run_idx].0.borrow_mut();
thread.waiting_for = wait_request
2016-08-30 19:20:04 +08:00
pub fn sockets(&self) -> &RefCell<SocketSet> {
pub struct Io<'a> {
yielder: Option<&'a Yielder<WaitResult, WaitRequest, OwnedStack>>,
spawned: Urc<RefCell<Vec<ThreadHandle>>>,
sockets: Urc<RefCell<SocketSet>>,
2016-08-30 19:20:04 +08:00
impl<'a> Io<'a> {
pub fn spawn<F>(&self, stack_size: usize, f: F) -> ThreadHandle
where F: 'static + FnOnce(Io) + Send {
let handle = unsafe { Thread::new(self, stack_size, f) };
2016-08-30 19:20:04 +08:00
fn yielder(&self) -> &'a Yielder<WaitResult, WaitRequest, OwnedStack> {
self.yielder.expect("cannot suspend the scheduler thread")
pub fn sleep(&self, duration_ms: u64) -> Result<()> {
2016-08-30 19:20:04 +08:00
let request = WaitRequest {
2017-01-02 01:23:27 +08:00
timeout: Some(board::clock::get_ms() + duration_ms),
2016-08-30 19:20:04 +08:00
event: None
match self.yielder().suspend(request) {
2016-08-30 19:20:04 +08:00
WaitResult::TimedOut => Ok(()),
WaitResult::Interrupted => Err(Error::new(ErrorKind::Interrupted, "")),
2016-08-30 19:20:04 +08:00
_ => unreachable!()
fn suspend(&self, request: WaitRequest) -> Result<()> {
match self.yielder().suspend(request) {
WaitResult::Completed => Ok(()),
WaitResult::TimedOut => Err(Error::new(ErrorKind::TimedOut, "")),
WaitResult::Interrupted => Err(Error::new(ErrorKind::Interrupted, ""))
pub fn relinquish(&self) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
event: None
pub fn until<F: FnMut() -> bool>(&self, mut f: F) -> Result<()> {
let f = unsafe { mem::transmute::<&mut FnMut() -> bool, *mut FnMut() -> bool>(&mut f) };
self.suspend(WaitRequest {
timeout: None,
event: Some(f)
pub fn until_ok<T, E, F: FnMut() -> result::Result<T, E>>(&self, mut f: F) -> Result<T> {
let mut value = None;
self.until(|| {
if let Ok(result) = f() {
value = Some(result)
pub fn join(&self, handle: ThreadHandle) -> Result<()> {
self.until(move || handle.terminated())
macro_rules! until {
($socket:expr, $ty:ty, |$var:ident| $cond:expr) => ({
let (sockets, handle) = ($, $socket.handle);
$ || {
let mut sockets = sockets.borrow_mut();
2017-10-25 10:31:27 +08:00
let $var = sockets.get::<$ty>(handle);
2016-08-30 19:20:04 +08:00
2017-10-25 10:31:27 +08:00
2017-08-24 13:50:59 +08:00
use ::smoltcp::Error as ErrorLower;
2017-08-24 13:50:59 +08:00
// type ErrorLower = ::smoltcp::Error;
type TcpSocketBuffer = ::smoltcp::socket::TcpSocketBuffer<'static>;
type TcpSocketLower = ::smoltcp::socket::TcpSocket<'static>;
pub struct TcpSocketHandle(SocketHandle);
pub struct TcpListener<'a> {
io: &'a Io<'a>,
handle: Cell<SocketHandle>,
buffer_size: Cell<usize>,
endpoint: Cell<IpEndpoint>
impl<'a> TcpListener<'a> {
fn new_lower(io: &'a Io<'a>, buffer_size: usize) -> SocketHandle {
let rx_buffer = vec![0; buffer_size];
let tx_buffer = vec![0; buffer_size];
pub fn new(io: &'a Io<'a>, buffer_size: usize) -> TcpListener<'a> {
TcpListener {
io: io,
handle: Cell::new(Self::new_lower(io, buffer_size)),
buffer_size: Cell::new(buffer_size),
endpoint: Cell::new(IpEndpoint::default())
2017-10-25 10:31:27 +08:00
fn with_lower<F, R>(&self, f: F) -> R
where F: FnOnce(SocketRef<TcpSocketLower>) -> R {
let mut sockets =;
2017-10-25 10:31:27 +08:00
let result = f(sockets.get(self.handle.get()));
pub fn is_open(&self) -> bool {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.is_open())
pub fn can_accept(&self) -> bool {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.is_active())
pub fn local_endpoint(&self) -> IpEndpoint {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.local_endpoint())
pub fn listen<T: Into<IpEndpoint>>(&self, endpoint: T) -> Result<()> {
let endpoint = endpoint.into();
2017-10-25 10:31:27 +08:00
self.with_lower(|mut s| s.listen(endpoint))
.map(|()| {
.map_err(|err| {
match err {
ErrorLower::Illegal =>
Error::new(ErrorKind::Other, "already listening"),
ErrorLower::Unaddressable =>
Error::new(ErrorKind::InvalidInput, "port cannot be zero"),
_ => unreachable!()
pub fn accept(&self) -> Result<TcpStream<'a>> {
// We're waiting until at least one half of the connection becomes open.
// This handles the case where a remote socket immediately sends a FIN--
// that still counts as accepting even though nothing may be sent.
let (sockets, handle) = (, self.handle.get()); || {
let mut sockets = sockets.borrow_mut();
2017-10-25 10:31:27 +08:00
let socket = sockets.get::<TcpSocketLower>(handle);
socket.may_send() || socket.may_recv()
let accepted = self.handle.get();
self.handle.set(Self::new_lower(, self.buffer_size.get()));
2017-08-24 13:50:59 +08:00
match self.listen(self.endpoint.get()) {
Ok(()) => (),
_ => unreachable!()
Ok(TcpStream {
handle: accepted
pub fn close(&self) {
2017-10-25 10:31:27 +08:00
self.with_lower(|mut s| s.close())
impl<'a> Drop for TcpListener<'a> {
fn drop(&mut self) {
2017-10-25 10:31:27 +08:00
self.with_lower(|mut s| s.close());
pub struct TcpStream<'a> {
io: &'a Io<'a>,
handle: SocketHandle
impl<'a> TcpStream<'a> {
pub fn into_handle(self) -> TcpSocketHandle {
let handle = self.handle;
pub fn from_handle(io: &'a Io<'a>, handle: TcpSocketHandle) -> TcpStream<'a> {
TcpStream {
io: io,
handle: handle.0
2017-10-25 10:31:27 +08:00
fn with_lower<F, R>(&self, f: F) -> R
where F: FnOnce(SocketRef<TcpSocketLower>) -> R {
let mut sockets =;
2017-10-25 10:31:27 +08:00
let result = f(sockets.get(self.handle));
pub fn is_open(&self) -> bool {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.is_open())
pub fn may_send(&self) -> bool {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.may_send())
pub fn may_recv(&self) -> bool {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.may_recv())
pub fn can_send(&self) -> bool {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.can_send())
pub fn can_recv(&self) -> bool {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.can_recv())
pub fn local_endpoint(&self) -> IpEndpoint {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.local_endpoint())
pub fn remote_endpoint(&self) -> IpEndpoint {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.remote_endpoint())
2017-09-25 06:57:27 +08:00
pub fn timeout(&self) -> Option<u64> {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.timeout())
2017-09-25 06:57:27 +08:00
pub fn set_timeout(&self, value: Option<u64>) {
2017-10-25 10:31:27 +08:00
self.with_lower(|mut s| s.set_timeout(value))
2017-09-25 06:57:27 +08:00
pub fn keep_alive(&self) -> Option<u64> {
2017-10-25 10:31:27 +08:00
self.with_lower(|s| s.keep_alive())
2017-09-25 06:57:27 +08:00
pub fn set_keep_alive(&self, value: Option<u64>) {
2017-10-25 10:31:27 +08:00
self.with_lower(|mut s| s.set_keep_alive(value))
2017-09-25 06:57:27 +08:00
pub fn close(&self) -> Result<()> {
2017-10-25 10:31:27 +08:00
self.with_lower(|mut s| s.close());
until!(self, TcpSocketLower, |s| !s.is_open())?;
// right now the socket may be in TIME-WAIT state. if we don't give it a chance to send
// a packet, and the user code executes a loop { s.listen();; s.close(); }
// then the last ACK will never be sent.
impl<'a> Read for TcpStream<'a> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// Only borrow the underlying socket for the span of the next statement.
2017-10-25 10:31:27 +08:00
let result = self.with_lower(|mut s| s.recv_slice(buf));
match result {
// Slow path: we need to block until buffer is non-empty.
2017-08-24 13:50:59 +08:00
Ok(0) => {
until!(self, TcpSocketLower, |s| s.can_recv() || !s.may_recv())?;
2017-10-25 10:31:27 +08:00
match self.with_lower(|mut s| s.recv_slice(buf)) {
2017-08-24 13:50:59 +08:00
Ok(length) => Ok(length),
Err(ErrorLower::Illegal) => Ok(0),
_ => unreachable!()
2016-09-29 02:25:25 +08:00
// Fast path: we had data in buffer.
2017-08-24 13:50:59 +08:00
Ok(length) => Ok(length),
// Error path: the receive half of the socket is not open.
Err(ErrorLower::Illegal) => Ok(0),
// No other error may be returned.
Err(_) => unreachable!()
impl<'a> Write for TcpStream<'a> {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
// Only borrow the underlying socket for the span of the next statement.
2017-10-25 10:31:27 +08:00
let result = self.with_lower(|mut s| s.send_slice(buf));
match result {
// Slow path: we need to block until buffer is non-full.
2017-08-24 13:50:59 +08:00
Ok(0) => {
until!(self, TcpSocketLower, |s| s.can_send() || !s.may_send())?;
2017-10-25 10:31:27 +08:00
match self.with_lower(|mut s| s.send_slice(buf)) {
2017-08-24 13:50:59 +08:00
Ok(length) => Ok(length),
Err(ErrorLower::Illegal) => Ok(0),
_ => unreachable!()
// Fast path: we had space in buffer.
2017-08-24 13:50:59 +08:00
Ok(length) => Ok(length),
// Error path: the transmit half of the socket is not open.
Err(ErrorLower::Illegal) => Ok(0),
// No other error may be returned.
Err(_) => unreachable!()
fn flush(&mut self) -> Result<()> {
until!(self, TcpSocketLower, |s| s.send_queue() == 0 || !s.may_send())?;
2017-10-25 10:31:27 +08:00
if self.with_lower(|s| s.send_queue()) == 0 {
} else {
Err(Error::new(ErrorKind::ConnectionAborted, "connection aborted"))
impl<'a> Drop for TcpStream<'a> {
fn drop(&mut self) {
2017-10-25 10:31:27 +08:00
self.with_lower(|mut s| s.close());