Compare commits

..

3 Commits

7 changed files with 28 additions and 23 deletions

View File

@ -170,6 +170,7 @@ pub extern "C" fn dma_playback(timestamp: i64, ptr: i32, _uses_ddma: bool) {
csr::rtio_dma::base_address_write(ptr as u32);
csr::rtio_dma::time_offset_write(timestamp as u64);
let old_cri_master = csr::cri_con::selected_read();
csr::cri_con::selected_write(1);
csr::rtio_dma::enable_write(1);
#[cfg(has_drtio)]
@ -183,7 +184,7 @@ pub extern "C" fn dma_playback(timestamp: i64, ptr: i32, _uses_ddma: bool) {
});
}
while csr::rtio_dma::enable_read() != 0 {}
csr::cri_con::selected_write(0);
csr::cri_con::selected_write(old_cri_master);
let error = csr::rtio_dma::error_read();
if error != 0 {

View File

@ -103,7 +103,7 @@ pub enum Message {
SubkernelMsgSent,
#[cfg(has_drtio)]
SubkernelMsgRecvRequest {
id: u32,
id: i32,
timeout: i64,
tags: Vec<u8>,
},

View File

@ -25,7 +25,7 @@ pub extern "C" fn load_run(id: u32, destination: u8, run: bool) {
}
}
pub extern "C" fn await_finish(id: u32, timeout: u64) {
pub extern "C" fn await_finish(id: u32, timeout: i64) {
unsafe {
KERNEL_CHANNEL_1TO0
.as_mut()
@ -80,7 +80,7 @@ pub extern "C" fn send_message(
}
}
pub extern "C" fn await_message(id: u32, timeout: u64, tags: &CSlice<u8>, min: u8, max: u8) {
pub extern "C" fn await_message(id: i32, timeout: i64, tags: &CSlice<u8>, min: u8, max: u8) {
unsafe {
KERNEL_CHANNEL_1TO0
.as_mut()

View File

@ -453,10 +453,10 @@ async fn handle_run_kernel(
#[cfg(has_drtio)]
kernel::Message::SubkernelMsgSend {
id,
destination: _,
destination,
data,
} => {
let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await;
let res = subkernel::message_send(aux_mutex, routing_table, timer, id, destination.unwrap(), data).await;
match res {
Ok(_) => (),
Err(e) => {
@ -471,7 +471,7 @@ async fn handle_run_kernel(
}
#[cfg(has_drtio)]
kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => {
let message_received = subkernel::message_await(id, timeout, timer).await;
let message_received = subkernel::message_await(id as u32, timeout, timer).await;
let (status, count) = match message_received {
Ok(ref message) => (kernel::SubkernelStatus::NoError, message.count),
Err(SubkernelError::Timeout) => (kernel::SubkernelStatus::Timeout, 0),
@ -480,7 +480,7 @@ async fn handle_run_kernel(
Err(SubkernelError::SubkernelException) => {
error!("Exception in subkernel");
// just retrieve the exception
let status = subkernel::await_finish(aux_mutex, routing_table, timer, id, timeout)
let status = subkernel::await_finish(aux_mutex, routing_table, timer, id as u32, timeout)
.await
.unwrap();
match stream {

View File

@ -5,7 +5,7 @@ use libboard_artiq::{drtio_routing::RoutingTable,
drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}};
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::mutex::Mutex;
use log::error;
use log::{error, warn};
use crate::rtio_mgt::drtio;
@ -176,7 +176,7 @@ pub async fn await_finish(
_ => return Err(Error::IncorrectState),
}
if timeout > 0 {
let max_time = timer.get_time() + Milliseconds(timeout);
let max_time = timer.get_time() + Milliseconds(timeout as u64);
while timer.get_time() < max_time {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished { .. } => break,
@ -285,7 +285,7 @@ pub async fn message_await(id: u32, timeout: i64, timer: GlobalTimer) -> Result<
_ => return Err(Error::IncorrectState),
}
}
let max_time = timer.get_time() + Milliseconds(timeout);
let max_time = timer.get_time() + Milliseconds(timeout as u64);
while timeout < 0 || (timeout > 0 && timer.get_time() < max_time) {
{
let mut message_queue = MESSAGE_QUEUE.async_lock().await;
@ -318,9 +318,8 @@ pub async fn message_send<'a>(
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32,
destination: u8,
message: Vec<u8>,
) -> Result<(), Error> {
let destination = SUBKERNELS.async_lock().await.get(&id).unwrap().destination;
// rpc data prepared by the kernel core already
Ok(drtio::subkernel_send_message(aux_mutex, routing_table, timer, id, destination, &message).await?)
}

View File

@ -657,13 +657,13 @@ fn process_aux_packet(
drtioaux::Packet::SubkernelMessage {
source,
destination: _destination,
id: _id,
id,
status,
length,
data,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
kernel_manager.message_handle_incoming(status, length as usize, &data);
kernel_manager.message_handle_incoming(status, id, length as usize, &data);
router.send(
drtioaux::Packet::SubkernelMessageAck { destination: source },
_routing_table,

View File

@ -99,6 +99,7 @@ macro_rules! unexpected {
/* represents interkernel messages */
struct Message {
count: u8,
id: u32,
data: Vec<u8>,
}
@ -179,7 +180,7 @@ impl MessageManager {
}
}
pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
pub fn handle_incoming(&mut self, status: PayloadStatus, id: u32, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
// called when receiving a message from master
if status.is_first() {
self.in_buffer = None;
@ -189,6 +190,7 @@ impl MessageManager {
None => {
self.in_buffer = Some(Message {
count: data[0],
id: id,
data: data[1..length].to_vec(),
});
}
@ -353,13 +355,14 @@ impl<'a> Manager<'_> {
pub fn message_handle_incoming(
&mut self,
status: PayloadStatus,
id: u32,
length: usize,
slice: &[u8; MASTER_PAYLOAD_MAX_SIZE],
) {
if !self.running() {
return;
}
self.session.messages.handle_incoming(status, length, slice);
self.session.messages.handle_incoming(status, id, length, slice);
}
pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> {
@ -717,11 +720,13 @@ impl<'a> Manager<'_> {
self.session.kernel_state = KernelState::MsgSending;
}
kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => {
let id = if id == -1 { self.session.id } else { id as u32 };
let max_time = if timeout > 0 {
Some(timer.get_time() + Milliseconds(timeout));
Some(timer.get_time() + Milliseconds(timeout as u64))
} else {
None
}
};
info!("awaiting tags: {:?}", tags);
self.session.kernel_state = KernelState::MsgAwait { max_time: max_time, id:id, tags: tags };
}
kernel::Message::SubkernelLoadRunRequest {
@ -745,10 +750,10 @@ impl<'a> Manager<'_> {
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
let max_time = if timeout > 0 {
Some(timer.get_time() + Milliseconds(timeout));
Some(timer.get_time() + Milliseconds(timeout as u64))
} else {
None
}
};
self.session.kernel_state = KernelState::SubkernelAwaitFinish {
max_time: max_time,
id: id,
@ -779,7 +784,7 @@ impl<'a> Manager<'_> {
return Ok(());
}
}
if let Some(message) = self.session.messages.get_incoming(id) {
if let Some(message) = self.session.messages.get_incoming(*id) {
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
status: kernel::SubkernelStatus::NoError,
count: message.count,
@ -802,7 +807,7 @@ impl<'a> Manager<'_> {
}
KernelState::SubkernelAwaitFinish { max_time, id } => {
if let Some(max_time) = *max_time {
if timer.get_time() > *max_time {
if timer.get_time() > max_time {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::Timeout,
});