From b3b1ea71c5db8ef171dfc41f621fab146ea3babc Mon Sep 17 00:00:00 2001 From: whitequark Date: Sat, 1 Oct 2016 04:20:27 +0000 Subject: [PATCH] Rust: implement basic communication with kernel CPU. --- artiq/runtime.rs/Cargo.toml | 2 +- artiq/runtime.rs/liblwip-sys/lib.rs | 18 +- artiq/runtime.rs/liblwip/lib.rs | 21 +- artiq/runtime.rs/src/kernel.rs | 37 +++ artiq/runtime.rs/src/kernel_proto.rs | 323 +++++++++++++++++++++++++++ artiq/runtime.rs/src/lib.rs | 6 +- artiq/runtime.rs/src/logger.rs | 4 +- artiq/runtime.rs/src/mailbox.rs | 28 +-- artiq/runtime.rs/src/sched.rs | 42 +++- artiq/runtime.rs/src/session.rs | 244 ++++++++++++++------ artiq/runtime/Makefile | 6 +- artiq/runtime/kloader.c | 2 +- 12 files changed, 619 insertions(+), 114 deletions(-) create mode 100644 artiq/runtime.rs/src/kernel.rs create mode 100644 artiq/runtime.rs/src/kernel_proto.rs diff --git a/artiq/runtime.rs/Cargo.toml b/artiq/runtime.rs/Cargo.toml index eb8c3f2c9..aba3d1aae 100644 --- a/artiq/runtime.rs/Cargo.toml +++ b/artiq/runtime.rs/Cargo.toml @@ -8,7 +8,7 @@ build = "build.rs" walkdir = "0.1" [lib] -name = "artiq_rust" +name = "runtime" crate-type = ["staticlib"] path = "src/lib.rs" diff --git a/artiq/runtime.rs/liblwip-sys/lib.rs b/artiq/runtime.rs/liblwip-sys/lib.rs index aaac1d424..5ea2e7d1e 100644 --- a/artiq/runtime.rs/liblwip-sys/lib.rs +++ b/artiq/runtime.rs/liblwip-sys/lib.rs @@ -122,25 +122,25 @@ extern { pub fn tcp_bind(pcb: *mut tcp_pcb, ipaddr: *mut ip_addr, port: u16) -> err; pub fn tcp_listen_with_backlog(pcb: *mut tcp_pcb, backlog: u8) -> *mut tcp_pcb; pub fn tcp_accept(pcb: *mut tcp_pcb, - accept: extern fn(arg: *mut c_void, newpcb: *mut tcp_pcb, - err: err) -> err); + accept: Option err>); pub fn tcp_connect(pcb: *mut tcp_pcb, ipaddr: *mut ip_addr, port: u16, connected: extern fn(arg: *mut c_void, tcb: *mut tcp_pcb, err: err)) -> err; pub fn tcp_write(pcb: *mut tcp_pcb, dataptr: *const c_void, len: u16, apiflags: u8) -> err; pub fn tcp_sent(pcb: *mut tcp_pcb, - sent: extern fn(arg: *mut c_void, tcb: *mut tcp_pcb, len: u16) -> err); + sent: Option err>); pub fn tcp_recv(pcb: *mut tcp_pcb, - recv: extern fn(arg: *mut c_void, tcb: *mut tcp_pcb, p: *mut pbuf, - err: err) -> err); + recv: Option err>); pub fn tcp_recved(pcb: *mut tcp_pcb, len: u16); pub fn tcp_poll(pcb: *mut tcp_pcb, - poll: extern fn(arg: *mut c_void, tcb: *mut tcp_pcb), + poll: Option, interval: u8); pub fn tcp_shutdown(pcb: *mut tcp_pcb, shut_rx: c_int, shut_tx: c_int) -> err; pub fn tcp_close(pcb: *mut tcp_pcb) -> err; pub fn tcp_abort(pcb: *mut tcp_pcb); pub fn tcp_err(pcb: *mut tcp_pcb, - err: extern fn(arg: *mut c_void, err: err)); + err: Option); // nonstandard pub fn tcp_sndbuf_(pcb: *mut tcp_pcb) -> u16; @@ -154,7 +154,7 @@ extern { pub fn udp_send(pcb: *mut udp_pcb, p: *mut pbuf) -> err; pub fn udp_sendto(pcb: *mut udp_pcb, p: *mut pbuf, ipaddr: *mut ip_addr, port: u16) -> err; pub fn udp_recv(pcb: *mut udp_pcb, - recv: extern fn(arg: *mut c_void, upcb: *mut udp_pcb, p: *mut pbuf, - addr: *mut ip_addr, port: u16), + recv: Option, recv_arg: *mut c_void); } diff --git a/artiq/runtime.rs/liblwip/lib.rs b/artiq/runtime.rs/liblwip/lib.rs index 7c1c32c53..f8938653b 100644 --- a/artiq/runtime.rs/liblwip/lib.rs +++ b/artiq/runtime.rs/liblwip/lib.rs @@ -279,12 +279,12 @@ impl UdpSocket { recv_buffer: LinkedList::new() })); let arg = &mut *state as *mut RefCell as *mut _; - lwip_sys::udp_recv(raw, recv, arg); + lwip_sys::udp_recv(raw, Some(recv), arg); Ok(UdpSocket { raw: raw, state: state }) } } - pub fn state(&self) -> *const RefCell { + pub fn state(&self) -> &RefCell { &*self.state } @@ -376,12 +376,12 @@ impl TcpListener { })); let arg = &mut *state as *mut RefCell as *mut _; lwip_sys::tcp_arg(raw2, arg); - lwip_sys::tcp_accept(raw2, accept); + lwip_sys::tcp_accept(raw2, Some(accept)); Ok(TcpListener { raw: raw2, state: state }) } } - pub fn state(&self) -> *const RefCell { + pub fn state(&self) -> &RefCell { &*self.state } @@ -467,14 +467,14 @@ impl TcpStream { })); let arg = &mut *state as *mut RefCell as *mut _; lwip_sys::tcp_arg(raw, arg); - lwip_sys::tcp_recv(raw, recv); - lwip_sys::tcp_sent(raw, sent); - lwip_sys::tcp_err(raw, err); + lwip_sys::tcp_recv(raw, Some(recv)); + lwip_sys::tcp_sent(raw, Some(sent)); + lwip_sys::tcp_err(raw, Some(err)); TcpStream { raw: raw, state: state } } } - pub fn state(&self) -> *const RefCell { + pub fn state(&self) -> &RefCell { &*self.state } @@ -536,6 +536,11 @@ impl TcpStream { impl Drop for TcpStream { fn drop(&mut self) { unsafe { + // lwip *will* try to call back after tcp_close + lwip_sys::tcp_recv(self.raw, None); + lwip_sys::tcp_sent(self.raw, None); + lwip_sys::tcp_err(self.raw, None); + // tcp_close can fail here, but in drop() we don't care let _ = lwip_sys::tcp_close(self.raw); } diff --git a/artiq/runtime.rs/src/kernel.rs b/artiq/runtime.rs/src/kernel.rs new file mode 100644 index 000000000..06f9c4822 --- /dev/null +++ b/artiq/runtime.rs/src/kernel.rs @@ -0,0 +1,37 @@ +use core::ptr; +use board::csr::kernel_cpu; +use mailbox; + +const KERNELCPU_EXEC_ADDRESS: usize = 0x42000000; +const KERNELCPU_PAYLOAD_ADDRESS: usize = 0x42020000; +const KERNELCPU_LAST_ADDRESS: usize = (0x4fffffff - 1024*1024); +const KSUPPORT_HEADER_SIZE: usize = 0x80; + +pub unsafe fn start() { + if kernel_cpu::reset_read() == 0 { + panic!("attempted to start kernel CPU when it is already running") + } + + stop(); + + extern { + static _binary_ksupport_elf_start: (); + static _binary_ksupport_elf_end: (); + } + let ksupport_start = &_binary_ksupport_elf_start as *const _ as usize; + let ksupport_end = &_binary_ksupport_elf_end as *const _ as usize; + ptr::copy_nonoverlapping(ksupport_start as *const u8, + (KERNELCPU_EXEC_ADDRESS - KSUPPORT_HEADER_SIZE) as *mut u8, + ksupport_end - ksupport_start); + + kernel_cpu::reset_write(0); +} + +pub fn stop() { + unsafe { kernel_cpu::reset_write(1) } + mailbox::acknowledge(); +} + +pub fn validate(ptr: usize) -> bool { + ptr >= KERNELCPU_EXEC_ADDRESS && ptr <= KERNELCPU_LAST_ADDRESS +} diff --git a/artiq/runtime.rs/src/kernel_proto.rs b/artiq/runtime.rs/src/kernel_proto.rs new file mode 100644 index 000000000..9a16e1c1a --- /dev/null +++ b/artiq/runtime.rs/src/kernel_proto.rs @@ -0,0 +1,323 @@ +use std::io; +use mailbox; +use kernel; + +#[derive(Debug)] +pub struct Exception<'a> { + pub name: &'a str, + pub file: &'a str, + pub line: u32, + pub column: u32, + pub function: &'a str, + pub message: &'a str, + pub param: [u64; 3], +} + +pub use self::c::BacktraceItem; + +#[derive(Debug)] +pub enum Message<'a> { + LoadRequest(&'a [u8]), + LoadReply { error: Option<&'a str> }, + + NowInitRequest, + NowInitReply(u64), + NowSave(u64), + + RunFinished, + RunException { + exception: Exception<'a>, + backtrace: &'a [BacktraceItem] + }, + + WatchdogSetRequest { ms: u64 }, + WatchdogSetReply { id: usize }, + WatchdogClear { id: usize }, + + RpcSend { + service: u32, + tag: &'a [u8], + data: *const *const () + }, + RpcRecvRequest { + slot: *mut () + }, + RpcRecvReply { + alloc_size: usize, + exception: Option> + }, + + CacheGetRequest { key: &'a str }, + CacheGetReply { value: &'static [u32] }, + CachePutRequest { key: &'a str, value: &'static [u32] }, + CachePutReply { succeeded: bool }, + + Log(&'a str) +} + +pub use self::Message::*; + +impl<'a> Message<'a> { + fn into_lower R>(self, f: F) -> R { + match self { + Message::LoadRequest(library) => { + let msg = c::LoadRequest { + ty: c::Type::LoadRequest, + library: library.as_ptr() as *const _ + }; + f(&msg as *const _ as *const _) + } + + Message::NowInitReply(now) => { + let msg = c::NowInitReply { + ty: c::Type::NowInitReply, + now: now + }; + f(&msg as *const _ as *const _) + } + + other => panic!("Message::into_lower: {:?} unimplemented", other) + } + } + + unsafe fn from_lower(ptr: *const ()) -> Self { + let msg = ptr as *const c::Message; + match (*msg).ty { + c::Type::LoadReply => { + let msg = ptr as *const c::LoadReply; + let error = if (*msg).error.is_null() { + None + } else { + Some(c::from_c_str((*msg).error)) + }; + Message::LoadReply { error: error } + } + + c::Type::NowInitRequest => Message::NowInitRequest, + c::Type::NowSave => { + let msg = ptr as *const c::NowSave; + Message::NowSave((*msg).now) + } + + c::Type::RunFinished => Message::RunFinished, + + c::Type::Log => { + let msg = ptr as *const c::Log; + Message::Log(c::from_c_str_len((*msg).buf, (*msg).len)) + } + + ref other => panic!("Message::from_lower: {:?} unimplemented", other) + } + } + + pub fn send_and_wait(self, waiter: ::sched::Waiter) -> io::Result<()> { + self.into_lower(|ptr| { + unsafe { mailbox::send(ptr as usize) } + waiter.until(mailbox::acknowledged) + }) + } + + pub fn wait_and_receive(waiter: ::sched::Waiter, f: F) -> io::Result + where F: FnOnce(Message<'a>) -> io::Result { + try!(waiter.until(|| mailbox::receive() != 0)); + if !kernel::validate(mailbox::receive()) { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid kernel CPU pointer")) + } + + let msg = unsafe { Self::from_lower(mailbox::receive() as *const ()) }; + Ok(try!(f(msg))) + } + + pub fn acknowledge() { + unsafe { mailbox::acknowledge() } + } +} + +// Low-level representation, compatible with the C code in ksupport +mod c { + use libc::{c_void, c_int, c_char, size_t}; + + #[repr(u32)] + #[derive(Debug)] + pub enum Type { + LoadRequest, + LoadReply, + NowInitRequest, + NowInitReply, + NowSave, + RunFinished, + RunException, + WatchdogSetRequest, + WatchdogSetReply, + WatchdogClear, + RpcSend, + RpcRecvRequest, + RpcRecvReply, + RpcBatch, + CacheGetRequest, + CacheGetReply, + CachePutRequest, + CachePutReply, + Log, + } + + #[repr(C)] + #[derive(Debug)] + pub struct Message { + pub ty: Type + } + + // kernel messages + + #[repr(C)] + #[derive(Debug)] + pub struct LoadRequest { + pub ty: Type, + pub library: *const c_void, + } + + #[repr(C)] + #[derive(Debug)] + pub struct LoadReply { + pub ty: Type, + pub error: *const c_char + } + + #[repr(C)] + #[derive(Debug)] + pub struct NowInitReply { + pub ty: Type, + pub now: u64 + } + + #[repr(C)] + #[derive(Debug)] + pub struct NowSave { + pub ty: Type, + pub now: u64 + } + + #[repr(C)] + #[derive(Debug)] + pub struct RunException { + pub ty: Type, + pub exception: *const Exception, + pub backtrace: *const BacktraceItem, + pub backtrace_size: size_t + } + + #[repr(C)] + #[derive(Debug)] + pub struct WatchdogSetRequest { + pub ty: Type, + pub ms: c_int + } + + #[repr(C)] + #[derive(Debug)] + pub struct WatchdogSetReply { + pub ty: Type, + pub id: c_int + } + + #[repr(C)] + #[derive(Debug)] + pub struct WatchdogClear { + pub ty: Type, + pub id: c_int + } + + #[repr(C)] + #[derive(Debug)] + pub struct RpcSend { + pub ty: Type, + pub service: c_int, + pub tag: *const c_char, + pub data: *const *const c_void + } + + #[repr(C)] + #[derive(Debug)] + pub struct RpcRecvRequest { + pub ty: Type, + pub slot: *mut c_void + } + + #[repr(C)] + #[derive(Debug)] + pub struct RpcRecvReply { + pub ty: Type, + pub alloc_size: c_int, + pub exception: *const Exception + } + + #[repr(C)] + #[derive(Debug)] + pub struct CacheGetRequest { + pub ty: Type, + pub key: *const c_char + } + + #[repr(C)] + #[derive(Debug)] + pub struct CacheGetReply { + pub ty: Type, + pub length: size_t, + pub elements: *const u32 + } + + #[repr(C)] + #[derive(Debug)] + pub struct CachePutRequest { + pub ty: Type, + pub key: *const c_char, + pub length: size_t, + pub elements: *const u32 + } + + #[repr(C)] + #[derive(Debug)] + pub struct CachePutReply { + pub ty: Type, + pub succeeded: c_int + } + + #[repr(C)] + #[derive(Debug)] + pub struct Log { + pub ty: Type, + pub buf: *const c_char, + pub len: size_t + } + + // Supplementary structures + + #[repr(C)] + #[derive(Debug)] + pub struct Exception { + pub name: *const c_char, // or typeinfo + pub file: *const c_char, + pub line: u32, + pub column: u32, + pub function: *const c_char, + pub message: *const c_char, + pub param: [u64; 3], + } + + #[repr(C)] + #[derive(Debug)] + pub struct BacktraceItem { + pub function: usize, + pub offset: usize + } + + pub unsafe fn from_c_str_len<'a>(ptr: *const c_char, len: size_t) -> &'a str { + use core::{str, slice}; + str::from_utf8_unchecked(slice::from_raw_parts(ptr as *const u8, len)) + } + + pub unsafe fn from_c_str<'a>(ptr: *const c_char) -> &'a str { + extern { fn strlen(cs: *const c_char) -> size_t; } + from_c_str_len(ptr, strlen(ptr)) + } +} diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 215f3c7d5..5f3e382ca 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -20,7 +20,10 @@ mod mailbox; mod logger; +mod kernel_proto; mod session_proto; + +mod kernel; mod session; extern { @@ -42,9 +45,10 @@ pub unsafe extern fn rust_main() { network_init(); let mut scheduler = sched::Scheduler::new(); - scheduler.spawn(4096, move |waiter| { + scheduler.spawn(8192, move |waiter| { session::handler(waiter, logger) }); + loop { lwip_service(); scheduler.run() diff --git a/artiq/runtime.rs/src/logger.rs b/artiq/runtime.rs/src/logger.rs index 7e7537f8e..377470a2e 100644 --- a/artiq/runtime.rs/src/logger.rs +++ b/artiq/runtime.rs/src/logger.rs @@ -48,9 +48,9 @@ impl Log for BufferLogger { if self.enabled(record.metadata()) { use core::fmt::Write; writeln!(self.buffer.borrow_mut(), "{:>5}({}): {}", - record.level(), record.location().module_path(), record.args()).unwrap(); + record.level(), record.target(), record.args()).unwrap(); println!("{:>5}({}): {}", - record.level(), record.location().module_path(), record.args()); + record.level(), record.target(), record.args()); } } } diff --git a/artiq/runtime.rs/src/mailbox.rs b/artiq/runtime.rs/src/mailbox.rs index 5d13225ce..030b7fb28 100644 --- a/artiq/runtime.rs/src/mailbox.rs +++ b/artiq/runtime.rs/src/mailbox.rs @@ -1,14 +1,12 @@ use core::ptr::{read_volatile, write_volatile}; use board; -const MAILBOX: *mut u32 = board::mem::MAILBOX_BASE as *mut u32; -static mut last: u32 = 0; +const MAILBOX: *mut usize = board::mem::MAILBOX_BASE as *mut usize; +static mut last: usize = 0; -pub fn send(data: u32) { - unsafe { - last = data; - write_volatile(MAILBOX, data) - } +pub unsafe fn send(data: usize) { + last = data; + write_volatile(MAILBOX, data) } pub fn acknowledged() -> bool { @@ -18,12 +16,7 @@ pub fn acknowledged() -> bool { } } -pub fn send_and_wait(data: u32) { - send(data); - while !acknowledged() {} -} - -pub fn receive() -> u32 { +pub fn receive() -> usize { unsafe { let data = read_volatile(MAILBOX); if data == last { @@ -37,15 +30,6 @@ pub fn receive() -> u32 { } } -pub fn wait_and_receive() -> u32 { - loop { - let data = receive(); - if data != 0 { - return data - } - } -} - pub fn acknowledge() { unsafe { write_volatile(MAILBOX, 0) } } diff --git a/artiq/runtime.rs/src/sched.rs b/artiq/runtime.rs/src/sched.rs index f48d6d6ff..3d1dcc8c4 100644 --- a/artiq/runtime.rs/src/sched.rs +++ b/artiq/runtime.rs/src/sched.rs @@ -104,8 +104,8 @@ impl Scheduler { } } -#[derive(Debug)] enum WaitEvent { + Completion(*const (Fn() -> bool + 'static)), UdpReadable(*const RefCell), TcpAcceptable(*const RefCell), TcpWriteable(*const RefCell), @@ -115,6 +115,8 @@ enum WaitEvent { impl WaitEvent { fn completed(&self) -> bool { match *self { + WaitEvent::Completion(f) => + unsafe { (*f)() }, WaitEvent::UdpReadable(state) => unsafe { (*state).borrow().readable() }, WaitEvent::TcpAcceptable(state) => @@ -127,12 +129,27 @@ impl WaitEvent { } } +// *const DST doesn't have impl Debug +impl ::core::fmt::Debug for WaitEvent { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> + ::core::result::Result<(), ::core::fmt::Error> { + write!(f, "WaitEvent...") + } +} + unsafe impl Send for WaitEvent {} #[derive(Debug, Clone, Copy)] pub struct Waiter<'a>(&'a Yielder); impl<'a> Waiter<'a> { + pub fn relinquish(&self) { + self.0.suspend(WaitRequest { + timeout: None, + event: None + }); + } + pub fn sleep(&self, duration: Duration) -> Result<()> { let request = WaitRequest { timeout: Some(Instant::now() + duration), @@ -154,6 +171,13 @@ impl<'a> Waiter<'a> { } } + pub fn until bool + 'static>(&self, f: F) -> Result<()> { + self.suspend(WaitRequest { + timeout: None, + event: Some(WaitEvent::Completion(&f as *const _)) + }) + } + pub fn udp_readable(&self, socket: &lwip::UdpSocket) -> Result<()> { self.suspend(WaitRequest { timeout: None, @@ -239,6 +263,10 @@ impl<'a> UdpSocket<'a> { (&mut buf[..len]).copy_from_slice(&pbuf.as_slice()[..len]); Ok(len) } + + pub fn readable(&self) -> bool { + self.lower.state().borrow().readable() + } } #[derive(Debug)] @@ -265,6 +293,10 @@ impl<'a> TcpListener<'a> { buffer: None }, addr)) } + + pub fn acceptable(&self) -> bool { + self.lower.state().borrow().acceptable() + } } pub use self::lwip::Shutdown; @@ -280,6 +312,14 @@ impl<'a> TcpStream<'a> { pub fn shutdown(&self, how: Shutdown) -> Result<()> { Ok(try!(self.lower.shutdown(how))) } + + pub fn readable(&self) -> bool { + self.buffer.is_some() || self.lower.state().borrow().readable() + } + + pub fn writeable(&self) -> bool { + self.lower.state().borrow().writeable() + } } impl<'a> Read for TcpStream<'a> { diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index d95bc9619..03def4132 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -1,12 +1,23 @@ use std::prelude::v1::*; use std::str; -use std::io::{self, Read, ErrorKind}; -use {config, rtio_crg}; +use std::io::{self, Read}; +use {config, rtio_crg, clock, mailbox, kernel}; use logger::BufferLogger; use sched::{Waiter, TcpListener, TcpStream, SocketAddr, IP_ANY}; -use session_proto::*; -#[derive(Debug, Clone, Copy)] +use session_proto as host; +use kernel_proto as kern; + +macro_rules! unexpected { + ($($arg:tt)*) => { + { + error!($($arg)*); + return Err(io::Error::new(io::ErrorKind::InvalidData, "protocol error")) + } + }; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum KernelState { Absent, Loaded, @@ -17,19 +28,16 @@ enum KernelState { #[derive(Debug)] pub struct Session { kernel_state: KernelState, -} - -extern { - fn kloader_stop(); - fn watchdog_init(); - fn kloader_start_idle_kernel(); + watchdog_set: clock::WatchdogSet, + now: u64 } impl Session { pub fn new() -> Session { - unsafe { kloader_stop(); } Session { - kernel_state: KernelState::Absent + kernel_state: KernelState::Absent, + watchdog_set: clock::WatchdogSet::new(), + now: 0 } } @@ -41,16 +49,6 @@ impl Session { } } -impl Drop for Session { - fn drop(&mut self) { - unsafe { - kloader_stop(); - watchdog_init(); - kloader_start_idle_kernel(); - } - } -} - fn check_magic(stream: &mut TcpStream) -> io::Result<()> { const MAGIC: &'static [u8] = b"ARTIQ coredev\n"; @@ -63,89 +61,203 @@ fn check_magic(stream: &mut TcpStream) -> io::Result<()> { } } -fn handle_request(stream: &mut TcpStream, - logger: &BufferLogger, - session: &mut Session) -> io::Result<()> { - fn read_request(stream: &mut TcpStream) -> io::Result { - let request = try!(Request::read_from(stream)); - match &request { - &Request::LoadLibrary(_) => trace!("comm<-host LoadLibrary(...)"), - _ => trace!("comm<-host {:?}", request) - } - Ok(request) +fn host_read(stream: &mut TcpStream) -> io::Result { + let request = try!(host::Request::read_from(stream)); + match &request { + &host::Request::LoadLibrary(_) => trace!("comm<-host LoadLibrary(...)"), + _ => trace!("comm<-host {:?}", request) } + Ok(request) +} - fn write_reply(stream: &mut TcpStream, reply: Reply) -> io::Result<()> { - trace!("comm->host {:?}", reply); - reply.write_to(stream) +fn host_write(stream: &mut TcpStream, reply: host::Reply) -> io::Result<()> { + trace!("comm->host {:?}", reply); + reply.write_to(stream) +} + +fn kern_send<'a>(waiter: Waiter, request: kern::Message<'a>) -> io::Result<()> { + match &request { + &kern::LoadRequest(_) => trace!("comm->kern LoadRequest(...)"), + _ => trace!("comm->kern {:?}", request) } + request.send_and_wait(waiter) +} - match try!(read_request(stream)) { - Request::Ident => - write_reply(stream, Reply::Ident(::board::ident(&mut [0; 64]))), +fn kern_recv(waiter: Waiter, f: F) -> io::Result + where F: FnOnce(kern::Message) -> io::Result { + kern::Message::wait_and_receive(waiter, |reply| { + trace!("comm<-kern {:?}", reply); + f(reply) + }) +} + +fn kern_acknowledge() -> io::Result<()> { + kern::Message::acknowledge(); + Ok(()) +} + +fn comm_handle(waiter: Waiter, + stream: &mut TcpStream, + logger: &BufferLogger, + session: &mut Session) -> io::Result<()> { + match try!(host_read(stream)) { + host::Request::Ident => + host_write(stream, host::Reply::Ident(::board::ident(&mut [0; 64]))), // artiq_corelog - Request::Log => { + host::Request::Log => { // Logging the packet with the log is inadvisable trace!("comm->host Log(...)"); logger.extract(move |log| { - Reply::Log(log).write_to(stream) + host::Reply::Log(log).write_to(stream) }) } - Request::LogClear => { + host::Request::LogClear => { logger.clear(); - write_reply(stream, Reply::Log("")) + host_write(stream, host::Reply::Log("")) } // artiq_coreconfig - Request::FlashRead { ref key } => { + host::Request::FlashRead { ref key } => { let value = config::read_to_end(key); - write_reply(stream, Reply::FlashRead(&value)) + host_write(stream, host::Reply::FlashRead(&value)) } - Request::FlashWrite { ref key, ref value } => { + host::Request::FlashWrite { ref key, ref value } => { match config::write(key, value) { - Ok(_) => write_reply(stream, Reply::FlashOk), - Err(_) => write_reply(stream, Reply::FlashError) + Ok(_) => host_write(stream, host::Reply::FlashOk), + Err(_) => host_write(stream, host::Reply::FlashError) } } - Request::FlashRemove { ref key } => { + host::Request::FlashRemove { ref key } => { config::remove(key); - write_reply(stream, Reply::FlashOk) + host_write(stream, host::Reply::FlashOk) } - Request::FlashErase => { + host::Request::FlashErase => { config::erase(); - write_reply(stream, Reply::FlashOk) + host_write(stream, host::Reply::FlashOk) } // artiq_run/artiq_master - Request::SwitchClock(clk) => { + host::Request::SwitchClock(clk) => { if session.running() { - error!("attempted to switch RTIO clock while kernel was running"); - write_reply(stream, Reply::ClockSwitchFailed) + error!("attempted to switch RTIO clock while a kernel was running"); + return host_write(stream, host::Reply::ClockSwitchFailed) + } + + if rtio_crg::switch_clock(clk) { + host_write(stream, host::Reply::ClockSwitchCompleted) } else { - if rtio_crg::switch_clock(clk) { - write_reply(stream, Reply::ClockSwitchCompleted) - } else { - write_reply(stream, Reply::ClockSwitchFailed) - } + host_write(stream, host::Reply::ClockSwitchFailed) } } - _ => unreachable!() + host::Request::LoadLibrary(library) => { + if session.running() { + error!("attempted to load a new kernel while a kernel was running"); + return host_write(stream, host::Reply::LoadFailed) + } + + unsafe { kernel::start() } + + try!(kern_send(waiter, kern::LoadRequest(&library))); + kern_recv(waiter, |reply| { + match reply { + kern::LoadReply { error: None } => { + session.kernel_state = KernelState::Loaded; + host_write(stream, host::Reply::LoadCompleted) + } + kern::LoadReply { error: Some(cause) } => { + error!("cannot load kernel: {}", cause); + host_write(stream, host::Reply::LoadFailed) + } + other => unexpected!("unexpected reply from kernel CPU: {:?}", other) + } + }) + } + + host::Request::RunKernel => { + if session.kernel_state != KernelState::Loaded { + error!("attempted to run a kernel while not in Loaded state"); + return host_write(stream, host::Reply::KernelStartupFailed) + } + + session.kernel_state = KernelState::Running; + kern_acknowledge() + } + + request => unexpected!("unexpected {:?}", request) } } -fn handle_requests(stream: &mut TcpStream, - logger: &BufferLogger) -> io::Result<()> { +fn kern_handle(waiter: Waiter, + stream: &mut TcpStream, + session: &mut Session) -> io::Result<()> { + kern::Message::wait_and_receive(waiter, |request| { + match (&request, session.kernel_state) { + (&kern::LoadReply { .. }, KernelState::Loaded) | + (&kern::RpcRecvRequest { .. }, KernelState::RpcWait) => { + // We're standing by; ignore the message. + return Ok(()) + } + (_, KernelState::Running) => (), + _ => { + unexpected!("unexpected request {:?} from kernel CPU in {:?} state", + request, session.kernel_state) + } + } + + trace!("comm<-kern {:?}", request); + match request { + kern::Log(log) => { + info!(target: "kernel", "{}", log); + kern_acknowledge() + } + + kern::NowInitRequest => + kern_send(waiter, kern::NowInitReply(session.now)), + + kern::NowSave(now) => { + session.now = now; + kern_acknowledge() + } + + request => unexpected!("unexpected {:?}", request) + } + }) +} + +fn handle(waiter: Waiter, + stream: &mut TcpStream, + logger: &BufferLogger) -> io::Result<()> { try!(check_magic(stream)); let mut session = Session::new(); loop { - try!(handle_request(stream, logger, &mut session)) + if stream.readable() { + try!(comm_handle(waiter, stream, logger, &mut session)) + } + + if mailbox::receive() != 0 { + try!(kern_handle(waiter, stream, &mut session)) + } + + if session.kernel_state == KernelState::Running { + if session.watchdog_set.expired() { + try!(host_write(stream, host::Reply::WatchdogExpired)); + return Err(io::Error::new(io::ErrorKind::Other, "watchdog expired")) + } + + if !rtio_crg::check() { + try!(host_write(stream, host::Reply::ClockFailure)); + return Err(io::Error::new(io::ErrorKind::Other, "RTIO clock failure")) + } + } + + waiter.relinquish() } } @@ -159,13 +271,13 @@ pub fn handler(waiter: Waiter, let (mut stream, addr) = listener.accept().unwrap(); info!("new connection from {:?}", addr); - match handle_requests(&mut stream, logger) { + match handle(waiter, &mut stream, logger) { Ok(()) => (), Err(err) => { - if err.kind() == ErrorKind::UnexpectedEof { + if err.kind() == io::ErrorKind::UnexpectedEof { info!("connection closed"); } else { - error!("cannot handle network request: {:?}", err); + error!("session aborted: {:?}", err); } } } diff --git a/artiq/runtime/Makefile b/artiq/runtime/Makefile index bc9439a84..a29982ecf 100644 --- a/artiq/runtime/Makefile +++ b/artiq/runtime/Makefile @@ -27,7 +27,7 @@ all: runtime.bin runtime.fbi %.fbi: %.bin @echo " MSCIMG " $@ && $(PYTHON) -m misoc.tools.mkmscimg -f -o $@ $< -runtime.elf: $(OBJECTS) libartiq_rust.a +runtime.elf: $(OBJECTS) libruntime.a $(LD) $(LDFLAGS) \ --gc-sections \ -T $(RUNTIME_DIRECTORY)/runtime.ld \ @@ -40,7 +40,7 @@ runtime.elf: $(OBJECTS) libartiq_rust.a -L../liballoc \ -L../liblwip \ -Lcargo/or1k-unknown-none/debug/ \ - -lartiq_rust -lbase -lm -lcompiler-rt -lalloc -llwip + -lruntime -lbase -lm -lcompiler-rt -lalloc -llwip @chmod -x $@ ksupport.elf: $(OBJECTS_KSUPPORT) @@ -60,7 +60,7 @@ ksupport.elf: $(OBJECTS_KSUPPORT) ksupport_data.o: ksupport.elf $(LD) -r -b binary -o $@ $< -libartiq_rust.a: +libruntime.a: CARGO_TARGET_DIR="./cargo" \ cargo rustc --verbose \ --manifest-path $(RUNTIME_DIRECTORY)/../runtime.rs/Cargo.toml \ diff --git a/artiq/runtime/kloader.c b/artiq/runtime/kloader.c index 06cf737c5..3dc98e539 100644 --- a/artiq/runtime/kloader.c +++ b/artiq/runtime/kloader.c @@ -167,7 +167,7 @@ void kloader_service_essential_kmsg(void) case MESSAGE_TYPE_LOG: { struct msg_log *msg = (struct msg_log *)umsg; - core_log_va(msg->fmt, msg->args); + core_log("%s", msg->buf); mailbox_acknowledge(); break; }