forked from M-Labs/artiq
1
0
Fork 0

Rust: implement cache.

This commit is contained in:
whitequark 2016-10-01 18:24:53 +00:00
parent d825393e81
commit 8bced9dcb5
5 changed files with 100 additions and 17 deletions

View File

@ -0,0 +1,47 @@
use std::vec::Vec;
use std::string::String;
use std::btree_map::BTreeMap;
#[derive(Debug)]
struct Entry {
data: Vec<u32>,
borrowed: bool
}
#[derive(Debug)]
pub struct Cache {
entries: BTreeMap<String, Entry>
}
impl Cache {
pub fn new() -> Cache {
Cache { entries: BTreeMap::new() }
}
pub fn get(&mut self, key: &str) -> *const [u32] {
match self.entries.get_mut(key) {
None => &[],
Some(ref mut entry) => {
entry.borrowed = true;
&entry.data[..]
}
}
}
pub fn put(&mut self, key: &str, data: &[u32]) -> Result<(), ()> {
match self.entries.get_mut(key) {
None => (),
Some(ref mut entry) => {
if entry.borrowed { return Err(()) }
entry.data = Vec::from(data);
return Ok(())
}
}
self.entries.insert(String::from(key), Entry {
data: Vec::from(data),
borrowed: false
});
Ok(())
}
}

View File

@ -249,6 +249,7 @@ mod c {
#[repr(u32)] #[repr(u32)]
#[derive(Debug)] #[derive(Debug)]
#[allow(dead_code)]
pub enum Type { pub enum Type {
LoadRequest, LoadRequest,
LoadReply, LoadReply,

View File

@ -19,6 +19,7 @@ mod rtio_crg;
mod mailbox; mod mailbox;
mod logger; mod logger;
mod cache;
mod kernel_proto; mod kernel_proto;
mod session_proto; mod session_proto;

View File

@ -1,3 +1,5 @@
#![allow(dead_code)]
extern crate fringe; extern crate fringe;
extern crate lwip; extern crate lwip;

View File

@ -1,8 +1,10 @@
use std::prelude::v1::*; use std::prelude::v1::*;
use std::mem;
use std::str; use std::str;
use std::io::{self, Read}; use std::io::{self, Read};
use {config, rtio_crg, clock, mailbox, kernel}; use {config, rtio_crg, clock, mailbox, kernel};
use logger::BufferLogger; use logger::BufferLogger;
use cache::Cache;
use sched::{Waiter, TcpListener, TcpStream, SocketAddr, IP_ANY}; use sched::{Waiter, TcpListener, TcpStream, SocketAddr, IP_ANY};
use session_proto as host; use session_proto as host;
@ -21,6 +23,22 @@ fn io_error(msg: &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, msg) io::Error::new(io::ErrorKind::Other, msg)
} }
// Persistent state
#[derive(Debug)]
struct Congress {
now: u64,
cache: Cache
}
impl Congress {
fn new() -> Congress {
Congress {
now: 0,
cache: Cache::new()
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KernelState { enum KernelState {
Absent, Absent,
@ -29,23 +47,22 @@ enum KernelState {
RpcWait RpcWait
} }
// Per-connection state
#[derive(Debug)] #[derive(Debug)]
pub struct Session { struct Session {
kernel_state: KernelState, kernel_state: KernelState,
watchdog_set: clock::WatchdogSet, watchdog_set: clock::WatchdogSet
now: u64
} }
impl Session { impl Session {
pub fn new() -> Session { fn new() -> Session {
Session { Session {
kernel_state: KernelState::Absent, kernel_state: KernelState::Absent,
watchdog_set: clock::WatchdogSet::new(), watchdog_set: clock::WatchdogSet::new()
now: 0
} }
} }
pub fn running(&self) -> bool { fn running(&self) -> bool {
match self.kernel_state { match self.kernel_state {
KernelState::Absent | KernelState::Loaded => false, KernelState::Absent | KernelState::Loaded => false,
KernelState::Running | KernelState::RpcWait => true KernelState::Running | KernelState::RpcWait => true
@ -106,9 +123,9 @@ fn kern_acknowledge() -> io::Result<()> {
Ok(()) Ok(())
} }
fn comm_handle(waiter: Waiter, fn comm_handle(logger: &BufferLogger,
waiter: Waiter,
stream: &mut TcpStream, stream: &mut TcpStream,
logger: &BufferLogger,
session: &mut Session) -> io::Result<()> { session: &mut Session) -> io::Result<()> {
match try!(host_read(stream)) { match try!(host_read(stream)) {
host::Request::Ident => host::Request::Ident =>
@ -205,7 +222,7 @@ fn comm_handle(waiter: Waiter,
} }
fn kern_handle(waiter: Waiter, fn kern_handle(waiter: Waiter,
stream: &mut TcpStream, congress: &mut Congress,
session: &mut Session) -> io::Result<()> { session: &mut Session) -> io::Result<()> {
kern::Message::wait_and_receive(waiter, |request| { kern::Message::wait_and_receive(waiter, |request| {
match (&request, session.kernel_state) { match (&request, session.kernel_state) {
@ -229,10 +246,10 @@ fn kern_handle(waiter: Waiter,
} }
kern::NowInitRequest => kern::NowInitRequest =>
kern_send(waiter, kern::NowInitReply(session.now)), kern_send(waiter, kern::NowInitReply(congress.now)),
kern::NowSave(now) => { kern::NowSave(now) => {
session.now = now; congress.now = now;
kern_acknowledge() kern_acknowledge()
} }
@ -247,24 +264,37 @@ fn kern_handle(waiter: Waiter,
kern_acknowledge() kern_acknowledge()
} }
kern::CacheGetRequest { key } => {
let value = congress.cache.get(key);
kern_send(waiter, kern::CacheGetReply {
value: unsafe { mem::transmute::<*const [u32], &'static [u32]>(value) }
})
}
kern::CachePutRequest { key, value } => {
let succeeded = congress.cache.put(key, value).is_ok();
kern_send(waiter, kern::CachePutReply { succeeded: succeeded })
}
request => unexpected!("unexpected request {:?} from kernel CPU", request) request => unexpected!("unexpected request {:?} from kernel CPU", request)
} }
}) })
} }
fn handle(waiter: Waiter, fn handle(logger: &BufferLogger,
waiter: Waiter,
stream: &mut TcpStream, stream: &mut TcpStream,
logger: &BufferLogger) -> io::Result<()> { congress: &mut Congress) -> io::Result<()> {
try!(check_magic(stream)); try!(check_magic(stream));
let mut session = Session::new(); let mut session = Session::new();
loop { loop {
if stream.readable() { if stream.readable() {
try!(comm_handle(waiter, stream, logger, &mut session)) try!(comm_handle(logger, waiter, stream, &mut session))
} }
if mailbox::receive() != 0 { if mailbox::receive() != 0 {
try!(kern_handle(waiter, stream, &mut session)) try!(kern_handle(waiter, congress, &mut session))
} }
if session.kernel_state == KernelState::Running { if session.kernel_state == KernelState::Running {
@ -285,6 +315,8 @@ fn handle(waiter: Waiter,
pub fn handler(waiter: Waiter, pub fn handler(waiter: Waiter,
logger: &BufferLogger) { logger: &BufferLogger) {
let mut congress = Congress::new();
let addr = SocketAddr::new(IP_ANY, 1381); let addr = SocketAddr::new(IP_ANY, 1381);
let listener = TcpListener::bind(waiter, addr).unwrap(); let listener = TcpListener::bind(waiter, addr).unwrap();
info!("accepting network sessions in Rust"); info!("accepting network sessions in Rust");
@ -293,7 +325,7 @@ pub fn handler(waiter: Waiter,
let (mut stream, addr) = listener.accept().unwrap(); let (mut stream, addr) = listener.accept().unwrap();
info!("new connection from {:?}", addr); info!("new connection from {:?}", addr);
match handle(waiter, &mut stream, logger) { match handle(logger, waiter, &mut stream, &mut congress) {
Ok(()) => (), Ok(()) => (),
Err(err) => { Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof { if err.kind() == io::ErrorKind::UnexpectedEof {