diff --git a/artiq/firmware/runtime/analyzer.rs b/artiq/firmware/runtime/analyzer.rs index 5b090f732..0674d3ddc 100644 --- a/artiq/firmware/runtime/analyzer.rs +++ b/artiq/firmware/runtime/analyzer.rs @@ -1,7 +1,11 @@ use io::{Write, Error as IoError}; use board_misoc::{csr, cache}; -use sched::{Io, TcpListener, TcpStream, Error as SchedError}; +use sched::{Io, Mutex, TcpListener, TcpStream, Error as SchedError}; use analyzer_proto::*; +use alloc::vec::Vec; +use urc::Urc; +use board_artiq::drtio_routing; +use core::cell::RefCell; const BUFFER_SIZE: usize = 512 * 1024; @@ -35,17 +39,78 @@ fn disarm() { } } -fn worker(stream: &mut TcpStream) -> Result<(), IoError> { - let data = unsafe { &BUFFER.data[..] }; - let overflow_occurred = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 }; - let total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() }; - let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize; - let wraparound = total_byte_count >= BUFFER_SIZE as u64; +#[cfg(has_drtio)] +pub mod remote_analyzer { + use super::*; + use rtio_mgt::drtio; + + pub struct RemoteBuffer { + pub total_byte_count: u64, + pub sent_bytes: u32, + pub overflow_occurred: bool, + pub data: Vec + } + pub fn get_data(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, + up_destinations: &Urc>) -> RemoteBuffer { + // gets data from satellites and returns consolidated data + let mut remote_data: Vec = Vec::new(); + let mut remote_overflow = false; + let mut remote_sent_bytes = 0; + let mut remote_total_bytes = 0; + + let data_vec = match drtio::analyzer_query(io, aux_mutex, ddma_mutex, routing_table, up_destinations) { + Ok(data_vec) => data_vec, + Err(e) => { error!("error retrieving remote analyzer data: {}", e); Vec::new() } + }; + for data in data_vec { + remote_total_bytes += data.total_byte_count; + remote_sent_bytes += data.sent_bytes; + remote_overflow |= data.overflow_occurred; + remote_data.extend(data.data); + } + + RemoteBuffer { + total_byte_count: remote_total_bytes, + sent_bytes: remote_sent_bytes, + overflow_occurred: remote_overflow, + data: remote_data + } + } +} + + + +fn worker(stream: &mut TcpStream, _io: &Io, _aux_mutex: &Mutex, _ddma_mutex: &Mutex, + _routing_table: &drtio_routing::RoutingTable, + _up_destinations: &Urc> +) -> Result<(), IoError> { + let local_data = unsafe { &BUFFER.data[..] }; + let local_overflow_occurred = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 }; + let local_total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() }; + + let wraparound = local_total_byte_count >= BUFFER_SIZE as u64; + let local_sent_bytes = if wraparound { BUFFER_SIZE as u32 } else { local_total_byte_count as u32 }; + let pointer = (local_total_byte_count % BUFFER_SIZE as u64) as usize; + + #[cfg(has_drtio)] + let remote = remote_analyzer::get_data( + _io, _aux_mutex, _ddma_mutex, _routing_table, _up_destinations); + #[cfg(has_drtio)] let header = Header { - total_byte_count: total_byte_count, - sent_bytes: if wraparound { BUFFER_SIZE as u32 } else { total_byte_count as u32 }, - overflow_occurred: overflow_occurred, + total_byte_count: local_total_byte_count + remote.total_byte_count, + sent_bytes: local_sent_bytes + remote.sent_bytes, + overflow_occurred: local_overflow_occurred | remote.overflow_occurred, + log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8, + dds_onehot_sel: true + }; + + #[cfg(not(has_drtio))] + let header = Header { + total_byte_count: local_total_byte_count, + sent_bytes: local_sent_bytes, + overflow_occurred: local_overflow_occurred, log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8, dds_onehot_sel: true // kept for backward compatibility of analyzer dumps }; @@ -54,16 +119,20 @@ fn worker(stream: &mut TcpStream) -> Result<(), IoError> { stream.write_all("e".as_bytes())?; header.write_to(stream)?; if wraparound { - stream.write_all(&data[pointer..])?; - stream.write_all(&data[..pointer])?; + stream.write_all(&local_data[pointer..])?; + stream.write_all(&local_data[..pointer])?; } else { - stream.write_all(&data[..pointer])?; + stream.write_all(&local_data[..pointer])?; } + #[cfg(has_drtio)] + stream.write_all(&remote.data)?; Ok(()) } -pub fn thread(io: Io) { +pub fn thread(io: Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, + routing_table: &Urc>, + up_destinations: &Urc>) { let listener = TcpListener::new(&io, 65535); listener.listen(1382).expect("analyzer: cannot listen"); @@ -75,7 +144,8 @@ pub fn thread(io: Io) { disarm(); - match worker(&mut stream) { + let routing_table = routing_table.borrow(); + match worker(&mut stream, &io, aux_mutex, ddma_mutex, &routing_table, up_destinations) { Ok(()) => (), Err(err) => error!("analyzer aborted: {}", err) } diff --git a/artiq/firmware/runtime/main.rs b/artiq/firmware/runtime/main.rs index 994c9dfc0..31901227f 100644 --- a/artiq/firmware/runtime/main.rs +++ b/artiq/firmware/runtime/main.rs @@ -209,7 +209,13 @@ fn startup() { io.spawn(4096, move |io| { moninj::thread(io, &aux_mutex, &ddma_mutex, &drtio_routing_table) }); } #[cfg(has_rtio_analyzer)] - io.spawn(4096, analyzer::thread); + { + let aux_mutex = aux_mutex.clone(); + let drtio_routing_table = drtio_routing_table.clone(); + let up_destinations = up_destinations.clone(); + let ddma_mutex = ddma_mutex.clone(); + io.spawn(4096, move |io| { analyzer::thread(io, &aux_mutex, &ddma_mutex, &drtio_routing_table, &up_destinations) }); + } #[cfg(has_grabber)] io.spawn(4096, grabber_thread); diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index 6b864c128..916ef86d9 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -19,6 +19,8 @@ pub mod drtio { use drtioaux; use proto_artiq::drtioaux_proto::DMA_TRACE_MAX_SIZE; use rtio_dma::remote_dma; + #[cfg(has_rtio_analyzer)] + use analyzer::remote_analyzer::RemoteBuffer; pub fn startup(io: &Io, aux_mutex: &Mutex, routing_table: &Urc>, @@ -404,6 +406,59 @@ pub mod drtio { } } + #[cfg(has_rtio_analyzer)] + fn analyzer_get_data(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, destination: u8 + ) -> Result { + let linkno = routing_table.0[destination as usize][0] - 1; + let reply = aux_transact(io, aux_mutex, ddma_mutex, linkno, &drtioaux::Packet::AnalyzerRequest { + destination: destination }); + let (sent, total, overflow) = match reply { + Ok(drtioaux::Packet::AnalyzerHeader { + sent_bytes, total_byte_count, overflow_occurred } + ) => (sent_bytes, total_byte_count, overflow_occurred), + Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"), + Err(_) => return Err("aux error on remote analyzer request") + }; + + let mut remote_data: Vec = Vec::new(); + if sent > 0 { + let mut last_packet = false; + while !last_packet { + let reply = recv_aux_timeout(io, linkno, 200); + match reply { + Ok(drtioaux::Packet::AnalyzerData { last, length, data }) => { + last_packet = last; + remote_data.extend(&data[0..length as usize]); + }, + Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"), + Err(_) => return Err("aux error on remote analyzer request") + } + } + } + + Ok(RemoteBuffer { + sent_bytes: sent, + total_byte_count: total, + overflow_occurred: overflow, + data: remote_data + }) + } + + #[cfg(has_rtio_analyzer)] + pub fn analyzer_query(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, + up_destinations: &Urc> + ) -> Result, &'static str> { + let mut remote_buffers: Vec = Vec::new(); + for i in 0..drtio_routing::DEST_COUNT as u8 { + if destination_up(up_destinations, i) { + remote_buffers.push(analyzer_get_data(io, aux_mutex, ddma_mutex, routing_table, i)?); + } + } + Ok(remote_buffers) + } + } #[cfg(not(has_drtio))]