drtio: add scrambler/descrambler and test

This commit is contained in:
Sebastien Bourdeauducq 2016-09-26 14:14:14 +08:00
parent fa83ad0d9c
commit 4e47decdbc
2 changed files with 75 additions and 1 deletions

View File

@ -1,6 +1,45 @@
from functools import reduce
from operator import xor
from migen import * from migen import *
class Scrambler(Module):
def __init__(self, n_io, n_state=23, taps=[17, 22]):
self.i = Signal(n_io)
self.o = Signal(n_io)
# # #
state = Signal(n_state, reset=1)
curval = [state[i] for i in range(n_state)]
for i in reversed(range(n_io)):
out = self.i[i] ^ reduce(xor, [curval[tap] for tap in taps])
self.sync += self.o[i].eq(out)
curval.insert(0, out)
curval.pop()
self.sync += state.eq(Cat(*curval[:n_state]))
class Descrambler(Module):
def __init__(self, n_io, n_state=23, taps=[17, 22]):
self.i = Signal(n_io)
self.o = Signal(n_io)
# # #
state = Signal(n_state, reset=1)
curval = [state[i] for i in range(n_state)]
for i in reversed(range(n_io)):
flip = reduce(xor, [curval[tap] for tap in taps])
self.sync += self.o[i].eq(self.i[i] ^ flip)
curval.insert(0, self.i[i])
curval.pop()
self.sync += state.eq(Cat(*curval[:n_state]))
def K(x, y): def K(x, y):
return (y << 5) | x return (y << 5) | x
@ -33,7 +72,7 @@ class LinkLayerTX(Module):
# the following meanings: # the following meanings:
# 100 idle/auxiliary framing # 100 idle/auxiliary framing
# 0AB 2 bits of auxiliary data # 0AB 2 bits of auxiliary data
aux_scrambler = Scrambler(3*nwords) aux_scrambler = CEInserter()(Scrambler(3*nwords))
self.submodules += aux_scrambler self.submodules += aux_scrambler
aux_data_ctl = [] aux_data_ctl = []
for i in range(nwords): for i in range(nwords):

View File

@ -0,0 +1,35 @@
import unittest
from migen import *
from artiq.gateware.drtio.link_layer import Scrambler, Descrambler
def process(dut, seq):
rseq = []
def pump():
yield dut.i.eq(seq[0])
yield
for w in seq[1:]:
yield dut.i.eq(w)
yield
rseq.append((yield dut.o))
yield
rseq.append((yield dut.o))
run_simulation(dut, pump())
return rseq
class TestScrambler(unittest.TestCase):
def test_roundtrip(self):
seq = list(range(256))*3
scrambled_seq = process(Scrambler(8), seq)
descrambled_seq = process(Descrambler(8), scrambled_seq)
self.assertNotEqual(seq, scrambled_seq)
self.assertEqual(seq, descrambled_seq)
def test_resync(self):
seq = list(range(256))
scrambled_seq = process(Scrambler(8), seq)
descrambled_seq = process(Descrambler(8), scrambled_seq[20:])
self.assertEqual(seq[100:], descrambled_seq[80:])