forked from M-Labs/artiq-zynq
runtime: port drtio-enabled analyzer
This commit is contained in:
parent
c536a70890
commit
a27b450def
@ -1,7 +1,10 @@
|
|||||||
use libasync::{smoltcp::TcpStream, task};
|
use libasync::{smoltcp::TcpStream, task};
|
||||||
use libboard_zynq::smoltcp::Error;
|
use libboard_zynq::{smoltcp::Error, timer::GlobalTimer};
|
||||||
use libcortex_a9::cache;
|
use libcortex_a9::{cache, mutex::Mutex};
|
||||||
use log::{debug, info, warn};
|
use log::{debug, info, warn, error};
|
||||||
|
use core::cell::RefCell;
|
||||||
|
use alloc::{rc::Rc, vec::Vec};
|
||||||
|
use libboard_artiq::drtio_routing;
|
||||||
|
|
||||||
use crate::{pl, proto_async::*};
|
use crate::{pl, proto_async::*};
|
||||||
|
|
||||||
@ -37,6 +40,48 @@ fn disarm() {
|
|||||||
debug!("RTIO analyzer disarmed");
|
debug!("RTIO analyzer disarmed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(has_drtio)]
|
||||||
|
pub mod remote_analyzer {
|
||||||
|
use super::*;
|
||||||
|
use crate::rtio_mgt::drtio;
|
||||||
|
|
||||||
|
pub struct RemoteBuffer {
|
||||||
|
pub total_byte_count: u64,
|
||||||
|
pub sent_bytes: u32,
|
||||||
|
pub error: bool,
|
||||||
|
pub data: Vec<u8>
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_data(aux_mutex: &Rc<Mutex<bool>>,
|
||||||
|
routing_table: &drtio_routing::RoutingTable,
|
||||||
|
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
|
||||||
|
timer: GlobalTimer) -> Result<RemoteBuffer, &'static str> {
|
||||||
|
// gets data from satellites and returns consolidated data
|
||||||
|
let mut remote_data: Vec<u8> = Vec::new();
|
||||||
|
let mut remote_error = false;
|
||||||
|
let mut remote_sent_bytes = 0;
|
||||||
|
let mut remote_total_bytes = 0;
|
||||||
|
|
||||||
|
let data_vec = match drtio::analyzer_query(aux_mutex, routing_table, up_destinations, timer).await {
|
||||||
|
Ok(data_vec) => data_vec,
|
||||||
|
Err(e) => return Err(e)
|
||||||
|
};
|
||||||
|
for data in data_vec {
|
||||||
|
remote_total_bytes += data.total_byte_count;
|
||||||
|
remote_sent_bytes += data.sent_bytes;
|
||||||
|
remote_error |= data.error;
|
||||||
|
remote_data.extend(data.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(RemoteBuffer {
|
||||||
|
total_byte_count: remote_total_bytes,
|
||||||
|
sent_bytes: remote_sent_bytes,
|
||||||
|
error: remote_error,
|
||||||
|
data: remote_data
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Header {
|
struct Header {
|
||||||
sent_bytes: u32,
|
sent_bytes: u32,
|
||||||
@ -56,7 +101,10 @@ async fn write_header(stream: &mut TcpStream, header: &Header) -> Result<(), Err
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> {
|
async fn handle_connection(stream: &mut TcpStream, _aux_mutex: &Rc<Mutex<bool>>,
|
||||||
|
_routing_table: &drtio_routing::RoutingTable,
|
||||||
|
_up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
|
||||||
|
_timer: GlobalTimer) -> Result<(), Error> {
|
||||||
info!("received connection");
|
info!("received connection");
|
||||||
|
|
||||||
let data = unsafe { &BUFFER.data[..] };
|
let data = unsafe { &BUFFER.data[..] };
|
||||||
@ -65,6 +113,7 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> {
|
|||||||
let total_byte_count = unsafe { pl::csr::rtio_analyzer::dma_byte_count_read() as u64 };
|
let total_byte_count = unsafe { pl::csr::rtio_analyzer::dma_byte_count_read() as u64 };
|
||||||
let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize;
|
let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize;
|
||||||
let wraparound = total_byte_count >= BUFFER_SIZE as u64;
|
let wraparound = total_byte_count >= BUFFER_SIZE as u64;
|
||||||
|
let sent_bytes = if wraparound { BUFFER_SIZE as u32 } else { total_byte_count as u32 };
|
||||||
|
|
||||||
if overflow_occurred {
|
if overflow_occurred {
|
||||||
warn!("overflow occured");
|
warn!("overflow occured");
|
||||||
@ -73,13 +122,34 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> {
|
|||||||
warn!("bus error occured");
|
warn!("bus error occured");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(has_drtio)]
|
||||||
|
let remote = remote_analyzer::get_data(_aux_mutex, _routing_table, _up_destinations, _timer).await;
|
||||||
|
#[cfg(has_drtio)]
|
||||||
|
let (header, remote_data) = match remote {
|
||||||
|
Ok(remote) => (Header {
|
||||||
|
total_byte_count: total_byte_count + remote.total_byte_count,
|
||||||
|
sent_bytes: sent_bytes + remote.sent_bytes,
|
||||||
|
error_occurred: overflow_occurred | bus_error_occurred | remote.error,
|
||||||
|
log_channel: pl::csr::CONFIG_RTIO_LOG_CHANNEL as u8,
|
||||||
|
dds_onehot_sel: true
|
||||||
|
}, remote.data),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Error getting remote analyzer data: {}", e);
|
||||||
|
(Header {
|
||||||
|
total_byte_count: total_byte_count,
|
||||||
|
sent_bytes: sent_bytes,
|
||||||
|
error_occurred: true,
|
||||||
|
log_channel: pl::csr::CONFIG_RTIO_LOG_CHANNEL as u8,
|
||||||
|
dds_onehot_sel: true
|
||||||
|
},
|
||||||
|
Vec::new())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(not(has_drtio))]
|
||||||
let header = Header {
|
let header = Header {
|
||||||
total_byte_count: total_byte_count,
|
total_byte_count: total_byte_count,
|
||||||
sent_bytes: if wraparound {
|
sent_bytes: sent_bytes,
|
||||||
BUFFER_SIZE as u32
|
|
||||||
} else {
|
|
||||||
total_byte_count as u32
|
|
||||||
},
|
|
||||||
error_occurred: overflow_occurred | bus_error_occurred,
|
error_occurred: overflow_occurred | bus_error_occurred,
|
||||||
log_channel: pl::csr::CONFIG_RTIO_LOG_CHANNEL as u8,
|
log_channel: pl::csr::CONFIG_RTIO_LOG_CHANNEL as u8,
|
||||||
dds_onehot_sel: true, // kept for backward compatibility of analyzer dumps
|
dds_onehot_sel: true, // kept for backward compatibility of analyzer dumps
|
||||||
@ -93,17 +163,26 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> {
|
|||||||
} else {
|
} else {
|
||||||
stream.send(data[..pointer].iter().copied()).await?;
|
stream.send(data[..pointer].iter().copied()).await?;
|
||||||
}
|
}
|
||||||
|
#[cfg(has_drtio)]
|
||||||
|
stream.send(remote_data.iter().copied()).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start() {
|
pub fn start(aux_mutex: &Rc<Mutex<bool>>,
|
||||||
|
routing_table: &Rc<RefCell<drtio_routing::RoutingTable>>,
|
||||||
|
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
|
||||||
|
timer: GlobalTimer,) {
|
||||||
|
let aux_mutex = aux_mutex.clone();
|
||||||
|
let routing_table = routing_table.clone();
|
||||||
|
let up_destinations = up_destinations.clone();
|
||||||
task::spawn(async move {
|
task::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
arm();
|
arm();
|
||||||
let mut stream = TcpStream::accept(1382, 2048, 2048).await.unwrap();
|
let mut stream = TcpStream::accept(1382, 2048, 2048).await.unwrap();
|
||||||
disarm();
|
disarm();
|
||||||
let _ = handle_connection(&mut stream)
|
let routing_table = routing_table.borrow();
|
||||||
|
let _ = handle_connection(&mut stream, &aux_mutex, &routing_table, &up_destinations, timer)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| warn!("connection terminated: {:?}", e));
|
.map_err(|e| warn!("connection terminated: {:?}", e));
|
||||||
let _ = stream.flush().await;
|
let _ = stream.flush().await;
|
||||||
|
@ -524,7 +524,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) {
|
|||||||
|
|
||||||
rtio_mgt::startup(&aux_mutex, &drtio_routing_table, &up_destinations, timer, &cfg);
|
rtio_mgt::startup(&aux_mutex, &drtio_routing_table, &up_destinations, timer, &cfg);
|
||||||
|
|
||||||
analyzer::start();
|
analyzer::start(&aux_mutex, &drtio_routing_table, &up_destinations, timer);
|
||||||
moninj::start(timer, &aux_mutex, &drtio_routing_table);
|
moninj::start(timer, &aux_mutex, &drtio_routing_table);
|
||||||
|
|
||||||
let control: Rc<RefCell<kernel::Control>> = Rc::new(RefCell::new(kernel::Control::start()));
|
let control: Rc<RefCell<kernel::Control>> = Rc::new(RefCell::new(kernel::Control::start()));
|
||||||
|
@ -22,7 +22,7 @@ pub mod drtio {
|
|||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{rtio_dma::remote_dma, ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR,
|
use crate::{rtio_dma::remote_dma, ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR,
|
||||||
SEEN_ASYNC_ERRORS};
|
SEEN_ASYNC_ERRORS, analyzer::remote_analyzer::RemoteBuffer};
|
||||||
|
|
||||||
pub fn startup(
|
pub fn startup(
|
||||||
aux_mutex: &Rc<Mutex<bool>>,
|
aux_mutex: &Rc<Mutex<bool>>,
|
||||||
@ -522,6 +522,67 @@ pub mod drtio {
|
|||||||
Err(_) => Err("aux error on DMA playback"),
|
Err(_) => Err("aux error on DMA playback"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn analyzer_get_data(
|
||||||
|
aux_mutex: &Rc<Mutex<bool>>,
|
||||||
|
routing_table: &drtio_routing::RoutingTable,
|
||||||
|
timer: GlobalTimer,
|
||||||
|
destination: u8
|
||||||
|
) -> Result<RemoteBuffer, &'static str> {
|
||||||
|
let linkno = routing_table.0[destination as usize][0] - 1;
|
||||||
|
let reply = aux_transact(
|
||||||
|
aux_mutex,
|
||||||
|
linkno,
|
||||||
|
&Packet::AnalyzerHeaderRequest { destination: destination },
|
||||||
|
timer,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let (sent, total, overflow) = match reply {
|
||||||
|
Ok(Packet::AnalyzerHeader {
|
||||||
|
sent_bytes, total_byte_count, overflow_occurred }
|
||||||
|
) => (sent_bytes, total_byte_count, overflow_occurred),
|
||||||
|
Ok(_) => return Err("received unexpected aux packet during remote analyzer header request"),
|
||||||
|
Err(e) => return Err(e)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut remote_data: Vec<u8> = Vec::new();
|
||||||
|
if sent > 0 {
|
||||||
|
let mut last_packet = false;
|
||||||
|
while !last_packet {
|
||||||
|
let reply = aux_transact(aux_mutex, linkno,
|
||||||
|
&Packet::AnalyzerDataRequest { destination: destination }, timer,).await;
|
||||||
|
match reply {
|
||||||
|
Ok(Packet::AnalyzerData { last, length, data }) => {
|
||||||
|
last_packet = last;
|
||||||
|
remote_data.extend(&data[0..length as usize]);
|
||||||
|
},
|
||||||
|
Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"),
|
||||||
|
Err(e) => return Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(RemoteBuffer {
|
||||||
|
sent_bytes: sent,
|
||||||
|
total_byte_count: total,
|
||||||
|
error: overflow,
|
||||||
|
data: remote_data
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn analyzer_query(aux_mutex: &Rc<Mutex<bool>>,
|
||||||
|
routing_table: &drtio_routing::RoutingTable,
|
||||||
|
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
|
||||||
|
timer: GlobalTimer,
|
||||||
|
) -> Result<Vec<RemoteBuffer>, &'static str> {
|
||||||
|
let mut remote_buffers: Vec<RemoteBuffer> = Vec::new();
|
||||||
|
for i in 1..drtio_routing::DEST_COUNT {
|
||||||
|
if destination_up(up_destinations, i as u8).await {
|
||||||
|
remote_buffers.push(analyzer_get_data(aux_mutex, routing_table, timer, i as u8).await?);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(remote_buffers)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_device_map(cfg: &Config) -> BTreeMap<u32, String> {
|
fn read_device_map(cfg: &Config) -> BTreeMap<u32, String> {
|
||||||
|
@ -74,6 +74,13 @@ impl Analyzer {
|
|||||||
self.sent_bytes = if wraparound { BUFFER_SIZE } else { total_byte_count as usize };
|
self.sent_bytes = if wraparound { BUFFER_SIZE } else { total_byte_count as usize };
|
||||||
self.data_iter = if wraparound { (total_byte_count % BUFFER_SIZE as u64) as usize } else { 0 };
|
self.data_iter = if wraparound { (total_byte_count % BUFFER_SIZE as u64) as usize } else { 0 };
|
||||||
|
|
||||||
|
if overflow {
|
||||||
|
warn!("overflow occured");
|
||||||
|
}
|
||||||
|
if bus_err {
|
||||||
|
warn!("bus error occured");
|
||||||
|
}
|
||||||
|
|
||||||
Header {
|
Header {
|
||||||
total_byte_count: total_byte_count,
|
total_byte_count: total_byte_count,
|
||||||
sent_bytes: self.sent_bytes as u32,
|
sent_bytes: self.sent_bytes as u32,
|
||||||
|
Loading…
Reference in New Issue
Block a user