From ff77204d373ce33ef9029a297eec5fe62377f803 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 17 Sep 2021 14:20:07 +0200 Subject: [PATCH] moving drtio master code from mainline artiq: moninj and rtio_mgt --- src/runtime/src/moninj.rs | 107 +++++++++-- src/runtime/src/rtio_mgt.rs | 365 ++++++++++++++++++++++++++++++++++++ 2 files changed, 455 insertions(+), 17 deletions(-) create mode 100644 src/runtime/src/rtio_mgt.rs diff --git a/src/runtime/src/moninj.rs b/src/runtime/src/moninj.rs index d1124b4c..06b47ec2 100644 --- a/src/runtime/src/moninj.rs +++ b/src/runtime/src/moninj.rs @@ -3,8 +3,11 @@ use alloc::collections::BTreeMap; use log::{debug, info, warn}; use void::Void; +use board_artiq::drtio_routing; + use libboard_zynq::{smoltcp, timer::GlobalTimer, time::Milliseconds}; use libasync::{task, smoltcp::TcpStream, block_async, nb}; +use libcortex_a9::mutex::Mutex; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::{FromPrimitive, ToPrimitive}; @@ -54,29 +57,99 @@ enum DeviceMessage { InjectionStatus = 1 } -fn read_probe(channel: i32, probe: i8) -> i32 { - unsafe { - csr::rtio_moninj::mon_chan_sel_write(channel as _); - csr::rtio_moninj::mon_probe_sel_write(probe as _); - csr::rtio_moninj::mon_value_update_write(1); - csr::rtio_moninj::mon_value_read() as i32 +#[cfg(has_drtio)] +mod remote_moninj { + use drtioaux; + use rtio_mgt::drtio; + use sched::{Io, Mutex}; + + pub fn read_probe(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, channel: u16, probe: u8) -> u32 { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::MonitorRequest { + destination: destination, + channel: channel, + probe: probe + }); + match reply { + Ok(drtioaux::Packet::MonitorReply { value }) => return value, + Ok(packet) => error!("received unexpected aux packet: {:?}", packet), + Err(e) => error!("aux packet error ({})", e) + } + 0 + } + + pub fn inject(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, channel: u16, overrd: u8, value: u8) { + let _lock = aux_mutex.lock(io).unwrap(); + drtioaux::send(linkno, &drtioaux::Packet::InjectionRequest { + destination: destination, + channel: channel, + overrd: overrd, + value: value + }).unwrap(); + } + + pub fn read_injection_status(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, channel: u16, overrd: u8) -> u8 { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::InjectionStatusRequest { + destination: destination, + channel: channel, + overrd: overrd + }); + match reply { + Ok(drtioaux::Packet::InjectionStatusReply { value }) => return value, + Ok(packet) => error!("received unexpected aux packet: {:?}", packet), + Err(e) => error!("aux packet error ({})", e) + } + 0 } } -fn inject(channel: i32, overrd: i8, value: i8) { - unsafe { - csr::rtio_moninj::inj_chan_sel_write(channel as _); - csr::rtio_moninj::inj_override_sel_write(overrd as _); - csr::rtio_moninj::inj_value_write(value as _); +mod local_moninj { + fn read_probe(channel: i32, probe: i8) -> i32 { + unsafe { + csr::rtio_moninj::mon_chan_sel_write(channel as _); + csr::rtio_moninj::mon_probe_sel_write(probe as _); + csr::rtio_moninj::mon_value_update_write(1); + csr::rtio_moninj::mon_value_read() as i32 + } + } + + fn inject(channel: i32, overrd: i8, value: i8) { + unsafe { + csr::rtio_moninj::inj_chan_sel_write(channel as _); + csr::rtio_moninj::inj_override_sel_write(overrd as _); + csr::rtio_moninj::inj_value_write(value as _); + } + } + + fn read_injection_status(channel: i32, overrd: i8) -> i8 { + unsafe { + csr::rtio_moninj::inj_chan_sel_write(channel as _); + csr::rtio_moninj::inj_override_sel_write(overrd as _); + csr::rtio_moninj::inj_value_read() as i8 + } } } -fn read_injection_status(channel: i32, overrd: i8) -> i8 { - unsafe { - csr::rtio_moninj::inj_chan_sel_write(channel as _); - csr::rtio_moninj::inj_override_sel_write(overrd as _); - csr::rtio_moninj::inj_value_read() as i8 - } +#[cfg(has_drtio)] +macro_rules! dispatch { + ($routing_table:ident, $channel:expr, $func:ident $(, $param:expr)*) => {{ + let destination = ($channel >> 16) as u8; + let channel = $channel as u16; + let hop = $routing_table.0[destination as usize][0]; + if hop == 0 { + local_moninj::$func(channel, $($param, )*) + } else { + let linkno = hop - 1; + remote_moninj::$func(linkno, destination, channel, $($param, )*) + } + }} +} + +#[cfg(not(has_drtio))] +macro_rules! dispatch { + ($routing_table:ident, $channel:expr, $func:ident $(, $param:expr)*) => {{ + let channel = $channel as u16; + local_moninj::$func(channel, $($param, )*) + }} } async fn handle_connection(stream: &TcpStream, timer: GlobalTimer) -> Result<()> { diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs new file mode 100644 index 00000000..1cc74b43 --- /dev/null +++ b/src/runtime/src/rtio_mgt.rs @@ -0,0 +1,365 @@ + +use board_artiq::pl::csr; +#[cfg(has_drtio)] +use libboard_zynq::{timer::GlobalTimer, time::Milliseconds}; +use embedded_hal::blocking::delay::DelayMs; +use board_artiq::drtio_routing; +use libcortex_a9::mutex::Mutex; +use libasync::task; + +#[cfg(has_drtio)] +pub mod drtio { + use super::*; + use drtioaux; + + pub fn startup(aux_mutex: &Mutex, + routing_table: drtio_routing::RoutingTable, + up_destinations: [bool; drtio_routing::DEST_COUNT]) { + let aux_mutex = aux_mutex.clone(); + let routing_table = routing_table.clone(); + let up_destinations = up_destinations.clone(); + task::spawn( { + let routing_table = routing_table.borrow(); + link_thread(io, &aux_mutex, &routing_table, &up_destinations); + }); + } + + fn link_rx_up(linkno: u8) -> bool { + let linkno = linkno as usize; + unsafe { + (csr::DRTIO[linkno].rx_up_read)() == 1 + } + } + + fn recv_aux_timeout(linkno: u8, timeout: u32, timer: GlobalTimer) -> Result { + let max_time = timer.get_time() + Milliseconds(timeout); + loop { + if !link_rx_up(linkno) { + return Err("link went down"); + } + if timer.get_time() > max_time { + return Err("timeout"); + } + match drtioaux::recv(linkno) { + Ok(Some(packet)) => return Ok(packet), + Ok(None) => (), + Err(_) => return Err("aux packet error") + } + } + } + + pub fn aux_transact(aux_mutex: &Mutex, linkno: u8, request: &drtioaux::Packet, + timer: GlobalTimer) -> Result { + let _lock = aux_mutex.lock().unwrap(); + drtioaux::send(linkno, request).unwrap(); + recv_aux_timeout(io, linkno, 200, timer) + } + + fn ping_remote(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> u32 { + let mut count = 0; + loop { + if !link_rx_up(linkno) { + return 0 + } + count += 1; + if count > 100 { + return 0; + } + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::EchoRequest, timer); + match reply { + Ok(drtioaux::Packet::EchoReply) => { + // make sure receive buffer is drained + let max_time = timer.get_time() + Milliseconds(200); + loop { + if timer.get_time() > max_time { + return count; + } + let _ = drtioaux::recv(linkno); + } + } + _ => {} + } + } + } + + fn sync_tsc(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> Result<(), &'static str> { + let _lock = aux_mutex.lock().unwrap(); + + unsafe { + (csr::DRTIO[linkno as usize].set_time_write)(1); + while (csr::DRTIO[linkno as usize].set_time_read)() == 1 {} + } + // TSCAck is the only aux packet that is sent spontaneously + // by the satellite, in response to a TSC set on the RT link. + let reply = recv_aux_timeout(io, linkno, 10000, timer)?; + if reply == drtioaux::Packet::TSCAck { + return Ok(()); + } else { + return Err("unexpected reply"); + } + } + + fn load_routing_table(aux_mutex: &Mutex, linkno: u8, routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer) -> Result<(), &'static str> { + for i in 0..drtio_routing::DEST_COUNT { + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::RoutingSetPath { + destination: i as u8, + hops: routing_table.0[i] + }, timer)?; + if reply != drtioaux::Packet::RoutingAck { + return Err("unexpected reply"); + } + } + Ok(()) + } + + fn set_rank(aux_mutex: &Mutex, linkno: u8, rank: u8, timer: GlobalTimer) -> Result<(), &'static str> { + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::RoutingSetRank { + rank: rank + }, timer)?; + if reply != drtioaux::Packet::RoutingAck { + return Err("unexpected reply"); + } + Ok(()) + } + + fn init_buffer_space(destination: u8, linkno: u8) { + let linkno = linkno as usize; + unsafe { + (csr::DRTIO[linkno].destination_write)(destination); + (csr::DRTIO[linkno].force_destination_write)(1); + (csr::DRTIO[linkno].o_get_buffer_space_write)(1); + while (csr::DRTIO[linkno].o_wait_read)() == 1 {} + info!("[DEST#{}] buffer space is {}", + destination, (csr::DRTIO[linkno].o_dbg_buffer_space_read)()); + (csr::DRTIO[linkno].force_destination_write)(0); + } + } + + fn process_unsolicited_aux(aux_mutex: &Mutex, linkno: u8) { + let _lock = aux_mutex.lock().unwrap(); + match drtioaux::recv(linkno) { + Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), + Ok(None) => (), + Err(_) => warn!("[LINK#{}] aux packet error", linkno) + } + } + + fn process_local_errors(linkno: u8) { + let errors; + let linkidx = linkno as usize; + unsafe { + errors = (csr::DRTIO[linkidx].protocol_error_read)(); + (csr::DRTIO[linkidx].protocol_error_write)(errors); + } + if errors != 0 { + error!("[LINK#{}] error(s) found (0x{:02x}):", linkno, errors); + if errors & 1 != 0 { + error!("[LINK#{}] received packet of an unknown type", linkno); + } + if errors & 2 != 0 { + error!("[LINK#{}] received truncated packet", linkno); + } + if errors & 4 != 0 { + error!("[LINK#{}] timeout attempting to get remote buffer space", linkno); + } + } + } + + fn destination_set_up(routing_table: &drtio_routing::RoutingTable, + up_destinations: [bool; drtio_routing::DEST_COUNT], + destination: u8, up: bool) { + let mut up_destinations = up_destinations.borrow_mut(); + up_destinations[destination as usize] = up; + if up { + drtio_routing::interconnect_enable(routing_table, 0, destination); + info!("[DEST#{}] destination is up", destination); + } else { + drtio_routing::interconnect_disable(destination); + info!("[DEST#{}] destination is down", destination); + } + } + + fn destination_up(up_destinations: &Urc>, destination: u8) -> bool { + let up_destinations = up_destinations.borrow(); + up_destinations[destination as usize] + } + + fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, + up_links: &[bool], + up_destinations: &Urc>, + timer: GlobalTimer) { + for destination in 0..drtio_routing::DEST_COUNT { + let hop = routing_table.0[destination][0]; + let destination = destination as u8; + + if hop == 0 { + /* local RTIO */ + if !destination_up(up_destinations, destination) { + destination_set_up(routing_table, up_destinations, destination, true); + } + } else if hop as usize <= csr::DRTIO.len() { + let linkno = hop - 1; + if destination_up(up_destinations, destination) { + if up_links[linkno as usize] { + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest { + destination: destination + }, timer); + match reply { + Ok(drtioaux::Packet::DestinationDownReply) => + destination_set_up(routing_table, up_destinations, destination, false), + Ok(drtioaux::Packet::DestinationOkReply) => (), + Ok(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => + error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}", destination, channel), + Ok(drtioaux::Packet::DestinationCollisionReply { channel }) => + error!("[DEST#{}] RTIO collision involving channel 0x{:04x}", destination, channel), + Ok(drtioaux::Packet::DestinationBusyReply { channel }) => + error!("[DEST#{}] RTIO busy error involving channel 0x{:04x}", destination, channel), + Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), + Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) + } + } else { + destination_set_up(routing_table, up_destinations, destination, false); + } + } else { + if up_links[linkno as usize] { + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest { + destination: destination + }, timer); + match reply { + Ok(drtioaux::Packet::DestinationDownReply) => (), + Ok(drtioaux::Packet::DestinationOkReply) => { + destination_set_up(routing_table, up_destinations, destination, true); + init_buffer_space(destination as u8, linkno); + }, + Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), + Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) + } + } + } + } + } + } + + pub fn link_thread(io: Io, aux_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, + up_destinations: &Urc>, + timer: GlobalTimer) { + let mut up_links = [false; csr::DRTIO.len()]; + loop { + for linkno in 0..csr::DRTIO.len() { + let linkno = linkno as u8; + if up_links[linkno as usize] { + /* link was previously up */ + if link_rx_up(linkno) { + process_unsolicited_aux(&io, aux_mutex, linkno, timer); + process_local_errors(linkno); + } else { + info!("[LINK#{}] link is down", linkno); + up_links[linkno as usize] = false; + } + } else { + /* link was previously down */ + if link_rx_up(linkno) { + info!("[LINK#{}] link RX became up, pinging", linkno); + let ping_count = ping_remote(aux_mutex, linkno, timer); + if ping_count > 0 { + info!("[LINK#{}] remote replied after {} packets", linkno, ping_count); + up_links[linkno as usize] = true; + if let Err(e) = sync_tsc(aux_mutex, linkno, timer) { + error!("[LINK#{}] failed to sync TSC ({})", linkno, e); + } + if let Err(e) = load_routing_table(aux_mutex, linkno, routing_table) { + error!("[LINK#{}] failed to load routing table ({})", linkno, e); + } + if let Err(e) = set_rank(aux_mutex, linkno, 1) { + error!("[LINK#{}] failed to set rank ({})", linkno, e); + } + info!("[LINK#{}] link initialization completed", linkno); + } else { + error!("[LINK#{}] ping failed", linkno); + } + } + } + } + destination_survey(aux_mutex, routing_table, &up_links, up_destinations, timer); + timer.delay_ms(200); + } + } + + pub fn reset(aux_mutex: &Mutex, timer: GlobalTimer) { + for linkno in 0..csr::DRTIO.len() { + unsafe { + (csr::DRTIO[linkno].reset_write)(1); + } + } + timer.delay_ms(1); + for linkno in 0..csr::DRTIO.len() { + unsafe { + (csr::DRTIO[linkno].reset_write)(0); + } + } + + for linkno in 0..csr::DRTIO.len() { + let linkno = linkno as u8; + if link_rx_up(linkno) { + let reply = aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::ResetRequest, timer); + match reply { + Ok(drtioaux::Packet::ResetAck) => (), + Ok(_) => error!("[LINK#{}] reset failed, received unexpected aux packet", linkno), + Err(e) => error!("[LINK#{}] reset failed, aux packet error ({})", linkno, e) + } + } + } + } +} + +#[cfg(not(has_drtio))] +pub mod drtio { + use super::*; + + pub fn startup(_aux_mutex: &Mutex, _routing_table: &Urc>, + _up_destinations: &Urc>, timer:GlobalTimer) {} + pub fn reset(_aux_mutex: &Mutex, timer: GlobalTimer) {} +} + +// todo figure out what to do with these untils +fn async_error_thread(io: Io) { + loop { + unsafe { + io.until(|| csr::rtio_core::async_error_read() != 0).unwrap(); + let errors = csr::rtio_core::async_error_read(); + if errors & 1 != 0 { + error!("RTIO collision involving channel {}", + csr::rtio_core::collision_channel_read()); + } + if errors & 2 != 0 { + error!("RTIO busy error involving channel {}", + csr::rtio_core::busy_channel_read()); + } + if errors & 4 != 0 { + error!("RTIO sequence error involving channel {}", + csr::rtio_core::sequence_error_channel_read()); + } + csr::rtio_core::async_error_write(errors); + } + } +} + +pub fn startup(aux_mutex: &Mutex, + routing_table: &Urc>, + up_destinations: &Urc>, timer: GlobalTimer) { + drtio::startup(aux_mutex, routing_table, up_destinations, timer); + unsafe { + csr::rtio_core::reset_phy_write(1); + } + task::spawn(async_error_thread); +} + +pub fn reset(aux_mutex: &Mutex, timer: GlobalTimer) { + unsafe { + csr::rtio_core::reset_write(1); + } + drtio::reset(aux_mutex, timer) +}