diff --git a/src/libboard_artiq/src/drtioaux_proto.rs b/src/libboard_artiq/src/drtioaux_proto.rs index 154a5e0..160f49e 100644 --- a/src/libboard_artiq/src/drtioaux_proto.rs +++ b/src/libboard_artiq/src/drtioaux_proto.rs @@ -5,7 +5,7 @@ use io::proto::{ProtoRead, ProtoWrite}; // used by satellite -> master analyzer, subkernel exceptions pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2; // used by DDMA, subkernel program data (need to provide extra ID and destination) -pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*destination*/1 - /*ID*/4; +pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*source*/1 - /*destination*/1 - /*ID*/4; #[derive(Debug)] pub enum Error { @@ -89,6 +89,8 @@ pub enum Packet { RoutingSetRank { rank: u8, }, + RoutingRetrievePackets, + RoutingNoPackets, RoutingAck, MonitorRequest { @@ -197,6 +199,7 @@ pub enum Packet { }, DmaAddTraceRequest { + source: u8, destination: u8, id: u32, status: PayloadStatus, @@ -204,24 +207,32 @@ pub enum Packet { trace: [u8; MASTER_PAYLOAD_MAX_SIZE], }, DmaAddTraceReply { + source: u8, + destination: u8, + id: u32, succeeded: bool, }, DmaRemoveTraceRequest { + source: u8, destination: u8, id: u32, }, DmaRemoveTraceReply { + destination: u8, succeeded: bool, }, DmaPlaybackRequest { + source: u8, destination: u8, id: u32, timestamp: u64, }, DmaPlaybackReply { + destination: u8, succeeded: bool, }, DmaPlaybackStatus { + source: u8, destination: u8, id: u32, error: u8, @@ -240,22 +251,20 @@ pub enum Packet { succeeded: bool, }, SubkernelLoadRunRequest { + source: u8, destination: u8, id: u32, run: bool, }, SubkernelLoadRunReply { - succeeded: bool, - }, - SubkernelStopRequest { destination: u8, - }, - SubkernelStopReply { succeeded: bool, }, SubkernelFinished { + destination: u8, id: u32, with_exception: bool, + exception_src: u8, }, SubkernelExceptionRequest { destination: u8, @@ -266,6 +275,7 @@ pub enum Packet { data: [u8; SAT_PAYLOAD_MAX_SIZE], }, SubkernelMessage { + source: u8, destination: u8, id: u32, status: PayloadStatus, @@ -315,6 +325,8 @@ impl Packet { rank: reader.read_u8()?, }, 0x32 => Packet::RoutingAck, + 0x33 => Packet::RoutingRetrievePackets, + 0x34 => Packet::RoutingNoPackets, 0x40 => Packet::MonitorRequest { destination: reader.read_u8()?, @@ -429,39 +441,49 @@ impl Packet { } 0xb0 => { + let source = reader.read_u8()?; let destination = reader.read_u8()?; let id = reader.read_u32()?; - let status = PayloadStatus::from(reader.read_u8()?); + let status = reader.read_u8()?; let length = reader.read_u16()?; let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut trace[0..length as usize])?; Packet::DmaAddTraceRequest { + source: source, destination: destination, id: id, - status: status, + status: PayloadStatus::from(status), length: length as u16, trace: trace, } } 0xb1 => Packet::DmaAddTraceReply { + source: reader.read_u8()?, + destination: reader.read_u8()?, + id: reader.read_u32()?, succeeded: reader.read_bool()?, }, 0xb2 => Packet::DmaRemoveTraceRequest { + source: reader.read_u8()?, destination: reader.read_u8()?, id: reader.read_u32()?, }, 0xb3 => Packet::DmaRemoveTraceReply { + destination: reader.read_u8()?, succeeded: reader.read_bool()?, }, 0xb4 => Packet::DmaPlaybackRequest { + source: reader.read_u8()?, destination: reader.read_u8()?, id: reader.read_u32()?, timestamp: reader.read_u64()?, }, 0xb5 => Packet::DmaPlaybackReply { + destination: reader.read_u8()?, succeeded: reader.read_bool()?, }, 0xb6 => Packet::DmaPlaybackStatus { + source: reader.read_u8()?, destination: reader.read_u8()?, id: reader.read_u32()?, error: reader.read_u8()?, @@ -488,22 +510,20 @@ impl Packet { succeeded: reader.read_bool()?, }, 0xc4 => Packet::SubkernelLoadRunRequest { + source: reader.read_u8()?, destination: reader.read_u8()?, id: reader.read_u32()?, run: reader.read_bool()?, }, 0xc5 => Packet::SubkernelLoadRunReply { - succeeded: reader.read_bool()?, - }, - 0xc6 => Packet::SubkernelStopRequest { destination: reader.read_u8()?, - }, - 0xc7 => Packet::SubkernelStopReply { succeeded: reader.read_bool()?, }, 0xc8 => Packet::SubkernelFinished { + destination: reader.read_u8()?, id: reader.read_u32()?, with_exception: reader.read_bool()?, + exception_src: reader.read_u8()?, }, 0xc9 => Packet::SubkernelExceptionRequest { destination: reader.read_u8()?, @@ -520,16 +540,18 @@ impl Packet { } } 0xcb => { + let source = reader.read_u8()?; let destination = reader.read_u8()?; let id = reader.read_u32()?; - let status = PayloadStatus::from(reader.read_u8()?); + let status = reader.read_u8()?; let length = reader.read_u16()?; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelMessage { + source: source, destination: destination, id: id, - status: status, + status: PayloadStatus::from(status), length: length as u16, data: data, } @@ -580,6 +602,8 @@ impl Packet { writer.write_u8(rank)?; } Packet::RoutingAck => writer.write_u8(0x32)?, + Packet::RoutingRetrievePackets => writer.write_u8(0x33)?, + Packet::RoutingNoPackets => writer.write_u8(0x34)?, Packet::MonitorRequest { destination, @@ -751,6 +775,7 @@ impl Packet { } Packet::DmaAddTraceRequest { + source, destination, id, status, @@ -758,6 +783,7 @@ impl Packet { length, } => { writer.write_u8(0xb0)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u8(status as u8)?; @@ -766,34 +792,52 @@ impl Packet { writer.write_u16(length)?; writer.write_all(&trace[0..length as usize])?; } - Packet::DmaAddTraceReply { succeeded } => { + Packet::DmaAddTraceReply { + source, + destination, + id, + succeeded, + } => { writer.write_u8(0xb1)?; + writer.write_u8(source)?; + writer.write_u8(destination)?; + writer.write_u32(id)?; writer.write_bool(succeeded)?; } - Packet::DmaRemoveTraceRequest { destination, id } => { + Packet::DmaRemoveTraceRequest { + source, + destination, + id, + } => { writer.write_u8(0xb2)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; } - Packet::DmaRemoveTraceReply { succeeded } => { + Packet::DmaRemoveTraceReply { destination, succeeded } => { writer.write_u8(0xb3)?; + writer.write_u8(destination)?; writer.write_bool(succeeded)?; } Packet::DmaPlaybackRequest { + source, destination, id, timestamp, } => { writer.write_u8(0xb4)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u64(timestamp)?; } - Packet::DmaPlaybackReply { succeeded } => { + Packet::DmaPlaybackReply { destination, succeeded } => { writer.write_u8(0xb5)?; + writer.write_u8(destination)?; writer.write_bool(succeeded)?; } Packet::DmaPlaybackStatus { + source, destination, id, error, @@ -801,6 +845,7 @@ impl Packet { timestamp, } => { writer.write_u8(0xb6)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u8(error)?; @@ -826,28 +871,34 @@ impl Packet { writer.write_u8(0xc1)?; writer.write_bool(succeeded)?; } - Packet::SubkernelLoadRunRequest { destination, id, run } => { + Packet::SubkernelLoadRunRequest { + source, + destination, + id, + run, + } => { writer.write_u8(0xc4)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_bool(run)?; } - Packet::SubkernelLoadRunReply { succeeded } => { + Packet::SubkernelLoadRunReply { destination, succeeded } => { writer.write_u8(0xc5)?; - writer.write_bool(succeeded)?; - } - Packet::SubkernelStopRequest { destination } => { - writer.write_u8(0xc6)?; writer.write_u8(destination)?; - } - Packet::SubkernelStopReply { succeeded } => { - writer.write_u8(0xc7)?; writer.write_bool(succeeded)?; } - Packet::SubkernelFinished { id, with_exception } => { + Packet::SubkernelFinished { + destination, + id, + with_exception, + exception_src, + } => { writer.write_u8(0xc8)?; + writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_bool(with_exception)?; + writer.write_u8(exception_src)?; } Packet::SubkernelExceptionRequest { destination } => { writer.write_u8(0xc9)?; @@ -860,6 +911,7 @@ impl Packet { writer.write_all(&data[0..length as usize])?; } Packet::SubkernelMessage { + source, destination, id, status, @@ -867,6 +919,7 @@ impl Packet { length, } => { writer.write_u8(0xcb)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u8(status as u8)?; @@ -880,4 +933,39 @@ impl Packet { } Ok(()) } + + pub fn routable_destination(&self) -> Option { + // only for packets that could be re-routed, not only forwarded + match self { + Packet::DmaAddTraceRequest { destination, .. } => Some(*destination), + Packet::DmaAddTraceReply { destination, .. } => Some(*destination), + Packet::DmaRemoveTraceRequest { destination, .. } => Some(*destination), + Packet::DmaRemoveTraceReply { destination, .. } => Some(*destination), + Packet::DmaPlaybackRequest { destination, .. } => Some(*destination), + Packet::DmaPlaybackReply { destination, .. } => Some(*destination), + Packet::SubkernelLoadRunRequest { destination, .. } => Some(*destination), + Packet::SubkernelLoadRunReply { destination, .. } => Some(*destination), + Packet::SubkernelMessage { destination, .. } => Some(*destination), + Packet::SubkernelMessageAck { destination } => Some(*destination), + Packet::DmaPlaybackStatus { destination, .. } => Some(*destination), + Packet::SubkernelFinished { destination, .. } => Some(*destination), + _ => None, + } + } + + pub fn expects_response(&self) -> bool { + // returns true if the routable packet should elicit a response + // e.g. reply, ACK packets end a conversation, + // and firmware should not wait for response + match self { + Packet::DmaAddTraceReply { .. } + | Packet::DmaRemoveTraceReply { .. } + | Packet::DmaPlaybackReply { .. } + | Packet::SubkernelLoadRunReply { .. } + | Packet::SubkernelMessageAck { .. } + | Packet::DmaPlaybackStatus { .. } + | Packet::SubkernelFinished { .. } => false, + _ => true, + } + } } diff --git a/src/libksupport/src/kernel/mod.rs b/src/libksupport/src/kernel/mod.rs index 7e8b859..664f32b 100644 --- a/src/libksupport/src/kernel/mod.rs +++ b/src/libksupport/src/kernel/mod.rs @@ -77,6 +77,7 @@ pub enum Message { #[cfg(has_drtio)] SubkernelLoadRunRequest { id: u32, + destination: u8, run: bool, }, #[cfg(has_drtio)] @@ -95,6 +96,7 @@ pub enum Message { #[cfg(has_drtio)] SubkernelMsgSend { id: u32, + destination: Option, data: Vec, }, #[cfg(has_drtio)] diff --git a/src/libksupport/src/kernel/subkernel.rs b/src/libksupport/src/kernel/subkernel.rs index 17754c7..90a71e0 100644 --- a/src/libksupport/src/kernel/subkernel.rs +++ b/src/libksupport/src/kernel/subkernel.rs @@ -5,12 +5,16 @@ use cslice::CSlice; use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0}; use crate::{artiq_raise, rpc::send_args}; -pub extern "C" fn load_run(id: u32, run: bool) { +pub extern "C" fn load_run(id: u32, destination: u8, run: bool) { unsafe { KERNEL_CHANNEL_1TO0 .as_mut() .unwrap() - .send(Message::SubkernelLoadRunRequest { id: id, run: run }); + .send(Message::SubkernelLoadRunRequest { + id: id, + destination: destination, + run: run, + }); } match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { Message::SubkernelLoadRunReply { succeeded: true } => (), @@ -51,7 +55,14 @@ pub extern "C" fn await_finish(id: u32, timeout: u64) { } } -pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice, data: *const *const ()) { +pub extern "C" fn send_message( + id: u32, + is_return: bool, + destination: u8, + count: u8, + tag: &CSlice, + data: *const *const (), +) { let mut buffer = Vec::::new(); send_args(&mut buffer, 0, tag.as_ref(), data, false).expect("RPC encoding failed"); // overwrite service tag, include how many tags are in the message @@ -59,6 +70,7 @@ pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice, data: *cons unsafe { KERNEL_CHANNEL_1TO0.as_mut().unwrap().send(Message::SubkernelMsgSend { id: id, + destination: if is_return { None } else { Some(destination) }, data: buffer[3..].to_vec(), }); } diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 13e5250..f0c42cb 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -401,7 +401,11 @@ async fn handle_run_kernel( control.borrow_mut().tx.async_send(reply).await; } #[cfg(has_drtio)] - kernel::Message::SubkernelLoadRunRequest { id, run } => { + kernel::Message::SubkernelLoadRunRequest { + id, + destination: _, + run, + } => { let succeeded = match subkernel::load(aux_mutex, routing_table, timer, id, run).await { Ok(()) => true, Err(e) => { @@ -447,7 +451,11 @@ async fn handle_run_kernel( .await; } #[cfg(has_drtio)] - kernel::Message::SubkernelMsgSend { id, data } => { + kernel::Message::SubkernelMsgSend { + id, + destination: _, + data, + } => { let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await; match res { Ok(_) => (), diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs index 9f4298f..e5f5194 100644 --- a/src/runtime/src/rtio_dma.rs +++ b/src/runtime/src/rtio_dma.rs @@ -142,9 +142,9 @@ pub mod remote_dma { } } - pub async fn playback_done(&mut self, destination: u8, error: u8, channel: u32, timestamp: u64) { + pub async fn playback_done(&mut self, source: u8, error: u8, channel: u32, timestamp: u64) { let mut traces_locked = self.traces.async_lock().await; - let mut trace = traces_locked.get_mut(&destination).unwrap(); + let mut trace = traces_locked.get_mut(&source).unwrap(); trace.state = RemoteState::PlaybackEnded { error: error, channel: channel, diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index b2f3aa3..e9de1bc 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -43,39 +43,102 @@ pub mod drtio { unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 } } - async fn process_async_packets(aux_mutex: &Mutex, linkno: u8, packet: Packet) -> Option { - // returns None if an async packet has been consumed - match packet { - Packet::DmaPlaybackStatus { - id, - destination, - error, - channel, - timestamp, - } => { - remote_dma::playback_done(id, destination, error, channel, timestamp).await; - None + async fn link_has_async_ready(linkno: u8) -> bool { + let linkno = linkno as usize; + let async_ready; + unsafe { + async_ready = (csr::DRTIO[linkno].async_messages_ready_read)() == 1; + (csr::DRTIO[linkno].async_messages_ready_write)(1); + } + async_ready + } + + async fn process_async_packets( + aux_mutex: &Mutex, + linkno: u8, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + ) { + if link_has_async_ready(linkno).await { + loop { + let reply = aux_transact(aux_mutex, linkno, &Packet::RoutingRetrievePackets, timer).await; + if let Ok(packet) = reply { + match packet { + Packet::DmaPlaybackStatus { + id, + source, + destination: 0, + error, + channel, + timestamp, + } => { + remote_dma::playback_done(id, source, error, channel, timestamp).await; + } + Packet::SubkernelFinished { + id, + destination: 0, + with_exception, + exception_src, + } => { + subkernel::subkernel_finished(id, with_exception, exception_src).await; + } + Packet::SubkernelMessage { + id, + source, + destination: 0, + status, + length, + data, + } => { + subkernel::message_handle_incoming(id, status, length as usize, &data).await; + // acknowledge receiving part of the message + let _lock = aux_mutex.async_lock().await; + drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: source }) + .await + .unwrap(); + let mut countdown = timer.countdown(); + // give the satellites some time to process + delay(&mut countdown, Milliseconds(10)).await; + } + // routable packets + Packet::DmaAddTraceRequest { destination, .. } + | Packet::DmaAddTraceReply { destination, .. } + | Packet::DmaRemoveTraceRequest { destination, .. } + | Packet::DmaRemoveTraceReply { destination, .. } + | Packet::DmaPlaybackRequest { destination, .. } + | Packet::DmaPlaybackReply { destination, .. } + | Packet::SubkernelLoadRunRequest { destination, .. } + | Packet::SubkernelLoadRunReply { destination, .. } + | Packet::SubkernelMessage { destination, .. } + | Packet::SubkernelMessageAck { destination, .. } + | Packet::DmaPlaybackStatus { destination, .. } + | Packet::SubkernelFinished { destination, .. } => { + let dest_link = routing_table.0[destination as usize][0] - 1; + if dest_link == linkno { + warn!( + "[LINK#{}] Re-routed packet would return to the same link, dropping: {:?}", + linkno, packet + ); + } else if destination == 0 { + warn!("[LINK#{}] Received invalid routable packet: {:?}", linkno, packet) + } else { + drtioaux_async::send(dest_link, &packet).await.unwrap(); + } + } + + Packet::RoutingNoPackets => break, + + other => warn!("[LINK#{}] Received an unroutable packet: {:?}", linkno, other), + } + } else { + warn!( + "[LINK#{}] Error handling async packets ({})", + linkno, + reply.unwrap_err() + ); + return; + } } - Packet::SubkernelFinished { id, with_exception } => { - subkernel::subkernel_finished(id, with_exception).await; - None - } - Packet::SubkernelMessage { - id, - destination: from, - status, - length, - data, - } => { - subkernel::message_handle_incoming(id, status, length as usize, &data).await; - // acknowledge receiving part of the message - let _lock = aux_mutex.async_lock().await; - drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: from }) - .await - .unwrap(); - None - } - other => Some(other), } } @@ -210,11 +273,7 @@ pub mod drtio { async fn process_unsolicited_aux(aux_mutex: &Rc>, linkno: u8) { let _lock = aux_mutex.async_lock().await; match drtioaux_async::recv(linkno).await { - Ok(Some(packet)) => { - if let Some(packet) = process_async_packets(aux_mutex, linkno, packet).await { - warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet); - } - } + Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), Err(_) => warn!("[LINK#{}] aux packet error", linkno), } @@ -283,67 +342,53 @@ pub mod drtio { let linkno = hop - 1; if destination_up(up_destinations, destination).await { if up_links[linkno as usize] { - loop { - let reply = aux_transact( - aux_mutex, - linkno, - &Packet::DestinationStatusRequest { - destination: destination, - }, - timer, - ) - .await; - match reply { - Ok(Packet::DestinationDownReply) => { - destination_set_up(routing_table, up_destinations, destination, false).await; - remote_dma::destination_changed( - aux_mutex, - routing_table, - timer, - destination, - false, - ) + let reply = aux_transact( + aux_mutex, + linkno, + &Packet::DestinationStatusRequest { + destination: destination, + }, + timer, + ) + .await; + match reply { + Ok(Packet::DestinationDownReply) => { + destination_set_up(routing_table, up_destinations, destination, false).await; + remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false) + .await; + subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false) .await; - subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false) - .await; - } - Ok(Packet::DestinationOkReply) => (), - Ok(Packet::DestinationSequenceErrorReply { channel }) => { - error!( - "[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}", - destination, - channel, - resolve_channel_name(channel as u32) - ); - unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR }; - } - Ok(Packet::DestinationCollisionReply { channel }) => { - error!( - "[DEST#{}] RTIO collision involving channel 0x{:04x}:{}", - destination, - channel, - resolve_channel_name(channel as u32) - ); - unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION }; - } - Ok(Packet::DestinationBusyReply { channel }) => { - error!( - "[DEST#{}] RTIO busy error involving channel 0x{:04x}:{}", - destination, - channel, - resolve_channel_name(channel as u32) - ); - unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY }; - } - Ok(packet) => match process_async_packets(aux_mutex, linkno, packet).await { - Some(packet) => { - error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet) - } - None => continue, - }, - Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), } - break; + Ok(Packet::DestinationOkReply) => (), + Ok(Packet::DestinationSequenceErrorReply { channel }) => { + error!( + "[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}", + destination, + channel, + resolve_channel_name(channel as u32) + ); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR }; + } + Ok(Packet::DestinationCollisionReply { channel }) => { + error!( + "[DEST#{}] RTIO collision involving channel 0x{:04x}:{}", + destination, + channel, + resolve_channel_name(channel as u32) + ); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION }; + } + Ok(Packet::DestinationBusyReply { channel }) => { + error!( + "[DEST#{}] RTIO busy error involving channel 0x{:04x}:{}", + destination, + channel, + resolve_channel_name(channel as u32) + ); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY }; + } + Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), + Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), } } else { destination_set_up(routing_table, up_destinations, destination, false).await; @@ -393,6 +438,7 @@ pub mod drtio { if up_links[linkno as usize] { /* link was previously up */ if link_rx_up(linkno).await { + process_async_packets(aux_mutex, linkno, routing_table, timer).await; process_unsolicited_aux(aux_mutex, linkno).await; process_local_errors(linkno).await; } else { @@ -504,14 +550,23 @@ pub mod drtio { trace, |slice, status, len| Packet::DmaAddTraceRequest { id: id, + source: 0, destination: destination, status: status, length: len as u16, trace: *slice, }, |reply| match reply { - Packet::DmaAddTraceReply { succeeded: true } => Ok(()), - Packet::DmaAddTraceReply { succeeded: false } => Err("error adding trace on satellite"), + Packet::DmaAddTraceReply { + destination: 0, + succeeded: true, + .. + } => Ok(()), + Packet::DmaAddTraceReply { + destination: 0, + succeeded: false, + .. + } => Err("error adding trace on satellite"), _ => Err("adding DMA trace failed, unexpected aux packet"), }, ) @@ -531,14 +586,21 @@ pub mod drtio { linkno, &Packet::DmaRemoveTraceRequest { id: id, + source: 0, destination: destination, }, timer, ) .await; match reply { - Ok(Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), - Ok(Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), + Ok(Packet::DmaRemoveTraceReply { + destination: 0, + succeeded: true, + }) => Ok(()), + Ok(Packet::DmaRemoveTraceReply { + destination: 0, + succeeded: false, + }) => Err("satellite DMA erase error"), Ok(_) => Err("adding trace failed, unexpected aux packet"), Err(_) => Err("erasing trace failed, aux error"), } @@ -558,6 +620,7 @@ pub mod drtio { linkno, &Packet::DmaPlaybackRequest { id: id, + source: 0, destination: destination, timestamp: timestamp, }, @@ -565,8 +628,14 @@ pub mod drtio { ) .await; match reply { - Ok(Packet::DmaPlaybackReply { succeeded: true }) => Ok(()), - Ok(Packet::DmaPlaybackReply { succeeded: false }) => Err("error on DMA playback request"), + Ok(Packet::DmaPlaybackReply { + destination: 0, + succeeded: true, + }) => Ok(()), + Ok(Packet::DmaPlaybackReply { + destination: 0, + succeeded: false, + }) => Err("error on DMA playback request"), Ok(_) => Err("received unexpected aux packet during DMA playback"), Err(_) => Err("aux error on DMA playback"), } @@ -689,6 +758,7 @@ pub mod drtio { linkno, &Packet::SubkernelLoadRunRequest { id: id, + source: 0, destination: destination, run: run, }, @@ -696,8 +766,14 @@ pub mod drtio { ) .await?; match reply { - Packet::SubkernelLoadRunReply { succeeded: true } => return Ok(()), - Packet::SubkernelLoadRunReply { succeeded: false } => return Err("error on subkernel run request"), + Packet::SubkernelLoadRunReply { + destination: 0, + succeeded: true, + } => return Ok(()), + Packet::SubkernelLoadRunReply { + destination: 0, + succeeded: false, + } => return Err("error on subkernel run request"), _ => return Err("received unexpected aux packet during subkernel run"), } } @@ -747,6 +823,7 @@ pub mod drtio { timer, message, |slice, status, len| Packet::SubkernelMessage { + source: 0, destination: destination, id: id, status: status, diff --git a/src/runtime/src/subkernel.rs b/src/runtime/src/subkernel.rs index d129865..02d9f3b 100644 --- a/src/runtime/src/subkernel.rs +++ b/src/runtime/src/subkernel.rs @@ -13,7 +13,7 @@ use crate::rtio_mgt::drtio; pub enum FinishStatus { Ok, CommLost, - Exception, + Exception(u8), // exception source } #[derive(Debug, PartialEq, Clone, Copy)] @@ -121,14 +121,14 @@ pub async fn clear_subkernels() { CURRENT_MESSAGES.async_lock().await.clear(); } -pub async fn subkernel_finished(id: u32, with_exception: bool) { +pub async fn subkernel_finished(id: u32, with_exception: bool, exception_src: u8) { // called upon receiving DRTIO SubkernelRunDone // may be None if session ends and is cleared if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { if subkernel.state == SubkernelState::Running { subkernel.state = SubkernelState::Finished { status: match with_exception { - true => FinishStatus::Exception, + true => FinishStatus::Exception(exception_src), false => FinishStatus::Ok, }, } @@ -196,11 +196,8 @@ pub async fn await_finish( Ok(SubkernelFinished { id: id, status: status, - exception: if status == FinishStatus::Exception { - Some( - drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, subkernel.destination) - .await?, - ) + exception: if let FinishStatus::Exception(dest) = status { + Some(drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, dest).await?) } else { None }, @@ -292,7 +289,7 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result< status: FinishStatus::CommLost, } => return Err(Error::CommLost), SubkernelState::Finished { - status: FinishStatus::Exception, + status: FinishStatus::Exception(_), } => return Err(Error::SubkernelException), _ => (), } diff --git a/src/satman/src/dma.rs b/src/satman/src/dma.rs index dc901db..9a36634 100644 --- a/src/satman/src/dma.rs +++ b/src/satman/src/dma.rs @@ -1,7 +1,13 @@ -use alloc::{collections::btree_map::BTreeMap, vec::Vec}; +use alloc::{collections::btree_map::BTreeMap, string::String, vec::Vec}; +use core::mem; -use libboard_artiq::{drtioaux_proto::PayloadStatus, pl::csr}; +use ksupport::kernel::DmaRecorder; +use libboard_artiq::{drtio_routing::RoutingTable, + drtioaux_proto::{Packet, PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}, + pl::csr}; use libcortex_a9::cache::dcci_slice; +use routing::{Router, Sliceable}; +use subkernel::Manager as KernelManager; const ALIGNMENT: usize = 64; @@ -12,16 +18,20 @@ enum ManagerState { } pub struct RtioStatus { + pub source: u8, pub id: u32, pub error: u8, pub channel: u32, pub timestamp: u64, } +#[derive(Debug)] pub enum Error { IdNotFound, PlaybackInProgress, EntryNotComplete, + MasterDmaFound, + UploadFail, } #[derive(Debug)] @@ -29,13 +39,228 @@ struct Entry { trace: Vec, padding_len: usize, complete: bool, + duration: i64, // relevant for local DMA +} + +impl Entry { + pub fn from_vec(data: Vec, duration: i64) -> Entry { + let mut entry = Entry { + trace: data, + padding_len: 0, + complete: true, + duration: duration, + }; + entry.realign(); + entry + } + + pub fn id(&self) -> u32 { + self.trace[self.padding_len..].as_ptr() as u32 + } + + pub fn realign(&mut self) { + self.trace.push(0); + let data_len = self.trace.len(); + + self.trace.reserve(ALIGNMENT - 1); + let padding = ALIGNMENT - self.trace.as_ptr() as usize % ALIGNMENT; + let padding = if padding == ALIGNMENT { 0 } else { padding }; + for _ in 0..padding { + // Vec guarantees that this will not reallocate + self.trace.push(0) + } + for i in 1..data_len + 1 { + self.trace[data_len + padding - i] = self.trace[data_len - i] + } + self.complete = true; + self.padding_len = padding; + + dcci_slice(&self.trace); + } +} + +#[derive(Debug)] +enum RemoteTraceState { + Unsent, + Sending(usize), + Ready, + Running(usize), +} + +#[derive(Debug)] +struct RemoteTraces { + remote_traces: BTreeMap, + state: RemoteTraceState, +} + +impl RemoteTraces { + pub fn new(traces: BTreeMap) -> RemoteTraces { + RemoteTraces { + remote_traces: traces, + state: RemoteTraceState::Unsent, + } + } + + // on subkernel request + pub fn upload_traces( + &mut self, + id: u32, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) -> usize { + let len = self.remote_traces.len(); + if len > 0 { + self.state = RemoteTraceState::Sending(self.remote_traces.len()); + for (dest, trace) in self.remote_traces.iter_mut() { + // queue up the first packet for all destinations, rest will be sent after first ACK + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let meta = trace.get_slice_master(&mut data_slice); + router.route( + Packet::DmaAddTraceRequest { + source: self_destination, + destination: *dest, + id: id, + status: meta.status, + length: meta.len, + trace: data_slice, + }, + routing_table, + rank, + self_destination, + ); + } + } + len + } + + // on incoming Packet::DmaAddTraceReply + pub fn ack_upload( + &mut self, + kernel_manager: &mut KernelManager, + source: u8, + id: u32, + succeeded: bool, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + if let RemoteTraceState::Sending(count) = self.state { + if let Some(trace) = self.remote_traces.get_mut(&source) { + if trace.at_end() { + if count - 1 == 0 { + self.state = RemoteTraceState::Ready; + if let Some((id, timestamp)) = kernel_manager.ddma_remote_uploaded(succeeded) { + self.playback(id, timestamp, router, rank, self_destination, routing_table); + } + } else { + self.state = RemoteTraceState::Sending(count - 1); + } + } else { + // send next slice + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let meta = trace.get_slice_master(&mut data_slice); + router.route( + Packet::DmaAddTraceRequest { + source: self_destination, + destination: meta.destination, + id: id, + status: meta.status, + length: meta.len, + trace: data_slice, + }, + routing_table, + rank, + self_destination, + ); + } + } + } + } + + // on subkernel request + pub fn playback( + &mut self, + id: u32, + timestamp: u64, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + // route all the playback requests + // remote traces (local trace runs on core1 unlike mainline firmware) + self.state = RemoteTraceState::Running(self.remote_traces.len()); + for (dest, _) in self.remote_traces.iter() { + router.route( + Packet::DmaPlaybackRequest { + source: self_destination, + destination: *dest, + id: id, + timestamp: timestamp, + }, + routing_table, + rank, + self_destination, + ); + // response will be ignored (succeeded = false handled by the main thread) + } + } + + // on incoming Packet::DmaPlaybackDone + pub fn remote_finished(&mut self, kernel_manager: &mut KernelManager, error: u8, channel: u32, timestamp: u64) { + if let RemoteTraceState::Running(count) = self.state { + if error != 0 || count - 1 == 0 { + // notify the kernel about a DDMA error or finish + kernel_manager.ddma_finished(error, channel, timestamp); + self.state = RemoteTraceState::Ready; + // further messages will be ignored (if there was an error) + } else { + // no error and not the last one awaited + self.state = RemoteTraceState::Running(count - 1); + } + } + } + + pub fn erase( + &mut self, + id: u32, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + for (dest, _) in self.remote_traces.iter() { + router.route( + Packet::DmaRemoveTraceRequest { + source: self_destination, + destination: *dest, + id: id, + }, + routing_table, + rank, + self_destination, + ); + // response will be ignored as this object will stop existing too + } + } + + pub fn has_remote_traces(&self) -> bool { + self.remote_traces.len() > 0 + } } #[derive(Debug)] pub struct Manager { - entries: BTreeMap, + entries: BTreeMap<(u8, u32), Entry>, state: ManagerState, - currentid: u32, + current_id: u32, + current_source: u8, + + remote_entries: BTreeMap, + name_map: BTreeMap, } impl Manager { @@ -45,79 +270,238 @@ impl Manager { unsafe { while csr::rtio_dma::enable_read() != 0 {} } Manager { entries: BTreeMap::new(), - currentid: 0, + current_id: 0, + current_source: 0, state: ManagerState::Idle, + remote_entries: BTreeMap::new(), + name_map: BTreeMap::new(), } } - pub fn add(&mut self, id: u32, status: PayloadStatus, trace: &[u8], trace_len: usize) -> Result<(), Error> { - let entry = match self.entries.get_mut(&id) { + pub fn add( + &mut self, + source: u8, + id: u32, + status: PayloadStatus, + trace: &[u8], + trace_len: usize, + ) -> Result<(), Error> { + let entry = match self.entries.get_mut(&(source, id)) { Some(entry) => { if entry.complete || status.is_first() { // replace entry - self.entries.remove(&id); + self.entries.remove(&(source, id)); self.entries.insert( - id, + (source, id), Entry { trace: Vec::new(), padding_len: 0, complete: false, + duration: 0, }, ); - self.entries.get_mut(&id).unwrap() + self.entries.get_mut(&(source, id)).unwrap() } else { entry } } None => { self.entries.insert( - id, + (source, id), Entry { trace: Vec::new(), padding_len: 0, complete: false, + duration: 0, }, ); - self.entries.get_mut(&id).unwrap() + self.entries.get_mut(&(source, id)).unwrap() } }; entry.trace.extend(&trace[0..trace_len]); if status.is_last() { - entry.trace.push(0); - let data_len = entry.trace.len(); - - // Realign. - entry.trace.reserve(ALIGNMENT - 1); - let padding = ALIGNMENT - entry.trace.as_ptr() as usize % ALIGNMENT; - let padding = if padding == ALIGNMENT { 0 } else { padding }; - for _ in 0..padding { - // Vec guarantees that this will not reallocate - entry.trace.push(0) - } - for i in 1..data_len + 1 { - entry.trace[data_len + padding - i] = entry.trace[data_len - i] - } - entry.complete = true; - entry.padding_len = padding; - dcci_slice(&entry.trace); + entry.realign(); } Ok(()) } - pub fn erase(&mut self, id: u32) -> Result<(), Error> { - match self.entries.remove(&id) { + // api for DRTIO + pub fn erase(&mut self, source: u8, id: u32) -> Result<(), Error> { + match self.entries.remove(&(source, id)) { Some(_) => Ok(()), None => Err(Error::IdNotFound), } } - pub fn playback(&mut self, id: u32, timestamp: u64) -> Result<(), Error> { + // API for subkernel + pub fn erase_name( + &mut self, + name: &str, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + if let Some(id) = self.name_map.get(name) { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.erase(*id, router, rank, self_destination, routing_table); + self.remote_entries.remove(&id); + } + self.entries.remove(&(self_destination, *id)); + self.name_map.remove(name); + } + } + + pub fn remote_finished( + &mut self, + kernel_manager: &mut KernelManager, + id: u32, + error: u8, + channel: u32, + timestamp: u64, + ) { + if let Some(entry) = self.remote_entries.get_mut(&id) { + entry.remote_finished(kernel_manager, error, channel, timestamp); + } + } + + pub fn ack_upload( + &mut self, + kernel_manager: &mut KernelManager, + source: u8, + id: u32, + succeeded: bool, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + if let Some(entry) = self.remote_entries.get_mut(&id) { + entry.ack_upload( + kernel_manager, + source, + id, + succeeded, + router, + rank, + self_destination, + routing_table, + ); + } + } + + // API for subkernel + pub fn upload_traces( + &mut self, + id: u32, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) -> Result { + let remote_traces = self.remote_entries.get_mut(&id); + let mut len = 0; + if let Some(traces) = remote_traces { + len = traces.upload_traces(id, router, rank, self_destination, routing_table); + } + Ok(len) + } + + // API for subkernel + pub fn playback_remote( + &mut self, + id: u32, + timestamp: u64, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) -> Result<(), Error> { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.playback(id, timestamp, router, rank, self_destination, routing_table); + Ok(()) + } else { + Err(Error::IdNotFound) + } + } + + // API for subkernel + pub fn cleanup(&mut self, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) { + // after subkernel ends, remove all self-generated traces + for (_, id) in self.name_map.iter_mut() { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.erase(*id, router, rank, self_destination, routing_table); + self.remote_entries.remove(&id); + } + self.entries.remove(&(self_destination, *id)); + } + self.name_map.clear(); + } + + // API for subkernel + pub fn retrieve(&self, self_destination: u8, name: &String) -> Option<(i32, i64, bool)> { + let id = self.name_map.get(name)?; + let duration = self.entries.get(&(self_destination, *id))?.duration; + let uses_ddma = self.has_remote_traces(*id); + Some((*id as i32, duration, uses_ddma)) + } + + pub fn has_remote_traces(&self, id: u32) -> bool { + match self.remote_entries.get(&id) { + Some(traces) => traces.has_remote_traces(), + _ => false, + } + } + + pub fn put_record(&mut self, mut recorder: DmaRecorder, self_destination: u8) -> Result { + let mut remote_traces: BTreeMap = BTreeMap::new(); + + let mut local_trace: Vec = Vec::new(); + // analyze each entry and put in proper buckets, as the kernel core + // sends whole chunks, to limit comms/kernel CPU communication, + // and as only comms core has access to varios DMA buffers. + let mut ptr = 0; + recorder.buffer.push(0); + while recorder.buffer[ptr] != 0 { + // ptr + 3 = tgt >> 24 (destination) + let len = recorder.buffer[ptr] as usize; + let destination = recorder.buffer[ptr + 3]; + if destination == 0 { + return Err(Error::MasterDmaFound); + } else if destination == self_destination { + local_trace.extend(&recorder.buffer[ptr..ptr + len]); + } else { + if let Some(remote_trace) = remote_traces.get_mut(&destination) { + remote_trace.extend(&recorder.buffer[ptr..ptr + len]); + } else { + remote_traces.insert( + destination, + Sliceable::new(destination, recorder.buffer[ptr..ptr + len].to_vec()), + ); + } + } + // and jump to the next event + ptr += len; + } + let local_entry = Entry::from_vec(local_trace, recorder.duration); + + let id = local_entry.id(); + self.entries.insert((self_destination, id), local_entry); + self.remote_entries.insert(id, RemoteTraces::new(remote_traces)); + let mut name = String::new(); + mem::swap(&mut recorder.name, &mut name); + self.name_map.insert(name, id); + + Ok(id) + } + + pub fn playback(&mut self, source: u8, id: u32, timestamp: u64) -> Result<(), Error> { if self.state != ManagerState::Idle { return Err(Error::PlaybackInProgress); } - let entry = match self.entries.get(&id) { + let entry = match self.entries.get(&(source, id)) { Some(entry) => entry, None => { return Err(Error::IdNotFound); @@ -130,7 +514,8 @@ impl Manager { assert!(ptr as u32 % 64 == 0); self.state = ManagerState::Playback; - self.currentid = id; + self.current_id = id; + self.current_source = source; unsafe { csr::rtio_dma::base_address_write(ptr as u32); @@ -162,7 +547,8 @@ impl Manager { csr::rtio_dma::error_write(1); } return Some(RtioStatus { - id: self.currentid, + source: self.current_source, + id: self.current_id, error: error, channel: channel, timestamp: timestamp, diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index 80d53cd..fab1218 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -39,11 +39,13 @@ use libboard_zynq::{i2c::I2c, print, println, time::Milliseconds, timer::GlobalT use libcortex_a9::{l2c::enable_l2_cache, regs::MPIDR}; use libregister::RegisterR; use libsupport_zynq::ram; +use routing::Router; use subkernel::Manager as KernelManager; mod analyzer; mod dma; mod repeater; +mod routing; mod subkernel; fn drtiosat_reset(reset: bool) { @@ -72,6 +74,12 @@ fn drtiosat_tsc_loaded() -> bool { } } +fn drtiosat_async_ready() { + unsafe { + csr::drtiosat::async_messages_ready_write(1); + } +} + #[cfg(has_drtio_routing)] macro_rules! forward { ($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{ @@ -79,7 +87,11 @@ macro_rules! forward { if hop != 0 { let repno = (hop - 1) as usize; if repno < $repeaters.len() { - return $repeaters[repno].aux_forward($packet, $timer); + if $packet.expects_response() { + return $repeaters[repno].aux_forward($packet, $timer); + } else { + return $repeaters[repno].aux_send($packet); + } } else { return Err(drtioaux::Error::RoutingError); } @@ -95,13 +107,15 @@ macro_rules! forward { fn process_aux_packet( _repeaters: &mut [repeater::Repeater], _routing_table: &mut drtio_routing::RoutingTable, - _rank: &mut u8, + rank: &mut u8, + self_destination: &mut u8, packet: drtioaux::Packet, timer: &mut GlobalTimer, i2c: &mut I2c, dma_manager: &mut DmaManager, analyzer: &mut Analyzer, kernel_manager: &mut KernelManager, + router: &mut Router, ) -> Result<(), drtioaux::Error> { // In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels, // and u16 otherwise; hence the `as _` conversion. @@ -122,82 +136,39 @@ fn process_aux_packet( drtioaux::Packet::DestinationStatusRequest { destination } => { #[cfg(has_drtio_routing)] - let hop = _routing_table.0[destination as usize][*_rank as usize]; + let hop = _routing_table.0[destination as usize][*rank as usize]; #[cfg(not(has_drtio_routing))] let hop = 0; if hop == 0 { - if let Some(status) = dma_manager.check_state() { - info!( - "playback done, error: {}, channel: {}, timestamp: {}", - status.error, status.channel, status.timestamp - ); - drtioaux::send( - 0, - &drtioaux::Packet::DmaPlaybackStatus { - destination: destination, - id: status.id, - error: status.error, - channel: status.channel, - timestamp: status.timestamp, - }, - )?; - } else if let Some(subkernel_finished) = kernel_manager.get_last_finished() { - info!( - "subkernel {} finished, with exception: {}", - subkernel_finished.id, subkernel_finished.with_exception - ); - drtioaux::send( - 0, - &drtioaux::Packet::SubkernelFinished { - id: subkernel_finished.id, - with_exception: subkernel_finished.with_exception, - }, - )?; - } else if kernel_manager.message_is_ready() { - let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; - match kernel_manager.message_get_slice(&mut data_slice) { - Some(meta) => drtioaux::send( - 0, - &drtioaux::Packet::SubkernelMessage { - destination: destination, - id: kernel_manager.get_current_id().unwrap(), - status: meta.status, - length: meta.len as u16, - data: data_slice, - }, - )?, - None => warn!("subkernel message is ready but no message is present"), - } - } else { - let errors; + *self_destination = destination; + let errors; + unsafe { + errors = csr::drtiosat::rtio_error_read(); + } + if errors & 1 != 0 { + let channel; unsafe { - errors = csr::drtiosat::rtio_error_read(); + channel = csr::drtiosat::sequence_error_channel_read(); + csr::drtiosat::rtio_error_write(1); } - if errors & 1 != 0 { - let channel; - unsafe { - channel = csr::drtiosat::sequence_error_channel_read(); - csr::drtiosat::rtio_error_write(1); - } - drtioaux::send(0, &drtioaux::Packet::DestinationSequenceErrorReply { channel })?; - } else if errors & 2 != 0 { - let channel; - unsafe { - channel = csr::drtiosat::collision_channel_read(); - csr::drtiosat::rtio_error_write(2); - } - drtioaux::send(0, &drtioaux::Packet::DestinationCollisionReply { channel })?; - } else if errors & 4 != 0 { - let channel; - unsafe { - channel = csr::drtiosat::busy_channel_read(); - csr::drtiosat::rtio_error_write(4); - } - drtioaux::send(0, &drtioaux::Packet::DestinationBusyReply { channel })?; - } else { - drtioaux::send(0, &drtioaux::Packet::DestinationOkReply)?; + drtioaux::send(0, &drtioaux::Packet::DestinationSequenceErrorReply { channel })?; + } else if errors & 2 != 0 { + let channel; + unsafe { + channel = csr::drtiosat::collision_channel_read(); + csr::drtiosat::rtio_error_write(2); } + drtioaux::send(0, &drtioaux::Packet::DestinationCollisionReply { channel })?; + } else if errors & 4 != 0 { + let channel; + unsafe { + channel = csr::drtiosat::busy_channel_read(); + csr::drtiosat::rtio_error_write(4); + } + drtioaux::send(0, &drtioaux::Packet::DestinationBusyReply { channel })?; + } else { + drtioaux::send(0, &drtioaux::Packet::DestinationOkReply)?; } } @@ -242,11 +213,11 @@ fn process_aux_packet( drtioaux::send(0, &drtioaux::Packet::RoutingAck) } #[cfg(has_drtio_routing)] - drtioaux::Packet::RoutingSetRank { rank } => { - *_rank = rank; - drtio_routing::interconnect_enable_all(_routing_table, rank); + drtioaux::Packet::RoutingSetRank { rank: new_rank } => { + *rank = new_rank; + drtio_routing::interconnect_enable_all(_routing_table, new_rank); - let rep_rank = rank + 1; + let rep_rank = new_rank + 1; for rep in _repeaters.iter() { if let Err(e) = rep.set_rank(rep_rank, timer) { error!("failed to set rank ({:?})", e); @@ -267,12 +238,20 @@ fn process_aux_packet( #[cfg(not(has_drtio_routing))] drtioaux::Packet::RoutingSetRank { rank: _ } => drtioaux::send(0, &drtioaux::Packet::RoutingAck), + drtioaux::Packet::RoutingRetrievePackets => { + let packet = router + .get_upstream_packet() + .or(Some(drtioaux::Packet::RoutingNoPackets)) + .unwrap(); + drtioaux::send(0, &packet) + } + drtioaux::Packet::MonitorRequest { destination: _destination, channel, probe, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let value; #[cfg(has_rtio_moninj)] unsafe { @@ -294,7 +273,7 @@ fn process_aux_packet( overrd, value, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); #[cfg(has_rtio_moninj)] unsafe { csr::rtio_moninj::inj_chan_sel_write(channel as _); @@ -308,7 +287,7 @@ fn process_aux_packet( channel, overrd, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let value; #[cfg(has_rtio_moninj)] unsafe { @@ -327,7 +306,7 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let succeeded = i2c.start().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -335,7 +314,7 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let succeeded = i2c.restart().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -343,7 +322,7 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let succeeded = i2c.stop().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -352,7 +331,7 @@ fn process_aux_packet( busno: _busno, data, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); match i2c.write(data) { Ok(ack) => drtioaux::send( 0, @@ -375,7 +354,7 @@ fn process_aux_packet( busno: _busno, ack, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); match i2c.read(ack) { Ok(data) => drtioaux::send( 0, @@ -399,7 +378,7 @@ fn process_aux_packet( address, mask, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let ch = match mask { //decode from mainline, PCA9548-centric API 0x00 => None, @@ -425,7 +404,7 @@ fn process_aux_packet( div: _div, cs: _cs, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); // todo: reimplement when/if SPI is available //let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok(); drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) @@ -435,7 +414,7 @@ fn process_aux_packet( busno: _busno, data: _data, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); // todo: reimplement when/if SPI is available //let succeeded = spi::write(busno, data).is_ok(); drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) @@ -444,7 +423,7 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); // todo: reimplement when/if SPI is available // match spi::read(busno) { // Ok(data) => drtioaux::send(0, @@ -464,7 +443,7 @@ fn process_aux_packet( drtioaux::Packet::AnalyzerHeaderRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let header = analyzer.get_header(); drtioaux::send( 0, @@ -478,7 +457,7 @@ fn process_aux_packet( drtioaux::Packet::AnalyzerDataRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = analyzer.get_data(&mut data_slice); drtioaux::send( @@ -492,55 +471,135 @@ fn process_aux_packet( } drtioaux::Packet::DmaAddTraceRequest { - destination: _destination, + source, + destination, id, status, length, trace, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); - let succeeded = dma_manager.add(id, status, &trace, length as usize).is_ok(); - drtioaux::send(0, &drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded }) + forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); + *self_destination = destination; + let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok(); + router.send( + drtioaux::Packet::DmaAddTraceReply { + source: *self_destination, + destination: source, + id: id, + succeeded: succeeded, + }, + _routing_table, + *rank, + *self_destination, + ) + } + drtioaux::Packet::DmaAddTraceReply { + source, + destination: _destination, + id, + succeeded, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + dma_manager.ack_upload( + kernel_manager, + source, + id, + succeeded, + router, + *rank, + *self_destination, + _routing_table, + ); + Ok(()) } drtioaux::Packet::DmaRemoveTraceRequest { + source, destination: _destination, id, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); - let succeeded = dma_manager.erase(id).is_ok(); - drtioaux::send(0, &drtioaux::Packet::DmaRemoveTraceReply { succeeded: succeeded }) + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + let succeeded = dma_manager.erase(source, id).is_ok(); + router.send( + drtioaux::Packet::DmaRemoveTraceReply { + destination: source, + succeeded: succeeded, + }, + _routing_table, + *rank, + *self_destination, + ) + } + drtioaux::Packet::DmaRemoveTraceReply { + destination: _destination, + succeeded: _, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + Ok(()) } drtioaux::Packet::DmaPlaybackRequest { + source, destination: _destination, id, timestamp, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let succeeded = if !kernel_manager.running() { - dma_manager.playback(id, timestamp).is_ok() + dma_manager.playback(source, id, timestamp).is_ok() } else { false }; - drtioaux::send(0, &drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded }) + router.send( + drtioaux::Packet::DmaPlaybackReply { + destination: source, + succeeded: succeeded, + }, + _routing_table, + *rank, + *self_destination, + ) + } + drtioaux::Packet::DmaPlaybackReply { + destination: _destination, + succeeded, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + if !succeeded { + kernel_manager.ddma_nack(); + } + Ok(()) + } + drtioaux::Packet::DmaPlaybackStatus { + source: _, + destination: _destination, + id, + error, + channel, + timestamp, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp); + Ok(()) } drtioaux::Packet::SubkernelAddDataRequest { - destination: _destination, + destination, id, status, length, data, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); + *self_destination = destination; let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) } drtioaux::Packet::SubkernelLoadRunRequest { + source, destination: _destination, id, run, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let mut succeeded = kernel_manager.load(id).is_ok(); // allow preloading a kernel with delayed run if run { @@ -548,15 +607,42 @@ fn process_aux_packet( // cannot run kernel while DDMA is running succeeded = false; } else { - succeeded |= kernel_manager.run(id).is_ok(); + succeeded |= kernel_manager.run(source, id).is_ok(); } } - drtioaux::send(0, &drtioaux::Packet::SubkernelLoadRunReply { succeeded: succeeded }) + router.send( + drtioaux::Packet::SubkernelLoadRunReply { + destination: source, + succeeded: succeeded, + }, + _routing_table, + *rank, + *self_destination, + ) + } + drtioaux::Packet::SubkernelLoadRunReply { + destination: _destination, + succeeded, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + // received if local subkernel started another, remote subkernel + kernel_manager.subkernel_load_run_reply(succeeded); + Ok(()) + } + drtioaux::Packet::SubkernelFinished { + destination: _destination, + id, + with_exception, + exception_src, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + kernel_manager.remote_subkernel_finished(id, with_exception, exception_src); + Ok(()) } drtioaux::Packet::SubkernelExceptionRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = kernel_manager.exception_get_slice(&mut data_slice); drtioaux::send( @@ -569,38 +655,43 @@ fn process_aux_packet( ) } drtioaux::Packet::SubkernelMessage { - destination, + source, + destination: _destination, id: _id, status, length, data, } => { - forward!(_routing_table, destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); kernel_manager.message_handle_incoming(status, length as usize, &data); - drtioaux::send( - 0, - &drtioaux::Packet::SubkernelMessageAck { - destination: destination, - }, + router.send( + drtioaux::Packet::SubkernelMessageAck { destination: source }, + _routing_table, + *rank, + *self_destination, ) } drtioaux::Packet::SubkernelMessageAck { destination: _destination, } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); if kernel_manager.message_ack_slice() { let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) { - drtioaux::send( - 0, - &drtioaux::Packet::SubkernelMessage { - destination: *_rank, + // route and not send immediately as ACKs are not a beginning of a transaction + router.route( + drtioaux::Packet::SubkernelMessage { + source: *self_destination, + destination: meta.destination, id: kernel_manager.get_current_id().unwrap(), status: meta.status, length: meta.len as u16, data: data_slice, }, - )?; + _routing_table, + *rank, + *self_destination, + ); } else { error!("Error receiving message slice"); } @@ -608,8 +699,8 @@ fn process_aux_packet( Ok(()) } - _ => { - warn!("received unexpected aux packet"); + p => { + warn!("received unexpected aux packet: {:?}", p); Ok(()) } } @@ -619,32 +710,35 @@ fn process_aux_packets( repeaters: &mut [repeater::Repeater], routing_table: &mut drtio_routing::RoutingTable, rank: &mut u8, + self_destination: &mut u8, timer: &mut GlobalTimer, i2c: &mut I2c, dma_manager: &mut DmaManager, analyzer: &mut Analyzer, kernel_manager: &mut KernelManager, + router: &mut Router, ) { let result = drtioaux::recv(0).and_then(|packet| { - if let Some(packet) = packet { + if let Some(packet) = packet.or_else(|| router.get_local_packet()) { process_aux_packet( repeaters, routing_table, rank, + self_destination, packet, timer, i2c, dma_manager, analyzer, kernel_manager, + router, ) } else { Ok(()) } }); - match result { - Ok(()) => (), - Err(e) => warn!("aux packet error ({:?})", e), + if let Err(e) = result { + warn!("aux packet error ({:?})", e); } } @@ -800,17 +894,20 @@ pub extern "C" fn main_core0() -> i32 { } let mut routing_table = drtio_routing::RoutingTable::default_empty(); let mut rank = 1; + let mut destination = 1; let mut hardware_tick_ts = 0; let mut control = ksupport::kernel::Control::start(); loop { + let mut router = Router::new(); + while !drtiosat_link_rx_up() { drtiosat_process_errors(); #[allow(unused_mut)] for mut rep in repeaters.iter_mut() { - rep.service(&routing_table, rank, &mut timer); + rep.service(&routing_table, rank, destination, &mut router, &mut timer); } #[cfg(feature = "target_kasli_soc")] { @@ -849,15 +946,17 @@ pub extern "C" fn main_core0() -> i32 { &mut repeaters, &mut routing_table, &mut rank, + &mut destination, &mut timer, &mut i2c, &mut dma_manager, &mut analyzer, &mut kernel_manager, + &mut router, ); #[allow(unused_mut)] for mut rep in repeaters.iter_mut() { - rep.service(&routing_table, rank, &mut timer); + rep.service(&routing_table, rank, destination, &mut router, &mut timer); } #[cfg(feature = "target_kasli_soc")] { @@ -880,7 +979,45 @@ pub extern "C" fn main_core0() -> i32 { error!("aux packet error: {:?}", e); } } - kernel_manager.process_kern_requests(rank, timer); + if let Some(status) = dma_manager.check_state() { + info!( + "playback done, error: {}, channel: {}, timestamp: {}", + status.error, status.channel, status.timestamp + ); + router.route( + drtioaux::Packet::DmaPlaybackStatus { + source: destination, + destination: status.source, + id: status.id, + error: status.error, + channel: status.channel, + timestamp: status.timestamp, + }, + &routing_table, + rank, + destination, + ); + } + + kernel_manager.process_kern_requests( + &mut router, + &routing_table, + rank, + destination, + &mut dma_manager, + &timer, + ); + + #[cfg(has_drtio_routing)] + if let Some((repno, packet)) = router.get_downstream_packet() { + if let Err(e) = repeaters[repno].aux_send(&packet) { + warn!("[REP#{}] Error when sending packet to satellite ({:?})", repno, e) + } + } + + if router.any_upstream_waiting() { + drtiosat_async_ready(); + } } drtiosat_reset_phy(true); diff --git a/src/satman/src/repeater.rs b/src/satman/src/repeater.rs index 2a20b30..e7f6b78 100644 --- a/src/satman/src/repeater.rs +++ b/src/satman/src/repeater.rs @@ -6,6 +6,7 @@ use libboard_artiq::{drtio_routing, drtioaux}; #[cfg(has_drtio_routing)] use libboard_zynq::time::Milliseconds; use libboard_zynq::timer::GlobalTimer; +use routing::Router; #[cfg(has_drtio_routing)] fn rep_link_rx_up(repno: u8) -> bool { @@ -53,7 +54,14 @@ impl Repeater { self.state == RepeaterState::Up } - pub fn service(&mut self, routing_table: &drtio_routing::RoutingTable, rank: u8, timer: &mut GlobalTimer) { + pub fn service( + &mut self, + routing_table: &drtio_routing::RoutingTable, + rank: u8, + destination: u8, + router: &mut Router, + timer: &mut GlobalTimer, + ) { self.process_local_errors(); match self.state { @@ -116,6 +124,11 @@ impl Repeater { info!("[REP#{}] link is down", self.repno); self.state = RepeaterState::Down; } + if self.async_messages_ready() { + if let Err(e) = self.handle_async(routing_table, rank, destination, router, timer) { + warn!("[REP#{}] Error handling async messages ({:?})", self.repno, e); + } + } } RepeaterState::Failed => { if !rep_link_rx_up(self.repno) { @@ -173,6 +186,34 @@ impl Repeater { } } + fn async_messages_ready(&self) -> bool { + let async_rdy; + unsafe { + async_rdy = (csr::DRTIOREP[self.repno as usize].async_messages_ready_read)(); + (csr::DRTIOREP[self.repno as usize].async_messages_ready_write)(0); + } + async_rdy == 1 + } + + fn handle_async( + &self, + routing_table: &drtio_routing::RoutingTable, + rank: u8, + self_destination: u8, + router: &mut Router, + timer: &mut GlobalTimer, + ) -> Result<(), drtioaux::Error> { + loop { + drtioaux::send(self.auxno, &drtioaux::Packet::RoutingRetrievePackets).unwrap(); + let reply = self.recv_aux_timeout(200, timer)?; + match reply { + drtioaux::Packet::RoutingNoPackets => break, + packet => router.route(packet, routing_table, rank, self_destination), + } + } + Ok(()) + } + fn recv_aux_timeout(&self, timeout: u32, timer: &mut GlobalTimer) -> Result { let max_time = timer.get_time() + Milliseconds(timeout.into()); loop { @@ -191,15 +232,19 @@ impl Repeater { } pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { - if self.state != RepeaterState::Up { - return Err(drtioaux::Error::LinkDown); - } - drtioaux::send(self.auxno, request).unwrap(); + self.aux_send(request)?; let reply = self.recv_aux_timeout(200, timer)?; drtioaux::send(0, &reply).unwrap(); Ok(()) } + pub fn aux_send(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error> { + if self.state != RepeaterState::Up { + return Err(drtioaux::Error::LinkDown); + } + drtioaux::send(self.auxno, request) + } + pub fn sync_tsc(&self, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { if self.state != RepeaterState::Up { return Ok(()); @@ -302,7 +347,15 @@ impl Repeater { Repeater::default() } - pub fn service(&self, _routing_table: &drtio_routing::RoutingTable, _rank: u8, _timer: &mut GlobalTimer) {} + pub fn service( + &self, + _routing_table: &drtio_routing::RoutingTable, + _rank: u8, + _destination: u8, + _router: &mut Router, + _timer: &mut GlobalTimer, + ) { + } pub fn sync_tsc(&self, _timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { Ok(()) diff --git a/src/satman/src/routing.rs b/src/satman/src/routing.rs new file mode 100644 index 0000000..46f8e7f --- /dev/null +++ b/src/satman/src/routing.rs @@ -0,0 +1,190 @@ +use alloc::{collections::vec_deque::VecDeque, vec::Vec}; +use core::cmp::min; + +#[cfg(has_drtio_routing)] +use libboard_artiq::pl::csr; +use libboard_artiq::{drtio_routing, drtioaux, + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}}; + +pub struct SliceMeta { + pub destination: u8, + pub len: u16, + pub status: PayloadStatus, +} + +/* represents data that has to be sent to Master */ +#[derive(Debug)] +pub struct Sliceable { + it: usize, + data: Vec, + destination: u8, +} + +macro_rules! get_slice_fn { + ($name:tt, $size:expr) => { + pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { + let first = self.it == 0; + let len = min($size, self.data.len() - self.it); + let last = self.it + len == self.data.len(); + let status = PayloadStatus::from_status(first, last); + + data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]); + self.it += len; + + SliceMeta { + destination: self.destination, + len: len as u16, + status: status, + } + } + }; +} + +impl Sliceable { + pub fn new(destination: u8, data: Vec) -> Sliceable { + Sliceable { + it: 0, + data: data, + destination: destination, + } + } + + pub fn at_end(&self) -> bool { + self.it == self.data.len() + } + + pub fn extend(&mut self, data: &[u8]) { + self.data.extend(data); + } + + get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); + get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); +} + +// Packets from downstream (further satellites) are received and routed appropriately. +// they're passed as soon as possible downstream (within the subtree), or sent upstream, +// which is notified about pending packets. +// for rank 1 (connected to master) satellites, these packets are passed as an answer to DestinationStatusRequest; +// for higher ranks, after getting a notification, it will transact with downstream to get the pending packets. + +// forward! macro is not deprecated, as routable packets are only these that can originate +// from both master and satellite, e.g. DDMA and Subkernel. + +pub struct Router { + upstream_queue: VecDeque, + local_queue: VecDeque, + #[cfg(has_drtio_routing)] + downstream_queue: VecDeque<(usize, drtioaux::Packet)>, + upstream_notified: bool, +} + +impl Router { + pub fn new() -> Router { + Router { + upstream_queue: VecDeque::new(), + local_queue: VecDeque::new(), + #[cfg(has_drtio_routing)] + downstream_queue: VecDeque::new(), + upstream_notified: false, + } + } + + // Called by local sources (DDMA, kernel) and by repeaters on receiving async data; + // messages are always buffered for both upstream and downstream + pub fn route( + &mut self, + packet: drtioaux::Packet, + _routing_table: &drtio_routing::RoutingTable, + _rank: u8, + self_destination: u8, + ) { + let destination = packet.routable_destination(); + #[cfg(has_drtio_routing)] + { + if let Some(destination) = destination { + let hop = _routing_table.0[destination as usize][_rank as usize] as usize; + if destination == self_destination { + self.local_queue.push_back(packet); + } else if hop > 0 && hop < csr::DRTIOREP.len() { + let repno = (hop - 1) as usize; + self.downstream_queue.push_back((repno, packet)); + } else { + self.upstream_queue.push_back(packet); + } + } else { + error!("Received an unroutable packet: {:?}", packet); + } + } + #[cfg(not(has_drtio_routing))] + { + if destination == Some(self_destination) { + self.local_queue.push_back(packet); + } else { + self.upstream_queue.push_back(packet); + } + } + } + + // Sends a packet to a required destination, routing if necessary + pub fn send( + &mut self, + packet: drtioaux::Packet, + _routing_table: &drtio_routing::RoutingTable, + _rank: u8, + _destination: u8, + ) -> Result<(), drtioaux::Error> { + #[cfg(has_drtio_routing)] + { + let destination = packet.routable_destination(); + if let Some(destination) = destination { + let hop = _routing_table.0[destination as usize][_rank as usize] as usize; + if destination == 0 { + // response is needed immediately if master required it + drtioaux::send(0, &packet)?; + } else if !(hop > 0 && hop < csr::DRTIOREP.len()) { + // higher rank can wait + self.upstream_queue.push_back(packet); + } else { + let repno = (hop - 1) as usize; + // transaction will occur at closest possible opportunity + self.downstream_queue.push_back((repno, packet)); + } + Ok(()) + } else { + // packet not supported in routing, fallback - sent directly + drtioaux::send(0, &packet) + } + } + #[cfg(not(has_drtio_routing))] + { + drtioaux::send(0, &packet) + } + } + + pub fn any_upstream_waiting(&mut self) -> bool { + let empty = self.upstream_queue.is_empty(); + if !empty && !self.upstream_notified { + self.upstream_notified = true; // so upstream will not get spammed with notifications + true + } else { + false + } + } + + pub fn get_upstream_packet(&mut self) -> Option { + let packet = self.upstream_queue.pop_front(); + if packet.is_none() { + self.upstream_notified = false; + } + packet + } + + #[cfg(has_drtio_routing)] + pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> { + self.downstream_queue.pop_front() + } + + pub fn get_local_packet(&mut self) -> Option { + self.local_queue.pop_front() + } +} diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs index ce32bd4..50c9690 100644 --- a/src/satman/src/subkernel.rs +++ b/src/satman/src/subkernel.rs @@ -2,17 +2,21 @@ use alloc::{collections::{BTreeMap, VecDeque}, format, string::{String, ToString}, vec::Vec}; -use core::{cmp::min, option::NoneError, slice, str}; +use core::{option::NoneError, slice, str}; use core_io::{Error as IoError, Write}; use cslice::AsCSlice; +use dma::{Error as DmaError, Manager as DmaManager}; use io::{Cursor, ProtoWrite}; use ksupport::{eh_artiq, kernel, rpc}; -use libboard_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, +use libboard_artiq::{drtio_routing::RoutingTable, + drtioaux, + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, pl::csr}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::sync_channel::Receiver; use log::warn; +use routing::{Router, SliceMeta, Sliceable}; #[derive(Debug, Clone, PartialEq)] enum KernelState { @@ -21,6 +25,24 @@ enum KernelState { Running, MsgAwait(Milliseconds, Vec), MsgSending, + SubkernelAwaitLoad, + SubkernelAwaitFinish { + max_time: Milliseconds, + id: u32, + }, + DmaUploading, + DmaPendingPlayback { + id: u32, + timestamp: u64, + }, + DmaPendingAwait { + id: u32, + timestamp: u64, + max_time: Milliseconds, + }, + DmaAwait { + max_time: Milliseconds, + }, } #[derive(Debug)] @@ -31,7 +53,9 @@ pub enum Error { NoMessage, AwaitingMessage, SubkernelIoError, + DrtioError, KernelException(Sliceable), + DmaError(DmaError), } impl From for Error { @@ -46,21 +70,26 @@ impl From for Error { } } +impl From for Error { + fn from(value: DmaError) -> Error { + Error::DmaError(value) + } +} + impl From<()> for Error { fn from(_: ()) -> Error { Error::NoMessage } } -macro_rules! unexpected { - ($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*)))); +impl From for Error { + fn from(_value: drtioaux::Error) -> Error { + Error::DrtioError + } } -/* represents data that has to be sent to Master */ -#[derive(Debug)] -pub struct Sliceable { - it: usize, - data: Vec, +macro_rules! unexpected { + ($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*)))); } /* represents interkernel messages */ @@ -72,7 +101,6 @@ struct Message { #[derive(PartialEq)] enum OutMessageState { NoMessage, - MessageReady, MessageBeingSent, MessageSent, MessageAcknowledged, @@ -92,6 +120,8 @@ struct Session { kernel_state: KernelState, last_exception: Option, messages: MessageManager, + source: u8, // which destination requested running the kernel + subkernels_finished: Vec, } impl Session { @@ -101,13 +131,15 @@ impl Session { kernel_state: KernelState::Absent, last_exception: None, messages: MessageManager::new(), + source: 0, + subkernels_finished: Vec::new(), } } fn running(&self) -> bool { match self.kernel_state { KernelState::Absent | KernelState::Loaded => false, - KernelState::Running | KernelState::MsgAwait { .. } | KernelState::MsgSending => true, + _ => true, } } } @@ -129,39 +161,8 @@ pub struct Manager<'a> { pub struct SubkernelFinished { pub id: u32, pub with_exception: bool, -} - -pub struct SliceMeta { - pub len: u16, - pub status: PayloadStatus, -} - -macro_rules! get_slice_fn { - ($name:tt, $size:expr) => { - pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { - let first = self.it == 0; - let len = min($size, self.data.len() - self.it); - let last = self.it + len == self.data.len(); - let status = PayloadStatus::from_status(first, last); - - data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]); - self.it += len; - - SliceMeta { - len: len as u16, - status: status, - } - } - }; -} - -impl Sliceable { - pub fn new(data: Vec) -> Sliceable { - Sliceable { it: 0, data: data } - } - - get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); - get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); + pub exception_source: u8, + pub source: u8, } impl MessageManager { @@ -194,17 +195,6 @@ impl MessageManager { } } - pub fn is_outgoing_ready(&mut self) -> bool { - // called by main loop, to see if there's anything to send, will send it afterwards - match self.out_state { - OutMessageState::MessageReady => { - self.out_state = OutMessageState::MessageBeingSent; - true - } - _ => false, - } - } - pub fn was_message_acknowledged(&mut self) -> bool { match self.out_state { OutMessageState::MessageAcknowledged => { @@ -244,10 +234,34 @@ impl MessageManager { } } - pub fn accept_outgoing(&mut self, message: Vec) -> Result<(), Error> { - // service tag skipped in kernel - self.out_message = Some(Sliceable::new(message)); - self.out_state = OutMessageState::MessageReady; + pub fn accept_outgoing( + &mut self, + id: u32, + self_destination: u8, + destination: u8, + message: Vec, + routing_table: &RoutingTable, + rank: u8, + router: &mut Router, + ) -> Result<(), Error> { + self.out_message = Some(Sliceable::new(destination, message)); + + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + self.out_state = OutMessageState::MessageBeingSent; + let meta = self.get_outgoing_slice(&mut data_slice).unwrap(); + router.route( + drtioaux::Packet::SubkernelMessage { + source: self_destination, + destination: destination, + id: id, + status: meta.status, + length: meta.len as u16, + data: data_slice, + }, + routing_table, + rank, + self_destination, + ); Ok(()) } @@ -313,12 +327,12 @@ impl<'a> Manager<'_> { } } - pub fn run(&mut self, id: u32) -> Result<(), Error> { - info!("starting subkernel #{}", id); + pub fn run(&mut self, source: u8, id: u32) -> Result<(), Error> { if self.session.kernel_state != KernelState::Loaded || self.session.id != id { self.load(id)?; } self.session.kernel_state = KernelState::Running; + self.session.source = source; unsafe { csr::cri_con::selected_write(2); } @@ -354,10 +368,6 @@ impl<'a> Manager<'_> { self.session.messages.ack_slice() } - pub fn message_is_ready(&mut self) -> bool { - self.session.messages.is_outgoing_ready() - } - pub fn load(&mut self, id: u32) -> Result<(), Error> { if self.session.id == id && self.session.kernel_state == KernelState::Loaded { return Ok(()); @@ -386,16 +396,13 @@ impl<'a> Manager<'_> { match self.session.last_exception.as_mut() { Some(exception) => exception.get_slice_sat(data_slice), None => SliceMeta { + destination: 0, len: 0, status: PayloadStatus::FirstAndLast, }, } } - pub fn get_last_finished(&mut self) -> Option { - self.last_finished.take() - } - fn kernel_stop(&mut self) { self.session.kernel_state = KernelState::Absent; unsafe { @@ -425,13 +432,92 @@ impl<'a> Manager<'_> { &[], 0, ) { - Ok(_) => self.session.last_exception = Some(Sliceable::new(writer.into_inner())), + Ok(_) => self.session.last_exception = Some(Sliceable::new(0, writer.into_inner())), Err(_) => error!("Error writing exception data"), } self.kernel_stop(); } - pub fn process_kern_requests(&mut self, rank: u8, timer: GlobalTimer) { + pub fn ddma_finished(&mut self, error: u8, channel: u32, timestamp: u64) { + if let KernelState::DmaAwait { .. } = self.session.kernel_state { + self.control.tx.send(kernel::Message::DmaAwaitRemoteReply { + timeout: false, + error: error, + channel: channel, + timestamp: timestamp, + }); + self.session.kernel_state = KernelState::Running; + } + } + + pub fn ddma_nack(&mut self) { + // for simplicity treat it as a timeout... + if let KernelState::DmaAwait { .. } = self.session.kernel_state { + self.control.tx.send(kernel::Message::DmaAwaitRemoteReply { + timeout: true, + error: 0, + channel: 0, + timestamp: 0, + }); + self.session.kernel_state = KernelState::Running; + } + } + + pub fn ddma_remote_uploaded(&mut self, succeeded: bool) -> Option<(u32, u64)> { + // returns a tuple of id, timestamp in case a playback needs to be started immediately + if !succeeded { + self.kernel_stop(); + self.runtime_exception(Error::DmaError(DmaError::UploadFail)); + } + let res = match self.session.kernel_state { + KernelState::DmaPendingPlayback { id, timestamp } => { + self.session.kernel_state = KernelState::Running; + Some((id, timestamp)) + } + KernelState::DmaPendingAwait { + id, + timestamp, + max_time, + } => { + self.session.kernel_state = KernelState::DmaAwait { max_time: max_time }; + Some((id, timestamp)) + } + KernelState::DmaUploading => { + self.session.kernel_state = KernelState::Running; + None + } + _ => None, + }; + res + } + + pub fn process_kern_requests( + &mut self, + router: &mut Router, + routing_table: &RoutingTable, + rank: u8, + destination: u8, + dma_manager: &mut DmaManager, + timer: &GlobalTimer, + ) { + if let Some(subkernel_finished) = self.last_finished.take() { + info!( + "subkernel {} finished, with exception: {}", + subkernel_finished.id, subkernel_finished.with_exception + ); + router.route( + drtioaux::Packet::SubkernelFinished { + destination: subkernel_finished.source, + id: subkernel_finished.id, + with_exception: subkernel_finished.with_exception, + exception_src: subkernel_finished.exception_source, + }, + &routing_table, + rank, + destination, + ); + } + if !self.running() { return; } @@ -444,6 +530,8 @@ impl<'a> Manager<'_> { self.last_finished = Some(SubkernelFinished { id: self.session.id, with_exception: true, + exception_source: destination, + source: self.session.source, }); } Err(e) => { @@ -452,15 +540,19 @@ impl<'a> Manager<'_> { self.last_finished = Some(SubkernelFinished { id: self.session.id, with_exception: true, + exception_source: destination, + source: self.session.source, }); } } - match self.process_kern_message(rank, timer) { + match self.process_kern_message(router, routing_table, rank, destination, dma_manager, timer) { Ok(true) => { self.last_finished = Some(SubkernelFinished { id: self.session.id, with_exception: false, + exception_source: 0, + source: self.session.source, }); } Ok(false) | Err(Error::NoMessage) => (), @@ -469,6 +561,8 @@ impl<'a> Manager<'_> { self.last_finished = Some(SubkernelFinished { id: self.session.id, with_exception: true, + exception_source: destination, + source: self.session.source, }); } Err(e) => { @@ -477,16 +571,52 @@ impl<'a> Manager<'_> { self.last_finished = Some(SubkernelFinished { id: self.session.id, with_exception: true, + exception_source: destination, + source: self.session.source, }); } } } - fn process_kern_message(&mut self, rank: u8, timer: GlobalTimer) -> Result { + pub fn subkernel_load_run_reply(&mut self, succeeded: bool) { + if self.session.kernel_state == KernelState::SubkernelAwaitLoad { + self.control + .tx + .send(kernel::Message::SubkernelLoadRunReply { succeeded: succeeded }); + self.session.kernel_state = KernelState::Running; + } else { + warn!("received unsolicited SubkernelLoadRunReply"); + } + } + + pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) { + if with_exception { + self.kernel_stop(); + self.last_finished = Some(SubkernelFinished { + source: self.session.source, + id: self.session.id, + with_exception: true, + exception_source: exception_source, + }) + } else { + self.session.subkernels_finished.push(id); + } + } + + fn process_kern_message( + &mut self, + router: &mut Router, + routing_table: &RoutingTable, + rank: u8, + self_destination: u8, + dma_manager: &mut DmaManager, + timer: &GlobalTimer, + ) -> Result { let reply = self.control.rx.try_recv()?; match reply { kernel::Message::KernelFinished(_async_errors) => { self.kernel_stop(); + dma_manager.cleanup(router, rank, self_destination, routing_table); return Ok(true); } kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors) => { @@ -503,7 +633,7 @@ impl<'a> Manager<'_> { Err(_) => error!("Error writing exception data"), } self.kernel_stop(); - return Err(Error::KernelException(Sliceable::new(writer.into_inner()))); + return Err(Error::KernelException(Sliceable::new(0, writer.into_inner()))); } kernel::Message::CachePutRequest(key, value) => { self.cache.insert(key, value); @@ -513,18 +643,104 @@ impl<'a> Manager<'_> { let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone(); self.control.tx.send(kernel::Message::CacheGetReply(value)); } - kernel::Message::SubkernelMsgSend { id: _, data } => { - self.session.messages.accept_outgoing(data)?; + + kernel::Message::DmaPutRequest(recorder) => { + // ddma is always used on satellites + if let Ok(id) = dma_manager.put_record(recorder, self_destination) { + dma_manager.upload_traces(id, router, rank, self_destination, routing_table)?; + self.session.kernel_state = KernelState::DmaUploading; + } else { + unexpected!("DMAError: found an unsupported call to RTIO devices on master") + } + } + kernel::Message::DmaEraseRequest(name) => { + dma_manager.erase_name(&name, router, rank, self_destination, routing_table); + } + kernel::Message::DmaGetRequest(name) => { + let dma_meta = dma_manager.retrieve(self_destination, &name); + self.control.tx.send(kernel::Message::DmaGetReply(dma_meta)); + } + kernel::Message::DmaStartRemoteRequest { id, timestamp } => { + if self.session.kernel_state != KernelState::DmaUploading { + dma_manager.playback_remote( + id as u32, + timestamp as u64, + router, + rank, + self_destination, + routing_table, + )?; + } else { + self.session.kernel_state = KernelState::DmaPendingPlayback { + id: id as u32, + timestamp: timestamp as u64, + }; + } + } + kernel::Message::DmaAwaitRemoteRequest(_id) => { + let max_time = timer.get_time() + Milliseconds(10000); + self.session.kernel_state = match self.session.kernel_state { + // if we are still waiting for the traces to be uploaded, extend the state by timeout + KernelState::DmaPendingPlayback { id, timestamp } => KernelState::DmaPendingAwait { + id: id, + timestamp: timestamp, + max_time: max_time, + }, + _ => KernelState::DmaAwait { max_time: max_time }, + }; + } + + kernel::Message::SubkernelMsgSend { + id: _id, + destination: msg_dest, + data, + } => { + let msg_dest = msg_dest.or(Some(self.session.source)).unwrap(); + self.session.messages.accept_outgoing( + self.session.id, + self_destination, + msg_dest, + data, + routing_table, + rank, + router, + )?; self.session.kernel_state = KernelState::MsgSending; } kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => { let max_time = timer.get_time() + Milliseconds(timeout); self.session.kernel_state = KernelState::MsgAwait(max_time, tags); } + kernel::Message::SubkernelLoadRunRequest { + id, + destination: sk_destination, + run, + } => { + self.session.kernel_state = KernelState::SubkernelAwaitLoad; + router.route( + drtioaux::Packet::SubkernelLoadRunRequest { + source: self_destination, + destination: sk_destination, + id: id, + run: run, + }, + routing_table, + rank, + self_destination, + ); + } + + kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => { + let max_time = timer.get_time() + Milliseconds(timeout); + self.session.kernel_state = KernelState::SubkernelAwaitFinish { + max_time: max_time, + id: id, + }; + } kernel::Message::UpDestinationsRequest(destination) => { - self.control - .tx - .send(kernel::Message::UpDestinationsReply(destination == (rank as i32))); + self.control.tx.send(kernel::Message::UpDestinationsReply( + destination == (self_destination as i32), + )); } _ => { unexpected!("unexpected message from core1 while kernel was running: {:?}", reply); @@ -533,7 +749,7 @@ impl<'a> Manager<'_> { Ok(false) } - fn process_external_messages(&mut self, timer: GlobalTimer) -> Result<(), Error> { + fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> { match &self.session.kernel_state { KernelState::MsgAwait(timeout, tags) => { if timer.get_time() > *timeout { @@ -565,11 +781,45 @@ impl<'a> Manager<'_> { Err(Error::AwaitingMessage) } } + KernelState::SubkernelAwaitFinish { max_time, id } => { + if timer.get_time() > *max_time { + self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { + status: kernel::SubkernelStatus::Timeout, + }); + self.session.kernel_state = KernelState::Running; + } else { + let mut i = 0; + for status in &self.session.subkernels_finished { + if *status == *id { + self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { + status: kernel::SubkernelStatus::NoError, + }); + self.session.kernel_state = KernelState::Running; + self.session.subkernels_finished.swap_remove(i); + break; + } + i += 1; + } + } + Ok(()) + } + KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => { + if timer.get_time() > *max_time { + self.control.tx.send(kernel::Message::DmaAwaitRemoteReply { + timeout: true, + error: 0, + channel: 0, + timestamp: 0, + }); + self.session.kernel_state = KernelState::Running; + } + Ok(()) + } _ => Ok(()), } } - fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec, timer: GlobalTimer) -> Result<(), Error> { + fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec, timer: &GlobalTimer) -> Result<(), Error> { let mut reader = Cursor::new(&message.data); let mut current_tags: &[u8] = &tags; let mut i = message.count; @@ -592,7 +842,7 @@ impl<'a> Manager<'_> { let mut writer = Cursor::new(buf); match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) { Ok(()) => { - exception = Some(Sliceable::new(writer.into_inner())); + exception = Some(Sliceable::new(0, writer.into_inner())); } Err(_) => { unexpected = Some("Error writing exception data".to_string()); @@ -686,7 +936,7 @@ where fn recv_w_timeout( rx: &mut Receiver<'_, kernel::Message>, - timer: GlobalTimer, + timer: &GlobalTimer, timeout: u64, ) -> Result { let max_time = timer.get_time() + Milliseconds(timeout);