From 62642957cd9e36c6e87cd2ca4aabfba8c616d71b Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 19 Sep 2018 11:16:21 +0800 Subject: [PATCH] runtime: fix DRTIO aux channel race condition --- artiq/firmware/runtime/kern_hwreq.rs | 186 ++++++++++++++------------- artiq/firmware/runtime/main.rs | 9 +- artiq/firmware/runtime/moninj.rs | 66 ++++------ artiq/firmware/runtime/rtio_mgt.rs | 127 ++++++++++-------- artiq/firmware/runtime/sched.rs | 2 +- artiq/firmware/runtime/session.rs | 26 ++-- 6 files changed, 215 insertions(+), 201 deletions(-) diff --git a/artiq/firmware/runtime/kern_hwreq.rs b/artiq/firmware/runtime/kern_hwreq.rs index 631209923..aa4d21bcf 100644 --- a/artiq/firmware/runtime/kern_hwreq.rs +++ b/artiq/firmware/runtime/kern_hwreq.rs @@ -1,6 +1,6 @@ use core::cell::RefCell; use kernel_proto as kern; -use sched::{Io, Error as SchedError}; +use sched::{Io, Mutex, Error as SchedError}; use session::{kern_acknowledge, kern_send, Error}; use rtio_mgt; use urc::Urc; @@ -11,14 +11,20 @@ use board_artiq::spi as local_spi; #[cfg(has_drtio)] mod remote_i2c { use drtioaux; + use rtio_mgt::drtio; + use sched::{Io, Mutex}; - fn basic_reply(linkno: u8) -> Result<(), ()> { - match drtioaux::recv_timeout(linkno, None) { + pub fn start(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, busno: u8) -> Result<(), ()> { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::I2cStartRequest { + destination: destination, + busno: busno + }); + match reply { Ok(drtioaux::Packet::I2cBasicReply { succeeded }) => { if succeeded { Ok(()) } else { Err(()) } } - Ok(_) => { - error!("received unexpected aux packet"); + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); Err(()) } Err(e) => { @@ -28,49 +34,53 @@ mod remote_i2c { } } - pub fn start(linkno: u8, destination: u8, busno: u8) -> Result<(), ()> { - let request = drtioaux::Packet::I2cStartRequest { + pub fn restart(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, busno: u8) -> Result<(), ()> { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::I2cRestartRequest { destination: destination, busno: busno - }; - if drtioaux::send(linkno, &request).is_err() { - return Err(()) + }); + match reply { + Ok(drtioaux::Packet::I2cBasicReply { succeeded }) => { + if succeeded { Ok(()) } else { Err(()) } + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + Err(()) + } + Err(e) => { + error!("aux packet error ({})", e); + Err(()) + } } - basic_reply(linkno) } - pub fn restart(linkno: u8, destination: u8, busno: u8) -> Result<(), ()> { - let request = drtioaux::Packet::I2cRestartRequest { + pub fn stop(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, busno: u8) -> Result<(), ()> { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::I2cStopRequest { destination: destination, busno: busno - }; - if drtioaux::send(linkno, &request).is_err() { - return Err(()) + }); + match reply { + Ok(drtioaux::Packet::I2cBasicReply { succeeded }) => { + if succeeded { Ok(()) } else { Err(()) } + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + Err(()) + } + Err(e) => { + error!("aux packet error ({})", e); + Err(()) + } } - basic_reply(linkno) } - pub fn stop(linkno: u8, destination: u8, busno: u8) -> Result<(), ()> { - let request = drtioaux::Packet::I2cStopRequest { - destination: destination, - busno: busno - }; - if drtioaux::send(linkno, &request).is_err() { - return Err(()) - } - basic_reply(linkno) - } - - pub fn write(linkno: u8, destination: u8, busno: u8, data: u8) -> Result { - let request = drtioaux::Packet::I2cWriteRequest { + pub fn write(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, busno: u8, data: u8) -> Result { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::I2cWriteRequest { destination: destination, busno: busno, data: data - }; - if drtioaux::send(linkno, &request).is_err() { - return Err(()) - } - match drtioaux::recv_timeout(linkno, None) { + }); + match reply { Ok(drtioaux::Packet::I2cWriteReply { succeeded, ack }) => { if succeeded { Ok(ack) } else { Err(()) } } @@ -85,16 +95,13 @@ mod remote_i2c { } } - pub fn read(linkno: u8, destination: u8, busno: u8, ack: bool) -> Result { - let request = drtioaux::Packet::I2cReadRequest { + pub fn read(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, busno: u8, ack: bool) -> Result { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::I2cReadRequest { destination: destination, busno: busno, ack: ack - }; - if drtioaux::send(linkno, &request).is_err() { - return Err(()) - } - match drtioaux::recv_timeout(linkno, None) { + }); + match reply { Ok(drtioaux::Packet::I2cReadReply { succeeded, data }) => { if succeeded { Ok(data) } else { Err(()) } } @@ -113,14 +120,24 @@ mod remote_i2c { #[cfg(has_drtio)] mod remote_spi { use drtioaux; + use rtio_mgt::drtio; + use sched::{Io, Mutex}; - fn basic_reply(linkno: u8) -> Result<(), ()> { - match drtioaux::recv_timeout(linkno, None) { + pub fn set_config(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, busno: u8, flags: u8, length: u8, div: u8, cs: u8) -> Result<(), ()> { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::SpiSetConfigRequest { + destination: destination, + busno: busno, + flags: flags, + length: length, + div: div, + cs: cs + }); + match reply { Ok(drtioaux::Packet::SpiBasicReply { succeeded }) => { if succeeded { Ok(()) } else { Err(()) } } - Ok(_) => { - error!("received unexpected aux packet"); + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); Err(()) } Err(e) => { @@ -130,47 +147,38 @@ mod remote_spi { } } - pub fn set_config(linkno: u8, destination: u8, busno: u8, flags: u8, length: u8, div: u8, cs: u8) -> Result<(), ()> { - let request = drtioaux::Packet::SpiSetConfigRequest { - destination: destination, - busno: busno, - flags: flags, - length: length, - div: div, - cs: cs - }; - if drtioaux::send(linkno, &request).is_err() { - return Err(()) - } - basic_reply(linkno) - } - - pub fn write(linkno: u8, destination: u8, busno: u8, data: u32) -> Result<(), ()> { - let request = drtioaux::Packet::SpiWriteRequest { + pub fn write(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, busno: u8, data: u32) -> Result<(), ()> { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::SpiWriteRequest { destination: destination, busno: busno, data: data - }; - if drtioaux::send(linkno, &request).is_err() { - return Err(()) + }); + match reply { + Ok(drtioaux::Packet::SpiBasicReply { succeeded }) => { + if succeeded { Ok(()) } else { Err(()) } + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + Err(()) + } + Err(e) => { + error!("aux packet error ({})", e); + Err(()) + } } - basic_reply(linkno) } - pub fn read(linkno: u8, destination: u8, busno: u8) -> Result { - let request = drtioaux::Packet::SpiReadRequest { + pub fn read(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, busno: u8) -> Result { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::SpiReadRequest { destination: destination, busno: busno - }; - if drtioaux::send(linkno, &request).is_err() { - return Err(()) - } - match drtioaux::recv_timeout(linkno, None) { + }); + match reply { Ok(drtioaux::Packet::SpiReadReply { succeeded, data }) => { if succeeded { Ok(data) } else { Err(()) } } - Ok(_) => { - error!("received unexpected aux packet"); + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); Err(()) } Err(e) => { @@ -184,7 +192,7 @@ mod remote_spi { #[cfg(has_drtio)] macro_rules! dispatch { - ($mod_local:ident, $mod_remote:ident, $routing_table:ident, $busno:expr, $func:ident $(, $param:expr)*) => {{ + ($io:ident, $aux_mutex:ident, $mod_local:ident, $mod_remote:ident, $routing_table:ident, $busno:expr, $func:ident $(, $param:expr)*) => {{ let destination = ($busno >> 16) as u8; let busno = $busno as u8; let hop = $routing_table.0[destination as usize][0]; @@ -192,27 +200,27 @@ macro_rules! dispatch { $mod_local::$func(busno, $($param, )*) } else { let linkno = hop - 1; - $mod_remote::$func(linkno, destination, busno, $($param, )*) + $mod_remote::$func($io, $aux_mutex, linkno, destination, busno, $($param, )*) } }} } #[cfg(not(has_drtio))] macro_rules! dispatch { - ($mod_local:ident, $mod_remote:ident, $routing_table:ident, $busno:expr, $func:ident $(, $param:expr)*) => {{ + ($io:ident, $aux_mutex:ident,$mod_local:ident, $mod_remote:ident, $routing_table:ident, $busno:expr, $func:ident $(, $param:expr)*) => {{ let busno = $busno as u8; $mod_local::$func(busno, $($param, )*) }} } -pub fn process_kern_hwreq(io: &Io, +pub fn process_kern_hwreq(io: &Io, aux_mutex: &Mutex, _routing_table: &drtio_routing::RoutingTable, _up_destinations: &Urc>, request: &kern::Message) -> Result> { match request { &kern::RtioInitRequest => { info!("resetting RTIO"); - rtio_mgt::init_core(false); + rtio_mgt::init_core(io, aux_mutex, false); kern_acknowledge() } @@ -228,42 +236,42 @@ pub fn process_kern_hwreq(io: &Io, } &kern::I2cStartRequest { busno } => { - let succeeded = dispatch!(local_i2c, remote_i2c, _routing_table, busno, start).is_ok(); + let succeeded = dispatch!(io, aux_mutex, local_i2c, remote_i2c, _routing_table, busno, start).is_ok(); kern_send(io, &kern::I2cBasicReply { succeeded: succeeded }) } &kern::I2cRestartRequest { busno } => { - let succeeded = dispatch!(local_i2c, remote_i2c, _routing_table, busno, restart).is_ok(); + let succeeded = dispatch!(io, aux_mutex, local_i2c, remote_i2c, _routing_table, busno, restart).is_ok(); kern_send(io, &kern::I2cBasicReply { succeeded: succeeded }) } &kern::I2cStopRequest { busno } => { - let succeeded = dispatch!(local_i2c, remote_i2c, _routing_table, busno, stop).is_ok(); + let succeeded = dispatch!(io, aux_mutex, local_i2c, remote_i2c, _routing_table, busno, stop).is_ok(); kern_send(io, &kern::I2cBasicReply { succeeded: succeeded }) } &kern::I2cWriteRequest { busno, data } => { - match dispatch!(local_i2c, remote_i2c, _routing_table, busno, write, data) { + match dispatch!(io, aux_mutex, local_i2c, remote_i2c, _routing_table, busno, write, data) { Ok(ack) => kern_send(io, &kern::I2cWriteReply { succeeded: true, ack: ack }), Err(_) => kern_send(io, &kern::I2cWriteReply { succeeded: false, ack: false }) } } &kern::I2cReadRequest { busno, ack } => { - match dispatch!(local_i2c, remote_i2c, _routing_table, busno, read, ack) { + match dispatch!(io, aux_mutex, local_i2c, remote_i2c, _routing_table, busno, read, ack) { Ok(data) => kern_send(io, &kern::I2cReadReply { succeeded: true, data: data }), Err(_) => kern_send(io, &kern::I2cReadReply { succeeded: false, data: 0xff }) } } &kern::SpiSetConfigRequest { busno, flags, length, div, cs } => { - let succeeded = dispatch!(local_spi, remote_spi, _routing_table, busno, + let succeeded = dispatch!(io, aux_mutex, local_spi, remote_spi, _routing_table, busno, set_config, flags, length, div, cs).is_ok(); kern_send(io, &kern::SpiBasicReply { succeeded: succeeded }) }, &kern::SpiWriteRequest { busno, data } => { - let succeeded = dispatch!(local_spi, remote_spi, _routing_table, busno, + let succeeded = dispatch!(io, aux_mutex, local_spi, remote_spi, _routing_table, busno, write, data).is_ok(); kern_send(io, &kern::SpiBasicReply { succeeded: succeeded }) } &kern::SpiReadRequest { busno } => { - match dispatch!(local_spi, remote_spi, _routing_table, busno, read) { + match dispatch!(io, aux_mutex, local_spi, remote_spi, _routing_table, busno, read) { Ok(data) => kern_send(io, &kern::SpiReadReply { succeeded: true, data: data }), Err(_) => kern_send(io, &kern::SpiReadReply { succeeded: false, data: 0 }) } diff --git a/artiq/firmware/runtime/main.rs b/artiq/firmware/runtime/main.rs index fe7a26d91..d79de3b83 100644 --- a/artiq/firmware/runtime/main.rs +++ b/artiq/firmware/runtime/main.rs @@ -289,22 +289,25 @@ fn startup_ethernet() { drtio_routing::RoutingTable::default_empty())); let up_destinations = urc::Urc::new(RefCell::new( [false; drtio_routing::DEST_COUNT])); + let aux_mutex = sched::Mutex::new(); let mut scheduler = sched::Scheduler::new(); let io = scheduler.io(); - rtio_mgt::startup(&io, &drtio_routing_table, &up_destinations); + rtio_mgt::startup(&io, &aux_mutex, &drtio_routing_table, &up_destinations); io.spawn(4096, mgmt::thread); { + let aux_mutex = aux_mutex.clone(); let drtio_routing_table = drtio_routing_table.clone(); let up_destinations = up_destinations.clone(); - io.spawn(16384, move |io| { session::thread(io, &drtio_routing_table, &up_destinations) }); + io.spawn(16384, move |io| { session::thread(io, &aux_mutex, &drtio_routing_table, &up_destinations) }); } #[cfg(any(has_rtio_moninj, has_drtio))] { + let aux_mutex = aux_mutex.clone(); let drtio_routing_table = drtio_routing_table.clone(); - io.spawn(4096, move |io| { moninj::thread(io, &drtio_routing_table) }); + io.spawn(4096, move |io| { moninj::thread(io, &aux_mutex, &drtio_routing_table) }); } #[cfg(has_rtio_analyzer)] io.spawn(4096, analyzer::thread); diff --git a/artiq/firmware/runtime/moninj.rs b/artiq/firmware/runtime/moninj.rs index 5ffaa96a0..8534376d5 100644 --- a/artiq/firmware/runtime/moninj.rs +++ b/artiq/firmware/runtime/moninj.rs @@ -3,7 +3,7 @@ use core::cell::RefCell; use io::Error as IoError; use moninj_proto::*; -use sched::{Io, TcpListener, TcpStream, Error as SchedError}; +use sched::{Io, Mutex, TcpListener, TcpStream, Error as SchedError}; use urc::Urc; use board_misoc::clock; use board_artiq::drtio_routing; @@ -50,21 +50,16 @@ mod local_moninj { #[cfg(has_drtio)] mod remote_moninj { use drtioaux; + use rtio_mgt::drtio; + use sched::{Io, Mutex}; - pub fn read_probe(linkno: u8, destination: u8, channel: u16, probe: u8) -> u32 { - let request = drtioaux::Packet::MonitorRequest { + pub fn read_probe(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, channel: u16, probe: u8) -> u32 { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::MonitorRequest { destination: destination, channel: channel, probe: probe - }; - match drtioaux::send(linkno, &request) { - Ok(_) => (), - Err(e) => { - error!("aux packet error ({})", e); - return 0; - } - } - match drtioaux::recv_timeout(linkno, None) { + }); + match reply { Ok(drtioaux::Packet::MonitorReply { value }) => return value, Ok(packet) => error!("received unexpected aux packet: {:?}", packet), Err(e) => error!("aux packet error ({})", e) @@ -72,33 +67,23 @@ mod remote_moninj { 0 } - pub fn inject(linkno: u8, destination: u8, channel: u16, overrd: u8, value: u8) { - let request = drtioaux::Packet::InjectionRequest { + pub fn inject(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, channel: u16, overrd: u8, value: u8) { + let _lock = aux_mutex.lock(io).unwrap(); + drtioaux::send(linkno, &drtioaux::Packet::InjectionRequest { destination: destination, channel: channel, overrd: overrd, value: value - }; - match drtioaux::send(linkno, &request) { - Ok(_) => (), - Err(e) => error!("aux packet error ({})", e) - } + }).unwrap(); } - pub fn read_injection_status(linkno: u8, destination: u8, channel: u16, overrd: u8) -> u8 { - let request = drtioaux::Packet::InjectionStatusRequest { + pub fn read_injection_status(io: &Io, aux_mutex: &Mutex, linkno: u8, destination: u8, channel: u16, overrd: u8) -> u8 { + let reply = drtio::aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::InjectionStatusRequest { destination: destination, channel: channel, overrd: overrd - }; - match drtioaux::send(linkno, &request) { - Ok(_) => (), - Err(e) => { - error!("aux packet error ({})", e); - return 0; - } - } - match drtioaux::recv_timeout(linkno, None) { + }); + match reply { Ok(drtioaux::Packet::InjectionStatusReply { value }) => return value, Ok(packet) => error!("received unexpected aux packet: {:?}", packet), Err(e) => error!("aux packet error ({})", e) @@ -109,7 +94,7 @@ mod remote_moninj { #[cfg(has_drtio)] macro_rules! dispatch { - ($routing_table:ident, $channel:expr, $func:ident $(, $param:expr)*) => {{ + ($io:ident, $aux_mutex:ident, $routing_table:ident, $channel:expr, $func:ident $(, $param:expr)*) => {{ let destination = ($channel >> 16) as u8; let channel = $channel as u16; let hop = $routing_table.0[destination as usize][0]; @@ -117,20 +102,20 @@ macro_rules! dispatch { local_moninj::$func(channel, $($param, )*) } else { let linkno = hop - 1; - remote_moninj::$func(linkno, destination, channel, $($param, )*) + remote_moninj::$func($io, $aux_mutex, linkno, destination, channel, $($param, )*) } }} } #[cfg(not(has_drtio))] macro_rules! dispatch { - ($routing_table:ident, $channel:expr, $func:ident $(, $param:expr)*) => {{ + ($io:ident, $aux_mutex:ident, $routing_table:ident, $channel:expr, $func:ident $(, $param:expr)*) => {{ let channel = $channel as u16; local_moninj::$func(channel, $($param, )*) }} } -fn connection_worker(io: &Io, _routing_table: &drtio_routing::RoutingTable, +fn connection_worker(io: &Io, _aux_mutex: &Mutex, _routing_table: &drtio_routing::RoutingTable, mut stream: &mut TcpStream) -> Result<(), Error> { let mut probe_watch_list = BTreeMap::new(); let mut inject_watch_list = BTreeMap::new(); @@ -159,9 +144,9 @@ fn connection_worker(io: &Io, _routing_table: &drtio_routing::RoutingTable, let _ = inject_watch_list.remove(&(channel, overrd)); } }, - HostMessage::Inject { channel, overrd, value } => dispatch!(_routing_table, channel, inject, overrd, value), + HostMessage::Inject { channel, overrd, value } => dispatch!(io, _aux_mutex, _routing_table, channel, inject, overrd, value), HostMessage::GetInjectionStatus { channel, overrd } => { - let value = dispatch!(_routing_table, channel, read_injection_status, overrd); + let value = dispatch!(io, _aux_mutex, _routing_table, channel, read_injection_status, overrd); let reply = DeviceMessage::InjectionStatus { channel: channel, overrd: overrd, @@ -178,7 +163,7 @@ fn connection_worker(io: &Io, _routing_table: &drtio_routing::RoutingTable, if clock::get_ms() > next_check { for (&(channel, probe), previous) in probe_watch_list.iter_mut() { - let current = dispatch!(_routing_table, channel, read_probe, probe); + let current = dispatch!(io, _aux_mutex, _routing_table, channel, read_probe, probe); if previous.is_none() || previous.unwrap() != current { let message = DeviceMessage::MonitorStatus { channel: channel, @@ -193,7 +178,7 @@ fn connection_worker(io: &Io, _routing_table: &drtio_routing::RoutingTable, } } for (&(channel, overrd), previous) in inject_watch_list.iter_mut() { - let current = dispatch!(_routing_table, channel, read_injection_status, overrd); + let current = dispatch!(io, _aux_mutex, _routing_table, channel, read_injection_status, overrd); if previous.is_none() || previous.unwrap() != current { let message = DeviceMessage::InjectionStatus { channel: channel, @@ -214,17 +199,18 @@ fn connection_worker(io: &Io, _routing_table: &drtio_routing::RoutingTable, } } -pub fn thread(io: Io, routing_table: &Urc>) { +pub fn thread(io: Io, aux_mutex: &Mutex, routing_table: &Urc>) { let listener = TcpListener::new(&io, 2047); listener.listen(1383).expect("moninj: cannot listen"); loop { + let aux_mutex = aux_mutex.clone(); let routing_table = routing_table.clone(); let stream = listener.accept().expect("moninj: cannot accept").into_handle(); io.spawn(16384, move |io| { let routing_table = routing_table.borrow(); let mut stream = TcpStream::from_handle(&io, stream); - match connection_worker(&io, &routing_table, &mut stream) { + match connection_worker(&io, &aux_mutex, &routing_table, &mut stream) { Ok(()) => {}, Err(err) => error!("moninj aborted: {}", err) } diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index e3688fb57..69233dd60 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -7,6 +7,7 @@ use board_misoc::clock; use board_misoc::config; use board_artiq::drtio_routing; use sched::Io; +use sched::Mutex; #[cfg(has_rtio_crg)] pub mod crg { @@ -47,16 +48,18 @@ pub mod drtio { use super::*; use drtioaux; - pub fn startup(io: &Io, routing_table: &Urc>, + pub fn startup(io: &Io, aux_mutex: &Mutex, + routing_table: &Urc>, up_destinations: &Urc>) { unsafe { csr::drtio_transceiver::stable_clkin_write(1); } + let aux_mutex = aux_mutex.clone(); let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); io.spawn(4096, move |io| { let routing_table = routing_table.borrow(); - link_thread(io, &routing_table, &up_destinations); + link_thread(io, &aux_mutex, &routing_table, &up_destinations); }); } @@ -87,26 +90,6 @@ pub mod drtio { } } - fn ping_remote(linkno: u8, io: &Io) -> u32 { - let mut count = 0; - loop { - if !link_rx_up(linkno) { - return 0 - } - count += 1; - if count > 200 { - return 0; - } - drtioaux::send(linkno, &drtioaux::Packet::EchoRequest).unwrap(); - io.sleep(100).unwrap(); - let pr = drtioaux::recv(linkno); - match pr { - Ok(Some(drtioaux::Packet::EchoReply)) => return count, - _ => {} - } - } - } - fn recv_aux_timeout(io: &Io, linkno: u8, timeout: u32) -> Result { let max_time = clock::get_ms() + timeout as u64; loop { @@ -125,7 +108,35 @@ pub mod drtio { } } - fn sync_tsc(io: &Io, linkno: u8) -> Result<(), &'static str> { + pub fn aux_transact(io: &Io, aux_mutex: &Mutex, + linkno: u8, request: &drtioaux::Packet) -> Result { + let _lock = aux_mutex.lock(io).unwrap(); + drtioaux::send(linkno, request).unwrap(); + recv_aux_timeout(io, linkno, 200) + } + + fn ping_remote(io: &Io, aux_mutex: &Mutex, linkno: u8) -> u32 { + let mut count = 0; + loop { + if !link_rx_up(linkno) { + return 0 + } + count += 1; + if count > 100 { + return 0; + } + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::EchoRequest); + match reply { + Ok(drtioaux::Packet::EchoReply) => return count, + _ => {} + } + io.relinquish().unwrap(); + } + } + + fn sync_tsc(io: &Io, aux_mutex: &Mutex, linkno: u8) -> Result<(), &'static str> { + let _lock = aux_mutex.lock(io).unwrap(); + unsafe { (csr::DRTIO[linkno as usize].set_time_write)(1); while (csr::DRTIO[linkno as usize].set_time_read)() == 1 {} @@ -140,14 +151,13 @@ pub mod drtio { } } - fn load_routing_table(io: &Io, linkno: u8, routing_table: &drtio_routing::RoutingTable) + fn load_routing_table(io: &Io, aux_mutex: &Mutex, linkno: u8, routing_table: &drtio_routing::RoutingTable) -> Result<(), &'static str> { for i in 0..drtio_routing::DEST_COUNT { - drtioaux::send(linkno, &drtioaux::Packet::RoutingSetPath { + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::RoutingSetPath { destination: i as u8, hops: routing_table.0[i] - }).unwrap(); - let reply = recv_aux_timeout(io, linkno, 200)?; + })?; if reply != drtioaux::Packet::RoutingAck { return Err("unexpected reply"); } @@ -155,11 +165,10 @@ pub mod drtio { Ok(()) } - fn set_rank(io: &Io, linkno: u8, rank: u8) -> Result<(), &'static str> { - drtioaux::send(linkno, &drtioaux::Packet::RoutingSetRank { + fn set_rank(io: &Io, aux_mutex: &Mutex, linkno: u8, rank: u8) -> Result<(), &'static str> { + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::RoutingSetRank { rank: rank - }).unwrap(); - let reply = recv_aux_timeout(io, linkno, 200)?; + })?; if reply != drtioaux::Packet::RoutingAck { return Err("unexpected reply"); } @@ -179,7 +188,8 @@ pub mod drtio { } } - fn process_unsolicited_aux(linkno: u8) { + fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, linkno: u8) { + let _lock = aux_mutex.lock(io).unwrap(); match drtioaux::recv(linkno) { Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), @@ -227,7 +237,7 @@ pub mod drtio { up_destinations[destination as usize] } - fn destination_survey(io: &Io, routing_table: &drtio_routing::RoutingTable, + fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>) { for destination in 0..drtio_routing::DEST_COUNT { let hop = routing_table.0[destination][0]; @@ -242,10 +252,10 @@ pub mod drtio { let linkno = hop - 1; if destination_up(up_destinations, destination) { if link_up(linkno) { - drtioaux::send(linkno, &drtioaux::Packet::DestinationStatusRequest { + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest { destination: destination - }).unwrap(); - match recv_aux_timeout(io, linkno, 200) { + }); + match reply { Ok(drtioaux::Packet::DestinationDownReply) => destination_set_up(routing_table, up_destinations, destination, false), Ok(drtioaux::Packet::DestinationOkReply) => (), @@ -263,10 +273,10 @@ pub mod drtio { } } else { if link_up(linkno) { - drtioaux::send(linkno, &drtioaux::Packet::DestinationStatusRequest { + let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest { destination: destination - }).unwrap(); - match recv_aux_timeout(io, linkno, 200) { + }); + match reply { Ok(drtioaux::Packet::DestinationDownReply) => (), Ok(drtioaux::Packet::DestinationOkReply) => { destination_set_up(routing_table, up_destinations, destination, true); @@ -281,7 +291,8 @@ pub mod drtio { } } - pub fn link_thread(io: Io, routing_table: &drtio_routing::RoutingTable, + pub fn link_thread(io: Io, aux_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>) { loop { for linkno in 0..csr::DRTIO.len() { @@ -289,7 +300,7 @@ pub mod drtio { if link_up(linkno) { /* link was previously up */ if link_rx_up(linkno) { - process_unsolicited_aux(linkno); + process_unsolicited_aux(&io, aux_mutex, linkno); process_local_errors(linkno); } else { info!("[LINK#{}] link is down", linkno); @@ -299,17 +310,17 @@ pub mod drtio { /* link was previously down */ if link_rx_up(linkno) { info!("[LINK#{}] link RX became up, pinging", linkno); - let ping_count = ping_remote(linkno, &io); + let ping_count = ping_remote(&io, aux_mutex, linkno); if ping_count > 0 { info!("[LINK#{}] remote replied after {} packets", linkno, ping_count); set_link_up(linkno, true); - if let Err(e) = sync_tsc(&io, linkno) { + if let Err(e) = sync_tsc(&io, aux_mutex, linkno) { error!("[LINK#{}] failed to sync TSC ({})", linkno, e); } - if let Err(e) = load_routing_table(&io, linkno, routing_table) { + if let Err(e) = load_routing_table(&io, aux_mutex, linkno, routing_table) { error!("[LINK#{}] failed to load routing table ({})", linkno, e); } - if let Err(e) = set_rank(&io, linkno, 1) { + if let Err(e) = set_rank(&io, aux_mutex, linkno, 1) { error!("[LINK#{}] failed to set rank ({})", linkno, e); } info!("[LINK#{}] link initialization completed", linkno); @@ -319,18 +330,18 @@ pub mod drtio { } } } - destination_survey(&io, routing_table, up_destinations); + destination_survey(&io, aux_mutex, routing_table, up_destinations); io.sleep(200).unwrap(); } } - pub fn init() { + pub fn init(io: &Io, aux_mutex: &Mutex) { for linkno in 0..csr::DRTIO.len() { let linkno = linkno as u8; if link_up(linkno) { - drtioaux::send(linkno, - &drtioaux::Packet::ResetRequest { phy: false }).unwrap(); - match drtioaux::recv_timeout(linkno, None) { + let reply = aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::ResetRequest { phy: false }); + match reply { Ok(drtioaux::Packet::ResetAck) => (), Ok(_) => error!("[LINK#{}] reset failed, received unexpected aux packet", linkno), Err(e) => error!("[LINK#{}] reset failed, aux packet error ({})", linkno, e) @@ -344,9 +355,10 @@ pub mod drtio { pub mod drtio { use super::*; - pub fn startup(_io: &Io, _routing_table: &Urc>, + pub fn startup(_io: &Io, _aux_mutex: &Mutex, + _routing_table: &Urc>, _up_destinations: &Urc>) {} - pub fn init() {} + pub fn init(_io: &Io, _aux_mutex: &Mutex) {} pub fn link_up(_linkno: u8) -> bool { false } } @@ -372,7 +384,8 @@ fn async_error_thread(io: Io) { } } -pub fn startup(io: &Io, routing_table: &Urc>, +pub fn startup(io: &Io, aux_mutex: &Mutex, + routing_table: &Urc>, up_destinations: &Urc>) { #[cfg(has_rtio_crg)] { @@ -413,17 +426,17 @@ pub fn startup(io: &Io, routing_table: &Urc } } - drtio::startup(io, routing_table, up_destinations); - init_core(true); + drtio::startup(io, aux_mutex, routing_table, up_destinations); + init_core(io, aux_mutex, true); io.spawn(4096, async_error_thread); } -pub fn init_core(phy: bool) { +pub fn init_core(io: &Io, aux_mutex: &Mutex, phy: bool) { unsafe { csr::rtio_core::reset_write(1); if phy { csr::rtio_core::reset_phy_write(1); } } - drtio::init() + drtio::init(io, aux_mutex) } diff --git a/artiq/firmware/runtime/sched.rs b/artiq/firmware/runtime/sched.rs index 770ac3f6c..7a2a85723 100644 --- a/artiq/firmware/runtime/sched.rs +++ b/artiq/firmware/runtime/sched.rs @@ -273,7 +273,7 @@ impl Mutex { Mutex(Urc::new(Cell::new(false))) } - pub fn lock<'a>(&'a self, io: Io) -> Result, Error> { + pub fn lock<'a>(&'a self, io: &Io) -> Result, Error> { io.until(|| !self.0.get())?; self.0.set(true); Ok(MutexGuard(&*self.0)) diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index e413e8749..ae23c3410 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -6,7 +6,7 @@ use io::{Read, Write, Error as IoError}; use board_misoc::{ident, cache, config}; use {mailbox, rpc_queue, kernel}; use urc::Urc; -use sched::{ThreadHandle, Io, TcpListener, TcpStream, Error as SchedError}; +use sched::{ThreadHandle, Io, Mutex, TcpListener, TcpStream, Error as SchedError}; use rtio_mgt; use rtio_dma::Manager as DmaManager; use cache::Cache; @@ -324,7 +324,7 @@ fn process_host_message(io: &Io, Ok(()) } -fn process_kern_message(io: &Io, +fn process_kern_message(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, mut stream: Option<&mut TcpStream>, @@ -345,7 +345,7 @@ fn process_kern_message(io: &Io, kern_recv_dotrace(request); - if kern_hwreq::process_kern_hwreq(io, routing_table, up_destinations, request)? { + if kern_hwreq::process_kern_hwreq(io, aux_mutex, routing_table, up_destinations, request)? { return Ok(false) } @@ -494,7 +494,7 @@ fn process_kern_queued_rpc(stream: &mut TcpStream, }) } -fn host_kernel_worker(io: &Io, +fn host_kernel_worker(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, stream: &mut TcpStream, @@ -513,7 +513,7 @@ fn host_kernel_worker(io: &Io, } if mailbox::receive() != 0 { - process_kern_message(io, + process_kern_message(io, aux_mutex, routing_table, up_destinations, Some(stream), &mut session)?; } @@ -534,7 +534,7 @@ fn host_kernel_worker(io: &Io, } } -fn flash_kernel_worker(io: &Io, +fn flash_kernel_worker(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, congress: &mut Congress, @@ -559,7 +559,7 @@ fn flash_kernel_worker(io: &Io, } if mailbox::receive() != 0 { - if process_kern_message(io, routing_table, up_destinations, None, &mut session)? { + if process_kern_message(io, aux_mutex, routing_table, up_destinations, None, &mut session)? { return Ok(()) } } @@ -591,7 +591,8 @@ fn respawn(io: &Io, handle: &mut Option, f: F) *handle = Some(io.spawn(16384, f)) } -pub fn thread(io: Io, routing_table: &Urc>, +pub fn thread(io: Io, aux_mutex: &Mutex, + routing_table: &Urc>, up_destinations: &Urc>) { let listener = TcpListener::new(&io, 65535); listener.listen(1381).expect("session: cannot listen"); @@ -601,6 +602,7 @@ pub fn thread(io: Io, routing_table: &Urc>, let mut kernel_thread = None; { + let aux_mutex = aux_mutex.clone(); let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); let congress = congress.clone(); @@ -608,7 +610,7 @@ pub fn thread(io: Io, routing_table: &Urc>, let routing_table = routing_table.borrow(); let mut congress = congress.borrow_mut(); info!("running startup kernel"); - match flash_kernel_worker(&io, &routing_table, &up_destinations, &mut congress, "startup_kernel") { + match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &mut congress, "startup_kernel") { Ok(()) => info!("startup kernel finished"), Err(Error::KernelNotFound) => @@ -637,6 +639,7 @@ pub fn thread(io: Io, routing_table: &Urc>, } info!("new connection from {}", stream.remote_endpoint()); + let aux_mutex = aux_mutex.clone(); let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); let congress = congress.clone(); @@ -645,7 +648,7 @@ pub fn thread(io: Io, routing_table: &Urc>, let routing_table = routing_table.borrow(); let mut congress = congress.borrow_mut(); let mut stream = TcpStream::from_handle(&io, stream); - match host_kernel_worker(&io, &routing_table, &up_destinations, &mut stream, &mut *congress) { + match host_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &mut stream, &mut *congress) { Ok(()) => (), Err(Error::Protocol(host::Error::Io(IoError::UnexpectedEnd))) => info!("connection closed"), @@ -663,13 +666,14 @@ pub fn thread(io: Io, routing_table: &Urc>, if kernel_thread.as_ref().map_or(true, |h| h.terminated()) { info!("no connection, starting idle kernel"); + let aux_mutex = aux_mutex.clone(); let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); let congress = congress.clone(); respawn(&io, &mut kernel_thread, move |io| { let routing_table = routing_table.borrow(); let mut congress = congress.borrow_mut(); - match flash_kernel_worker(&io, &routing_table, &up_destinations, &mut *congress, "idle_kernel") { + match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &mut *congress, "idle_kernel") { Ok(()) => info!("idle kernel finished, standing by"), Err(Error::Protocol(host::Error::Io(