forked from M-Labs/artiq
satman: make use of the async flag
This commit is contained in:
parent
1cc7398bc0
commit
95b92a178b
|
@ -77,6 +77,8 @@ pub enum Packet {
|
||||||
|
|
||||||
RoutingSetPath { destination: u8, hops: [u8; 32] },
|
RoutingSetPath { destination: u8, hops: [u8; 32] },
|
||||||
RoutingSetRank { rank: u8 },
|
RoutingSetRank { rank: u8 },
|
||||||
|
RoutingRetrievePackets,
|
||||||
|
RoutingNoPackets,
|
||||||
RoutingAck,
|
RoutingAck,
|
||||||
|
|
||||||
MonitorRequest { destination: u8, channel: u16, probe: u8 },
|
MonitorRequest { destination: u8, channel: u16, probe: u8 },
|
||||||
|
@ -168,6 +170,8 @@ impl Packet {
|
||||||
rank: reader.read_u8()?
|
rank: reader.read_u8()?
|
||||||
},
|
},
|
||||||
0x32 => Packet::RoutingAck,
|
0x32 => Packet::RoutingAck,
|
||||||
|
0x33 => Packet::RoutingRetrievePackets,
|
||||||
|
0x34 => Packet::RoutingNoPackets,
|
||||||
|
|
||||||
0x40 => Packet::MonitorRequest {
|
0x40 => Packet::MonitorRequest {
|
||||||
destination: reader.read_u8()?,
|
destination: reader.read_u8()?,
|
||||||
|
@ -450,6 +454,10 @@ impl Packet {
|
||||||
},
|
},
|
||||||
Packet::RoutingAck =>
|
Packet::RoutingAck =>
|
||||||
writer.write_u8(0x32)?,
|
writer.write_u8(0x32)?,
|
||||||
|
Packet::RoutingRetrievePackets =>
|
||||||
|
writer.write_u8(0x33)?,
|
||||||
|
Packet::RoutingNoPackets =>
|
||||||
|
writer.write_u8(0x34)?,
|
||||||
|
|
||||||
Packet::MonitorRequest { destination, channel, probe } => {
|
Packet::MonitorRequest { destination, channel, probe } => {
|
||||||
writer.write_u8(0x40)?;
|
writer.write_u8(0x40)?;
|
||||||
|
@ -685,4 +693,23 @@ impl Packet {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn routable_destination(&self) -> Option<u8> {
|
||||||
|
// only for packets that could be re-routed, not only forwarded
|
||||||
|
match self {
|
||||||
|
Packet::DmaAddTraceRequest { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaAddTraceReply { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaRemoveTraceRequest { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaRemoveTraceReply { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaPlaybackRequest { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaPlaybackReply { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelLoadRunRequest { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelLoadRunReply { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelMessage { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelMessageAck { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaPlaybackStatus { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelFinished { destination, .. } => Some(*destination),
|
||||||
|
_ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ pub mod drtio {
|
||||||
let up_destinations = up_destinations.clone();
|
let up_destinations = up_destinations.clone();
|
||||||
let ddma_mutex = ddma_mutex.clone();
|
let ddma_mutex = ddma_mutex.clone();
|
||||||
let subkernel_mutex = subkernel_mutex.clone();
|
let subkernel_mutex = subkernel_mutex.clone();
|
||||||
io.spawn(10240, move |io| {
|
io.spawn(8192, move |io| {
|
||||||
let routing_table = routing_table.borrow();
|
let routing_table = routing_table.borrow();
|
||||||
link_thread(io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &subkernel_mutex);
|
link_thread(io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &subkernel_mutex);
|
||||||
});
|
});
|
||||||
|
|
|
@ -650,6 +650,8 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
|
||||||
unsafe { kernel::stop() }
|
unsafe { kernel::stop() }
|
||||||
session.kernel_state = KernelState::Absent;
|
session.kernel_state = KernelState::Absent;
|
||||||
unsafe { session.congress.cache.unborrow() }
|
unsafe { session.congress.cache.unborrow() }
|
||||||
|
#[cfg(has_drtio)]
|
||||||
|
subkernel::clear_subkernels(io, _subkernel_mutex)?;
|
||||||
|
|
||||||
match stream {
|
match stream {
|
||||||
None => {
|
None => {
|
||||||
|
|
|
@ -140,7 +140,7 @@ struct Session {
|
||||||
last_exception: Option<Sliceable>,
|
last_exception: Option<Sliceable>,
|
||||||
source: u8, // which destination requested running the kernel
|
source: u8, // which destination requested running the kernel
|
||||||
messages: MessageManager,
|
messages: MessageManager,
|
||||||
subkernels_finished: VecDeque<(u32, bool, u8)> // tuple of id, with_exception, exception_source
|
subkernels_finished: Vec<u32> // ids of subkernels finished
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -286,13 +286,10 @@ impl MessageManager {
|
||||||
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
||||||
self.out_state = OutMessageState::MessageBeingSent;
|
self.out_state = OutMessageState::MessageBeingSent;
|
||||||
let meta = self.get_outgoing_slice(&mut data_slice).unwrap();
|
let meta = self.get_outgoing_slice(&mut data_slice).unwrap();
|
||||||
let res = router.route(drtioaux::Packet::SubkernelMessage {
|
router.route(drtioaux::Packet::SubkernelMessage {
|
||||||
source: self_destination, destination: destination, id: id,
|
source: self_destination, destination: destination, id: id,
|
||||||
status: meta.status, length: meta.len as u16, data: data_slice
|
status: meta.status, length: meta.len as u16, data: data_slice
|
||||||
}, routing_table, rank, self_destination);
|
}, routing_table, rank, self_destination);
|
||||||
if let Err(e) = res {
|
|
||||||
warn!("error sending SubkernelMessage: {}", e);
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,7 +306,7 @@ impl Session {
|
||||||
last_exception: None,
|
last_exception: None,
|
||||||
source: 0,
|
source: 0,
|
||||||
messages: MessageManager::new(),
|
messages: MessageManager::new(),
|
||||||
subkernels_finished: VecDeque::new()
|
subkernels_finished: Vec::new()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,6 +497,15 @@ impl Manager {
|
||||||
with_exception: $with_exception, exception_source: destination
|
with_exception: $with_exception, exception_source: destination
|
||||||
}) }}
|
}) }}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(subkernel_finished) = self.last_finished.take() {
|
||||||
|
info!("subkernel {} finished, with exception: {}", subkernel_finished.id, subkernel_finished.with_exception);
|
||||||
|
router.route(drtioaux::Packet::SubkernelFinished {
|
||||||
|
destination: subkernel_finished.source, id: subkernel_finished.id,
|
||||||
|
with_exception: subkernel_finished.with_exception, exception_src: subkernel_finished.exception_source
|
||||||
|
}, &routing_table, rank, destination);
|
||||||
|
}
|
||||||
|
|
||||||
if !self.is_running() {
|
if !self.is_running() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -534,17 +540,6 @@ impl Manager {
|
||||||
self.last_finished = finished!(true);
|
self.last_finished = finished!(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(subkernel_finished) = self.last_finished.take() {
|
|
||||||
info!("subkernel {} finished, with exception: {}", subkernel_finished.id, subkernel_finished.with_exception);
|
|
||||||
let res = router.route(drtioaux::Packet::SubkernelFinished {
|
|
||||||
destination: subkernel_finished.source, id: subkernel_finished.id,
|
|
||||||
with_exception: subkernel_finished.with_exception, exception_src: destination
|
|
||||||
}, &routing_table, rank, destination);
|
|
||||||
if let Err(e) = res {
|
|
||||||
warn!("error sending SubkernelFinished: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_external_messages(&mut self) -> Result<(), Error> {
|
fn process_external_messages(&mut self) -> Result<(), Error> {
|
||||||
|
@ -578,26 +573,15 @@ impl Manager {
|
||||||
self.session.kernel_state = KernelState::Running;
|
self.session.kernel_state = KernelState::Running;
|
||||||
} else {
|
} else {
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
for status in self.session.subkernels_finished.iter() {
|
for status in &self.session.subkernels_finished {
|
||||||
if status.0 == *id {
|
if *status == *id {
|
||||||
|
kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::NoError })?;
|
||||||
|
self.session.kernel_state = KernelState::Running;
|
||||||
|
self.session.subkernels_finished.swap_remove(i);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
if let Some(finish_status) = self.session.subkernels_finished.remove(i) {
|
|
||||||
if finish_status.1 {
|
|
||||||
unsafe { kernel_cpu::stop() }
|
|
||||||
self.session.kernel_state = KernelState::Absent;
|
|
||||||
unsafe { self.cache.unborrow() }
|
|
||||||
self.last_finished = Some(SubkernelFinished {
|
|
||||||
source: self.session.source, id: self.current_id,
|
|
||||||
with_exception: true, exception_source: finish_status.2
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::NoError })?;
|
|
||||||
self.session.kernel_state = KernelState::Running;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -623,7 +607,17 @@ impl Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) {
|
pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) {
|
||||||
self.session.subkernels_finished.push_back((id, with_exception, exception_source));
|
if with_exception {
|
||||||
|
unsafe { kernel_cpu::stop() }
|
||||||
|
self.session.kernel_state = KernelState::Absent;
|
||||||
|
unsafe { self.cache.unborrow() }
|
||||||
|
self.last_finished = Some(SubkernelFinished {
|
||||||
|
source: self.session.source, id: self.current_id,
|
||||||
|
with_exception: true, exception_source: exception_source
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
self.session.subkernels_finished.push(id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_kern_message(&mut self, router: &mut Router,
|
fn process_kern_message(&mut self, router: &mut Router,
|
||||||
|
@ -722,7 +716,7 @@ impl Manager {
|
||||||
self.session.kernel_state = KernelState::SubkernelAwaitLoad;
|
self.session.kernel_state = KernelState::SubkernelAwaitLoad;
|
||||||
router.route(drtioaux::Packet::SubkernelLoadRunRequest {
|
router.route(drtioaux::Packet::SubkernelLoadRunRequest {
|
||||||
source: destination, destination: sk_destination, id: id, run: run
|
source: destination, destination: sk_destination, id: id, run: run
|
||||||
}, routing_table, rank, destination).map_err(|_| Error::DrtioError)?;
|
}, routing_table, rank, destination);
|
||||||
kern_acknowledge()
|
kern_acknowledge()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,12 @@ fn drtiosat_tsc_loaded() -> bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn drtiosat_async_ready() {
|
||||||
|
unsafe {
|
||||||
|
csr::drtiosat::async_messages_ready_write(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub enum RtioMaster {
|
pub enum RtioMaster {
|
||||||
Drtio,
|
Drtio,
|
||||||
Dma,
|
Dma,
|
||||||
|
@ -90,7 +96,14 @@ macro_rules! forward {
|
||||||
if hop != 0 {
|
if hop != 0 {
|
||||||
let repno = (hop - 1) as usize;
|
let repno = (hop - 1) as usize;
|
||||||
if repno < $repeaters.len() {
|
if repno < $repeaters.len() {
|
||||||
return $repeaters[repno].aux_forward($packet);
|
if $packet.expects_response() {
|
||||||
|
return $repeaters[repno].aux_forward($packet);
|
||||||
|
} else {
|
||||||
|
let res = $repeaters[repno].aux_send($packet);
|
||||||
|
// allow the satellite to parse the packet before next
|
||||||
|
clock::spin_us(10_000);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(drtioaux::Error::RoutingError);
|
return Err(drtioaux::Error::RoutingError);
|
||||||
}
|
}
|
||||||
|
@ -135,7 +148,8 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
|
||||||
*self_destination = destination;
|
*self_destination = destination;
|
||||||
// async messages
|
// async messages
|
||||||
if *rank == 1 {
|
if *rank == 1 {
|
||||||
if let Some(packet) = router.get_upstream_packet(*rank) {
|
// for now, master ignores the async_messages_ready packet
|
||||||
|
if let Some(packet) = router.get_upstream_packet() {
|
||||||
// pass any async or routed packets to master
|
// pass any async or routed packets to master
|
||||||
// this does mean that DDMA/SK packets to master will "trickle down" to higher rank
|
// this does mean that DDMA/SK packets to master will "trickle down" to higher rank
|
||||||
return drtioaux::send(0, &packet)
|
return drtioaux::send(0, &packet)
|
||||||
|
@ -236,14 +250,10 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
|
||||||
drtioaux::send(0, &drtioaux::Packet::RoutingAck)
|
drtioaux::send(0, &drtioaux::Packet::RoutingAck)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(has_drtio_routing)]
|
drtioaux::Packet::RoutingRetrievePackets => {
|
||||||
drtioaux::Packet::RoutingAck => {
|
let packet = router.get_upstream_packet().or(
|
||||||
if *rank > 1 {
|
Some(drtioaux::Packet::RoutingNoPackets)).unwrap();
|
||||||
router.routing_ack_received();
|
drtioaux::send(0, &packet)
|
||||||
} else {
|
|
||||||
warn!("received unexpected RoutingAck");
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
drtioaux::Packet::MonitorRequest { destination: _destination, channel, probe } => {
|
drtioaux::Packet::MonitorRequest { destination: _destination, channel, probe } => {
|
||||||
|
@ -453,10 +463,11 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
|
||||||
if kernelmgr.message_ack_slice() {
|
if kernelmgr.message_ack_slice() {
|
||||||
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
||||||
if let Some(meta) = kernelmgr.message_get_slice(&mut data_slice) {
|
if let Some(meta) = kernelmgr.message_get_slice(&mut data_slice) {
|
||||||
router.send(drtioaux::Packet::SubkernelMessage {
|
// route and not send immediately as ACKs are not a beginning of a transaction
|
||||||
|
router.route(drtioaux::Packet::SubkernelMessage {
|
||||||
source: *self_destination, destination: meta.destination, id: kernelmgr.get_current_id().unwrap(),
|
source: *self_destination, destination: meta.destination, id: kernelmgr.get_current_id().unwrap(),
|
||||||
status: meta.status, length: meta.len as u16, data: data_slice
|
status: meta.status, length: meta.len as u16, data: data_slice
|
||||||
}, _routing_table, *rank, *self_destination)?;
|
}, _routing_table, *rank, *self_destination);
|
||||||
} else {
|
} else {
|
||||||
error!("Error receiving message slice");
|
error!("Error receiving message slice");
|
||||||
}
|
}
|
||||||
|
@ -476,16 +487,16 @@ fn process_aux_packets(dma_manager: &mut DmaManager, analyzer: &mut Analyzer,
|
||||||
routing_table: &mut drtio_routing::RoutingTable, rank: &mut u8, router: &mut routing::Router,
|
routing_table: &mut drtio_routing::RoutingTable, rank: &mut u8, router: &mut routing::Router,
|
||||||
destination: &mut u8) {
|
destination: &mut u8) {
|
||||||
let result =
|
let result =
|
||||||
drtioaux::recv(0).or_else(|_| Ok(router.get_local_packet())).and_then(|packet| {
|
drtioaux::recv(0).and_then(|packet| {
|
||||||
if let Some(packet) = packet {
|
if let Some(packet) = packet.or_else(|| router.get_local_packet()) {
|
||||||
process_aux_packet(dma_manager, analyzer, kernelmgr, repeaters, routing_table, rank, router, destination, packet)
|
process_aux_packet(dma_manager, analyzer, kernelmgr,
|
||||||
|
repeaters, routing_table, rank, router, destination, packet)
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
match result {
|
if let Err(e) = result {
|
||||||
Ok(()) => (),
|
warn!("aux packet error ({})", e);
|
||||||
Err(e) => warn!("aux packet error ({})", e)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -765,22 +776,22 @@ pub extern fn main() -> i32 {
|
||||||
if let Some(status) = dma_manager.get_status() {
|
if let Some(status) = dma_manager.get_status() {
|
||||||
info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp);
|
info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp);
|
||||||
router.route(drtioaux::Packet::DmaPlaybackStatus {
|
router.route(drtioaux::Packet::DmaPlaybackStatus {
|
||||||
source: destination, destination: status.source, id: status.id,
|
destination: status.source, id: status.id, error: status.error,
|
||||||
error: status.error, channel: status.channel, timestamp: status.timestamp
|
channel: status.channel, timestamp: status.timestamp
|
||||||
}, &routing_table, rank, destination);
|
}, &routing_table, rank, destination)
|
||||||
}
|
}
|
||||||
|
|
||||||
kernelmgr.process_kern_requests(&mut router, &routing_table, rank, destination);
|
kernelmgr.process_kern_requests(&mut router, &routing_table, rank, destination);
|
||||||
|
|
||||||
if rank > 1 {
|
if let Some((repno, packet)) = router.get_downstream_packet() {
|
||||||
if let Some(packet) = router.get_upstream_packet(rank) {
|
if let Err(e) = repeaters[repno].aux_send(&packet) {
|
||||||
// in sat-sat communications, it can be async
|
warn!("[REP#{}] Error when sending packet to satellite ({:?})", repno, e)
|
||||||
let res = drtioaux::send(0, &packet);
|
|
||||||
if let Err(e) = res {
|
|
||||||
warn!("error routing packet: {}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if router.any_upstream_waiting() {
|
||||||
|
drtiosat_async_ready();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
drtiosat_reset_phy(true);
|
drtiosat_reset_phy(true);
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use board_artiq::{drtioaux, drtio_routing};
|
use board_artiq::{drtioaux, drtio_routing};
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
use board_misoc::{csr, clock};
|
use board_misoc::{csr, clock};
|
||||||
use routing::{Router, get_routable_packet_destination};
|
use routing::Router;
|
||||||
|
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
fn rep_link_rx_up(repno: u8) -> bool {
|
fn rep_link_rx_up(repno: u8) -> bool {
|
||||||
|
@ -107,11 +107,16 @@ impl Repeater {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
RepeaterState::Up => {
|
RepeaterState::Up => {
|
||||||
self.process_unsolicited_aux(routing_table, rank, destination, router);
|
self.process_unsolicited_aux();
|
||||||
if !rep_link_rx_up(self.repno) {
|
if !rep_link_rx_up(self.repno) {
|
||||||
info!("[REP#{}] link is down", self.repno);
|
info!("[REP#{}] link is down", self.repno);
|
||||||
self.state = RepeaterState::Down;
|
self.state = RepeaterState::Down;
|
||||||
}
|
}
|
||||||
|
if self.async_messages_ready() {
|
||||||
|
if let Err(e) = self.handle_async(routing_table, rank, destination, router) {
|
||||||
|
warn!("[REP#{}] Error handling async messages ({})", self.repno, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
RepeaterState::Failed => {
|
RepeaterState::Failed => {
|
||||||
if !rep_link_rx_up(self.repno) {
|
if !rep_link_rx_up(self.repno) {
|
||||||
|
@ -122,21 +127,9 @@ impl Repeater {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_unsolicited_aux(&self, routing_table: &drtio_routing::RoutingTable, rank: u8, self_destination: u8, router: &mut Router) {
|
fn process_unsolicited_aux(&self) {
|
||||||
match drtioaux::recv(self.auxno) {
|
match drtioaux::recv(self.auxno) {
|
||||||
Ok(Some(packet)) => {
|
Ok(Some(packet)) => warn!("[REP#{}] unsolicited aux packet: {:?}", self.repno, packet),
|
||||||
let destination = get_routable_packet_destination(&packet);
|
|
||||||
if destination.is_none() {
|
|
||||||
warn!("[REP#{}] unsolicited aux packet: {:?}", self.repno, packet);
|
|
||||||
} else {
|
|
||||||
// routable packet
|
|
||||||
let res = router.route(packet, routing_table, rank, self_destination);
|
|
||||||
match res {
|
|
||||||
Ok(()) => drtioaux::send(self.auxno, &drtioaux::Packet::RoutingAck).unwrap(),
|
|
||||||
Err(e) => warn!("[REP#{}] Error routing packet: {:?}", self.repno, e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(None) => (),
|
Ok(None) => (),
|
||||||
Err(_) => warn!("[REP#{}] aux packet error", self.repno)
|
Err(_) => warn!("[REP#{}] aux packet error", self.repno)
|
||||||
}
|
}
|
||||||
|
@ -192,16 +185,42 @@ impl Repeater {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn aux_forward(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error<!>> {
|
fn async_messages_ready(&self) -> bool {
|
||||||
if self.state != RepeaterState::Up {
|
let async_rdy;
|
||||||
return Err(drtioaux::Error::LinkDown);
|
unsafe {
|
||||||
|
async_rdy = (csr::DRTIOREP[self.repno as usize].async_messages_ready_read)();
|
||||||
|
(csr::DRTIOREP[self.repno as usize].async_messages_ready_write)(0);
|
||||||
}
|
}
|
||||||
drtioaux::send(self.auxno, request).unwrap();
|
async_rdy == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_async(&self, routing_table: &drtio_routing::RoutingTable, rank: u8, self_destination: u8, router: &mut Router
|
||||||
|
) -> Result<(), drtioaux::Error<!>> {
|
||||||
|
loop {
|
||||||
|
drtioaux::send(self.auxno, &drtioaux::Packet::RoutingRetrievePackets).unwrap();
|
||||||
|
let reply = self.recv_aux_timeout(200)?;
|
||||||
|
match reply {
|
||||||
|
drtioaux::Packet::RoutingNoPackets => break,
|
||||||
|
packet => router.route(packet, routing_table, rank, self_destination)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn aux_forward(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error<!>> {
|
||||||
|
self.aux_send(request)?;
|
||||||
let reply = self.recv_aux_timeout(200)?;
|
let reply = self.recv_aux_timeout(200)?;
|
||||||
drtioaux::send(0, &reply).unwrap();
|
drtioaux::send(0, &reply).unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn aux_send(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error<!>> {
|
||||||
|
if self.state != RepeaterState::Up {
|
||||||
|
return Err(drtioaux::Error::LinkDown);
|
||||||
|
}
|
||||||
|
drtioaux::send(self.auxno, request)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn sync_tsc(&self) -> Result<(), drtioaux::Error<!>> {
|
pub fn sync_tsc(&self) -> Result<(), drtioaux::Error<!>> {
|
||||||
if self.state != RepeaterState::Up {
|
if self.state != RepeaterState::Up {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -212,8 +231,8 @@ impl Repeater {
|
||||||
(csr::DRTIOREP[repno].set_time_write)(1);
|
(csr::DRTIOREP[repno].set_time_write)(1);
|
||||||
while (csr::DRTIOREP[repno].set_time_read)() == 1 {}
|
while (csr::DRTIOREP[repno].set_time_read)() == 1 {}
|
||||||
}
|
}
|
||||||
// TSCAck is sent spontaneously by the satellite,
|
// TSCAck is the only aux packet that is sent spontaneously
|
||||||
// in response to a TSC set on the RT link.
|
// by the satellite, in response to a TSC set on the RT link.
|
||||||
let reply = self.recv_aux_timeout(10000)?;
|
let reply = self.recv_aux_timeout(10000)?;
|
||||||
if reply == drtioaux::Packet::TSCAck {
|
if reply == drtioaux::Packet::TSCAck {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
|
|
@ -1,93 +1,87 @@
|
||||||
use alloc::collections::vec_deque::VecDeque;
|
use alloc::collections::vec_deque::VecDeque;
|
||||||
use board_artiq::{drtioaux, drtio_routing};
|
use board_artiq::{drtioaux, drtio_routing};
|
||||||
|
use board_misoc::csr;
|
||||||
|
|
||||||
// Packets from downstream (further satellites) are received and routed appropriately.
|
// Packets from downstream (further satellites) are received and routed appropriately.
|
||||||
// they're passed immediately if it's possible (within the subtree), or sent upstream.
|
// they're passed as soon as possible downstream (within the subtree), or sent upstream,
|
||||||
|
// which is notified about pending packets.
|
||||||
// for rank 1 (connected to master) satellites, these packets are passed as an answer to DestinationStatusRequest;
|
// for rank 1 (connected to master) satellites, these packets are passed as an answer to DestinationStatusRequest;
|
||||||
// for higher ranks, straight upstream, but awaiting for an ACK to make sure the upstream is not overwhelmed.
|
// for higher ranks, after getting a notification, it will transact with downstream to get the pending packets.
|
||||||
|
|
||||||
// forward! macro is not deprecated, as routable packets are only these that can originate
|
// forward! macro is not deprecated, as routable packets are only these that can originate
|
||||||
// from both master and satellite, e.g. DDMA and Subkernel.
|
// from both master and satellite, e.g. DDMA and Subkernel.
|
||||||
|
|
||||||
pub fn get_routable_packet_destination(packet: &drtioaux::Packet) -> Option<u8> {
|
|
||||||
let destination = match packet {
|
|
||||||
// received from downstream
|
|
||||||
drtioaux::Packet::DmaAddTraceRequest { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::DmaAddTraceReply { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::DmaRemoveTraceRequest { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::DmaRemoveTraceReply { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::DmaPlaybackRequest { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::DmaPlaybackReply { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::SubkernelLoadRunRequest { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::SubkernelLoadRunReply { destination, .. } => destination,
|
|
||||||
// received from downstream or produced locally
|
|
||||||
drtioaux::Packet::SubkernelMessage { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::SubkernelMessageAck { destination, .. } => destination,
|
|
||||||
// "async" - master gets them by deststatreq, satellites would get it through the router
|
|
||||||
drtioaux::Packet::DmaPlaybackStatus { destination, .. } => destination,
|
|
||||||
drtioaux::Packet::SubkernelFinished { destination, .. } => destination,
|
|
||||||
_ => return None
|
|
||||||
};
|
|
||||||
Some(*destination)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Router {
|
pub struct Router {
|
||||||
out_messages: VecDeque<drtioaux::Packet>,
|
upstream_queue: VecDeque<drtioaux::Packet>,
|
||||||
local_messages: VecDeque<drtioaux::Packet>,
|
local_queue: VecDeque<drtioaux::Packet>,
|
||||||
upstream_ready: bool
|
downstream_queue: VecDeque<(usize, drtioaux::Packet)>,
|
||||||
|
upstream_notified: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Router {
|
impl Router {
|
||||||
pub fn new() -> Router {
|
pub fn new() -> Router {
|
||||||
Router {
|
Router {
|
||||||
out_messages: VecDeque::new(),
|
upstream_queue: VecDeque::new(),
|
||||||
local_messages: VecDeque::new(),
|
local_queue: VecDeque::new(),
|
||||||
upstream_ready: true
|
downstream_queue: VecDeque::new(),
|
||||||
|
upstream_notified: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// called by local sources (DDMA, kernel) and by repeaters on receiving async data
|
||||||
// called by local sources (DDMA, kernel) and by repeaters on receiving unsolicited data
|
// messages are always buffered for both upstream and downstream
|
||||||
// messages are always buffered for upstream, or passed downstream directly
|
|
||||||
pub fn route(&mut self, packet: drtioaux::Packet,
|
pub fn route(&mut self, packet: drtioaux::Packet,
|
||||||
_routing_table: &drtio_routing::RoutingTable, _rank: u8,
|
_routing_table: &drtio_routing::RoutingTable, _rank: u8,
|
||||||
_self_destination: u8
|
_self_destination: u8
|
||||||
) -> Result<(), drtioaux::Error<!>> {
|
) {
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
{
|
{
|
||||||
let destination = get_routable_packet_destination(&packet);
|
let destination = packet.routable_destination();
|
||||||
if let Some(destination) = destination {
|
if let Some(destination) = destination {
|
||||||
let hop = _routing_table.0[destination as usize][_rank as usize];
|
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
|
||||||
let auxno = if destination == 0 { 0 } else { hop };
|
|
||||||
if destination == _self_destination {
|
if destination == _self_destination {
|
||||||
self.local_messages.push_back(packet);
|
self.local_queue.push_back(packet);
|
||||||
} else if _rank > 1 {
|
} else if hop > 0 && hop < csr::DRTIOREP.len() {
|
||||||
drtioaux::send(auxno, &packet)?;
|
let repno = (hop - 1) as usize;
|
||||||
|
self.downstream_queue.push_back((repno, packet));
|
||||||
} else {
|
} else {
|
||||||
self.out_messages.push_back(packet);
|
self.upstream_queue.push_back(packet);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(drtioaux::Error::RoutingError);
|
error!("Received an unroutable packet: {:?}", packet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[cfg(not(has_drtio_routing))]
|
#[cfg(not(has_drtio_routing))]
|
||||||
{
|
{
|
||||||
self.out_messages.push_back(packet);
|
self.upstream_queue.push_back(packet);
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sends a packet to a required destination, routing if it's necessary
|
// Sends a packet to a required destination, routing if it's necessary
|
||||||
pub fn send(&mut self, packet: drtioaux::Packet,
|
pub fn send(&mut self, packet: drtioaux::Packet,
|
||||||
_routing_table: &drtio_routing::RoutingTable, _rank: u8, _destination: u8) -> Result<(), drtioaux::Error<!>> {
|
_routing_table: &drtio_routing::RoutingTable,
|
||||||
|
_rank: u8, _destination: u8
|
||||||
|
) -> Result<(), drtioaux::Error<!>> {
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
{
|
{
|
||||||
let destination = get_routable_packet_destination(&packet);
|
let destination = packet.routable_destination();
|
||||||
if destination.is_none() || destination == Some(0) {
|
if let Some(destination) = destination {
|
||||||
// send upstream directly (response to master)
|
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
|
||||||
drtioaux::send(0, &packet)
|
if destination == 0 {
|
||||||
|
// response is needed immediately if master required it
|
||||||
|
drtioaux::send(0, &packet)?;
|
||||||
|
} else if !(hop > 0 && hop < csr::DRTIOREP.len()) {
|
||||||
|
// higher rank can wait
|
||||||
|
self.upstream_queue.push_back(packet);
|
||||||
|
} else {
|
||||||
|
let repno = (hop - 1) as usize;
|
||||||
|
// transaction will occur at closest possible opportunity
|
||||||
|
self.downstream_queue.push_back((repno, packet));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
self.route(packet, _routing_table, _rank, _destination)
|
// packet not supported in routing, fallback - sent directly
|
||||||
|
drtioaux::send(0, &packet)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[cfg(not(has_drtio_routing))]
|
#[cfg(not(has_drtio_routing))]
|
||||||
|
@ -96,25 +90,26 @@ impl Router {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_upstream_packet(&mut self, rank: u8) -> Option<drtioaux::Packet> {
|
pub fn any_upstream_waiting(&mut self) -> bool {
|
||||||
// called on DestinationStatusRequest on rank 1, in loop in others
|
let empty = self.upstream_queue.is_empty();
|
||||||
if self.upstream_ready {
|
if !empty && !self.upstream_notified {
|
||||||
let packet = self.out_messages.pop_front();
|
self.upstream_notified = true; // so upstream will not get spammed with notifications
|
||||||
if rank > 1 && packet.is_some() {
|
true
|
||||||
// packet will be sent out, awaiting ACK
|
|
||||||
self.upstream_ready = false;
|
|
||||||
}
|
|
||||||
packet
|
|
||||||
} else {
|
} else {
|
||||||
None
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn routing_ack_received(&mut self) {
|
pub fn get_upstream_packet(&mut self) -> Option<drtioaux::Packet> {
|
||||||
self.upstream_ready = true;
|
self.upstream_notified = false;
|
||||||
|
self.upstream_queue.pop_front()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> {
|
||||||
|
self.downstream_queue.pop_front()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_local_packet(&mut self) -> Option<drtioaux::Packet> {
|
pub fn get_local_packet(&mut self) -> Option<drtioaux::Packet> {
|
||||||
self.local_messages.pop_front()
|
self.local_queue.pop_front()
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue