This commit is contained in:
Sebastien Bourdeauducq 2020-06-08 18:16:38 +08:00
parent c980b3e634
commit bdf0831eb2

View File

@ -87,7 +87,7 @@ async fn write_header(stream: &TcpStream, reply: Reply) -> Result<()> {
} }
async fn read_request(stream: &TcpStream, allow_close: bool) -> Result<Option<Request>> { async fn read_request(stream: &TcpStream, allow_close: bool) -> Result<Option<Request>> {
match expect(&stream, &[0x5a, 0x5a, 0x5a, 0x5a]).await { match expect(stream, &[0x5a, 0x5a, 0x5a, 0x5a]).await {
Ok(true) => {} Ok(true) => {}
Ok(false) => Ok(false) =>
return Err(Error::UnexpectedPattern), return Err(Error::UnexpectedPattern),
@ -112,14 +112,14 @@ async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) ->
let reply = control.rx.async_recv().await; let reply = control.rx.async_recv().await;
match *reply { match *reply {
kernel::Message::RpcSend { is_async, data } => { kernel::Message::RpcSend { is_async, data } => {
write_header(&stream, Reply::RPCRequest).await?; write_header(stream, Reply::RPCRequest).await?;
write_bool(&stream, is_async).await?; write_bool(stream, is_async).await?;
stream.send(data.iter().copied()).await?; stream.send(data.iter().copied()).await?;
if !is_async { if !is_async {
let host_request = read_request(stream, false).await?.unwrap(); let host_request = read_request(stream, false).await?.unwrap();
match host_request { match host_request {
Request::RPCReply => { Request::RPCReply => {
let tag = read_bytes(&stream, 512).await?; let tag = read_bytes(stream, 512).await?;
let rpc_recv_request = control.rx.async_recv().await; let rpc_recv_request = control.rx.async_recv().await;
let slot = if let kernel::Message::RpcRecvRequest(slot) = *rpc_recv_request { let slot = if let kernel::Message::RpcRecvRequest(slot) = *rpc_recv_request {
slot slot
@ -146,7 +146,7 @@ async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) ->
} }
}, },
kernel::Message::KernelFinished => { kernel::Message::KernelFinished => {
write_header(&stream, Reply::KernelFinished).await?; write_header(stream, Reply::KernelFinished).await?;
break; break;
}, },
_ => { _ => {
@ -158,7 +158,7 @@ async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) ->
} }
async fn handle_connection(stream: &TcpStream, control: Rc<RefCell<kernel::Control>>) -> Result<()> { async fn handle_connection(stream: &TcpStream, control: Rc<RefCell<kernel::Control>>) -> Result<()> {
expect(&stream, b"ARTIQ coredev\n").await?; expect(stream, b"ARTIQ coredev\n").await?;
debug!("received connection"); debug!("received connection");
loop { loop {
let request = read_request(stream, true).await?; let request = read_request(stream, true).await?;
@ -168,25 +168,25 @@ async fn handle_connection(stream: &TcpStream, control: Rc<RefCell<kernel::Contr
let request = request.unwrap(); let request = request.unwrap();
match request { match request {
Request::SystemInfo => { Request::SystemInfo => {
write_header(&stream, Reply::SystemInfo).await?; write_header(stream, Reply::SystemInfo).await?;
stream.send("ARZQ".bytes()).await?; stream.send("ARZQ".bytes()).await?;
}, },
Request::LoadKernel => { Request::LoadKernel => {
let buffer = read_bytes(&stream, 1024*1024).await?; let buffer = read_bytes(stream, 1024*1024).await?;
let mut control = control.borrow_mut(); let mut control = control.borrow_mut();
control.restart(); control.restart();
control.tx.async_send(kernel::Message::LoadRequest(Arc::new(buffer))).await; control.tx.async_send(kernel::Message::LoadRequest(Arc::new(buffer))).await;
let reply = control.rx.async_recv().await; let reply = control.rx.async_recv().await;
match *reply { match *reply {
kernel::Message::LoadCompleted => write_header(&stream, Reply::LoadCompleted).await?, kernel::Message::LoadCompleted => write_header(stream, Reply::LoadCompleted).await?,
kernel::Message::LoadFailed => { kernel::Message::LoadFailed => {
write_header(&stream, Reply::LoadFailed).await?; write_header(stream, Reply::LoadFailed).await?;
write_chunk(&stream, b"core1 failed to process data").await?; write_chunk(stream, b"core1 failed to process data").await?;
}, },
_ => { _ => {
error!("unexpected message from core1: {:?}", reply); error!("unexpected message from core1: {:?}", reply);
write_header(&stream, Reply::LoadFailed).await?; write_header(stream, Reply::LoadFailed).await?;
write_chunk(&stream, b"core1 sent unexpected reply").await?; write_chunk(stream, b"core1 sent unexpected reply").await?;
} }
} }
}, },