artiq-zynq/sim_idle.py
morgan 3748bff2e1 temp: flake,libunwind,scripts,notes,diagram
flake: download llvm11 binary instead of compiling
local_run: preset cxp zc706 dev board ip addr
flake: add pillow for sim
libunwind build: suppress libunwind warning
2025-01-22 13:41:19 +08:00

150 lines
4.5 KiB
Python

from migen import *
from misoc.interconnect import stream
from src.gateware.cxp_pipeline import Packet_Wrapper
# Width in bits of one character (one byte lane of a stream word).
char_width = 8

# Width in bits of one stream word.
word_dw = 32

# Stream payload layout: the data word plus one "k" flag bit per character.
word_layout = [
    ("data", word_dw),
    ("k", word_dw // char_width),
]
def K(x, y):
    """Return the 8-bit value of the control character written as Kx.y.

    The value is ``(y << 5) | x``: ``x`` occupies the low five bits and
    ``y`` the three bits above them.

    Args:
        x: Low field of the code, 0..31.
        y: High field of the code, 0..7.

    Returns:
        The integer character value in the range 0..255.

    Raises:
        ValueError: If ``x`` or ``y`` does not fit its field. Without this
            check an oversized ``x`` would silently overlap the ``y`` bits
            and an oversized ``y`` would overflow the 8-bit character.
    """
    if not 0 <= x < 32:
        raise ValueError("K code x must be in range 0..31, got {!r}".format(x))
    if not 0 <= y < 8:
        raise ValueError("K code y must be in range 0..7, got {!r}".format(y))
    return (y << 5) | x
# Named control characters, each stored as a ``char_width``-bit constant.
# Every table row is (name, x, y) for the character written as Kx.y.
KCode = {
    name: C(K(x, y), char_width)
    for name, x, y in (
        ("pak_start", 27, 7),
        ("io_ack", 28, 6),
        ("trig_indic_28_2", 28, 2),
        ("stream_marker", 28, 3),
        ("trig_indic_28_4", 28, 4),
        ("pak_end", 29, 7),
        ("idle_comma", 28, 5),
        ("idle_alignment", 28, 1),
    )
}
class TX_Bootstrap(Module):
    """Generate a test-sequence packet on ``source`` when requested.

    Attributes:
        tx_testseq: Request input. Sampled in the IDLE state; when high the
            counter is reloaded and one test sequence is emitted.
        source: Stream endpoint (``word_layout``) carrying the sequence.
        cnt: Running byte counter used to build the payload words.

    The sequence is one word made of four 0x04 characters, followed by
    words whose four characters are ``cnt``, ``cnt+1``, ``cnt+2`` and
    ``cnt+3`` (low eight bits). ``cnt`` advances by 4 for every accepted
    word; the word sent while ``cnt == 0xFF - 3`` is flagged with ``eop``
    once it is acknowledged and the FSM returns to IDLE. No character of
    the sequence is marked as a K character.
    """

    def __init__(self):
        self.tx_testseq = Signal()

        # # #

        self.submodules.fsm = fsm = FSM(reset_state="IDLE")
        self.source = stream.Endpoint(word_layout)
        self.cnt = Signal(max=0xFFF)

        source = self.source
        cnt = self.cnt

        fsm.act(
            "IDLE",
            If(
                self.tx_testseq,
                NextValue(cnt, cnt.reset),
                NextState("WRITE_TEST_PACKET_TYPE"),
            ),
        )

        # Header word: the character 0x04 repeated in all four lanes.
        packet_type_word = Replicate(C(0x04, char_width), 4)
        fsm.act(
            "WRITE_TEST_PACKET_TYPE",
            source.stb.eq(1),
            source.data.eq(packet_type_word),
            source.k.eq(Replicate(0, 4)),
            If(source.ack, NextState("WRITE_TEST_COUNTER")),
        )

        # Payload word: four consecutive counter values, one per byte lane.
        low_byte = cnt[:8]
        lane_values = [low_byte, low_byte + 1, low_byte + 2, low_byte + 3]
        counter_lanes = [
            source.data[8 * lane:8 * (lane + 1)].eq(value)
            for lane, value in enumerate(lane_values)
        ]
        is_last_word = cnt == 0x0FF - 3

        fsm.act(
            "WRITE_TEST_COUNTER",
            source.stb.eq(1),
            *counter_lanes,
            source.k.eq(Cat(0, 0, 0, 0)),
            If(
                source.ack,
                If(
                    is_last_word,
                    source.eop.eq(1),
                    NextState("IDLE"),
                ).Else(
                    NextValue(cnt, cnt + 4),
                ),
            ),
        )
class Idle_Word_Inserter(Module):
    """Interleave idle words with the words arriving on ``sink``.

    Attributes:
        sink: Upstream stream endpoint (``word_layout``).
        source: Downstream stream endpoint (``word_layout``).

    WRITE_IDLE always presents the idle word on ``source``. ``sink`` is
    acknowledged only while it offers nothing, so pending upstream data is
    held, not consumed. Once the idle word is accepted while upstream data
    is pending, the counter is reloaded and the FSM moves to COPY.

    COPY forwards ``sink`` to ``source`` and counts down once per
    transferred word. It returns to WRITE_IDLE on a ``source.ack`` when
    upstream has no data, signals ``eop``, or the counter has reached zero.
    """

    def __init__(self):
        # Section 9.2.5 (CXP-001-2021)
        # Send K28.5, K28.1, K28.1, D21.5 as idle word
        self.submodules.fsm = fsm = FSM(reset_state="WRITE_IDLE")
        self.sink = stream.Endpoint(word_layout)
        self.source = stream.Endpoint(word_layout)
        cnt = Signal(max=0x10, reset=0xF)

        sink = self.sink
        source = self.source

        idle_word = Cat(
            KCode["idle_comma"],
            KCode["idle_alignment"],
            KCode["idle_alignment"],
            C(0xB5, char_width),
        )
        # The three low characters are K characters, the top one is data.
        idle_k = Cat(1, 1, 1, 0)

        fsm.act(
            "WRITE_IDLE",
            source.stb.eq(1),
            source.data.eq(idle_word),
            source.k.eq(idle_k),
            sink.ack.eq(1),
            If(
                sink.stb,
                # Hold the pending upstream word until COPY forwards it.
                sink.ack.eq(0),
                If(
                    source.ack,
                    NextValue(cnt, cnt.reset),
                    NextState("COPY"),
                ),
            ),
        )

        word_transferred = sink.stb & source.ack
        copy_finished = ((~sink.stb) | sink.eop | (cnt == 0)) & source.ack

        fsm.act(
            "COPY",
            sink.connect(source),
            # Count down for every word handed downstream.
            If(word_transferred, NextValue(cnt, cnt - 1)),
            If(copy_finished, NextState("WRITE_IDLE")),
        )
class Pipeline(Module):
    """Chain TX_Bootstrap -> Packet_Wrapper -> Idle_Word_Inserter.

    Attributes:
        bootstrap, wrapper, idle_inserter: The three stages, registered as
            submodules in pipeline order.
        source: The ``source`` endpoint of the last stage. Its ``ack`` is
            not driven here, so the consumer must drive it.
    """

    def __init__(self):
        self.submodules.bootstrap = bootstrap = TX_Bootstrap()
        self.submodules.wrapper = wrapper = Packet_Wrapper()
        self.submodules.idle_inserter = idle_inserter = Idle_Word_Inserter()

        # # #

        stages = (bootstrap, wrapper, idle_inserter)

        # Wire each stage's source into the sink of the stage after it.
        upstream = stages[0]
        for downstream in stages[1:]:
            self.comb += upstream.source.connect(downstream.sink)
            upstream = downstream

        self.source = stages[-1].source
# Device under test, built once at module level; check_case() reads it as a
# global and it is passed to run_simulation() below.
dut = Pipeline()
def check_case():
    """Stimulus generator for the module-level ``dut``.

    Pulses ``dut.bootstrap.tx_testseq`` high for one cycle, waits ten
    cycles without acknowledging the pipeline output, then holds
    ``dut.source.ack`` high for one hundred cycles.
    """
    request = dut.bootstrap.tx_testseq
    output = dut.source

    stall_cycles = 10
    drain_cycles = 100

    # One-cycle request pulse.
    yield request.eq(1)
    yield
    yield request.eq(0)

    # Let the pipeline run against an unacknowledged output.
    for _ in range(stall_cycles):
        yield

    # Accept output words.
    for _ in range(drain_cycles):
        yield output.ack.eq(1)
        yield
def testbench():
    """Top-level simulation generator; delegates to ``check_case``."""
    stimulus = check_case()
    yield from stimulus
# Simulate the DUT with the testbench generator; the waveform dump is
# requested under the name "sim-cxp.vcd".
run_simulation(dut, testbench(), vcd_name="sim-cxp.vcd")