forked from M-Labs/artiq
1
0
Fork 0

runtime: fix multiple async RPC bugs.

This commit is contained in:
whitequark 2016-11-01 06:51:44 +00:00
parent 636d4efe81
commit c1e6d4b67c
6 changed files with 27 additions and 15 deletions

View File

@ -141,6 +141,9 @@ class CommGeneric:
(value, ) = struct.unpack(">d", self._read_chunk(8)) (value, ) = struct.unpack(">d", self._read_chunk(8))
return value return value
def _read_bool(self):
return True if self._read_int8() else False
def _read_bytes(self): def _read_bytes(self):
return self._read_chunk(self._read_int32()) return self._read_chunk(self._read_int32())
@ -177,6 +180,9 @@ class CommGeneric:
def _write_float64(self, value): def _write_float64(self, value):
self.write(struct.pack(">d", value)) self.write(struct.pack(">d", value))
def _write_bool(self, value):
self.write(struct.pack("B", value))
def _write_bytes(self, value): def _write_bytes(self, value):
self._write_int32(len(value)) self._write_int32(len(value))
self.write(value) self.write(value)
@ -405,24 +411,25 @@ class CommGeneric:
raise IOError("Unknown RPC value tag: {}".format(repr(tag))) raise IOError("Unknown RPC value tag: {}".format(repr(tag)))
def _serve_rpc(self, embedding_map): def _serve_rpc(self, embedding_map):
service_id = self._read_int32() async = self._read_bool()
if service_id == 0: service_id = self._read_int32()
service = lambda obj, attr, value: setattr(obj, attr, value) service = embedding_map.retrieve_object(service_id)
else:
service = embedding_map.retrieve_object(service_id)
args, kwargs = self._receive_rpc_args(embedding_map) args, kwargs = self._receive_rpc_args(embedding_map)
return_tags = self._read_bytes() return_tags = self._read_bytes()
logger.debug("rpc service: [%d]%r %r %r -> %s", service_id, service, args, kwargs, return_tags) logger.debug("rpc service: [%d]%r%s %r %r -> %s", service_id, service,
(" (async)" if async else ""), args, kwargs, return_tags)
try: try:
result = service(*args, **kwargs) result = service(*args, **kwargs)
logger.debug("rpc service: %d %r %r == %r", service_id, args, kwargs, result) if async:
return
if service_id != 0: else:
self._write_header(_H2DMsgType.RPC_REPLY) self._write_header(_H2DMsgType.RPC_REPLY)
self._write_bytes(return_tags) self._write_bytes(return_tags)
self._send_rpc_value(bytearray(return_tags), result, result, service) self._send_rpc_value(bytearray(return_tags), result, result, service)
logger.debug("rpc service: %d %r %r = %r", service_id, args, kwargs, result)
except Exception as exn: except Exception as exn:
logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn) logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn)

View File

@ -103,6 +103,7 @@ class Core:
def run(self, function, args, kwargs): def run(self, function, args, kwargs):
result = None result = None
@rpc(flags={"async"})
def set_result(new_result): def set_result(new_result):
nonlocal result nonlocal result
result = new_result result = new_result

View File

@ -1,6 +1,7 @@
use core::ptr; use core::ptr;
use board::csr; use board::csr;
use mailbox; use mailbox;
use rpc_queue;
use kernel_proto::{KERNELCPU_EXEC_ADDRESS, KERNELCPU_LAST_ADDRESS, KSUPPORT_HEADER_SIZE}; use kernel_proto::{KERNELCPU_EXEC_ADDRESS, KERNELCPU_LAST_ADDRESS, KSUPPORT_HEADER_SIZE};
@ -16,6 +17,8 @@ pub unsafe fn start() {
ptr::copy_nonoverlapping(ksupport_image.as_ptr(), ksupport_addr, ksupport_image.len()); ptr::copy_nonoverlapping(ksupport_image.as_ptr(), ksupport_addr, ksupport_image.len());
csr::kernel_cpu::reset_write(0); csr::kernel_cpu::reset_write(0);
rpc_queue::init();
} }
pub fn stop() { pub fn stop() {

View File

@ -13,7 +13,7 @@ const QUEUE_CHUNK: usize = 0x1000;
pub unsafe fn init() { pub unsafe fn init() {
write_volatile(SEND_MAILBOX, QUEUE_BEGIN); write_volatile(SEND_MAILBOX, QUEUE_BEGIN);
write_volatile(RECV_MAILBOX, QUEUE_END); write_volatile(RECV_MAILBOX, QUEUE_BEGIN);
} }
fn next(mut addr: usize) -> usize { fn next(mut addr: usize) -> usize {

View File

@ -389,7 +389,7 @@ fn process_kern_message(waiter: Waiter,
match stream { match stream {
None => unexpected!("unexpected RPC in flash kernel"), None => unexpected!("unexpected RPC in flash kernel"),
Some(ref mut stream) => { Some(ref mut stream) => {
try!(host_write(stream, host::Reply::RpcRequest)); try!(host_write(stream, host::Reply::RpcRequest { async: async }));
try!(rpc::send_args(&mut BufWriter::new(stream), service, tag, data)); try!(rpc::send_args(&mut BufWriter::new(stream), service, tag, data));
if !async { if !async {
session.kernel_state = KernelState::RpcWait session.kernel_state = KernelState::RpcWait
@ -470,7 +470,7 @@ fn process_kern_queued_rpc(stream: &mut TcpStream,
rpc_queue::dequeue(|slice| { rpc_queue::dequeue(|slice| {
trace!("comm<-kern (async RPC)"); trace!("comm<-kern (async RPC)");
let length = NetworkEndian::read_u32(slice) as usize; let length = NetworkEndian::read_u32(slice) as usize;
try!(host_write(stream, host::Reply::RpcRequest)); try!(host_write(stream, host::Reply::RpcRequest { async: true }));
try!(stream.write(&slice[4..][..length])); try!(stream.write(&slice[4..][..length]));
Ok(()) Ok(())
}) })

View File

@ -107,7 +107,7 @@ pub enum Reply<'a> {
backtrace: &'a [usize] backtrace: &'a [usize]
}, },
RpcRequest, RpcRequest { async: bool },
FlashRead(&'a [u8]), FlashRead(&'a [u8]),
FlashOk, FlashOk,
@ -170,8 +170,9 @@ impl<'a> Reply<'a> {
} }
}, },
Reply::RpcRequest => { Reply::RpcRequest { async } => {
try!(write_u8(writer, 10)); try!(write_u8(writer, 10));
try!(write_u8(writer, async as u8));
}, },
Reply::FlashRead(ref bytes) => { Reply::FlashRead(ref bytes) => {