forked from M-Labs/artiq
runtime: fix remaining async RPC bugs.
This commit is contained in:
parent
2095d01b84
commit
6fcd57a41a
|
@ -413,23 +413,27 @@ class CommGeneric:
|
||||||
def _serve_rpc(self, embedding_map):
|
def _serve_rpc(self, embedding_map):
|
||||||
async = self._read_bool()
|
async = self._read_bool()
|
||||||
service_id = self._read_int32()
|
service_id = self._read_int32()
|
||||||
service = embedding_map.retrieve_object(service_id)
|
|
||||||
args, kwargs = self._receive_rpc_args(embedding_map)
|
args, kwargs = self._receive_rpc_args(embedding_map)
|
||||||
return_tags = self._read_bytes()
|
return_tags = self._read_bytes()
|
||||||
|
|
||||||
|
if service_id is 0:
|
||||||
|
service = lambda obj, attr, value: setattr(obj, attr, value)
|
||||||
|
else:
|
||||||
|
service = embedding_map.retrieve_object(service_id)
|
||||||
logger.debug("rpc service: [%d]%r%s %r %r -> %s", service_id, service,
|
logger.debug("rpc service: [%d]%r%s %r %r -> %s", service_id, service,
|
||||||
(" (async)" if async else ""), args, kwargs, return_tags)
|
(" (async)" if async else ""), args, kwargs, return_tags)
|
||||||
|
|
||||||
|
if async:
|
||||||
|
service(*args, **kwargs)
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = service(*args, **kwargs)
|
result = service(*args, **kwargs)
|
||||||
if async:
|
logger.debug("rpc service: %d %r %r = %r", service_id, args, kwargs, result)
|
||||||
return
|
|
||||||
else:
|
|
||||||
self._write_header(_H2DMsgType.RPC_REPLY)
|
self._write_header(_H2DMsgType.RPC_REPLY)
|
||||||
self._write_bytes(return_tags)
|
self._write_bytes(return_tags)
|
||||||
self._send_rpc_value(bytearray(return_tags), result, result, service)
|
self._send_rpc_value(bytearray(return_tags), result, result, service)
|
||||||
|
|
||||||
logger.debug("rpc service: %d %r %r = %r", service_id, args, kwargs, result)
|
|
||||||
|
|
||||||
except Exception as exn:
|
except Exception as exn:
|
||||||
logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn)
|
logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn)
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ path = "src/lib.rs"
|
||||||
std_artiq = { path = "libstd_artiq", features = ["alloc"] }
|
std_artiq = { path = "libstd_artiq", features = ["alloc"] }
|
||||||
lwip = { path = "liblwip", default-features = false }
|
lwip = { path = "liblwip", default-features = false }
|
||||||
fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] }
|
fringe = { version = "= 1.1.0", default-features = false, features = ["alloc"] }
|
||||||
log = { version = "0.3", default-features = false, features = [] }
|
log = { version = "0.3", default-features = false, features = ["max_level_debug"] }
|
||||||
log_buffer = { version = "1.0" }
|
log_buffer = { version = "1.0" }
|
||||||
byteorder = { version = "0.5", default-features = false }
|
byteorder = { version = "0.5", default-features = false }
|
||||||
|
|
||||||
|
|
|
@ -12,8 +12,6 @@ extern crate byteorder;
|
||||||
mod board;
|
mod board;
|
||||||
#[path = "../src/mailbox.rs"]
|
#[path = "../src/mailbox.rs"]
|
||||||
mod mailbox;
|
mod mailbox;
|
||||||
#[path = "../src/rpc_queue.rs"]
|
|
||||||
mod rpc_queue;
|
|
||||||
|
|
||||||
#[path = "../src/proto.rs"]
|
#[path = "../src/proto.rs"]
|
||||||
mod proto;
|
mod proto;
|
||||||
|
@ -72,7 +70,7 @@ macro_rules! recv {
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! print {
|
macro_rules! print {
|
||||||
($($arg:tt)*) => ($crate::send(&Log(format_args!($($arg)*))));
|
($($arg:tt)*) => ($crate::send(&$crate::kernel_proto::Log(format_args!($($arg)*))));
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! println {
|
macro_rules! println {
|
||||||
|
@ -80,6 +78,9 @@ macro_rules! println {
|
||||||
($fmt:expr, $($arg:tt)*) => (print!(concat!($fmt, "\n"), $($arg)*));
|
($fmt:expr, $($arg:tt)*) => (print!(concat!($fmt, "\n"), $($arg)*));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[path = "../src/rpc_queue.rs"]
|
||||||
|
mod rpc_queue;
|
||||||
|
|
||||||
#[lang = "panic_fmt"]
|
#[lang = "panic_fmt"]
|
||||||
extern fn panic_fmt(args: core::fmt::Arguments, file: &'static str, line: u32) -> ! {
|
extern fn panic_fmt(args: core::fmt::Arguments, file: &'static str, line: u32) -> ! {
|
||||||
println!("panic at {}:{}: {}", file, line, args);
|
println!("panic at {}:{}: {}", file, line, args);
|
||||||
|
|
|
@ -471,6 +471,7 @@ fn process_kern_queued_rpc(stream: &mut TcpStream,
|
||||||
trace!("comm<-kern (async RPC)");
|
trace!("comm<-kern (async RPC)");
|
||||||
let length = NetworkEndian::read_u32(slice) as usize;
|
let length = NetworkEndian::read_u32(slice) as usize;
|
||||||
try!(host_write(stream, host::Reply::RpcRequest { async: true }));
|
try!(host_write(stream, host::Reply::RpcRequest { async: true }));
|
||||||
|
trace!("{:?}" ,&slice[4..][..length]);
|
||||||
try!(stream.write(&slice[4..][..length]));
|
try!(stream.write(&slice[4..][..length]));
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
|
@ -482,7 +483,7 @@ fn host_kernel_worker(waiter: Waiter,
|
||||||
let mut session = Session::new(congress);
|
let mut session = Session::new(congress);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if !rpc_queue::empty() {
|
while !rpc_queue::empty() {
|
||||||
try!(process_kern_queued_rpc(stream, &mut session))
|
try!(process_kern_queued_rpc(stream, &mut session))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
|
|
||||||
void send_to_log(const char *ptr, size_t length);
|
void send_to_log(const char *ptr, size_t length);
|
||||||
|
|
||||||
#define KERNELCPU_EXEC_ADDRESS 0x40800080
|
#define KERNELCPU_EXEC_ADDRESS 0x40800000
|
||||||
#define KERNELCPU_PAYLOAD_ADDRESS 0x40840000
|
#define KERNELCPU_PAYLOAD_ADDRESS 0x40840000
|
||||||
#define KERNELCPU_LAST_ADDRESS 0x4fffffff
|
#define KERNELCPU_LAST_ADDRESS 0x4fffffff
|
||||||
#define KSUPPORT_HEADER_SIZE 0x80
|
#define KSUPPORT_HEADER_SIZE 0x80
|
||||||
|
|
|
@ -217,6 +217,27 @@ class AnnotationTest(ExperimentCase):
|
||||||
exp = self.create(_Annotation)
|
exp = self.create(_Annotation)
|
||||||
self.assertEqual(exp.overflow(1), True)
|
self.assertEqual(exp.overflow(1), True)
|
||||||
|
|
||||||
|
class _Async(EnvExperiment):
|
||||||
|
def build(self):
|
||||||
|
self.setattr_device("core")
|
||||||
|
|
||||||
|
@rpc(flags={"async"})
|
||||||
|
def recv_async(self, data):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@kernel
|
||||||
|
def run(self):
|
||||||
|
# fast async path
|
||||||
|
self.recv_async([0]*128)
|
||||||
|
# slow async path
|
||||||
|
self.recv_async([0]*4096)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncTest(ExperimentCase):
|
||||||
|
def test_args(self):
|
||||||
|
exp = self.create(_RPCTypes)
|
||||||
|
exp.run()
|
||||||
|
|
||||||
|
|
||||||
class _Payload1MB(EnvExperiment):
|
class _Payload1MB(EnvExperiment):
|
||||||
def build(self):
|
def build(self):
|
||||||
|
|
Loading…
Reference in New Issue