From 6fcd57a41a342c710c0b5fc0d216a20f4df177ad Mon Sep 17 00:00:00 2001 From: whitequark Date: Tue, 1 Nov 2016 10:30:42 +0000 Subject: [PATCH] runtime: fix remaining async RPC bugs. --- artiq/coredevice/comm_generic.py | 20 ++++++++++++-------- artiq/runtime.rs/Cargo.toml | 2 +- artiq/runtime.rs/libksupport/lib.rs | 7 ++++--- artiq/runtime.rs/src/session.rs | 3 ++- artiq/runtime/ksupport_glue.c | 2 +- artiq/test/coredevice/test_embedding.py | 21 +++++++++++++++++++++ 6 files changed, 41 insertions(+), 14 deletions(-) diff --git a/artiq/coredevice/comm_generic.py b/artiq/coredevice/comm_generic.py index 30804d541..8afa8c096 100644 --- a/artiq/coredevice/comm_generic.py +++ b/artiq/coredevice/comm_generic.py @@ -413,23 +413,27 @@ class CommGeneric: def _serve_rpc(self, embedding_map): async = self._read_bool() service_id = self._read_int32() - service = embedding_map.retrieve_object(service_id) args, kwargs = self._receive_rpc_args(embedding_map) return_tags = self._read_bytes() + + if service_id is 0: + service = lambda obj, attr, value: setattr(obj, attr, value) + else: + service = embedding_map.retrieve_object(service_id) logger.debug("rpc service: [%d]%r%s %r %r -> %s", service_id, service, (" (async)" if async else ""), args, kwargs, return_tags) + if async: + service(*args, **kwargs) + return + try: result = service(*args, **kwargs) - if async: - return - else: - self._write_header(_H2DMsgType.RPC_REPLY) - self._write_bytes(return_tags) - self._send_rpc_value(bytearray(return_tags), result, result, service) - logger.debug("rpc service: %d %r %r = %r", service_id, args, kwargs, result) + self._write_header(_H2DMsgType.RPC_REPLY) + self._write_bytes(return_tags) + self._send_rpc_value(bytearray(return_tags), result, result, service) except Exception as exn: logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn) diff --git a/artiq/runtime.rs/Cargo.toml b/artiq/runtime.rs/Cargo.toml index 146ff9089..5a459d204 100644 --- a/artiq/runtime.rs/Cargo.toml +++ b/artiq/runtime.rs/Cargo.toml @@ -16,7 +16,7 @@ path = "src/lib.rs" std_artiq = { path = "libstd_artiq", features = ["alloc"] } lwip = { path = "liblwip", default-features = false } fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] } -log = { version = "0.3", default-features = false, features = [] } +log = { version = "0.3", default-features = false, features = ["max_level_debug"] } log_buffer = { version = "1.0" } byteorder = { version = "0.5", default-features = false } diff --git a/artiq/runtime.rs/libksupport/lib.rs b/artiq/runtime.rs/libksupport/lib.rs index 73b35c4bc..dc1c853e4 100644 --- a/artiq/runtime.rs/libksupport/lib.rs +++ b/artiq/runtime.rs/libksupport/lib.rs @@ -12,8 +12,6 @@ extern crate byteorder; mod board; #[path = "../src/mailbox.rs"] mod mailbox; -#[path = "../src/rpc_queue.rs"] -mod rpc_queue; #[path = "../src/proto.rs"] mod proto; @@ -72,7 +70,7 @@ macro_rules! recv { } macro_rules! print { - ($($arg:tt)*) => ($crate::send(&Log(format_args!($($arg)*)))); + ($($arg:tt)*) => ($crate::send(&$crate::kernel_proto::Log(format_args!($($arg)*)))); } macro_rules! println { @@ -80,6 +78,9 @@ macro_rules! println { ($fmt:expr, $($arg:tt)*) => (print!(concat!($fmt, "\n"), $($arg)*)); } +#[path = "../src/rpc_queue.rs"] +mod rpc_queue; + #[lang = "panic_fmt"] extern fn panic_fmt(args: core::fmt::Arguments, file: &'static str, line: u32) -> ! { println!("panic at {}:{}: {}", file, line, args); diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index 5a7bb76c2..8ad957a61 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -471,6 +471,7 @@ fn process_kern_queued_rpc(stream: &mut TcpStream, trace!("comm<-kern (async RPC)"); let length = NetworkEndian::read_u32(slice) as usize; try!(host_write(stream, host::Reply::RpcRequest { async: true })); + trace!("{:?}" ,&slice[4..][..length]); try!(stream.write(&slice[4..][..length])); Ok(()) }) @@ -482,7 +483,7 @@ fn host_kernel_worker(waiter: Waiter, let mut session = Session::new(congress); loop { - if !rpc_queue::empty() { + while !rpc_queue::empty() { try!(process_kern_queued_rpc(stream, &mut session)) } diff --git a/artiq/runtime/ksupport_glue.c b/artiq/runtime/ksupport_glue.c index 2fa056f5e..515635c94 100644 --- a/artiq/runtime/ksupport_glue.c +++ b/artiq/runtime/ksupport_glue.c @@ -8,7 +8,7 @@ void send_to_log(const char *ptr, size_t length); -#define KERNELCPU_EXEC_ADDRESS 0x40800080 +#define KERNELCPU_EXEC_ADDRESS 0x40800000 #define KERNELCPU_PAYLOAD_ADDRESS 0x40840000 #define KERNELCPU_LAST_ADDRESS 0x4fffffff #define KSUPPORT_HEADER_SIZE 0x80 diff --git a/artiq/test/coredevice/test_embedding.py b/artiq/test/coredevice/test_embedding.py index 683468b1d..8ad7cef63 100644 --- a/artiq/test/coredevice/test_embedding.py +++ b/artiq/test/coredevice/test_embedding.py @@ -217,6 +217,27 @@ class AnnotationTest(ExperimentCase): exp = self.create(_Annotation) self.assertEqual(exp.overflow(1), True) +class _Async(EnvExperiment): + def build(self): + self.setattr_device("core") + + @rpc(flags={"async"}) + def recv_async(self, data): + pass + + @kernel + def run(self): + # fast async path + self.recv_async([0]*128) + # slow async path + self.recv_async([0]*4096) + + +class AsyncTest(ExperimentCase): + def test_args(self): + exp = self.create(_RPCTypes) + exp.run() + class _Payload1MB(EnvExperiment): def build(self):