From c1e6d4b67c16b9f8d5013eb5c9e9aa55952bdab7 Mon Sep 17 00:00:00 2001 From: whitequark Date: Tue, 1 Nov 2016 06:51:44 +0000 Subject: [PATCH] runtime: fix multiple async RPC bugs. --- artiq/coredevice/comm_generic.py | 27 +++++++++++++++++---------- artiq/coredevice/core.py | 1 + artiq/runtime.rs/src/kernel.rs | 3 +++ artiq/runtime.rs/src/rpc_queue.rs | 2 +- artiq/runtime.rs/src/session.rs | 4 ++-- artiq/runtime.rs/src/session_proto.rs | 5 +++-- 6 files changed, 27 insertions(+), 15 deletions(-) diff --git a/artiq/coredevice/comm_generic.py b/artiq/coredevice/comm_generic.py index 3163b1ba8..30804d541 100644 --- a/artiq/coredevice/comm_generic.py +++ b/artiq/coredevice/comm_generic.py @@ -141,6 +141,9 @@ class CommGeneric: (value, ) = struct.unpack(">d", self._read_chunk(8)) return value + def _read_bool(self): + return True if self._read_int8() else False + def _read_bytes(self): return self._read_chunk(self._read_int32()) @@ -177,6 +180,9 @@ class CommGeneric: def _write_float64(self, value): self.write(struct.pack(">d", value)) + def _write_bool(self, value): + self.write(struct.pack("B", value)) + def _write_bytes(self, value): self._write_int32(len(value)) self.write(value) @@ -405,24 +411,25 @@ class CommGeneric: raise IOError("Unknown RPC value tag: {}".format(repr(tag))) def _serve_rpc(self, embedding_map): - service_id = self._read_int32() - if service_id == 0: - service = lambda obj, attr, value: setattr(obj, attr, value) - else: - service = embedding_map.retrieve_object(service_id) - + async = self._read_bool() + service_id = self._read_int32() + service = embedding_map.retrieve_object(service_id) args, kwargs = self._receive_rpc_args(embedding_map) return_tags = self._read_bytes() - logger.debug("rpc service: [%d]%r %r %r -> %s", service_id, service, args, kwargs, return_tags) + logger.debug("rpc service: [%d]%r%s %r %r -> %s", service_id, service, + (" (async)" if async else ""), args, kwargs, return_tags) try: result = service(*args, **kwargs) - logger.debug("rpc service: %d %r %r == %r", service_id, args, kwargs, result) - - if service_id != 0: + if async: + return + else: self._write_header(_H2DMsgType.RPC_REPLY) self._write_bytes(return_tags) self._send_rpc_value(bytearray(return_tags), result, result, service) + + logger.debug("rpc service: %d %r %r = %r", service_id, args, kwargs, result) + except Exception as exn: logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn) diff --git a/artiq/coredevice/core.py b/artiq/coredevice/core.py index 8f9330287..1b1b7b3a4 100644 --- a/artiq/coredevice/core.py +++ b/artiq/coredevice/core.py @@ -103,6 +103,7 @@ class Core: def run(self, function, args, kwargs): result = None + @rpc(flags={"async"}) def set_result(new_result): nonlocal result result = new_result diff --git a/artiq/runtime.rs/src/kernel.rs b/artiq/runtime.rs/src/kernel.rs index e50902cf5..5d7e17696 100644 --- a/artiq/runtime.rs/src/kernel.rs +++ b/artiq/runtime.rs/src/kernel.rs @@ -1,6 +1,7 @@ use core::ptr; use board::csr; use mailbox; +use rpc_queue; use kernel_proto::{KERNELCPU_EXEC_ADDRESS, KERNELCPU_LAST_ADDRESS, KSUPPORT_HEADER_SIZE}; @@ -16,6 +17,8 @@ pub unsafe fn start() { ptr::copy_nonoverlapping(ksupport_image.as_ptr(), ksupport_addr, ksupport_image.len()); csr::kernel_cpu::reset_write(0); + + rpc_queue::init(); } pub fn stop() { diff --git a/artiq/runtime.rs/src/rpc_queue.rs b/artiq/runtime.rs/src/rpc_queue.rs index 35fbe0df2..3155813eb 100644 --- a/artiq/runtime.rs/src/rpc_queue.rs +++ b/artiq/runtime.rs/src/rpc_queue.rs @@ -13,7 +13,7 @@ const QUEUE_CHUNK: usize = 0x1000; pub unsafe fn init() { write_volatile(SEND_MAILBOX, QUEUE_BEGIN); - write_volatile(RECV_MAILBOX, QUEUE_END); + write_volatile(RECV_MAILBOX, QUEUE_BEGIN); } fn next(mut addr: usize) -> usize { diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index dacb14d67..5a7bb76c2 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -389,7 +389,7 @@ fn process_kern_message(waiter: Waiter, match stream { None => unexpected!("unexpected RPC in flash kernel"), Some(ref mut stream) => { - try!(host_write(stream, host::Reply::RpcRequest)); + try!(host_write(stream, host::Reply::RpcRequest { async: async })); try!(rpc::send_args(&mut BufWriter::new(stream), service, tag, data)); if !async { session.kernel_state = KernelState::RpcWait @@ -470,7 +470,7 @@ fn process_kern_queued_rpc(stream: &mut TcpStream, rpc_queue::dequeue(|slice| { trace!("comm<-kern (async RPC)"); let length = NetworkEndian::read_u32(slice) as usize; - try!(host_write(stream, host::Reply::RpcRequest)); + try!(host_write(stream, host::Reply::RpcRequest { async: true })); try!(stream.write(&slice[4..][..length])); Ok(()) }) diff --git a/artiq/runtime.rs/src/session_proto.rs b/artiq/runtime.rs/src/session_proto.rs index 2ee656c35..fd48c46b3 100644 --- a/artiq/runtime.rs/src/session_proto.rs +++ b/artiq/runtime.rs/src/session_proto.rs @@ -107,7 +107,7 @@ pub enum Reply<'a> { backtrace: &'a [usize] }, - RpcRequest, + RpcRequest { async: bool }, FlashRead(&'a [u8]), FlashOk, @@ -170,8 +170,9 @@ impl<'a> Reply<'a> { } }, - Reply::RpcRequest => { + Reply::RpcRequest { async } => { try!(write_u8(writer, 10)); + try!(write_u8(writer, async as u8)); }, Reply::FlashRead(ref bytes) => {