2
0
mirror of https://github.com/m-labs/artiq.git synced 2025-01-27 02:48:12 +08:00

drtio: add RX link layer, fixes, simple loopback demo

This commit is contained in:
Sebastien Bourdeauducq 2016-09-27 11:23:29 +08:00
parent 4e47decdbc
commit 8a92c2c7e5
3 changed files with 140 additions and 42 deletions

View File

@ -1,5 +1,5 @@
from functools import reduce
from operator import xor
from operator import xor, or_
from migen import *
@ -59,8 +59,6 @@ class LinkLayerTX(Module):
self.rt_frame = Signal()
self.rt_data = Signal(8*nwords)
self.transceiver_data = Signal(10*nwords)
# # #
# Idle and auxiliary traffic use special characters excluding
@ -88,7 +86,7 @@ class LinkLayerTX(Module):
self.aux_ack.eq(~self.rt_frame)
]
for i in range(nwords):
scrambled_ctl = scrambler.o[i*3:i*3+3]
scrambled_ctl = aux_scrambler.o[i*3:i*3+3]
self.sync += [
encoder.k[i].eq(1),
If(scrambled_ctl == 7,
@ -100,15 +98,18 @@ class LinkLayerTX(Module):
# Real-time traffic uses data characters and is framed by the special
# characters of auxiliary traffic. RT traffic is also scrambled.
rt_scrambler = Scrambler(8*nwords)
rt_scrambler = CEInserter()(Scrambler(8*nwords))
self.submodules += rt_scrambler
self.comb += rt_scrambler.i.eq(self.rt_data)
self.comb += [
rt_scrambler.i.eq(self.rt_data),
rt_scrambler.ce.eq(self.rt_frame)
]
rt_frame_r = Signal()
self.sync += [
rt_frame_r.eq(self.rt_frame),
If(rt_frame_r,
[k.eq(0) for k in encoder.k],
[d.eq(self.rt_data[i*8:i*8+8]) for i, d in enumerate(encoder.d)]
[d.eq(rt_scrambler.o[i*8:i*8+8]) for i, d in enumerate(encoder.d)]
)
]
@ -134,3 +135,51 @@ class LinkLayerTX(Module):
link_init_counter.eq(0)
)
]
class LinkLayerRX(Module):
def __init__(self, decoders):
nwords = len(decoders)
# nwords must be a power of 2
assert nwords & (nwords - 1) == 0
self.link_init = Signal()
self.aux_frame = Signal()
self.aux_data = Signal(2*nwords)
self.rt_frame = Signal()
self.rt_data = Signal(8*nwords)
# # #
aux_descrambler = CEInserter()(Descrambler(2*nwords))
rt_descrambler = CEInserter()(Descrambler(8*nwords))
self.submodules += aux_descrambler, rt_descrambler
self.comb += [
self.aux_frame.eq(~aux_descrambler.o[2]),
self.aux_data.eq(
Cat(*[aux_descrambler.o[3*i:3*i+2] for i in range(nwords)])),
self.rt_data.eq(rt_descrambler.o),
]
link_init_d = Signal()
rt_frame_d = Signal()
self.sync += [
self.link_init.eq(link_init_d),
self.rt_frame.eq(rt_frame_d)
]
self.comb += [
If(decoders[0].k,
If((decoders[0].d == K(28, 7)) | (decoders[0].d == K(29, 7)),
link_init_d.eq(1)
),
aux_descrambler.ce.eq(1)
).Else(
rt_frame_d.eq(1),
rt_descrambler.ce.eq(1)
),
aux_descrambler.i.eq(Cat(*[d.d >> 5 for d in decoders])),
rt_descrambler.i.eq(Cat(*[d.d for d in decoders]))
]

View File

@ -0,0 +1,84 @@
import unittest
from types import SimpleNamespace
from migen import *
from artiq.gateware.drtio.link_layer import *
def process(dut, seq):
rseq = []
def pump():
yield dut.i.eq(seq[0])
yield
for w in seq[1:]:
yield dut.i.eq(w)
yield
rseq.append((yield dut.o))
yield
rseq.append((yield dut.o))
run_simulation(dut, pump())
return rseq
class TestScrambler(unittest.TestCase):
def test_roundtrip(self):
seq = list(range(256))*3
scrambled_seq = process(Scrambler(8), seq)
descrambled_seq = process(Descrambler(8), scrambled_seq)
self.assertNotEqual(seq, scrambled_seq)
self.assertEqual(seq, descrambled_seq)
def test_resync(self):
seq = list(range(256))
scrambled_seq = process(Scrambler(8), seq)
descrambled_seq = process(Descrambler(8), scrambled_seq[20:])
self.assertEqual(seq[100:], descrambled_seq[80:])
class Loopback(Module):
def __init__(self, nwords):
ks = [Signal() for k in range(nwords)]
ds = [Signal(8) for d in range(nwords)]
encoder = SimpleNamespace(k=ks, d=ds)
decoders = [SimpleNamespace(k=k, d=d) for k, d in zip(ks, ds)]
self.submodules.tx = LinkLayerTX(encoder)
self.submodules.rx = LinkLayerRX(decoders)
class TestLinkLayer(unittest.TestCase):
def test_packets(self):
dut = Loopback(4)
rt_packets = [
[0x12459970, 0x9938cdef, 0x12340000],
[0xabcdef00, 0x12345678],
[0xeeeeeeee, 0xffffffff, 0x01020304, 0x11223344]
]
def transmit_rt_packets():
for packet in rt_packets:
yield dut.tx.rt_frame.eq(1)
for data in packet:
yield dut.tx.rt_data.eq(data)
yield
yield dut.tx.rt_frame.eq(0)
yield
# flush
for i in range(20):
yield
rx_rt_packets = []
@passive
def receive_rt_packets():
while True:
packet = []
rx_rt_packets.append(packet)
while not (yield dut.rx.rt_frame):
yield
while (yield dut.rx.rt_frame):
packet.append((yield dut.rx.rt_data))
yield
run_simulation(dut, [transmit_rt_packets(), receive_rt_packets()])
for packet in rx_rt_packets:
print(" ".join("{:08x}".format(x) for x in packet))

View File

@ -1,35 +0,0 @@
import unittest
from migen import *
from artiq.gateware.drtio.link_layer import Scrambler, Descrambler
def process(dut, seq):
rseq = []
def pump():
yield dut.i.eq(seq[0])
yield
for w in seq[1:]:
yield dut.i.eq(w)
yield
rseq.append((yield dut.o))
yield
rseq.append((yield dut.o))
run_simulation(dut, pump())
return rseq
class TestScrambler(unittest.TestCase):
def test_roundtrip(self):
seq = list(range(256))*3
scrambled_seq = process(Scrambler(8), seq)
descrambled_seq = process(Descrambler(8), scrambled_seq)
self.assertNotEqual(seq, scrambled_seq)
self.assertEqual(seq, descrambled_seq)
def test_resync(self):
seq = list(range(256))
scrambled_seq = process(Scrambler(8), seq)
descrambled_seq = process(Descrambler(8), scrambled_seq[20:])
self.assertEqual(seq[100:], descrambled_seq[80:])