satman: support sub-subkernels, routing
This commit is contained in:
parent
35482ce6d1
commit
7d30100a88
|
@ -922,4 +922,36 @@ impl Packet {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn routable_destination(&self) -> Option<u8> {
|
||||||
|
// only for packets that could be re-routed, not only forwarded
|
||||||
|
match self {
|
||||||
|
Packet::DmaAddTraceRequest { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaAddTraceReply { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaRemoveTraceRequest { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaRemoveTraceReply { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaPlaybackRequest { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaPlaybackReply { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelLoadRunRequest { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelLoadRunReply { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelMessage { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelMessageAck { destination, .. } => Some(*destination),
|
||||||
|
Packet::DmaPlaybackStatus { destination, .. } => Some(*destination),
|
||||||
|
Packet::SubkernelFinished { destination, .. } => Some(*destination),
|
||||||
|
_ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn expects_response(&self) -> bool {
|
||||||
|
// returns true if the routable packet should elicit a response
|
||||||
|
// e.g. reply, ACK packets end a conversation,
|
||||||
|
// and firmware should not wait for response
|
||||||
|
match self {
|
||||||
|
Packet::DmaAddTraceReply { .. } | Packet::DmaRemoveTraceReply { .. } |
|
||||||
|
Packet::DmaPlaybackReply { .. } | Packet::SubkernelLoadRunReply { .. } |
|
||||||
|
Packet::SubkernelMessageAck { .. } | Packet::DmaPlaybackStatus { .. } |
|
||||||
|
Packet::SubkernelFinished { .. } => false,
|
||||||
|
_ => true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ enum ManagerState {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RtioStatus {
|
pub struct RtioStatus {
|
||||||
|
pub source: u8,
|
||||||
pub id: u32,
|
pub id: u32,
|
||||||
pub error: u8,
|
pub error: u8,
|
||||||
pub channel: u32,
|
pub channel: u32,
|
||||||
|
@ -33,9 +34,10 @@ struct Entry {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Manager {
|
pub struct Manager {
|
||||||
entries: BTreeMap<u32, Entry>,
|
entries: BTreeMap<(u8, u32), Entry>,
|
||||||
state: ManagerState,
|
state: ManagerState,
|
||||||
currentid: u32,
|
current_id: u32,
|
||||||
|
current_source: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Manager {
|
impl Manager {
|
||||||
|
@ -45,40 +47,41 @@ impl Manager {
|
||||||
unsafe { while csr::rtio_dma::enable_read() != 0 {} }
|
unsafe { while csr::rtio_dma::enable_read() != 0 {} }
|
||||||
Manager {
|
Manager {
|
||||||
entries: BTreeMap::new(),
|
entries: BTreeMap::new(),
|
||||||
currentid: 0,
|
current_id: 0,
|
||||||
|
current_source: 0,
|
||||||
state: ManagerState::Idle,
|
state: ManagerState::Idle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add(&mut self, id: u32, status: PayloadStatus, trace: &[u8], trace_len: usize) -> Result<(), Error> {
|
pub fn add(&mut self, source: u8, id: u32, status: PayloadStatus, trace: &[u8], trace_len: usize) -> Result<(), Error> {
|
||||||
let entry = match self.entries.get_mut(&id) {
|
let entry = match self.entries.get_mut(&(source, id)) {
|
||||||
Some(entry) => {
|
Some(entry) => {
|
||||||
if entry.complete || status.is_first() {
|
if entry.complete || status.is_first() {
|
||||||
// replace entry
|
// replace entry
|
||||||
self.entries.remove(&id);
|
self.entries.remove(&(source, id));
|
||||||
self.entries.insert(
|
self.entries.insert(
|
||||||
id,
|
(source, id),
|
||||||
Entry {
|
Entry {
|
||||||
trace: Vec::new(),
|
trace: Vec::new(),
|
||||||
padding_len: 0,
|
padding_len: 0,
|
||||||
complete: false,
|
complete: false,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
self.entries.get_mut(&id).unwrap()
|
self.entries.get_mut(&(source, id)).unwrap()
|
||||||
} else {
|
} else {
|
||||||
entry
|
entry
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
self.entries.insert(
|
self.entries.insert(
|
||||||
id,
|
(source, id),
|
||||||
Entry {
|
Entry {
|
||||||
trace: Vec::new(),
|
trace: Vec::new(),
|
||||||
padding_len: 0,
|
padding_len: 0,
|
||||||
complete: false,
|
complete: false,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
self.entries.get_mut(&id).unwrap()
|
self.entries.get_mut(&(source, id)).unwrap()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
entry.trace.extend(&trace[0..trace_len]);
|
entry.trace.extend(&trace[0..trace_len]);
|
||||||
|
@ -105,19 +108,19 @@ impl Manager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn erase(&mut self, id: u32) -> Result<(), Error> {
|
pub fn erase(&mut self, source: u8, id: u32) -> Result<(), Error> {
|
||||||
match self.entries.remove(&id) {
|
match self.entries.remove(&(source, id)) {
|
||||||
Some(_) => Ok(()),
|
Some(_) => Ok(()),
|
||||||
None => Err(Error::IdNotFound),
|
None => Err(Error::IdNotFound),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn playback(&mut self, id: u32, timestamp: u64) -> Result<(), Error> {
|
pub fn playback(&mut self, source: u8, id: u32, timestamp: u64) -> Result<(), Error> {
|
||||||
if self.state != ManagerState::Idle {
|
if self.state != ManagerState::Idle {
|
||||||
return Err(Error::PlaybackInProgress);
|
return Err(Error::PlaybackInProgress);
|
||||||
}
|
}
|
||||||
|
|
||||||
let entry = match self.entries.get(&id) {
|
let entry = match self.entries.get(&(source, id)) {
|
||||||
Some(entry) => entry,
|
Some(entry) => entry,
|
||||||
None => {
|
None => {
|
||||||
return Err(Error::IdNotFound);
|
return Err(Error::IdNotFound);
|
||||||
|
@ -130,7 +133,8 @@ impl Manager {
|
||||||
assert!(ptr as u32 % 64 == 0);
|
assert!(ptr as u32 % 64 == 0);
|
||||||
|
|
||||||
self.state = ManagerState::Playback;
|
self.state = ManagerState::Playback;
|
||||||
self.currentid = id;
|
self.current_id = id;
|
||||||
|
self.current_source = source;
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
csr::rtio_dma::base_address_write(ptr as u32);
|
csr::rtio_dma::base_address_write(ptr as u32);
|
||||||
|
@ -162,7 +166,8 @@ impl Manager {
|
||||||
csr::rtio_dma::error_write(1);
|
csr::rtio_dma::error_write(1);
|
||||||
}
|
}
|
||||||
return Some(RtioStatus {
|
return Some(RtioStatus {
|
||||||
id: self.currentid,
|
source: self.current_source,
|
||||||
|
id: self.current_id,
|
||||||
error: error,
|
error: error,
|
||||||
channel: channel,
|
channel: channel,
|
||||||
timestamp: timestamp,
|
timestamp: timestamp,
|
||||||
|
|
|
@ -40,7 +40,9 @@ use libcortex_a9::{l2c::enable_l2_cache, regs::MPIDR};
|
||||||
use libregister::RegisterR;
|
use libregister::RegisterR;
|
||||||
use libsupport_zynq::ram;
|
use libsupport_zynq::ram;
|
||||||
use subkernel::Manager as KernelManager;
|
use subkernel::Manager as KernelManager;
|
||||||
|
use routing::Router;
|
||||||
|
|
||||||
|
mod routing;
|
||||||
mod analyzer;
|
mod analyzer;
|
||||||
mod dma;
|
mod dma;
|
||||||
mod repeater;
|
mod repeater;
|
||||||
|
@ -72,6 +74,12 @@ fn drtiosat_tsc_loaded() -> bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn drtiosat_async_ready() {
|
||||||
|
unsafe {
|
||||||
|
csr::drtiosat::async_messages_ready_write(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
macro_rules! forward {
|
macro_rules! forward {
|
||||||
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{
|
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{
|
||||||
|
@ -79,7 +87,11 @@ macro_rules! forward {
|
||||||
if hop != 0 {
|
if hop != 0 {
|
||||||
let repno = (hop - 1) as usize;
|
let repno = (hop - 1) as usize;
|
||||||
if repno < $repeaters.len() {
|
if repno < $repeaters.len() {
|
||||||
|
if $packet.expects_response() {
|
||||||
return $repeaters[repno].aux_forward($packet, $timer);
|
return $repeaters[repno].aux_forward($packet, $timer);
|
||||||
|
} else {
|
||||||
|
return $repeaters[repno].aux_send($packet);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(drtioaux::Error::RoutingError);
|
return Err(drtioaux::Error::RoutingError);
|
||||||
}
|
}
|
||||||
|
@ -95,13 +107,15 @@ macro_rules! forward {
|
||||||
fn process_aux_packet(
|
fn process_aux_packet(
|
||||||
_repeaters: &mut [repeater::Repeater],
|
_repeaters: &mut [repeater::Repeater],
|
||||||
_routing_table: &mut drtio_routing::RoutingTable,
|
_routing_table: &mut drtio_routing::RoutingTable,
|
||||||
_rank: &mut u8,
|
rank: &mut u8,
|
||||||
|
self_destination: &mut u8,
|
||||||
packet: drtioaux::Packet,
|
packet: drtioaux::Packet,
|
||||||
timer: &mut GlobalTimer,
|
timer: &mut GlobalTimer,
|
||||||
i2c: &mut I2c,
|
i2c: &mut I2c,
|
||||||
dma_manager: &mut DmaManager,
|
dma_manager: &mut DmaManager,
|
||||||
analyzer: &mut Analyzer,
|
analyzer: &mut Analyzer,
|
||||||
kernel_manager: &mut KernelManager,
|
kernel_manager: &mut KernelManager,
|
||||||
|
router: &mut Router,
|
||||||
) -> Result<(), drtioaux::Error> {
|
) -> Result<(), drtioaux::Error> {
|
||||||
// In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels,
|
// In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels,
|
||||||
// and u16 otherwise; hence the `as _` conversion.
|
// and u16 otherwise; hence the `as _` conversion.
|
||||||
|
@ -122,54 +136,12 @@ fn process_aux_packet(
|
||||||
|
|
||||||
drtioaux::Packet::DestinationStatusRequest { destination } => {
|
drtioaux::Packet::DestinationStatusRequest { destination } => {
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
let hop = _routing_table.0[destination as usize][*_rank as usize];
|
let hop = _routing_table.0[destination as usize][*rank as usize];
|
||||||
#[cfg(not(has_drtio_routing))]
|
#[cfg(not(has_drtio_routing))]
|
||||||
let hop = 0;
|
let hop = 0;
|
||||||
|
|
||||||
if hop == 0 {
|
if hop == 0 {
|
||||||
if let Some(status) = dma_manager.check_state() {
|
*self_destination = destination;
|
||||||
info!(
|
|
||||||
"playback done, error: {}, channel: {}, timestamp: {}",
|
|
||||||
status.error, status.channel, status.timestamp
|
|
||||||
);
|
|
||||||
drtioaux::send(
|
|
||||||
0,
|
|
||||||
&drtioaux::Packet::DmaPlaybackStatus {
|
|
||||||
destination: destination,
|
|
||||||
id: status.id,
|
|
||||||
error: status.error,
|
|
||||||
channel: status.channel,
|
|
||||||
timestamp: status.timestamp,
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
} else if let Some(subkernel_finished) = kernel_manager.get_last_finished() {
|
|
||||||
info!(
|
|
||||||
"subkernel {} finished, with exception: {}",
|
|
||||||
subkernel_finished.id, subkernel_finished.with_exception
|
|
||||||
);
|
|
||||||
drtioaux::send(
|
|
||||||
0,
|
|
||||||
&drtioaux::Packet::SubkernelFinished {
|
|
||||||
id: subkernel_finished.id,
|
|
||||||
with_exception: subkernel_finished.with_exception,
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
} else if kernel_manager.message_is_ready() {
|
|
||||||
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
|
||||||
match kernel_manager.message_get_slice(&mut data_slice) {
|
|
||||||
Some(meta) => drtioaux::send(
|
|
||||||
0,
|
|
||||||
&drtioaux::Packet::SubkernelMessage {
|
|
||||||
destination: destination,
|
|
||||||
id: kernel_manager.get_current_id().unwrap(),
|
|
||||||
status: meta.status,
|
|
||||||
length: meta.len as u16,
|
|
||||||
data: data_slice,
|
|
||||||
},
|
|
||||||
)?,
|
|
||||||
None => warn!("subkernel message is ready but no message is present"),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let errors;
|
let errors;
|
||||||
unsafe {
|
unsafe {
|
||||||
errors = csr::drtiosat::rtio_error_read();
|
errors = csr::drtiosat::rtio_error_read();
|
||||||
|
@ -199,7 +171,6 @@ fn process_aux_packet(
|
||||||
drtioaux::send(0, &drtioaux::Packet::DestinationOkReply)?;
|
drtioaux::send(0, &drtioaux::Packet::DestinationOkReply)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
{
|
{
|
||||||
|
@ -242,11 +213,11 @@ fn process_aux_packet(
|
||||||
drtioaux::send(0, &drtioaux::Packet::RoutingAck)
|
drtioaux::send(0, &drtioaux::Packet::RoutingAck)
|
||||||
}
|
}
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
drtioaux::Packet::RoutingSetRank { rank } => {
|
drtioaux::Packet::RoutingSetRank { rank: new_rank } => {
|
||||||
*_rank = rank;
|
*rank = new_rank;
|
||||||
drtio_routing::interconnect_enable_all(_routing_table, rank);
|
drtio_routing::interconnect_enable_all(_routing_table, new_rank);
|
||||||
|
|
||||||
let rep_rank = rank + 1;
|
let rep_rank = new_rank + 1;
|
||||||
for rep in _repeaters.iter() {
|
for rep in _repeaters.iter() {
|
||||||
if let Err(e) = rep.set_rank(rep_rank, timer) {
|
if let Err(e) = rep.set_rank(rep_rank, timer) {
|
||||||
error!("failed to set rank ({:?})", e);
|
error!("failed to set rank ({:?})", e);
|
||||||
|
@ -267,12 +238,20 @@ fn process_aux_packet(
|
||||||
#[cfg(not(has_drtio_routing))]
|
#[cfg(not(has_drtio_routing))]
|
||||||
drtioaux::Packet::RoutingSetRank { rank: _ } => drtioaux::send(0, &drtioaux::Packet::RoutingAck),
|
drtioaux::Packet::RoutingSetRank { rank: _ } => drtioaux::send(0, &drtioaux::Packet::RoutingAck),
|
||||||
|
|
||||||
|
drtioaux::Packet::RoutingRetrievePackets => {
|
||||||
|
let packet = match router.get_upstream_packet() {
|
||||||
|
Some(packet) => packet,
|
||||||
|
None => drtioaux::Packet::RoutingNoPackets
|
||||||
|
};
|
||||||
|
drtioaux::send(0, &packet)
|
||||||
|
}
|
||||||
|
|
||||||
drtioaux::Packet::MonitorRequest {
|
drtioaux::Packet::MonitorRequest {
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
channel,
|
channel,
|
||||||
probe,
|
probe,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let value;
|
let value;
|
||||||
#[cfg(has_rtio_moninj)]
|
#[cfg(has_rtio_moninj)]
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -294,7 +273,7 @@ fn process_aux_packet(
|
||||||
overrd,
|
overrd,
|
||||||
value,
|
value,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
#[cfg(has_rtio_moninj)]
|
#[cfg(has_rtio_moninj)]
|
||||||
unsafe {
|
unsafe {
|
||||||
csr::rtio_moninj::inj_chan_sel_write(channel as _);
|
csr::rtio_moninj::inj_chan_sel_write(channel as _);
|
||||||
|
@ -308,7 +287,7 @@ fn process_aux_packet(
|
||||||
channel,
|
channel,
|
||||||
overrd,
|
overrd,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let value;
|
let value;
|
||||||
#[cfg(has_rtio_moninj)]
|
#[cfg(has_rtio_moninj)]
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -327,7 +306,7 @@ fn process_aux_packet(
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
busno: _busno,
|
busno: _busno,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let succeeded = i2c.start().is_ok();
|
let succeeded = i2c.start().is_ok();
|
||||||
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
|
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
|
||||||
}
|
}
|
||||||
|
@ -335,7 +314,7 @@ fn process_aux_packet(
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
busno: _busno,
|
busno: _busno,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let succeeded = i2c.restart().is_ok();
|
let succeeded = i2c.restart().is_ok();
|
||||||
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
|
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
|
||||||
}
|
}
|
||||||
|
@ -343,7 +322,7 @@ fn process_aux_packet(
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
busno: _busno,
|
busno: _busno,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let succeeded = i2c.stop().is_ok();
|
let succeeded = i2c.stop().is_ok();
|
||||||
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
|
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
|
||||||
}
|
}
|
||||||
|
@ -352,7 +331,7 @@ fn process_aux_packet(
|
||||||
busno: _busno,
|
busno: _busno,
|
||||||
data,
|
data,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
match i2c.write(data) {
|
match i2c.write(data) {
|
||||||
Ok(ack) => drtioaux::send(
|
Ok(ack) => drtioaux::send(
|
||||||
0,
|
0,
|
||||||
|
@ -375,7 +354,7 @@ fn process_aux_packet(
|
||||||
busno: _busno,
|
busno: _busno,
|
||||||
ack,
|
ack,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
match i2c.read(ack) {
|
match i2c.read(ack) {
|
||||||
Ok(data) => drtioaux::send(
|
Ok(data) => drtioaux::send(
|
||||||
0,
|
0,
|
||||||
|
@ -399,7 +378,7 @@ fn process_aux_packet(
|
||||||
address,
|
address,
|
||||||
mask,
|
mask,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let ch = match mask {
|
let ch = match mask {
|
||||||
//decode from mainline, PCA9548-centric API
|
//decode from mainline, PCA9548-centric API
|
||||||
0x00 => None,
|
0x00 => None,
|
||||||
|
@ -425,7 +404,7 @@ fn process_aux_packet(
|
||||||
div: _div,
|
div: _div,
|
||||||
cs: _cs,
|
cs: _cs,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
// todo: reimplement when/if SPI is available
|
// todo: reimplement when/if SPI is available
|
||||||
//let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok();
|
//let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok();
|
||||||
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
|
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
|
||||||
|
@ -435,7 +414,7 @@ fn process_aux_packet(
|
||||||
busno: _busno,
|
busno: _busno,
|
||||||
data: _data,
|
data: _data,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
// todo: reimplement when/if SPI is available
|
// todo: reimplement when/if SPI is available
|
||||||
//let succeeded = spi::write(busno, data).is_ok();
|
//let succeeded = spi::write(busno, data).is_ok();
|
||||||
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
|
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
|
||||||
|
@ -444,7 +423,7 @@ fn process_aux_packet(
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
busno: _busno,
|
busno: _busno,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
// todo: reimplement when/if SPI is available
|
// todo: reimplement when/if SPI is available
|
||||||
// match spi::read(busno) {
|
// match spi::read(busno) {
|
||||||
// Ok(data) => drtioaux::send(0,
|
// Ok(data) => drtioaux::send(0,
|
||||||
|
@ -464,7 +443,7 @@ fn process_aux_packet(
|
||||||
drtioaux::Packet::AnalyzerHeaderRequest {
|
drtioaux::Packet::AnalyzerHeaderRequest {
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let header = analyzer.get_header();
|
let header = analyzer.get_header();
|
||||||
drtioaux::send(
|
drtioaux::send(
|
||||||
0,
|
0,
|
||||||
|
@ -478,7 +457,7 @@ fn process_aux_packet(
|
||||||
drtioaux::Packet::AnalyzerDataRequest {
|
drtioaux::Packet::AnalyzerDataRequest {
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
|
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
|
||||||
let meta = analyzer.get_data(&mut data_slice);
|
let meta = analyzer.get_data(&mut data_slice);
|
||||||
drtioaux::send(
|
drtioaux::send(
|
||||||
|
@ -492,55 +471,67 @@ fn process_aux_packet(
|
||||||
}
|
}
|
||||||
|
|
||||||
drtioaux::Packet::DmaAddTraceRequest {
|
drtioaux::Packet::DmaAddTraceRequest {
|
||||||
destination: _destination,
|
source,
|
||||||
|
destination,
|
||||||
id,
|
id,
|
||||||
status,
|
status,
|
||||||
length,
|
length,
|
||||||
trace,
|
trace,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer);
|
||||||
let succeeded = dma_manager.add(id, status, &trace, length as usize).is_ok();
|
*self_destination = destination;
|
||||||
drtioaux::send(0, &drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded })
|
let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok();
|
||||||
|
router.send(drtioaux::Packet::DmaAddTraceReply {
|
||||||
|
destination: source, succeeded: succeeded
|
||||||
|
}, _routing_table, *rank, *self_destination, _repeaters)
|
||||||
}
|
}
|
||||||
drtioaux::Packet::DmaRemoveTraceRequest {
|
drtioaux::Packet::DmaRemoveTraceRequest {
|
||||||
|
source,
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
id,
|
id,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let succeeded = dma_manager.erase(id).is_ok();
|
let succeeded = dma_manager.erase(source, id).is_ok();
|
||||||
drtioaux::send(0, &drtioaux::Packet::DmaRemoveTraceReply { succeeded: succeeded })
|
router.send(drtioaux::Packet::DmaRemoveTraceReply {
|
||||||
|
destination: source, succeeded: succeeded
|
||||||
|
}, _routing_table, *rank, *self_destination, _repeaters)
|
||||||
}
|
}
|
||||||
drtioaux::Packet::DmaPlaybackRequest {
|
drtioaux::Packet::DmaPlaybackRequest {
|
||||||
|
source,
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
id,
|
id,
|
||||||
timestamp,
|
timestamp,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let succeeded = if !kernel_manager.running() {
|
let succeeded = if !kernel_manager.running() {
|
||||||
dma_manager.playback(id, timestamp).is_ok()
|
dma_manager.playback(source, id, timestamp).is_ok()
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
};
|
};
|
||||||
drtioaux::send(0, &drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded })
|
router.send(drtioaux::Packet::DmaPlaybackReply {
|
||||||
|
destination: source, succeeded: succeeded
|
||||||
|
}, _routing_table, *rank, *self_destination, _repeaters)
|
||||||
}
|
}
|
||||||
|
|
||||||
drtioaux::Packet::SubkernelAddDataRequest {
|
drtioaux::Packet::SubkernelAddDataRequest {
|
||||||
destination: _destination,
|
destination,
|
||||||
id,
|
id,
|
||||||
status,
|
status,
|
||||||
length,
|
length,
|
||||||
data,
|
data,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer);
|
||||||
|
*self_destination = destination;
|
||||||
let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok();
|
let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok();
|
||||||
drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded })
|
drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded })
|
||||||
}
|
}
|
||||||
drtioaux::Packet::SubkernelLoadRunRequest {
|
drtioaux::Packet::SubkernelLoadRunRequest {
|
||||||
|
source,
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
id,
|
id,
|
||||||
run,
|
run,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let mut succeeded = kernel_manager.load(id).is_ok();
|
let mut succeeded = kernel_manager.load(id).is_ok();
|
||||||
// allow preloading a kernel with delayed run
|
// allow preloading a kernel with delayed run
|
||||||
if run {
|
if run {
|
||||||
|
@ -548,15 +539,30 @@ fn process_aux_packet(
|
||||||
// cannot run kernel while DDMA is running
|
// cannot run kernel while DDMA is running
|
||||||
succeeded = false;
|
succeeded = false;
|
||||||
} else {
|
} else {
|
||||||
succeeded |= kernel_manager.run(id).is_ok();
|
succeeded |= kernel_manager.run(source, id).is_ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
drtioaux::send(0, &drtioaux::Packet::SubkernelLoadRunReply { succeeded: succeeded })
|
router.send(drtioaux::Packet::SubkernelLoadRunReply {
|
||||||
|
destination: source, succeeded: succeeded
|
||||||
|
},
|
||||||
|
_routing_table, *rank, *self_destination, _repeaters)
|
||||||
|
}
|
||||||
|
drtioaux::Packet::SubkernelLoadRunReply { destination: _destination, succeeded } => {
|
||||||
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
|
// received if local subkernel started another, remote subkernel
|
||||||
|
kernel_manager.subkernel_load_run_reply(succeeded, *self_destination);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
// { destination: u8, id: u32, with_exception: bool, exception_src: u8 },
|
||||||
|
drtioaux::Packet::SubkernelFinished { destination: _destination, id, with_exception, exception_src } => {
|
||||||
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
|
kernel_manager.remote_subkernel_finished(id, with_exception, exception_src);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
drtioaux::Packet::SubkernelExceptionRequest {
|
drtioaux::Packet::SubkernelExceptionRequest {
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
|
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
|
||||||
let meta = kernel_manager.exception_get_slice(&mut data_slice);
|
let meta = kernel_manager.exception_get_slice(&mut data_slice);
|
||||||
drtioaux::send(
|
drtioaux::send(
|
||||||
|
@ -569,38 +575,33 @@ fn process_aux_packet(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
drtioaux::Packet::SubkernelMessage {
|
drtioaux::Packet::SubkernelMessage {
|
||||||
destination,
|
source,
|
||||||
|
destination: _destination,
|
||||||
id: _id,
|
id: _id,
|
||||||
status,
|
status,
|
||||||
length,
|
length,
|
||||||
data,
|
data,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
kernel_manager.message_handle_incoming(status, length as usize, &data);
|
kernel_manager.message_handle_incoming(status, length as usize, &data);
|
||||||
drtioaux::send(
|
drtioaux::send(
|
||||||
0,
|
0,
|
||||||
&drtioaux::Packet::SubkernelMessageAck {
|
&drtioaux::Packet::SubkernelMessageAck {
|
||||||
destination: destination,
|
destination: source,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
drtioaux::Packet::SubkernelMessageAck {
|
drtioaux::Packet::SubkernelMessageAck {
|
||||||
destination: _destination,
|
destination: _destination,
|
||||||
} => {
|
} => {
|
||||||
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
|
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||||
if kernel_manager.message_ack_slice() {
|
if kernel_manager.message_ack_slice() {
|
||||||
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
||||||
if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) {
|
if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) {
|
||||||
drtioaux::send(
|
// route and not send immediately as ACKs are not a beginning of a transaction
|
||||||
0,
|
router.route(drtioaux::Packet::SubkernelMessage {
|
||||||
&drtioaux::Packet::SubkernelMessage {
|
source: *self_destination, destination: meta.destination, id: kernel_manager.get_current_id().unwrap(),
|
||||||
destination: *_rank,
|
status: meta.status, length: meta.len as u16, data: data_slice}, _routing_table, *rank, *self_destination);
|
||||||
id: kernel_manager.get_current_id().unwrap(),
|
|
||||||
status: meta.status,
|
|
||||||
length: meta.len as u16,
|
|
||||||
data: data_slice,
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
} else {
|
} else {
|
||||||
error!("Error receiving message slice");
|
error!("Error receiving message slice");
|
||||||
}
|
}
|
||||||
|
@ -619,32 +620,35 @@ fn process_aux_packets(
|
||||||
repeaters: &mut [repeater::Repeater],
|
repeaters: &mut [repeater::Repeater],
|
||||||
routing_table: &mut drtio_routing::RoutingTable,
|
routing_table: &mut drtio_routing::RoutingTable,
|
||||||
rank: &mut u8,
|
rank: &mut u8,
|
||||||
|
self_destination: &mut u8,
|
||||||
timer: &mut GlobalTimer,
|
timer: &mut GlobalTimer,
|
||||||
i2c: &mut I2c,
|
i2c: &mut I2c,
|
||||||
dma_manager: &mut DmaManager,
|
dma_manager: &mut DmaManager,
|
||||||
analyzer: &mut Analyzer,
|
analyzer: &mut Analyzer,
|
||||||
kernel_manager: &mut KernelManager,
|
kernel_manager: &mut KernelManager,
|
||||||
|
router: &mut Router,
|
||||||
) {
|
) {
|
||||||
let result = drtioaux::recv(0).and_then(|packet| {
|
let result = drtioaux::recv(0).and_then(|packet| {
|
||||||
if let Some(packet) = packet {
|
if let Some(packet) = packet.or_else(|| router.get_local_packet()) {
|
||||||
process_aux_packet(
|
process_aux_packet(
|
||||||
repeaters,
|
repeaters,
|
||||||
routing_table,
|
routing_table,
|
||||||
rank,
|
rank,
|
||||||
|
self_destination,
|
||||||
packet,
|
packet,
|
||||||
timer,
|
timer,
|
||||||
i2c,
|
i2c,
|
||||||
dma_manager,
|
dma_manager,
|
||||||
analyzer,
|
analyzer,
|
||||||
kernel_manager,
|
kernel_manager,
|
||||||
|
router
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
match result {
|
if let Err(e) = result {
|
||||||
Ok(()) => (),
|
warn!("aux packet error ({:?})", e);
|
||||||
Err(e) => warn!("aux packet error ({:?})", e),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -800,17 +804,21 @@ pub extern "C" fn main_core0() -> i32 {
|
||||||
}
|
}
|
||||||
let mut routing_table = drtio_routing::RoutingTable::default_empty();
|
let mut routing_table = drtio_routing::RoutingTable::default_empty();
|
||||||
let mut rank = 1;
|
let mut rank = 1;
|
||||||
|
let mut destination = 1;
|
||||||
|
|
||||||
let mut hardware_tick_ts = 0;
|
let mut hardware_tick_ts = 0;
|
||||||
|
|
||||||
let mut control = ksupport::kernel::Control::start();
|
let mut control = ksupport::kernel::Control::start();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|
||||||
|
let mut router = Router::new();
|
||||||
|
|
||||||
while !drtiosat_link_rx_up() {
|
while !drtiosat_link_rx_up() {
|
||||||
drtiosat_process_errors();
|
drtiosat_process_errors();
|
||||||
#[allow(unused_mut)]
|
#[allow(unused_mut)]
|
||||||
for mut rep in repeaters.iter_mut() {
|
for mut rep in repeaters.iter_mut() {
|
||||||
rep.service(&routing_table, rank, &mut timer);
|
rep.service(&routing_table, rank, destination, &mut router, &mut timer);
|
||||||
}
|
}
|
||||||
#[cfg(feature = "target_kasli_soc")]
|
#[cfg(feature = "target_kasli_soc")]
|
||||||
{
|
{
|
||||||
|
@ -849,15 +857,17 @@ pub extern "C" fn main_core0() -> i32 {
|
||||||
&mut repeaters,
|
&mut repeaters,
|
||||||
&mut routing_table,
|
&mut routing_table,
|
||||||
&mut rank,
|
&mut rank,
|
||||||
|
&mut destination,
|
||||||
&mut timer,
|
&mut timer,
|
||||||
&mut i2c,
|
&mut i2c,
|
||||||
&mut dma_manager,
|
&mut dma_manager,
|
||||||
&mut analyzer,
|
&mut analyzer,
|
||||||
&mut kernel_manager,
|
&mut kernel_manager,
|
||||||
|
&mut router,
|
||||||
);
|
);
|
||||||
#[allow(unused_mut)]
|
#[allow(unused_mut)]
|
||||||
for mut rep in repeaters.iter_mut() {
|
for mut rep in repeaters.iter_mut() {
|
||||||
rep.service(&routing_table, rank, &mut timer);
|
rep.service(&routing_table, rank, destination, &mut router, &mut timer);
|
||||||
}
|
}
|
||||||
#[cfg(feature = "target_kasli_soc")]
|
#[cfg(feature = "target_kasli_soc")]
|
||||||
{
|
{
|
||||||
|
@ -880,7 +890,33 @@ pub extern "C" fn main_core0() -> i32 {
|
||||||
error!("aux packet error: {:?}", e);
|
error!("aux packet error: {:?}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
kernel_manager.process_kern_requests(rank, timer);
|
if let Some(status) = dma_manager.check_state() {
|
||||||
|
info!(
|
||||||
|
"playback done, error: {}, channel: {}, timestamp: {}",
|
||||||
|
status.error, status.channel, status.timestamp
|
||||||
|
);
|
||||||
|
router.route(
|
||||||
|
drtioaux::Packet::DmaPlaybackStatus {
|
||||||
|
source: destination,
|
||||||
|
destination: status.source,
|
||||||
|
id: status.id,
|
||||||
|
error: status.error,
|
||||||
|
channel: status.channel,
|
||||||
|
timestamp: status.timestamp,
|
||||||
|
}, &routing_table, rank, destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
kernel_manager.process_kern_requests(&mut router, &routing_table, rank, destination, &timer);
|
||||||
|
|
||||||
|
if let Some((repno, packet)) = router.get_downstream_packet() {
|
||||||
|
if let Err(e) = repeaters[repno].aux_send(&packet) {
|
||||||
|
warn!("[REP#{}] Error when sending packet to satellite ({:?})", repno, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if router.any_upstream_waiting() {
|
||||||
|
drtiosat_async_ready();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
drtiosat_reset_phy(true);
|
drtiosat_reset_phy(true);
|
||||||
|
|
|
@ -7,6 +7,8 @@ use libboard_artiq::{drtio_routing, drtioaux};
|
||||||
use libboard_zynq::time::Milliseconds;
|
use libboard_zynq::time::Milliseconds;
|
||||||
use libboard_zynq::timer::GlobalTimer;
|
use libboard_zynq::timer::GlobalTimer;
|
||||||
|
|
||||||
|
use routing::Router;
|
||||||
|
|
||||||
#[cfg(has_drtio_routing)]
|
#[cfg(has_drtio_routing)]
|
||||||
fn rep_link_rx_up(repno: u8) -> bool {
|
fn rep_link_rx_up(repno: u8) -> bool {
|
||||||
let repno = repno as usize;
|
let repno = repno as usize;
|
||||||
|
@ -53,7 +55,7 @@ impl Repeater {
|
||||||
self.state == RepeaterState::Up
|
self.state == RepeaterState::Up
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn service(&mut self, routing_table: &drtio_routing::RoutingTable, rank: u8, timer: &mut GlobalTimer) {
|
pub fn service(&mut self, routing_table: &drtio_routing::RoutingTable, rank: u8, destination: u8, router: &mut Router, timer: &mut GlobalTimer) {
|
||||||
self.process_local_errors();
|
self.process_local_errors();
|
||||||
|
|
||||||
match self.state {
|
match self.state {
|
||||||
|
@ -116,6 +118,11 @@ impl Repeater {
|
||||||
info!("[REP#{}] link is down", self.repno);
|
info!("[REP#{}] link is down", self.repno);
|
||||||
self.state = RepeaterState::Down;
|
self.state = RepeaterState::Down;
|
||||||
}
|
}
|
||||||
|
if self.async_messages_ready() {
|
||||||
|
if let Err(e) = self.handle_async(routing_table, rank, destination, router, timer) {
|
||||||
|
warn!("[REP#{}] Error handling async messages ({:?})", self.repno, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
RepeaterState::Failed => {
|
RepeaterState::Failed => {
|
||||||
if !rep_link_rx_up(self.repno) {
|
if !rep_link_rx_up(self.repno) {
|
||||||
|
@ -173,6 +180,28 @@ impl Repeater {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn async_messages_ready(&self) -> bool {
|
||||||
|
let async_rdy;
|
||||||
|
unsafe {
|
||||||
|
async_rdy = (csr::DRTIOREP[self.repno as usize].async_messages_ready_read)();
|
||||||
|
(csr::DRTIOREP[self.repno as usize].async_messages_ready_write)(0);
|
||||||
|
}
|
||||||
|
async_rdy == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_async(&self, routing_table: &drtio_routing::RoutingTable, rank: u8, self_destination: u8, router: &mut Router, timer: &mut GlobalTimer
|
||||||
|
) -> Result<(), drtioaux::Error> {
|
||||||
|
loop {
|
||||||
|
drtioaux::send(self.auxno, &drtioaux::Packet::RoutingRetrievePackets).unwrap();
|
||||||
|
let reply = self.recv_aux_timeout(200, timer)?;
|
||||||
|
match reply {
|
||||||
|
drtioaux::Packet::RoutingNoPackets => break,
|
||||||
|
packet => router.route(packet, routing_table, rank, self_destination)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn recv_aux_timeout(&self, timeout: u32, timer: &mut GlobalTimer) -> Result<drtioaux::Packet, drtioaux::Error> {
|
fn recv_aux_timeout(&self, timeout: u32, timer: &mut GlobalTimer) -> Result<drtioaux::Packet, drtioaux::Error> {
|
||||||
let max_time = timer.get_time() + Milliseconds(timeout.into());
|
let max_time = timer.get_time() + Milliseconds(timeout.into());
|
||||||
loop {
|
loop {
|
||||||
|
@ -191,15 +220,19 @@ impl Repeater {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
|
pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
|
||||||
if self.state != RepeaterState::Up {
|
self.aux_send(request)?;
|
||||||
return Err(drtioaux::Error::LinkDown);
|
|
||||||
}
|
|
||||||
drtioaux::send(self.auxno, request).unwrap();
|
|
||||||
let reply = self.recv_aux_timeout(200, timer)?;
|
let reply = self.recv_aux_timeout(200, timer)?;
|
||||||
drtioaux::send(0, &reply).unwrap();
|
drtioaux::send(0, &reply).unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn aux_send(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error> {
|
||||||
|
if self.state != RepeaterState::Up {
|
||||||
|
return Err(drtioaux::Error::LinkDown);
|
||||||
|
}
|
||||||
|
drtioaux::send(self.auxno, request)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn sync_tsc(&self, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
|
pub fn sync_tsc(&self, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
|
||||||
if self.state != RepeaterState::Up {
|
if self.state != RepeaterState::Up {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
|
|
@ -0,0 +1,119 @@
|
||||||
|
use alloc::collections::vec_deque::VecDeque;
|
||||||
|
use libboard_artiq::{drtioaux, drtio_routing, pl::csr};
|
||||||
|
|
||||||
|
use repeater::Repeater;
|
||||||
|
|
||||||
|
// Packets from downstream (further satellites) are received and routed appropriately.
|
||||||
|
// they're passed as soon as possible downstream (within the subtree), or sent upstream,
|
||||||
|
// which is notified about pending packets.
|
||||||
|
// for rank 1 (connected to master) satellites, these packets are passed as an answer to DestinationStatusRequest;
|
||||||
|
// for higher ranks, after getting a notification, it will transact with downstream to get the pending packets.
|
||||||
|
|
||||||
|
// forward! macro is not deprecated, as routable packets are only these that can originate
|
||||||
|
// from both master and satellite, e.g. DDMA and Subkernel.
|
||||||
|
|
||||||
|
pub struct Router {
|
||||||
|
upstream_queue: VecDeque<drtioaux::Packet>,
|
||||||
|
local_queue: VecDeque<drtioaux::Packet>,
|
||||||
|
downstream_queue: VecDeque<(usize, drtioaux::Packet)>,
|
||||||
|
upstream_notified: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Router {
|
||||||
|
pub fn new() -> Router {
|
||||||
|
Router {
|
||||||
|
upstream_queue: VecDeque::new(),
|
||||||
|
local_queue: VecDeque::new(),
|
||||||
|
downstream_queue: VecDeque::new(),
|
||||||
|
upstream_notified: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// called by local sources (DDMA, kernel) and by repeaters on receiving async data
|
||||||
|
// messages are always buffered for both upstream and downstream
|
||||||
|
pub fn route(&mut self, packet: drtioaux::Packet,
|
||||||
|
_routing_table: &drtio_routing::RoutingTable, _rank: u8,
|
||||||
|
_self_destination: u8
|
||||||
|
) {
|
||||||
|
#[cfg(has_drtio_routing)]
|
||||||
|
{
|
||||||
|
let destination = packet.routable_destination();
|
||||||
|
if let Some(destination) = destination {
|
||||||
|
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
|
||||||
|
if destination == _self_destination {
|
||||||
|
self.local_queue.push_back(packet);
|
||||||
|
} else if hop > 0 && hop < csr::DRTIOREP.len() {
|
||||||
|
let repno = (hop - 1) as usize;
|
||||||
|
self.downstream_queue.push_back((repno, packet));
|
||||||
|
} else {
|
||||||
|
self.upstream_queue.push_back(packet);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("Received an unroutable packet: {:?}", packet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(not(has_drtio_routing))]
|
||||||
|
{
|
||||||
|
self.upstream_queue.push_back(packet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sends a packet to a required destination, routing if it's necessary
|
||||||
|
pub fn send(&mut self, packet: drtioaux::Packet,
|
||||||
|
_routing_table: &drtio_routing::RoutingTable, _rank: u8,
|
||||||
|
_destination: u8, _repeaters: &[Repeater]
|
||||||
|
) -> Result<(), drtioaux::Error> {
|
||||||
|
#[cfg(has_drtio_routing)]
|
||||||
|
{
|
||||||
|
let destination = packet.routable_destination();
|
||||||
|
if let Some(destination) = destination {
|
||||||
|
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
|
||||||
|
if destination == 0 {
|
||||||
|
// response is needed immediately if master required it
|
||||||
|
drtioaux::send(0, &packet)?;
|
||||||
|
} else if !(hop > 0 && hop < csr::DRTIOREP.len()) {
|
||||||
|
// higher rank can wait
|
||||||
|
self.upstream_queue.push_back(packet);
|
||||||
|
} else {
|
||||||
|
let repno = (hop - 1) as usize;
|
||||||
|
// transaction will occur at closest possible opportunity
|
||||||
|
self.downstream_queue.push_back((repno, packet));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
// packet not supported in routing, fallback - sent directly
|
||||||
|
drtioaux::send(0, &packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(not(has_drtio_routing))]
|
||||||
|
{
|
||||||
|
drtioaux::send(0, &packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn any_upstream_waiting(&mut self) -> bool {
|
||||||
|
let empty = self.upstream_queue.is_empty();
|
||||||
|
if !empty && !self.upstream_notified {
|
||||||
|
self.upstream_notified = true; // so upstream will not get spammed with notifications
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_upstream_packet(&mut self) -> Option<drtioaux::Packet> {
|
||||||
|
let packet = self.upstream_queue.pop_front();
|
||||||
|
if packet.is_none() {
|
||||||
|
self.upstream_notified = false;
|
||||||
|
}
|
||||||
|
packet
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> {
|
||||||
|
self.downstream_queue.pop_front()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_local_packet(&mut self) -> Option<drtioaux::Packet> {
|
||||||
|
self.local_queue.pop_front()
|
||||||
|
}
|
||||||
|
}
|
|
@ -8,12 +8,14 @@ use core_io::{Error as IoError, Write};
|
||||||
use cslice::AsCSlice;
|
use cslice::AsCSlice;
|
||||||
use io::{Cursor, ProtoWrite};
|
use io::{Cursor, ProtoWrite};
|
||||||
use ksupport::{eh_artiq, kernel, rpc};
|
use ksupport::{eh_artiq, kernel, rpc};
|
||||||
use libboard_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE},
|
use libboard_artiq::{drtioaux, drtio_routing::RoutingTable, drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE},
|
||||||
pl::csr};
|
pl::csr};
|
||||||
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
|
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
|
||||||
use libcortex_a9::sync_channel::Receiver;
|
use libcortex_a9::sync_channel::Receiver;
|
||||||
use log::warn;
|
use log::warn;
|
||||||
|
|
||||||
|
use routing::Router;
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq)]
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
enum KernelState {
|
enum KernelState {
|
||||||
Absent,
|
Absent,
|
||||||
|
@ -21,6 +23,8 @@ enum KernelState {
|
||||||
Running,
|
Running,
|
||||||
MsgAwait(Milliseconds, Vec<u8>),
|
MsgAwait(Milliseconds, Vec<u8>),
|
||||||
MsgSending,
|
MsgSending,
|
||||||
|
SubkernelAwaitLoad,
|
||||||
|
SubkernelAwaitFinish { max_time: Milliseconds, id: u32 },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -31,6 +35,7 @@ pub enum Error {
|
||||||
NoMessage,
|
NoMessage,
|
||||||
AwaitingMessage,
|
AwaitingMessage,
|
||||||
SubkernelIoError,
|
SubkernelIoError,
|
||||||
|
DrtioError,
|
||||||
KernelException(Sliceable),
|
KernelException(Sliceable),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,6 +57,12 @@ impl From<()> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<drtioaux::Error> for Error {
|
||||||
|
fn from(_value: drtioaux::Error) -> Error {
|
||||||
|
Error::DrtioError
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
macro_rules! unexpected {
|
macro_rules! unexpected {
|
||||||
($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*))));
|
($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*))));
|
||||||
}
|
}
|
||||||
|
@ -61,6 +72,7 @@ macro_rules! unexpected {
|
||||||
pub struct Sliceable {
|
pub struct Sliceable {
|
||||||
it: usize,
|
it: usize,
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
|
destination: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* represents interkernel messages */
|
/* represents interkernel messages */
|
||||||
|
@ -92,6 +104,8 @@ struct Session {
|
||||||
kernel_state: KernelState,
|
kernel_state: KernelState,
|
||||||
last_exception: Option<Sliceable>,
|
last_exception: Option<Sliceable>,
|
||||||
messages: MessageManager,
|
messages: MessageManager,
|
||||||
|
source: u8, // which destination requested running the kernel
|
||||||
|
subkernels_finished: VecDeque<(u32, bool, u8)> // tuple of id, with_exception, exception_source
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl Session {
|
||||||
|
@ -101,13 +115,16 @@ impl Session {
|
||||||
kernel_state: KernelState::Absent,
|
kernel_state: KernelState::Absent,
|
||||||
last_exception: None,
|
last_exception: None,
|
||||||
messages: MessageManager::new(),
|
messages: MessageManager::new(),
|
||||||
|
source: 0,
|
||||||
|
subkernels_finished: VecDeque::new()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn running(&self) -> bool {
|
fn running(&self) -> bool {
|
||||||
match self.kernel_state {
|
match self.kernel_state {
|
||||||
KernelState::Absent | KernelState::Loaded => false,
|
KernelState::Absent | KernelState::Loaded => false,
|
||||||
KernelState::Running | KernelState::MsgAwait { .. } | KernelState::MsgSending => true,
|
KernelState::Running | KernelState::MsgAwait { .. } | KernelState::MsgSending | KernelState::SubkernelAwaitLoad |
|
||||||
|
KernelState::SubkernelAwaitFinish { .. } => true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -129,9 +146,12 @@ pub struct Manager<'a> {
|
||||||
pub struct SubkernelFinished {
|
pub struct SubkernelFinished {
|
||||||
pub id: u32,
|
pub id: u32,
|
||||||
pub with_exception: bool,
|
pub with_exception: bool,
|
||||||
|
pub exception_source: u8,
|
||||||
|
pub source: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SliceMeta {
|
pub struct SliceMeta {
|
||||||
|
pub destination: u8,
|
||||||
pub len: u16,
|
pub len: u16,
|
||||||
pub status: PayloadStatus,
|
pub status: PayloadStatus,
|
||||||
}
|
}
|
||||||
|
@ -148,6 +168,7 @@ macro_rules! get_slice_fn {
|
||||||
self.it += len;
|
self.it += len;
|
||||||
|
|
||||||
SliceMeta {
|
SliceMeta {
|
||||||
|
destination: self.destination,
|
||||||
len: len as u16,
|
len: len as u16,
|
||||||
status: status,
|
status: status,
|
||||||
}
|
}
|
||||||
|
@ -156,8 +177,8 @@ macro_rules! get_slice_fn {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Sliceable {
|
impl Sliceable {
|
||||||
pub fn new(data: Vec<u8>) -> Sliceable {
|
pub fn new(destination: u8, data: Vec<u8>) -> Sliceable {
|
||||||
Sliceable { it: 0, data: data }
|
Sliceable { it: 0, data: data, destination: destination }
|
||||||
}
|
}
|
||||||
|
|
||||||
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
|
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
|
||||||
|
@ -194,17 +215,6 @@ impl MessageManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_outgoing_ready(&mut self) -> bool {
|
|
||||||
// called by main loop, to see if there's anything to send, will send it afterwards
|
|
||||||
match self.out_state {
|
|
||||||
OutMessageState::MessageReady => {
|
|
||||||
self.out_state = OutMessageState::MessageBeingSent;
|
|
||||||
true
|
|
||||||
}
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn was_message_acknowledged(&mut self) -> bool {
|
pub fn was_message_acknowledged(&mut self) -> bool {
|
||||||
match self.out_state {
|
match self.out_state {
|
||||||
OutMessageState::MessageAcknowledged => {
|
OutMessageState::MessageAcknowledged => {
|
||||||
|
@ -244,10 +254,18 @@ impl MessageManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn accept_outgoing(&mut self, message: Vec<u8>) -> Result<(), Error> {
|
pub fn accept_outgoing(&mut self, id: u32, self_destination: u8, destination: u8,
|
||||||
// service tag skipped in kernel
|
message: Vec<u8>, routing_table: &RoutingTable, rank: u8, router: &mut Router
|
||||||
self.out_message = Some(Sliceable::new(message));
|
) -> Result<(), Error> {
|
||||||
self.out_state = OutMessageState::MessageReady;
|
self.out_message = Some(Sliceable::new(destination, message));
|
||||||
|
|
||||||
|
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
||||||
|
self.out_state = OutMessageState::MessageBeingSent;
|
||||||
|
let meta = self.get_outgoing_slice(&mut data_slice).unwrap();
|
||||||
|
router.route(drtioaux::Packet::SubkernelMessage {
|
||||||
|
source: self_destination, destination: destination, id: id,
|
||||||
|
status: meta.status, length: meta.len as u16, data: data_slice
|
||||||
|
}, routing_table, rank, self_destination);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,12 +331,13 @@ impl<'a> Manager<'_> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&mut self, id: u32) -> Result<(), Error> {
|
pub fn run(&mut self, source: u8, id: u32) -> Result<(), Error> {
|
||||||
info!("starting subkernel #{}", id);
|
info!("starting subkernel #{}", id);
|
||||||
if self.session.kernel_state != KernelState::Loaded || self.session.id != id {
|
if self.session.kernel_state != KernelState::Loaded || self.session.id != id {
|
||||||
self.load(id)?;
|
self.load(id)?;
|
||||||
}
|
}
|
||||||
self.session.kernel_state = KernelState::Running;
|
self.session.kernel_state = KernelState::Running;
|
||||||
|
self.session.source = source;
|
||||||
unsafe {
|
unsafe {
|
||||||
csr::cri_con::selected_write(2);
|
csr::cri_con::selected_write(2);
|
||||||
}
|
}
|
||||||
|
@ -354,10 +373,6 @@ impl<'a> Manager<'_> {
|
||||||
self.session.messages.ack_slice()
|
self.session.messages.ack_slice()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn message_is_ready(&mut self) -> bool {
|
|
||||||
self.session.messages.is_outgoing_ready()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load(&mut self, id: u32) -> Result<(), Error> {
|
pub fn load(&mut self, id: u32) -> Result<(), Error> {
|
||||||
if self.session.id == id && self.session.kernel_state == KernelState::Loaded {
|
if self.session.id == id && self.session.kernel_state == KernelState::Loaded {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -386,16 +401,13 @@ impl<'a> Manager<'_> {
|
||||||
match self.session.last_exception.as_mut() {
|
match self.session.last_exception.as_mut() {
|
||||||
Some(exception) => exception.get_slice_sat(data_slice),
|
Some(exception) => exception.get_slice_sat(data_slice),
|
||||||
None => SliceMeta {
|
None => SliceMeta {
|
||||||
|
destination: 0,
|
||||||
len: 0,
|
len: 0,
|
||||||
status: PayloadStatus::FirstAndLast,
|
status: PayloadStatus::FirstAndLast,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_last_finished(&mut self) -> Option<SubkernelFinished> {
|
|
||||||
self.last_finished.take()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn kernel_stop(&mut self) {
|
fn kernel_stop(&mut self) {
|
||||||
self.session.kernel_state = KernelState::Absent;
|
self.session.kernel_state = KernelState::Absent;
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -425,13 +437,13 @@ impl<'a> Manager<'_> {
|
||||||
&[],
|
&[],
|
||||||
0,
|
0,
|
||||||
) {
|
) {
|
||||||
Ok(_) => self.session.last_exception = Some(Sliceable::new(writer.into_inner())),
|
Ok(_) => self.session.last_exception = Some(Sliceable::new(0, writer.into_inner())),
|
||||||
Err(_) => error!("Error writing exception data"),
|
Err(_) => error!("Error writing exception data"),
|
||||||
}
|
}
|
||||||
self.kernel_stop();
|
self.kernel_stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn process_kern_requests(&mut self, rank: u8, timer: GlobalTimer) {
|
pub fn process_kern_requests(&mut self, router: &mut Router, routing_table: &RoutingTable, rank: u8, destination: u8, timer: &GlobalTimer) {
|
||||||
if !self.running() {
|
if !self.running() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -444,6 +456,8 @@ impl<'a> Manager<'_> {
|
||||||
self.last_finished = Some(SubkernelFinished {
|
self.last_finished = Some(SubkernelFinished {
|
||||||
id: self.session.id,
|
id: self.session.id,
|
||||||
with_exception: true,
|
with_exception: true,
|
||||||
|
exception_source: destination,
|
||||||
|
source: self.session.source,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -452,15 +466,19 @@ impl<'a> Manager<'_> {
|
||||||
self.last_finished = Some(SubkernelFinished {
|
self.last_finished = Some(SubkernelFinished {
|
||||||
id: self.session.id,
|
id: self.session.id,
|
||||||
with_exception: true,
|
with_exception: true,
|
||||||
|
exception_source: destination,
|
||||||
|
source: self.session.source,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.process_kern_message(rank, timer) {
|
match self.process_kern_message(router, routing_table, rank, destination, timer) {
|
||||||
Ok(true) => {
|
Ok(true) => {
|
||||||
self.last_finished = Some(SubkernelFinished {
|
self.last_finished = Some(SubkernelFinished {
|
||||||
id: self.session.id,
|
id: self.session.id,
|
||||||
with_exception: false,
|
with_exception: false,
|
||||||
|
exception_source: 0,
|
||||||
|
source: self.session.source,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Ok(false) | Err(Error::NoMessage) => (),
|
Ok(false) | Err(Error::NoMessage) => (),
|
||||||
|
@ -469,6 +487,8 @@ impl<'a> Manager<'_> {
|
||||||
self.last_finished = Some(SubkernelFinished {
|
self.last_finished = Some(SubkernelFinished {
|
||||||
id: self.session.id,
|
id: self.session.id,
|
||||||
with_exception: true,
|
with_exception: true,
|
||||||
|
exception_source: destination,
|
||||||
|
source: self.session.source,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -477,12 +497,35 @@ impl<'a> Manager<'_> {
|
||||||
self.last_finished = Some(SubkernelFinished {
|
self.last_finished = Some(SubkernelFinished {
|
||||||
id: self.session.id,
|
id: self.session.id,
|
||||||
with_exception: true,
|
with_exception: true,
|
||||||
|
exception_source: destination,
|
||||||
|
source: self.session.source,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(subkernel_finished) = self.last_finished.take() {
|
||||||
|
info!("subkernel {} finished, with exception: {}", subkernel_finished.id, subkernel_finished.with_exception);
|
||||||
|
router.route(drtioaux::Packet::SubkernelFinished {
|
||||||
|
destination: subkernel_finished.source, id: subkernel_finished.id,
|
||||||
|
with_exception: subkernel_finished.with_exception, exception_src: destination
|
||||||
|
}, &routing_table, rank, destination);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_kern_message(&mut self, rank: u8, timer: GlobalTimer) -> Result<bool, Error> {
|
pub fn subkernel_load_run_reply(&mut self, succeeded: bool, self_destination: u8) {
|
||||||
|
if self.session.kernel_state == KernelState::SubkernelAwaitLoad {
|
||||||
|
self.session.kernel_state = KernelState::Running;
|
||||||
|
} else {
|
||||||
|
warn!("received unsolicited SubkernelLoadRunReply");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) {
|
||||||
|
self.session.subkernels_finished.push_back((id, with_exception, exception_source));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_kern_message(&mut self, router: &mut Router,
|
||||||
|
routing_table: &RoutingTable, rank: u8, self_destination: u8, timer: &GlobalTimer) -> Result<bool, Error> {
|
||||||
let reply = self.control.rx.try_recv()?;
|
let reply = self.control.rx.try_recv()?;
|
||||||
match reply {
|
match reply {
|
||||||
kernel::Message::KernelFinished(_async_errors) => {
|
kernel::Message::KernelFinished(_async_errors) => {
|
||||||
|
@ -503,7 +546,7 @@ impl<'a> Manager<'_> {
|
||||||
Err(_) => error!("Error writing exception data"),
|
Err(_) => error!("Error writing exception data"),
|
||||||
}
|
}
|
||||||
self.kernel_stop();
|
self.kernel_stop();
|
||||||
return Err(Error::KernelException(Sliceable::new(writer.into_inner())));
|
return Err(Error::KernelException(Sliceable::new(0, writer.into_inner())));
|
||||||
}
|
}
|
||||||
kernel::Message::CachePutRequest(key, value) => {
|
kernel::Message::CachePutRequest(key, value) => {
|
||||||
self.cache.insert(key, value);
|
self.cache.insert(key, value);
|
||||||
|
@ -513,18 +556,30 @@ impl<'a> Manager<'_> {
|
||||||
let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone();
|
let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone();
|
||||||
self.control.tx.send(kernel::Message::CacheGetReply(value));
|
self.control.tx.send(kernel::Message::CacheGetReply(value));
|
||||||
}
|
}
|
||||||
kernel::Message::SubkernelMsgSend { id: _, data } => {
|
kernel::Message::SubkernelMsgSend { id, destination: msg_dest, data } => {
|
||||||
self.session.messages.accept_outgoing(data)?;
|
let msg_dest = msg_dest.or(Some(self.session.source)).unwrap();
|
||||||
|
self.session.messages.accept_outgoing(id, self_destination, msg_dest, data, routing_table, rank, router)?;
|
||||||
self.session.kernel_state = KernelState::MsgSending;
|
self.session.kernel_state = KernelState::MsgSending;
|
||||||
}
|
}
|
||||||
kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => {
|
kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => {
|
||||||
let max_time = timer.get_time() + Milliseconds(timeout);
|
let max_time = timer.get_time() + Milliseconds(timeout);
|
||||||
self.session.kernel_state = KernelState::MsgAwait(max_time, tags);
|
self.session.kernel_state = KernelState::MsgAwait(max_time, tags);
|
||||||
}
|
}
|
||||||
|
kernel::Message::SubkernelLoadRunRequest { id, destination: sk_destination, run } => {
|
||||||
|
self.session.kernel_state = KernelState::SubkernelAwaitLoad;
|
||||||
|
router.route(drtioaux::Packet::SubkernelLoadRunRequest {
|
||||||
|
source: self_destination, destination: sk_destination, id: id, run: run
|
||||||
|
}, routing_table, rank, self_destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
kernel::Message::SubkernelAwaitFinishRequest{ id, timeout } => {
|
||||||
|
let max_time = timer.get_time() + Milliseconds(timeout);
|
||||||
|
self.session.kernel_state = KernelState::SubkernelAwaitFinish { max_time: max_time, id: id };
|
||||||
|
}
|
||||||
kernel::Message::UpDestinationsRequest(destination) => {
|
kernel::Message::UpDestinationsRequest(destination) => {
|
||||||
self.control
|
self.control
|
||||||
.tx
|
.tx
|
||||||
.send(kernel::Message::UpDestinationsReply(destination == (rank as i32)));
|
.send(kernel::Message::UpDestinationsReply(destination == (self_destination as i32)));
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
unexpected!("unexpected message from core1 while kernel was running: {:?}", reply);
|
unexpected!("unexpected message from core1 while kernel was running: {:?}", reply);
|
||||||
|
@ -533,7 +588,7 @@ impl<'a> Manager<'_> {
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_external_messages(&mut self, timer: GlobalTimer) -> Result<(), Error> {
|
fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> {
|
||||||
match &self.session.kernel_state {
|
match &self.session.kernel_state {
|
||||||
KernelState::MsgAwait(timeout, tags) => {
|
KernelState::MsgAwait(timeout, tags) => {
|
||||||
if timer.get_time() > *timeout {
|
if timer.get_time() > *timeout {
|
||||||
|
@ -565,11 +620,38 @@ impl<'a> Manager<'_> {
|
||||||
Err(Error::AwaitingMessage)
|
Err(Error::AwaitingMessage)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
KernelState::SubkernelAwaitFinish { max_time, id } => {
|
||||||
|
if timer.get_time() > *max_time {
|
||||||
|
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { status: kernel::SubkernelStatus::Timeout });
|
||||||
|
self.session.kernel_state = KernelState::Running;
|
||||||
|
} else {
|
||||||
|
let mut i = 0;
|
||||||
|
for status in self.session.subkernels_finished.iter() {
|
||||||
|
if status.0 == *id {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
if let Some(finish_status) = self.session.subkernels_finished.remove(i) {
|
||||||
|
if finish_status.1 {
|
||||||
|
self.kernel_stop();
|
||||||
|
self.last_finished = Some(SubkernelFinished {
|
||||||
|
source: self.session.source, id: self.session.id,
|
||||||
|
with_exception: true, exception_source: finish_status.2
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { status: kernel::SubkernelStatus::NoError });
|
||||||
|
self.session.kernel_state = KernelState::Running;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
_ => Ok(()),
|
_ => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec<u8>, timer: GlobalTimer) -> Result<(), Error> {
|
fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec<u8>, timer: &GlobalTimer) -> Result<(), Error> {
|
||||||
let mut reader = Cursor::new(&message.data);
|
let mut reader = Cursor::new(&message.data);
|
||||||
let mut current_tags: &[u8] = &tags;
|
let mut current_tags: &[u8] = &tags;
|
||||||
let mut i = message.count;
|
let mut i = message.count;
|
||||||
|
@ -592,7 +674,7 @@ impl<'a> Manager<'_> {
|
||||||
let mut writer = Cursor::new(buf);
|
let mut writer = Cursor::new(buf);
|
||||||
match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) {
|
match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
exception = Some(Sliceable::new(writer.into_inner()));
|
exception = Some(Sliceable::new(0, writer.into_inner()));
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
unexpected = Some("Error writing exception data".to_string());
|
unexpected = Some("Error writing exception data".to_string());
|
||||||
|
@ -686,7 +768,7 @@ where
|
||||||
|
|
||||||
fn recv_w_timeout(
|
fn recv_w_timeout(
|
||||||
rx: &mut Receiver<'_, kernel::Message>,
|
rx: &mut Receiver<'_, kernel::Message>,
|
||||||
timer: GlobalTimer,
|
timer: &GlobalTimer,
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
) -> Result<kernel::Message, Error> {
|
) -> Result<kernel::Message, Error> {
|
||||||
let max_time = timer.get_time() + Milliseconds(timeout);
|
let max_time = timer.get_time() + Milliseconds(timeout);
|
||||||
|
|
Loading…
Reference in New Issue