forked from M-Labs/artiq
subkernel: allow negative timeouts for no timeout
This commit is contained in:
parent
726cb092ca
commit
09462442f7
|
@ -2537,7 +2537,7 @@ class ARTIQIRGenerator(algorithm.Visitor):
|
||||||
timeout = self.visit(node.args[1])
|
timeout = self.visit(node.args[1])
|
||||||
elif len(node.args) == 1 and len(node.keywords) == 0:
|
elif len(node.args) == 1 and len(node.keywords) == 0:
|
||||||
fn = node.args[0].type
|
fn = node.args[0].type
|
||||||
timeout = ir.Constant(10_000, builtins.TInt64())
|
timeout = ir.Constant(-1, builtins.TInt64())
|
||||||
else:
|
else:
|
||||||
assert False
|
assert False
|
||||||
if types.is_method(fn):
|
if types.is_method(fn):
|
||||||
|
@ -2579,7 +2579,7 @@ class ARTIQIRGenerator(algorithm.Visitor):
|
||||||
if len(node.args) == 2 and len(node.keywords) == 0:
|
if len(node.args) == 2 and len(node.keywords) == 0:
|
||||||
name = node.args[0].s
|
name = node.args[0].s
|
||||||
vartype = node.args[1].value
|
vartype = node.args[1].value
|
||||||
timeout = ir.Constant(10_000, builtins.TInt64())
|
timeout = ir.Constant(-1, builtins.TInt64())
|
||||||
elif len(node.args) == 3 and len(node.keywords) == 0:
|
elif len(node.args) == 3 and len(node.keywords) == 0:
|
||||||
name = node.args[0].s
|
name = node.args[0].s
|
||||||
vartype = node.args[1].value
|
vartype = node.args[1].value
|
||||||
|
|
|
@ -498,7 +498,7 @@ extern fn subkernel_load_run(id: u32, destination: u8, run: bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[unwind(allowed)]
|
#[unwind(allowed)]
|
||||||
extern fn subkernel_await_finish(id: u32, timeout: u64) {
|
extern fn subkernel_await_finish(id: u32, timeout: i64) {
|
||||||
send(&SubkernelAwaitFinishRequest { id: id, timeout: timeout });
|
send(&SubkernelAwaitFinishRequest { id: id, timeout: timeout });
|
||||||
recv!(SubkernelAwaitFinishReply { status } => {
|
recv!(SubkernelAwaitFinishReply { status } => {
|
||||||
match status {
|
match status {
|
||||||
|
@ -528,7 +528,7 @@ extern fn subkernel_send_message(id: u32, is_return: bool, destination: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[unwind(allowed)]
|
#[unwind(allowed)]
|
||||||
extern fn subkernel_await_message(id: u32, timeout: u64, tags: &CSlice<u8>, min: u8, max: u8) -> u8 {
|
extern fn subkernel_await_message(id: u32, timeout: i64, tags: &CSlice<u8>, min: u8, max: u8) -> u8 {
|
||||||
send(&SubkernelMsgRecvRequest { id: id, timeout: timeout, tags: tags.as_ref() });
|
send(&SubkernelMsgRecvRequest { id: id, timeout: timeout, tags: tags.as_ref() });
|
||||||
recv!(SubkernelMsgRecvReply { status, count } => {
|
recv!(SubkernelMsgRecvReply { status, count } => {
|
||||||
match status {
|
match status {
|
||||||
|
|
|
@ -105,10 +105,10 @@ pub enum Message<'a> {
|
||||||
|
|
||||||
SubkernelLoadRunRequest { id: u32, destination: u8, run: bool },
|
SubkernelLoadRunRequest { id: u32, destination: u8, run: bool },
|
||||||
SubkernelLoadRunReply { succeeded: bool },
|
SubkernelLoadRunReply { succeeded: bool },
|
||||||
SubkernelAwaitFinishRequest { id: u32, timeout: u64 },
|
SubkernelAwaitFinishRequest { id: u32, timeout: i64 },
|
||||||
SubkernelAwaitFinishReply { status: SubkernelStatus },
|
SubkernelAwaitFinishReply { status: SubkernelStatus },
|
||||||
SubkernelMsgSend { id: u32, destination: Option<u8>, count: u8, tag: &'a [u8], data: *const *const () },
|
SubkernelMsgSend { id: u32, destination: Option<u8>, count: u8, tag: &'a [u8], data: *const *const () },
|
||||||
SubkernelMsgRecvRequest { id: u32, timeout: u64, tags: &'a [u8] },
|
SubkernelMsgRecvRequest { id: u32, timeout: i64, tags: &'a [u8] },
|
||||||
SubkernelMsgRecvReply { status: SubkernelStatus, count: u8 },
|
SubkernelMsgRecvReply { status: SubkernelStatus, count: u8 },
|
||||||
|
|
||||||
Log(fmt::Arguments<'a>),
|
Log(fmt::Arguments<'a>),
|
||||||
|
|
|
@ -279,7 +279,7 @@ pub mod subkernel {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn await_finish(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex,
|
pub fn await_finish(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex,
|
||||||
routing_table: &RoutingTable, id: u32, timeout: u64) -> Result<SubkernelFinished, Error> {
|
routing_table: &RoutingTable, id: u32, timeout: i64) -> Result<SubkernelFinished, Error> {
|
||||||
{
|
{
|
||||||
let _lock = subkernel_mutex.lock(io)?;
|
let _lock = subkernel_mutex.lock(io)?;
|
||||||
match unsafe { SUBKERNELS.get(&id).unwrap().state } {
|
match unsafe { SUBKERNELS.get(&id).unwrap().state } {
|
||||||
|
@ -291,7 +291,7 @@ pub mod subkernel {
|
||||||
}
|
}
|
||||||
let max_time = clock::get_ms() + timeout as u64;
|
let max_time = clock::get_ms() + timeout as u64;
|
||||||
let _res = io.until(|| {
|
let _res = io.until(|| {
|
||||||
if clock::get_ms() > max_time {
|
if timeout > 0 && clock::get_ms() > max_time {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if subkernel_mutex.test_lock() {
|
if subkernel_mutex.test_lock() {
|
||||||
|
@ -305,7 +305,7 @@ pub mod subkernel {
|
||||||
_ => false
|
_ => false
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
if clock::get_ms() > max_time {
|
if timeout > 0 && clock::get_ms() > max_time {
|
||||||
error!("Remote subkernel finish await timed out");
|
error!("Remote subkernel finish await timed out");
|
||||||
return Err(Error::Timeout);
|
return Err(Error::Timeout);
|
||||||
}
|
}
|
||||||
|
@ -360,7 +360,7 @@ pub mod subkernel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn message_await(io: &Io, subkernel_mutex: &Mutex, id: u32, timeout: u64
|
pub fn message_await(io: &Io, subkernel_mutex: &Mutex, id: u32, timeout: i64
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
let is_subkernel = {
|
let is_subkernel = {
|
||||||
let _lock = subkernel_mutex.lock(io)?;
|
let _lock = subkernel_mutex.lock(io)?;
|
||||||
|
@ -379,7 +379,7 @@ pub mod subkernel {
|
||||||
};
|
};
|
||||||
let max_time = clock::get_ms() + timeout as u64;
|
let max_time = clock::get_ms() + timeout as u64;
|
||||||
let message = io.until_ok(|| {
|
let message = io.until_ok(|| {
|
||||||
if clock::get_ms() > max_time {
|
if timeout > 0 && clock::get_ms() > max_time {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
if subkernel_mutex.test_lock() {
|
if subkernel_mutex.test_lock() {
|
||||||
|
|
|
@ -63,10 +63,10 @@ enum KernelState {
|
||||||
Absent,
|
Absent,
|
||||||
Loaded,
|
Loaded,
|
||||||
Running,
|
Running,
|
||||||
MsgAwait { id: u32, max_time: u64, tags: Vec<u8> },
|
MsgAwait { id: u32, max_time: i64, tags: Vec<u8> },
|
||||||
MsgSending,
|
MsgSending,
|
||||||
SubkernelAwaitLoad,
|
SubkernelAwaitLoad,
|
||||||
SubkernelAwaitFinish { max_time: u64, id: u32 },
|
SubkernelAwaitFinish { max_time: i64, id: u32 },
|
||||||
DmaUploading { max_time: u64 },
|
DmaUploading { max_time: u64 },
|
||||||
DmaAwait { max_time: u64 },
|
DmaAwait { max_time: u64 },
|
||||||
}
|
}
|
||||||
|
@ -547,7 +547,7 @@ impl Manager {
|
||||||
fn process_external_messages(&mut self) -> Result<(), Error> {
|
fn process_external_messages(&mut self) -> Result<(), Error> {
|
||||||
match &self.session.kernel_state {
|
match &self.session.kernel_state {
|
||||||
KernelState::MsgAwait { id, max_time, tags } => {
|
KernelState::MsgAwait { id, max_time, tags } => {
|
||||||
if clock::get_ms() > *max_time {
|
if *max_time > 0 && clock::get_ms() > *max_time as u64 {
|
||||||
kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::Timeout, count: 0 })?;
|
kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::Timeout, count: 0 })?;
|
||||||
self.session.kernel_state = KernelState::Running;
|
self.session.kernel_state = KernelState::Running;
|
||||||
return Ok(())
|
return Ok(())
|
||||||
|
@ -570,7 +570,7 @@ impl Manager {
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
KernelState::SubkernelAwaitFinish { max_time, id } => {
|
KernelState::SubkernelAwaitFinish { max_time, id } => {
|
||||||
if clock::get_ms() > *max_time {
|
if *max_time > 0 && clock::get_ms() > *max_time as u64 {
|
||||||
kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::Timeout })?;
|
kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::Timeout })?;
|
||||||
self.session.kernel_state = KernelState::Running;
|
self.session.kernel_state = KernelState::Running;
|
||||||
} else {
|
} else {
|
||||||
|
@ -779,7 +779,8 @@ impl Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
&kern::SubkernelMsgRecvRequest { id, timeout, tags } => {
|
&kern::SubkernelMsgRecvRequest { id, timeout, tags } => {
|
||||||
let max_time = clock::get_ms() + timeout as u64;
|
// negative timeout value means no timeout
|
||||||
|
let max_time = if timeout > 0 { clock::get_ms() as i64 + timeout } else { timeout };
|
||||||
self.session.kernel_state = KernelState::MsgAwait {
|
self.session.kernel_state = KernelState::MsgAwait {
|
||||||
id: id, max_time: max_time, tags: tags.to_vec() };
|
id: id, max_time: max_time, tags: tags.to_vec() };
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -794,7 +795,7 @@ impl Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
&kern::SubkernelAwaitFinishRequest{ id, timeout } => {
|
&kern::SubkernelAwaitFinishRequest{ id, timeout } => {
|
||||||
let max_time = clock::get_ms() + timeout as u64;
|
let max_time = if timeout > 0 { clock::get_ms() as i64 + timeout } else { timeout };
|
||||||
self.session.kernel_state = KernelState::SubkernelAwaitFinish { max_time: max_time, id: id };
|
self.session.kernel_state = KernelState::SubkernelAwaitFinish { max_time: max_time, id: id };
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -304,7 +304,7 @@ For example, a subkernel performing integer addition: ::
|
||||||
result = subkernel_await(subkernel_add)
|
result = subkernel_await(subkernel_add)
|
||||||
assert result == 4
|
assert result == 4
|
||||||
|
|
||||||
Sometimes the subkernel execution may take more time - and the await has a default timeout of 10000 milliseconds (10 seconds). It can be adjusted, as ``subkernel_await()`` accepts an optional timeout argument. If the given value is negative, timeout is disabled.
|
Sometimes the subkernel execution may take more time. By default, the await function will wait forever. However, if timeout is needed it can be set, as ``subkernel_await()`` accepts an optional argument. The value is interpreted in milliseconds and if it is negative, timeout is disabled.
|
||||||
|
|
||||||
Subkernels are compiled after the main kernel, and then immediately uploaded to satellites. When called, master instructs the appropriate satellite to load the subkernel into their kernel core and to run it. If the subkernel is complex, and its binary relatively big, the delay between the call and actually running the subkernel may be substantial; if that delay has to be minimized, ``subkernel_preload(function)`` should be used before the call.
|
Subkernels are compiled after the main kernel, and then immediately uploaded to satellites. When called, master instructs the appropriate satellite to load the subkernel into their kernel core and to run it. If the subkernel is complex, and its binary relatively big, the delay between the call and actually running the subkernel may be substantial; if that delay has to be minimized, ``subkernel_preload(function)`` should be used before the call.
|
||||||
|
|
||||||
|
@ -351,26 +351,26 @@ In general, subkernels do not have to be awaited, but awaiting is required to re
|
||||||
Message passing
|
Message passing
|
||||||
^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
Subkernels besides arguments and returns, can also pass messages between each other or the master with built-in ``subkernel_send`` and ``subkernel_recv`` functions. This can be used for communication between subkernels, passing additional data, or partially computed data. Consider the following example: ::
|
Subkernels besides arguments and returns, can also pass messages between each other or the master with built-in ``subkernel_send()`` and ``subkernel_recv()`` functions. This can be used for communication between subkernels, passing additional data, or partially computed data. Consider the following example: ::
|
||||||
|
|
||||||
from artiq.experiment import *
|
from artiq.experiment import *
|
||||||
|
|
||||||
|
@subkernel(destination=1)
|
||||||
|
def simple_message() -> TInt32:
|
||||||
|
data = subkernel_recv("message", TInt32)
|
||||||
|
return data + 20
|
||||||
|
|
||||||
class MessagePassing(EnvExperiment):
|
class MessagePassing(EnvExperiment):
|
||||||
def build(self):
|
def build(self):
|
||||||
self.setattr_device("core")
|
self.setattr_device("core")
|
||||||
|
|
||||||
@subkernel(destination=1)
|
|
||||||
def simple_self(self) -> TInt32:
|
|
||||||
data = subkernel_recv("message", TInt32)
|
|
||||||
return data + 20
|
|
||||||
|
|
||||||
@kernel
|
@kernel
|
||||||
def run(self):
|
def run(self):
|
||||||
self.simple_self()
|
simple_self()
|
||||||
subkernel_send(1, "message", 150)
|
subkernel_send(1, "message", 150)
|
||||||
result = subkernel_await(self.simple_self)
|
result = subkernel_await(simple_self)
|
||||||
assert result == 170
|
assert result == 170
|
||||||
|
|
||||||
The ``subkernel_send`` function accepts three arguments: destination, name of the message that will be linked with the ``subkernel_recv``, and the value.
|
The ``subkernel_send(destination, name, value)`` function requires three arguments: destination, name of the message that will be linked with the ``subkernel_recv()``, and the passed value.
|
||||||
|
|
||||||
The ``subkernel_recv`` function accepts two obligatory arguments: message name (matching the name provided in ``subkernel_send``) and expected type; and optionally, a third argument - timeout for the operation in milliseconds. If the value is negative, timeout is disabled. The default value is -1 (no timeout). The type between the two functions with the same name must match.
|
The ``subkernel_recv(name, type, [timeout])`` function requires two arguments: message name (matching the name provided in ``subkernel_send``) and expected type. Optionally, it accepts a third argument - timeout for the operation in milliseconds. If the value is negative, timeout is disabled. The default value is no timeout. The value and declared types between the function pair with the same name must match.
|
Loading…
Reference in New Issue