From 30e997f0450951bd198280fae60f451e7544c4f6 Mon Sep 17 00:00:00 2001 From: whitequark Date: Sun, 2 Oct 2016 04:37:24 +0000 Subject: [PATCH] Rust: implement idle kernels and session takeover. --- artiq/runtime.rs/Cargo.lock | 6 +- artiq/runtime.rs/Cargo.toml | 2 +- artiq/runtime.rs/liblwip/Cargo.toml | 4 + artiq/runtime.rs/liblwip/lib.rs | 12 +++ artiq/runtime.rs/src/kernel.rs | 12 +-- artiq/runtime.rs/src/lib.rs | 19 ++-- artiq/runtime.rs/src/logger.rs | 16 ++- artiq/runtime.rs/src/sched.rs | 160 +++++++++++++++++++++++----- artiq/runtime.rs/src/session.rs | 155 ++++++++++++++++++--------- artiq/runtime.rs/src/urc.rs | 31 ++++++ 10 files changed, 316 insertions(+), 101 deletions(-) create mode 100644 artiq/runtime.rs/src/urc.rs diff --git a/artiq/runtime.rs/Cargo.lock b/artiq/runtime.rs/Cargo.lock index 1f5913d15..eb4659fcb 100644 --- a/artiq/runtime.rs/Cargo.lock +++ b/artiq/runtime.rs/Cargo.lock @@ -5,7 +5,7 @@ dependencies = [ "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "fringe 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "log_buffer 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log_buffer 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "lwip 0.0.0", "std_artiq 0.0.0", "walkdir 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -49,7 +49,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "log_buffer" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -96,7 +96,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "23e3757828fa702a20072c37ff47938e9dd331b92fac6e223d26d4b7a55f7ee2" "checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054" -"checksum log_buffer 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8beb5ba24eca52f9958874445c4de5e086a7e82a1ec6b7ab81e5fcfb134f25a" +"checksum log_buffer 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ec57723b84bbe7bdf76aa93169c9b59e67473317c6de3a83cb2a0f8ccb2aa493" "checksum walkdir 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "c66c0b9792f0a765345452775f3adbd28dde9d33f30d13e5dcc5ae17cf6f3780" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" diff --git a/artiq/runtime.rs/Cargo.toml b/artiq/runtime.rs/Cargo.toml index aba3d1aae..725da7533 100644 --- a/artiq/runtime.rs/Cargo.toml +++ b/artiq/runtime.rs/Cargo.toml @@ -14,7 +14,7 @@ path = "src/lib.rs" [dependencies] std_artiq = { path = "libstd_artiq" } -lwip = { path = "liblwip" } +lwip = { path = "liblwip", default-features = false } fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] } log = { version = "0.3", default-features = false } log_buffer = { version = "1.0" } diff --git a/artiq/runtime.rs/liblwip/Cargo.toml b/artiq/runtime.rs/liblwip/Cargo.toml index cd76fe969..886f69415 100644 --- a/artiq/runtime.rs/liblwip/Cargo.toml +++ b/artiq/runtime.rs/liblwip/Cargo.toml @@ -10,3 +10,7 @@ path = "lib.rs" [dependencies] lwip-sys = { path = "../liblwip-sys" } std_artiq = { path = "../libstd_artiq" } + +[features] +default = ["preemption"] +preemption = [] diff --git a/artiq/runtime.rs/liblwip/lib.rs b/artiq/runtime.rs/liblwip/lib.rs index f8938653b..c996074f0 100644 --- a/artiq/runtime.rs/liblwip/lib.rs +++ b/artiq/runtime.rs/liblwip/lib.rs @@ -175,6 +175,9 @@ pub struct Pbuf<'payload> { phantom: PhantomData<&'payload [u8]> } +#[cfg(not(feature = "preemption"))] +unsafe impl<'payload> Send for Pbuf<'payload> {} + impl<'payload> Pbuf<'payload> { unsafe fn from_raw(raw: *mut lwip_sys::pbuf) -> Pbuf<'payload> { Pbuf { raw: raw, phantom: PhantomData } @@ -259,6 +262,9 @@ pub struct UdpSocket { state: Box> } +#[cfg(not(feature = "preemption"))] +unsafe impl Send for UdpSocket {} + impl UdpSocket { pub fn new() -> Result { extern fn recv(arg: *mut c_void, _pcb: *mut lwip_sys::udp_pcb, @@ -347,6 +353,9 @@ pub struct TcpListener { state: Box> } +#[cfg(not(feature = "preemption"))] +unsafe impl Send for TcpListener {} + impl TcpListener { pub fn bind(addr: SocketAddr) -> Result { extern fn accept(arg: *mut c_void, newpcb: *mut lwip_sys::tcp_pcb, @@ -428,6 +437,9 @@ pub struct TcpStream { state: Box> } +#[cfg(not(feature = "preemption"))] +unsafe impl Send for TcpStream {} + impl TcpStream { fn from_raw(raw: *mut lwip_sys::tcp_pcb) -> TcpStream { extern fn recv(arg: *mut c_void, _raw: *mut lwip_sys::tcp_pcb, diff --git a/artiq/runtime.rs/src/kernel.rs b/artiq/runtime.rs/src/kernel.rs index 4967ec597..050b57ec4 100644 --- a/artiq/runtime.rs/src/kernel.rs +++ b/artiq/runtime.rs/src/kernel.rs @@ -15,14 +15,14 @@ pub unsafe fn start() { stop(); extern { - static _binary_ksupport_elf_start: (); - static _binary_ksupport_elf_end: (); + static _binary_ksupport_elf_start: u8; + static _binary_ksupport_elf_end: u8; } - let ksupport_start = &_binary_ksupport_elf_start as *const _ as usize; - let ksupport_end = &_binary_ksupport_elf_end as *const _ as usize; - ptr::copy_nonoverlapping(ksupport_start as *const u8, + let ksupport_start = &_binary_ksupport_elf_start as *const _; + let ksupport_end = &_binary_ksupport_elf_end as *const _; + ptr::copy_nonoverlapping(ksupport_start, (KERNELCPU_EXEC_ADDRESS - KSUPPORT_HEADER_SIZE) as *mut u8, - ksupport_end - ksupport_start); + ksupport_end as usize - ksupport_start as usize); csr::kernel_cpu::reset_write(0); } diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 8070552fc..9b913c9dd 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -1,5 +1,5 @@ #![no_std] -#![feature(libc)] +#![feature(libc, borrow_state, const_fn)] #[macro_use] extern crate std_artiq as std; @@ -8,16 +8,19 @@ extern crate libc; extern crate log; extern crate log_buffer; extern crate byteorder; +extern crate fringe; +extern crate lwip; use logger::BufferLogger; mod board; -mod sched; mod config; mod clock; mod rtio_crg; mod mailbox; +mod urc; +mod sched; mod logger; mod cache; @@ -36,9 +39,9 @@ include!(concat!(env!("OUT_DIR"), "/git_info.rs")); #[no_mangle] pub unsafe extern fn rust_main() { - static mut log_buffer: [u8; 4096] = [0; 4096]; - BufferLogger::new(&mut log_buffer[..]) - .register(move |logger| { + static mut LOG_BUFFER: [u8; 4096] = [0; 4096]; + BufferLogger::new(&mut LOG_BUFFER[..]) + .register(move || { info!("booting ARTIQ runtime ({})", GIT_COMMIT); clock::init(); @@ -46,13 +49,11 @@ pub unsafe extern fn rust_main() { network_init(); let mut scheduler = sched::Scheduler::new(); - scheduler.spawn(8192, move |waiter| { - session::handler(waiter, logger) - }); + scheduler.spawner().spawn(8192, session::handler); loop { + scheduler.run(); lwip_service(); - scheduler.run() } }) } diff --git a/artiq/runtime.rs/src/logger.rs b/artiq/runtime.rs/src/logger.rs index 377470a2e..d09995bf7 100644 --- a/artiq/runtime.rs/src/logger.rs +++ b/artiq/runtime.rs/src/logger.rs @@ -1,3 +1,4 @@ +use core::{mem, ptr}; use core::cell::RefCell; use log::{self, Log, LogMetadata, LogRecord, LogLevelFilter}; use log_buffer::LogBuffer; @@ -6,9 +7,10 @@ pub struct BufferLogger { buffer: RefCell> } -// We can never preempt from within the logger, so there can be no data races. unsafe impl Sync for BufferLogger {} +static mut LOGGER: *const BufferLogger = ptr::null(); + impl BufferLogger { pub fn new(buffer: &'static mut [u8]) -> BufferLogger { BufferLogger { @@ -16,7 +18,7 @@ impl BufferLogger { } } - pub fn register(&self, f: F) { + pub fn register(&self, f: F) { // log::set_logger_raw captures a pointer to ourselves, so we must prevent // ourselves from being moved or dropped after that function is called (and // before log::shutdown_logger_raw is called). @@ -25,9 +27,17 @@ impl BufferLogger { max_log_level.set(LogLevelFilter::Trace); self as *const Log }).expect("global logger can only be initialized once"); + LOGGER = self; } - f(self); + f(); log::shutdown_logger_raw().unwrap(); + unsafe { + LOGGER = ptr::null(); + } + } + + pub fn with_instance R>(f: F) -> R { + f(unsafe { mem::transmute::<*const BufferLogger, &BufferLogger>(LOGGER) }) } pub fn clear(&self) { diff --git a/artiq/runtime.rs/src/sched.rs b/artiq/runtime.rs/src/sched.rs index 1e7b58a5a..124af9410 100644 --- a/artiq/runtime.rs/src/sched.rs +++ b/artiq/runtime.rs/src/sched.rs @@ -1,14 +1,14 @@ #![allow(dead_code)] -extern crate fringe; -extern crate lwip; - -use std::cell::RefCell; +use std::cell::{RefCell, BorrowState}; use std::vec::Vec; +use std::rc::Rc; use std::time::{Instant, Duration}; use std::io::{Read, Write, Result, Error, ErrorKind}; -use self::fringe::OwnedStack; -use self::fringe::generator::{Generator, Yielder}; +use fringe::OwnedStack; +use fringe::generator::{Generator, Yielder, State as GeneratorState}; +use lwip; +use urc::Urc; #[derive(Debug)] struct WaitRequest { @@ -25,38 +25,87 @@ enum WaitResult { #[derive(Debug)] struct Thread { - generator: Generator, + generator: Generator, waiting_for: WaitRequest, interrupted: bool } -#[derive(Debug)] -pub struct Scheduler { - threads: Vec, - index: usize -} - -impl Scheduler { - pub fn new() -> Scheduler { - Scheduler { threads: Vec::new(), index: 0 } - } - - pub unsafe fn spawn(&mut self, stack_size: usize, f: F) { +impl Thread { + unsafe fn new(spawner: Spawner, stack_size: usize, f: F) -> ThreadHandle + where F: 'static + FnOnce(Waiter, Spawner) + Send { let stack = OwnedStack::new(stack_size); - let thread = Thread { - generator: Generator::unsafe_new(stack, move |yielder, _| { - f(Waiter(yielder)) + ThreadHandle::new(Thread { + generator: Generator::unsafe_new(stack, |yielder, _| { + f(Waiter(yielder), spawner) }), waiting_for: WaitRequest { timeout: None, event: None }, interrupted: false - }; - self.threads.push(thread) + }) + } + + pub fn terminated(&self) -> bool { + // FIXME: https://github.com/nathan7/libfringe/pull/56 + match self.generator.state() { + GeneratorState::Unavailable => true, + GeneratorState::Runnable => false + } + } + + pub fn interrupt(&mut self) { + self.interrupted = true + } +} + +#[derive(Debug, Clone)] +pub struct ThreadHandle(Urc>); + +impl ThreadHandle { + fn new(thread: Thread) -> ThreadHandle { + ThreadHandle(Urc::new(RefCell::new(thread))) + } + + pub fn terminated(&self) -> bool { + match self.0.borrow_state() { + BorrowState::Unused => self.0.borrow().terminated(), + _ => false // the running thread hasn't terminated + } + } + + pub fn interrupt(&self) { + // FIXME: use try_borrow() instead once it's available + match self.0.borrow_state() { + BorrowState::Unused => self.0.borrow_mut().interrupt(), + _ => panic!("cannot interrupt the running thread") + } + } +} + +#[derive(Debug)] +pub struct Scheduler { + threads: Vec, + index: usize, + spawner: Spawner +} + +impl Scheduler { + pub fn new() -> Scheduler { + Scheduler { + threads: Vec::new(), + index: 0, + spawner: Spawner::new() + } + } + + pub fn spawner(&self) -> &Spawner { + &self.spawner } pub fn run(&mut self) { + self.threads.append(&mut *self.spawner.queue.borrow_mut()); + if self.threads.len() == 0 { return } let now = Instant::now(); @@ -66,7 +115,7 @@ impl Scheduler { self.index = (self.index + 1) % self.threads.len(); let result = { - let thread = &mut self.threads[self.index]; + let thread = &mut *self.threads[self.index].0.borrow_mut(); match thread.waiting_for { _ if thread.interrupted => { thread.interrupted = false; @@ -97,7 +146,8 @@ impl Scheduler { }, Some(wait_request) => { // The thread has suspended itself. - self.threads[self.index].waiting_for = wait_request + let thread = &mut *self.threads[self.index].0.borrow_mut(); + thread.waiting_for = wait_request } } @@ -106,8 +156,27 @@ impl Scheduler { } } +#[derive(Debug, Clone)] +pub struct Spawner { + queue: Urc>> +} + +impl Spawner { + fn new() -> Spawner { + Spawner { queue: Urc::new(RefCell::new(Vec::new())) } + } + + pub fn spawn(&self, stack_size: usize, f: F) -> ThreadHandle + where F: 'static + FnOnce(Waiter, Spawner) + Send { + let handle = unsafe { Thread::new(self.clone(), stack_size, f) }; + self.queue.borrow_mut().push(handle.clone()); + handle + } +} + enum WaitEvent { Completion(*const (Fn() -> bool + 'static)), + Termination(*const RefCell), UdpReadable(*const RefCell), TcpAcceptable(*const RefCell), TcpWriteable(*const RefCell), @@ -119,6 +188,8 @@ impl WaitEvent { match *self { WaitEvent::Completion(f) => unsafe { (*f)() }, + WaitEvent::Termination(thread) => + unsafe { (*thread).borrow().terminated() }, WaitEvent::UdpReadable(state) => unsafe { (*state).borrow().readable() }, WaitEvent::TcpAcceptable(state) => @@ -173,6 +244,13 @@ impl<'a> Waiter<'a> { } } + pub fn join(&self, thread: ThreadHandle) -> Result<()> { + self.suspend(WaitRequest { + timeout: None, + event: Some(WaitEvent::Termination(&*thread.0)) + }) + } + pub fn until bool + 'static>(&self, f: F) -> Result<()> { self.suspend(WaitRequest { timeout: None, @@ -211,7 +289,7 @@ impl<'a> Waiter<'a> { // Wrappers around lwip -pub use self::lwip::{IpAddr, IP4_ANY, IP6_ANY, IP_ANY, SocketAddr}; +pub use lwip::{IpAddr, IP4_ANY, IP6_ANY, IP_ANY, SocketAddr}; #[derive(Debug)] pub struct UdpSocket<'a> { @@ -227,6 +305,14 @@ impl<'a> UdpSocket<'a> { }) } + pub fn into_lower(self) -> lwip::UdpSocket { + self.lower + } + + pub fn from_lower(waiter: Waiter<'a>, inner: lwip::UdpSocket) -> UdpSocket { + UdpSocket { waiter: waiter, lower: inner } + } + pub fn bind(&self, addr: SocketAddr) -> Result<()> { Ok(try!(self.lower.bind(addr))) } @@ -285,6 +371,14 @@ impl<'a> TcpListener<'a> { }) } + pub fn into_lower(self) -> lwip::TcpListener { + self.lower + } + + pub fn from_lower(waiter: Waiter<'a>, inner: lwip::TcpListener) -> TcpListener { + TcpListener { waiter: waiter, lower: inner } + } + pub fn accept(&self) -> Result<(TcpStream, SocketAddr)> { try!(self.waiter.tcp_acceptable(&self.lower)); let stream_lower = self.lower.try_accept().unwrap(); @@ -301,7 +395,9 @@ impl<'a> TcpListener<'a> { } } -pub use self::lwip::Shutdown; +pub use lwip::Shutdown; + +pub struct TcpStreamInner(lwip::TcpStream, Option<(lwip::Pbuf<'static>, usize)>); #[derive(Debug)] pub struct TcpStream<'a> { @@ -311,6 +407,14 @@ pub struct TcpStream<'a> { } impl<'a> TcpStream<'a> { + pub fn into_lower(self) -> TcpStreamInner { + TcpStreamInner(self.lower, self.buffer) + } + + pub fn from_lower(waiter: Waiter<'a>, inner: TcpStreamInner) -> TcpStream { + TcpStream { waiter: waiter, lower: inner.0, buffer: inner.1 } + } + pub fn shutdown(&self, how: Shutdown) -> Result<()> { Ok(try!(self.lower.shutdown(how))) } diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index 56d7d81f6..3b280498c 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -1,11 +1,13 @@ use std::prelude::v1::*; -use std::mem; -use std::str; +use std::{mem, str}; +use std::cell::RefCell; use std::io::{self, Read}; use {config, rtio_crg, clock, mailbox, kernel}; use logger::BufferLogger; use cache::Cache; -use sched::{Waiter, TcpListener, TcpStream, SocketAddr, IP_ANY}; +use urc::Urc; +use sched::{ThreadHandle, Waiter, Spawner}; +use sched::{TcpListener, TcpStream, SocketAddr, IP_ANY}; use session_proto as host; use kernel_proto as kern; @@ -49,14 +51,16 @@ enum KernelState { // Per-connection state #[derive(Debug)] -struct Session { +struct Session<'a> { + congress: &'a mut Congress, kernel_state: KernelState, watchdog_set: clock::WatchdogSet } -impl Session { - fn new() -> Session { +impl<'a> Session<'a> { + fn new(congress: &mut Congress) -> Session { Session { + congress: congress, kernel_state: KernelState::Absent, watchdog_set: clock::WatchdogSet::new() } @@ -70,7 +74,7 @@ impl Session { } } -impl Drop for Session { +impl<'a> Drop for Session<'a> { fn drop(&mut self) { kernel::stop() } @@ -123,10 +127,9 @@ fn kern_acknowledge() -> io::Result<()> { Ok(()) } -fn comm_handle(logger: &BufferLogger, - waiter: Waiter, - stream: &mut TcpStream, - session: &mut Session) -> io::Result<()> { +fn process_host_message(waiter: Waiter, + stream: &mut TcpStream, + session: &mut Session) -> io::Result<()> { match try!(host_read(stream)) { host::Request::Ident => host_write(stream, host::Reply::Ident(::board::ident(&mut [0; 64]))), @@ -135,13 +138,15 @@ fn comm_handle(logger: &BufferLogger, host::Request::Log => { // Logging the packet with the log is inadvisable trace!("comm->host Log(...)"); - logger.extract(move |log| { - host::Reply::Log(log).write_to(stream) + BufferLogger::with_instance(|logger| { + logger.extract(|log| { + host::Reply::Log(log).write_to(stream) + }) }) } host::Request::LogClear => { - logger.clear(); + BufferLogger::with_instance(|logger| logger.clear()); host_write(stream, host::Reply::Log("")) } @@ -221,9 +226,8 @@ fn comm_handle(logger: &BufferLogger, } } -fn kern_handle(waiter: Waiter, - congress: &mut Congress, - session: &mut Session) -> io::Result<()> { +fn process_kern_message(waiter: Waiter, + session: &mut Session) -> io::Result<()> { kern::Message::wait_and_receive(waiter, |request| { match (&request, session.kernel_state) { (&kern::LoadReply { .. }, KernelState::Loaded) | @@ -246,10 +250,10 @@ fn kern_handle(waiter: Waiter, } kern::NowInitRequest => - kern_send(waiter, kern::NowInitReply(congress.now)), + kern_send(waiter, kern::NowInitReply(session.congress.now)), kern::NowSave(now) => { - congress.now = now; + session.congress.now = now; kern_acknowledge() } @@ -265,14 +269,14 @@ fn kern_handle(waiter: Waiter, } kern::CacheGetRequest { key } => { - let value = congress.cache.get(key); + let value = session.congress.cache.get(key); kern_send(waiter, kern::CacheGetReply { value: unsafe { mem::transmute::<*const [u32], &'static [u32]>(value) } }) } kern::CachePutRequest { key, value } => { - let succeeded = congress.cache.put(key, value).is_ok(); + let succeeded = session.congress.cache.put(key, value).is_ok(); kern_send(waiter, kern::CachePutReply { succeeded: succeeded }) } @@ -281,20 +285,17 @@ fn kern_handle(waiter: Waiter, }) } -fn handle(logger: &BufferLogger, - waiter: Waiter, - stream: &mut TcpStream, - congress: &mut Congress) -> io::Result<()> { - try!(check_magic(stream)); - - let mut session = Session::new(); +fn host_kernel_worker(waiter: Waiter, + stream: &mut TcpStream, + congress: &mut Congress) -> io::Result<()> { + let mut session = Session::new(congress); loop { if stream.readable() { - try!(comm_handle(logger, waiter, stream, &mut session)) + try!(process_host_message(waiter, stream, &mut session)); } if mailbox::receive() != 0 { - try!(kern_handle(waiter, congress, &mut session)) + try!(process_kern_message(waiter, &mut session)) } if session.kernel_state == KernelState::Running { @@ -313,27 +314,79 @@ fn handle(logger: &BufferLogger, } } -pub fn handler(waiter: Waiter, - logger: &BufferLogger) { - let mut congress = Congress::new(); - - let addr = SocketAddr::new(IP_ANY, 1381); - let listener = TcpListener::bind(waiter, addr).unwrap(); - info!("accepting network sessions in Rust"); - +fn flash_kernel_worker(waiter: Waiter, + congress: &mut Congress) -> io::Result<()> { + let mut session = Session::new(congress); loop { - let (mut stream, addr) = listener.accept().unwrap(); - info!("new connection from {:?}", addr); - - match handle(logger, waiter, &mut stream, &mut congress) { - Ok(()) => (), - Err(err) => { - if err.kind() == io::ErrorKind::UnexpectedEof { - info!("connection closed"); - } else { - error!("session aborted: {:?}", err); - } - } - } + try!(process_kern_message(waiter, &mut session)) + } +} + +fn respawn(spawner: Spawner, waiter: Waiter, + handle: &mut Option, + f: F) where F: 'static + FnOnce(Waiter, Spawner) + Send { + match handle.take() { + None => (), + Some(handle) => { + info!("terminating running kernel"); + handle.interrupt(); + waiter.join(handle).expect("cannot join interrupt thread") + } + } + + *handle = Some(spawner.spawn(8192, f)) +} + +pub fn handler(waiter: Waiter, spawner: Spawner) { + let congress = Urc::new(RefCell::new(Congress::new())); + + let addr = SocketAddr::new(IP_ANY, 1381); + let listener = TcpListener::bind(waiter, addr).expect("cannot bind socket"); + info!("accepting network sessions in Rust"); + + let mut kernel_thread = None; + loop { + if listener.acceptable() { + let (mut stream, addr) = listener.accept().expect("cannot accept client"); + match check_magic(&mut stream) { + Ok(()) => (), + Err(_) => continue + } + info!("new connection from {:?}", addr); + + let stream = stream.into_lower(); + let congress = congress.clone(); + respawn(spawner.clone(), waiter, &mut kernel_thread, move |waiter, _spawner| { + let mut stream = TcpStream::from_lower(waiter, stream); + let mut congress = congress.borrow_mut(); + match host_kernel_worker(waiter, &mut stream, &mut congress) { + Ok(()) => (), + Err(err) => { + if err.kind() == io::ErrorKind::UnexpectedEof { + info!("connection closed"); + } else { + error!("session aborted: {:?}", err); + } + } + } + }) + } + + if kernel_thread.is_none() { + info!("no connection, starting idle kernel"); + let congress = congress.clone(); + respawn(spawner.clone(), waiter, &mut kernel_thread, move |waiter, _spawner| { + let mut congress = congress.borrow_mut(); + match flash_kernel_worker(waiter, &mut congress) { + Ok(()) => + info!("idle kernel finished, standing by"), + Err(err) => { + error!("idle kernel aborted: {:?}", err); + } + } + }) + } + + waiter.relinquish() } } diff --git a/artiq/runtime.rs/src/urc.rs b/artiq/runtime.rs/src/urc.rs new file mode 100644 index 000000000..3ca67f9ec --- /dev/null +++ b/artiq/runtime.rs/src/urc.rs @@ -0,0 +1,31 @@ +use std::rc::Rc; +use std::ops::{Deref, DerefMut}; +use std::fmt; + +pub struct Urc(Rc); + +impl Urc { + pub fn new(value: T) -> Urc { Urc(Rc::new(value)) } +} + +unsafe impl Send for Urc {} + +unsafe impl Sync for Urc {} + +impl Deref for Urc { + type Target = T; + + fn deref(&self) -> &Self::Target { self.0.deref() } +} + +impl Clone for Urc { + fn clone(&self) -> Urc { + Urc(self.0.clone()) + } +} + +impl fmt::Debug for Urc { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +}