forked from M-Labs/artiq
1
0
Fork 0

master: make use of the async message ready flag

This commit is contained in:
mwojcik 2023-12-08 15:48:31 +08:00 committed by Sébastien Bourdeauducq
parent 95b92a178b
commit 4363cdf9fa
6 changed files with 70 additions and 90 deletions

View File

@ -712,4 +712,17 @@ impl Packet {
_ => None _ => None
} }
} }
pub fn expects_response(&self) -> bool {
// returns true if the routable packet should elicit a response
// e.g. reply, ACK packets end a conversation,
// and firmware should not wait for response
match self {
Packet::DmaAddTraceReply { .. } | Packet::DmaRemoveTraceReply { .. } |
Packet::DmaPlaybackReply { .. } | Packet::SubkernelLoadRunReply { .. } |
Packet::SubkernelMessageAck { .. } | Packet::DmaPlaybackStatus { .. } |
Packet::SubkernelFinished { .. } => false,
_ => true
}
}
} }

View File

@ -364,8 +364,11 @@ pub mod subkernel {
{ {
let _lock = subkernel_mutex.lock(io)?; let _lock = subkernel_mutex.lock(io)?;
match unsafe { SUBKERNELS.get(&id).unwrap().state } { match unsafe { SUBKERNELS.get(&id).unwrap().state } {
SubkernelState::Finished { .. } => return Err(Error::SubkernelFinished), SubkernelState::Finished { status: FinishStatus::Ok } |
SubkernelState::Running => (), SubkernelState::Running => (),
SubkernelState::Finished {
status: FinishStatus::CommLost,
} => return Err(Error::SubkernelFinished),
_ => return Err(Error::IncorrectState) _ => return Err(Error::IncorrectState)
} }
} }
@ -385,7 +388,8 @@ pub mod subkernel {
} }
} }
match unsafe { SUBKERNELS.get(&id).unwrap().state } { match unsafe { SUBKERNELS.get(&id).unwrap().state } {
SubkernelState::Finished { .. } => return Ok(None), SubkernelState::Finished { status: FinishStatus::CommLost } |
SubkernelState::Finished { status: FinishStatus::Exception(_) } => return Ok(None),
_ => () _ => ()
} }
Err(()) Err(())

View File

@ -78,6 +78,16 @@ pub mod drtio {
} }
} }
fn link_has_async_ready(linkno: u8) -> bool {
let linkno = linkno as usize;
let async_ready;
unsafe {
async_ready = (csr::DRTIO[linkno].async_messages_ready_read)() == 1;
(csr::DRTIO[linkno].async_messages_ready_write)(1);
}
async_ready
}
fn recv_aux_timeout(io: &Io, linkno: u8, timeout: u32) -> Result<drtioaux::Packet, Error> { fn recv_aux_timeout(io: &Io, linkno: u8, timeout: u32) -> Result<drtioaux::Packet, Error> {
let max_time = clock::get_ms() + timeout as u64; let max_time = clock::get_ms() + timeout as u64;
loop { loop {
@ -117,6 +127,8 @@ pub mod drtio {
drtioaux::send(linkno, drtioaux::send(linkno,
&drtioaux::Packet::SubkernelMessageAck { destination: from } &drtioaux::Packet::SubkernelMessageAck { destination: from }
).unwrap(); ).unwrap();
// give the satellite some time to process the message
io.sleep(10).unwrap();
}, },
// routable packets // routable packets
drtioaux::Packet::DmaAddTraceRequest { destination, .. } | drtioaux::Packet::DmaAddTraceRequest { destination, .. } |
@ -147,45 +159,9 @@ pub mod drtio {
} }
} else { } else {
warn!("[LINK#{}] Error handling async packets ({})", linkno, reply.unwrap_err()); warn!("[LINK#{}] Error handling async packets ({})", linkno, reply.unwrap_err());
return;
} }
else {
drtioaux::send(dest_link, &packet).unwrap();
} }
None
}}
}
match packet {
// packets to be consumed locally
drtioaux::Packet::DmaPlaybackStatus { id, destination: 0, error, channel, timestamp } => {
remote_dma::playback_done(io, ddma_mutex, id, 0, error, channel, timestamp);
None
},
drtioaux::Packet::SubkernelFinished { id, destination: 0, with_exception, exception_src } => {
subkernel::subkernel_finished(io, subkernel_mutex, id, with_exception, exception_src);
None
},
drtioaux::Packet::SubkernelMessage { id, source: from, destination: 0, status, length, data } => {
subkernel::message_handle_incoming(io, subkernel_mutex, id, status, length as usize, &data);
// acknowledge receiving part of the message
drtioaux::send(linkno,
&drtioaux::Packet::SubkernelMessageAck { destination: from }
).unwrap();
None
},
// routable packets
drtioaux::Packet::DmaAddTraceRequest { destination, .. } => route_packet!(destination),
drtioaux::Packet::DmaAddTraceReply { destination, .. } => route_packet!(destination),
drtioaux::Packet::DmaRemoveTraceRequest { destination, .. } => route_packet!(destination),
drtioaux::Packet::DmaRemoveTraceReply { destination, .. } => route_packet!(destination),
drtioaux::Packet::DmaPlaybackRequest { destination, .. } => route_packet!(destination),
drtioaux::Packet::DmaPlaybackReply { destination, .. } => route_packet!(destination),
drtioaux::Packet::SubkernelLoadRunRequest { destination, .. } => route_packet!(destination),
drtioaux::Packet::SubkernelLoadRunReply { destination, .. } => route_packet!(destination),
drtioaux::Packet::SubkernelMessage { destination, .. } => route_packet!(destination),
drtioaux::Packet::SubkernelMessageAck { destination, .. } => route_packet!(destination),
drtioaux::Packet::DmaPlaybackStatus { destination, .. } => route_packet!(destination),
drtioaux::Packet::SubkernelFinished { destination, .. } => route_packet!(destination),
other => Some(other)
} }
} }
@ -358,45 +334,36 @@ pub mod drtio {
let linkno = hop - 1; let linkno = hop - 1;
if destination_up(up_destinations, destination) { if destination_up(up_destinations, destination) {
if up_links[linkno as usize] { if up_links[linkno as usize] {
loop {
let reply = aux_transact(io, aux_mutex, linkno, let reply = aux_transact(io, aux_mutex, linkno,
&drtioaux::Packet::DestinationStatusRequest { &drtioaux::Packet::DestinationStatusRequest {
destination: destination destination: destination
}); });
if let Ok(reply) = reply { if let Ok(reply) = reply {
let reply = process_async_packets(io, ddma_mutex, subkernel_mutex, routing_table, linkno, reply);
match reply { match reply {
Some(drtioaux::Packet::DestinationDownReply) => { drtioaux::Packet::DestinationDownReply => {
destination_set_up(routing_table, up_destinations, destination, false); destination_set_up(routing_table, up_destinations, destination, false);
remote_dma::destination_changed(io, aux_mutex, ddma_mutex, routing_table, destination, false); remote_dma::destination_changed(io, aux_mutex, ddma_mutex, routing_table, destination, false);
subkernel::destination_changed(io, aux_mutex, subkernel_mutex, routing_table, destination, false); subkernel::destination_changed(io, aux_mutex, subkernel_mutex, routing_table, destination, false);
} }
Some(drtioaux::Packet::DestinationOkReply) => (), drtioaux::Packet::DestinationOkReply => (),
Some(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => { drtioaux::Packet::DestinationSequenceErrorReply { channel } => {
error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}", destination, channel, resolve_channel_name(channel as u32)); error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}", destination, channel, resolve_channel_name(channel as u32));
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR }; unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR };
} }
Some(drtioaux::Packet::DestinationCollisionReply { channel }) => { drtioaux::Packet::DestinationCollisionReply { channel } => {
error!("[DEST#{}] RTIO collision involving channel 0x{:04x}:{}", destination, channel, resolve_channel_name(channel as u32)); error!("[DEST#{}] RTIO collision involving channel 0x{:04x}:{}", destination, channel, resolve_channel_name(channel as u32));
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION }; unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION };
} }
Some(drtioaux::Packet::DestinationBusyReply { channel }) => { drtioaux::Packet::DestinationBusyReply { channel } => {
error!("[DEST#{}] RTIO busy error involving channel 0x{:04x}:{}", destination, channel, resolve_channel_name(channel as u32)); error!("[DEST#{}] RTIO busy error involving channel 0x{:04x}:{}", destination, channel, resolve_channel_name(channel as u32));
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY }; unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY };
} }
Some(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), packet => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
None => {
// continue asking until we get Destination...Reply or error out
// wait a bit not to overwhelm the receiver causing gateway errors
io.sleep(10).unwrap();
continue;
}
} }
} else { } else {
error!("[DEST#{}] communication failed ({:?})", destination, reply.unwrap_err()); error!("[DEST#{}] communication failed ({:?})", destination, reply.unwrap_err());
} }
break;
}
} else { } else {
destination_set_up(routing_table, up_destinations, destination, false); destination_set_up(routing_table, up_destinations, destination, false);
remote_dma::destination_changed(io, aux_mutex, ddma_mutex, routing_table, destination, false); remote_dma::destination_changed(io, aux_mutex, ddma_mutex, routing_table, destination, false);
@ -436,6 +403,7 @@ pub mod drtio {
if up_links[linkno as usize] { if up_links[linkno as usize] {
/* link was previously up */ /* link was previously up */
if link_rx_up(linkno) { if link_rx_up(linkno) {
process_async_packets(&io, aux_mutex, ddma_mutex, subkernel_mutex, routing_table, linkno);
process_unsolicited_aux(&io, aux_mutex, linkno); process_unsolicited_aux(&io, aux_mutex, linkno);
process_local_errors(linkno); process_local_errors(linkno);
} else { } else {

View File

@ -471,6 +471,7 @@ fn process_host_message(io: &Io, _aux_mutex: &Mutex, _ddma_mutex: &Mutex, _subke
match subkernel::upload(io, _aux_mutex, _subkernel_mutex, _routing_table, _id) { match subkernel::upload(io, _aux_mutex, _subkernel_mutex, _routing_table, _id) {
Ok(_) => host_write(stream, host::Reply::LoadCompleted)?, Ok(_) => host_write(stream, host::Reply::LoadCompleted)?,
Err(error) => { Err(error) => {
subkernel::clear_subkernels(io, _subkernel_mutex)?;
let mut description = String::new(); let mut description = String::new();
write!(&mut description, "{}", error).unwrap(); write!(&mut description, "{}", error).unwrap();
host_write(stream, host::Reply::LoadFailed(&description))? host_write(stream, host::Reply::LoadFailed(&description))?

View File

@ -146,15 +146,6 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
if hop == 0 { if hop == 0 {
*self_destination = destination; *self_destination = destination;
// async messages
if *rank == 1 {
// for now, master ignores the async_messages_ready packet
if let Some(packet) = router.get_upstream_packet() {
// pass any async or routed packets to master
// this does mean that DDMA/SK packets to master will "trickle down" to higher rank
return drtioaux::send(0, &packet)
}
}
let errors; let errors;
unsafe { unsafe {
errors = csr::drtiosat::rtio_error_read(); errors = csr::drtiosat::rtio_error_read();
@ -776,9 +767,9 @@ pub extern fn main() -> i32 {
if let Some(status) = dma_manager.get_status() { if let Some(status) = dma_manager.get_status() {
info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp); info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp);
router.route(drtioaux::Packet::DmaPlaybackStatus { router.route(drtioaux::Packet::DmaPlaybackStatus {
destination: status.source, id: status.id, error: status.error, source: destination, destination: status.source, id: status.id,
channel: status.channel, timestamp: status.timestamp error: status.error, channel: status.channel, timestamp: status.timestamp
}, &routing_table, rank, destination) }, &routing_table, rank, destination);
} }
kernelmgr.process_kern_requests(&mut router, &routing_table, rank, destination); kernelmgr.process_kern_requests(&mut router, &routing_table, rank, destination);

View File

@ -101,8 +101,11 @@ impl Router {
} }
pub fn get_upstream_packet(&mut self) -> Option<drtioaux::Packet> { pub fn get_upstream_packet(&mut self) -> Option<drtioaux::Packet> {
let packet = self.upstream_queue.pop_front();
if packet.is_none() {
self.upstream_notified = false; self.upstream_notified = false;
self.upstream_queue.pop_front() }
packet
} }
pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> { pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> {