diff --git a/README.md b/README.md index fe18cc2..14cf50f 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Formally verified implementation of the ARTIQ RTIO core in nMigen - Devise a suitable migration strategy for `artiq.gateware.rtio` from Migen to nMigen - [ ] Implement the core in nMigen - - [x] `rtio.cri` (`Interface` and `CRIDecoder` only) -- - [ ] `rtio.rtlink` +- - [x] `rtio.rtlink` - - [ ] `rtio.sed.layouts` - - [ ] `rtio.sed.output_network` - - [ ] `rtio.sed.output_driver` diff --git a/rtio/rtlink.py b/rtio/rtlink.py new file mode 100644 index 0000000..cd9d0e0 --- /dev/null +++ b/rtio/rtlink.py @@ -0,0 +1,90 @@ +from nmigen import * + +class OInterface(object): + def __init__(self, data_width, address_width=0, + fine_ts_width=0, enable_replace=True, + delay=0): + self.stb = Signal() + self.busy = Signal() + + assert 0 <= data_width <= 512 + assert 0 <= address_width <= 8 + assert 0 <= fine_ts_width <= 4 + + if data_width: + self.data = Signal(data_width, reset_less=True) + if address_width: + self.address = Signal(address_width, reset_less=True) + if fine_ts_width: + self.fine_ts = Signal(fine_ts_width, reset_less=True) + + self.enable_replace = enable_replace + + if delay < 0: + raise ValueError("only positive delays allowed", delay) + self.delay = delay + + @classmethod + def like(cls, other): + return cls(get_data_width(other), + get_address_width(other), + get_fine_ts_width(other), + other.enable_replace, + other.delay) + +class IInterface(object): + def __init__(self, data_width, + timestamped=True, fine_ts_width=0, delay=0): + self.stb = Signal() + + assert 0 <= data_width <= 32 + assert 0 <= fine_ts_width <= 4 + + if data_width: + self.data = Signal(data_width, reset_less=True) + if fine_ts_width: + self.fine_ts = Signal(fine_ts_width, reset_less=True) + + assert(not fine_ts_width or timestamped) + self.timestamped = timestamped + if delay < 0: + raise ValueError("only positive delays") + self.delay = delay + + @classmethod + def like(cls, other): + return cls(get_data_width(other), + other.timestamped, + get_fine_ts_width(other), + other.delay) + +class Interface(object): + def __init__(self, o, i=None): + self.o = o + self.i = i + + @classmethod + def like(cls, other): + if other.i is None: + return cls(OInterface.like(other.o)) + else: + return cls(OInterface.like(other.o), + IInterface.like(other.i)) + +def _get_or_zero(interface, attr): + if interface is None: + return 0 + assert isinstance(interface, (OInterface, IInterface)) + if hasattr(interface, attr): + return len(getattr(interface, attr)) + else: + return 0 + +def get_data_width(interface): + return _get_or_zero(interface, "data") + +def get_address_width(interface): + return _get_or_zero(interface, "address") + +def get_fine_ts_width(interface): + return _get_or_zero(interface, "fine_ts")