diff --git a/artiq/firmware/libproto_artiq/drtioaux_proto.rs b/artiq/firmware/libproto_artiq/drtioaux_proto.rs index 4a962f6fe..6ca4230eb 100644 --- a/artiq/firmware/libproto_artiq/drtioaux_proto.rs +++ b/artiq/firmware/libproto_artiq/drtioaux_proto.rs @@ -72,7 +72,17 @@ pub enum Packet { DmaRemoveTraceReply { succeeded: bool }, DmaPlaybackRequest { destination: u8, id: u32, timestamp: u64 }, DmaPlaybackReply { succeeded: bool }, - DmaPlaybackStatus { destination: u8, id: u32, error: u8, channel: u32, timestamp: u64 } + DmaPlaybackStatus { destination: u8, id: u32, error: u8, channel: u32, timestamp: u64 }, + + SubkernelAddDataRequest { destination: u8, id: u32, last: bool, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE] }, + SubkernelAddDataReply { succeeded: bool }, + SubkernelLoadRunRequest { destination: u8, id: u32, run: bool }, + SubkernelLoadRunReply { succeeded: bool }, + SubkernelFinished { id: u32, with_exception: bool }, + SubkernelExceptionRequest { destination: u8 }, + SubkernelException { last: bool, length: u16, data: [u8; SAT_PAYLOAD_MAX_SIZE] }, + SubkernelMessage { destination: u8, id: u32, last: bool, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE] }, + SubkernelMessageAck { destination: u8 }, } impl Packet { @@ -268,6 +278,69 @@ impl Packet { timestamp: reader.read_u64()? }, + 0xc0 => { + let destination = reader.read_u8()?; + let id = reader.read_u32()?; + let last = reader.read_bool()?; + let length = reader.read_u16()?; + let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + reader.read_exact(&mut data[0..length as usize])?; + Packet::SubkernelAddDataRequest { + destination: destination, + id: id, + last: last, + length: length as u16, + data: data, + } + }, + 0xc1 => Packet::SubkernelAddDataReply { + succeeded: reader.read_bool()? + }, + 0xc4 => Packet::SubkernelLoadRunRequest { + destination: reader.read_u8()?, + id: reader.read_u32()?, + run: reader.read_bool()? + }, + 0xc5 => Packet::SubkernelLoadRunReply { + succeeded: reader.read_bool()? + }, + 0xc8 => Packet::SubkernelFinished { + id: reader.read_u32()?, + with_exception: reader.read_bool()?, + }, + 0xc9 => Packet::SubkernelExceptionRequest { + destination: reader.read_u8()? + }, + 0xca => { + let last = reader.read_bool()?; + let length = reader.read_u16()?; + let mut data: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; + reader.read_exact(&mut data[0..length as usize])?; + Packet::SubkernelException { + last: last, + length: length, + data: data + } + }, + 0xcb => { + let destination = reader.read_u8()?; + let id = reader.read_u32()?; + let last = reader.read_bool()?; + let length = reader.read_u16()?; + let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + reader.read_exact(&mut data[0..length as usize])?; + Packet::SubkernelMessage { + destination: destination, + id: id, + last: last, + length: length as u16, + data: data, + } + }, + 0xcc => Packet::SubkernelMessageAck { + destination: reader.read_u8()? + }, + ty => return Err(Error::UnknownPacket(ty)) }) } @@ -488,7 +561,57 @@ impl Packet { writer.write_u8(error)?; writer.write_u32(channel)?; writer.write_u64(timestamp)?; - } + }, + + Packet::SubkernelAddDataRequest { destination, id, last, data, length } => { + writer.write_u8(0xc0)?; + writer.write_u8(destination)?; + writer.write_u32(id)?; + writer.write_bool(last)?; + writer.write_u16(length)?; + writer.write_all(&data[0..length as usize])?; + }, + Packet::SubkernelAddDataReply { succeeded } => { + writer.write_u8(0xc1)?; + writer.write_bool(succeeded)?; + }, + Packet::SubkernelLoadRunRequest { destination, id, run } => { + writer.write_u8(0xc4)?; + writer.write_u8(destination)?; + writer.write_u32(id)?; + writer.write_bool(run)?; + }, + Packet::SubkernelLoadRunReply { succeeded } => { + writer.write_u8(0xc5)?; + writer.write_bool(succeeded)?; + }, + Packet::SubkernelFinished { id, with_exception } => { + writer.write_u8(0xc8)?; + writer.write_u32(id)?; + writer.write_bool(with_exception)?; + }, + Packet::SubkernelExceptionRequest { destination } => { + writer.write_u8(0xc9)?; + writer.write_u8(destination)?; + }, + Packet::SubkernelException { last, length, data } => { + writer.write_u8(0xca)?; + writer.write_bool(last)?; + writer.write_u16(length)?; + writer.write_all(&data[0..length as usize])?; + }, + Packet::SubkernelMessage { destination, id, last, data, length } => { + writer.write_u8(0xcb)?; + writer.write_u8(destination)?; + writer.write_u32(id)?; + writer.write_bool(last)?; + writer.write_u16(length)?; + writer.write_all(&data[0..length as usize])?; + }, + Packet::SubkernelMessageAck { destination } => { + writer.write_u8(0xcc)?; + writer.write_u8(destination)?; + }, } Ok(()) } diff --git a/artiq/firmware/libproto_artiq/session_proto.rs b/artiq/firmware/libproto_artiq/session_proto.rs index d4277c26c..d5266ec3d 100644 --- a/artiq/firmware/libproto_artiq/session_proto.rs +++ b/artiq/firmware/libproto_artiq/session_proto.rs @@ -84,6 +84,8 @@ pub enum Request { column: u32, function: u32, }, + + UploadSubkernel { id: u32, destination: u8, kernel: Vec }, } #[derive(Debug)] @@ -137,6 +139,11 @@ impl Request { column: reader.read_u32()?, function: reader.read_u32()? }, + 9 => Request::UploadSubkernel { + id: reader.read_u32()?, + destination: reader.read_u8()?, + kernel: reader.read_bytes()? + }, ty => return Err(Error::UnknownPacket(ty)) }) diff --git a/artiq/firmware/runtime/kernel.rs b/artiq/firmware/runtime/kernel.rs index 42c1f2f05..e7a11bfc4 100644 --- a/artiq/firmware/runtime/kernel.rs +++ b/artiq/firmware/runtime/kernel.rs @@ -87,3 +87,329 @@ unsafe fn load_image(image: &[u8]) -> Result<(), &'static str> { pub fn validate(ptr: usize) -> bool { ptr >= KERNELCPU_EXEC_ADDRESS && ptr <= KERNELCPU_LAST_ADDRESS } + + +#[cfg(has_drtio)] +pub mod subkernel { + use alloc::{vec::Vec, collections::btree_map::BTreeMap, string::String, string::ToString}; + use core::str; + use board_artiq::drtio_routing::RoutingTable; + use board_misoc::clock; + use proto_artiq::{drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE, rpc_proto as rpc}; + use io::Cursor; + use rtio_mgt::drtio; + use sched::{Io, Mutex, Error as SchedError}; + + #[derive(Debug, PartialEq, Clone, Copy)] + pub enum FinishStatus { + Ok, + CommLost, + Exception + } + + #[derive(Debug, PartialEq, Clone, Copy)] + pub enum SubkernelState { + NotLoaded, + Uploaded, + Running, + Finished { status: FinishStatus }, + } + + #[derive(Fail, Debug)] + pub enum Error { + #[fail(display = "Timed out waiting for subkernel")] + Timeout, + #[fail(display = "Session killed while waiting for subkernel")] + SessionKilled, + #[fail(display = "Subkernel is in incorrect state for the given operation")] + IncorrectState, + #[fail(display = "DRTIO error: {}", _0)] + DrtioError(String), + #[fail(display = "scheduler error")] + SchedError(SchedError), + #[fail(display = "rpc io error")] + RpcIoError, + #[fail(display = "subkernel finished prematurely")] + SubkernelFinished, + } + + impl From<&str> for Error { + fn from(value: &str) -> Error { + Error::DrtioError(value.to_string()) + } + } + + impl From for Error { + fn from(value: SchedError) -> Error { + match value { + SchedError::Interrupted => Error::SessionKilled, + x => Error::SchedError(x) + } + } + } + + impl From> for Error { + fn from(_value: io::Error) -> Error { + Error::RpcIoError + } + } + + pub struct SubkernelFinished { + pub id: u32, + pub comm_lost: bool, + pub exception: Option> + } + + struct Subkernel { + pub destination: u8, + pub data: Vec, + pub state: SubkernelState + } + + impl Subkernel { + pub fn new(destination: u8, data: Vec) -> Self { + Subkernel { + destination: destination, + data: data, + state: SubkernelState::NotLoaded + } + } + } + + static mut SUBKERNELS: BTreeMap = BTreeMap::new(); + + pub fn add_subkernel(io: &Io, subkernel_mutex: &Mutex, id: u32, destination: u8, kernel: Vec) { + let _lock = subkernel_mutex.lock(io).unwrap(); + unsafe { SUBKERNELS.insert(id, Subkernel::new(destination, kernel)); } + } + + pub fn upload(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex, + routing_table: &RoutingTable, id: u32) -> Result<(), Error> { + let _lock = subkernel_mutex.lock(io).unwrap(); + let subkernel = unsafe { SUBKERNELS.get_mut(&id).unwrap() }; + drtio::subkernel_upload(io, aux_mutex, routing_table, id, + subkernel.destination, &subkernel.data)?; + subkernel.state = SubkernelState::Uploaded; + Ok(()) + } + + pub fn load(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex, routing_table: &RoutingTable, + id: u32, run: bool) -> Result<(), Error> { + let _lock = subkernel_mutex.lock(io).unwrap(); + let subkernel = unsafe { SUBKERNELS.get_mut(&id).unwrap() }; + if subkernel.state != SubkernelState::Uploaded { + return Err(Error::IncorrectState); + } + drtio::subkernel_load(io, aux_mutex, routing_table, id, subkernel.destination, run)?; + if run { + subkernel.state = SubkernelState::Running; + } + Ok(()) + } + + pub fn clear_subkernels(io: &Io, subkernel_mutex: &Mutex) { + let _lock = subkernel_mutex.lock(io).unwrap(); + unsafe { + SUBKERNELS = BTreeMap::new(); + MESSAGE_QUEUE = Vec::new(); + CURRENT_MESSAGES = BTreeMap::new(); + } + } + + pub fn subkernel_finished(io: &Io, subkernel_mutex: &Mutex, id: u32, with_exception: bool) { + // called upon receiving DRTIO SubkernelRunDone + let _lock = subkernel_mutex.lock(io).unwrap(); + let subkernel = unsafe { SUBKERNELS.get_mut(&id) }; + // may be None if session ends and is cleared + if let Some(subkernel) = subkernel { + subkernel.state = SubkernelState::Finished { + status: match with_exception { + true => FinishStatus::Exception, + false => FinishStatus::Ok, + } + } + } + } + + pub fn destination_changed(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex, + routing_table: &RoutingTable, destination: u8, up: bool) { + let _lock = subkernel_mutex.lock(io).unwrap(); + let subkernels_iter = unsafe { SUBKERNELS.iter_mut() }; + for (id, subkernel) in subkernels_iter { + if subkernel.destination == destination { + if up { + match drtio::subkernel_upload(io, aux_mutex, routing_table, *id, destination, &subkernel.data) + { + Ok(_) => subkernel.state = SubkernelState::Uploaded, + Err(e) => error!("Error adding subkernel on destination {}: {}", destination, e) + } + } else { + subkernel.state = match subkernel.state { + SubkernelState::Running => SubkernelState::Finished { status: FinishStatus::CommLost }, + _ => SubkernelState::NotLoaded, + } + } + } + } + } + + pub fn retrieve_finish_status(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex, + routing_table: &RoutingTable, id: u32) -> Result { + let _lock = subkernel_mutex.lock(io)?; + let mut subkernel = unsafe { SUBKERNELS.get_mut(&id).unwrap() }; + match subkernel.state { + SubkernelState::Finished { status } => { + subkernel.state = SubkernelState::Uploaded; + Ok(SubkernelFinished { + id: id, + comm_lost: status == FinishStatus::CommLost, + exception: if status == FinishStatus::Exception { + Some(drtio::subkernel_retrieve_exception(io, aux_mutex, + routing_table, subkernel.destination)?) + } else { None } + }) + }, + _ => Err(Error::IncorrectState) + } + } + + pub fn await_finish(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex, + routing_table: &RoutingTable, id: u32, timeout: u64) -> Result { + { + let _lock = subkernel_mutex.lock(io)?; + match unsafe { SUBKERNELS.get(&id).unwrap().state } { + SubkernelState::Running | SubkernelState::Finished { .. } => (), + _ => return Err(Error::IncorrectState) + } + } + let max_time = clock::get_ms() + timeout as u64; + let _res = io.until(|| { + if clock::get_ms() > max_time { + return true; + } + if subkernel_mutex.test_lock() { + // cannot lock again within io.until - scheduler guarantees + // that it will not be interrupted - so only test the lock + return false; + } + let subkernel = unsafe { SUBKERNELS.get(&id).unwrap() }; + match subkernel.state { + SubkernelState::Finished { .. } => true, + _ => false + } + })?; + if clock::get_ms() > max_time { + error!("Remote subkernel finish await timed out"); + return Err(Error::Timeout); + } + retrieve_finish_status(io, aux_mutex, subkernel_mutex, routing_table, id) + } + + pub struct Message { + from_id: u32, + pub tag_count: u8, + pub tag: u8, + pub data: Vec + } + + // FIFO queue of messages + static mut MESSAGE_QUEUE: Vec = Vec::new(); + // currently under construction message(s) (can be from multiple sources) + static mut CURRENT_MESSAGES: BTreeMap = BTreeMap::new(); + + pub fn message_handle_incoming(io: &Io, subkernel_mutex: &Mutex, + id: u32, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + // called when receiving a message from satellite + let _lock = match subkernel_mutex.lock(io) { + Ok(lock) => lock, + // may get interrupted, when session is cancelled or main kernel finishes without await + Err(_) => return, + }; + if unsafe { SUBKERNELS.get(&id).is_none() } { + // do not add messages for non-existing or deleted subkernels + return + } + match unsafe { CURRENT_MESSAGES.get_mut(&id) } { + Some(message) => message.data.extend(&data[..length]), + None => unsafe { + CURRENT_MESSAGES.insert(id, Message { + from_id: id, + tag_count: data[0], + tag: data[1], + data: data[2..length].to_vec() + }); + } + }; + if last { + unsafe { + // when done, remove from working queue + MESSAGE_QUEUE.push(CURRENT_MESSAGES.remove(&id).unwrap()); + }; + } + } + + pub fn message_await(io: &Io, subkernel_mutex: &Mutex, id: u32, timeout: u64 + ) -> Result { + { + let _lock = subkernel_mutex.lock(io)?; + match unsafe { SUBKERNELS.get(&id).unwrap().state } { + SubkernelState::Finished { .. } => return Err(Error::SubkernelFinished), + SubkernelState::Running => (), + _ => return Err(Error::IncorrectState) + } + } + let max_time = clock::get_ms() + timeout as u64; + let message = io.until_ok(|| { + if clock::get_ms() > max_time { + return Ok(None); + } + if subkernel_mutex.test_lock() { + return Err(()); + } + let msg_len = unsafe { MESSAGE_QUEUE.len() }; + for i in 0..msg_len { + let msg = unsafe { &MESSAGE_QUEUE[i] }; + if msg.from_id == id { + return Ok(Some(unsafe { MESSAGE_QUEUE.remove(i) })); + } + } + match unsafe { SUBKERNELS.get(&id).unwrap().state } { + SubkernelState::Finished { .. } => return Ok(None), + _ => () + } + Err(()) + }); + match message { + Ok(Some(message)) => Ok(message), + Ok(None) => { + if clock::get_ms() > max_time { + Err(Error::Timeout) + } else { + let _lock = subkernel_mutex.lock(io)?; + match unsafe { SUBKERNELS.get(&id).unwrap().state } { + SubkernelState::Finished { .. } => Err(Error::SubkernelFinished), + _ => Err(Error::IncorrectState) + } + } + } + Err(e) => Err(Error::SchedError(e)), + } + } + + pub fn message_send<'a>(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex, + routing_table: &RoutingTable, id: u32, count: u8, tag: &'a [u8], message: *const *const () + ) -> Result<(), Error> { + let mut writer = Cursor::new(Vec::new()); + let _lock = subkernel_mutex.lock(io).unwrap(); + let destination = unsafe { SUBKERNELS.get(&id).unwrap().destination }; + + // reuse rpc code for sending arbitrary data + rpc::send_args(&mut writer, 0, tag, message)?; + // skip service tag, but overwrite first byte with tag count + let data = &mut writer.into_inner()[3..]; + data[0] = count; + Ok(drtio::subkernel_send_message( + io, aux_mutex, routing_table, id, destination, data + )?) + } +} \ No newline at end of file diff --git a/artiq/firmware/runtime/main.rs b/artiq/firmware/runtime/main.rs index 08367375a..f0970a73f 100644 --- a/artiq/firmware/runtime/main.rs +++ b/artiq/firmware/runtime/main.rs @@ -1,4 +1,4 @@ -#![feature(lang_items, panic_info_message, const_btree_new, iter_advance_by)] +#![feature(lang_items, panic_info_message, const_btree_new, iter_advance_by, never_type)] #![no_std] extern crate dyld; @@ -189,6 +189,7 @@ fn startup() { let aux_mutex = sched::Mutex::new(); let ddma_mutex = sched::Mutex::new(); + let subkernel_mutex = sched::Mutex::new(); let mut scheduler = sched::Scheduler::new(interface); let io = scheduler.io(); @@ -197,7 +198,7 @@ fn startup() { io.spawn(4096, dhcp::dhcp_thread); } - rtio_mgt::startup(&io, &aux_mutex, &drtio_routing_table, &up_destinations, &ddma_mutex); + rtio_mgt::startup(&io, &aux_mutex, &drtio_routing_table, &up_destinations, &ddma_mutex, &subkernel_mutex); io.spawn(4096, mgmt::thread); { @@ -205,7 +206,8 @@ fn startup() { let drtio_routing_table = drtio_routing_table.clone(); let up_destinations = up_destinations.clone(); let ddma_mutex = ddma_mutex.clone(); - io.spawn(16384, move |io| { session::thread(io, &aux_mutex, &drtio_routing_table, &up_destinations, &ddma_mutex) }); + let subkernel_mutex = subkernel_mutex.clone(); + io.spawn(32768, move |io| { session::thread(io, &aux_mutex, &drtio_routing_table, &up_destinations, &ddma_mutex, &subkernel_mutex) }); } #[cfg(any(has_rtio_moninj, has_drtio))] { diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index f643fafd5..ba6ba3865 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -21,18 +21,20 @@ pub mod drtio { use rtio_dma::remote_dma; #[cfg(has_rtio_analyzer)] use analyzer::remote_analyzer::RemoteBuffer; + use kernel::subkernel; pub fn startup(io: &Io, aux_mutex: &Mutex, routing_table: &Urc>, up_destinations: &Urc>, - ddma_mutex: &Mutex) { + ddma_mutex: &Mutex, subkernel_mutex: &Mutex) { let aux_mutex = aux_mutex.clone(); let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); let ddma_mutex = ddma_mutex.clone(); + let subkernel_mutex = subkernel_mutex.clone(); io.spawn(8192, move |io| { let routing_table = routing_table.borrow(); - link_thread(io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex); + link_thread(io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &subkernel_mutex); }); } @@ -61,14 +63,26 @@ pub mod drtio { } } - fn process_async_packets(io: &Io, ddma_mutex: &Mutex, packet: drtioaux::Packet - ) -> Option { + fn process_async_packets(io: &Io, ddma_mutex: &Mutex, subkernel_mutex: &Mutex, linkno: u8, + packet: drtioaux::Packet) -> Option { // returns None if an async packet has been consumed match packet { drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp } => { remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp); None }, + drtioaux::Packet::SubkernelFinished { id, with_exception } => { + subkernel::subkernel_finished(io, subkernel_mutex, id, with_exception); + None + }, + drtioaux::Packet::SubkernelMessage { id, destination: from, last, length, data } => { + subkernel::message_handle_incoming(io, subkernel_mutex, id, last, length as usize, &data); + // acknowledge receiving part of the message + drtioaux::send(linkno, + &drtioaux::Packet::SubkernelMessageAck { destination: from } + ).unwrap(); + None + } other => Some(other) } } @@ -166,13 +180,14 @@ pub mod drtio { } } - fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, linkno: u8, ddma_mutex: &Mutex) { + fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, subkernel_mutex: &Mutex, linkno: u8) { let _lock = aux_mutex.lock(io).unwrap(); match drtioaux::recv(linkno) { - Ok(Some(drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp })) => { - remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp); + Ok(Some(packet)) => { + if let Some(packet) = process_async_packets(io, ddma_mutex, subkernel_mutex, linkno, packet) { + warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet); + } } - Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), Err(_) => warn!("[LINK#{}] aux packet error", linkno) } @@ -221,7 +236,7 @@ pub mod drtio { fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_links: &[bool], up_destinations: &Urc>, - ddma_mutex: &Mutex) { + ddma_mutex: &Mutex, subkernel_mutex: &Mutex) { for destination in 0..drtio_routing::DEST_COUNT { let hop = routing_table.0[destination][0]; let destination = destination as u8; @@ -241,11 +256,12 @@ pub mod drtio { destination: destination }); if let Ok(reply) = reply { - let reply = process_async_packets(io, ddma_mutex, reply); + let reply = process_async_packets(io, ddma_mutex, subkernel_mutex, linkno, reply); match reply { Some(drtioaux::Packet::DestinationDownReply) => { destination_set_up(routing_table, up_destinations, destination, false); remote_dma::destination_changed(io, aux_mutex, ddma_mutex, routing_table, destination, false); + subkernel::destination_changed(io, aux_mutex, subkernel_mutex, routing_table, destination, false); } Some(drtioaux::Packet::DestinationOkReply) => (), Some(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => { @@ -276,6 +292,7 @@ pub mod drtio { } else { destination_set_up(routing_table, up_destinations, destination, false); remote_dma::destination_changed(io, aux_mutex, ddma_mutex, routing_table, destination, false); + subkernel::destination_changed(io, aux_mutex, subkernel_mutex, routing_table, destination, false); } } else { if up_links[linkno as usize] { @@ -289,6 +306,7 @@ pub mod drtio { destination_set_up(routing_table, up_destinations, destination, true); init_buffer_space(destination as u8, linkno); remote_dma::destination_changed(io, aux_mutex, ddma_mutex, routing_table, destination, true); + subkernel::destination_changed(io, aux_mutex, subkernel_mutex, routing_table, destination, true); }, Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) @@ -302,7 +320,7 @@ pub mod drtio { pub fn link_thread(io: Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, - ddma_mutex: &Mutex) { + ddma_mutex: &Mutex, subkernel_mutex: &Mutex) { let mut up_links = [false; csr::DRTIO.len()]; loop { for linkno in 0..csr::DRTIO.len() { @@ -310,7 +328,7 @@ pub mod drtio { if up_links[linkno as usize] { /* link was previously up */ if link_rx_up(linkno) { - process_unsolicited_aux(&io, aux_mutex, linkno, ddma_mutex); + process_unsolicited_aux(&io, aux_mutex, ddma_mutex, subkernel_mutex, linkno); process_local_errors(linkno); } else { info!("[LINK#{}] link is down", linkno); @@ -340,7 +358,7 @@ pub mod drtio { } } } - destination_survey(&io, aux_mutex, routing_table, &up_links, up_destinations, ddma_mutex); + destination_survey(&io, aux_mutex, routing_table, &up_links, up_destinations, ddma_mutex, subkernel_mutex); io.sleep(200).unwrap(); } } @@ -374,13 +392,13 @@ pub mod drtio { fn partition_data(data: &[u8], send_f: F) -> Result<(), &'static str> where F: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], bool, usize) -> Result<(), &'static str> { - let mut i = 0; + let mut i = 0; while i < data.len() { let mut slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let len: usize = if i + MASTER_PAYLOAD_MAX_SIZE < data.len() { MASTER_PAYLOAD_MAX_SIZE } else { data.len() - i } as usize; let last = i + len == data.len(); slice[..len].clone_from_slice(&data[i..i+len]); - i += len; + i += len; send_f(&slice, last, len)?; } Ok(()) @@ -483,6 +501,74 @@ pub mod drtio { Ok(remote_buffers) } + pub fn subkernel_upload(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, + id: u32, destination: u8, data: &Vec) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + partition_data(data, |slice, last, len: usize| { + let reply = aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::SubkernelAddDataRequest { + id: id, destination: destination, last: last, length: len as u16, data: *slice}); + match reply { + Ok(drtioaux::Packet::SubkernelAddDataReply { succeeded: true }) => Ok(()), + Ok(drtioaux::Packet::SubkernelAddDataReply { succeeded: false }) => + Err("error adding subkernel on satellite"), + Ok(_) => Err("adding subkernel failed, unexpected aux packet"), + Err(_) => Err("adding subkernel failed, aux error") + } + }) + } + + pub fn subkernel_load(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, + id: u32, destination: u8, run: bool) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let reply = aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::SubkernelLoadRunRequest{ id: id, destination: destination, run: run }); + match reply { + Ok(drtioaux::Packet::SubkernelLoadRunReply { succeeded: true }) => return Ok(()), + Ok(drtioaux::Packet::SubkernelLoadRunReply { succeeded: false }) => + return Err("error on subkernel run request"), + Ok(_) => return Err("received unexpected aux packet during subkernel run"), + Err(_) => return Err("aux error on subkernel run") + } + } + + pub fn subkernel_retrieve_exception(io: &Io, aux_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, destination: u8 + ) -> Result, &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let mut remote_data: Vec = Vec::new(); + loop { + let reply = aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::SubkernelExceptionRequest { destination: destination }); + match reply { + Ok(drtioaux::Packet::SubkernelException { last, length, data }) => { + remote_data.extend(&data[0..length as usize]); + if last { + return Ok(remote_data); + } + }, + Ok(_) => return Err("received unexpected aux packet during subkernel exception request"), + Err(e) => return Err(e) + } + } + } + + pub fn subkernel_send_message(io: &Io, aux_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, id: u32, destination: u8, message: &[u8] + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + partition_data(message, |slice, last, len: usize| { + let reply = aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::SubkernelMessage { + destination: destination, id: id, last: last, length: len as u16, data: *slice}); + match reply { + Ok(drtioaux::Packet::SubkernelMessageAck { .. }) => Ok(()), + Ok(_) => Err("sending message to subkernel failed, unexpected aux packet"), + Err(_) => Err("sending message to subkernel, aux error") + } + }) + } + } #[cfg(not(has_drtio))] @@ -492,7 +578,7 @@ pub mod drtio { pub fn startup(_io: &Io, _aux_mutex: &Mutex, _routing_table: &Urc>, _up_destinations: &Urc>, - _ddma_mutex: &Mutex) {} + _ddma_mutex: &Mutex, _subkernel_mutex: &Mutex) {} pub fn reset(_io: &Io, _aux_mutex: &Mutex) {} } @@ -556,9 +642,9 @@ fn read_device_map() -> DeviceMap { pub fn startup(io: &Io, aux_mutex: &Mutex, routing_table: &Urc>, up_destinations: &Urc>, - ddma_mutex: &Mutex) { + ddma_mutex: &Mutex, subkernel_mutex: &Mutex) { set_device_map(read_device_map()); - drtio::startup(io, aux_mutex, routing_table, up_destinations, ddma_mutex); + drtio::startup(io, aux_mutex, routing_table, up_destinations, ddma_mutex, subkernel_mutex); unsafe { csr::rtio_core::reset_phy_write(1); } diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index 64e015038..4f629f512 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -1,9 +1,11 @@ use core::{mem, str, cell::{Cell, RefCell}, fmt::Write as FmtWrite}; -use alloc::{vec::Vec, string::String}; +use alloc::{vec::Vec, string::{String, ToString}}; use byteorder::{ByteOrder, NativeEndian}; use cslice::CSlice; use io::{Read, Write, Error as IoError}; +#[cfg(has_drtio)] +use io::{Cursor, ProtoRead}; use board_misoc::{ident, cache, config}; use {mailbox, rpc_queue, kernel}; use urc::Urc; @@ -12,6 +14,8 @@ use rtio_clocking; use rtio_dma::Manager as DmaManager; #[cfg(has_drtio)] use rtio_dma::remote_dma; +#[cfg(has_drtio)] +use kernel::{subkernel, subkernel::Error as SubkernelError}; use rtio_mgt::get_async_errors; use cache::Cache; use kern_hwreq; @@ -33,6 +37,11 @@ pub enum Error { ClockFailure, #[fail(display = "protocol error: {}", _0)] Protocol(#[cause] host::Error), + #[fail(display = "subkernel io error")] + SubkernelIoError, + #[cfg(has_drtio)] + #[fail(display = "subkernel error: {}", _0)] + Subkernel(#[cause] SubkernelError), #[fail(display = "{}", _0)] Unexpected(String), } @@ -55,10 +64,42 @@ impl From> for Error { } } +impl From<&str> for Error { + fn from(value: &str) -> Error { + Error::Unexpected(value.to_string()) + } +} + +impl From> for Error { + fn from(_value: io::Error) -> Error { + Error::SubkernelIoError + } +} + +#[cfg(has_drtio)] +impl From for Error { + fn from(value: SubkernelError) -> Error { + Error::Subkernel(value) + } +} + macro_rules! unexpected { ($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*)))); } +#[cfg(has_drtio)] +macro_rules! propagate_subkernel_exception { + ( $exception:ident, $stream:ident ) => { + error!("Exception in subkernel"); + match $stream { + None => return Ok(true), + Some(ref mut $stream) => { + $stream.write_all($exception)?; + } + } + } +} + // Persistent state #[derive(Debug)] struct Congress { @@ -131,6 +172,8 @@ fn host_read(reader: &mut R) -> Result> let request = host::Request::read_from(reader)?; match &request { &host::Request::LoadKernel(_) => debug!("comm<-host LoadLibrary(...)"), + &host::Request::UploadSubkernel { id, destination, kernel: _} => debug!( + "comm<-host UploadSubkernel(id: {}, destination: {}, ...)", id, destination), _ => debug!("comm<-host {:?}", request) } Ok(request) @@ -233,8 +276,8 @@ fn kern_run(session: &mut Session) -> Result<(), Error> { kern_acknowledge() } -fn process_host_message(io: &Io, - stream: &mut TcpStream, +fn process_host_message(io: &Io, _aux_mutex: &Mutex, _ddma_mutex: &Mutex, _subkernel_mutex: &Mutex, + _routing_table: &drtio_routing::RoutingTable, stream: &mut TcpStream, session: &mut Session) -> Result<(), Error> { match host_read(stream)? { host::Request::SystemInfo => { @@ -245,7 +288,7 @@ fn process_host_message(io: &Io, session.congress.finished_cleanly.set(true) } - host::Request::LoadKernel(kernel) => + host::Request::LoadKernel(kernel) => { match unsafe { kern_load(io, session, &kernel) } { Ok(()) => host_write(stream, host::Reply::LoadCompleted)?, Err(error) => { @@ -254,7 +297,8 @@ fn process_host_message(io: &Io, host_write(stream, host::Reply::LoadFailed(&description))?; kern_acknowledge()?; } - }, + } + }, host::Request::RunKernel => match kern_run(session) { Ok(()) => (), @@ -323,6 +367,23 @@ fn process_host_message(io: &Io, session.kernel_state = KernelState::Running } + + host::Request::UploadSubkernel { id: _id, destination: _dest, kernel: _kernel } => { + #[cfg(has_drtio)] + { + subkernel::add_subkernel(io, _subkernel_mutex, _id, _dest, _kernel); + match subkernel::upload(io, _aux_mutex, _subkernel_mutex, _routing_table, _id) { + Ok(_) => host_write(stream, host::Reply::LoadCompleted)?, + Err(error) => { + let mut description = String::new(); + write!(&mut description, "{}", error).unwrap(); + host_write(stream, host::Reply::LoadFailed(&description))? + } + } + } + #[cfg(not(has_drtio))] + host_write(stream, host::Reply::LoadFailed("No DRTIO on this system, subkernels are not supported"))? + } } Ok(()) @@ -331,7 +392,7 @@ fn process_host_message(io: &Io, fn process_kern_message(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, - ddma_mutex: &Mutex, mut stream: Option<&mut TcpStream>, + ddma_mutex: &Mutex, _subkernel_mutex: &Mutex, mut stream: Option<&mut TcpStream>, session: &mut Session) -> Result> { kern_recv_notrace(io, |request| { match (request, session.kernel_state) { @@ -510,6 +571,111 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, } } } + #[cfg(has_drtio)] + &kern::SubkernelLoadRunRequest { id, run } => { + let succeeded = match subkernel::load( + io, aux_mutex, _subkernel_mutex, routing_table, id, run) { + Ok(()) => true, + Err(e) => { error!("Error loading subkernel: {}", e); false } + }; + kern_send(io, &kern::SubkernelLoadRunReply { succeeded: succeeded }) + } + #[cfg(has_drtio)] + &kern::SubkernelAwaitFinishRequest{ id, timeout } => { + let res = subkernel::await_finish(io, aux_mutex, _subkernel_mutex, routing_table, + id, timeout); + let status = match res { + Ok(ref res) => { + if res.comm_lost { + kern::SubkernelStatus::CommLost + } else if let Some(exception) = &res.exception { + propagate_subkernel_exception!(exception, stream); + // will not be called after exception is served + kern::SubkernelStatus::OtherError + } else { + kern::SubkernelStatus::NoError + } + }, + Err(SubkernelError::Timeout) => kern::SubkernelStatus::Timeout, + Err(SubkernelError::IncorrectState) => kern::SubkernelStatus::IncorrectState, + Err(_) => kern::SubkernelStatus::OtherError + }; + kern_send(io, &kern::SubkernelAwaitFinishReply { status: status }) + } + #[cfg(has_drtio)] + &kern::SubkernelMsgSend { id, count, tag, data } => { + subkernel::message_send(io, aux_mutex, _subkernel_mutex, routing_table, id, count, tag, data)?; + kern_acknowledge() + } + #[cfg(has_drtio)] + &kern::SubkernelMsgRecvRequest { id, timeout } => { + let message_received = subkernel::message_await(io, _subkernel_mutex, id, timeout); + let (status, count) = match message_received { + Ok(ref message) => (kern::SubkernelStatus::NoError, message.tag_count), + Err(SubkernelError::Timeout) => (kern::SubkernelStatus::Timeout, 0), + Err(SubkernelError::IncorrectState) => (kern::SubkernelStatus::IncorrectState, 0), + Err(SubkernelError::SubkernelFinished) => { + let res = subkernel::retrieve_finish_status(io, aux_mutex, _subkernel_mutex, + routing_table, id)?; + if res.comm_lost { + (kern::SubkernelStatus::CommLost, 0) + } else if let Some(exception) = &res.exception { + propagate_subkernel_exception!(exception, stream); + (kern::SubkernelStatus::OtherError, 0) + } else { + (kern::SubkernelStatus::OtherError, 0) + } + } + Err(_) => (kern::SubkernelStatus::OtherError, 0) + }; + kern_send(io, &kern::SubkernelMsgRecvReply { status: status, count: count })?; + if let Ok(message) = message_received { + // receive code almost identical to RPC recv, except we are not reading from a stream + let mut reader = Cursor::new(message.data); + let mut tag: [u8; 1] = [message.tag]; + let mut i = 0; + loop { + // kernel has to consume all arguments in the whole message + let slot = kern_recv(io, |reply| { + match reply { + &kern::RpcRecvRequest(slot) => Ok(slot), + other => unexpected!( + "expected root value slot from kernel CPU, not {:?}", other) + } + })?; + let res = rpc::recv_return(&mut reader, &tag, slot, &|size| -> Result<_, Error> { + if size == 0 { + return Ok(0 as *mut ()) + } + kern_send(io, &kern::RpcRecvReply(Ok(size)))?; + Ok(kern_recv(io, |reply| { + match reply { + &kern::RpcRecvRequest(slot) => Ok(slot), + other => unexpected!( + "expected nested value slot from kernel CPU, not {:?}", other) + } + })?) + }); + match res { + Ok(_) => kern_send(io, &kern::RpcRecvReply(Ok(0)))?, + Err(_) => unexpected!("expected valid subkernel message data") + }; + i += 1; + if i < message.tag_count { + // update the tag for next read + tag[0] = reader.read_u8()?; + } else { + // should be done by then + break; + } + } + Ok(()) + } else { + // if timed out, no data has been received, exception should be raised by kernel + Ok(()) + } + }, + request => unexpected!("unexpected request {:?} from kernel CPU", request) }.and(Ok(false)) }) @@ -530,13 +696,17 @@ fn process_kern_queued_rpc(stream: &mut TcpStream, fn host_kernel_worker(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, - ddma_mutex: &Mutex, stream: &mut TcpStream, + ddma_mutex: &Mutex, subkernel_mutex: &Mutex, + stream: &mut TcpStream, congress: &mut Congress) -> Result<(), Error> { let mut session = Session::new(congress); + #[cfg(has_drtio)] + subkernel::clear_subkernels(&io, &subkernel_mutex); loop { if stream.can_recv() { - process_host_message(io, stream, &mut session)? + process_host_message(io, aux_mutex, ddma_mutex, subkernel_mutex, + routing_table, stream, &mut session)? } else if !stream.may_recv() { return Ok(()) } @@ -548,7 +718,7 @@ fn host_kernel_worker(io: &Io, aux_mutex: &Mutex, if mailbox::receive() != 0 { process_kern_message(io, aux_mutex, routing_table, up_destinations, - ddma_mutex, + ddma_mutex, subkernel_mutex, Some(stream), &mut session)?; } @@ -566,7 +736,7 @@ fn host_kernel_worker(io: &Io, aux_mutex: &Mutex, fn flash_kernel_worker(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, - ddma_mutex: &Mutex, congress: &mut Congress, + ddma_mutex: &Mutex, subkernel_mutex: &Mutex, congress: &mut Congress, config_key: &str) -> Result<(), Error> { let mut session = Session::new(congress); @@ -588,7 +758,7 @@ fn flash_kernel_worker(io: &Io, aux_mutex: &Mutex, } if mailbox::receive() != 0 { - if process_kern_message(io, aux_mutex, routing_table, up_destinations, ddma_mutex, None, &mut session)? { + if process_kern_message(io, aux_mutex, routing_table, up_destinations, ddma_mutex, subkernel_mutex, None, &mut session)? { return Ok(()) } } @@ -619,7 +789,7 @@ fn respawn(io: &Io, handle: &mut Option, f: F) pub fn thread(io: Io, aux_mutex: &Mutex, routing_table: &Urc>, up_destinations: &Urc>, - ddma_mutex: &Mutex) { + ddma_mutex: &Mutex, subkernel_mutex: &Mutex) { let listener = TcpListener::new(&io, 65535); listener.listen(1381).expect("session: cannot listen"); info!("accepting network sessions"); @@ -628,9 +798,11 @@ pub fn thread(io: Io, aux_mutex: &Mutex, let mut kernel_thread = None; { + let routing_table = routing_table.borrow(); let mut congress = congress.borrow_mut(); info!("running startup kernel"); - match flash_kernel_worker(&io, &aux_mutex, &routing_table.borrow(), &up_destinations, ddma_mutex, &mut congress, "startup_kernel") { + match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, + ddma_mutex, subkernel_mutex, &mut congress, "startup_kernel") { Ok(()) => info!("startup kernel finished"), Err(Error::KernelNotFound) => @@ -671,12 +843,14 @@ pub fn thread(io: Io, aux_mutex: &Mutex, let up_destinations = up_destinations.clone(); let congress = congress.clone(); let ddma_mutex = ddma_mutex.clone(); + let subkernel_mutex = subkernel_mutex.clone(); let stream = stream.into_handle(); respawn(&io, &mut kernel_thread, move |io| { let routing_table = routing_table.borrow(); let mut congress = congress.borrow_mut(); let mut stream = TcpStream::from_handle(&io, stream); - match host_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &mut stream, &mut *congress) { + match host_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, + &ddma_mutex, &subkernel_mutex, &mut stream, &mut *congress) { Ok(()) => (), Err(Error::Protocol(host::Error::Io(IoError::UnexpectedEnd))) => info!("connection closed"), @@ -689,6 +863,8 @@ pub fn thread(io: Io, aux_mutex: &Mutex, } } stream.close().expect("session: close socket"); + #[cfg(has_drtio)] + subkernel::clear_subkernels(&io, &subkernel_mutex); }); } @@ -700,10 +876,12 @@ pub fn thread(io: Io, aux_mutex: &Mutex, let up_destinations = up_destinations.clone(); let congress = congress.clone(); let ddma_mutex = ddma_mutex.clone(); + let subkernel_mutex = subkernel_mutex.clone(); respawn(&io, &mut kernel_thread, move |io| { let routing_table = routing_table.borrow(); let mut congress = congress.borrow_mut(); - match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &mut *congress, "idle_kernel") { + match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, + &ddma_mutex, &subkernel_mutex, &mut *congress, "idle_kernel") { Ok(()) => info!("idle kernel finished, standing by"), Err(Error::Protocol(host::Error::Io(