From 4b52aa7099bd53c79190d135ba0bf0ef13619fb0 Mon Sep 17 00:00:00 2001 From: occheung Date: Tue, 15 Sep 2020 12:17:42 +0800 Subject: [PATCH] mqtt: added mqtt to scpi conv --- Cargo.toml | 3 +- examples/ethernet.rs | 33 ++--------------- examples/mqtt_client.rs | 81 +++++++++++++++++++++++++++++++++++++---- src/lib.rs | 2 + src/scpi.rs | 37 +++++++++++++++++++ src/translation.rs | 28 ++++++++++++++ 6 files changed, 146 insertions(+), 38 deletions(-) create mode 100644 src/translation.rs diff --git a/Cargo.toml b/Cargo.toml index d007502..8263d6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,9 @@ smoltcp = { version = "0.6.0", default-features = false, features = [ "ethernet" nb = "1.0.0" embedded-nal = "0.1.0" -minimq = "0.1.0" +minimq = { git = "https://github.com/quartiq/minimq.git", branch = "master" } heapless = "0.5.5" +arrayvec = { version = "0.5.1", default-features = false, features = ["array-sizes-33-128", "array-sizes-129-255"] } # Logging and Panicking panic-itm = "0.4.1" diff --git a/examples/ethernet.rs b/examples/ethernet.rs index 0ebbcc2..41b97bf 100644 --- a/examples/ethernet.rs +++ b/examples/ethernet.rs @@ -54,8 +54,7 @@ use firmware::{ ClockSourceCommand, ClockDivisionCommand, }, - Urukul, - scpi_root, recursive_scpi_tree + Urukul, scpi_root, recursive_scpi_tree, scpi_tree }; use scpi::prelude::*; use scpi::ieee488::commands::*; @@ -267,34 +266,8 @@ fn main() -> ! { .ip_addrs(&mut ip_addrs[..]) .finalize(); - // SCPI configs - let tree = scpi_root!( - ["EXAMple"] => { - "HELLO" => { - "WORLD" => HelloWorldCommand - } - }, - "CHANNEL0" => { - "SWitch" => Channel0SwitchCommand, - "Attenuation" => Channel0AttenuationCommand - }, - "CHANNEL1" => { - "SWitch" => Channel1SwitchCommand, - "Attenuation" => Channel1AttenuationCommand - }, - "CHANNEL2" => { - "SWitch" => Channel2SwitchCommand, - "Attenuation" => Channel2AttenuationCommand - }, - "CHANNEL3" => { - "SWitch" => Channel3SwitchCommand, - "Attenuation" => Channel3AttenuationCommand - }, - "CLOCK" => { - "SOURCE" => ClockSourceCommand, - "DIVision" => ClockDivisionCommand - } - ); + // SCPI configs: migrated to scpi.rs, not meant to be touched + let tree = scpi_tree!(); // Device was declared in prior let mut errors = ArrayErrorQueue::<[Error; 10]>::new(); diff --git a/examples/mqtt_client.rs b/examples/mqtt_client.rs index 367aa12..aa1681c 100644 --- a/examples/mqtt_client.rs +++ b/examples/mqtt_client.rs @@ -23,12 +23,50 @@ use rtic::cyccnt::{Instant, U32Ext}; use minimq::{ embedded_nal::{IpAddr, Ipv4Addr, TcpStack, SocketAddr, Mode}, - MqttClient, QoS, + MqttClient, QoS }; use firmware::nal_tcp_client::{NetworkStack, NetStorage, NetworkInterface}; -use firmware::{Urukul}; -use firmware::cpld::{CPLD}; +use firmware::{ + cpld::{ + CPLD, + }, + scpi::{ + HelloWorldCommand, + Channel0SwitchCommand, + Channel1SwitchCommand, + Channel2SwitchCommand, + Channel3SwitchCommand, + Channel0AttenuationCommand, + Channel1AttenuationCommand, + Channel2AttenuationCommand, + Channel3AttenuationCommand, + ClockSourceCommand, + ClockDivisionCommand, + }, + Urukul, scpi_root, recursive_scpi_tree, scpi_tree +}; +use firmware::translation::MqttScpiTranslator; + +use scpi::prelude::*; +use scpi::ieee488::commands::*; +use scpi::scpi::commands::*; +use scpi::{ + ieee488_cls, + ieee488_ese, + ieee488_esr, + ieee488_idn, + ieee488_opc, + ieee488_rst, + ieee488_sre, + ieee488_stb, + ieee488_tst, + ieee488_wai, + scpi_crate_version, + scpi_status, + scpi_system, +}; +use scpi::Context; #[path = "util/logger.rs"] mod logger; @@ -169,7 +207,7 @@ fn main() -> ! { let cpld = CPLD::new(spi, (cs0, cs1, cs2), io_update); let parts = cpld.split(); - let urukul = Urukul::new( + let mut urukul = Urukul::new( parts.spi1, parts.spi2, parts.spi3, parts.spi4, parts.spi5, parts.spi6, parts.spi7, [25_000_000, 25_000_000, 25_000_000, 25_000_000] ); @@ -177,6 +215,17 @@ fn main() -> ! { cp.SCB.invalidate_icache(); cp.SCB.enable_icache(); + // Define SCPI tree + let tree = scpi_tree!(); + + // Device was declared in prior + let mut errors = ArrayErrorQueue::<[Error; 10]>::new(); + let mut context = Context::new(&mut urukul, &mut errors, tree); + + //Response bytebuffer + let mut buf = ArrayVecFormatter::<[u8; 256]>::new(); + // SCPI configs END + // Time unit in ms let mut time: u32 = 0; @@ -190,13 +239,19 @@ fn main() -> ! { add_socket!(sockets, rx_storage, tx_storage); let tcp_stack = NetworkStack::new(&mut net_interface, sockets); + + // Case dealt: Ethernet connection break down, neither side has timeout + // Limitation: Timeout inequality will cause TCP socket state to desync + // Probably fixed in latest smoltcp commit let mut client = MqttClient::::new( IpAddr::V4(Ipv4Addr::new(192, 168, 1, 125)), - "nucleo", + "Nucleo", tcp_stack, ) .unwrap(); + let mut tick = false; + let mut has_subscribed = false; loop { // Update time accumulator in ms @@ -207,15 +262,27 @@ fn main() -> ! { next_ms += 400_000.cycles(); } - // Poll if necessary + // eth Poll if necessary + // Do not poll if eth link is down if tick && client.network_stack.update_delay(time) == 0 && eth_mac.phy_poll_link() { client.network_stack.update(time); } let connection = client .poll(|_client, topic, message, _properties| match topic { - _ => info!("On '{:?}', received: {:?}", topic, message), + topic => { + info!("On '{:?}', received: {:?}", topic, message); + context.run_with_mqtt(topic.as_bytes(), &mut buf); + }, }).is_ok(); + + if connection && !has_subscribed && tick { + match client.subscribe("Urukul/Control", &[]) { + Ok(()) => has_subscribed = true, + Err(minimq::Error::NotReady) => {}, + e => warn!("{:?}", e), + }; + } if connection && tick && (time % 3000) == 0 { info!("Feedback from print publish: {:?}", client diff --git a/src/lib.rs b/src/lib.rs index 3b309a9..9482a59 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,8 @@ use crate::dds::DDS; pub mod scpi; +pub mod translation; + pub mod nal_tcp_client; /* diff --git a/src/scpi.rs b/src/scpi.rs index 9df1f55..ea2fd43 100644 --- a/src/scpi.rs +++ b/src/scpi.rs @@ -110,6 +110,43 @@ macro_rules! scpi_root { }; } +#[macro_export] +macro_rules! scpi_tree { + () => { + scpi_root!( + ["Control"] => { + ["Urukul"] => { + "CHANNEL0" => { + "SWitch" => Channel0SwitchCommand, + "Attenuation" => Channel0AttenuationCommand + }, + "CHANNEL1" => { + "SWitch" => Channel1SwitchCommand, + "Attenuation" => Channel1AttenuationCommand + }, + "CHANNEL2" => { + "SWitch" => Channel2SwitchCommand, + "Attenuation" => Channel2AttenuationCommand + }, + "CHANNEL3" => { + "SWitch" => Channel3SwitchCommand, + "Attenuation" => Channel3AttenuationCommand + }, + "CLOCK" => { + "SOURCE" => ClockSourceCommand, + "DIVision" => ClockDivisionCommand + } + } + }, + ["EXAMple"] => { + "HELLO" => { + "WORLD" => HelloWorldCommand + } + } + ); + }; +} + pub struct HelloWorldCommand {} impl Command for HelloWorldCommand { qonly!(); diff --git a/src/translation.rs b/src/translation.rs new file mode 100644 index 0000000..fd75175 --- /dev/null +++ b/src/translation.rs @@ -0,0 +1,28 @@ +use scpi::prelude::*; +use scpi::Context; +use scpi::error::Result; + +use arrayvec::{ArrayVec}; + +pub trait MqttScpiTranslator { + fn run_with_mqtt(&mut self, s: &[u8], response: &mut FMT) -> Result<()>; +} + +impl<'a, T: Device> MqttScpiTranslator for Context<'a, T> { + fn run_with_mqtt(&mut self, s: &[u8], response: &mut FMT) -> Result<()> + where + FMT: Formatter, + { + let mut array_vec = ArrayVec::<[u8; 1024]>::new(); + for i in s.into_iter() { + if *i == b'/' { + array_vec.try_push(b'/') + .map_err(|_| ErrorCode::OutOfMemory)?; + } else { + array_vec.try_push(*i) + .map_err(|_| ErrorCode::OutOfMemory)?; + } + } + self.run(array_vec.as_slice(), response) + } +} \ No newline at end of file