forked from M-Labs/artiq
analyzer: implement querying up satellites for data
This commit is contained in:
parent
0b03126038
commit
6c96033d41
@ -1,7 +1,11 @@
|
|||||||
use io::{Write, Error as IoError};
|
use io::{Write, Error as IoError};
|
||||||
use board_misoc::{csr, cache};
|
use board_misoc::{csr, cache};
|
||||||
use sched::{Io, TcpListener, TcpStream, Error as SchedError};
|
use sched::{Io, Mutex, TcpListener, TcpStream, Error as SchedError};
|
||||||
use analyzer_proto::*;
|
use analyzer_proto::*;
|
||||||
|
use alloc::vec::Vec;
|
||||||
|
use urc::Urc;
|
||||||
|
use board_artiq::drtio_routing;
|
||||||
|
use core::cell::RefCell;
|
||||||
|
|
||||||
const BUFFER_SIZE: usize = 512 * 1024;
|
const BUFFER_SIZE: usize = 512 * 1024;
|
||||||
|
|
||||||
@ -35,17 +39,78 @@ fn disarm() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn worker(stream: &mut TcpStream) -> Result<(), IoError<SchedError>> {
|
#[cfg(has_drtio)]
|
||||||
let data = unsafe { &BUFFER.data[..] };
|
pub mod remote_analyzer {
|
||||||
let overflow_occurred = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 };
|
use super::*;
|
||||||
let total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() };
|
use rtio_mgt::drtio;
|
||||||
let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize;
|
|
||||||
let wraparound = total_byte_count >= BUFFER_SIZE as u64;
|
pub struct RemoteBuffer {
|
||||||
|
pub total_byte_count: u64,
|
||||||
|
pub sent_bytes: u32,
|
||||||
|
pub overflow_occurred: bool,
|
||||||
|
pub data: Vec<u8>
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_data(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex,
|
||||||
|
routing_table: &drtio_routing::RoutingTable,
|
||||||
|
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) -> RemoteBuffer {
|
||||||
|
// gets data from satellites and returns consolidated data
|
||||||
|
let mut remote_data: Vec<u8> = Vec::new();
|
||||||
|
let mut remote_overflow = false;
|
||||||
|
let mut remote_sent_bytes = 0;
|
||||||
|
let mut remote_total_bytes = 0;
|
||||||
|
|
||||||
|
let data_vec = match drtio::analyzer_query(io, aux_mutex, ddma_mutex, routing_table, up_destinations) {
|
||||||
|
Ok(data_vec) => data_vec,
|
||||||
|
Err(e) => { error!("error retrieving remote analyzer data: {}", e); Vec::new() }
|
||||||
|
};
|
||||||
|
for data in data_vec {
|
||||||
|
remote_total_bytes += data.total_byte_count;
|
||||||
|
remote_sent_bytes += data.sent_bytes;
|
||||||
|
remote_overflow |= data.overflow_occurred;
|
||||||
|
remote_data.extend(data.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteBuffer {
|
||||||
|
total_byte_count: remote_total_bytes,
|
||||||
|
sent_bytes: remote_sent_bytes,
|
||||||
|
overflow_occurred: remote_overflow,
|
||||||
|
data: remote_data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
fn worker(stream: &mut TcpStream, _io: &Io, _aux_mutex: &Mutex, _ddma_mutex: &Mutex,
|
||||||
|
_routing_table: &drtio_routing::RoutingTable,
|
||||||
|
_up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>
|
||||||
|
) -> Result<(), IoError<SchedError>> {
|
||||||
|
let local_data = unsafe { &BUFFER.data[..] };
|
||||||
|
let local_overflow_occurred = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 };
|
||||||
|
let local_total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() };
|
||||||
|
|
||||||
|
let wraparound = local_total_byte_count >= BUFFER_SIZE as u64;
|
||||||
|
let local_sent_bytes = if wraparound { BUFFER_SIZE as u32 } else { local_total_byte_count as u32 };
|
||||||
|
let pointer = (local_total_byte_count % BUFFER_SIZE as u64) as usize;
|
||||||
|
|
||||||
|
#[cfg(has_drtio)]
|
||||||
|
let remote = remote_analyzer::get_data(
|
||||||
|
_io, _aux_mutex, _ddma_mutex, _routing_table, _up_destinations);
|
||||||
|
#[cfg(has_drtio)]
|
||||||
let header = Header {
|
let header = Header {
|
||||||
total_byte_count: total_byte_count,
|
total_byte_count: local_total_byte_count + remote.total_byte_count,
|
||||||
sent_bytes: if wraparound { BUFFER_SIZE as u32 } else { total_byte_count as u32 },
|
sent_bytes: local_sent_bytes + remote.sent_bytes,
|
||||||
overflow_occurred: overflow_occurred,
|
overflow_occurred: local_overflow_occurred | remote.overflow_occurred,
|
||||||
|
log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8,
|
||||||
|
dds_onehot_sel: true
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(not(has_drtio))]
|
||||||
|
let header = Header {
|
||||||
|
total_byte_count: local_total_byte_count,
|
||||||
|
sent_bytes: local_sent_bytes,
|
||||||
|
overflow_occurred: local_overflow_occurred,
|
||||||
log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8,
|
log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8,
|
||||||
dds_onehot_sel: true // kept for backward compatibility of analyzer dumps
|
dds_onehot_sel: true // kept for backward compatibility of analyzer dumps
|
||||||
};
|
};
|
||||||
@ -54,16 +119,20 @@ fn worker(stream: &mut TcpStream) -> Result<(), IoError<SchedError>> {
|
|||||||
stream.write_all("e".as_bytes())?;
|
stream.write_all("e".as_bytes())?;
|
||||||
header.write_to(stream)?;
|
header.write_to(stream)?;
|
||||||
if wraparound {
|
if wraparound {
|
||||||
stream.write_all(&data[pointer..])?;
|
stream.write_all(&local_data[pointer..])?;
|
||||||
stream.write_all(&data[..pointer])?;
|
stream.write_all(&local_data[..pointer])?;
|
||||||
} else {
|
} else {
|
||||||
stream.write_all(&data[..pointer])?;
|
stream.write_all(&local_data[..pointer])?;
|
||||||
}
|
}
|
||||||
|
#[cfg(has_drtio)]
|
||||||
|
stream.write_all(&remote.data)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn thread(io: Io) {
|
pub fn thread(io: Io, aux_mutex: &Mutex, ddma_mutex: &Mutex,
|
||||||
|
routing_table: &Urc<RefCell<drtio_routing::RoutingTable>>,
|
||||||
|
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
|
||||||
let listener = TcpListener::new(&io, 65535);
|
let listener = TcpListener::new(&io, 65535);
|
||||||
listener.listen(1382).expect("analyzer: cannot listen");
|
listener.listen(1382).expect("analyzer: cannot listen");
|
||||||
|
|
||||||
@ -75,7 +144,8 @@ pub fn thread(io: Io) {
|
|||||||
|
|
||||||
disarm();
|
disarm();
|
||||||
|
|
||||||
match worker(&mut stream) {
|
let routing_table = routing_table.borrow();
|
||||||
|
match worker(&mut stream, &io, aux_mutex, ddma_mutex, &routing_table, up_destinations) {
|
||||||
Ok(()) => (),
|
Ok(()) => (),
|
||||||
Err(err) => error!("analyzer aborted: {}", err)
|
Err(err) => error!("analyzer aborted: {}", err)
|
||||||
}
|
}
|
||||||
|
@ -209,7 +209,13 @@ fn startup() {
|
|||||||
io.spawn(4096, move |io| { moninj::thread(io, &aux_mutex, &ddma_mutex, &drtio_routing_table) });
|
io.spawn(4096, move |io| { moninj::thread(io, &aux_mutex, &ddma_mutex, &drtio_routing_table) });
|
||||||
}
|
}
|
||||||
#[cfg(has_rtio_analyzer)]
|
#[cfg(has_rtio_analyzer)]
|
||||||
io.spawn(4096, analyzer::thread);
|
{
|
||||||
|
let aux_mutex = aux_mutex.clone();
|
||||||
|
let drtio_routing_table = drtio_routing_table.clone();
|
||||||
|
let up_destinations = up_destinations.clone();
|
||||||
|
let ddma_mutex = ddma_mutex.clone();
|
||||||
|
io.spawn(4096, move |io| { analyzer::thread(io, &aux_mutex, &ddma_mutex, &drtio_routing_table, &up_destinations) });
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(has_grabber)]
|
#[cfg(has_grabber)]
|
||||||
io.spawn(4096, grabber_thread);
|
io.spawn(4096, grabber_thread);
|
||||||
|
@ -19,6 +19,8 @@ pub mod drtio {
|
|||||||
use drtioaux;
|
use drtioaux;
|
||||||
use proto_artiq::drtioaux_proto::DMA_TRACE_MAX_SIZE;
|
use proto_artiq::drtioaux_proto::DMA_TRACE_MAX_SIZE;
|
||||||
use rtio_dma::remote_dma;
|
use rtio_dma::remote_dma;
|
||||||
|
#[cfg(has_rtio_analyzer)]
|
||||||
|
use analyzer::remote_analyzer::RemoteBuffer;
|
||||||
|
|
||||||
pub fn startup(io: &Io, aux_mutex: &Mutex,
|
pub fn startup(io: &Io, aux_mutex: &Mutex,
|
||||||
routing_table: &Urc<RefCell<drtio_routing::RoutingTable>>,
|
routing_table: &Urc<RefCell<drtio_routing::RoutingTable>>,
|
||||||
@ -404,6 +406,59 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(has_rtio_analyzer)]
|
||||||
|
fn analyzer_get_data(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex,
|
||||||
|
routing_table: &drtio_routing::RoutingTable, destination: u8
|
||||||
|
) -> Result<RemoteBuffer, &'static str> {
|
||||||
|
let linkno = routing_table.0[destination as usize][0] - 1;
|
||||||
|
let reply = aux_transact(io, aux_mutex, ddma_mutex, linkno, &drtioaux::Packet::AnalyzerRequest {
|
||||||
|
destination: destination });
|
||||||
|
let (sent, total, overflow) = match reply {
|
||||||
|
Ok(drtioaux::Packet::AnalyzerHeader {
|
||||||
|
sent_bytes, total_byte_count, overflow_occurred }
|
||||||
|
) => (sent_bytes, total_byte_count, overflow_occurred),
|
||||||
|
Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"),
|
||||||
|
Err(_) => return Err("aux error on remote analyzer request")
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut remote_data: Vec<u8> = Vec::new();
|
||||||
|
if sent > 0 {
|
||||||
|
let mut last_packet = false;
|
||||||
|
while !last_packet {
|
||||||
|
let reply = recv_aux_timeout(io, linkno, 200);
|
||||||
|
match reply {
|
||||||
|
Ok(drtioaux::Packet::AnalyzerData { last, length, data }) => {
|
||||||
|
last_packet = last;
|
||||||
|
remote_data.extend(&data[0..length as usize]);
|
||||||
|
},
|
||||||
|
Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"),
|
||||||
|
Err(_) => return Err("aux error on remote analyzer request")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(RemoteBuffer {
|
||||||
|
sent_bytes: sent,
|
||||||
|
total_byte_count: total,
|
||||||
|
overflow_occurred: overflow,
|
||||||
|
data: remote_data
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(has_rtio_analyzer)]
|
||||||
|
pub fn analyzer_query(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex,
|
||||||
|
routing_table: &drtio_routing::RoutingTable,
|
||||||
|
up_destinations: &Urc<RefCell<[bool; drtio_routing::DEST_COUNT]>>
|
||||||
|
) -> Result<Vec<RemoteBuffer>, &'static str> {
|
||||||
|
let mut remote_buffers: Vec<RemoteBuffer> = Vec::new();
|
||||||
|
for i in 0..drtio_routing::DEST_COUNT as u8 {
|
||||||
|
if destination_up(up_destinations, i) {
|
||||||
|
remote_buffers.push(analyzer_get_data(io, aux_mutex, ddma_mutex, routing_table, i)?);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(remote_buffers)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(has_drtio))]
|
#[cfg(not(has_drtio))]
|
||||||
|
Loading…
Reference in New Issue
Block a user