runtime: implement distributed DMA

pull/2068/head
Spaqin 2023-03-22 11:16:25 +08:00 committed by GitHub
parent 8b1f38b015
commit e9a153b985
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 508 additions and 80 deletions

View File

@ -6,7 +6,7 @@ alone could achieve.
"""
from artiq.language.core import syscall, kernel
from artiq.language.types import TInt32, TInt64, TStr, TNone, TTuple
from artiq.language.types import TInt32, TInt64, TStr, TNone, TTuple, TBool
from artiq.coredevice.exceptions import DMAError
from numpy import int64
@ -17,7 +17,7 @@ def dma_record_start(name: TStr) -> TNone:
raise NotImplementedError("syscall not simulated")
@syscall
def dma_record_stop(duration: TInt64) -> TNone:
def dma_record_stop(duration: TInt64, enable_ddma: TBool) -> TNone:
raise NotImplementedError("syscall not simulated")
@syscall
@ -47,6 +47,7 @@ class DMARecordContextManager:
def __init__(self):
self.name = ""
self.saved_now_mu = int64(0)
self.enable_ddma = False
@kernel
def __enter__(self):
@ -56,7 +57,7 @@ class DMARecordContextManager:
@kernel
def __exit__(self, type, value, traceback):
dma_record_stop(now_mu()) # see above
dma_record_stop(now_mu(), self.enable_ddma) # see above
at_mu(self.saved_now_mu)
@ -74,12 +75,18 @@ class CoreDMA:
self.epoch = 0
@kernel
def record(self, name):
def record(self, name, enable_ddma=False):
"""Returns a context manager that will record a DMA trace called ``name``.
Any previously recorded trace with the same name is overwritten.
The trace will persist across kernel switches."""
The trace will persist across kernel switches.
In DRTIO context, you can toggle distributed DMA with ``enable_ddma``.
Enabling it allows running DMA on satellites, rather than sending all
events from the master.
Disabling it may improve performance in some scenarios,
e.g. when there are many small satellite buffers."""
self.epoch += 1
self.recorder.name = name
self.recorder.enable_ddma = enable_ddma
return self.recorder
@kernel

View File

@ -269,7 +269,7 @@ extern fn dma_record_start(name: &CSlice<u8>) {
}
#[unwind(allowed)]
extern fn dma_record_stop(duration: i64) {
extern fn dma_record_stop(duration: i64, enable_ddma: bool) {
unsafe {
dma_record_flush();
@ -285,7 +285,8 @@ extern fn dma_record_stop(duration: i64) {
DMA_RECORDER.active = false;
send(&DmaRecordStop {
duration: duration as u64
duration: duration as u64,
enable_ddma: enable_ddma
});
}
}
@ -370,7 +371,7 @@ extern fn dma_erase(name: &CSlice<u8>) {
#[repr(C)]
struct DmaTrace {
duration: i64,
address: i32,
address: i32
}
#[unwind(allowed)]
@ -404,6 +405,7 @@ extern fn dma_playback(timestamp: i64, ptr: i32) {
csr::cri_con::selected_write(1);
csr::rtio_dma::enable_write(1);
send(&DmaStartRemoteRequest { id: ptr as i32, timestamp: timestamp });
while csr::rtio_dma::enable_read() != 0 {}
csr::cri_con::selected_write(0);
@ -424,6 +426,24 @@ extern fn dma_playback(timestamp: i64, ptr: i32) {
}
}
}
send(&DmaAwaitRemoteRequest { id: ptr as i32 });
recv!(&DmaAwaitRemoteReply { timeout, error, channel, timestamp } => {
if timeout {
raise!("DMAError",
"Error running DMA on satellite device, timed out waiting for results");
}
if error & 1 != 0 {
raise!("RTIOUnderflow",
"RTIO underflow at channel {rtio_channel_info:0}, {1} mu",
channel as i64, timestamp as i64, 0);
}
if error & 2 != 0 {
raise!("RTIODestinationUnreachable",
"RTIO destination unreachable, output, at channel {rtio_channel_info:0}, {1} mu",
channel as i64, timestamp as i64, 0);
}
});
}
#[cfg(not(has_rtio_dma))]

View File

@ -14,8 +14,8 @@ impl<T> From<IoError<T>> for Error<T> {
}
}
/* 512 (max size) - 4 (CRC) - 1 (packet ID) - 4 (trace ID) - 1 (last) - 2 (length) */
const DMA_TRACE_MAX_SIZE: usize = 500;
/* 512 (max size) - 4 (CRC) - 1 (packet ID) - 1 (destination) - 4 (trace ID) - 1 (last) - 2 (length) */
pub const DMA_TRACE_MAX_SIZE: usize = 499;
#[derive(PartialEq, Debug)]
pub enum Packet {
@ -58,13 +58,13 @@ pub enum Packet {
SpiReadReply { succeeded: bool, data: u32 },
SpiBasicReply { succeeded: bool },
DmaAddTraceRequest { id: u32, last: bool, length: u16, trace: [u8; DMA_TRACE_MAX_SIZE] },
DmaAddTraceRequest { destination: u8, id: u32, last: bool, length: u16, trace: [u8; DMA_TRACE_MAX_SIZE] },
DmaAddTraceReply { succeeded: bool },
DmaRemoveTraceRequest { id: u32 },
DmaRemoveTraceRequest { destination: u8, id: u32 },
DmaRemoveTraceReply { succeeded: bool },
DmaPlaybackRequest { id: u32, timestamp: u64 },
DmaPlaybackRequest { destination: u8, id: u32, timestamp: u64 },
DmaPlaybackReply { succeeded: bool },
DmaPlaybackStatus { id: u32, error: u8, channel: u32, timestamp: u64 }
DmaPlaybackStatus { destination: u8, id: u32, error: u8, channel: u32, timestamp: u64 }
}
@ -198,12 +198,14 @@ impl Packet {
},
0xb0 => {
let destination = reader.read_u8()?;
let id = reader.read_u32()?;
let last = reader.read_bool()?;
let length = reader.read_u16()?;
let mut trace: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE];
reader.read_exact(&mut trace[0..length as usize])?;
Packet::DmaAddTraceRequest {
destination: destination,
id: id,
last: last,
length: length as u16,
@ -214,12 +216,14 @@ impl Packet {
succeeded: reader.read_bool()?
},
0xb2 => Packet::DmaRemoveTraceRequest {
destination: reader.read_u8()?,
id: reader.read_u32()?
},
0xb3 => Packet::DmaRemoveTraceReply {
succeeded: reader.read_bool()?
},
0xb4 => Packet::DmaPlaybackRequest {
destination: reader.read_u8()?,
id: reader.read_u32()?,
timestamp: reader.read_u64()?
},
@ -227,6 +231,7 @@ impl Packet {
succeeded: reader.read_bool()?
},
0xb6 => Packet::DmaPlaybackStatus {
destination: reader.read_u8()?,
id: reader.read_u32()?,
error: reader.read_u8()?,
channel: reader.read_u32()?,
@ -392,8 +397,9 @@ impl Packet {
writer.write_bool(succeeded)?;
},
Packet::DmaAddTraceRequest { id, last, trace, length } => {
Packet::DmaAddTraceRequest { destination, id, last, trace, length } => {
writer.write_u8(0xb0)?;
writer.write_u8(destination)?;
writer.write_u32(id)?;
writer.write_bool(last)?;
// trace may be broken down to fit within drtio aux memory limit
@ -405,16 +411,18 @@ impl Packet {
writer.write_u8(0xb1)?;
writer.write_bool(succeeded)?;
},
Packet::DmaRemoveTraceRequest { id } => {
Packet::DmaRemoveTraceRequest { destination, id } => {
writer.write_u8(0xb2)?;
writer.write_u8(destination)?;
writer.write_u32(id)?;
},
Packet::DmaRemoveTraceReply { succeeded } => {
writer.write_u8(0xb3)?;
writer.write_bool(succeeded)?;
},
Packet::DmaPlaybackRequest { id, timestamp } => {
Packet::DmaPlaybackRequest { destination, id, timestamp } => {
writer.write_u8(0xb4)?;
writer.write_u8(destination)?;
writer.write_u32(id)?;
writer.write_u64(timestamp)?;
},
@ -422,8 +430,9 @@ impl Packet {
writer.write_u8(0xb5)?;
writer.write_bool(succeeded)?;
},
Packet::DmaPlaybackStatus { id, error, channel, timestamp } => {
Packet::DmaPlaybackStatus { destination, id, error, channel, timestamp } => {
writer.write_u8(0xb6)?;
writer.write_u8(destination)?;
writer.write_u32(id)?;
writer.write_u8(error)?;
writer.write_u32(channel)?;

View File

@ -20,7 +20,8 @@ pub enum Message<'a> {
DmaRecordStart(&'a str),
DmaRecordAppend(&'a [u8]),
DmaRecordStop {
duration: u64
duration: u64,
enable_ddma: bool
},
DmaEraseRequest {
@ -32,9 +33,24 @@ pub enum Message<'a> {
},
DmaRetrieveReply {
trace: Option<&'a [u8]>,
duration: u64
duration: u64,
},
DmaStartRemoteRequest {
id: i32,
timestamp: i64,
},
DmaAwaitRemoteRequest {
id: i32
},
DmaAwaitRemoteReply {
timeout: bool,
error: u8,
channel: u32,
timestamp: u64
},
RunFinished,
RunException {
exceptions: &'a [Option<eh::eh_artiq::Exception<'a>>],

View File

@ -182,6 +182,8 @@ fn startup() {
drtio_routing::interconnect_disable_all();
let aux_mutex = sched::Mutex::new();
let ddma_mutex = sched::Mutex::new();
let mut scheduler = sched::Scheduler::new(interface);
let io = scheduler.io();
@ -189,14 +191,15 @@ fn startup() {
io.spawn(4096, dhcp::dhcp_thread);
}
rtio_mgt::startup(&io, &aux_mutex, &drtio_routing_table, &up_destinations);
rtio_mgt::startup(&io, &aux_mutex, &drtio_routing_table, &up_destinations, &ddma_mutex);
io.spawn(4096, mgmt::thread);
{
let aux_mutex = aux_mutex.clone();
let drtio_routing_table = drtio_routing_table.clone();
let up_destinations = up_destinations.clone();
io.spawn(16384, move |io| { session::thread(io, &aux_mutex, &drtio_routing_table, &up_destinations) });
let ddma_mutex = ddma_mutex.clone();
io.spawn(16384, move |io| { session::thread(io, &aux_mutex, &drtio_routing_table, &up_destinations, &ddma_mutex) });
}
#[cfg(any(has_rtio_moninj, has_drtio))]
{

View File

@ -1,10 +1,182 @@
use core::mem;
use alloc::{vec::Vec, string::String, collections::btree_map::BTreeMap};
use sched::{Io, Mutex};
const ALIGNMENT: usize = 64;
#[cfg(has_drtio)]
pub mod remote_dma {
use super::*;
use board_artiq::drtio_routing::RoutingTable;
use rtio_mgt::drtio;
use board_misoc::clock;
#[derive(Debug, PartialEq, Clone)]
pub enum RemoteState {
NotLoaded,
Loaded,
PlaybackEnded { error: u8, channel: u32, timestamp: u64 }
}
#[derive(Debug, Clone)]
struct RemoteTrace {
trace: Vec<u8>,
pub state: RemoteState
}
impl From<Vec<u8>> for RemoteTrace {
fn from(trace: Vec<u8>) -> Self {
RemoteTrace {
trace: trace,
state: RemoteState::NotLoaded
}
}
}
impl RemoteTrace {
pub fn get_trace(&self) -> &Vec<u8> {
&self.trace
}
}
// remote traces map. ID -> destination, trace pair
static mut TRACES: BTreeMap<u32, BTreeMap<u8, RemoteTrace>> = BTreeMap::new();
pub fn add_traces(io: &Io, ddma_mutex: &Mutex, id: u32, traces: BTreeMap<u8, Vec<u8>>) {
let _lock = ddma_mutex.lock(io);
let mut trace_map: BTreeMap<u8, RemoteTrace> = BTreeMap::new();
for (destination, trace) in traces {
trace_map.insert(destination, trace.into());
}
unsafe { TRACES.insert(id, trace_map); }
}
pub fn await_done(io: &Io, ddma_mutex: &Mutex, id: u32, timeout: u64) -> Result<RemoteState, &'static str> {
let max_time = clock::get_ms() + timeout as u64;
io.until(|| {
if clock::get_ms() > max_time {
return true;
}
if ddma_mutex.test_lock() {
// cannot lock again within io.until - scheduler guarantees
// that it will not be interrupted - so only test the lock
return false;
}
let traces = unsafe { TRACES.get(&id).unwrap() };
for (_dest, trace) in traces {
match trace.state {
RemoteState::PlaybackEnded {error: _, channel: _, timestamp: _} => (),
_ => return false
}
}
true
}).unwrap();
if clock::get_ms() > max_time {
error!("Remote DMA await done timed out");
return Err("Timed out waiting for results.");
}
// clear the internal state, and if there have been any errors, return one of them
let mut playback_state: RemoteState = RemoteState::PlaybackEnded { error: 0, channel: 0, timestamp: 0 };
{
let _lock = ddma_mutex.lock(io).unwrap();
let traces = unsafe { TRACES.get_mut(&id).unwrap() };
for (_dest, trace) in traces {
match trace.state {
RemoteState::PlaybackEnded {error: e, channel: _c, timestamp: _ts} => if e != 0 { playback_state = trace.state.clone(); },
_ => (),
}
trace.state = RemoteState::Loaded;
}
}
Ok(playback_state)
}
pub fn erase(io: &Io, aux_mutex: &Mutex, routing_table: &RoutingTable,
ddma_mutex: &Mutex, id: u32) {
let _lock = ddma_mutex.lock(io).unwrap();
let destinations = unsafe { TRACES.get(&id).unwrap() };
for destination in destinations.keys() {
match drtio::ddma_send_erase(io, aux_mutex, routing_table, id, *destination) {
Ok(_) => (),
Err(e) => error!("Error erasing trace on DMA: {}", e)
}
}
unsafe { TRACES.remove(&id); }
}
pub fn upload_traces(io: &Io, aux_mutex: &Mutex, routing_table: &RoutingTable,
ddma_mutex: &Mutex, id: u32) {
let _lock = ddma_mutex.lock(io);
let traces = unsafe { TRACES.get_mut(&id).unwrap() };
for (destination, mut trace) in traces {
match drtio::ddma_upload_trace(io, aux_mutex, routing_table, id, *destination, trace.get_trace())
{
Ok(_) => trace.state = RemoteState::Loaded,
Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e)
}
}
}
pub fn playback(io: &Io, aux_mutex: &Mutex, routing_table: &RoutingTable,
ddma_mutex: &Mutex, id: u32, timestamp: u64) {
// triggers playback on satellites
let destinations = unsafe {
let _lock = ddma_mutex.lock(io).unwrap();
TRACES.get(&id).unwrap() };
for (destination, trace) in destinations {
{
// need to drop the lock before sending the playback request to avoid a deadlock
// if a PlaybackStatus is returned from another satellite in the meanwhile.
let _lock = ddma_mutex.lock(io).unwrap();
if trace.state != RemoteState::Loaded {
error!("Destination {} not ready for DMA, state: {:?}", *destination, trace.state);
continue;
}
}
match drtio::ddma_send_playback(io, aux_mutex, routing_table, ddma_mutex, id, *destination, timestamp) {
Ok(_) => (),
Err(e) => error!("Error during remote DMA playback: {}", e)
}
}
}
pub fn playback_done(io: &Io, ddma_mutex: &Mutex,
id: u32, destination: u8, error: u8, channel: u32, timestamp: u64) {
// called upon receiving PlaybackDone aux packet
let _lock = ddma_mutex.lock(io).unwrap();
let mut trace = unsafe { TRACES.get_mut(&id).unwrap().get_mut(&destination).unwrap() };
trace.state = RemoteState::PlaybackEnded {
error: error,
channel: channel,
timestamp: timestamp
};
}
pub fn destination_changed(io: &Io, aux_mutex: &Mutex, routing_table: &RoutingTable,
ddma_mutex: &Mutex, destination: u8, up: bool) {
// update state of the destination, resend traces if it's up
let _lock = ddma_mutex.lock(io).unwrap();
let traces_iter = unsafe { TRACES.iter_mut() };
for (id, dest_traces) in traces_iter {
if let Some(trace) = dest_traces.get_mut(&destination) {
if up {
match drtio::ddma_upload_trace(io, aux_mutex, routing_table, *id, destination, trace.get_trace())
{
Ok(_) => trace.state = RemoteState::Loaded,
Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e)
}
} else {
trace.state = RemoteState::NotLoaded;
}
}
}
}
}
#[derive(Debug)]
struct Entry {
struct LocalEntry {
trace: Vec<u8>,
padding_len: usize,
duration: u64
@ -12,7 +184,8 @@ struct Entry {
#[derive(Debug)]
pub struct Manager {
entries: BTreeMap<String, Entry>,
entries: BTreeMap<u32, LocalEntry>,
name_map: BTreeMap<String, u32>,
recording_name: String,
recording_trace: Vec<u8>
}
@ -21,59 +194,116 @@ impl Manager {
pub fn new() -> Manager {
Manager {
entries: BTreeMap::new(),
recording_name: String::new(),
name_map: BTreeMap::new(),
recording_trace: Vec::new(),
recording_name: String::new()
}
}
pub fn record_start(&mut self, name: &str) {
pub fn record_start(&mut self, name: &str) -> Option<u32> {
self.recording_name = String::from(name);
self.recording_trace = Vec::new();
// or we could needlessly OOM replacing a large trace
self.entries.remove(name);
if let Some(id) = self.name_map.get(&self.recording_name) {
// replacing a trace
let old_id = id.clone();
self.entries.remove(&id);
self.name_map.remove(&self.recording_name);
// return old ID
return Some(old_id);
}
return None;
}
pub fn record_append(&mut self, data: &[u8]) {
self.recording_trace.extend_from_slice(data)
}
pub fn record_stop(&mut self, duration: u64) {
let mut trace = Vec::new();
mem::swap(&mut self.recording_trace, &mut trace);
trace.push(0);
let data_len = trace.len();
pub fn record_stop(&mut self, duration: u64, enable_ddma: bool,
_io: &Io, _ddma_mutex: &Mutex) -> u32 {
let mut local_trace = Vec::new();
let mut remote_traces: BTreeMap<u8, Vec<u8>> = BTreeMap::new();
// Realign.
trace.reserve(ALIGNMENT - 1);
let padding = ALIGNMENT - trace.as_ptr() as usize % ALIGNMENT;
if enable_ddma {
let mut trace = Vec::new();
mem::swap(&mut self.recording_trace, &mut trace);
trace.push(0);
// analyze each entry and put in proper buckets, as the kernel core
// sends whole chunks, to limit comms/kernel CPU communication,
// and as only comms core has access to varios DMA buffers.
let mut ptr = 0;
while trace[ptr] != 0 {
// ptr + 3 = tgt >> 24 (destination)
let len = trace[ptr] as usize;
let destination = trace[ptr+3];
if destination == 0 {
local_trace.extend(&trace[ptr..ptr+len]);
}
else {
if let Some(remote_trace) = remote_traces.get_mut(&destination) {
remote_trace.extend(&trace[ptr..ptr+len]);
} else {
remote_traces.insert(destination, trace[ptr..ptr+len].to_vec());
}
}
// and jump to the next event
ptr += len;
}
} else {
// with disabled DDMA, move the whole trace to local
mem::swap(&mut self.recording_trace, &mut local_trace);
}
local_trace.push(0);
let data_len = local_trace.len();
// Realign the local entry.
local_trace.reserve(ALIGNMENT - 1);
let padding = ALIGNMENT - local_trace.as_ptr() as usize % ALIGNMENT;
let padding = if padding == ALIGNMENT { 0 } else { padding };
for _ in 0..padding {
// Vec guarantees that this will not reallocate
trace.push(0)
local_trace.push(0)
}
for i in 1..data_len + 1 {
trace[data_len + padding - i] = trace[data_len - i]
local_trace[data_len + padding - i] = local_trace[data_len - i]
}
// trace ID is its pointer
let id = local_trace[padding..].as_ptr() as u32;
self.entries.insert(id, LocalEntry {
trace: local_trace,
padding_len: padding,
duration: duration,
});
let mut name = String::new();
mem::swap(&mut self.recording_name, &mut name);
self.entries.insert(name, Entry {
trace: trace,
padding_len: padding,
duration: duration
});
self.name_map.insert(name, id);
#[cfg(has_drtio)]
remote_dma::add_traces(_io, _ddma_mutex, id, remote_traces);
id
}
pub fn erase(&mut self, name: &str) {
self.entries.remove(name);
if let Some(id) = self.name_map.get(name) {
self.entries.remove(&id);
}
self.name_map.remove(name);
}
#[cfg(has_drtio)]
pub fn get_id(&mut self, name: &str) -> Option<&u32> {
self.name_map.get(name)
}
pub fn with_trace<F, R>(&self, name: &str, f: F) -> R
where F: FnOnce(Option<&[u8]>, u64) -> R {
match self.entries.get(name) {
Some(entry) => f(Some(&entry.trace[entry.padding_len..]), entry.duration),
None => f(None, 0)
if let Some(ptr) = self.name_map.get(name) {
match self.entries.get(ptr) {
Some(entry) => f(Some(&entry.trace[entry.padding_len..]), entry.duration),
None => f(None, 0)
}
} else {
f(None, 0)
}
}
}

View File

@ -18,17 +18,22 @@ static mut RTIO_DEVICE_MAP: BTreeMap<u32, String> = BTreeMap::new();
#[cfg(has_drtio)]
pub mod drtio {
use super::*;
use alloc::vec::Vec;
use drtioaux;
use proto_artiq::drtioaux_proto::DMA_TRACE_MAX_SIZE;
use rtio_dma::remote_dma;
pub fn startup(io: &Io, aux_mutex: &Mutex,
routing_table: &Urc<RefCell<drtio_routing::RoutingTable>>,
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
ddma_mutex: &Mutex) {
let aux_mutex = aux_mutex.clone();
let routing_table = routing_table.clone();
let up_destinations = up_destinations.clone();
io.spawn(4096, move |io| {
let ddma_mutex = ddma_mutex.clone();
io.spawn(8192, move |io| {
let routing_table = routing_table.borrow();
link_thread(io, &aux_mutex, &routing_table, &up_destinations);
link_thread(io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex);
});
}
@ -147,9 +152,12 @@ pub mod drtio {
}
}
fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, linkno: u8) {
fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, linkno: u8, ddma_mutex: &Mutex) {
let _lock = aux_mutex.lock(io).unwrap();
match drtioaux::recv(linkno) {
Ok(Some(drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp })) => {
remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp);
}
Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet),
Ok(None) => (),
Err(_) => warn!("[LINK#{}] aux packet error", linkno)
@ -198,7 +206,8 @@ pub mod drtio {
fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable,
up_links: &[bool],
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
ddma_mutex: &Mutex) {
for destination in 0..drtio_routing::DEST_COUNT {
let hop = routing_table.0[destination][0];
let destination = destination as u8;
@ -216,8 +225,10 @@ pub mod drtio {
destination: destination
});
match reply {
Ok(drtioaux::Packet::DestinationDownReply) =>
destination_set_up(routing_table, up_destinations, destination, false),
Ok(drtioaux::Packet::DestinationDownReply) => {
destination_set_up(routing_table, up_destinations, destination, false);
remote_dma::destination_changed(io, aux_mutex, routing_table, ddma_mutex, destination, false);
}
Ok(drtioaux::Packet::DestinationOkReply) => (),
Ok(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => {
error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}", destination, channel, resolve_channel_name(channel as u32));
@ -236,6 +247,7 @@ pub mod drtio {
}
} else {
destination_set_up(routing_table, up_destinations, destination, false);
remote_dma::destination_changed(io, aux_mutex, routing_table, ddma_mutex, destination, false);
}
} else {
if up_links[linkno as usize] {
@ -247,6 +259,7 @@ pub mod drtio {
Ok(drtioaux::Packet::DestinationOkReply) => {
destination_set_up(routing_table, up_destinations, destination, true);
init_buffer_space(destination as u8, linkno);
remote_dma::destination_changed(io, aux_mutex, routing_table, ddma_mutex, destination, true);
},
Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e)
@ -259,7 +272,8 @@ pub mod drtio {
pub fn link_thread(io: Io, aux_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable,
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
ddma_mutex: &Mutex) {
let mut up_links = [false; csr::DRTIO.len()];
loop {
for linkno in 0..csr::DRTIO.len() {
@ -267,7 +281,7 @@ pub mod drtio {
if up_links[linkno as usize] {
/* link was previously up */
if link_rx_up(linkno) {
process_unsolicited_aux(&io, aux_mutex, linkno);
process_unsolicited_aux(&io, aux_mutex, linkno, ddma_mutex);
process_local_errors(linkno);
} else {
info!("[LINK#{}] link is down", linkno);
@ -297,7 +311,7 @@ pub mod drtio {
}
}
}
destination_survey(&io, aux_mutex, routing_table, &up_links, up_destinations);
destination_survey(&io, aux_mutex, routing_table, &up_links, up_destinations, ddma_mutex);
io.sleep(200).unwrap();
}
}
@ -328,6 +342,72 @@ pub mod drtio {
}
}
}
pub fn ddma_upload_trace(io: &Io, aux_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable,
id: u32, destination: u8, trace: &Vec<u8>) -> Result<(), &'static str> {
let linkno = routing_table.0[destination as usize][0] - 1;
let mut i = 0;
while i < trace.len() {
let mut trace_slice: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE];
let len: usize = if i + DMA_TRACE_MAX_SIZE < trace.len() { DMA_TRACE_MAX_SIZE } else { trace.len() - i } as usize;
let last = i + len == trace.len();
trace_slice[..len].clone_from_slice(&trace[i..i+len]);
i += len;
let reply = aux_transact(io, aux_mutex, linkno,
&drtioaux::Packet::DmaAddTraceRequest {
id: id, destination: destination, last: last, length: len as u16, trace: trace_slice});
match reply {
Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: true }) => (),
Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: false }) => {
return Err("error adding trace on satellite"); },
Ok(_) => { return Err("adding DMA trace failed, unexpected aux packet"); },
Err(_) => { return Err("adding DMA trace failed, aux error"); }
}
}
Ok(())
}
pub fn ddma_send_erase(io: &Io, aux_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable,
id: u32, destination: u8) -> Result<(), &'static str> {
let linkno = routing_table.0[destination as usize][0] - 1;
let reply = aux_transact(io, aux_mutex, linkno,
&drtioaux::Packet::DmaRemoveTraceRequest { id: id, destination: destination });
match reply {
Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()),
Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"),
Ok(_) => Err("adding trace failed, unexpected aux packet"),
Err(_) => Err("erasing trace failed, aux error")
}
}
pub fn ddma_send_playback(io: &Io, aux_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable,
ddma_mutex: &Mutex, id: u32, destination: u8, timestamp: u64) -> Result<(), &'static str> {
let linkno = routing_table.0[destination as usize][0] - 1;
let _lock = aux_mutex.lock(io).unwrap();
drtioaux::send(linkno, &drtioaux::Packet::DmaPlaybackRequest{
id: id, destination: destination, timestamp: timestamp }).unwrap();
loop {
let reply = recv_aux_timeout(io, linkno, 200);
match reply {
Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: true }) => { return Ok(()) },
Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: false }) => {
return Err("error on DMA playback request") },
// in case we received status from another destination
// but we want to get DmaPlaybackReply anyway, thus the loop
Ok(drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) => {
remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp);
},
Ok(_) => { return Err("received unexpected aux packet while DMA playback") },
Err(_) => { return Err("aux error on DMA playback") }
}
}
}
}
#[cfg(not(has_drtio))]
@ -336,7 +416,8 @@ pub mod drtio {
pub fn startup(_io: &Io, _aux_mutex: &Mutex,
_routing_table: &Urc<RefCell<drtio_routing::RoutingTable>>,
_up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {}
_up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
_ddma_mutex: &Mutex) {}
pub fn reset(_io: &Io, _aux_mutex: &Mutex) {}
}
@ -410,9 +491,10 @@ pub fn resolve_channel_name(channel: u32) -> String {
pub fn startup(io: &Io, aux_mutex: &Mutex,
routing_table: &Urc<RefCell<drtio_routing::RoutingTable>>,
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
ddma_mutex: &Mutex) {
unsafe { RTIO_DEVICE_MAP = read_device_map(); }
drtio::startup(io, aux_mutex, routing_table, up_destinations);
drtio::startup(io, aux_mutex, routing_table, up_destinations, ddma_mutex);
unsafe {
csr::rtio_core::reset_phy_write(1);
}

View File

@ -301,6 +301,10 @@ impl Mutex {
self.0.set(true);
Ok(MutexGuard(&*self.0))
}
pub fn test_lock<'a>(&'a self) -> bool {
self.0.get()
}
}
pub struct MutexGuard<'a>(&'a Cell<bool>);

View File

@ -10,6 +10,8 @@ use urc::Urc;
use sched::{ThreadHandle, Io, Mutex, TcpListener, TcpStream, Error as SchedError};
use rtio_clocking;
use rtio_dma::Manager as DmaManager;
#[cfg(has_drtio)]
use rtio_dma::remote_dma;
use rtio_mgt::{get_async_errors, resolve_channel_name};
use cache::Cache;
use kern_hwreq;
@ -329,7 +331,7 @@ fn process_host_message(io: &Io,
fn process_kern_message(io: &Io, aux_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable,
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
mut stream: Option<&mut TcpStream>,
ddma_mutex: &Mutex, mut stream: Option<&mut TcpStream>,
session: &mut Session) -> Result<bool, Error<SchedError>> {
kern_recv_notrace(io, |request| {
match (request, session.kernel_state) {
@ -368,19 +370,31 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
}
&kern::DmaRecordStart(name) => {
session.congress.dma_manager.record_start(name);
if let Some(_id) = session.congress.dma_manager.record_start(name) {
// replace the record
#[cfg(has_drtio)]
remote_dma::erase(io, aux_mutex, routing_table, ddma_mutex, _id);
}
kern_acknowledge()
}
&kern::DmaRecordAppend(data) => {
session.congress.dma_manager.record_append(data);
kern_acknowledge()
}
&kern::DmaRecordStop { duration } => {
session.congress.dma_manager.record_stop(duration);
&kern::DmaRecordStop { duration, enable_ddma } => {
let _id = session.congress.dma_manager.record_stop(duration, enable_ddma, io, ddma_mutex);
#[cfg(has_drtio)]
{
remote_dma::upload_traces(io, aux_mutex, routing_table, ddma_mutex, _id);
}
cache::flush_l2_cache();
kern_acknowledge()
}
&kern::DmaEraseRequest { name } => {
#[cfg(has_drtio)]
if let Some(id) = session.congress.dma_manager.get_id(name) {
remote_dma::erase(io, aux_mutex, routing_table, ddma_mutex, *id);
}
session.congress.dma_manager.erase(name);
kern_acknowledge()
}
@ -392,6 +406,27 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
})
})
}
&kern::DmaStartRemoteRequest { id: _id, timestamp: _timestamp } => {
#[cfg(has_drtio)]
remote_dma::playback(io, aux_mutex, routing_table, ddma_mutex, _id as u32, _timestamp as u64);
kern_acknowledge()
}
&kern::DmaAwaitRemoteRequest { id: _id } => {
#[cfg(has_drtio)]
let reply = match remote_dma::await_done(io, ddma_mutex, _id as u32, 10_000) {
Ok(remote_dma::RemoteState::PlaybackEnded { error, channel, timestamp }) =>
kern::DmaAwaitRemoteReply {
timeout: false,
error: error,
channel: channel,
timestamp: timestamp
},
_ => kern::DmaAwaitRemoteReply { timeout: true, error: 0, channel: 0, timestamp: 0},
};
#[cfg(not(has_drtio))]
let reply = kern::DmaAwaitRemoteReply { timeout: false, error: 0, channel: 0, timestamp: 0};
kern_send(io, &reply)
}
&kern::RpcSend { async, service, tag, data } => {
match stream {
@ -511,7 +546,7 @@ fn process_kern_queued_rpc(stream: &mut TcpStream,
fn host_kernel_worker(io: &Io, aux_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable,
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
stream: &mut TcpStream,
ddma_mutex: &Mutex, stream: &mut TcpStream,
congress: &mut Congress) -> Result<(), Error<SchedError>> {
let mut session = Session::new(congress);
@ -529,6 +564,7 @@ fn host_kernel_worker(io: &Io, aux_mutex: &Mutex,
if mailbox::receive() != 0 {
process_kern_message(io, aux_mutex,
routing_table, up_destinations,
ddma_mutex,
Some(stream), &mut session)?;
}
@ -546,7 +582,7 @@ fn host_kernel_worker(io: &Io, aux_mutex: &Mutex,
fn flash_kernel_worker(io: &Io, aux_mutex: &Mutex,
routing_table: &drtio_routing::RoutingTable,
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
congress: &mut Congress,
ddma_mutex: &Mutex, congress: &mut Congress,
config_key: &str) -> Result<(), Error<SchedError>> {
let mut session = Session::new(congress);
@ -568,7 +604,7 @@ fn flash_kernel_worker(io: &Io, aux_mutex: &Mutex,
}
if mailbox::receive() != 0 {
if process_kern_message(io, aux_mutex, routing_table, up_destinations, None, &mut session)? {
if process_kern_message(io, aux_mutex, routing_table, up_destinations, ddma_mutex, None, &mut session)? {
return Ok(())
}
}
@ -598,7 +634,8 @@ fn respawn<F>(io: &Io, handle: &mut Option<ThreadHandle>, f: F)
pub fn thread(io: Io, aux_mutex: &Mutex,
routing_table: &Urc<RefCell<drtio_routing::RoutingTable>>,
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
ddma_mutex: &Mutex) {
let listener = TcpListener::new(&io, 65535);
listener.listen(1381).expect("session: cannot listen");
info!("accepting network sessions");
@ -609,7 +646,7 @@ pub fn thread(io: Io, aux_mutex: &Mutex,
{
let mut congress = congress.borrow_mut();
info!("running startup kernel");
match flash_kernel_worker(&io, &aux_mutex, &routing_table.borrow(), &up_destinations, &mut congress, "startup_kernel") {
match flash_kernel_worker(&io, &aux_mutex, &routing_table.borrow(), &up_destinations, ddma_mutex, &mut congress, "startup_kernel") {
Ok(()) =>
info!("startup kernel finished"),
Err(Error::KernelNotFound) =>
@ -649,12 +686,13 @@ pub fn thread(io: Io, aux_mutex: &Mutex,
let routing_table = routing_table.clone();
let up_destinations = up_destinations.clone();
let congress = congress.clone();
let ddma_mutex = ddma_mutex.clone();
let stream = stream.into_handle();
respawn(&io, &mut kernel_thread, move |io| {
let routing_table = routing_table.borrow();
let mut congress = congress.borrow_mut();
let mut stream = TcpStream::from_handle(&io, stream);
match host_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &mut stream, &mut *congress) {
match host_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &mut stream, &mut *congress) {
Ok(()) => (),
Err(Error::Protocol(host::Error::Io(IoError::UnexpectedEnd))) =>
info!("connection closed"),
@ -677,10 +715,11 @@ pub fn thread(io: Io, aux_mutex: &Mutex,
let routing_table = routing_table.clone();
let up_destinations = up_destinations.clone();
let congress = congress.clone();
let ddma_mutex = ddma_mutex.clone();
respawn(&io, &mut kernel_thread, move |io| {
let routing_table = routing_table.borrow();
let mut congress = congress.borrow_mut();
match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &mut *congress, "idle_kernel") {
match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &mut *congress, "idle_kernel") {
Ok(()) =>
info!("idle kernel finished, standing by"),
Err(Error::Protocol(host::Error::Io(

View File

@ -1,4 +1,4 @@
use board_misoc::csr;
use board_misoc::{csr, cache::flush_l2_cache};
use alloc::{vec::Vec, collections::btree_map::BTreeMap};
const ALIGNMENT: usize = 64;
@ -52,7 +52,19 @@ impl Manager {
pub fn add(&mut self, id: u32, last: bool, trace: &[u8], trace_len: usize) -> Result<(), Error> {
let entry = match self.entries.get_mut(&id) {
Some(entry) => entry,
Some(entry) => {
if entry.complete {
// replace entry
self.entries.remove(&id);
self.entries.insert(id, Entry {
trace: Vec::new(),
padding_len: 0,
complete: false });
self.entries.get_mut(&id).unwrap()
} else {
entry
}
},
None => {
self.entries.insert(id, Entry {
trace: Vec::new(),
@ -80,6 +92,7 @@ impl Manager {
}
entry.complete = true;
entry.padding_len = padding;
flush_l2_cache();
}
Ok(())
}

View File

@ -301,19 +301,22 @@ fn process_aux_packet(_manager: &mut DmaManager, _repeaters: &mut [repeater::Rep
}
}
#[cfg(has_rtio_dma)]
drtioaux::Packet::DmaAddTraceRequest { id, last, length, trace } => {
drtioaux::Packet::DmaAddTraceRequest { destination: _destination, id, last, length, trace } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet);
let succeeded = _manager.add(id, last, &trace, length as usize).is_ok();
drtioaux::send(0,
&drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded })
}
#[cfg(has_rtio_dma)]
drtioaux::Packet::DmaRemoveTraceRequest { id } => {
drtioaux::Packet::DmaRemoveTraceRequest { destination: _destination, id } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet);
let succeeded = _manager.erase(id).is_ok();
drtioaux::send(0,
&drtioaux::Packet::DmaRemoveTraceReply { succeeded: succeeded })
}
#[cfg(has_rtio_dma)]
drtioaux::Packet::DmaPlaybackRequest { id, timestamp } => {
drtioaux::Packet::DmaPlaybackRequest { destination: _destination, id, timestamp } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet);
let succeeded = _manager.playback(id, timestamp).is_ok();
drtioaux::send(0,
&drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded })
@ -463,7 +466,8 @@ pub extern fn main() -> i32 {
unsafe {
ALLOC.add_range(&mut _fheap, &mut _eheap);
pmp::init_stack_guard(&_sstack_guard as *const u8 as usize);
// stack guard disabled, see https://github.com/m-labs/artiq/issues/2067
// pmp::init_stack_guard(&_sstack_guard as *const u8 as usize);
}
clock::init();
@ -571,8 +575,9 @@ pub extern fn main() -> i32 {
}
}
if let Some(status) = dma_manager.check_state() {
info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp);
if let Err(e) = drtioaux::send(0, &drtioaux::Packet::DmaPlaybackStatus {
id: status.id, error: status.error, channel: status.channel, timestamp: status.timestamp }) {
destination: rank, id: status.id, error: status.error, channel: status.channel, timestamp: status.timestamp }) {
error!("error sending DMA playback status: {}", e);
}
}