diff --git a/artiq/gateware/drtio/link_layer.py b/artiq/gateware/drtio/link_layer.py index 76acbed4f..0274a0725 100644 --- a/artiq/gateware/drtio/link_layer.py +++ b/artiq/gateware/drtio/link_layer.py @@ -1,6 +1,45 @@ +from functools import reduce +from operator import xor + from migen import * +class Scrambler(Module): + def __init__(self, n_io, n_state=23, taps=[17, 22]): + self.i = Signal(n_io) + self.o = Signal(n_io) + + # # # + + state = Signal(n_state, reset=1) + curval = [state[i] for i in range(n_state)] + for i in reversed(range(n_io)): + out = self.i[i] ^ reduce(xor, [curval[tap] for tap in taps]) + self.sync += self.o[i].eq(out) + curval.insert(0, out) + curval.pop() + + self.sync += state.eq(Cat(*curval[:n_state])) + + +class Descrambler(Module): + def __init__(self, n_io, n_state=23, taps=[17, 22]): + self.i = Signal(n_io) + self.o = Signal(n_io) + + # # # + + state = Signal(n_state, reset=1) + curval = [state[i] for i in range(n_state)] + for i in reversed(range(n_io)): + flip = reduce(xor, [curval[tap] for tap in taps]) + self.sync += self.o[i].eq(self.i[i] ^ flip) + curval.insert(0, self.i[i]) + curval.pop() + + self.sync += state.eq(Cat(*curval[:n_state])) + + def K(x, y): return (y << 5) | x @@ -33,7 +72,7 @@ class LinkLayerTX(Module): # the following meanings: # 100 idle/auxiliary framing # 0AB 2 bits of auxiliary data - aux_scrambler = Scrambler(3*nwords) + aux_scrambler = CEInserter()(Scrambler(3*nwords)) self.submodules += aux_scrambler aux_data_ctl = [] for i in range(nwords): diff --git a/artiq/test/gateware/drtio/test_scrambler.py b/artiq/test/gateware/drtio/test_scrambler.py new file mode 100644 index 000000000..5c57a4454 --- /dev/null +++ b/artiq/test/gateware/drtio/test_scrambler.py @@ -0,0 +1,35 @@ +import unittest + +from migen import * + +from artiq.gateware.drtio.link_layer import Scrambler, Descrambler + + +def process(dut, seq): + rseq = [] + def pump(): + yield dut.i.eq(seq[0]) + yield + for w in seq[1:]: + yield dut.i.eq(w) + yield + rseq.append((yield dut.o)) + yield + rseq.append((yield dut.o)) + run_simulation(dut, pump()) + return rseq + + +class TestScrambler(unittest.TestCase): + def test_roundtrip(self): + seq = list(range(256))*3 + scrambled_seq = process(Scrambler(8), seq) + descrambled_seq = process(Descrambler(8), scrambled_seq) + self.assertNotEqual(seq, scrambled_seq) + self.assertEqual(seq, descrambled_seq) + + def test_resync(self): + seq = list(range(256)) + scrambled_seq = process(Scrambler(8), seq) + descrambled_seq = process(Descrambler(8), scrambled_seq[20:]) + self.assertEqual(seq[100:], descrambled_seq[80:])