use device endian for core device protocols (#1591)

pull/1593/head
pca006132 2021-01-22 16:33:21 +08:00 committed by GitHub
parent 1e443a3aea
commit 8148fdb8a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 67 additions and 22 deletions

View File

@ -59,7 +59,8 @@ Breaking changes:
unchanged with the new implementation, but the behavior might differ slightly in some
cases (for instance, non-rectangular arrays are not currently supported).
* ``quamash`` has been replaced with ``qasync``.
* Protocols are updated to use device endian.
* Analyzer dump format includes a byte for device endianness.
ARTIQ-5
-------

View File

@ -90,7 +90,17 @@ DecodedDump = namedtuple(
def decode_dump(data):
parts = struct.unpack(">IQbbb", data[:15])
# extract endian byte
if data[0] == ord('E'):
endian = '>'
elif data[0] == ord('e'):
endian = '<'
else:
raise ValueError
data = data[1:]
# only header is device endian
# messages are big endian
parts = struct.unpack(endian + "IQbbb", data[:15])
(sent_bytes, total_byte_count,
error_occured, log_channel, dds_onehot_sel) = parts

View File

@ -173,22 +173,13 @@ class CommKernelDummy:
class CommKernel:
warned_of_mismatch = False
def __init__(self, host, endian='>', port=1381):
self.endian = endian
def __init__(self, host, port=1381):
self._read_type = None
self.host = host
self.port = port
self.read_buffer = bytearray()
self.write_buffer = bytearray()
self.unpack_int32 = struct.Struct(endian + "l").unpack
self.unpack_int64 = struct.Struct(endian + "q").unpack
self.unpack_float64 = struct.Struct(endian + "d").unpack
self.pack_header = struct.Struct(endian + "lB").pack
self.pack_int32 = struct.Struct(endian + "l").pack
self.pack_int64 = struct.Struct(endian + "q").pack
self.pack_float64 = struct.Struct(endian + "d").pack
def open(self):
if hasattr(self, "socket"):
@ -196,6 +187,21 @@ class CommKernel:
self.socket = socket.create_connection((self.host, self.port))
logger.debug("connected to %s:%d", self.host, self.port)
self.socket.sendall(b"ARTIQ coredev\n")
endian = self._read(1)
if endian == b"e":
self.endian = "<"
elif endian == b"E":
self.endian = ">"
else:
raise IOError("Incorrect reply from device: expected e/E.")
self.unpack_int32 = struct.Struct(self.endian + "l").unpack
self.unpack_int64 = struct.Struct(self.endian + "q").unpack
self.unpack_float64 = struct.Struct(self.endian + "d").unpack
self.pack_header = struct.Struct(self.endian + "lB").pack
self.pack_int32 = struct.Struct(self.endian + "l").pack
self.pack_int64 = struct.Struct(self.endian + "q").pack
self.pack_float64 = struct.Struct(self.endian + "d").pack
def close(self):
if not hasattr(self, "socket"):

View File

@ -63,6 +63,13 @@ class CommMgmt:
self.socket = socket.create_connection((self.host, self.port))
logger.debug("connected to %s:%d", self.host, self.port)
self.socket.sendall(b"ARTIQ management\n")
endian = self._read(1)
if endian == b"e":
self.endian = "<"
elif endian == b"E":
self.endian = ">"
else:
raise IOError("Incorrect reply from device: expected e/E.")
def close(self):
if not hasattr(self, "socket"):
@ -86,7 +93,7 @@ class CommMgmt:
self._write(struct.pack("B", value))
def _write_int32(self, value):
self._write(struct.pack(">l", value))
self._write(struct.pack(self.endian + "l", value))
def _write_bytes(self, value):
self._write_int32(len(value))
@ -116,7 +123,7 @@ class CommMgmt:
format(self._read_type, ty))
def _read_int32(self):
(value, ) = struct.unpack(">l", self._read(4))
(value, ) = struct.unpack(self.endian + "l", self._read(4))
return value
def _read_bytes(self):

View File

@ -31,6 +31,14 @@ class CommMonInj:
self._reader, self._writer = await asyncio.open_connection(host, port)
try:
self._writer.write(b"ARTIQ moninj\n")
# get device endian
endian = await self._reader.read(1)
if endian == b"e":
self.endian = "<"
elif endian == b"E":
self.endian = ">"
else:
raise IOError("Incorrect reply from device: expected e/E.")
self._receive_task = asyncio.ensure_future(self._receive_cr())
except:
self._writer.close()
@ -52,19 +60,19 @@ class CommMonInj:
del self._writer
def monitor_probe(self, enable, channel, probe):
packet = struct.pack(">bblb", 0, enable, channel, probe)
packet = struct.pack(self.endian + "bblb", 0, enable, channel, probe)
self._writer.write(packet)
def monitor_injection(self, enable, channel, overrd):
packet = struct.pack(">bblb", 3, enable, channel, overrd)
packet = struct.pack(self.endian + "bblb", 3, enable, channel, overrd)
self._writer.write(packet)
def inject(self, channel, override, value):
packet = struct.pack(">blbb", 1, channel, override, value)
packet = struct.pack(self.endian + "blbb", 1, channel, override, value)
self._writer.write(packet)
def get_injection_status(self, channel, override):
packet = struct.pack(">blb", 2, channel, override)
packet = struct.pack(self.endian + "blb", 2, channel, override)
self._writer.write(packet)
async def _receive_cr(self):
@ -75,11 +83,13 @@ class CommMonInj:
return
if ty == b"\x00":
payload = await self._reader.read(9)
channel, probe, value = struct.unpack(">lbl", payload)
channel, probe, value = struct.unpack(
self.endian + "lbl", payload)
self.monitor_cb(channel, probe, value)
elif ty == b"\x01":
payload = await self._reader.read(6)
channel, override, value = struct.unpack(">lbb", payload)
channel, override, value = struct.unpack(
self.endian + "lbb", payload)
self.injection_status_cb(channel, override, value)
else:
raise ValueError("Unknown packet type", ty)

View File

@ -84,8 +84,7 @@ class Core:
if host is None:
self.comm = CommKernelDummy()
else:
endian = "<" if self.target_cls.little_endian else ">"
self.comm = CommKernel(host, endian)
self.comm = CommKernel(host)
self.first_run = True
self.dmgr = dmgr

View File

@ -51,6 +51,7 @@ fn worker(stream: &mut TcpStream) -> Result<(), IoError<SchedError>> {
};
debug!("{:?}", header);
stream.write_all("E".as_bytes())?;
header.write_to(stream)?;
if wraparound {
stream.write_all(&data[pointer..])?;

View File

@ -15,6 +15,7 @@ impl From<SchedError> for Error<SchedError> {
fn worker(io: &Io, stream: &mut TcpStream) -> Result<(), Error<SchedError>> {
read_magic(stream)?;
Write::write_all(stream, "E".as_bytes())?;
info!("new connection from {}", stream.remote_endpoint());
loop {

View File

@ -2,6 +2,7 @@ use alloc::btree_map::BTreeMap;
use core::cell::RefCell;
use io::Error as IoError;
use io::Write;
use moninj_proto::*;
use sched::{Io, Mutex, TcpListener, TcpStream, Error as SchedError};
use urc::Urc;
@ -122,6 +123,7 @@ fn connection_worker(io: &Io, _aux_mutex: &Mutex, _routing_table: &drtio_routing
let mut next_check = 0;
read_magic(&mut stream)?;
stream.write_all("E".as_bytes())?;
info!("new connection from {}", stream.remote_endpoint());
loop {

View File

@ -615,6 +615,14 @@ pub fn thread(io: Io, aux_mutex: &Mutex,
continue
}
}
match stream.write_all("E".as_bytes()) {
Ok(()) => (),
Err(_) => {
warn!("cannot send endian byte");
stream.close().expect("session: cannot close");
continue
}
}
info!("new connection from {}", stream.remote_endpoint());
let aux_mutex = aux_mutex.clone();