From 758adf0495ca78338aa3ab27152922e1ea0fbf54 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 1 Oct 2021 12:53:31 +0200 Subject: [PATCH] drtioaux: created async version --- src/libboard_artiq/src/drtioaux_async.rs | 156 +++++++++++++++++++++++ src/runtime/src/rtio_mgt.rs | 72 +++++------ 2 files changed, 190 insertions(+), 38 deletions(-) create mode 100644 src/libboard_artiq/src/drtioaux_async.rs diff --git a/src/libboard_artiq/src/drtioaux_async.rs b/src/libboard_artiq/src/drtioaux_async.rs new file mode 100644 index 00000000..9a789a6b --- /dev/null +++ b/src/libboard_artiq/src/drtioaux_async.rs @@ -0,0 +1,156 @@ +use crc; + +use core_io::{ErrorKind as IoErrorKind, Error as IoError}; +use void::Void; +use nb; + +use libboard_zynq::{timer::GlobalTimer, time::Milliseconds}; +use libasync::{task, block_async}; + +use io::{proto::ProtoRead, proto::ProtoWrite, Cursor}; +use crate::mem::mem::DRTIOAUX_MEM; +use crate::pl::csr::DRTIOAUX; +use crate::drtioaux::{Error, has_rx_error}; + +pub use crate::drtioaux_proto::Packet; + +pub async fn reset(linkno: u8) { + let linkno = linkno as usize; + unsafe { + // clear buffer first to limit race window with buffer overflow + // error. We assume the CPU is fast enough so that no two packets + // will be received between the buffer and the error flag are cleared. + (DRTIOAUX[linkno].aux_rx_present_write)(1); + (DRTIOAUX[linkno].aux_rx_error_write)(1); + } +} + +fn tx_ready(linkno: usize) -> nb::Result<(), Void> { + unsafe { + if (DRTIOAUX[linkno].aux_tx_read)() != 0 { + Err(nb::Error::WouldBlock) + } + else { + Ok(()) + } + } +} + +async fn copy_with_swap(src: *mut u8, dst: *mut u8, len: isize) { + unsafe { + for i in (0..(len-4)).step_by(4) { + *dst.offset(i) = *src.offset(i+3); + *dst.offset(i+1) = *src.offset(i+2); + *dst.offset(i+2) = *src.offset(i+1); + *dst.offset(i+3) = *src.offset(i); + } + // checksum untouched + // unrolled for performance + *dst.offset(len-4) = *src.offset(len-4); + *dst.offset(len-3) = *src.offset(len-3); + *dst.offset(len-2) = *src.offset(len-2); + *dst.offset(len-1) = *src.offset(len-1); + } +} + +async fn receive(linkno: u8, f: F) -> Result, Error> + where F: FnOnce(&[u8]) -> Result +{ + let linkidx = linkno as usize; + unsafe { + if (DRTIOAUX[linkidx].aux_rx_present_read)() == 1 { + let ptr = (DRTIOAUX_MEM[linkidx].base + DRTIOAUX_MEM[linkidx].size / 2) as *mut u8; + let len = (DRTIOAUX[linkidx].aux_rx_length_read)() as usize; + // work buffer, as byte order will need to be swapped, cannot be in place + let mut buf: [u8; 1024] = [0; 1024]; + copy_with_swap(ptr, buf.as_mut_ptr(), len as isize).await; + let result = f(&buf[0..len]); + (DRTIOAUX[linkidx].aux_rx_present_write)(1); + Ok(Some(result?)) + } else { + Ok(None) + } + } +} + +pub async fn recv(linkno: u8) -> Result, Error> { + if has_rx_error(linkno) { + return Err(Error::GatewareError) + } + + receive(linkno, |buffer| { + if buffer.len() < 8 { + return Err(IoError::new(IoErrorKind::UnexpectedEof, "Unexpected end").into()) + } + + let mut reader = Cursor::new(buffer); + + let checksum_at = buffer.len() - 4; + let checksum = crc::crc32::checksum_ieee(&reader.get_ref()[0..checksum_at]); + reader.set_position(checksum_at); + if reader.read_u32()? != checksum { + return Err(Error::CorruptedPacket) + } + reader.set_position(0); + + Ok(Packet::read_from(&mut reader)?) + }).await +} + +pub async fn recv_timeout(linkno: u8, timeout_ms: Option, + timer: GlobalTimer) -> Result +{ + let timeout_ms = Milliseconds(timeout_ms.unwrap_or(10)); + let limit = timer.get_time() + timeout_ms; + let mut would_block = false; + while timer.get_time() < limit { + // to ensure one last time recv would run one last time in case + // async would return after timeout + if would_block { + task::r#yield().await; + } + match recv(linkno).await? { + None => { would_block = true; }, + Some(packet) => return Ok(packet), + } + } + Err(Error::TimedOut) +} + +async fn transmit(linkno: u8, f: F) -> Result<(), Error> + where F: FnOnce(&mut [u8]) -> Result +{ + let linkno = linkno as usize; + unsafe { + let _ = block_async!(tx_ready(linkno)).await; + let ptr = DRTIOAUX_MEM[linkno].base as *mut u8; + let len = DRTIOAUX_MEM[linkno].size / 2; + // work buffer, works with unaligned mem access + let mut buf: [u8; 1024] = [0; 1024]; + let len = f(&mut buf[0..len])?; + copy_with_swap(buf.as_mut_ptr(), ptr, len as isize).await; + (DRTIOAUX[linkno].aux_tx_length_write)(len as u16); + (DRTIOAUX[linkno].aux_tx_write)(1); + Ok(()) + } +} + +pub async fn send(linkno: u8, packet: &Packet) -> Result<(), Error> { + transmit(linkno, |buffer| { + let mut writer = Cursor::new(buffer); + + packet.write_to(&mut writer)?; + + let padding = 4 - (writer.position() % 4); + if padding != 4 { + for _ in 0..padding { + writer.write_u8(0)?; + } + } + + let checksum = crc::crc32::checksum_ieee(&writer.get_ref()[0..writer.position()]); + writer.write_u32(checksum)?; + + Ok(writer.position()) + }).await +} diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 30fe288d..7fe4732d 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -8,7 +8,9 @@ use libcortex_a9::mutex::Mutex; #[cfg(has_drtio)] pub mod drtio { use super::*; - use libboard_artiq::drtioaux; + use libboard_artiq::drtioaux_async; + use libboard_artiq::drtioaux_async::Packet; + use libboard_artiq::drtioaux::Error; use log::{warn, error, info}; use embedded_hal::blocking::delay::DelayMs; use libasync::{task, delay}; @@ -33,27 +35,21 @@ pub mod drtio { } } - async fn recv_aux_timeout(linkno: u8, timeout: u64, timer: GlobalTimer) -> Result { - let max_time = timer.get_time() + Milliseconds(timeout); - loop { - if !link_rx_up(linkno).await { - return Err("link went down"); - } - if timer.get_time() > max_time { - return Err("timeout"); - } - match drtioaux::recv(linkno) { - Ok(Some(packet)) => return Ok(packet), - Ok(None) => (), - Err(_) => return Err("aux packet error") - } + async fn recv_aux_timeout(linkno: u8, timeout: u64, timer: GlobalTimer) -> Result { + if !link_rx_up(linkno).await { + return Err("link went down"); + } + match drtioaux_async::recv_timeout(linkno, Some(timeout), timer).await { + Ok(packet) => return Ok(packet), + Err(Error::TimedOut) => return Err("timed out"), + Err(_) => return Err("aux packet error"), } } - pub async fn aux_transact(aux_mutex: &Mutex, linkno: u8, request: &drtioaux::Packet, - timer: GlobalTimer) -> Result { + pub async fn aux_transact(aux_mutex: &Mutex, linkno: u8, request: &Packet, + timer: GlobalTimer) -> Result { let _lock = aux_mutex.lock(); - drtioaux::send(linkno, request).unwrap(); + drtioaux_async::send(linkno, request).await.unwrap(); recv_aux_timeout(linkno, 200, timer).await } @@ -63,7 +59,7 @@ pub mod drtio { if timer.get_time() > max_time { return; } //could this be cut short? - let _ = drtioaux::recv(linkno); + let _ = drtioaux_async::recv(linkno).await; } } @@ -77,9 +73,9 @@ pub mod drtio { if count > 100 { return 0; } - let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::EchoRequest, timer).await; + let reply = aux_transact(aux_mutex, linkno, &Packet::EchoRequest, timer).await; match reply { - Ok(drtioaux::Packet::EchoReply) => { + Ok(Packet::EchoReply) => { // make sure receive buffer is drained let draining_time = Milliseconds(200); drain_buffer(linkno, draining_time, timer).await; @@ -100,7 +96,7 @@ pub mod drtio { // TSCAck is the only aux packet that is sent spontaneously // by the satellite, in response to a TSC set on the RT link. let reply = recv_aux_timeout(linkno, 10000, timer).await?; - if reply == drtioaux::Packet::TSCAck { + if reply == Packet::TSCAck { return Ok(()); } else { return Err("unexpected reply"); @@ -110,11 +106,11 @@ pub mod drtio { async fn load_routing_table(aux_mutex: &Rc>, linkno: u8, routing_table: &drtio_routing::RoutingTable, timer: GlobalTimer) -> Result<(), &'static str> { for i in 0..drtio_routing::DEST_COUNT { - let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::RoutingSetPath { + let reply = aux_transact(aux_mutex, linkno, &Packet::RoutingSetPath { destination: i as u8, hops: routing_table.0[i] }, timer).await?; - if reply != drtioaux::Packet::RoutingAck { + if reply != Packet::RoutingAck { return Err("unexpected reply"); } } @@ -122,10 +118,10 @@ pub mod drtio { } async fn set_rank(aux_mutex: &Rc>, linkno: u8, rank: u8, timer: GlobalTimer) -> Result<(), &'static str> { - let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::RoutingSetRank { + let reply = aux_transact(aux_mutex, linkno, &Packet::RoutingSetRank { rank: rank }, timer).await?; - if reply != drtioaux::Packet::RoutingAck { + if reply != Packet::RoutingAck { return Err("unexpected reply"); } Ok(()) @@ -146,7 +142,7 @@ pub mod drtio { async fn process_unsolicited_aux(aux_mutex: &Rc>, linkno: u8) { let _lock = aux_mutex.lock(); - match drtioaux::recv(linkno) { + match drtioaux_async::recv(linkno).await { Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), Err(_) => warn!("[LINK#{}] aux packet error", linkno) @@ -210,18 +206,18 @@ pub mod drtio { let linkno = hop - 1; if destination_up(up_destinations, destination).await { if up_links[linkno as usize] { - let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest { + let reply = aux_transact(aux_mutex, linkno, &Packet::DestinationStatusRequest { destination: destination }, timer).await; match reply { - Ok(drtioaux::Packet::DestinationDownReply) => + Ok(Packet::DestinationDownReply) => destination_set_up(routing_table, up_destinations, destination, false).await, - Ok(drtioaux::Packet::DestinationOkReply) => (), - Ok(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => + Ok(Packet::DestinationOkReply) => (), + Ok(Packet::DestinationSequenceErrorReply { channel }) => error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}", destination, channel), - Ok(drtioaux::Packet::DestinationCollisionReply { channel }) => + Ok(Packet::DestinationCollisionReply { channel }) => error!("[DEST#{}] RTIO collision involving channel 0x{:04x}", destination, channel), - Ok(drtioaux::Packet::DestinationBusyReply { channel }) => + Ok(Packet::DestinationBusyReply { channel }) => error!("[DEST#{}] RTIO busy error involving channel 0x{:04x}", destination, channel), Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) @@ -231,12 +227,12 @@ pub mod drtio { } } else { if up_links[linkno as usize] { - let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest { + let reply = aux_transact(aux_mutex, linkno, &Packet::DestinationStatusRequest { destination: destination }, timer).await; match reply { - Ok(drtioaux::Packet::DestinationDownReply) => (), - Ok(drtioaux::Packet::DestinationOkReply) => { + Ok(Packet::DestinationDownReply) => (), + Ok(Packet::DestinationOkReply) => { destination_set_up(routing_table, up_destinations, destination, true).await; init_buffer_space(destination as u8, linkno).await; }, @@ -314,9 +310,9 @@ pub mod drtio { let linkno = linkno as u8; if task::block_on(link_rx_up(linkno)) { let reply = task::block_on(aux_transact(&aux_mutex, linkno, - &drtioaux::Packet::ResetRequest, timer)); + &Packet::ResetRequest, timer)); match reply { - Ok(drtioaux::Packet::ResetAck) => (), + Ok(Packet::ResetAck) => (), Ok(_) => error!("[LINK#{}] reset failed, received unexpected aux packet", linkno), Err(e) => error!("[LINK#{}] reset failed, aux packet error ({})", linkno, e) }