From 95b92a178b9aa1b87e6de69aeabe2ec538cb379e Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 7 Dec 2023 16:42:39 +0800 Subject: [PATCH] satman: make use of the async flag --- .../firmware/libproto_artiq/drtioaux_proto.rs | 27 ++++ artiq/firmware/runtime/rtio_mgt.rs | 2 +- artiq/firmware/runtime/session.rs | 2 + artiq/firmware/satman/kernel.rs | 64 +++++---- artiq/firmware/satman/main.rs | 67 ++++++---- artiq/firmware/satman/repeater.rs | 63 +++++---- artiq/firmware/satman/routing.rs | 121 +++++++++--------- 7 files changed, 197 insertions(+), 149 deletions(-) diff --git a/artiq/firmware/libproto_artiq/drtioaux_proto.rs b/artiq/firmware/libproto_artiq/drtioaux_proto.rs index ca38e6b71..0013c8171 100644 --- a/artiq/firmware/libproto_artiq/drtioaux_proto.rs +++ b/artiq/firmware/libproto_artiq/drtioaux_proto.rs @@ -77,6 +77,8 @@ pub enum Packet { RoutingSetPath { destination: u8, hops: [u8; 32] }, RoutingSetRank { rank: u8 }, + RoutingRetrievePackets, + RoutingNoPackets, RoutingAck, MonitorRequest { destination: u8, channel: u16, probe: u8 }, @@ -168,6 +170,8 @@ impl Packet { rank: reader.read_u8()? }, 0x32 => Packet::RoutingAck, + 0x33 => Packet::RoutingRetrievePackets, + 0x34 => Packet::RoutingNoPackets, 0x40 => Packet::MonitorRequest { destination: reader.read_u8()?, @@ -450,6 +454,10 @@ impl Packet { }, Packet::RoutingAck => writer.write_u8(0x32)?, + Packet::RoutingRetrievePackets => + writer.write_u8(0x33)?, + Packet::RoutingNoPackets => + writer.write_u8(0x34)?, Packet::MonitorRequest { destination, channel, probe } => { writer.write_u8(0x40)?; @@ -685,4 +693,23 @@ impl Packet { } Ok(()) } + + pub fn routable_destination(&self) -> Option { + // only for packets that could be re-routed, not only forwarded + match self { + Packet::DmaAddTraceRequest { destination, .. } => Some(*destination), + Packet::DmaAddTraceReply { destination, .. } => Some(*destination), + Packet::DmaRemoveTraceRequest { destination, .. } => Some(*destination), + Packet::DmaRemoveTraceReply { destination, .. } => Some(*destination), + Packet::DmaPlaybackRequest { destination, .. } => Some(*destination), + Packet::DmaPlaybackReply { destination, .. } => Some(*destination), + Packet::SubkernelLoadRunRequest { destination, .. } => Some(*destination), + Packet::SubkernelLoadRunReply { destination, .. } => Some(*destination), + Packet::SubkernelMessage { destination, .. } => Some(*destination), + Packet::SubkernelMessageAck { destination, .. } => Some(*destination), + Packet::DmaPlaybackStatus { destination, .. } => Some(*destination), + Packet::SubkernelFinished { destination, .. } => Some(*destination), + _ => None + } + } } diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index fcb61da26..75347da68 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -65,7 +65,7 @@ pub mod drtio { let up_destinations = up_destinations.clone(); let ddma_mutex = ddma_mutex.clone(); let subkernel_mutex = subkernel_mutex.clone(); - io.spawn(10240, move |io| { + io.spawn(8192, move |io| { let routing_table = routing_table.borrow(); link_thread(io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &subkernel_mutex); }); diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index 129d5013f..44b4bd0be 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -650,6 +650,8 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, unsafe { kernel::stop() } session.kernel_state = KernelState::Absent; unsafe { session.congress.cache.unborrow() } + #[cfg(has_drtio)] + subkernel::clear_subkernels(io, _subkernel_mutex)?; match stream { None => { diff --git a/artiq/firmware/satman/kernel.rs b/artiq/firmware/satman/kernel.rs index 07fed8094..0b9762385 100644 --- a/artiq/firmware/satman/kernel.rs +++ b/artiq/firmware/satman/kernel.rs @@ -140,7 +140,7 @@ struct Session { last_exception: Option, source: u8, // which destination requested running the kernel messages: MessageManager, - subkernels_finished: VecDeque<(u32, bool, u8)> // tuple of id, with_exception, exception_source + subkernels_finished: Vec // ids of subkernels finished } #[derive(Debug)] @@ -286,13 +286,10 @@ impl MessageManager { let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; self.out_state = OutMessageState::MessageBeingSent; let meta = self.get_outgoing_slice(&mut data_slice).unwrap(); - let res = router.route(drtioaux::Packet::SubkernelMessage { + router.route(drtioaux::Packet::SubkernelMessage { source: self_destination, destination: destination, id: id, status: meta.status, length: meta.len as u16, data: data_slice }, routing_table, rank, self_destination); - if let Err(e) = res { - warn!("error sending SubkernelMessage: {}", e); - } Ok(()) } @@ -309,7 +306,7 @@ impl Session { last_exception: None, source: 0, messages: MessageManager::new(), - subkernels_finished: VecDeque::new() + subkernels_finished: Vec::new() } } @@ -500,6 +497,15 @@ impl Manager { with_exception: $with_exception, exception_source: destination }) }} } + + if let Some(subkernel_finished) = self.last_finished.take() { + info!("subkernel {} finished, with exception: {}", subkernel_finished.id, subkernel_finished.with_exception); + router.route(drtioaux::Packet::SubkernelFinished { + destination: subkernel_finished.source, id: subkernel_finished.id, + with_exception: subkernel_finished.with_exception, exception_src: subkernel_finished.exception_source + }, &routing_table, rank, destination); + } + if !self.is_running() { return; } @@ -534,17 +540,6 @@ impl Manager { self.last_finished = finished!(true); } } - - if let Some(subkernel_finished) = self.last_finished.take() { - info!("subkernel {} finished, with exception: {}", subkernel_finished.id, subkernel_finished.with_exception); - let res = router.route(drtioaux::Packet::SubkernelFinished { - destination: subkernel_finished.source, id: subkernel_finished.id, - with_exception: subkernel_finished.with_exception, exception_src: destination - }, &routing_table, rank, destination); - if let Err(e) = res { - warn!("error sending SubkernelFinished: {}", e); - } - } } fn process_external_messages(&mut self) -> Result<(), Error> { @@ -578,26 +573,15 @@ impl Manager { self.session.kernel_state = KernelState::Running; } else { let mut i = 0; - for status in self.session.subkernels_finished.iter() { - if status.0 == *id { + for status in &self.session.subkernels_finished { + if *status == *id { + kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::NoError })?; + self.session.kernel_state = KernelState::Running; + self.session.subkernels_finished.swap_remove(i); break; } i += 1; } - if let Some(finish_status) = self.session.subkernels_finished.remove(i) { - if finish_status.1 { - unsafe { kernel_cpu::stop() } - self.session.kernel_state = KernelState::Absent; - unsafe { self.cache.unborrow() } - self.last_finished = Some(SubkernelFinished { - source: self.session.source, id: self.current_id, - with_exception: true, exception_source: finish_status.2 - }) - } else { - kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::NoError })?; - self.session.kernel_state = KernelState::Running; - } - } } Ok(()) } @@ -623,7 +607,17 @@ impl Manager { } pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) { - self.session.subkernels_finished.push_back((id, with_exception, exception_source)); + if with_exception { + unsafe { kernel_cpu::stop() } + self.session.kernel_state = KernelState::Absent; + unsafe { self.cache.unborrow() } + self.last_finished = Some(SubkernelFinished { + source: self.session.source, id: self.current_id, + with_exception: true, exception_source: exception_source + }) + } else { + self.session.subkernels_finished.push(id); + } } fn process_kern_message(&mut self, router: &mut Router, @@ -722,7 +716,7 @@ impl Manager { self.session.kernel_state = KernelState::SubkernelAwaitLoad; router.route(drtioaux::Packet::SubkernelLoadRunRequest { source: destination, destination: sk_destination, id: id, run: run - }, routing_table, rank, destination).map_err(|_| Error::DrtioError)?; + }, routing_table, rank, destination); kern_acknowledge() } diff --git a/artiq/firmware/satman/main.rs b/artiq/firmware/satman/main.rs index 64eab6e77..1a8e794a3 100644 --- a/artiq/firmware/satman/main.rs +++ b/artiq/firmware/satman/main.rs @@ -66,6 +66,12 @@ fn drtiosat_tsc_loaded() -> bool { } } +fn drtiosat_async_ready() { + unsafe { + csr::drtiosat::async_messages_ready_write(1); + } +} + pub enum RtioMaster { Drtio, Dma, @@ -90,7 +96,14 @@ macro_rules! forward { if hop != 0 { let repno = (hop - 1) as usize; if repno < $repeaters.len() { - return $repeaters[repno].aux_forward($packet); + if $packet.expects_response() { + return $repeaters[repno].aux_forward($packet); + } else { + let res = $repeaters[repno].aux_send($packet); + // allow the satellite to parse the packet before next + clock::spin_us(10_000); + return res; + } } else { return Err(drtioaux::Error::RoutingError); } @@ -135,7 +148,8 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg *self_destination = destination; // async messages if *rank == 1 { - if let Some(packet) = router.get_upstream_packet(*rank) { + // for now, master ignores the async_messages_ready packet + if let Some(packet) = router.get_upstream_packet() { // pass any async or routed packets to master // this does mean that DDMA/SK packets to master will "trickle down" to higher rank return drtioaux::send(0, &packet) @@ -236,14 +250,10 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg drtioaux::send(0, &drtioaux::Packet::RoutingAck) } - #[cfg(has_drtio_routing)] - drtioaux::Packet::RoutingAck => { - if *rank > 1 { - router.routing_ack_received(); - } else { - warn!("received unexpected RoutingAck"); - } - Ok(()) + drtioaux::Packet::RoutingRetrievePackets => { + let packet = router.get_upstream_packet().or( + Some(drtioaux::Packet::RoutingNoPackets)).unwrap(); + drtioaux::send(0, &packet) } drtioaux::Packet::MonitorRequest { destination: _destination, channel, probe } => { @@ -453,10 +463,11 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg if kernelmgr.message_ack_slice() { let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; if let Some(meta) = kernelmgr.message_get_slice(&mut data_slice) { - router.send(drtioaux::Packet::SubkernelMessage { + // route and not send immediately as ACKs are not a beginning of a transaction + router.route(drtioaux::Packet::SubkernelMessage { source: *self_destination, destination: meta.destination, id: kernelmgr.get_current_id().unwrap(), status: meta.status, length: meta.len as u16, data: data_slice - }, _routing_table, *rank, *self_destination)?; + }, _routing_table, *rank, *self_destination); } else { error!("Error receiving message slice"); } @@ -476,16 +487,16 @@ fn process_aux_packets(dma_manager: &mut DmaManager, analyzer: &mut Analyzer, routing_table: &mut drtio_routing::RoutingTable, rank: &mut u8, router: &mut routing::Router, destination: &mut u8) { let result = - drtioaux::recv(0).or_else(|_| Ok(router.get_local_packet())).and_then(|packet| { - if let Some(packet) = packet { - process_aux_packet(dma_manager, analyzer, kernelmgr, repeaters, routing_table, rank, router, destination, packet) + drtioaux::recv(0).and_then(|packet| { + if let Some(packet) = packet.or_else(|| router.get_local_packet()) { + process_aux_packet(dma_manager, analyzer, kernelmgr, + repeaters, routing_table, rank, router, destination, packet) } else { Ok(()) } }); - match result { - Ok(()) => (), - Err(e) => warn!("aux packet error ({})", e) + if let Err(e) = result { + warn!("aux packet error ({})", e); } } @@ -765,22 +776,22 @@ pub extern fn main() -> i32 { if let Some(status) = dma_manager.get_status() { info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp); router.route(drtioaux::Packet::DmaPlaybackStatus { - source: destination, destination: status.source, id: status.id, - error: status.error, channel: status.channel, timestamp: status.timestamp - }, &routing_table, rank, destination); + destination: status.source, id: status.id, error: status.error, + channel: status.channel, timestamp: status.timestamp + }, &routing_table, rank, destination) } kernelmgr.process_kern_requests(&mut router, &routing_table, rank, destination); - if rank > 1 { - if let Some(packet) = router.get_upstream_packet(rank) { - // in sat-sat communications, it can be async - let res = drtioaux::send(0, &packet); - if let Err(e) = res { - warn!("error routing packet: {}", e); - } + if let Some((repno, packet)) = router.get_downstream_packet() { + if let Err(e) = repeaters[repno].aux_send(&packet) { + warn!("[REP#{}] Error when sending packet to satellite ({:?})", repno, e) } } + + if router.any_upstream_waiting() { + drtiosat_async_ready(); + } } drtiosat_reset_phy(true); diff --git a/artiq/firmware/satman/repeater.rs b/artiq/firmware/satman/repeater.rs index 15542488b..544527cda 100644 --- a/artiq/firmware/satman/repeater.rs +++ b/artiq/firmware/satman/repeater.rs @@ -1,7 +1,7 @@ use board_artiq::{drtioaux, drtio_routing}; #[cfg(has_drtio_routing)] use board_misoc::{csr, clock}; -use routing::{Router, get_routable_packet_destination}; +use routing::Router; #[cfg(has_drtio_routing)] fn rep_link_rx_up(repno: u8) -> bool { @@ -107,11 +107,16 @@ impl Repeater { } } RepeaterState::Up => { - self.process_unsolicited_aux(routing_table, rank, destination, router); + self.process_unsolicited_aux(); if !rep_link_rx_up(self.repno) { info!("[REP#{}] link is down", self.repno); self.state = RepeaterState::Down; } + if self.async_messages_ready() { + if let Err(e) = self.handle_async(routing_table, rank, destination, router) { + warn!("[REP#{}] Error handling async messages ({})", self.repno, e); + } + } } RepeaterState::Failed => { if !rep_link_rx_up(self.repno) { @@ -122,21 +127,9 @@ impl Repeater { } } - fn process_unsolicited_aux(&self, routing_table: &drtio_routing::RoutingTable, rank: u8, self_destination: u8, router: &mut Router) { + fn process_unsolicited_aux(&self) { match drtioaux::recv(self.auxno) { - Ok(Some(packet)) => { - let destination = get_routable_packet_destination(&packet); - if destination.is_none() { - warn!("[REP#{}] unsolicited aux packet: {:?}", self.repno, packet); - } else { - // routable packet - let res = router.route(packet, routing_table, rank, self_destination); - match res { - Ok(()) => drtioaux::send(self.auxno, &drtioaux::Packet::RoutingAck).unwrap(), - Err(e) => warn!("[REP#{}] Error routing packet: {:?}", self.repno, e), - } - } - } + Ok(Some(packet)) => warn!("[REP#{}] unsolicited aux packet: {:?}", self.repno, packet), Ok(None) => (), Err(_) => warn!("[REP#{}] aux packet error", self.repno) } @@ -192,16 +185,42 @@ impl Repeater { } } - pub fn aux_forward(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error> { - if self.state != RepeaterState::Up { - return Err(drtioaux::Error::LinkDown); + fn async_messages_ready(&self) -> bool { + let async_rdy; + unsafe { + async_rdy = (csr::DRTIOREP[self.repno as usize].async_messages_ready_read)(); + (csr::DRTIOREP[self.repno as usize].async_messages_ready_write)(0); } - drtioaux::send(self.auxno, request).unwrap(); + async_rdy == 1 + } + + fn handle_async(&self, routing_table: &drtio_routing::RoutingTable, rank: u8, self_destination: u8, router: &mut Router + ) -> Result<(), drtioaux::Error> { + loop { + drtioaux::send(self.auxno, &drtioaux::Packet::RoutingRetrievePackets).unwrap(); + let reply = self.recv_aux_timeout(200)?; + match reply { + drtioaux::Packet::RoutingNoPackets => break, + packet => router.route(packet, routing_table, rank, self_destination) + } + } + Ok(()) + } + + pub fn aux_forward(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error> { + self.aux_send(request)?; let reply = self.recv_aux_timeout(200)?; drtioaux::send(0, &reply).unwrap(); Ok(()) } + pub fn aux_send(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error> { + if self.state != RepeaterState::Up { + return Err(drtioaux::Error::LinkDown); + } + drtioaux::send(self.auxno, request) + } + pub fn sync_tsc(&self) -> Result<(), drtioaux::Error> { if self.state != RepeaterState::Up { return Ok(()); @@ -212,8 +231,8 @@ impl Repeater { (csr::DRTIOREP[repno].set_time_write)(1); while (csr::DRTIOREP[repno].set_time_read)() == 1 {} } - // TSCAck is sent spontaneously by the satellite, - // in response to a TSC set on the RT link. + // TSCAck is the only aux packet that is sent spontaneously + // by the satellite, in response to a TSC set on the RT link. let reply = self.recv_aux_timeout(10000)?; if reply == drtioaux::Packet::TSCAck { return Ok(()); diff --git a/artiq/firmware/satman/routing.rs b/artiq/firmware/satman/routing.rs index ba82d95cb..e129e4a54 100644 --- a/artiq/firmware/satman/routing.rs +++ b/artiq/firmware/satman/routing.rs @@ -1,93 +1,87 @@ use alloc::collections::vec_deque::VecDeque; use board_artiq::{drtioaux, drtio_routing}; +use board_misoc::csr; // Packets from downstream (further satellites) are received and routed appropriately. -// they're passed immediately if it's possible (within the subtree), or sent upstream. +// they're passed as soon as possible downstream (within the subtree), or sent upstream, +// which is notified about pending packets. // for rank 1 (connected to master) satellites, these packets are passed as an answer to DestinationStatusRequest; -// for higher ranks, straight upstream, but awaiting for an ACK to make sure the upstream is not overwhelmed. +// for higher ranks, after getting a notification, it will transact with downstream to get the pending packets. // forward! macro is not deprecated, as routable packets are only these that can originate // from both master and satellite, e.g. DDMA and Subkernel. -pub fn get_routable_packet_destination(packet: &drtioaux::Packet) -> Option { - let destination = match packet { - // received from downstream - drtioaux::Packet::DmaAddTraceRequest { destination, .. } => destination, - drtioaux::Packet::DmaAddTraceReply { destination, .. } => destination, - drtioaux::Packet::DmaRemoveTraceRequest { destination, .. } => destination, - drtioaux::Packet::DmaRemoveTraceReply { destination, .. } => destination, - drtioaux::Packet::DmaPlaybackRequest { destination, .. } => destination, - drtioaux::Packet::DmaPlaybackReply { destination, .. } => destination, - drtioaux::Packet::SubkernelLoadRunRequest { destination, .. } => destination, - drtioaux::Packet::SubkernelLoadRunReply { destination, .. } => destination, - // received from downstream or produced locally - drtioaux::Packet::SubkernelMessage { destination, .. } => destination, - drtioaux::Packet::SubkernelMessageAck { destination, .. } => destination, - // "async" - master gets them by deststatreq, satellites would get it through the router - drtioaux::Packet::DmaPlaybackStatus { destination, .. } => destination, - drtioaux::Packet::SubkernelFinished { destination, .. } => destination, - _ => return None - }; - Some(*destination) -} - pub struct Router { - out_messages: VecDeque, - local_messages: VecDeque, - upstream_ready: bool + upstream_queue: VecDeque, + local_queue: VecDeque, + downstream_queue: VecDeque<(usize, drtioaux::Packet)>, + upstream_notified: bool, } impl Router { pub fn new() -> Router { Router { - out_messages: VecDeque::new(), - local_messages: VecDeque::new(), - upstream_ready: true + upstream_queue: VecDeque::new(), + local_queue: VecDeque::new(), + downstream_queue: VecDeque::new(), + upstream_notified: false, } } - - // called by local sources (DDMA, kernel) and by repeaters on receiving unsolicited data - // messages are always buffered for upstream, or passed downstream directly + // called by local sources (DDMA, kernel) and by repeaters on receiving async data + // messages are always buffered for both upstream and downstream pub fn route(&mut self, packet: drtioaux::Packet, _routing_table: &drtio_routing::RoutingTable, _rank: u8, _self_destination: u8 - ) -> Result<(), drtioaux::Error> { + ) { #[cfg(has_drtio_routing)] { - let destination = get_routable_packet_destination(&packet); + let destination = packet.routable_destination(); if let Some(destination) = destination { - let hop = _routing_table.0[destination as usize][_rank as usize]; - let auxno = if destination == 0 { 0 } else { hop }; + let hop = _routing_table.0[destination as usize][_rank as usize] as usize; if destination == _self_destination { - self.local_messages.push_back(packet); - } else if _rank > 1 { - drtioaux::send(auxno, &packet)?; + self.local_queue.push_back(packet); + } else if hop > 0 && hop < csr::DRTIOREP.len() { + let repno = (hop - 1) as usize; + self.downstream_queue.push_back((repno, packet)); } else { - self.out_messages.push_back(packet); + self.upstream_queue.push_back(packet); } } else { - return Err(drtioaux::Error::RoutingError); + error!("Received an unroutable packet: {:?}", packet); } } #[cfg(not(has_drtio_routing))] { - self.out_messages.push_back(packet); + self.upstream_queue.push_back(packet); } - Ok(()) } // Sends a packet to a required destination, routing if it's necessary pub fn send(&mut self, packet: drtioaux::Packet, - _routing_table: &drtio_routing::RoutingTable, _rank: u8, _destination: u8) -> Result<(), drtioaux::Error> { + _routing_table: &drtio_routing::RoutingTable, + _rank: u8, _destination: u8 + ) -> Result<(), drtioaux::Error> { #[cfg(has_drtio_routing)] { - let destination = get_routable_packet_destination(&packet); - if destination.is_none() || destination == Some(0) { - // send upstream directly (response to master) - drtioaux::send(0, &packet) + let destination = packet.routable_destination(); + if let Some(destination) = destination { + let hop = _routing_table.0[destination as usize][_rank as usize] as usize; + if destination == 0 { + // response is needed immediately if master required it + drtioaux::send(0, &packet)?; + } else if !(hop > 0 && hop < csr::DRTIOREP.len()) { + // higher rank can wait + self.upstream_queue.push_back(packet); + } else { + let repno = (hop - 1) as usize; + // transaction will occur at closest possible opportunity + self.downstream_queue.push_back((repno, packet)); + } + Ok(()) } else { - self.route(packet, _routing_table, _rank, _destination) + // packet not supported in routing, fallback - sent directly + drtioaux::send(0, &packet) } } #[cfg(not(has_drtio_routing))] @@ -96,25 +90,26 @@ impl Router { } } - pub fn get_upstream_packet(&mut self, rank: u8) -> Option { - // called on DestinationStatusRequest on rank 1, in loop in others - if self.upstream_ready { - let packet = self.out_messages.pop_front(); - if rank > 1 && packet.is_some() { - // packet will be sent out, awaiting ACK - self.upstream_ready = false; - } - packet + pub fn any_upstream_waiting(&mut self) -> bool { + let empty = self.upstream_queue.is_empty(); + if !empty && !self.upstream_notified { + self.upstream_notified = true; // so upstream will not get spammed with notifications + true } else { - None + false } } - pub fn routing_ack_received(&mut self) { - self.upstream_ready = true; + pub fn get_upstream_packet(&mut self) -> Option { + self.upstream_notified = false; + self.upstream_queue.pop_front() + } + + pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> { + self.downstream_queue.pop_front() } pub fn get_local_packet(&mut self) -> Option { - self.local_messages.pop_front() + self.local_queue.pop_front() } } \ No newline at end of file