diff --git a/artiq/runtime.rs/src/cache.rs b/artiq/runtime.rs/src/cache.rs new file mode 100644 index 000000000..cc755bab6 --- /dev/null +++ b/artiq/runtime.rs/src/cache.rs @@ -0,0 +1,47 @@ +use std::vec::Vec; +use std::string::String; +use std::btree_map::BTreeMap; + +#[derive(Debug)] +struct Entry { + data: Vec, + borrowed: bool +} + +#[derive(Debug)] +pub struct Cache { + entries: BTreeMap +} + +impl Cache { + pub fn new() -> Cache { + Cache { entries: BTreeMap::new() } + } + + pub fn get(&mut self, key: &str) -> *const [u32] { + match self.entries.get_mut(key) { + None => &[], + Some(ref mut entry) => { + entry.borrowed = true; + &entry.data[..] + } + } + } + + pub fn put(&mut self, key: &str, data: &[u32]) -> Result<(), ()> { + match self.entries.get_mut(key) { + None => (), + Some(ref mut entry) => { + if entry.borrowed { return Err(()) } + entry.data = Vec::from(data); + return Ok(()) + } + } + + self.entries.insert(String::from(key), Entry { + data: Vec::from(data), + borrowed: false + }); + Ok(()) + } +} diff --git a/artiq/runtime.rs/src/kernel_proto.rs b/artiq/runtime.rs/src/kernel_proto.rs index e1e4c4dc1..e7c76a508 100644 --- a/artiq/runtime.rs/src/kernel_proto.rs +++ b/artiq/runtime.rs/src/kernel_proto.rs @@ -249,6 +249,7 @@ mod c { #[repr(u32)] #[derive(Debug)] + #[allow(dead_code)] pub enum Type { LoadRequest, LoadReply, diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 5f3e382ca..8070552fc 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -19,6 +19,7 @@ mod rtio_crg; mod mailbox; mod logger; +mod cache; mod kernel_proto; mod session_proto; diff --git a/artiq/runtime.rs/src/sched.rs b/artiq/runtime.rs/src/sched.rs index 3d1dcc8c4..1e7b58a5a 100644 --- a/artiq/runtime.rs/src/sched.rs +++ b/artiq/runtime.rs/src/sched.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + extern crate fringe; extern crate lwip; diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index c9ef4383f..56d7d81f6 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -1,8 +1,10 @@ use std::prelude::v1::*; +use std::mem; use std::str; use std::io::{self, Read}; use {config, rtio_crg, clock, mailbox, kernel}; use logger::BufferLogger; +use cache::Cache; use sched::{Waiter, TcpListener, TcpStream, SocketAddr, IP_ANY}; use session_proto as host; @@ -21,6 +23,22 @@ fn io_error(msg: &str) -> io::Error { io::Error::new(io::ErrorKind::Other, msg) } +// Persistent state +#[derive(Debug)] +struct Congress { + now: u64, + cache: Cache +} + +impl Congress { + fn new() -> Congress { + Congress { + now: 0, + cache: Cache::new() + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum KernelState { Absent, @@ -29,23 +47,22 @@ enum KernelState { RpcWait } +// Per-connection state #[derive(Debug)] -pub struct Session { +struct Session { kernel_state: KernelState, - watchdog_set: clock::WatchdogSet, - now: u64 + watchdog_set: clock::WatchdogSet } impl Session { - pub fn new() -> Session { + fn new() -> Session { Session { kernel_state: KernelState::Absent, - watchdog_set: clock::WatchdogSet::new(), - now: 0 + watchdog_set: clock::WatchdogSet::new() } } - pub fn running(&self) -> bool { + fn running(&self) -> bool { match self.kernel_state { KernelState::Absent | KernelState::Loaded => false, KernelState::Running | KernelState::RpcWait => true @@ -106,9 +123,9 @@ fn kern_acknowledge() -> io::Result<()> { Ok(()) } -fn comm_handle(waiter: Waiter, +fn comm_handle(logger: &BufferLogger, + waiter: Waiter, stream: &mut TcpStream, - logger: &BufferLogger, session: &mut Session) -> io::Result<()> { match try!(host_read(stream)) { host::Request::Ident => @@ -205,7 +222,7 @@ fn comm_handle(waiter: Waiter, } fn kern_handle(waiter: Waiter, - stream: &mut TcpStream, + congress: &mut Congress, session: &mut Session) -> io::Result<()> { kern::Message::wait_and_receive(waiter, |request| { match (&request, session.kernel_state) { @@ -229,10 +246,10 @@ fn kern_handle(waiter: Waiter, } kern::NowInitRequest => - kern_send(waiter, kern::NowInitReply(session.now)), + kern_send(waiter, kern::NowInitReply(congress.now)), kern::NowSave(now) => { - session.now = now; + congress.now = now; kern_acknowledge() } @@ -247,24 +264,37 @@ fn kern_handle(waiter: Waiter, kern_acknowledge() } + kern::CacheGetRequest { key } => { + let value = congress.cache.get(key); + kern_send(waiter, kern::CacheGetReply { + value: unsafe { mem::transmute::<*const [u32], &'static [u32]>(value) } + }) + } + + kern::CachePutRequest { key, value } => { + let succeeded = congress.cache.put(key, value).is_ok(); + kern_send(waiter, kern::CachePutReply { succeeded: succeeded }) + } + request => unexpected!("unexpected request {:?} from kernel CPU", request) } }) } -fn handle(waiter: Waiter, +fn handle(logger: &BufferLogger, + waiter: Waiter, stream: &mut TcpStream, - logger: &BufferLogger) -> io::Result<()> { + congress: &mut Congress) -> io::Result<()> { try!(check_magic(stream)); let mut session = Session::new(); loop { if stream.readable() { - try!(comm_handle(waiter, stream, logger, &mut session)) + try!(comm_handle(logger, waiter, stream, &mut session)) } if mailbox::receive() != 0 { - try!(kern_handle(waiter, stream, &mut session)) + try!(kern_handle(waiter, congress, &mut session)) } if session.kernel_state == KernelState::Running { @@ -285,6 +315,8 @@ fn handle(waiter: Waiter, pub fn handler(waiter: Waiter, logger: &BufferLogger) { + let mut congress = Congress::new(); + let addr = SocketAddr::new(IP_ANY, 1381); let listener = TcpListener::bind(waiter, addr).unwrap(); info!("accepting network sessions in Rust"); @@ -293,7 +325,7 @@ pub fn handler(waiter: Waiter, let (mut stream, addr) = listener.accept().unwrap(); info!("new connection from {:?}", addr); - match handle(waiter, &mut stream, logger) { + match handle(logger, waiter, &mut stream, &mut congress) { Ok(()) => (), Err(err) => { if err.kind() == io::ErrorKind::UnexpectedEof {