forked from M-Labs/artiq
1
0
Fork 0

master: support unsolicited async messages

This commit is contained in:
mwojcik 2024-04-19 16:28:26 +08:00 committed by Sébastien Bourdeauducq
parent b1c305fd11
commit a49ba3e350
2 changed files with 57 additions and 80 deletions

View File

@ -77,8 +77,6 @@ pub enum Packet {
RoutingSetPath { destination: u8, hops: [u8; 32] },
RoutingSetRank { rank: u8 },
RoutingRetrievePackets,
RoutingNoPackets,
RoutingAck,
MonitorRequest { destination: u8, channel: u16, probe: u8 },
@ -170,8 +168,6 @@ impl Packet {
rank: reader.read_u8()?
},
0x32 => Packet::RoutingAck,
0x33 => Packet::RoutingRetrievePackets,
0x34 => Packet::RoutingNoPackets,
0x40 => Packet::MonitorRequest {
destination: reader.read_u8()?,
@ -456,10 +452,6 @@ impl Packet {
},
Packet::RoutingAck =>
writer.write_u8(0x32)?,
Packet::RoutingRetrievePackets =>
writer.write_u8(0x33)?,
Packet::RoutingNoPackets =>
writer.write_u8(0x34)?,
Packet::MonitorRequest { destination, channel, probe } => {
writer.write_u8(0x40)?;

View File

@ -78,16 +78,6 @@ pub mod drtio {
}
}
fn link_has_async_ready(linkno: u8) -> bool {
let linkno = linkno as usize;
let async_ready;
unsafe {
async_ready = (csr::DRTIO[linkno].async_messages_ready_read)() == 1;
(csr::DRTIO[linkno].async_messages_ready_write)(1);
}
async_ready
}
fn recv_aux_timeout(io: &Io, linkno: u8, timeout: u32) -> Result<drtioaux::Packet, Error> {
let max_time = clock::get_ms() + timeout as u64;
loop {
@ -97,6 +87,7 @@ pub mod drtio {
if clock::get_ms() > max_time {
return Err(Error::Timeout);
}
// todo: reinsert handling of async messages
match drtioaux::recv(linkno) {
Ok(Some(packet)) => return Ok(packet),
Ok(None) => (),
@ -106,62 +97,51 @@ pub mod drtio {
}
}
fn process_async_packets(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, subkernel_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable, linkno: u8)
{
if link_has_async_ready(linkno) {
loop {
let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::RoutingRetrievePackets);
if let Ok(packet) = reply {
match packet {
// packets to be consumed locally
drtioaux::Packet::DmaPlaybackStatus { id, source, destination: 0, error, channel, timestamp } => {
remote_dma::playback_done(io, ddma_mutex, id, source, error, channel, timestamp);
},
drtioaux::Packet::SubkernelFinished { id, destination: 0, with_exception, exception_src } => {
subkernel::subkernel_finished(io, subkernel_mutex, id, with_exception, exception_src);
},
drtioaux::Packet::SubkernelMessage { id, source: from, destination: 0, status, length, data } => {
subkernel::message_handle_incoming(io, subkernel_mutex, id, status, length as usize, &data);
// acknowledge receiving part of the message
drtioaux::send(linkno,
&drtioaux::Packet::SubkernelMessageAck { destination: from }
).unwrap();
// give the satellite some time to process the message
io.sleep(10).unwrap();
},
// routable packets
drtioaux::Packet::DmaAddTraceRequest { destination, .. } |
drtioaux::Packet::DmaAddTraceReply { destination, .. } |
drtioaux::Packet::DmaRemoveTraceRequest { destination, .. } |
drtioaux::Packet::DmaRemoveTraceReply { destination, .. } |
drtioaux::Packet::DmaPlaybackRequest { destination, .. } |
drtioaux::Packet::DmaPlaybackReply { destination, .. } |
drtioaux::Packet::SubkernelLoadRunRequest { destination, .. } |
drtioaux::Packet::SubkernelLoadRunReply { destination, .. } |
drtioaux::Packet::SubkernelMessage { destination, .. } |
drtioaux::Packet::SubkernelMessageAck { destination, .. } |
drtioaux::Packet::DmaPlaybackStatus { destination, .. } |
drtioaux::Packet::SubkernelFinished { destination, .. } => {
let dest_link = routing_table.0[destination as usize][0] - 1;
if dest_link == linkno {
warn!("[LINK#{}] Re-routed packet would return to the same link, dropping: {:?}", linkno, packet);
} else if destination == 0 {
warn!("[LINK#{}] Received invalid routable packet: {:?}", linkno, packet)
} else {
drtioaux::send(dest_link, &packet).unwrap();
}
}
drtioaux::Packet::RoutingNoPackets => break,
other => warn!("[LINK#{}] Received an unroutable packet: {:?}", linkno, other)
}
fn process_async_packets(io: &Io, ddma_mutex: &Mutex, subkernel_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable, linkno: u8, packet: drtioaux::Packet
) -> Option<drtioaux::Packet> {
match packet {
// packets to be consumed locally
drtioaux::Packet::DmaPlaybackStatus { id, source, destination: 0, error, channel, timestamp } => {
remote_dma::playback_done(io, ddma_mutex, id, source, error, channel, timestamp);
None
},
drtioaux::Packet::SubkernelFinished { id, destination: 0, with_exception, exception_src } => {
subkernel::subkernel_finished(io, subkernel_mutex, id, with_exception, exception_src);
None
},
drtioaux::Packet::SubkernelMessage { id, source: from, destination: 0, status, length, data } => {
subkernel::message_handle_incoming(io, subkernel_mutex, id, status, length as usize, &data);
// acknowledge receiving part of the message
drtioaux::send(linkno,
&drtioaux::Packet::SubkernelMessageAck { destination: from }
).unwrap();
None
},
// routable packets
drtioaux::Packet::DmaAddTraceRequest { destination, .. } |
drtioaux::Packet::DmaAddTraceReply { destination, .. } |
drtioaux::Packet::DmaRemoveTraceRequest { destination, .. } |
drtioaux::Packet::DmaRemoveTraceReply { destination, .. } |
drtioaux::Packet::DmaPlaybackRequest { destination, .. } |
drtioaux::Packet::DmaPlaybackReply { destination, .. } |
drtioaux::Packet::SubkernelLoadRunRequest { destination, .. } |
drtioaux::Packet::SubkernelLoadRunReply { destination, .. } |
drtioaux::Packet::SubkernelMessage { destination, .. } |
drtioaux::Packet::SubkernelMessageAck { destination, .. } |
drtioaux::Packet::DmaPlaybackStatus { destination, .. } |
drtioaux::Packet::SubkernelFinished { destination, .. } => {
let dest_link = routing_table.0[destination as usize][0] - 1;
if dest_link == linkno {
warn!("[LINK#{}] Re-routed packet would return to the same link, dropping: {:?}", linkno, packet);
} else if destination == 0 {
warn!("[LINK#{}] Received invalid routable packet: {:?}", linkno, packet)
} else {
warn!("[LINK#{}] Error handling async packets ({})", linkno, reply.unwrap_err());
return;
drtioaux::send(dest_link, &packet).unwrap();
}
}
None
}
other => Some(other)
}
}
@ -268,12 +248,19 @@ pub mod drtio {
}
}
fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, linkno: u8) {
fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, subkernel_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable, linkno: u8) {
let _lock = aux_mutex.lock(io).unwrap();
match drtioaux::recv(linkno) {
Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet),
Ok(None) => (),
Err(_) => warn!("[LINK#{}] aux packet error", linkno)
loop {
match drtioaux::recv(linkno) {
Ok(Some(packet)) => {
if let Some(packet) = process_async_packets(&io, ddma_mutex, subkernel_mutex, routing_table, linkno, packet) {
warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet);
}
},
Ok(None) => return,
Err(_) => { warn!("[LINK#{}] aux packet error", linkno); return }
}
}
}
@ -403,8 +390,7 @@ pub mod drtio {
if up_links[linkno as usize] {
/* link was previously up */
if link_rx_up(linkno) {
process_async_packets(&io, aux_mutex, ddma_mutex, subkernel_mutex, routing_table, linkno);
process_unsolicited_aux(&io, aux_mutex, linkno);
process_unsolicited_aux(&io, aux_mutex, ddma_mutex, subkernel_mutex, routing_table, linkno);
process_local_errors(linkno);
} else {
info!("[LINK#{}] link is down", linkno);
@ -636,7 +622,6 @@ pub mod drtio {
}
})
}
}
#[cfg(not(has_drtio))]