Support CoreMgmt over DRTIO on Zynq Devices #323

Merged
sb10q merged 27 commits from occheung/artiq-zynq:drtio-coremgmt into master 2024-11-19 18:55:03 +08:00
3 changed files with 752 additions and 94 deletions
Showing only changes of commit bdc29e5709 - Show all commits

View File

@ -785,7 +785,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) {
let cfg = Rc::new(cfg);
let restart_idle = Rc::new(Semaphore::new(1, 1));
mgmt::start(cfg.clone(), restart_idle.clone());
mgmt::start(cfg.clone(), restart_idle.clone(), Some((&aux_mutex, &drtio_routing_table, timer)));
task::spawn(async move {
let connection = Rc::new(Semaphore::new(1, 1));
@ -911,7 +911,7 @@ pub fn soft_panic_main(timer: GlobalTimer, cfg: Config) -> ! {
Sockets::init(32);
let dummy = Rc::new(Semaphore::new(0, 1));
mgmt::start(Rc::new(cfg), dummy);
mgmt::start(Rc::new(cfg), dummy, None);
// getting eth settings disables the LED as it resets GPIO
// need to re-enable it here

View File

@ -1,17 +1,21 @@
use alloc::{rc::Rc, string::String, vec::Vec};
use core::cell::RefCell;
use core::{cell::RefCell, ops::Deref};
use futures::{future::poll_fn, task::Poll};
use libasync::{smoltcp::TcpStream, task};
use libboard_artiq::logger::{BufferLogger, LogBufferRef};
use libboard_zynq::{slcr, smoltcp};
use libboard_artiq::{drtio_routing,
drtio_routing::RoutingTable,
logger::{BufferLogger, LogBufferRef}};
use libboard_zynq::{slcr, smoltcp, timer::GlobalTimer};
use libconfig::Config;
use libcortex_a9::semaphore::Semaphore;
use libcortex_a9::{mutex::Mute, semaphore::Semaphore};
use log::{self, debug, error, info, warn, LevelFilter};
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use crate::proto_async::*;
#[cfg(has_drtio)]
use crate::rtio_mgt::*;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
@ -19,6 +23,8 @@ pub enum Error {
UnknownLogLevel(u8),
UnexpectedPattern,
UnrecognizedPacket,
#[cfg(has_drtio)]
DrtioError(drtio::Error),
}
type Result<T> = core::result::Result<T, Error>;
@ -30,6 +36,8 @@ impl core::fmt::Display for Error {
&Error::UnknownLogLevel(lvl) => write!(f, "unknown log level {}", lvl),
&Error::UnexpectedPattern => write!(f, "unexpected pattern"),
&Error::UnrecognizedPacket => write!(f, "unrecognized packet"),
#[cfg(has_drtio)]
&Error::DrtioError(error) => write!(f, "drtio error: {}", error),
}
}
}
@ -40,6 +48,13 @@ impl From<smoltcp::Error> for Error {
}
}
#[cfg(has_drtio)]
impl From<drtio::Error> for Error {
fn from(error: drtio::Error) -> Self {
Error::DrtioError(error)
}
}
#[derive(Debug, FromPrimitive)]
pub enum Request {
GetLog = 1,
@ -52,6 +67,9 @@ pub enum Request {
ConfigRead = 12,
ConfigWrite = 13,
ConfigRemove = 14,
ConfigErase = 15,
DebugAllocator = 8,
}
#[repr(i8)]
@ -112,35 +130,512 @@ async fn read_key(stream: &mut TcpStream) -> Result<String> {
Ok(String::from_utf8(buffer).unwrap())
}
async fn handle_connection(
stream: &mut TcpStream,
pull_id: Rc<RefCell<u32>>,
cfg: Rc<Config>,
restart_idle: Rc<Semaphore>,
) -> Result<()> {
if !expect(&stream, b"ARTIQ management\n").await? {
return Err(Error::UnexpectedPattern);
}
stream.send_slice("e".as_bytes()).await?;
#[cfg(has_drtio)]
mod remote_coremgmt {
use io::{Cursor, ProtoWrite};
use libboard_artiq::drtioaux_proto::{Packet, MASTER_PAYLOAD_MAX_SIZE};
use super::*;
use crate::rtio_mgt::drtio;
pub async fn get_log(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
) -> Result<()> {
let mut buffer = Vec::new();
loop {
let msg = read_i8(stream).await;
if let Err(smoltcp::Error::Finished) = msg {
let reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtGetLogRequest {
destination,
clear: false,
},
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtGetLogReply { last, length, data }) => {
buffer.extend(&data[..length as usize]);
if last {
write_i8(stream, Reply::LogContent as i8).await?;
write_chunk(stream, &buffer).await?;
return Ok(());
}
let msg: Request = FromPrimitive::from_i8(msg?).ok_or(Error::UnrecognizedPacket)?;
match msg {
Request::GetLog => {
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
write_i8(stream, Reply::Error as i8).await?;
return Err(drtio::Error::UnexpectedReply.into());
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
return Err(e.into());
}
}
}
}
pub async fn clear_log(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
) -> Result<()> {
let reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtClearLogRequest { destination },
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtAck) => {
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
write_i8(stream, Reply::Error as i8).await?;
Err(drtio::Error::UnexpectedReply.into())
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
Err(e.into())
}
}
}
pub async fn pull_log(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
pull_id: &Rc<RefCell<u32>>,
) -> Result<()> {
let id = {
let mut guard = pull_id.borrow_mut();
*guard += 1;
*guard
};
loop {
if id != *pull_id.borrow() {
// another connection attempts to pull the log...
// abort this connection...
break;
}
let reply = drtio::aux_transact(
aux_mutex,
Review

In what scenario would that be possible?

In what scenario would that be possible?
Review

It might be possible if you start multiple aqctl_corelog with different binding ports specified by -p.
Though in practice, all I see is the older aqctl_corelog not emitting anything, even after exiting newer aqctl_corelogs.

It might be possible if you start multiple `aqctl_corelog` with different binding ports specified by `-p`. Though in practice, all I see is the older `aqctl_corelog` not emitting anything, even after exiting newer `aqctl_corelog`s.
Review

Though in practice, all I see is the older aqctl_corelog not emitting anything, even after exiting newer aqctl_corelogs.

Sounds buggy...

> Though in practice, all I see is the older aqctl_corelog not emitting anything, even after exiting newer aqctl_corelogs. Sounds buggy...
Review

I think we should fix this - in the field aqctl_corelog is going to be mercilessly started over and over again when something doesn't work, sent SIGKILL, having its core device connection brutally dropped e.g. by changing the host's IP address, then restarted, etc.

So this needs to be robust.

I think we should fix this - in the field aqctl_corelog is going to be mercilessly started over and over again when something doesn't work, sent SIGKILL, having its core device connection brutally dropped e.g. by changing the host's IP address, then restarted, etc. So this needs to be robust.
linkno,
routing_table,
&Packet::CoreMgmtGetLogRequest {
destination,
clear: true,
},
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtGetLogReply { last: _, length, data }) => {
write_chunk(stream, &data[..length as usize]).await?;
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
return Err(drtio::Error::UnexpectedReply.into());
}
Err(e) => {
error!("aux packet error ({})", e);
return Err(e.into());
}
}
}
Ok(())
}
pub async fn set_log_filter(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
level: LevelFilter,
) -> Result<()> {
let reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtSetLogLevelRequest {
destination,
log_level: level as u8,
},
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtAck) => {
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
write_i8(stream, Reply::Error as i8).await?;
Err(drtio::Error::UnexpectedReply.into())
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
Err(e.into())
}
}
}
pub async fn set_uart_log_filter(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
level: LevelFilter,
) -> Result<()> {
let reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtSetUartLogLevelRequest {
destination,
log_level: level as u8,
},
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtAck) => {
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
write_i8(stream, Reply::Error as i8).await?;
Err(drtio::Error::UnexpectedReply.into())
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
Err(e.into())
}
}
}
pub async fn config_read(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
_cfg: &Rc<Config>,
key: &String,
) -> Result<()> {
let mut config_key: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let len = key.len();
config_key[..len].clone_from_slice(key.as_bytes());
let mut reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtConfigReadRequest {
destination: destination,
length: len as u16,
key: config_key,
},
timer,
)
.await;
let mut buffer = Vec::<u8>::new();
loop {
match reply {
Ok(Packet::CoreMgmtConfigReadReply { last, length, value }) => {
buffer.extend(&value[..length as usize]);
if last {
write_i8(stream, Reply::ConfigData as i8).await?;
write_chunk(stream, &buffer).await?;
return Ok(());
}
reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtConfigReadContinue {
destination: destination,
},
timer,
)
.await;
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
write_i8(stream, Reply::Error as i8).await?;
return Err(drtio::Error::UnexpectedReply.into());
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
return Err(e.into());
}
}
}
}
pub async fn config_write(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
_cfg: &Rc<Config>,
key: &String,
value: Vec<u8>,
_restart_idle: &Rc<Semaphore>,
) -> Result<()> {
let mut message = Cursor::new(Vec::with_capacity(key.len() + value.len() + 4 * 2));
message.write_string(key).unwrap();
message.write_bytes(&value).unwrap();
match drtio::partition_data(
linkno,
aux_mutex,
routing_table,
timer,
message.get_ref(),
|slice, status, len: usize| Packet::CoreMgmtConfigWriteRequest {
destination: destination,
last: status.is_last(),
length: len as u16,
data: *slice,
},
|reply| match reply {
Packet::CoreMgmtAck => Ok(()),
packet => {
error!("received unexpected aux packet: {:?}", packet);
Err(drtio::Error::UnexpectedReply)
}
},
)
.await
{
Ok(()) => {
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
Err(e.into())
}
}
}
pub async fn config_remove(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
_cfg: &Rc<Config>,
key: &String,
_restart_idle: &Rc<Semaphore>,
) -> Result<()> {
let mut config_key: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let len = key.len();
config_key[..len].clone_from_slice(key.as_bytes());
let reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtConfigRemoveRequest {
destination: destination,
length: len as u16,
key: config_key,
},
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtAck) => {
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
write_i8(stream, Reply::Error as i8).await?;
Err(drtio::Error::UnexpectedReply.into())
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
Err(e.into())
}
}
}
pub async fn config_erase(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
) -> Result<()> {
let reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtConfigEraseRequest {
destination: destination,
},
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtAck) => {
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
write_i8(stream, Reply::Error as i8).await?;
Err(drtio::Error::UnexpectedReply.into())
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
Err(e.into())
}
}
}
pub async fn reboot(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
) -> Result<()> {
info!("initited reboot request to satellite destination {}", destination);
let reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtRebootRequest {
destination: destination,
},
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtAck) => {
write_i8(stream, Reply::RebootImminent as i8).await?;
Ok(())
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
write_i8(stream, Reply::Error as i8).await?;
Err(drtio::Error::UnexpectedReply.into())
}
Err(e) => {
error!("aux packet error ({})", e);
write_i8(stream, Reply::Error as i8).await?;
Err(e.into())
}
}
}
pub async fn debug_allocator(
stream: &mut TcpStream,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
linkno: u8,
destination: u8,
) -> Result<()> {
let reply = drtio::aux_transact(
aux_mutex,
linkno,
routing_table,
&Packet::CoreMgmtAllocatorDebugRequest {
destination: destination,
},
timer,
)
.await;
match reply {
Ok(Packet::CoreMgmtAck) => {
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Ok(packet) => {
error!("received unexpected aux packet: {:?}", packet);
Err(drtio::Error::UnexpectedReply.into())
}
Err(e) => {
error!("aux packet error ({})", e);
Err(e.into())
}
}
}
}
mod local_coremgmt {
use super::*;
pub async fn get_log(stream: &mut TcpStream) -> Result<()> {
let buffer = get_logger_buffer().await.extract().as_bytes().to_vec();
write_i8(stream, Reply::LogContent as i8).await?;
write_chunk(stream, &buffer).await?;
Ok(())
}
Request::ClearLog => {
pub async fn clear_log(stream: &mut TcpStream) -> Result<()> {
let mut buffer = get_logger_buffer().await;
buffer.clear();
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Request::PullLog => {
pub async fn pull_log(stream: &mut TcpStream, pull_id: &Rc<RefCell<u32>>) -> Result<()> {
let id = {
let mut guard = pull_id.borrow_mut();
*guard += 1;
@ -165,24 +660,26 @@ async fn handle_connection(
logger.set_buffer_log_level(LevelFilter::Trace);
}
}
Ok(())
}
Request::SetLogFilter => {
let lvl = read_log_level_filter(stream).await?;
pub async fn set_log_filter(stream: &mut TcpStream, lvl: LevelFilter) -> Result<()> {
info!("Changing log level to {}", lvl);
log::set_max_level(lvl);
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Request::SetUartLogFilter => {
let lvl = read_log_level_filter(stream).await?;
pub async fn set_uart_log_filter(stream: &mut TcpStream, lvl: LevelFilter) -> Result<()> {
info!("Changing UART log level to {}", lvl);
unsafe {
BufferLogger::get_logger().as_ref().unwrap().set_uart_log_level(lvl);
}
write_i8(stream, Reply::Success as i8).await?;
Ok(())
}
Request::ConfigRead => {
let key = read_key(stream).await?;
debug!("read key: {}", key);
pub async fn config_read(stream: &mut TcpStream, cfg: &Rc<Config>, key: &String) -> Result<()> {
let value = cfg.read(&key);
if let Ok(value) = value {
debug!("got value");
@ -192,17 +689,10 @@ async fn handle_connection(
warn!("read error: no such key");
write_i8(stream, Reply::Error as i8).await?;
}
Ok(())
}
Request::ConfigWrite => {
let key = read_key(stream).await?;
debug!("write key: {}", key);
let len = read_i32(stream).await?;
let len = if len <= 0 { 0 } else { len as usize };
let mut buffer = Vec::with_capacity(len);
unsafe {
buffer.set_len(len);
}
read_chunk(stream, &mut buffer).await?;
pub async fn config_write(stream: &mut TcpStream, cfg: &Rc<Config>, key: &String, value: Vec<u8>, restart_idle: &Rc<Semaphore>) -> Result<()> {
let value = cfg.write(&key, buffer);
if value.is_ok() {
debug!("write success");
@ -215,8 +705,10 @@ async fn handle_connection(
error!("failed to write: {:?}", value);
write_i8(stream, Reply::Error as i8).await?;
}
Ok(())
}
Request::ConfigRemove => {
pub async fn config_remove(stream: &mut TcpStream, cfg: &Rc<Config>, key: &String, restart_idle: &Rc<Semaphore>) -> Result<()> {
let key = read_key(stream).await?;
debug!("erase key: {}", key);
let value = cfg.remove(&key);
@ -230,28 +722,194 @@ async fn handle_connection(
warn!("erase failed");
write_i8(stream, Reply::Error as i8).await?;
}
Ok(())
}
Request::Reboot => {
pub async fn config_erase(stream: &mut TcpStream) -> Result<()> {
error!("zynq device does not support config erase");
write_i8(stream, Reply::Error as i8).await?;
Ok(())
}
pub async fn reboot(stream: &mut TcpStream) -> Result<()> {
info!("rebooting");
write_i8(stream, Reply::RebootImminent as i8).await?;
stream.flush().await?;
slcr::reboot();
unreachable!()
}
}
pub async fn debug_allocator(_stream: &mut TcpStream) -> Result<()> {
error!("zynq device does not support allocator debug print");
Ok(())
}
}
pub fn start(cfg: Rc<Config>, restart_idle: Rc<Semaphore>) {
#[cfg(has_drtio)]
macro_rules! process {
($stream: ident, $drtio_tuple:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{
if $destination == 0 {
local_coremgmt::$func($stream, $($param, )*).await
} else if let Some((aux_mutex, routing_table, timer)) = $drtio_tuple {
let linkno = routing_table.0[$destination as usize][0] - 1 as u8;
remote_coremgmt::$func($stream, aux_mutex, routing_table, timer, linkno, $destination, $($param, )*).await
} else {
error!("coremgmt-over-drtio not supported for panicked device, please reboot");
write_i8($stream, Reply::Error as i8).await?;
Err(drtio::Error::LinkDown.into())
}
}}
}
#[cfg(not(has_drtio))]
macro_rules! process {
($stream: ident, $drtio_tuple:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{
local_coremgmt::$func($stream, $($param, )*).await
}}
}
async fn handle_connection(
stream: &mut TcpStream,
pull_id: Rc<RefCell<u32>>,
cfg: Rc<Config>,
restart_idle: Rc<Semaphore>,
_drtio_tuple: Option<(&Rc<Mutex<bool>>, &RoutingTable, GlobalTimer)>,
) -> Result<()> {
if !expect(&stream, b"ARTIQ management\n").await? {
return Err(Error::UnexpectedPattern);
}
let _destination: u8 = read_i8(stream).await? as u8;
stream.send_slice("e".as_bytes()).await?;
loop {
let msg = read_i8(stream).await;
if let Err(smoltcp::Error::Finished) = msg {
return Ok(());
}
let msg: Request = FromPrimitive::from_i8(msg?).ok_or(Error::UnrecognizedPacket)?;
match msg {
Request::GetLog => process!(stream, _drtio_tuple, _destination, get_log),
Request::ClearLog => process!(stream, _drtio_tuple, _destination, clear_log),
Request::PullLog => process!(
stream,
_drtio_tuple,
_destination,
pull_log,
&pull_id
),
Request::SetLogFilter => {
let lvl = read_log_level_filter(stream).await?;
process!(
stream,
_drtio_tuple,
_destination,
set_log_filter,
lvl
)
}
Request::SetUartLogFilter => {
let lvl = read_log_level_filter(stream).await?;
process!(
stream,
_drtio_tuple,
_destination,
set_uart_log_filter,
lvl
)
}
Request::ConfigRead => {
let key = read_key(stream).await?;
process!(
stream,
_drtio_tuple,
_destination,
config_read,
&cfg,
&key
)
}
Request::ConfigWrite => {
let key = read_key(stream).await?;
let len = read_i32(stream).await?;
let len = if len <= 0 { 0 } else { len as usize };
let mut buffer = Vec::with_capacity(len);
unsafe {
buffer.set_len(len);
}
Outdated
Review

Isn't that just an idiosyncratic object? Canonical way would be to define a struct.

Isn't that just an idiosyncratic object? Canonical way would be to define a ``struct``.
read_chunk(stream, &mut buffer).await?;
process!(
stream,
_drtio_tuple,
_destination,
config_write,
&cfg,
&key,
buffer,
restart_idle
)
}
Request::ConfigRemove => {
let key = read_key(stream).await?;
process!(
stream,
_drtio_tuple,
_destination,
config_remove,
&cfg,
&key,
restart_idle
)
}
Request::Reboot => {
Outdated
Review

I think no other software calls objects "tuples".

I think no other software calls objects "tuples".
Outdated
Review

What about DrtioContext?
And name the fields, as is usually done.

What about DrtioContext? And name the fields, as is usually done.
process!(stream, _drtio_tuple, _destination, reboot)
}
Request::ConfigErase => {
process!(stream, _drtio_tuple, _destination, config_erase)
}
Request::DebugAllocator => {
process!(

Slightly related: I like the tuple to keep the argument list shorter/more compartmentalized, should we adapt it in other places too?

Slightly related: I like the tuple to keep the argument list shorter/more compartmentalized, should we adapt it in other places too?

Agreed that a a lot of argument lists are needlessly long. Especially with the shared sync primitives theres a lot of repetition. Though I'm not sure that these tuples make things much shorter, without some type aliasing. Is there a case for putting these shared sync prims in a struct and passing around an Rc to that instead? (In a similar style to how the mainline repo passes around Io)

Putting the idea out there, obviously it would be a heavy refactor...

Agreed that a a lot of argument lists are needlessly long. Especially with the shared sync primitives theres a lot of repetition. Though I'm not sure that these tuples make things much shorter, without some type aliasing. Is there a case for putting these shared sync prims in a struct and passing around an `Rc` to that instead? (In a similar style to how the mainline repo passes around `Io`) Putting the idea out there, obviously it would be a heavy refactor...
stream,
_drtio_tuple,
_destination,
debug_allocator
)
}
}?;
}
}
pub fn start(
cfg: Rc<Config>,
restart_idle: Rc<Semaphore>,
drtio_tuple: Option<(
&Rc<Mutex<bool>>,
&Rc<RefCell<drtio_routing::RoutingTable>>,
GlobalTimer,
)>,
) {
let drtio_tuple = drtio_tuple.map(
|(aux_mutex, routing_table, timer)| (aux_mutex.clone(), routing_table.clone(), timer)
);
task::spawn(async move {
let pull_id = Rc::new(RefCell::new(0u32));
loop {
let mut stream = TcpStream::accept(1380, 2048, 2048).await.unwrap();
let pull_id = pull_id.clone();
let cfg = cfg.clone();
let restart_idle = restart_idle.clone();
let drtio_tuple = drtio_tuple.clone();
task::spawn(async move {
info!("received connection");
let _ = handle_connection(&mut stream, pull_id, cfg, restart_idle)
// Avoid consuming the tuple
// Keep the borrowed value on stack
let drtio_tuple = drtio_tuple.as_ref().map(
|(aux_mutex, routing_table, timer)| (aux_mutex, routing_table.borrow(), *timer)
);
let drtio_tuple = drtio_tuple.as_ref().map(
|(aux_mutex, routing_table, timer)| (*aux_mutex, routing_table.deref(), *timer)
);
let _ = handle_connection(&mut stream, pull_id, cfg, restart_idle, drtio_tuple)
.await
.map_err(|e| warn!("connection terminated: {:?}", e));
let _ = stream.flush().await;

View File

@ -577,7 +577,7 @@ pub mod drtio {
}
}
async fn partition_data<PacketF, HandlerF>(
pub async fn partition_data<PacketF, HandlerF>(
linkno: u8,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,