From a27b450defc8c2e31657c544b01690782071eb53 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 19 May 2023 13:17:04 +0800 Subject: [PATCH] runtime: port drtio-enabled analyzer --- src/runtime/src/analyzer.rs | 101 ++++++++++++++++++++++++++++++++---- src/runtime/src/comms.rs | 2 +- src/runtime/src/rtio_mgt.rs | 63 +++++++++++++++++++++- src/satman/src/analyzer.rs | 9 +++- 4 files changed, 161 insertions(+), 14 deletions(-) diff --git a/src/runtime/src/analyzer.rs b/src/runtime/src/analyzer.rs index 532fb30b..16106c9e 100644 --- a/src/runtime/src/analyzer.rs +++ b/src/runtime/src/analyzer.rs @@ -1,7 +1,10 @@ use libasync::{smoltcp::TcpStream, task}; -use libboard_zynq::smoltcp::Error; -use libcortex_a9::cache; -use log::{debug, info, warn}; +use libboard_zynq::{smoltcp::Error, timer::GlobalTimer}; +use libcortex_a9::{cache, mutex::Mutex}; +use log::{debug, info, warn, error}; +use core::cell::RefCell; +use alloc::{rc::Rc, vec::Vec}; +use libboard_artiq::drtio_routing; use crate::{pl, proto_async::*}; @@ -37,6 +40,48 @@ fn disarm() { debug!("RTIO analyzer disarmed"); } +#[cfg(has_drtio)] +pub mod remote_analyzer { + use super::*; + use crate::rtio_mgt::drtio; + + pub struct RemoteBuffer { + pub total_byte_count: u64, + pub sent_bytes: u32, + pub error: bool, + pub data: Vec + } + + pub async fn get_data(aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + up_destinations: &Rc>, + timer: GlobalTimer) -> Result { + // gets data from satellites and returns consolidated data + let mut remote_data: Vec = Vec::new(); + let mut remote_error = false; + let mut remote_sent_bytes = 0; + let mut remote_total_bytes = 0; + + let data_vec = match drtio::analyzer_query(aux_mutex, routing_table, up_destinations, timer).await { + Ok(data_vec) => data_vec, + Err(e) => return Err(e) + }; + for data in data_vec { + remote_total_bytes += data.total_byte_count; + remote_sent_bytes += data.sent_bytes; + remote_error |= data.error; + remote_data.extend(data.data); + } + + Ok(RemoteBuffer { + total_byte_count: remote_total_bytes, + sent_bytes: remote_sent_bytes, + error: remote_error, + data: remote_data + }) + } +} + #[derive(Debug)] struct Header { sent_bytes: u32, @@ -56,7 +101,10 @@ async fn write_header(stream: &mut TcpStream, header: &Header) -> Result<(), Err Ok(()) } -async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> { +async fn handle_connection(stream: &mut TcpStream, _aux_mutex: &Rc>, + _routing_table: &drtio_routing::RoutingTable, + _up_destinations: &Rc>, + _timer: GlobalTimer) -> Result<(), Error> { info!("received connection"); let data = unsafe { &BUFFER.data[..] }; @@ -65,6 +113,7 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> { let total_byte_count = unsafe { pl::csr::rtio_analyzer::dma_byte_count_read() as u64 }; let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize; let wraparound = total_byte_count >= BUFFER_SIZE as u64; + let sent_bytes = if wraparound { BUFFER_SIZE as u32 } else { total_byte_count as u32 }; if overflow_occurred { warn!("overflow occured"); @@ -73,13 +122,34 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> { warn!("bus error occured"); } + #[cfg(has_drtio)] + let remote = remote_analyzer::get_data(_aux_mutex, _routing_table, _up_destinations, _timer).await; + #[cfg(has_drtio)] + let (header, remote_data) = match remote { + Ok(remote) => (Header { + total_byte_count: total_byte_count + remote.total_byte_count, + sent_bytes: sent_bytes + remote.sent_bytes, + error_occurred: overflow_occurred | bus_error_occurred | remote.error, + log_channel: pl::csr::CONFIG_RTIO_LOG_CHANNEL as u8, + dds_onehot_sel: true + }, remote.data), + Err(e) => { + error!("Error getting remote analyzer data: {}", e); + (Header { + total_byte_count: total_byte_count, + sent_bytes: sent_bytes, + error_occurred: true, + log_channel: pl::csr::CONFIG_RTIO_LOG_CHANNEL as u8, + dds_onehot_sel: true + }, + Vec::new()) + } + }; + + #[cfg(not(has_drtio))] let header = Header { total_byte_count: total_byte_count, - sent_bytes: if wraparound { - BUFFER_SIZE as u32 - } else { - total_byte_count as u32 - }, + sent_bytes: sent_bytes, error_occurred: overflow_occurred | bus_error_occurred, log_channel: pl::csr::CONFIG_RTIO_LOG_CHANNEL as u8, dds_onehot_sel: true, // kept for backward compatibility of analyzer dumps @@ -93,17 +163,26 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> { } else { stream.send(data[..pointer].iter().copied()).await?; } + #[cfg(has_drtio)] + stream.send(remote_data.iter().copied()).await?; Ok(()) } -pub fn start() { +pub fn start(aux_mutex: &Rc>, + routing_table: &Rc>, + up_destinations: &Rc>, + timer: GlobalTimer,) { + let aux_mutex = aux_mutex.clone(); + let routing_table = routing_table.clone(); + let up_destinations = up_destinations.clone(); task::spawn(async move { loop { arm(); let mut stream = TcpStream::accept(1382, 2048, 2048).await.unwrap(); disarm(); - let _ = handle_connection(&mut stream) + let routing_table = routing_table.borrow(); + let _ = handle_connection(&mut stream, &aux_mutex, &routing_table, &up_destinations, timer) .await .map_err(|e| warn!("connection terminated: {:?}", e)); let _ = stream.flush().await; diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index b5db79d3..6475e147 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -524,7 +524,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) { rtio_mgt::startup(&aux_mutex, &drtio_routing_table, &up_destinations, timer, &cfg); - analyzer::start(); + analyzer::start(&aux_mutex, &drtio_routing_table, &up_destinations, timer); moninj::start(timer, &aux_mutex, &drtio_routing_table); let control: Rc> = Rc::new(RefCell::new(kernel::Control::start())); diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 43ed83d7..1aea5f22 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -22,7 +22,7 @@ pub mod drtio { use super::*; use crate::{rtio_dma::remote_dma, ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, - SEEN_ASYNC_ERRORS}; + SEEN_ASYNC_ERRORS, analyzer::remote_analyzer::RemoteBuffer}; pub fn startup( aux_mutex: &Rc>, @@ -522,6 +522,67 @@ pub mod drtio { Err(_) => Err("aux error on DMA playback"), } } + + async fn analyzer_get_data( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + destination: u8 + ) -> Result { + let linkno = routing_table.0[destination as usize][0] - 1; + let reply = aux_transact( + aux_mutex, + linkno, + &Packet::AnalyzerHeaderRequest { destination: destination }, + timer, + ) + .await; + let (sent, total, overflow) = match reply { + Ok(Packet::AnalyzerHeader { + sent_bytes, total_byte_count, overflow_occurred } + ) => (sent_bytes, total_byte_count, overflow_occurred), + Ok(_) => return Err("received unexpected aux packet during remote analyzer header request"), + Err(e) => return Err(e) + }; + + let mut remote_data: Vec = Vec::new(); + if sent > 0 { + let mut last_packet = false; + while !last_packet { + let reply = aux_transact(aux_mutex, linkno, + &Packet::AnalyzerDataRequest { destination: destination }, timer,).await; + match reply { + Ok(Packet::AnalyzerData { last, length, data }) => { + last_packet = last; + remote_data.extend(&data[0..length as usize]); + }, + Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"), + Err(e) => return Err(e) + } + } + } + + Ok(RemoteBuffer { + sent_bytes: sent, + total_byte_count: total, + error: overflow, + data: remote_data + }) + } + + pub async fn analyzer_query(aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + up_destinations: &Rc>, + timer: GlobalTimer, + ) -> Result, &'static str> { + let mut remote_buffers: Vec = Vec::new(); + for i in 1..drtio_routing::DEST_COUNT { + if destination_up(up_destinations, i as u8).await { + remote_buffers.push(analyzer_get_data(aux_mutex, routing_table, timer, i as u8).await?); + } + } + Ok(remote_buffers) + } } fn read_device_map(cfg: &Config) -> BTreeMap { diff --git a/src/satman/src/analyzer.rs b/src/satman/src/analyzer.rs index d0b4a6d0..ad3b945c 100644 --- a/src/satman/src/analyzer.rs +++ b/src/satman/src/analyzer.rs @@ -73,7 +73,14 @@ impl Analyzer { let wraparound = total_byte_count >= BUFFER_SIZE as u64; self.sent_bytes = if wraparound { BUFFER_SIZE } else { total_byte_count as usize }; self.data_iter = if wraparound { (total_byte_count % BUFFER_SIZE as u64) as usize } else { 0 }; - + + if overflow { + warn!("overflow occured"); + } + if bus_err { + warn!("bus error occured"); + } + Header { total_byte_count: total_byte_count, sent_bytes: self.sent_bytes as u32,