2
0
mirror of https://github.com/m-labs/artiq.git synced 2025-01-19 15:16:41 +08:00

drtio: implement TSC load in satellite

This commit is contained in:
Sebastien Bourdeauducq 2016-10-07 19:30:53 +08:00
parent 43caffc168
commit 23b3302200
2 changed files with 40 additions and 3 deletions

View File

@ -64,7 +64,7 @@ class ReceiveDatapath(Module):
# outputs
self.frame_r = Signal()
self.data_r = Signal()
self.data_r = Signal(ws)
self.packet_type = Signal(8)
self.packet_last = Signal()
self.packet_as = dict()
@ -171,15 +171,20 @@ class TransmitDatapath(Module):
class RTPacketSatellite(Module):
def __init__(self, nwords):
# link layer interface
ws = 8*nwords
self.rx_rt_frame = Signal()
self.rx_rt_data = Signal(ws)
self.tx_rt_frame = Signal()
self.tx_rt_data = Signal(ws)
# I/O Timer interface
self.tsc_load = Signal()
self.tsc_value = Signal(64)
# # #
# RX/TX datapath
rx_plm = get_m2s_layouts(ws)
rx_dp = ReceiveDatapath(ws, rx_plm)
self.submodules += rx_dp
@ -187,7 +192,6 @@ class RTPacketSatellite(Module):
rx_dp.frame.eq(self.rx_rt_frame),
rx_dp.data.eq(self.rx_rt_data)
]
tx_plm = get_s2m_layouts(ws)
tx_dp = TransmitDatapath(ws, tx_plm)
self.submodules += tx_dp
@ -196,6 +200,12 @@ class RTPacketSatellite(Module):
self.tx_rt_data.eq(tx_dp.data)
]
# glue
self.comb += [
self.tsc_value.eq(rx_dp.packet_as["set_time"].timestamp)
]
# main control FSM
fsm = FSM(reset_state="WAIT_INPUT")
self.submodules += fsm
@ -214,6 +224,7 @@ class RTPacketSatellite(Module):
If(rx_dp.packet_last,
Case(rx_dp.packet_type, {
rx_plm.types["echo_request"]: NextState("ECHO"),
rx_plm.types["set_time"]: NextState("SET_TIME"),
"default": NextState("ERROR_UNKNOWN_TYPE")
})
)
@ -227,6 +238,10 @@ class RTPacketSatellite(Module):
tx_dp.stb.eq(1),
If(tx_dp.done, NextState("WAIT_INPUT"))
)
fsm.act("SET_TIME",
self.tsc_load.eq(1),
NextState("WAIT_INPUT")
)
fsm.act("ERROR_FRAME_MISSED",
tx_dp.send("error", code=error_codes["frame_missed"]),
tx_dp.stb.eq(1),

View File

@ -91,3 +91,25 @@ class TestSatellite(unittest.TestCase):
self.assertEqual(trailer, [])
completed = True
run_simulation(dut, [send(), pr.receive(receive)])
def test_set_time(self):
for nwords in range(1, 8):
dut = RTPacketSatellite(nwords)
pt = PacketInterface("m2s", dut.rx_rt_frame, dut.rx_rt_data)
tx_times = [0x12345678aabbccdd, 0x0102030405060708,
0xaabbccddeeff1122]
def send():
for t in tx_times:
yield from pt.send("set_time", timestamp=t)
# flush
for i in range(10):
yield
rx_times = []
@passive
def receive():
while True:
if (yield dut.tsc_load):
rx_times.append((yield dut.tsc_value))
yield
run_simulation(dut, [send(), receive()], vcd_name="foo.vcd")
self.assertEqual(tx_times, rx_times)