From 6bbaff81bfb1b4d76064503ebc33480f0d0acc45 Mon Sep 17 00:00:00 2001 From: whitequark Date: Tue, 4 Oct 2016 05:20:56 +0000 Subject: [PATCH] Rust: implement idle kernels. --- artiq/runtime.rs/src/lib.rs | 2 +- artiq/runtime.rs/src/sched.rs | 3 +- artiq/runtime.rs/src/session.rs | 105 ++++++++++++++++---------- artiq/runtime.rs/src/session_proto.rs | 4 +- artiq/runtime.rs/src/urc.rs | 2 +- 5 files changed, 69 insertions(+), 47 deletions(-) diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 4ddb0ba52..40963f72a 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -1,5 +1,5 @@ #![no_std] -#![feature(libc, borrow_state, const_fn, try_borrow)] +#![feature(libc, const_fn, try_borrow)] #[macro_use] extern crate std_artiq as std; diff --git a/artiq/runtime.rs/src/sched.rs b/artiq/runtime.rs/src/sched.rs index a5ab646ce..87d8562ff 100644 --- a/artiq/runtime.rs/src/sched.rs +++ b/artiq/runtime.rs/src/sched.rs @@ -1,8 +1,7 @@ #![allow(dead_code)] -use std::cell::{RefCell, BorrowState}; +use std::cell::RefCell; use std::vec::Vec; -use std::rc::Rc; use std::time::{Instant, Duration}; use std::io::{Read, Write, Result, Error, ErrorKind}; use fringe::OwnedStack; diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index 3b280498c..2188bd29b 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -95,7 +95,7 @@ fn check_magic(stream: &mut TcpStream) -> io::Result<()> { fn host_read(stream: &mut TcpStream) -> io::Result { let request = try!(host::Request::read_from(stream)); match &request { - &host::Request::LoadLibrary(_) => trace!("comm<-host LoadLibrary(...)"), + &host::Request::LoadKernel(_) => trace!("comm<-host LoadLibrary(...)"), _ => trace!("comm<-host {:?}", request) } Ok(request) @@ -127,6 +127,38 @@ fn kern_acknowledge() -> io::Result<()> { Ok(()) } +unsafe fn kern_load(waiter: Waiter, session: &mut Session, library: &[u8]) -> io::Result<()> { + if session.running() { + unexpected!("attempted to load a new kernel while a kernel was running") + } + + kernel::start(); + + try!(kern_send(waiter, kern::LoadRequest(&library))); + kern_recv(waiter, |reply| { + match reply { + kern::LoadReply { error: None } => { + session.kernel_state = KernelState::Loaded; + Ok(()) + } + kern::LoadReply { error: Some(cause) } => + unexpected!("cannot load kernel: {}", cause), + other => + unexpected!("unexpected reply from kernel CPU: {:?}", other) + } + }) +} + +fn kern_run(session: &mut Session) -> io::Result<()> { + if session.kernel_state != KernelState::Loaded { + unexpected!("attempted to run a kernel while not in Loaded state") + } + + session.kernel_state = KernelState::Running; + // TODO: make this a separate request + kern_acknowledge() +} + fn process_host_message(waiter: Waiter, stream: &mut TcpStream, session: &mut Session) -> io::Result<()> { @@ -187,40 +219,17 @@ fn process_host_message(waiter: Waiter, } } - host::Request::LoadLibrary(library) => { - if session.running() { - error!("attempted to load a new kernel while a kernel was running"); - return host_write(stream, host::Reply::LoadFailed) - } + host::Request::LoadKernel(kernel) => + match unsafe { kern_load(waiter, session, &kernel) } { + Ok(()) => host_write(stream, host::Reply::LoadCompleted), + Err(_) => host_write(stream, host::Reply::LoadFailed) + }, - unsafe { kernel::start() } - - try!(kern_send(waiter, kern::LoadRequest(&library))); - kern_recv(waiter, |reply| { - match reply { - kern::LoadReply { error: None } => { - session.kernel_state = KernelState::Loaded; - host_write(stream, host::Reply::LoadCompleted) - } - kern::LoadReply { error: Some(cause) } => { - error!("cannot load kernel: {}", cause); - host_write(stream, host::Reply::LoadFailed) - } - other => unexpected!("unexpected reply from kernel CPU: {:?}", other) - } - }) - } - - host::Request::RunKernel => { - if session.kernel_state != KernelState::Loaded { - error!("attempted to run a kernel while not in Loaded state"); - return host_write(stream, host::Reply::KernelStartupFailed) - } - - session.kernel_state = KernelState::Running; - // TODO: make this a separate request - kern_acknowledge() - } + host::Request::RunKernel => + match kern_run(session) { + Ok(()) => Ok(()), + Err(_) => host_write(stream, host::Reply::KernelStartupFailed) + }, request => unexpected!("unexpected request {:?} from host machine", request) } @@ -289,6 +298,7 @@ fn host_kernel_worker(waiter: Waiter, stream: &mut TcpStream, congress: &mut Congress) -> io::Result<()> { let mut session = Session::new(congress); + loop { if stream.readable() { try!(process_host_message(waiter, stream, &mut session)); @@ -315,8 +325,14 @@ fn host_kernel_worker(waiter: Waiter, } fn flash_kernel_worker(waiter: Waiter, - congress: &mut Congress) -> io::Result<()> { + congress: &mut Congress, + config_key: &str) -> io::Result<()> { let mut session = Session::new(congress); + + let kernel = config::read_to_end(config_key); + try!(unsafe { kern_load(waiter, &mut session, &kernel) }); + try!(kern_run(&mut session)); + loop { try!(process_kern_message(waiter, &mut session)) } @@ -328,9 +344,11 @@ fn respawn(spawner: Spawner, waiter: Waiter, match handle.take() { None => (), Some(handle) => { - info!("terminating running kernel"); - handle.interrupt(); - waiter.join(handle).expect("cannot join interrupt thread") + if !handle.terminated() { + info!("terminating running kernel"); + handle.interrupt(); + waiter.join(handle).expect("cannot join interrupt thread") + } } } @@ -372,16 +390,21 @@ pub fn handler(waiter: Waiter, spawner: Spawner) { }) } - if kernel_thread.is_none() { + if kernel_thread.as_ref().map_or(true, |h| h.terminated()) { info!("no connection, starting idle kernel"); + let congress = congress.clone(); respawn(spawner.clone(), waiter, &mut kernel_thread, move |waiter, _spawner| { let mut congress = congress.borrow_mut(); - match flash_kernel_worker(waiter, &mut congress) { + match flash_kernel_worker(waiter, &mut congress, "idle_kernel") { Ok(()) => info!("idle kernel finished, standing by"), Err(err) => { - error!("idle kernel aborted: {:?}", err); + if err.kind() == io::ErrorKind::Interrupted { + info!("idle kernel interrupted"); + } else { + error!("idle kernel aborted: {:?}", err); + } } } }) diff --git a/artiq/runtime.rs/src/session_proto.rs b/artiq/runtime.rs/src/session_proto.rs index ba3a64ae2..b3b90d8b5 100644 --- a/artiq/runtime.rs/src/session_proto.rs +++ b/artiq/runtime.rs/src/session_proto.rs @@ -122,7 +122,7 @@ pub enum Request { Ident, SwitchClock(u8), - LoadLibrary(Vec), + LoadKernel(Vec), RunKernel, RpcReply { tag: String }, // FIXME @@ -150,7 +150,7 @@ impl Request { 5 => { let mut code = vec![0; length - HEADER_SIZE]; try!(reader.read_exact(&mut code)); - Request::LoadLibrary(code) + Request::LoadKernel(code) }, 6 => Request::RunKernel, 7 => Request::RpcReply { diff --git a/artiq/runtime.rs/src/urc.rs b/artiq/runtime.rs/src/urc.rs index 3ca67f9ec..cec7751ea 100644 --- a/artiq/runtime.rs/src/urc.rs +++ b/artiq/runtime.rs/src/urc.rs @@ -1,5 +1,5 @@ use std::rc::Rc; -use std::ops::{Deref, DerefMut}; +use std::ops::Deref; use std::fmt; pub struct Urc(Rc);