1
0
forked from M-Labs/artiq

runtime: use the destination passed by kernel

This commit is contained in:
mwojcik 2024-01-24 11:51:05 +08:00 committed by Sébastien Bourdeauducq
parent c087a47e45
commit 171c7a6e11
2 changed files with 18 additions and 9 deletions

View File

@ -332,8 +332,9 @@ pub mod subkernel {
Err(_) => return,
};
let subkernel = unsafe { SUBKERNELS.get(&id) };
if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running {
// do not add messages for non-existing, non-running or deleted subkernels
if subkernel.is_some() && subkernel.unwrap().state != SubkernelState::Running {
warn!("received a message for a non-running subkernel #{}", id);
// do not add messages for non-running or deleted subkernels
return
}
if status.is_first() {
@ -361,8 +362,10 @@ pub mod subkernel {
pub fn message_await(io: &Io, subkernel_mutex: &Mutex, id: u32, timeout: u64
) -> Result<Message, Error> {
{
let is_subkernel = {
let _lock = subkernel_mutex.lock(io)?;
let is_subkernel = unsafe { SUBKERNELS.get(&id).is_some() };
if is_subkernel {
match unsafe { SUBKERNELS.get(&id).unwrap().state } {
SubkernelState::Finished { status: FinishStatus::Ok } |
SubkernelState::Running => (),
@ -372,6 +375,8 @@ pub mod subkernel {
_ => return Err(Error::IncorrectState)
}
}
is_subkernel
};
let max_time = clock::get_ms() + timeout as u64;
let message = io.until_ok(|| {
if clock::get_ms() > max_time {
@ -387,10 +392,12 @@ pub mod subkernel {
return Ok(Some(unsafe { MESSAGE_QUEUE.remove(i) }));
}
}
if is_subkernel {
match unsafe { SUBKERNELS.get(&id).unwrap().state } {
SubkernelState::Finished { status: FinishStatus::CommLost } |
SubkernelState::Finished { status: FinishStatus::Exception(_) } => return Ok(None),
_ => ()
}
}
Err(())
});
@ -412,15 +419,17 @@ pub mod subkernel {
}
pub fn message_send<'a>(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex,
routing_table: &RoutingTable, id: u32, count: u8, tag: &'a [u8], message: *const *const ()
routing_table: &RoutingTable, id: u32, destination: Option<u8>, count: u8, tag: &'a [u8], message: *const *const ()
) -> Result<(), Error> {
let mut writer = Cursor::new(Vec::new());
let _lock = subkernel_mutex.lock(io)?;
let destination = unsafe { SUBKERNELS.get(&id).unwrap().destination };
// reuse rpc code for sending arbitrary data
rpc::send_args(&mut writer, 0, tag, message, false)?;
// skip service tag, but overwrite first byte with tag count
let destination = destination.unwrap_or_else(|| {
let _lock = subkernel_mutex.lock(io).unwrap();
unsafe { SUBKERNELS.get(&id).unwrap().destination }
}
);
let data = &mut writer.into_inner()[3..];
data[0] = count;
Ok(drtio::subkernel_send_message(

View File

@ -704,8 +704,8 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
kern_send(io, &kern::SubkernelAwaitFinishReply { status: status })
}
#[cfg(has_drtio)]
&kern::SubkernelMsgSend { id, destination: _, count, tag, data } => {
subkernel::message_send(io, aux_mutex, _subkernel_mutex, routing_table, id, count, tag, data)?;
&kern::SubkernelMsgSend { id, destination, count, tag, data } => {
subkernel::message_send(io, aux_mutex, _subkernel_mutex, routing_table, id, destination, count, tag, data)?;
kern_acknowledge()
}
#[cfg(has_drtio)]