From ddfe51e7ac2f7a68b01006c9ec97a35aeed416c9 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 19 May 2023 09:41:26 +0800 Subject: [PATCH] analyzer: use transactions for data transmission --- .../firmware/libproto_artiq/drtioaux_proto.rs | 18 ++- artiq/firmware/runtime/analyzer.rs | 37 +++-- artiq/firmware/runtime/main.rs | 2 +- artiq/firmware/runtime/rtio_mgt.rs | 19 +-- artiq/firmware/satman/analyzer.rs | 126 ++++++++++++++---- artiq/firmware/satman/main.rs | 36 +++-- 6 files changed, 173 insertions(+), 65 deletions(-) diff --git a/artiq/firmware/libproto_artiq/drtioaux_proto.rs b/artiq/firmware/libproto_artiq/drtioaux_proto.rs index d09be2745..94f26dca7 100644 --- a/artiq/firmware/libproto_artiq/drtioaux_proto.rs +++ b/artiq/firmware/libproto_artiq/drtioaux_proto.rs @@ -58,8 +58,9 @@ pub enum Packet { SpiReadReply { succeeded: bool, data: u32 }, SpiBasicReply { succeeded: bool }, - AnalyzerRequest { destination: u8 }, + AnalyzerHeaderRequest { destination: u8 }, AnalyzerHeader { sent_bytes: u32, total_byte_count: u64, overflow_occurred: bool }, + AnalyzerDataRequest { destination: u8 }, AnalyzerData { last: bool, length: u16, data: [u8; ANALYZER_MAX_SIZE]}, DmaAddTraceRequest { destination: u8, id: u32, last: bool, length: u16, trace: [u8; DMA_TRACE_MAX_SIZE] }, @@ -200,7 +201,7 @@ impl Packet { succeeded: reader.read_bool()? }, - 0xa0 => Packet::AnalyzerRequest { + 0xa0 => Packet::AnalyzerHeaderRequest { destination: reader.read_u8()? }, 0xa1 => Packet::AnalyzerHeader { @@ -208,7 +209,10 @@ impl Packet { total_byte_count: reader.read_u64()?, overflow_occurred: reader.read_bool()?, }, - 0xa2 => { + 0xa2 => Packet::AnalyzerDataRequest { + destination: reader.read_u8()? + }, + 0xa3 => { let last = reader.read_bool()?; let length = reader.read_u16()?; let mut data: [u8; ANALYZER_MAX_SIZE] = [0; ANALYZER_MAX_SIZE]; @@ -420,7 +424,7 @@ impl Packet { writer.write_bool(succeeded)?; }, - Packet::AnalyzerRequest { destination } => { + Packet::AnalyzerHeaderRequest { destination } => { writer.write_u8(0xa0)?; writer.write_u8(destination)?; }, @@ -430,8 +434,12 @@ impl Packet { writer.write_u64(total_byte_count)?; writer.write_bool(overflow_occurred)?; }, - Packet::AnalyzerData { last, length, data } => { + Packet::AnalyzerDataRequest { destination } => { writer.write_u8(0xa2)?; + writer.write_u8(destination)?; + }, + Packet::AnalyzerData { last, length, data } => { + writer.write_u8(0xa3)?; writer.write_bool(last)?; writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; diff --git a/artiq/firmware/runtime/analyzer.rs b/artiq/firmware/runtime/analyzer.rs index 0674d3ddc..4b5bd222d 100644 --- a/artiq/firmware/runtime/analyzer.rs +++ b/artiq/firmware/runtime/analyzer.rs @@ -2,7 +2,6 @@ use io::{Write, Error as IoError}; use board_misoc::{csr, cache}; use sched::{Io, Mutex, TcpListener, TcpStream, Error as SchedError}; use analyzer_proto::*; -use alloc::vec::Vec; use urc::Urc; use board_artiq::drtio_routing; use core::cell::RefCell; @@ -43,6 +42,7 @@ fn disarm() { pub mod remote_analyzer { use super::*; use rtio_mgt::drtio; + use alloc::vec::Vec; pub struct RemoteBuffer { pub total_byte_count: u64, @@ -53,7 +53,7 @@ pub mod remote_analyzer { pub fn get_data(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, - up_destinations: &Urc>) -> RemoteBuffer { + up_destinations: &Urc>) -> Result { // gets data from satellites and returns consolidated data let mut remote_data: Vec = Vec::new(); let mut remote_overflow = false; @@ -62,7 +62,7 @@ pub mod remote_analyzer { let data_vec = match drtio::analyzer_query(io, aux_mutex, ddma_mutex, routing_table, up_destinations) { Ok(data_vec) => data_vec, - Err(e) => { error!("error retrieving remote analyzer data: {}", e); Vec::new() } + Err(e) => return Err(e) }; for data in data_vec { remote_total_bytes += data.total_byte_count; @@ -71,12 +71,12 @@ pub mod remote_analyzer { remote_data.extend(data.data); } - RemoteBuffer { + Ok(RemoteBuffer { total_byte_count: remote_total_bytes, sent_bytes: remote_sent_bytes, overflow_occurred: remote_overflow, data: remote_data - } + }) } } @@ -98,12 +98,25 @@ fn worker(stream: &mut TcpStream, _io: &Io, _aux_mutex: &Mutex, _ddma_mutex: &Mu let remote = remote_analyzer::get_data( _io, _aux_mutex, _ddma_mutex, _routing_table, _up_destinations); #[cfg(has_drtio)] - let header = Header { - total_byte_count: local_total_byte_count + remote.total_byte_count, - sent_bytes: local_sent_bytes + remote.sent_bytes, - overflow_occurred: local_overflow_occurred | remote.overflow_occurred, - log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8, - dds_onehot_sel: true + let (header, remote_data) = match remote { + Ok(remote) => (Header { + total_byte_count: local_total_byte_count + remote.total_byte_count, + sent_bytes: local_sent_bytes + remote.sent_bytes, + overflow_occurred: local_overflow_occurred | remote.overflow_occurred, + log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8, + dds_onehot_sel: true + }, remote.data), + Err(e) => { + error!("Error getting remote analyzer data: {}", e); + (Header { + total_byte_count: local_total_byte_count, + sent_bytes: local_sent_bytes, + overflow_occurred: true, + log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8, + dds_onehot_sel: true + }, + Vec::new()) + } }; #[cfg(not(has_drtio))] @@ -125,7 +138,7 @@ fn worker(stream: &mut TcpStream, _io: &Io, _aux_mutex: &Mutex, _ddma_mutex: &Mu stream.write_all(&local_data[..pointer])?; } #[cfg(has_drtio)] - stream.write_all(&remote.data)?; + stream.write_all(&remote_data)?; Ok(()) } diff --git a/artiq/firmware/runtime/main.rs b/artiq/firmware/runtime/main.rs index 31901227f..285768e8e 100644 --- a/artiq/firmware/runtime/main.rs +++ b/artiq/firmware/runtime/main.rs @@ -214,7 +214,7 @@ fn startup() { let drtio_routing_table = drtio_routing_table.clone(); let up_destinations = up_destinations.clone(); let ddma_mutex = ddma_mutex.clone(); - io.spawn(4096, move |io| { analyzer::thread(io, &aux_mutex, &ddma_mutex, &drtio_routing_table, &up_destinations) }); + io.spawn(8192, move |io| { analyzer::thread(io, &aux_mutex, &ddma_mutex, &drtio_routing_table, &up_destinations) }); } #[cfg(has_grabber)] diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index 916ef86d9..de1b3332c 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -411,28 +411,29 @@ pub mod drtio { routing_table: &drtio_routing::RoutingTable, destination: u8 ) -> Result { let linkno = routing_table.0[destination as usize][0] - 1; - let reply = aux_transact(io, aux_mutex, ddma_mutex, linkno, &drtioaux::Packet::AnalyzerRequest { - destination: destination }); + let reply = aux_transact(io, aux_mutex, ddma_mutex, linkno, + &drtioaux::Packet::AnalyzerHeaderRequest { destination: destination }); let (sent, total, overflow) = match reply { Ok(drtioaux::Packet::AnalyzerHeader { sent_bytes, total_byte_count, overflow_occurred } ) => (sent_bytes, total_byte_count, overflow_occurred), - Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"), - Err(_) => return Err("aux error on remote analyzer request") + Ok(_) => return Err("received unexpected aux packet during remote analyzer header request"), + Err(e) => return Err(e) }; let mut remote_data: Vec = Vec::new(); if sent > 0 { let mut last_packet = false; while !last_packet { - let reply = recv_aux_timeout(io, linkno, 200); + let reply = aux_transact(io, aux_mutex, ddma_mutex, linkno, + &drtioaux::Packet::AnalyzerDataRequest { destination: destination }); match reply { Ok(drtioaux::Packet::AnalyzerData { last, length, data }) => { last_packet = last; remote_data.extend(&data[0..length as usize]); }, Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"), - Err(_) => return Err("aux error on remote analyzer request") + Err(e) => return Err(e) } } } @@ -451,9 +452,9 @@ pub mod drtio { up_destinations: &Urc> ) -> Result, &'static str> { let mut remote_buffers: Vec = Vec::new(); - for i in 0..drtio_routing::DEST_COUNT as u8 { - if destination_up(up_destinations, i) { - remote_buffers.push(analyzer_get_data(io, aux_mutex, ddma_mutex, routing_table, i)?); + for i in 1..drtio_routing::DEST_COUNT { + if destination_up(up_destinations, i as u8) { + remote_buffers.push(analyzer_get_data(io, aux_mutex, ddma_mutex, routing_table, i as u8)?); } } Ok(remote_buffers) diff --git a/artiq/firmware/satman/analyzer.rs b/artiq/firmware/satman/analyzer.rs index 4378d1deb..189d931f6 100644 --- a/artiq/firmware/satman/analyzer.rs +++ b/artiq/firmware/satman/analyzer.rs @@ -1,5 +1,4 @@ use board_misoc::{csr, cache}; -use board_artiq::drtioaux; use proto_artiq::drtioaux_proto::ANALYZER_MAX_SIZE; const BUFFER_SIZE: usize = 512 * 1024; @@ -13,7 +12,7 @@ static mut BUFFER: Buffer = Buffer { data: [0; BUFFER_SIZE] }; -pub fn arm() { +fn arm() { unsafe { let base_addr = &mut BUFFER.data[0] as *mut _ as usize; let last_addr = &mut BUFFER.data[BUFFER_SIZE - 1] as *mut _ as usize; @@ -25,7 +24,7 @@ pub fn arm() { } } -pub fn disarm() { +fn disarm() { unsafe { csr::rtio_analyzer::enable_write(0); while csr::rtio_analyzer::busy_read() != 0 {} @@ -34,38 +33,109 @@ pub fn disarm() { } } -pub fn send() -> Result<(), drtioaux::Error> { - let data = unsafe { &BUFFER.data[..] }; - let overflow_occurred = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 }; - let total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() }; - let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize; - let wraparound = total_byte_count >= BUFFER_SIZE as u64; - let sent_bytes = if wraparound { BUFFER_SIZE } else { total_byte_count as usize }; +pub struct Analyzer { + // necessary for keeping track of sent data + sent_bytes: usize, + data_iter: usize +} - drtioaux::send(0, &drtioaux::Packet::AnalyzerHeader { - total_byte_count: total_byte_count, - sent_bytes: sent_bytes as u32, - overflow_occurred: overflow_occurred, - })?; +pub struct Header { + pub total_byte_count: u64, + pub sent_bytes: u32, + pub overflow: bool +} - let mut i = if wraparound { pointer } else { 0 }; - while i < sent_bytes { - let mut data_slice: [u8; ANALYZER_MAX_SIZE] = [0; ANALYZER_MAX_SIZE]; - let len: usize = if i + ANALYZER_MAX_SIZE < sent_bytes { ANALYZER_MAX_SIZE } else { sent_bytes - i } as usize; - let last = i + len == sent_bytes; +pub struct AnalyzerSliceMeta { + pub len: u16, + pub last: bool +} + +impl Analyzer { + pub fn new() -> Analyzer { + // create and arm new Analyzer + arm(); + Analyzer { + sent_bytes: 0, + data_iter: 0 + } + } + + pub fn get_header(&mut self) -> Header { + disarm(); + + let overflow = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 }; + let total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() }; + let wraparound = total_byte_count >= BUFFER_SIZE as u64; + self.sent_bytes = if wraparound { BUFFER_SIZE } else { total_byte_count as usize }; + self.data_iter = if wraparound { (total_byte_count % BUFFER_SIZE as u64) as usize } else { 0 }; + + Header { + total_byte_count: total_byte_count, + sent_bytes: self.sent_bytes as u32, + overflow: overflow + } + } + + pub fn get_data(&mut self, data_slice: &mut [u8; ANALYZER_MAX_SIZE]) -> AnalyzerSliceMeta { + let data = unsafe { &BUFFER.data[..] }; + let i = self.data_iter; + let len = if i + ANALYZER_MAX_SIZE < self.sent_bytes { ANALYZER_MAX_SIZE } else { self.sent_bytes - i }; + let last = i + len == self.sent_bytes; if i + len >= BUFFER_SIZE { data_slice[..len].clone_from_slice(&data[i..BUFFER_SIZE]); data_slice[..len].clone_from_slice(&data[..(i+len) % BUFFER_SIZE]); } else { data_slice[..len].clone_from_slice(&data[i..i+len]); } - i += len; - drtioaux::send(0, &drtioaux::Packet::AnalyzerData { - last: last, - length: len as u16, - data: data_slice, - })?; - } + self.data_iter += len; - Ok(()) + if last { + arm(); + } + + AnalyzerSliceMeta { + len: len as u16, + last: last + } + } } + + +// pub fn send() -> Result<(), drtioaux::Error> { +// let data = unsafe { &BUFFER.data[..] }; +// let overflow_occurred = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 }; +// let total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() }; +// let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize; +// let wraparound = total_byte_count >= BUFFER_SIZE as u64; +// let sent_bytes = if wraparound { BUFFER_SIZE } else { total_byte_count as usize }; + +// drtioaux::send(0, &drtioaux::Packet::AnalyzerHeader { +// total_byte_count: total_byte_count, +// sent_bytes: sent_bytes as u32, +// overflow_occurred: overflow_occurred, +// })?; + +// info!("header sent, total bytes: {}; sent: {}, overflow: {}", total_byte_count, sent_bytes, overflow_occurred); + +// let mut i = if wraparound { pointer } else { 0 }; +// while i < sent_bytes { +// let mut data_slice: [u8; ANALYZER_MAX_SIZE] = [0; ANALYZER_MAX_SIZE]; +// let len: usize = if i + ANALYZER_MAX_SIZE < sent_bytes { ANALYZER_MAX_SIZE } else { sent_bytes - i } as usize; +// let last = i + len == sent_bytes; +// if i + len >= BUFFER_SIZE { +// data_slice[..len].clone_from_slice(&data[i..BUFFER_SIZE]); +// data_slice[..len].clone_from_slice(&data[..(i+len) % BUFFER_SIZE]); +// } else { +// data_slice[..len].clone_from_slice(&data[i..i+len]); +// } +// i += len; +// drtioaux::send(0, &drtioaux::Packet::AnalyzerData { +// last: last, +// length: len as u16, +// data: data_slice, +// })?; +// //info!("sent data, last: {}, length: {}", last, len); +// } + +// Ok(()) +// } diff --git a/artiq/firmware/satman/main.rs b/artiq/firmware/satman/main.rs index 65282701e..4e9f8fa33 100644 --- a/artiq/firmware/satman/main.rs +++ b/artiq/firmware/satman/main.rs @@ -16,8 +16,10 @@ use board_misoc::{csr, ident, clock, uart_logger, i2c, pmp}; use board_artiq::si5324; use board_artiq::{spi, drtioaux}; use board_artiq::drtio_routing; +use proto_artiq::drtioaux_proto::ANALYZER_MAX_SIZE; use riscv::register::{mcause, mepc, mtval}; use dma::Manager as DmaManager; +use analyzer::Analyzer; #[global_allocator] static mut ALLOC: alloc_list::ListAlloc = alloc_list::EMPTY; @@ -75,7 +77,7 @@ macro_rules! forward { ($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr) => {} } -fn process_aux_packet(_manager: &mut DmaManager, _repeaters: &mut [repeater::Repeater], +fn process_aux_packet(_manager: &mut DmaManager, analyzer: &mut Analyzer, _repeaters: &mut [repeater::Repeater], _routing_table: &mut drtio_routing::RoutingTable, _rank: &mut u8, packet: drtioaux::Packet) -> Result<(), drtioaux::Error> { // In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels, @@ -303,12 +305,25 @@ fn process_aux_packet(_manager: &mut DmaManager, _repeaters: &mut [repeater::Rep } } - drtioaux::Packet::AnalyzerRequest { destination: _destination } => { + drtioaux::Packet::AnalyzerHeaderRequest { destination: _destination } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet); - analyzer::disarm(); - let res = analyzer::send(); - analyzer::arm(); - res + let header = analyzer.get_header(); + drtioaux::send(0, &drtioaux::Packet::AnalyzerHeader { + total_byte_count: header.total_byte_count, + sent_bytes: header.sent_bytes, + overflow_occurred: header.overflow, + }) + } + + drtioaux::Packet::AnalyzerDataRequest { destination: _destination } => { + forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + let mut data_slice: [u8; ANALYZER_MAX_SIZE] = [0; ANALYZER_MAX_SIZE]; + let meta = analyzer.get_data(&mut data_slice); + drtioaux::send(0, &drtioaux::Packet::AnalyzerData { + last: meta.last, + length: meta.len, + data: data_slice, + }) } #[cfg(has_rtio_dma)] @@ -340,12 +355,13 @@ fn process_aux_packet(_manager: &mut DmaManager, _repeaters: &mut [repeater::Rep } } -fn process_aux_packets(dma_manager: &mut DmaManager, repeaters: &mut [repeater::Repeater], +fn process_aux_packets(dma_manager: &mut DmaManager, analyzer: &mut Analyzer, + repeaters: &mut [repeater::Repeater], routing_table: &mut drtio_routing::RoutingTable, rank: &mut u8) { let result = drtioaux::recv(0).and_then(|packet| { if let Some(packet) = packet { - process_aux_packet(dma_manager, repeaters, routing_table, rank, packet) + process_aux_packet(dma_manager, analyzer, repeaters, routing_table, rank, packet) } else { Ok(()) } @@ -558,7 +574,7 @@ pub extern fn main() -> i32 { let mut dma_manager = DmaManager::new(); // Reset the analyzer as well. - analyzer::arm(); + let mut analyzer = Analyzer::new(); drtioaux::reset(0); drtiosat_reset(false); @@ -566,7 +582,7 @@ pub extern fn main() -> i32 { while drtiosat_link_rx_up() { drtiosat_process_errors(); - process_aux_packets(&mut dma_manager, &mut repeaters, &mut routing_table, &mut rank); + process_aux_packets(&mut dma_manager, &mut analyzer, &mut repeaters, &mut routing_table, &mut rank); for rep in repeaters.iter_mut() { rep.service(&routing_table, rank); }