From 557bc4bb56a2771f57f32fed425ddcc837da0caf Mon Sep 17 00:00:00 2001 From: whitequark Date: Thu, 6 Oct 2016 17:25:43 +0000 Subject: [PATCH] runtime: remove unnecessary buffering. --- artiq/coredevice/comm_generic.py | 76 +++++-------------- artiq/runtime.rs/src/session.rs | 36 ++++----- artiq/runtime.rs/src/session_proto.rs | 102 +++++++++++--------------- 3 files changed, 75 insertions(+), 139 deletions(-) diff --git a/artiq/coredevice/comm_generic.py b/artiq/coredevice/comm_generic.py index 1d42a12c1..3163b1ba8 100644 --- a/artiq/coredevice/comm_generic.py +++ b/artiq/coredevice/comm_generic.py @@ -20,7 +20,7 @@ class _H2DMsgType(Enum): IDENT_REQUEST = 3 SWITCH_CLOCK = 4 - LOAD_LIBRARY = 5 + LOAD_KERNEL = 5 RUN_KERNEL = 6 RPC_REPLY = 7 @@ -68,9 +68,7 @@ RPCKeyword = namedtuple('RPCKeyword', ['name', 'value']) class CommGeneric: def __init__(self): - self._read_type = self._write_type = None - self._read_length = 0 - self._write_buffer = [] + self._read_type = None def open(self): """Opens the communication channel. @@ -99,10 +97,6 @@ class CommGeneric: def _read_header(self): self.open() - if self._read_length > 0: - raise IOError("Read underrun ({} bytes remaining)". - format(self._read_length)) - # Wait for a synchronization sequence, 5a 5a 5a 5a. sync_count = 0 while sync_count < 4: @@ -113,20 +107,11 @@ class CommGeneric: sync_count = 0 # Read message header. - (self._read_length, ) = struct.unpack(">l", self.read(4)) - if not self._read_length: # inband connection close - raise OSError("Connection closed") - (raw_type, ) = struct.unpack("B", self.read(1)) self._read_type = _D2HMsgType(raw_type) - if self._read_length < 9: - raise IOError("Read overrun in message header ({} remaining)". - format(self._read_length)) - self._read_length -= 9 - - logger.debug("receiving message: type=%r length=%d", - self._read_type, self._read_length) + logger.debug("receiving message: type=%r", + self._read_type) def _read_expect(self, ty): if self._read_type != ty: @@ -138,12 +123,6 @@ class CommGeneric: self._read_expect(ty) def _read_chunk(self, length): - if self._read_length < length: - raise IOError("Read overrun while trying to read {} bytes ({} remaining)" - " in packet {}". - format(length, self._read_length, self._read_type)) - - self._read_length -= length return self.read(length) def _read_int8(self): @@ -175,43 +154,32 @@ class CommGeneric: def _write_header(self, ty): self.open() - logger.debug("preparing to send message: type=%r", ty) - self._write_type = ty - self._write_buffer = [] + logger.debug("sending message: type=%r", ty) - def _write_flush(self): - # Calculate message size. - length = sum([len(chunk) for chunk in self._write_buffer]) - logger.debug("sending message: type=%r length=%d", self._write_type, length) - - # Write synchronization sequence, header and body. - self.write(struct.pack(">llB", 0x5a5a5a5a, - 9 + length, self._write_type.value)) - for chunk in self._write_buffer: - self.write(chunk) + # Write synchronization sequence and header. + self.write(struct.pack(">lB", 0x5a5a5a5a, ty.value)) def _write_empty(self, ty): self._write_header(ty) - self._write_flush() def _write_chunk(self, chunk): - self._write_buffer.append(chunk) + self.write(chunk) def _write_int8(self, value): - self._write_buffer.append(struct.pack("B", value)) + self.write(struct.pack("B", value)) def _write_int32(self, value): - self._write_buffer.append(struct.pack(">l", value)) + self.write(struct.pack(">l", value)) def _write_int64(self, value): - self._write_buffer.append(struct.pack(">q", value)) + self.write(struct.pack(">q", value)) def _write_float64(self, value): - self._write_buffer.append(struct.pack(">d", value)) + self.write(struct.pack(">d", value)) def _write_bytes(self, value): self._write_int32(len(value)) - self._write_buffer.append(value) + self.write(value) def _write_string(self, value): self._write_bytes(value.encode("utf-8")) @@ -232,7 +200,7 @@ class CommGeneric: if runtime_id != b"AROR": raise UnsupportedDevice("Unsupported runtime ID: {}" .format(runtime_id)) - gateware_version = self._read_chunk(self._read_length).decode("utf-8") + gateware_version = self._read_string() if gateware_version != software_version and \ gateware_version + ".dirty" != software_version: logger.warning("Mismatch between gateware (%s) " @@ -242,7 +210,6 @@ class CommGeneric: def switch_clock(self, external): self._write_header(_H2DMsgType.SWITCH_CLOCK) self._write_int8(external) - self._write_flush() self._read_empty(_D2HMsgType.CLOCK_SWITCH_COMPLETED) @@ -251,7 +218,7 @@ class CommGeneric: self._read_header() self._read_expect(_D2HMsgType.LOG_REPLY) - return self._read_chunk(self._read_length).decode("utf-8", "replace") + return self._read_string() def clear_log(self): self._write_empty(_H2DMsgType.LOG_CLEAR) @@ -261,17 +228,15 @@ class CommGeneric: def flash_storage_read(self, key): self._write_header(_H2DMsgType.FLASH_READ_REQUEST) self._write_string(key) - self._write_flush() self._read_header() self._read_expect(_D2HMsgType.FLASH_READ_REPLY) - return self._read_chunk(self._read_length) + return self._read_string() def flash_storage_write(self, key, value): self._write_header(_H2DMsgType.FLASH_WRITE_REQUEST) self._write_string(key) self._write_bytes(value) - self._write_flush() self._read_header() if self._read_type == _D2HMsgType.FLASH_ERROR_REPLY: @@ -287,14 +252,12 @@ class CommGeneric: def flash_storage_remove(self, key): self._write_header(_H2DMsgType.FLASH_REMOVE_REQUEST) self._write_string(key) - self._write_flush() self._read_empty(_D2HMsgType.FLASH_OK_REPLY) def load(self, kernel_library): - self._write_header(_H2DMsgType.LOAD_LIBRARY) - self._write_chunk(kernel_library) - self._write_flush() + self._write_header(_H2DMsgType.LOAD_KERNEL) + self._write_bytes(kernel_library) self._read_empty(_D2HMsgType.LOAD_COMPLETED) @@ -460,7 +423,6 @@ class CommGeneric: self._write_header(_H2DMsgType.RPC_REPLY) self._write_bytes(return_tags) self._send_rpc_value(bytearray(return_tags), result, result, service) - self._write_flush() except Exception as exn: logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn) @@ -503,8 +465,6 @@ class CommGeneric: self._write_int32(-1) # column not known self._write_string(function) - self._write_flush() - def _serve_exception(self, embedding_map, symbolizer, demangler): name = self._read_string() message = self._read_string() diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index 5b135b226..bbdf30f48 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -233,31 +233,29 @@ fn process_host_message(waiter: Waiter, Err(_) => host_write(stream, host::Reply::KernelStartupFailed) }, - host::Request::RpcReply { tag, data } => { + host::Request::RpcReply { tag } => { if session.kernel_state != KernelState::RpcWait { unexpected!("unsolicited RPC reply") } - try!(kern_recv(waiter, |reply| { + let slot = try!(kern_recv(waiter, |reply| { match reply { - kern::RpcRecvRequest { slot } => { - let mut data = io::Cursor::new(data); - rpc::recv_return(&mut data, &tag, slot, &|size| { - try!(kern_send(waiter, kern::RpcRecvReply { - alloc_size: size, exception: None - })); - kern_recv(waiter, |reply| { - match reply { - kern::RpcRecvRequest { slot } => Ok(slot), - _ => unreachable!() - } - }) - }) - } + kern::RpcRecvRequest { slot } => Ok(slot), other => unexpected!("unexpected reply from kernel CPU: {:?}", other) } })); + try!(rpc::recv_return(stream, &tag, slot, &|size| { + try!(kern_send(waiter, kern::RpcRecvReply { + alloc_size: size, exception: None + })); + kern_recv(waiter, |reply| { + match reply { + kern::RpcRecvRequest { slot } => Ok(slot), + _ => unreachable!() + } + }) + })); try!(kern_send(waiter, kern::RpcRecvReply { alloc_size: 0, exception: None })); session.kernel_state = KernelState::Running; @@ -352,12 +350,10 @@ fn process_kern_message(waiter: Waiter, match stream { None => unexpected!("unexpected RPC in flash kernel"), Some(ref mut stream) => { - let mut buf = Vec::new(); - try!(rpc::send_args(&mut buf, tag, data)); try!(host_write(stream, host::Reply::RpcRequest { - service: service, - data: &buf[..] + service: service })); + try!(rpc::send_args(stream, tag, data)); if !batch { session.kernel_state = KernelState::RpcWait } diff --git a/artiq/runtime.rs/src/session_proto.rs b/artiq/runtime.rs/src/session_proto.rs index b9f7d92fc..7babdc35a 100644 --- a/artiq/runtime.rs/src/session_proto.rs +++ b/artiq/runtime.rs/src/session_proto.rs @@ -26,7 +26,7 @@ pub enum Request { LoadKernel(Vec), RunKernel, - RpcReply { tag: Vec, data: Vec }, + RpcReply { tag: Vec }, RpcException { name: String, message: String, @@ -45,29 +45,17 @@ pub enum Request { impl Request { pub fn read_from(reader: &mut Read) -> io::Result { - const HEADER_SIZE: usize = 9; - try!(read_sync(reader)); - let length = try!(read_u32(reader)) as usize; - let ty = try!(read_u8(reader)); - - Ok(match ty { + Ok(match try!(read_u8(reader)) { 1 => Request::Log, 2 => Request::LogClear, 3 => Request::Ident, 4 => Request::SwitchClock(try!(read_u8(reader))), - 5 => { - let mut code = vec![0; length - HEADER_SIZE]; - try!(reader.read_exact(&mut code)); - Request::LoadKernel(code) - } + 5 => Request::LoadKernel(try!(read_bytes(reader))), 6 => Request::RunKernel, - 7 => { - let tag = try!(read_bytes(reader)); - let mut data = vec![0; length - HEADER_SIZE - 4 - tag.len()]; - try!(reader.read_exact(&mut data)); - Request::RpcReply { tag: tag, data: data } - } + 7 => Request::RpcReply { + tag: try!(read_bytes(reader)) + }, 8 => Request::RpcException { name: try!(read_string(reader)), message: try!(read_string(reader)), @@ -119,7 +107,7 @@ pub enum Reply<'a> { backtrace: &'a [usize] }, - RpcRequest { service: u32, data: &'a [u8] }, + RpcRequest { service: u32 }, FlashRead(&'a [u8]), FlashOk, @@ -131,88 +119,80 @@ pub enum Reply<'a> { impl<'a> Reply<'a> { pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { - let mut buf = Vec::new(); - try!(write_sync(&mut buf)); - try!(write_u32(&mut buf, 0)); // length placeholder - + try!(write_sync(writer)); match *self { Reply::Log(ref log) => { - try!(write_u8(&mut buf, 1)); - try!(buf.write(log.as_bytes())); + try!(write_u8(writer, 1)); + try!(write_string(writer, log)); }, Reply::Ident(ident) => { - try!(write_u8(&mut buf, 2)); - try!(buf.write(b"AROR")); - try!(buf.write(ident.as_bytes())); + try!(write_u8(writer, 2)); + try!(writer.write(b"AROR")); + try!(write_string(writer, ident)); }, Reply::ClockSwitchCompleted => { - try!(write_u8(&mut buf, 3)); + try!(write_u8(writer, 3)); }, Reply::ClockSwitchFailed => { - try!(write_u8(&mut buf, 4)); + try!(write_u8(writer, 4)); }, Reply::LoadCompleted => { - try!(write_u8(&mut buf, 5)); + try!(write_u8(writer, 5)); }, Reply::LoadFailed => { - try!(write_u8(&mut buf, 6)); + try!(write_u8(writer, 6)); }, Reply::KernelFinished => { - try!(write_u8(&mut buf, 7)); + try!(write_u8(writer, 7)); }, Reply::KernelStartupFailed => { - try!(write_u8(&mut buf, 8)); + try!(write_u8(writer, 8)); }, Reply::KernelException { name, message, param, file, line, column, function, backtrace } => { - try!(write_u8(&mut buf, 9)); - try!(write_string(&mut buf, name)); - try!(write_string(&mut buf, message)); - try!(write_u64(&mut buf, param[0])); - try!(write_u64(&mut buf, param[1])); - try!(write_u64(&mut buf, param[2])); - try!(write_string(&mut buf, file)); - try!(write_u32(&mut buf, line)); - try!(write_u32(&mut buf, column)); - try!(write_string(&mut buf, function)); - try!(write_u32(&mut buf, backtrace.len() as u32)); + try!(write_u8(writer, 9)); + try!(write_string(writer, name)); + try!(write_string(writer, message)); + try!(write_u64(writer, param[0])); + try!(write_u64(writer, param[1])); + try!(write_u64(writer, param[2])); + try!(write_string(writer, file)); + try!(write_u32(writer, line)); + try!(write_u32(writer, column)); + try!(write_string(writer, function)); + try!(write_u32(writer, backtrace.len() as u32)); for &addr in backtrace { - try!(write_u32(&mut buf, addr as u32)) + try!(write_u32(writer, addr as u32)) } }, - Reply::RpcRequest { service, data } => { - try!(write_u8(&mut buf, 10)); - try!(write_u32(&mut buf, service)); - try!(buf.write(data)); + Reply::RpcRequest { service } => { + try!(write_u8(writer, 10)); + try!(write_u32(writer, service)); }, Reply::FlashRead(ref bytes) => { - try!(write_u8(&mut buf, 11)); - try!(buf.write(bytes)); + try!(write_u8(writer, 11)); + try!(write_bytes(writer, bytes)); }, Reply::FlashOk => { - try!(write_u8(&mut buf, 12)); + try!(write_u8(writer, 12)); }, Reply::FlashError => { - try!(write_u8(&mut buf, 13)); + try!(write_u8(writer, 13)); }, Reply::WatchdogExpired => { - try!(write_u8(&mut buf, 14)); + try!(write_u8(writer, 14)); }, Reply::ClockFailure => { - try!(write_u8(&mut buf, 15)); + try!(write_u8(writer, 15)); }, } - - let len = buf.len(); - try!(write_u32(&mut &mut buf[4..8], len as u32)); - - writer.write_all(&buf) + Ok(()) } }