forked from M-Labs/artiq
1
0
Fork 0

runtime: remove unnecessary buffering.

This commit is contained in:
whitequark 2016-10-06 17:25:43 +00:00
parent edafb08b43
commit 557bc4bb56
3 changed files with 75 additions and 139 deletions

View File

@ -20,7 +20,7 @@ class _H2DMsgType(Enum):
IDENT_REQUEST = 3 IDENT_REQUEST = 3
SWITCH_CLOCK = 4 SWITCH_CLOCK = 4
LOAD_LIBRARY = 5 LOAD_KERNEL = 5
RUN_KERNEL = 6 RUN_KERNEL = 6
RPC_REPLY = 7 RPC_REPLY = 7
@ -68,9 +68,7 @@ RPCKeyword = namedtuple('RPCKeyword', ['name', 'value'])
class CommGeneric: class CommGeneric:
def __init__(self): def __init__(self):
self._read_type = self._write_type = None self._read_type = None
self._read_length = 0
self._write_buffer = []
def open(self): def open(self):
"""Opens the communication channel. """Opens the communication channel.
@ -99,10 +97,6 @@ class CommGeneric:
def _read_header(self): def _read_header(self):
self.open() self.open()
if self._read_length > 0:
raise IOError("Read underrun ({} bytes remaining)".
format(self._read_length))
# Wait for a synchronization sequence, 5a 5a 5a 5a. # Wait for a synchronization sequence, 5a 5a 5a 5a.
sync_count = 0 sync_count = 0
while sync_count < 4: while sync_count < 4:
@ -113,20 +107,11 @@ class CommGeneric:
sync_count = 0 sync_count = 0
# Read message header. # Read message header.
(self._read_length, ) = struct.unpack(">l", self.read(4))
if not self._read_length: # inband connection close
raise OSError("Connection closed")
(raw_type, ) = struct.unpack("B", self.read(1)) (raw_type, ) = struct.unpack("B", self.read(1))
self._read_type = _D2HMsgType(raw_type) self._read_type = _D2HMsgType(raw_type)
if self._read_length < 9: logger.debug("receiving message: type=%r",
raise IOError("Read overrun in message header ({} remaining)". self._read_type)
format(self._read_length))
self._read_length -= 9
logger.debug("receiving message: type=%r length=%d",
self._read_type, self._read_length)
def _read_expect(self, ty): def _read_expect(self, ty):
if self._read_type != ty: if self._read_type != ty:
@ -138,12 +123,6 @@ class CommGeneric:
self._read_expect(ty) self._read_expect(ty)
def _read_chunk(self, length): def _read_chunk(self, length):
if self._read_length < length:
raise IOError("Read overrun while trying to read {} bytes ({} remaining)"
" in packet {}".
format(length, self._read_length, self._read_type))
self._read_length -= length
return self.read(length) return self.read(length)
def _read_int8(self): def _read_int8(self):
@ -175,43 +154,32 @@ class CommGeneric:
def _write_header(self, ty): def _write_header(self, ty):
self.open() self.open()
logger.debug("preparing to send message: type=%r", ty) logger.debug("sending message: type=%r", ty)
self._write_type = ty
self._write_buffer = []
def _write_flush(self): # Write synchronization sequence and header.
# Calculate message size. self.write(struct.pack(">lB", 0x5a5a5a5a, ty.value))
length = sum([len(chunk) for chunk in self._write_buffer])
logger.debug("sending message: type=%r length=%d", self._write_type, length)
# Write synchronization sequence, header and body.
self.write(struct.pack(">llB", 0x5a5a5a5a,
9 + length, self._write_type.value))
for chunk in self._write_buffer:
self.write(chunk)
def _write_empty(self, ty): def _write_empty(self, ty):
self._write_header(ty) self._write_header(ty)
self._write_flush()
def _write_chunk(self, chunk): def _write_chunk(self, chunk):
self._write_buffer.append(chunk) self.write(chunk)
def _write_int8(self, value): def _write_int8(self, value):
self._write_buffer.append(struct.pack("B", value)) self.write(struct.pack("B", value))
def _write_int32(self, value): def _write_int32(self, value):
self._write_buffer.append(struct.pack(">l", value)) self.write(struct.pack(">l", value))
def _write_int64(self, value): def _write_int64(self, value):
self._write_buffer.append(struct.pack(">q", value)) self.write(struct.pack(">q", value))
def _write_float64(self, value): def _write_float64(self, value):
self._write_buffer.append(struct.pack(">d", value)) self.write(struct.pack(">d", value))
def _write_bytes(self, value): def _write_bytes(self, value):
self._write_int32(len(value)) self._write_int32(len(value))
self._write_buffer.append(value) self.write(value)
def _write_string(self, value): def _write_string(self, value):
self._write_bytes(value.encode("utf-8")) self._write_bytes(value.encode("utf-8"))
@ -232,7 +200,7 @@ class CommGeneric:
if runtime_id != b"AROR": if runtime_id != b"AROR":
raise UnsupportedDevice("Unsupported runtime ID: {}" raise UnsupportedDevice("Unsupported runtime ID: {}"
.format(runtime_id)) .format(runtime_id))
gateware_version = self._read_chunk(self._read_length).decode("utf-8") gateware_version = self._read_string()
if gateware_version != software_version and \ if gateware_version != software_version and \
gateware_version + ".dirty" != software_version: gateware_version + ".dirty" != software_version:
logger.warning("Mismatch between gateware (%s) " logger.warning("Mismatch between gateware (%s) "
@ -242,7 +210,6 @@ class CommGeneric:
def switch_clock(self, external): def switch_clock(self, external):
self._write_header(_H2DMsgType.SWITCH_CLOCK) self._write_header(_H2DMsgType.SWITCH_CLOCK)
self._write_int8(external) self._write_int8(external)
self._write_flush()
self._read_empty(_D2HMsgType.CLOCK_SWITCH_COMPLETED) self._read_empty(_D2HMsgType.CLOCK_SWITCH_COMPLETED)
@ -251,7 +218,7 @@ class CommGeneric:
self._read_header() self._read_header()
self._read_expect(_D2HMsgType.LOG_REPLY) self._read_expect(_D2HMsgType.LOG_REPLY)
return self._read_chunk(self._read_length).decode("utf-8", "replace") return self._read_string()
def clear_log(self): def clear_log(self):
self._write_empty(_H2DMsgType.LOG_CLEAR) self._write_empty(_H2DMsgType.LOG_CLEAR)
@ -261,17 +228,15 @@ class CommGeneric:
def flash_storage_read(self, key): def flash_storage_read(self, key):
self._write_header(_H2DMsgType.FLASH_READ_REQUEST) self._write_header(_H2DMsgType.FLASH_READ_REQUEST)
self._write_string(key) self._write_string(key)
self._write_flush()
self._read_header() self._read_header()
self._read_expect(_D2HMsgType.FLASH_READ_REPLY) self._read_expect(_D2HMsgType.FLASH_READ_REPLY)
return self._read_chunk(self._read_length) return self._read_string()
def flash_storage_write(self, key, value): def flash_storage_write(self, key, value):
self._write_header(_H2DMsgType.FLASH_WRITE_REQUEST) self._write_header(_H2DMsgType.FLASH_WRITE_REQUEST)
self._write_string(key) self._write_string(key)
self._write_bytes(value) self._write_bytes(value)
self._write_flush()
self._read_header() self._read_header()
if self._read_type == _D2HMsgType.FLASH_ERROR_REPLY: if self._read_type == _D2HMsgType.FLASH_ERROR_REPLY:
@ -287,14 +252,12 @@ class CommGeneric:
def flash_storage_remove(self, key): def flash_storage_remove(self, key):
self._write_header(_H2DMsgType.FLASH_REMOVE_REQUEST) self._write_header(_H2DMsgType.FLASH_REMOVE_REQUEST)
self._write_string(key) self._write_string(key)
self._write_flush()
self._read_empty(_D2HMsgType.FLASH_OK_REPLY) self._read_empty(_D2HMsgType.FLASH_OK_REPLY)
def load(self, kernel_library): def load(self, kernel_library):
self._write_header(_H2DMsgType.LOAD_LIBRARY) self._write_header(_H2DMsgType.LOAD_KERNEL)
self._write_chunk(kernel_library) self._write_bytes(kernel_library)
self._write_flush()
self._read_empty(_D2HMsgType.LOAD_COMPLETED) self._read_empty(_D2HMsgType.LOAD_COMPLETED)
@ -460,7 +423,6 @@ class CommGeneric:
self._write_header(_H2DMsgType.RPC_REPLY) self._write_header(_H2DMsgType.RPC_REPLY)
self._write_bytes(return_tags) self._write_bytes(return_tags)
self._send_rpc_value(bytearray(return_tags), result, result, service) self._send_rpc_value(bytearray(return_tags), result, result, service)
self._write_flush()
except Exception as exn: except Exception as exn:
logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn) logger.debug("rpc service: %d %r %r ! %r", service_id, args, kwargs, exn)
@ -503,8 +465,6 @@ class CommGeneric:
self._write_int32(-1) # column not known self._write_int32(-1) # column not known
self._write_string(function) self._write_string(function)
self._write_flush()
def _serve_exception(self, embedding_map, symbolizer, demangler): def _serve_exception(self, embedding_map, symbolizer, demangler):
name = self._read_string() name = self._read_string()
message = self._read_string() message = self._read_string()

View File

@ -233,31 +233,29 @@ fn process_host_message(waiter: Waiter,
Err(_) => host_write(stream, host::Reply::KernelStartupFailed) Err(_) => host_write(stream, host::Reply::KernelStartupFailed)
}, },
host::Request::RpcReply { tag, data } => { host::Request::RpcReply { tag } => {
if session.kernel_state != KernelState::RpcWait { if session.kernel_state != KernelState::RpcWait {
unexpected!("unsolicited RPC reply") unexpected!("unsolicited RPC reply")
} }
try!(kern_recv(waiter, |reply| { let slot = try!(kern_recv(waiter, |reply| {
match reply { match reply {
kern::RpcRecvRequest { slot } => { kern::RpcRecvRequest { slot } => Ok(slot),
let mut data = io::Cursor::new(data);
rpc::recv_return(&mut data, &tag, slot, &|size| {
try!(kern_send(waiter, kern::RpcRecvReply {
alloc_size: size, exception: None
}));
kern_recv(waiter, |reply| {
match reply {
kern::RpcRecvRequest { slot } => Ok(slot),
_ => unreachable!()
}
})
})
}
other => other =>
unexpected!("unexpected reply from kernel CPU: {:?}", other) unexpected!("unexpected reply from kernel CPU: {:?}", other)
} }
})); }));
try!(rpc::recv_return(stream, &tag, slot, &|size| {
try!(kern_send(waiter, kern::RpcRecvReply {
alloc_size: size, exception: None
}));
kern_recv(waiter, |reply| {
match reply {
kern::RpcRecvRequest { slot } => Ok(slot),
_ => unreachable!()
}
})
}));
try!(kern_send(waiter, kern::RpcRecvReply { alloc_size: 0, exception: None })); try!(kern_send(waiter, kern::RpcRecvReply { alloc_size: 0, exception: None }));
session.kernel_state = KernelState::Running; session.kernel_state = KernelState::Running;
@ -352,12 +350,10 @@ fn process_kern_message(waiter: Waiter,
match stream { match stream {
None => unexpected!("unexpected RPC in flash kernel"), None => unexpected!("unexpected RPC in flash kernel"),
Some(ref mut stream) => { Some(ref mut stream) => {
let mut buf = Vec::new();
try!(rpc::send_args(&mut buf, tag, data));
try!(host_write(stream, host::Reply::RpcRequest { try!(host_write(stream, host::Reply::RpcRequest {
service: service, service: service
data: &buf[..]
})); }));
try!(rpc::send_args(stream, tag, data));
if !batch { if !batch {
session.kernel_state = KernelState::RpcWait session.kernel_state = KernelState::RpcWait
} }

View File

@ -26,7 +26,7 @@ pub enum Request {
LoadKernel(Vec<u8>), LoadKernel(Vec<u8>),
RunKernel, RunKernel,
RpcReply { tag: Vec<u8>, data: Vec<u8> }, RpcReply { tag: Vec<u8> },
RpcException { RpcException {
name: String, name: String,
message: String, message: String,
@ -45,29 +45,17 @@ pub enum Request {
impl Request { impl Request {
pub fn read_from(reader: &mut Read) -> io::Result<Request> { pub fn read_from(reader: &mut Read) -> io::Result<Request> {
const HEADER_SIZE: usize = 9;
try!(read_sync(reader)); try!(read_sync(reader));
let length = try!(read_u32(reader)) as usize; Ok(match try!(read_u8(reader)) {
let ty = try!(read_u8(reader));
Ok(match ty {
1 => Request::Log, 1 => Request::Log,
2 => Request::LogClear, 2 => Request::LogClear,
3 => Request::Ident, 3 => Request::Ident,
4 => Request::SwitchClock(try!(read_u8(reader))), 4 => Request::SwitchClock(try!(read_u8(reader))),
5 => { 5 => Request::LoadKernel(try!(read_bytes(reader))),
let mut code = vec![0; length - HEADER_SIZE];
try!(reader.read_exact(&mut code));
Request::LoadKernel(code)
}
6 => Request::RunKernel, 6 => Request::RunKernel,
7 => { 7 => Request::RpcReply {
let tag = try!(read_bytes(reader)); tag: try!(read_bytes(reader))
let mut data = vec![0; length - HEADER_SIZE - 4 - tag.len()]; },
try!(reader.read_exact(&mut data));
Request::RpcReply { tag: tag, data: data }
}
8 => Request::RpcException { 8 => Request::RpcException {
name: try!(read_string(reader)), name: try!(read_string(reader)),
message: try!(read_string(reader)), message: try!(read_string(reader)),
@ -119,7 +107,7 @@ pub enum Reply<'a> {
backtrace: &'a [usize] backtrace: &'a [usize]
}, },
RpcRequest { service: u32, data: &'a [u8] }, RpcRequest { service: u32 },
FlashRead(&'a [u8]), FlashRead(&'a [u8]),
FlashOk, FlashOk,
@ -131,88 +119,80 @@ pub enum Reply<'a> {
impl<'a> Reply<'a> { impl<'a> Reply<'a> {
pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { pub fn write_to(&self, writer: &mut Write) -> io::Result<()> {
let mut buf = Vec::new(); try!(write_sync(writer));
try!(write_sync(&mut buf));
try!(write_u32(&mut buf, 0)); // length placeholder
match *self { match *self {
Reply::Log(ref log) => { Reply::Log(ref log) => {
try!(write_u8(&mut buf, 1)); try!(write_u8(writer, 1));
try!(buf.write(log.as_bytes())); try!(write_string(writer, log));
}, },
Reply::Ident(ident) => { Reply::Ident(ident) => {
try!(write_u8(&mut buf, 2)); try!(write_u8(writer, 2));
try!(buf.write(b"AROR")); try!(writer.write(b"AROR"));
try!(buf.write(ident.as_bytes())); try!(write_string(writer, ident));
}, },
Reply::ClockSwitchCompleted => { Reply::ClockSwitchCompleted => {
try!(write_u8(&mut buf, 3)); try!(write_u8(writer, 3));
}, },
Reply::ClockSwitchFailed => { Reply::ClockSwitchFailed => {
try!(write_u8(&mut buf, 4)); try!(write_u8(writer, 4));
}, },
Reply::LoadCompleted => { Reply::LoadCompleted => {
try!(write_u8(&mut buf, 5)); try!(write_u8(writer, 5));
}, },
Reply::LoadFailed => { Reply::LoadFailed => {
try!(write_u8(&mut buf, 6)); try!(write_u8(writer, 6));
}, },
Reply::KernelFinished => { Reply::KernelFinished => {
try!(write_u8(&mut buf, 7)); try!(write_u8(writer, 7));
}, },
Reply::KernelStartupFailed => { Reply::KernelStartupFailed => {
try!(write_u8(&mut buf, 8)); try!(write_u8(writer, 8));
}, },
Reply::KernelException { Reply::KernelException {
name, message, param, file, line, column, function, backtrace name, message, param, file, line, column, function, backtrace
} => { } => {
try!(write_u8(&mut buf, 9)); try!(write_u8(writer, 9));
try!(write_string(&mut buf, name)); try!(write_string(writer, name));
try!(write_string(&mut buf, message)); try!(write_string(writer, message));
try!(write_u64(&mut buf, param[0])); try!(write_u64(writer, param[0]));
try!(write_u64(&mut buf, param[1])); try!(write_u64(writer, param[1]));
try!(write_u64(&mut buf, param[2])); try!(write_u64(writer, param[2]));
try!(write_string(&mut buf, file)); try!(write_string(writer, file));
try!(write_u32(&mut buf, line)); try!(write_u32(writer, line));
try!(write_u32(&mut buf, column)); try!(write_u32(writer, column));
try!(write_string(&mut buf, function)); try!(write_string(writer, function));
try!(write_u32(&mut buf, backtrace.len() as u32)); try!(write_u32(writer, backtrace.len() as u32));
for &addr in backtrace { for &addr in backtrace {
try!(write_u32(&mut buf, addr as u32)) try!(write_u32(writer, addr as u32))
} }
}, },
Reply::RpcRequest { service, data } => { Reply::RpcRequest { service } => {
try!(write_u8(&mut buf, 10)); try!(write_u8(writer, 10));
try!(write_u32(&mut buf, service)); try!(write_u32(writer, service));
try!(buf.write(data));
}, },
Reply::FlashRead(ref bytes) => { Reply::FlashRead(ref bytes) => {
try!(write_u8(&mut buf, 11)); try!(write_u8(writer, 11));
try!(buf.write(bytes)); try!(write_bytes(writer, bytes));
}, },
Reply::FlashOk => { Reply::FlashOk => {
try!(write_u8(&mut buf, 12)); try!(write_u8(writer, 12));
}, },
Reply::FlashError => { Reply::FlashError => {
try!(write_u8(&mut buf, 13)); try!(write_u8(writer, 13));
}, },
Reply::WatchdogExpired => { Reply::WatchdogExpired => {
try!(write_u8(&mut buf, 14)); try!(write_u8(writer, 14));
}, },
Reply::ClockFailure => { Reply::ClockFailure => {
try!(write_u8(&mut buf, 15)); try!(write_u8(writer, 15));
}, },
} }
Ok(())
let len = buf.len();
try!(write_u32(&mut &mut buf[4..8], len as u32));
writer.write_all(&buf)
} }
} }