master: drtioaux:

- support async flag
- source in packets
- rerouting packets
pull/284/head
mwojcik 2023-12-12 10:19:17 +08:00 committed by sb10q
parent d044bbd8bb
commit e31a31c4ff
6 changed files with 278 additions and 147 deletions

View File

@ -5,7 +5,7 @@ use io::proto::{ProtoRead, ProtoWrite};
// used by satellite -> master analyzer, subkernel exceptions // used by satellite -> master analyzer, subkernel exceptions
pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2; pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2;
// used by DDMA, subkernel program data (need to provide extra ID and destination) // used by DDMA, subkernel program data (need to provide extra ID and destination)
pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*destination*/1 - /*ID*/4; pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*source*/1 - /*destination*/1 - /*ID*/4;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -89,6 +89,8 @@ pub enum Packet {
RoutingSetRank { RoutingSetRank {
rank: u8, rank: u8,
}, },
RoutingRetrievePackets,
RoutingNoPackets,
RoutingAck, RoutingAck,
MonitorRequest { MonitorRequest {
@ -197,6 +199,7 @@ pub enum Packet {
}, },
DmaAddTraceRequest { DmaAddTraceRequest {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
status: PayloadStatus, status: PayloadStatus,
@ -204,24 +207,30 @@ pub enum Packet {
trace: [u8; MASTER_PAYLOAD_MAX_SIZE], trace: [u8; MASTER_PAYLOAD_MAX_SIZE],
}, },
DmaAddTraceReply { DmaAddTraceReply {
destination: u8,
succeeded: bool, succeeded: bool,
}, },
DmaRemoveTraceRequest { DmaRemoveTraceRequest {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
}, },
DmaRemoveTraceReply { DmaRemoveTraceReply {
destination: u8,
succeeded: bool, succeeded: bool,
}, },
DmaPlaybackRequest { DmaPlaybackRequest {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
timestamp: u64, timestamp: u64,
}, },
DmaPlaybackReply { DmaPlaybackReply {
destination: u8,
succeeded: bool, succeeded: bool,
}, },
DmaPlaybackStatus { DmaPlaybackStatus {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
error: u8, error: u8,
@ -240,22 +249,20 @@ pub enum Packet {
succeeded: bool, succeeded: bool,
}, },
SubkernelLoadRunRequest { SubkernelLoadRunRequest {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
run: bool, run: bool,
}, },
SubkernelLoadRunReply { SubkernelLoadRunReply {
succeeded: bool,
},
SubkernelStopRequest {
destination: u8, destination: u8,
},
SubkernelStopReply {
succeeded: bool, succeeded: bool,
}, },
SubkernelFinished { SubkernelFinished {
destination: u8,
id: u32, id: u32,
with_exception: bool, with_exception: bool,
exception_src: u8,
}, },
SubkernelExceptionRequest { SubkernelExceptionRequest {
destination: u8, destination: u8,
@ -266,6 +273,7 @@ pub enum Packet {
data: [u8; SAT_PAYLOAD_MAX_SIZE], data: [u8; SAT_PAYLOAD_MAX_SIZE],
}, },
SubkernelMessage { SubkernelMessage {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
status: PayloadStatus, status: PayloadStatus,
@ -315,6 +323,8 @@ impl Packet {
rank: reader.read_u8()?, rank: reader.read_u8()?,
}, },
0x32 => Packet::RoutingAck, 0x32 => Packet::RoutingAck,
0x33 => Packet::RoutingRetrievePackets,
0x34 => Packet::RoutingNoPackets,
0x40 => Packet::MonitorRequest { 0x40 => Packet::MonitorRequest {
destination: reader.read_u8()?, destination: reader.read_u8()?,
@ -429,39 +439,47 @@ impl Packet {
} }
0xb0 => { 0xb0 => {
let source = reader.read_u8()?;
let destination = reader.read_u8()?; let destination = reader.read_u8()?;
let id = reader.read_u32()?; let id = reader.read_u32()?;
let status = PayloadStatus::from(reader.read_u8()?); let status = reader.read_u8()?;
let length = reader.read_u16()?; let length = reader.read_u16()?;
let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut trace[0..length as usize])?; reader.read_exact(&mut trace[0..length as usize])?;
Packet::DmaAddTraceRequest { Packet::DmaAddTraceRequest {
source: source,
destination: destination, destination: destination,
id: id, id: id,
status: status, status: PayloadStatus::from(status),
length: length as u16, length: length as u16,
trace: trace, trace: trace,
} }
} }
0xb1 => Packet::DmaAddTraceReply { 0xb1 => Packet::DmaAddTraceReply {
destination: reader.read_u8()?,
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xb2 => Packet::DmaRemoveTraceRequest { 0xb2 => Packet::DmaRemoveTraceRequest {
source: reader.read_u8()?,
destination: reader.read_u8()?, destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
}, },
0xb3 => Packet::DmaRemoveTraceReply { 0xb3 => Packet::DmaRemoveTraceReply {
destination: reader.read_u8()?,
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xb4 => Packet::DmaPlaybackRequest { 0xb4 => Packet::DmaPlaybackRequest {
source: reader.read_u8()?,
destination: reader.read_u8()?, destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
timestamp: reader.read_u64()?, timestamp: reader.read_u64()?,
}, },
0xb5 => Packet::DmaPlaybackReply { 0xb5 => Packet::DmaPlaybackReply {
destination: reader.read_u8()?,
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xb6 => Packet::DmaPlaybackStatus { 0xb6 => Packet::DmaPlaybackStatus {
source: reader.read_u8()?,
destination: reader.read_u8()?, destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
error: reader.read_u8()?, error: reader.read_u8()?,
@ -488,22 +506,20 @@ impl Packet {
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xc4 => Packet::SubkernelLoadRunRequest { 0xc4 => Packet::SubkernelLoadRunRequest {
source: reader.read_u8()?,
destination: reader.read_u8()?, destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
run: reader.read_bool()?, run: reader.read_bool()?,
}, },
0xc5 => Packet::SubkernelLoadRunReply { 0xc5 => Packet::SubkernelLoadRunReply {
succeeded: reader.read_bool()?,
},
0xc6 => Packet::SubkernelStopRequest {
destination: reader.read_u8()?, destination: reader.read_u8()?,
},
0xc7 => Packet::SubkernelStopReply {
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xc8 => Packet::SubkernelFinished { 0xc8 => Packet::SubkernelFinished {
destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
with_exception: reader.read_bool()?, with_exception: reader.read_bool()?,
exception_src: reader.read_u8()?,
}, },
0xc9 => Packet::SubkernelExceptionRequest { 0xc9 => Packet::SubkernelExceptionRequest {
destination: reader.read_u8()?, destination: reader.read_u8()?,
@ -520,16 +536,18 @@ impl Packet {
} }
} }
0xcb => { 0xcb => {
let source = reader.read_u8()?;
let destination = reader.read_u8()?; let destination = reader.read_u8()?;
let id = reader.read_u32()?; let id = reader.read_u32()?;
let status = PayloadStatus::from(reader.read_u8()?); let status = reader.read_u8()?;
let length = reader.read_u16()?; let length = reader.read_u16()?;
let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut data[0..length as usize])?; reader.read_exact(&mut data[0..length as usize])?;
Packet::SubkernelMessage { Packet::SubkernelMessage {
source: source,
destination: destination, destination: destination,
id: id, id: id,
status: status, status: PayloadStatus::from(status),
length: length as u16, length: length as u16,
data: data, data: data,
} }
@ -580,6 +598,8 @@ impl Packet {
writer.write_u8(rank)?; writer.write_u8(rank)?;
} }
Packet::RoutingAck => writer.write_u8(0x32)?, Packet::RoutingAck => writer.write_u8(0x32)?,
Packet::RoutingRetrievePackets => writer.write_u8(0x33)?,
Packet::RoutingNoPackets => writer.write_u8(0x34)?,
Packet::MonitorRequest { Packet::MonitorRequest {
destination, destination,
@ -751,6 +771,7 @@ impl Packet {
} }
Packet::DmaAddTraceRequest { Packet::DmaAddTraceRequest {
source,
destination, destination,
id, id,
status, status,
@ -758,6 +779,7 @@ impl Packet {
length, length,
} => { } => {
writer.write_u8(0xb0)?; writer.write_u8(0xb0)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_u8(status as u8)?; writer.write_u8(status as u8)?;
@ -766,34 +788,45 @@ impl Packet {
writer.write_u16(length)?; writer.write_u16(length)?;
writer.write_all(&trace[0..length as usize])?; writer.write_all(&trace[0..length as usize])?;
} }
Packet::DmaAddTraceReply { succeeded } => { Packet::DmaAddTraceReply { destination, succeeded } => {
writer.write_u8(0xb1)?; writer.write_u8(0xb1)?;
writer.write_u8(destination)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::DmaRemoveTraceRequest { destination, id } => { Packet::DmaRemoveTraceRequest {
source,
destination,
id,
} => {
writer.write_u8(0xb2)?; writer.write_u8(0xb2)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
} }
Packet::DmaRemoveTraceReply { succeeded } => { Packet::DmaRemoveTraceReply { destination, succeeded } => {
writer.write_u8(0xb3)?; writer.write_u8(0xb3)?;
writer.write_u8(destination)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::DmaPlaybackRequest { Packet::DmaPlaybackRequest {
source,
destination, destination,
id, id,
timestamp, timestamp,
} => { } => {
writer.write_u8(0xb4)?; writer.write_u8(0xb4)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_u64(timestamp)?; writer.write_u64(timestamp)?;
} }
Packet::DmaPlaybackReply { succeeded } => { Packet::DmaPlaybackReply { destination, succeeded } => {
writer.write_u8(0xb5)?; writer.write_u8(0xb5)?;
writer.write_u8(destination)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::DmaPlaybackStatus { Packet::DmaPlaybackStatus {
source,
destination, destination,
id, id,
error, error,
@ -801,6 +834,7 @@ impl Packet {
timestamp, timestamp,
} => { } => {
writer.write_u8(0xb6)?; writer.write_u8(0xb6)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_u8(error)?; writer.write_u8(error)?;
@ -826,28 +860,34 @@ impl Packet {
writer.write_u8(0xc1)?; writer.write_u8(0xc1)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::SubkernelLoadRunRequest { destination, id, run } => { Packet::SubkernelLoadRunRequest {
source,
destination,
id,
run,
} => {
writer.write_u8(0xc4)?; writer.write_u8(0xc4)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_bool(run)?; writer.write_bool(run)?;
} }
Packet::SubkernelLoadRunReply { succeeded } => { Packet::SubkernelLoadRunReply { destination, succeeded } => {
writer.write_u8(0xc5)?; writer.write_u8(0xc5)?;
writer.write_bool(succeeded)?;
}
Packet::SubkernelStopRequest { destination } => {
writer.write_u8(0xc6)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
}
Packet::SubkernelStopReply { succeeded } => {
writer.write_u8(0xc7)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::SubkernelFinished { id, with_exception } => { Packet::SubkernelFinished {
destination,
id,
with_exception,
exception_src,
} => {
writer.write_u8(0xc8)?; writer.write_u8(0xc8)?;
writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_bool(with_exception)?; writer.write_bool(with_exception)?;
writer.write_u8(exception_src)?;
} }
Packet::SubkernelExceptionRequest { destination } => { Packet::SubkernelExceptionRequest { destination } => {
writer.write_u8(0xc9)?; writer.write_u8(0xc9)?;
@ -860,6 +900,7 @@ impl Packet {
writer.write_all(&data[0..length as usize])?; writer.write_all(&data[0..length as usize])?;
} }
Packet::SubkernelMessage { Packet::SubkernelMessage {
source,
destination, destination,
id, id,
status, status,
@ -867,6 +908,7 @@ impl Packet {
length, length,
} => { } => {
writer.write_u8(0xcb)?; writer.write_u8(0xcb)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_u8(status as u8)?; writer.write_u8(status as u8)?;

View File

@ -77,6 +77,7 @@ pub enum Message {
#[cfg(has_drtio)] #[cfg(has_drtio)]
SubkernelLoadRunRequest { SubkernelLoadRunRequest {
id: u32, id: u32,
destination: u8,
run: bool, run: bool,
}, },
#[cfg(has_drtio)] #[cfg(has_drtio)]
@ -95,6 +96,7 @@ pub enum Message {
#[cfg(has_drtio)] #[cfg(has_drtio)]
SubkernelMsgSend { SubkernelMsgSend {
id: u32, id: u32,
destination: Option<u8>,
data: Vec<u8>, data: Vec<u8>,
}, },
#[cfg(has_drtio)] #[cfg(has_drtio)]

View File

@ -5,12 +5,16 @@ use cslice::CSlice;
use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0}; use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0};
use crate::{artiq_raise, rpc::send_args}; use crate::{artiq_raise, rpc::send_args};
pub extern "C" fn load_run(id: u32, run: bool) { pub extern "C" fn load_run(id: u32, destination: u8, run: bool) {
unsafe { unsafe {
KERNEL_CHANNEL_1TO0 KERNEL_CHANNEL_1TO0
.as_mut() .as_mut()
.unwrap() .unwrap()
.send(Message::SubkernelLoadRunRequest { id: id, run: run }); .send(Message::SubkernelLoadRunRequest {
id: id,
destination: destination,
run: run,
});
} }
match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() {
Message::SubkernelLoadRunReply { succeeded: true } => (), Message::SubkernelLoadRunReply { succeeded: true } => (),
@ -51,7 +55,14 @@ pub extern "C" fn await_finish(id: u32, timeout: u64) {
} }
} }
pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice<u8>, data: *const *const ()) { pub extern "C" fn send_message(
id: u32,
is_return: bool,
destination: u8,
count: u8,
tag: &CSlice<u8>,
data: *const *const (),
) {
let mut buffer = Vec::<u8>::new(); let mut buffer = Vec::<u8>::new();
send_args(&mut buffer, 0, tag.as_ref(), data, false).expect("RPC encoding failed"); send_args(&mut buffer, 0, tag.as_ref(), data, false).expect("RPC encoding failed");
// overwrite service tag, include how many tags are in the message // overwrite service tag, include how many tags are in the message
@ -59,6 +70,7 @@ pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice<u8>, data: *cons
unsafe { unsafe {
KERNEL_CHANNEL_1TO0.as_mut().unwrap().send(Message::SubkernelMsgSend { KERNEL_CHANNEL_1TO0.as_mut().unwrap().send(Message::SubkernelMsgSend {
id: id, id: id,
destination: if is_return { None } else { Some(destination) },
data: buffer[3..].to_vec(), data: buffer[3..].to_vec(),
}); });
} }

View File

@ -401,7 +401,11 @@ async fn handle_run_kernel(
control.borrow_mut().tx.async_send(reply).await; control.borrow_mut().tx.async_send(reply).await;
} }
#[cfg(has_drtio)] #[cfg(has_drtio)]
kernel::Message::SubkernelLoadRunRequest { id, run } => { kernel::Message::SubkernelLoadRunRequest {
id,
destination: _,
run,
} => {
let succeeded = match subkernel::load(aux_mutex, routing_table, timer, id, run).await { let succeeded = match subkernel::load(aux_mutex, routing_table, timer, id, run).await {
Ok(()) => true, Ok(()) => true,
Err(e) => { Err(e) => {
@ -447,7 +451,11 @@ async fn handle_run_kernel(
.await; .await;
} }
#[cfg(has_drtio)] #[cfg(has_drtio)]
kernel::Message::SubkernelMsgSend { id, data } => { kernel::Message::SubkernelMsgSend {
id,
destination: _,
data,
} => {
let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await; let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await;
match res { match res {
Ok(_) => (), Ok(_) => (),

View File

@ -43,39 +43,97 @@ pub mod drtio {
unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 } unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 }
} }
async fn process_async_packets(aux_mutex: &Mutex<bool>, linkno: u8, packet: Packet) -> Option<Packet> { async fn link_has_async_ready(linkno: u8) -> bool {
// returns None if an async packet has been consumed let linkno = linkno as usize;
match packet { let async_ready;
Packet::DmaPlaybackStatus { unsafe {
id, async_ready = (csr::DRTIO[linkno].async_messages_ready_read)() == 1;
destination, (csr::DRTIO[linkno].async_messages_ready_write)(1);
error, }
channel, async_ready
timestamp, }
} => {
remote_dma::playback_done(id, destination, error, channel, timestamp).await; async fn process_async_packets(aux_mutex: &Mutex<bool>, linkno: u8, routing_table: &drtio_routing::RoutingTable, timer: GlobalTimer) {
None if link_has_async_ready(linkno).await {
loop {
let reply = aux_transact(aux_mutex, linkno, &Packet::RoutingRetrievePackets, timer).await;
if let Ok(packet) = reply {
match packet {
Packet::DmaPlaybackStatus {
id,
source,
destination: 0,
error,
channel,
timestamp,
} => {
remote_dma::playback_done(id, source, error, channel, timestamp).await;
}
Packet::SubkernelFinished {
id,
destination: 0,
with_exception,
exception_src,
} => {
subkernel::subkernel_finished(id, with_exception, exception_src).await;
}
Packet::SubkernelMessage {
id,
source,
destination: 0,
status,
length,
data,
} => {
subkernel::message_handle_incoming(id, status, length as usize, &data).await;
// acknowledge receiving part of the message
let _lock = aux_mutex.async_lock().await;
drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: source })
.await
.unwrap();
let mut countdown = timer.countdown();
// give the satellites some time to process
delay(&mut countdown, Milliseconds(10)).await;
}
// routable packets
Packet::DmaAddTraceRequest { destination, .. }
| Packet::DmaAddTraceReply { destination, .. }
| Packet::DmaRemoveTraceRequest { destination, .. }
| Packet::DmaRemoveTraceReply { destination, .. }
| Packet::DmaPlaybackRequest { destination, .. }
| Packet::DmaPlaybackReply { destination, .. }
| Packet::SubkernelLoadRunRequest { destination, .. }
| Packet::SubkernelLoadRunReply { destination, .. }
| Packet::SubkernelMessage { destination, .. }
| Packet::SubkernelMessageAck { destination, .. }
| Packet::DmaPlaybackStatus { destination, .. }
| Packet::SubkernelFinished { destination, .. } => {
let dest_link = routing_table.0[destination as usize][0] - 1;
if dest_link == linkno {
warn!(
"[LINK#{}] Re-routed packet would return to the same link, dropping: {:?}",
linkno, packet
);
} else if destination == 0 {
warn!("[LINK#{}] Received invalid routable packet: {:?}", linkno, packet)
} else {
drtioaux_async::send(dest_link, &packet).await.unwrap();
}
}
Packet::RoutingNoPackets => break,
other => warn!("[LINK#{}] Received an unroutable packet: {:?}", linkno, other),
}
} else {
warn!(
"[LINK#{}] Error handling async packets ({})",
linkno,
reply.unwrap_err()
);
return;
}
} }
Packet::SubkernelFinished { id, with_exception } => {
subkernel::subkernel_finished(id, with_exception).await;
None
}
Packet::SubkernelMessage {
id,
destination: from,
status,
length,
data,
} => {
subkernel::message_handle_incoming(id, status, length as usize, &data).await;
// acknowledge receiving part of the message
let _lock = aux_mutex.async_lock().await;
drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: from })
.await
.unwrap();
None
}
other => Some(other),
} }
} }
@ -210,11 +268,7 @@ pub mod drtio {
async fn process_unsolicited_aux(aux_mutex: &Rc<Mutex<bool>>, linkno: u8) { async fn process_unsolicited_aux(aux_mutex: &Rc<Mutex<bool>>, linkno: u8) {
let _lock = aux_mutex.async_lock().await; let _lock = aux_mutex.async_lock().await;
match drtioaux_async::recv(linkno).await { match drtioaux_async::recv(linkno).await {
Ok(Some(packet)) => { Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet),
if let Some(packet) = process_async_packets(aux_mutex, linkno, packet).await {
warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet);
}
}
Ok(None) => (), Ok(None) => (),
Err(_) => warn!("[LINK#{}] aux packet error", linkno), Err(_) => warn!("[LINK#{}] aux packet error", linkno),
} }
@ -283,67 +337,53 @@ pub mod drtio {
let linkno = hop - 1; let linkno = hop - 1;
if destination_up(up_destinations, destination).await { if destination_up(up_destinations, destination).await {
if up_links[linkno as usize] { if up_links[linkno as usize] {
loop { let reply = aux_transact(
let reply = aux_transact( aux_mutex,
aux_mutex, linkno,
linkno, &Packet::DestinationStatusRequest {
&Packet::DestinationStatusRequest { destination: destination,
destination: destination, },
}, timer,
timer, )
) .await;
.await; match reply {
match reply { Ok(Packet::DestinationDownReply) => {
Ok(Packet::DestinationDownReply) => { destination_set_up(routing_table, up_destinations, destination, false).await;
destination_set_up(routing_table, up_destinations, destination, false).await; remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false)
remote_dma::destination_changed( .await;
aux_mutex, subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false)
routing_table,
timer,
destination,
false,
)
.await; .await;
subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false)
.await;
}
Ok(Packet::DestinationOkReply) => (),
Ok(Packet::DestinationSequenceErrorReply { channel }) => {
error!(
"[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}",
destination,
channel,
resolve_channel_name(channel as u32)
);
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR };
}
Ok(Packet::DestinationCollisionReply { channel }) => {
error!(
"[DEST#{}] RTIO collision involving channel 0x{:04x}:{}",
destination,
channel,
resolve_channel_name(channel as u32)
);
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION };
}
Ok(Packet::DestinationBusyReply { channel }) => {
error!(
"[DEST#{}] RTIO busy error involving channel 0x{:04x}:{}",
destination,
channel,
resolve_channel_name(channel as u32)
);
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY };
}
Ok(packet) => match process_async_packets(aux_mutex, linkno, packet).await {
Some(packet) => {
error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet)
}
None => continue,
},
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e),
} }
break; Ok(Packet::DestinationOkReply) => (),
Ok(Packet::DestinationSequenceErrorReply { channel }) => {
error!(
"[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}",
destination,
channel,
resolve_channel_name(channel as u32)
);
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR };
}
Ok(Packet::DestinationCollisionReply { channel }) => {
error!(
"[DEST#{}] RTIO collision involving channel 0x{:04x}:{}",
destination,
channel,
resolve_channel_name(channel as u32)
);
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION };
}
Ok(Packet::DestinationBusyReply { channel }) => {
error!(
"[DEST#{}] RTIO busy error involving channel 0x{:04x}:{}",
destination,
channel,
resolve_channel_name(channel as u32)
);
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY };
}
Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e),
} }
} else { } else {
destination_set_up(routing_table, up_destinations, destination, false).await; destination_set_up(routing_table, up_destinations, destination, false).await;
@ -393,6 +433,7 @@ pub mod drtio {
if up_links[linkno as usize] { if up_links[linkno as usize] {
/* link was previously up */ /* link was previously up */
if link_rx_up(linkno).await { if link_rx_up(linkno).await {
process_async_packets(aux_mutex, linkno, routing_table, timer).await;
process_unsolicited_aux(aux_mutex, linkno).await; process_unsolicited_aux(aux_mutex, linkno).await;
process_local_errors(linkno).await; process_local_errors(linkno).await;
} else { } else {
@ -504,14 +545,21 @@ pub mod drtio {
trace, trace,
|slice, status, len| Packet::DmaAddTraceRequest { |slice, status, len| Packet::DmaAddTraceRequest {
id: id, id: id,
source: 0,
destination: destination, destination: destination,
status: status, status: status,
length: len as u16, length: len as u16,
trace: *slice, trace: *slice,
}, },
|reply| match reply { |reply| match reply {
Packet::DmaAddTraceReply { succeeded: true } => Ok(()), Packet::DmaAddTraceReply {
Packet::DmaAddTraceReply { succeeded: false } => Err("error adding trace on satellite"), destination: 0,
succeeded: true,
} => Ok(()),
Packet::DmaAddTraceReply {
destination: 0,
succeeded: false,
} => Err("error adding trace on satellite"),
_ => Err("adding DMA trace failed, unexpected aux packet"), _ => Err("adding DMA trace failed, unexpected aux packet"),
}, },
) )
@ -531,14 +579,21 @@ pub mod drtio {
linkno, linkno,
&Packet::DmaRemoveTraceRequest { &Packet::DmaRemoveTraceRequest {
id: id, id: id,
source: 0,
destination: destination, destination: destination,
}, },
timer, timer,
) )
.await; .await;
match reply { match reply {
Ok(Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), Ok(Packet::DmaRemoveTraceReply {
Ok(Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), destination: 0,
succeeded: true,
}) => Ok(()),
Ok(Packet::DmaRemoveTraceReply {
destination: 0,
succeeded: false,
}) => Err("satellite DMA erase error"),
Ok(_) => Err("adding trace failed, unexpected aux packet"), Ok(_) => Err("adding trace failed, unexpected aux packet"),
Err(_) => Err("erasing trace failed, aux error"), Err(_) => Err("erasing trace failed, aux error"),
} }
@ -558,6 +613,7 @@ pub mod drtio {
linkno, linkno,
&Packet::DmaPlaybackRequest { &Packet::DmaPlaybackRequest {
id: id, id: id,
source: 0,
destination: destination, destination: destination,
timestamp: timestamp, timestamp: timestamp,
}, },
@ -565,8 +621,14 @@ pub mod drtio {
) )
.await; .await;
match reply { match reply {
Ok(Packet::DmaPlaybackReply { succeeded: true }) => Ok(()), Ok(Packet::DmaPlaybackReply {
Ok(Packet::DmaPlaybackReply { succeeded: false }) => Err("error on DMA playback request"), destination: 0,
succeeded: true,
}) => Ok(()),
Ok(Packet::DmaPlaybackReply {
destination: 0,
succeeded: false,
}) => Err("error on DMA playback request"),
Ok(_) => Err("received unexpected aux packet during DMA playback"), Ok(_) => Err("received unexpected aux packet during DMA playback"),
Err(_) => Err("aux error on DMA playback"), Err(_) => Err("aux error on DMA playback"),
} }
@ -689,6 +751,7 @@ pub mod drtio {
linkno, linkno,
&Packet::SubkernelLoadRunRequest { &Packet::SubkernelLoadRunRequest {
id: id, id: id,
source: 0,
destination: destination, destination: destination,
run: run, run: run,
}, },
@ -696,8 +759,14 @@ pub mod drtio {
) )
.await?; .await?;
match reply { match reply {
Packet::SubkernelLoadRunReply { succeeded: true } => return Ok(()), Packet::SubkernelLoadRunReply {
Packet::SubkernelLoadRunReply { succeeded: false } => return Err("error on subkernel run request"), destination: 0,
succeeded: true,
} => return Ok(()),
Packet::SubkernelLoadRunReply {
destination: 0,
succeeded: false,
} => return Err("error on subkernel run request"),
_ => return Err("received unexpected aux packet during subkernel run"), _ => return Err("received unexpected aux packet during subkernel run"),
} }
} }
@ -747,6 +816,7 @@ pub mod drtio {
timer, timer,
message, message,
|slice, status, len| Packet::SubkernelMessage { |slice, status, len| Packet::SubkernelMessage {
source: 0,
destination: destination, destination: destination,
id: id, id: id,
status: status, status: status,

View File

@ -13,7 +13,7 @@ use crate::rtio_mgt::drtio;
pub enum FinishStatus { pub enum FinishStatus {
Ok, Ok,
CommLost, CommLost,
Exception, Exception(u8), // exception source
} }
#[derive(Debug, PartialEq, Clone, Copy)] #[derive(Debug, PartialEq, Clone, Copy)]
@ -121,14 +121,14 @@ pub async fn clear_subkernels() {
CURRENT_MESSAGES.async_lock().await.clear(); CURRENT_MESSAGES.async_lock().await.clear();
} }
pub async fn subkernel_finished(id: u32, with_exception: bool) { pub async fn subkernel_finished(id: u32, with_exception: bool, exception_src: u8) {
// called upon receiving DRTIO SubkernelRunDone // called upon receiving DRTIO SubkernelRunDone
// may be None if session ends and is cleared // may be None if session ends and is cleared
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
if subkernel.state == SubkernelState::Running { if subkernel.state == SubkernelState::Running {
subkernel.state = SubkernelState::Finished { subkernel.state = SubkernelState::Finished {
status: match with_exception { status: match with_exception {
true => FinishStatus::Exception, true => FinishStatus::Exception(exception_src),
false => FinishStatus::Ok, false => FinishStatus::Ok,
}, },
} }
@ -196,11 +196,8 @@ pub async fn await_finish(
Ok(SubkernelFinished { Ok(SubkernelFinished {
id: id, id: id,
status: status, status: status,
exception: if status == FinishStatus::Exception { exception: if let FinishStatus::Exception(dest) = status {
Some( Some(drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, dest).await?)
drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, subkernel.destination)
.await?,
)
} else { } else {
None None
}, },
@ -292,7 +289,7 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<
status: FinishStatus::CommLost, status: FinishStatus::CommLost,
} => return Err(Error::CommLost), } => return Err(Error::CommLost),
SubkernelState::Finished { SubkernelState::Finished {
status: FinishStatus::Exception, status: FinishStatus::Exception(_),
} => return Err(Error::SubkernelException), } => return Err(Error::SubkernelException),
_ => (), _ => (),
} }