From b14c19a886aaeb41e4b56d4f8ea8ea2d95de47f4 Mon Sep 17 00:00:00 2001 From: whitequark Date: Tue, 27 Sep 2016 13:36:55 +0000 Subject: [PATCH] Rust: add skeleton session protocol implementation. Only ident requests are supported right now. --- artiq/runtime.rs/Cargo.lock | 7 + artiq/runtime.rs/Cargo.toml | 3 +- artiq/runtime.rs/libstd_artiq/lib.rs | 3 +- artiq/runtime.rs/src/lib.rs | 25 +-- artiq/runtime.rs/src/session/mod.rs | 103 +++++++++ artiq/runtime.rs/src/session/protocol.rs | 272 +++++++++++++++++++++++ 6 files changed, 389 insertions(+), 24 deletions(-) create mode 100644 artiq/runtime.rs/src/session/mod.rs create mode 100644 artiq/runtime.rs/src/session/protocol.rs diff --git a/artiq/runtime.rs/Cargo.lock b/artiq/runtime.rs/Cargo.lock index 6e5a7538d..9fc25d063 100644 --- a/artiq/runtime.rs/Cargo.lock +++ b/artiq/runtime.rs/Cargo.lock @@ -2,6 +2,7 @@ name = "runtime" version = "0.0.0" dependencies = [ + "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "fringe 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "lwip 0.0.0", "std_artiq 0.0.0", @@ -11,6 +12,11 @@ dependencies = [ name = "alloc_artiq" version = "0.0.0" +[[package]] +name = "byteorder" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "fringe" version = "1.1.0" @@ -44,5 +50,6 @@ dependencies = [ ] [metadata] +"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" "checksum fringe 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "987689dcfad85eee8d76b477865641ec483e63fb86d52966bfc350c4a647d78a" "checksum libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "23e3757828fa702a20072c37ff47938e9dd331b92fac6e223d26d4b7a55f7ee2" diff --git a/artiq/runtime.rs/Cargo.toml b/artiq/runtime.rs/Cargo.toml index 962e076cb..adc6c37b2 100644 --- a/artiq/runtime.rs/Cargo.toml +++ b/artiq/runtime.rs/Cargo.toml @@ -10,8 +10,9 @@ path = "src/lib.rs" [dependencies] std_artiq = { path = "libstd_artiq" } -fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] } lwip = { path = "liblwip" } +fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] } +byteorder = { version = "0.5", default-features = false } [profile.dev] panic = 'abort' diff --git a/artiq/runtime.rs/libstd_artiq/lib.rs b/artiq/runtime.rs/libstd_artiq/lib.rs index 83c4fd83d..9e95c2a24 100644 --- a/artiq/runtime.rs/libstd_artiq/lib.rs +++ b/artiq/runtime.rs/libstd_artiq/lib.rs @@ -1,6 +1,6 @@ #![feature(lang_items, asm, alloc, collections, libc, needs_panic_runtime, question_mark, unicode, reflect_marker, raw, int_error_internals, - try_from, try_borrow)] + try_from, try_borrow, macro_reexport, allow_internal_unstable)] #![no_std] #![needs_panic_runtime] @@ -8,6 +8,7 @@ extern crate rustc_unicode; extern crate alloc_artiq; extern crate alloc; #[macro_use] +#[macro_reexport(vec)] extern crate collections; extern crate libc; diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 04e987129..f482efeaa 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -2,44 +2,25 @@ #[macro_use] extern crate std_artiq as std; +extern crate byteorder; use std::prelude::v1::*; pub mod io; +pub mod session; extern { fn network_init(); fn lwip_service(); } -fn timer(waiter: io::Waiter) { - loop { - println!("tick"); - waiter.sleep(std::time::Duration::from_millis(1000)).unwrap(); - } -} - -fn echo(waiter: io::Waiter) { - let addr = io::SocketAddr::new(io::IP_ANY, 1234); - let listener = io::TcpListener::bind(waiter, addr).unwrap(); - loop { - let (mut stream, _addr) = listener.accept().unwrap(); - loop { - let mut buf = [0]; - stream.read(&mut buf).unwrap(); - stream.write(&buf).unwrap(); - } - } -} - #[no_mangle] pub unsafe extern fn rust_main() { println!("Accepting network sessions in Rust."); network_init(); let mut scheduler = io::Scheduler::new(); - scheduler.spawn(4096, timer); - scheduler.spawn(4096, echo); + scheduler.spawn(4096, session::handler); loop { lwip_service(); scheduler.run() diff --git a/artiq/runtime.rs/src/session/mod.rs b/artiq/runtime.rs/src/session/mod.rs new file mode 100644 index 000000000..b97af72bf --- /dev/null +++ b/artiq/runtime.rs/src/session/mod.rs @@ -0,0 +1,103 @@ +use std::prelude::v1::*; +use std::io::{self, Read}; +use self::protocol::*; + +mod protocol; + +#[derive(Debug, Clone, Copy)] +enum KernelState { + Absent, + Loaded, + Running, + RpcWait +} + +#[derive(Debug)] +pub struct Session { + kernel_state: KernelState, +} + +extern { + fn kloader_stop(); + fn watchdog_init(); + fn kloader_start_idle_kernel(); +} + +impl Session { + pub fn start() -> Session { + unsafe { kloader_stop(); } + Session { + kernel_state: KernelState::Absent + } + } + + pub fn end(self) { + unsafe { + kloader_stop(); + watchdog_init(); + kloader_start_idle_kernel(); + } + } +} + +fn check_magic(stream: &mut ::io::TcpStream) -> io::Result<()> { + const MAGIC: &'static [u8] = b"ARTIQ coredev\n"; + + let mut magic: [u8; 14] = [0; 14]; + try!(stream.read_exact(&mut magic)); + if magic != MAGIC { + Err(io::Error::new(io::ErrorKind::InvalidData, "unrecognized magic")) + } else { + Ok(()) + } +} + +fn handle_request(stream: &mut ::io::TcpStream) -> io::Result<()> { + fn read_request(stream: &mut ::io::TcpStream) -> io::Result { + let request = try!(Request::read_from(stream)); + println!("comm<-host {:?}", request); + Ok(request) + } + + fn write_reply(stream: &mut ::io::TcpStream, reply: Reply) -> io::Result<()> { + println!("comm->host {:?}", reply); + reply.write_to(stream) + } + + match try!(read_request(stream)) { + Request::Ident => { + let mut ident: [u8; 256]; + let ident = unsafe { + extern { fn get_ident(ident: *mut u8); } + + ident = ::core::mem::uninitialized(); + get_ident(ident.as_mut_ptr()); + &ident[..ident.iter().position(|&c| c == 0).unwrap()] + }; + + write_reply(stream, Reply::Ident(ident)) + }, + _ => unreachable!() + } +} + +fn handle_requests(stream: &mut ::io::TcpStream) -> io::Result<()> { + try!(check_magic(stream)); + loop { + try!(handle_request(stream)) + } +} + +pub fn handler(waiter: ::io::Waiter) { + let addr = ::io::SocketAddr::new(::io::IP_ANY, 1381); + let listener = ::io::TcpListener::bind(waiter, addr).unwrap(); + loop { + let (mut stream, _addr) = listener.accept().unwrap(); + match handle_requests(&mut stream) { + Ok(()) => (), + Err(err) => { + println!("cannot handle network request: {:?}", err); + } + } + } +} diff --git a/artiq/runtime.rs/src/session/protocol.rs b/artiq/runtime.rs/src/session/protocol.rs new file mode 100644 index 000000000..cdb27a3f7 --- /dev/null +++ b/artiq/runtime.rs/src/session/protocol.rs @@ -0,0 +1,272 @@ +use std::prelude::v1::*; +use std::io::{self, Read, Write}; +use byteorder::{ByteOrder, NetworkEndian}; + +// FIXME: replace these with byteorder core io traits once those are in +fn read_u8(reader: &mut Read) -> io::Result { + let mut bytes = [0; 1]; + try!(reader.read_exact(&mut bytes)); + Ok(bytes[0]) +} + +fn write_u8(writer: &mut Write, value: u8) -> io::Result<()> { + let bytes = [value; 1]; + writer.write_all(&bytes) +} + +fn read_u32(reader: &mut Read) -> io::Result { + let mut bytes = [0; 4]; + try!(reader.read_exact(&mut bytes)); + Ok(NetworkEndian::read_u32(&bytes)) +} + +fn write_u32(writer: &mut Write, value: u32) -> io::Result<()> { + let mut bytes = [0; 4]; + NetworkEndian::write_u32(&mut bytes, value); + writer.write_all(&bytes) +} + +fn read_u64(reader: &mut Read) -> io::Result { + let mut bytes = [0; 4]; + try!(reader.read_exact(&mut bytes)); + Ok(NetworkEndian::read_u64(&bytes)) +} + +fn write_u64(writer: &mut Write, value: u64) -> io::Result<()> { + let mut bytes = [0; 4]; + NetworkEndian::write_u64(&mut bytes, value); + writer.write_all(&bytes) +} + +fn read_bytes(reader: &mut Read) -> io::Result> { + let length = try!(read_u32(reader)); + let mut value = vec![0; length as usize]; + try!(reader.read_exact(&mut value)); + Ok(value) +} + +fn write_bytes(writer: &mut Write, value: &[u8]) -> io::Result<()> { + try!(write_u32(writer, value.len() as u32)); + writer.write_all(value) +} + +fn read_string(reader: &mut Read) -> io::Result { + let bytes = try!(read_bytes(reader)); + String::from_utf8(bytes).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) +} + +fn write_string(writer: &mut Write, value: &str) -> io::Result<()> { + write_bytes(writer, value.as_bytes()) +} + +fn read_sync(reader: &mut Read) -> io::Result<()> { + let mut sync = [0; 4]; + for i in 0.. { + sync[i % 4] = try!(read_u8(reader)); + if sync == [0x5a; 4] { break } + } + Ok(()) +} + +fn write_sync(writer: &mut Write) -> io::Result<()> { + writer.write_all(&[0x5a; 4]) +} + +#[derive(Debug)] +pub struct Exception { + name: String, + message: String, + param: [u64; 3], + file: String, + line: u32, + column: u32, + function: String, +} + +impl Exception { + pub fn read_from(reader: &mut Read) -> io::Result { + Ok(Exception { + name: try!(read_string(reader)), + message: try!(read_string(reader)), + param: [try!(read_u64(reader)), + try!(read_u64(reader)), + try!(read_u64(reader))], + file: try!(read_string(reader)), + line: try!(read_u32(reader)), + column: try!(read_u32(reader)), + function: try!(read_string(reader)) + }) + } + + pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { + try!(write_string(writer, &self.name)); + try!(write_string(writer, &self.message)); + try!(write_u64(writer, self.param[0])); + try!(write_u64(writer, self.param[1])); + try!(write_u64(writer, self.param[2])); + try!(write_string(writer, &self.file)); + try!(write_u32(writer, self.line)); + try!(write_u32(writer, self.column)); + try!(write_string(writer, &self.function)); + Ok(()) + } +} + +#[derive(Debug)] +pub enum Request { + Log, + LogClear, + + Ident, + SwitchClock(u8), + + LoadLibrary(Vec), + RunKernel, + + RpcReply { tag: String }, // FIXME + RpcException(Exception), + + FlashRead { key: String }, + FlashWrite { key: String, value: Vec }, + FlashErase { key: String }, + FlashRemove { key: String }, +} + +impl Request { + pub fn read_from(reader: &mut Read) -> io::Result { + const HEADER_SIZE: usize = 9; + + try!(read_sync(reader)); + let length = try!(read_u32(reader)) as usize; + let type_ = try!(read_u8(reader)); + + Ok(match type_ { + 1 => Request::Log, + 2 => Request::LogClear, + 3 => Request::Ident, + 4 => Request::SwitchClock(try!(read_u8(reader))), + 5 => { + let mut code = vec![0; length - HEADER_SIZE]; + try!(reader.read_exact(&mut code)); + Request::LoadLibrary(code) + }, + 6 => Request::RunKernel, + 7 => Request::RpcReply { + tag: try!(read_string(reader)) + }, + 8 => Request::RpcException(try!(Exception::read_from(reader))), + 9 => Request::FlashRead { + key: try!(read_string(reader)) + }, + 10 => Request::FlashWrite { + key: try!(read_string(reader)), + value: try!(read_bytes(reader)) + }, + 11 => Request::FlashErase { + key: try!(read_string(reader)) + }, + 12 => Request::FlashRemove { + key: try!(read_string(reader)) + }, + _ => unreachable!() + }) + } +} + +#[derive(Debug)] +pub enum Reply<'a> { + Log(&'a str), + + Ident(&'a [u8]), + ClockSwitchCompleted, + ClockSwitchFailed, + + LoadCompleted, + LoadFailed, + + KernelFinished, + KernelStartupFailed, + KernelException(Exception), + + RpcRequest { service: u32 }, + + FlashRead(&'a [u8]), + FlashOk, + FlashError, + + WatchdogExpired, + ClockFailure, +} + +impl<'a> Reply<'a> { + pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { + let mut buf = Vec::new(); + try!(write_sync(&mut buf)); + try!(write_u32(&mut buf, 0)); // length placeholder + + match *self { + Reply::Log(ref log) => { + try!(write_u8(&mut buf, 1)); + try!(buf.write(log.as_bytes())); + }, + + Reply::Ident(ident) => { + try!(write_u8(&mut buf, 2)); + try!(buf.write(b"AROR")); + try!(buf.write(ident)); + }, + Reply::ClockSwitchCompleted => { + try!(write_u8(&mut buf, 3)); + }, + Reply::ClockSwitchFailed => { + try!(write_u8(&mut buf, 4)); + }, + + Reply::LoadCompleted => { + try!(write_u8(&mut buf, 5)); + }, + Reply::LoadFailed => { + try!(write_u8(&mut buf, 6)); + }, + + Reply::KernelFinished => { + try!(write_u8(&mut buf, 7)); + }, + Reply::KernelStartupFailed => { + try!(write_u8(&mut buf, 8)); + }, + Reply::KernelException(ref exception) => { + try!(write_u8(&mut buf, 9)); + try!(exception.write_to(writer)); + }, + + Reply::RpcRequest { service } => { + try!(write_u8(&mut buf, 10)); + try!(write_u32(&mut buf, service)); + }, + + Reply::FlashRead(ref bytes) => { + try!(write_u8(&mut buf, 11)); + try!(buf.write(bytes)); + }, + Reply::FlashOk => { + try!(write_u8(&mut buf, 12)); + }, + Reply::FlashError => { + try!(write_u8(&mut buf, 13)); + }, + + Reply::WatchdogExpired => { + try!(write_u8(&mut buf, 14)); + }, + Reply::ClockFailure => { + try!(write_u8(&mut buf, 15)); + }, + } + + let len = buf.len(); + try!(write_u32(&mut &mut buf[4..8], len as u32)); + + writer.write_all(&buf) + } +}