2
0
mirror of https://github.com/m-labs/artiq.git synced 2024-12-19 00:16:29 +08:00

Rust: implement idle kernels and session takeover.

This commit is contained in:
whitequark 2016-10-02 04:37:24 +00:00
parent 8bced9dcb5
commit 30e997f045
10 changed files with 316 additions and 101 deletions

View File

@ -5,7 +5,7 @@ dependencies = [
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"fringe 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log_buffer 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log_buffer 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lwip 0.0.0",
"std_artiq 0.0.0",
"walkdir 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -49,7 +49,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "log_buffer"
version = "1.0.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -96,7 +96,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "23e3757828fa702a20072c37ff47938e9dd331b92fac6e223d26d4b7a55f7ee2"
"checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054"
"checksum log_buffer 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8beb5ba24eca52f9958874445c4de5e086a7e82a1ec6b7ab81e5fcfb134f25a"
"checksum log_buffer 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ec57723b84bbe7bdf76aa93169c9b59e67473317c6de3a83cb2a0f8ccb2aa493"
"checksum walkdir 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "c66c0b9792f0a765345452775f3adbd28dde9d33f30d13e5dcc5ae17cf6f3780"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"

View File

@ -14,7 +14,7 @@ path = "src/lib.rs"
[dependencies]
std_artiq = { path = "libstd_artiq" }
lwip = { path = "liblwip" }
lwip = { path = "liblwip", default-features = false }
fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] }
log = { version = "0.3", default-features = false }
log_buffer = { version = "1.0" }

View File

@ -10,3 +10,7 @@ path = "lib.rs"
[dependencies]
lwip-sys = { path = "../liblwip-sys" }
std_artiq = { path = "../libstd_artiq" }
[features]
default = ["preemption"]
preemption = []

View File

@ -175,6 +175,9 @@ pub struct Pbuf<'payload> {
phantom: PhantomData<&'payload [u8]>
}
#[cfg(not(feature = "preemption"))]
unsafe impl<'payload> Send for Pbuf<'payload> {}
impl<'payload> Pbuf<'payload> {
unsafe fn from_raw(raw: *mut lwip_sys::pbuf) -> Pbuf<'payload> {
Pbuf { raw: raw, phantom: PhantomData }
@ -259,6 +262,9 @@ pub struct UdpSocket {
state: Box<RefCell<UdpSocketState>>
}
#[cfg(not(feature = "preemption"))]
unsafe impl Send for UdpSocket {}
impl UdpSocket {
pub fn new() -> Result<UdpSocket> {
extern fn recv(arg: *mut c_void, _pcb: *mut lwip_sys::udp_pcb,
@ -347,6 +353,9 @@ pub struct TcpListener {
state: Box<RefCell<TcpListenerState>>
}
#[cfg(not(feature = "preemption"))]
unsafe impl Send for TcpListener {}
impl TcpListener {
pub fn bind(addr: SocketAddr) -> Result<TcpListener> {
extern fn accept(arg: *mut c_void, newpcb: *mut lwip_sys::tcp_pcb,
@ -428,6 +437,9 @@ pub struct TcpStream {
state: Box<RefCell<TcpStreamState>>
}
#[cfg(not(feature = "preemption"))]
unsafe impl Send for TcpStream {}
impl TcpStream {
fn from_raw(raw: *mut lwip_sys::tcp_pcb) -> TcpStream {
extern fn recv(arg: *mut c_void, _raw: *mut lwip_sys::tcp_pcb,

View File

@ -15,14 +15,14 @@ pub unsafe fn start() {
stop();
extern {
static _binary_ksupport_elf_start: ();
static _binary_ksupport_elf_end: ();
static _binary_ksupport_elf_start: u8;
static _binary_ksupport_elf_end: u8;
}
let ksupport_start = &_binary_ksupport_elf_start as *const _ as usize;
let ksupport_end = &_binary_ksupport_elf_end as *const _ as usize;
ptr::copy_nonoverlapping(ksupport_start as *const u8,
let ksupport_start = &_binary_ksupport_elf_start as *const _;
let ksupport_end = &_binary_ksupport_elf_end as *const _;
ptr::copy_nonoverlapping(ksupport_start,
(KERNELCPU_EXEC_ADDRESS - KSUPPORT_HEADER_SIZE) as *mut u8,
ksupport_end - ksupport_start);
ksupport_end as usize - ksupport_start as usize);
csr::kernel_cpu::reset_write(0);
}

View File

@ -1,5 +1,5 @@
#![no_std]
#![feature(libc)]
#![feature(libc, borrow_state, const_fn)]
#[macro_use]
extern crate std_artiq as std;
@ -8,16 +8,19 @@ extern crate libc;
extern crate log;
extern crate log_buffer;
extern crate byteorder;
extern crate fringe;
extern crate lwip;
use logger::BufferLogger;
mod board;
mod sched;
mod config;
mod clock;
mod rtio_crg;
mod mailbox;
mod urc;
mod sched;
mod logger;
mod cache;
@ -36,9 +39,9 @@ include!(concat!(env!("OUT_DIR"), "/git_info.rs"));
#[no_mangle]
pub unsafe extern fn rust_main() {
static mut log_buffer: [u8; 4096] = [0; 4096];
BufferLogger::new(&mut log_buffer[..])
.register(move |logger| {
static mut LOG_BUFFER: [u8; 4096] = [0; 4096];
BufferLogger::new(&mut LOG_BUFFER[..])
.register(move || {
info!("booting ARTIQ runtime ({})", GIT_COMMIT);
clock::init();
@ -46,13 +49,11 @@ pub unsafe extern fn rust_main() {
network_init();
let mut scheduler = sched::Scheduler::new();
scheduler.spawn(8192, move |waiter| {
session::handler(waiter, logger)
});
scheduler.spawner().spawn(8192, session::handler);
loop {
scheduler.run();
lwip_service();
scheduler.run()
}
})
}

View File

@ -1,3 +1,4 @@
use core::{mem, ptr};
use core::cell::RefCell;
use log::{self, Log, LogMetadata, LogRecord, LogLevelFilter};
use log_buffer::LogBuffer;
@ -6,9 +7,10 @@ pub struct BufferLogger {
buffer: RefCell<LogBuffer<&'static mut [u8]>>
}
// We can never preempt from within the logger, so there can be no data races.
unsafe impl Sync for BufferLogger {}
static mut LOGGER: *const BufferLogger = ptr::null();
impl BufferLogger {
pub fn new(buffer: &'static mut [u8]) -> BufferLogger {
BufferLogger {
@ -16,7 +18,7 @@ impl BufferLogger {
}
}
pub fn register<F: FnOnce(&BufferLogger)>(&self, f: F) {
pub fn register<F: FnOnce()>(&self, f: F) {
// log::set_logger_raw captures a pointer to ourselves, so we must prevent
// ourselves from being moved or dropped after that function is called (and
// before log::shutdown_logger_raw is called).
@ -25,9 +27,17 @@ impl BufferLogger {
max_log_level.set(LogLevelFilter::Trace);
self as *const Log
}).expect("global logger can only be initialized once");
LOGGER = self;
}
f(self);
f();
log::shutdown_logger_raw().unwrap();
unsafe {
LOGGER = ptr::null();
}
}
pub fn with_instance<R, F: FnOnce(&BufferLogger) -> R>(f: F) -> R {
f(unsafe { mem::transmute::<*const BufferLogger, &BufferLogger>(LOGGER) })
}
pub fn clear(&self) {

View File

@ -1,14 +1,14 @@
#![allow(dead_code)]
extern crate fringe;
extern crate lwip;
use std::cell::RefCell;
use std::cell::{RefCell, BorrowState};
use std::vec::Vec;
use std::rc::Rc;
use std::time::{Instant, Duration};
use std::io::{Read, Write, Result, Error, ErrorKind};
use self::fringe::OwnedStack;
use self::fringe::generator::{Generator, Yielder};
use fringe::OwnedStack;
use fringe::generator::{Generator, Yielder, State as GeneratorState};
use lwip;
use urc::Urc;
#[derive(Debug)]
struct WaitRequest {
@ -30,33 +30,82 @@ struct Thread {
interrupted: bool
}
#[derive(Debug)]
pub struct Scheduler {
threads: Vec<Thread>,
index: usize
}
impl Scheduler {
pub fn new() -> Scheduler {
Scheduler { threads: Vec::new(), index: 0 }
}
pub unsafe fn spawn<F: FnOnce(Waiter) + Send>(&mut self, stack_size: usize, f: F) {
impl Thread {
unsafe fn new<F>(spawner: Spawner, stack_size: usize, f: F) -> ThreadHandle
where F: 'static + FnOnce(Waiter, Spawner) + Send {
let stack = OwnedStack::new(stack_size);
let thread = Thread {
generator: Generator::unsafe_new(stack, move |yielder, _| {
f(Waiter(yielder))
ThreadHandle::new(Thread {
generator: Generator::unsafe_new(stack, |yielder, _| {
f(Waiter(yielder), spawner)
}),
waiting_for: WaitRequest {
timeout: None,
event: None
},
interrupted: false
};
self.threads.push(thread)
})
}
pub fn terminated(&self) -> bool {
// FIXME: https://github.com/nathan7/libfringe/pull/56
match self.generator.state() {
GeneratorState::Unavailable => true,
GeneratorState::Runnable => false
}
}
pub fn interrupt(&mut self) {
self.interrupted = true
}
}
#[derive(Debug, Clone)]
pub struct ThreadHandle(Urc<RefCell<Thread>>);
impl ThreadHandle {
fn new(thread: Thread) -> ThreadHandle {
ThreadHandle(Urc::new(RefCell::new(thread)))
}
pub fn terminated(&self) -> bool {
match self.0.borrow_state() {
BorrowState::Unused => self.0.borrow().terminated(),
_ => false // the running thread hasn't terminated
}
}
pub fn interrupt(&self) {
// FIXME: use try_borrow() instead once it's available
match self.0.borrow_state() {
BorrowState::Unused => self.0.borrow_mut().interrupt(),
_ => panic!("cannot interrupt the running thread")
}
}
}
#[derive(Debug)]
pub struct Scheduler {
threads: Vec<ThreadHandle>,
index: usize,
spawner: Spawner
}
impl Scheduler {
pub fn new() -> Scheduler {
Scheduler {
threads: Vec::new(),
index: 0,
spawner: Spawner::new()
}
}
pub fn spawner(&self) -> &Spawner {
&self.spawner
}
pub fn run(&mut self) {
self.threads.append(&mut *self.spawner.queue.borrow_mut());
if self.threads.len() == 0 { return }
let now = Instant::now();
@ -66,7 +115,7 @@ impl Scheduler {
self.index = (self.index + 1) % self.threads.len();
let result = {
let thread = &mut self.threads[self.index];
let thread = &mut *self.threads[self.index].0.borrow_mut();
match thread.waiting_for {
_ if thread.interrupted => {
thread.interrupted = false;
@ -97,7 +146,8 @@ impl Scheduler {
},
Some(wait_request) => {
// The thread has suspended itself.
self.threads[self.index].waiting_for = wait_request
let thread = &mut *self.threads[self.index].0.borrow_mut();
thread.waiting_for = wait_request
}
}
@ -106,8 +156,27 @@ impl Scheduler {
}
}
#[derive(Debug, Clone)]
pub struct Spawner {
queue: Urc<RefCell<Vec<ThreadHandle>>>
}
impl Spawner {
fn new() -> Spawner {
Spawner { queue: Urc::new(RefCell::new(Vec::new())) }
}
pub fn spawn<F>(&self, stack_size: usize, f: F) -> ThreadHandle
where F: 'static + FnOnce(Waiter, Spawner) + Send {
let handle = unsafe { Thread::new(self.clone(), stack_size, f) };
self.queue.borrow_mut().push(handle.clone());
handle
}
}
enum WaitEvent {
Completion(*const (Fn() -> bool + 'static)),
Termination(*const RefCell<Thread>),
UdpReadable(*const RefCell<lwip::UdpSocketState>),
TcpAcceptable(*const RefCell<lwip::TcpListenerState>),
TcpWriteable(*const RefCell<lwip::TcpStreamState>),
@ -119,6 +188,8 @@ impl WaitEvent {
match *self {
WaitEvent::Completion(f) =>
unsafe { (*f)() },
WaitEvent::Termination(thread) =>
unsafe { (*thread).borrow().terminated() },
WaitEvent::UdpReadable(state) =>
unsafe { (*state).borrow().readable() },
WaitEvent::TcpAcceptable(state) =>
@ -173,6 +244,13 @@ impl<'a> Waiter<'a> {
}
}
pub fn join(&self, thread: ThreadHandle) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
event: Some(WaitEvent::Termination(&*thread.0))
})
}
pub fn until<F: Fn() -> bool + 'static>(&self, f: F) -> Result<()> {
self.suspend(WaitRequest {
timeout: None,
@ -211,7 +289,7 @@ impl<'a> Waiter<'a> {
// Wrappers around lwip
pub use self::lwip::{IpAddr, IP4_ANY, IP6_ANY, IP_ANY, SocketAddr};
pub use lwip::{IpAddr, IP4_ANY, IP6_ANY, IP_ANY, SocketAddr};
#[derive(Debug)]
pub struct UdpSocket<'a> {
@ -227,6 +305,14 @@ impl<'a> UdpSocket<'a> {
})
}
pub fn into_lower(self) -> lwip::UdpSocket {
self.lower
}
pub fn from_lower(waiter: Waiter<'a>, inner: lwip::UdpSocket) -> UdpSocket {
UdpSocket { waiter: waiter, lower: inner }
}
pub fn bind(&self, addr: SocketAddr) -> Result<()> {
Ok(try!(self.lower.bind(addr)))
}
@ -285,6 +371,14 @@ impl<'a> TcpListener<'a> {
})
}
pub fn into_lower(self) -> lwip::TcpListener {
self.lower
}
pub fn from_lower(waiter: Waiter<'a>, inner: lwip::TcpListener) -> TcpListener {
TcpListener { waiter: waiter, lower: inner }
}
pub fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
try!(self.waiter.tcp_acceptable(&self.lower));
let stream_lower = self.lower.try_accept().unwrap();
@ -301,7 +395,9 @@ impl<'a> TcpListener<'a> {
}
}
pub use self::lwip::Shutdown;
pub use lwip::Shutdown;
pub struct TcpStreamInner(lwip::TcpStream, Option<(lwip::Pbuf<'static>, usize)>);
#[derive(Debug)]
pub struct TcpStream<'a> {
@ -311,6 +407,14 @@ pub struct TcpStream<'a> {
}
impl<'a> TcpStream<'a> {
pub fn into_lower(self) -> TcpStreamInner {
TcpStreamInner(self.lower, self.buffer)
}
pub fn from_lower(waiter: Waiter<'a>, inner: TcpStreamInner) -> TcpStream {
TcpStream { waiter: waiter, lower: inner.0, buffer: inner.1 }
}
pub fn shutdown(&self, how: Shutdown) -> Result<()> {
Ok(try!(self.lower.shutdown(how)))
}

View File

@ -1,11 +1,13 @@
use std::prelude::v1::*;
use std::mem;
use std::str;
use std::{mem, str};
use std::cell::RefCell;
use std::io::{self, Read};
use {config, rtio_crg, clock, mailbox, kernel};
use logger::BufferLogger;
use cache::Cache;
use sched::{Waiter, TcpListener, TcpStream, SocketAddr, IP_ANY};
use urc::Urc;
use sched::{ThreadHandle, Waiter, Spawner};
use sched::{TcpListener, TcpStream, SocketAddr, IP_ANY};
use session_proto as host;
use kernel_proto as kern;
@ -49,14 +51,16 @@ enum KernelState {
// Per-connection state
#[derive(Debug)]
struct Session {
struct Session<'a> {
congress: &'a mut Congress,
kernel_state: KernelState,
watchdog_set: clock::WatchdogSet
}
impl Session {
fn new() -> Session {
impl<'a> Session<'a> {
fn new(congress: &mut Congress) -> Session {
Session {
congress: congress,
kernel_state: KernelState::Absent,
watchdog_set: clock::WatchdogSet::new()
}
@ -70,7 +74,7 @@ impl Session {
}
}
impl Drop for Session {
impl<'a> Drop for Session<'a> {
fn drop(&mut self) {
kernel::stop()
}
@ -123,8 +127,7 @@ fn kern_acknowledge() -> io::Result<()> {
Ok(())
}
fn comm_handle(logger: &BufferLogger,
waiter: Waiter,
fn process_host_message(waiter: Waiter,
stream: &mut TcpStream,
session: &mut Session) -> io::Result<()> {
match try!(host_read(stream)) {
@ -135,13 +138,15 @@ fn comm_handle(logger: &BufferLogger,
host::Request::Log => {
// Logging the packet with the log is inadvisable
trace!("comm->host Log(...)");
logger.extract(move |log| {
BufferLogger::with_instance(|logger| {
logger.extract(|log| {
host::Reply::Log(log).write_to(stream)
})
})
}
host::Request::LogClear => {
logger.clear();
BufferLogger::with_instance(|logger| logger.clear());
host_write(stream, host::Reply::Log(""))
}
@ -221,8 +226,7 @@ fn comm_handle(logger: &BufferLogger,
}
}
fn kern_handle(waiter: Waiter,
congress: &mut Congress,
fn process_kern_message(waiter: Waiter,
session: &mut Session) -> io::Result<()> {
kern::Message::wait_and_receive(waiter, |request| {
match (&request, session.kernel_state) {
@ -246,10 +250,10 @@ fn kern_handle(waiter: Waiter,
}
kern::NowInitRequest =>
kern_send(waiter, kern::NowInitReply(congress.now)),
kern_send(waiter, kern::NowInitReply(session.congress.now)),
kern::NowSave(now) => {
congress.now = now;
session.congress.now = now;
kern_acknowledge()
}
@ -265,14 +269,14 @@ fn kern_handle(waiter: Waiter,
}
kern::CacheGetRequest { key } => {
let value = congress.cache.get(key);
let value = session.congress.cache.get(key);
kern_send(waiter, kern::CacheGetReply {
value: unsafe { mem::transmute::<*const [u32], &'static [u32]>(value) }
})
}
kern::CachePutRequest { key, value } => {
let succeeded = congress.cache.put(key, value).is_ok();
let succeeded = session.congress.cache.put(key, value).is_ok();
kern_send(waiter, kern::CachePutReply { succeeded: succeeded })
}
@ -281,20 +285,17 @@ fn kern_handle(waiter: Waiter,
})
}
fn handle(logger: &BufferLogger,
waiter: Waiter,
fn host_kernel_worker(waiter: Waiter,
stream: &mut TcpStream,
congress: &mut Congress) -> io::Result<()> {
try!(check_magic(stream));
let mut session = Session::new();
let mut session = Session::new(congress);
loop {
if stream.readable() {
try!(comm_handle(logger, waiter, stream, &mut session))
try!(process_host_message(waiter, stream, &mut session));
}
if mailbox::receive() != 0 {
try!(kern_handle(waiter, congress, &mut session))
try!(process_kern_message(waiter, &mut session))
}
if session.kernel_state == KernelState::Running {
@ -313,19 +314,52 @@ fn handle(logger: &BufferLogger,
}
}
pub fn handler(waiter: Waiter,
logger: &BufferLogger) {
let mut congress = Congress::new();
fn flash_kernel_worker(waiter: Waiter,
congress: &mut Congress) -> io::Result<()> {
let mut session = Session::new(congress);
loop {
try!(process_kern_message(waiter, &mut session))
}
}
fn respawn<F>(spawner: Spawner, waiter: Waiter,
handle: &mut Option<ThreadHandle>,
f: F) where F: 'static + FnOnce(Waiter, Spawner) + Send {
match handle.take() {
None => (),
Some(handle) => {
info!("terminating running kernel");
handle.interrupt();
waiter.join(handle).expect("cannot join interrupt thread")
}
}
*handle = Some(spawner.spawn(8192, f))
}
pub fn handler(waiter: Waiter, spawner: Spawner) {
let congress = Urc::new(RefCell::new(Congress::new()));
let addr = SocketAddr::new(IP_ANY, 1381);
let listener = TcpListener::bind(waiter, addr).unwrap();
let listener = TcpListener::bind(waiter, addr).expect("cannot bind socket");
info!("accepting network sessions in Rust");
let mut kernel_thread = None;
loop {
let (mut stream, addr) = listener.accept().unwrap();
if listener.acceptable() {
let (mut stream, addr) = listener.accept().expect("cannot accept client");
match check_magic(&mut stream) {
Ok(()) => (),
Err(_) => continue
}
info!("new connection from {:?}", addr);
match handle(logger, waiter, &mut stream, &mut congress) {
let stream = stream.into_lower();
let congress = congress.clone();
respawn(spawner.clone(), waiter, &mut kernel_thread, move |waiter, _spawner| {
let mut stream = TcpStream::from_lower(waiter, stream);
let mut congress = congress.borrow_mut();
match host_kernel_worker(waiter, &mut stream, &mut congress) {
Ok(()) => (),
Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof {
@ -335,5 +369,24 @@ pub fn handler(waiter: Waiter,
}
}
}
})
}
if kernel_thread.is_none() {
info!("no connection, starting idle kernel");
let congress = congress.clone();
respawn(spawner.clone(), waiter, &mut kernel_thread, move |waiter, _spawner| {
let mut congress = congress.borrow_mut();
match flash_kernel_worker(waiter, &mut congress) {
Ok(()) =>
info!("idle kernel finished, standing by"),
Err(err) => {
error!("idle kernel aborted: {:?}", err);
}
}
})
}
waiter.relinquish()
}
}

View File

@ -0,0 +1,31 @@
use std::rc::Rc;
use std::ops::{Deref, DerefMut};
use std::fmt;
pub struct Urc<T: ?Sized>(Rc<T>);
impl<T> Urc<T> {
pub fn new(value: T) -> Urc<T> { Urc(Rc::new(value)) }
}
unsafe impl<T: ?Sized> Send for Urc<T> {}
unsafe impl<T: ?Sized> Sync for Urc<T> {}
impl<T: ?Sized> Deref for Urc<T> {
type Target = T;
fn deref(&self) -> &Self::Target { self.0.deref() }
}
impl<T: ?Sized> Clone for Urc<T> {
fn clone(&self) -> Urc<T> {
Urc(self.0.clone())
}
}
impl<T: ?Sized + fmt::Debug> fmt::Debug for Urc<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}