Add rtio.rtlink

This commit is contained in:
Donald Sebastian Leung 2020-10-07 16:32:20 +08:00
parent 3cbcb630bd
commit 1f838186e8
2 changed files with 91 additions and 1 deletions

View File

@ -7,7 +7,7 @@ Formally verified implementation of the ARTIQ RTIO core in nMigen
- Devise a suitable migration strategy for `artiq.gateware.rtio` from Migen to nMigen - Devise a suitable migration strategy for `artiq.gateware.rtio` from Migen to nMigen
- [ ] Implement the core in nMigen - [ ] Implement the core in nMigen
- - [x] `rtio.cri` (`Interface` and `CRIDecoder` only) - - [x] `rtio.cri` (`Interface` and `CRIDecoder` only)
- - [ ] `rtio.rtlink` - - [x] `rtio.rtlink`
- - [ ] `rtio.sed.layouts` - - [ ] `rtio.sed.layouts`
- - [ ] `rtio.sed.output_network` - - [ ] `rtio.sed.output_network`
- - [ ] `rtio.sed.output_driver` - - [ ] `rtio.sed.output_driver`

90
rtio/rtlink.py Normal file
View File

@ -0,0 +1,90 @@
from nmigen import *
class OInterface(object):
def __init__(self, data_width, address_width=0,
fine_ts_width=0, enable_replace=True,
delay=0):
self.stb = Signal()
self.busy = Signal()
assert 0 <= data_width <= 512
assert 0 <= address_width <= 8
assert 0 <= fine_ts_width <= 4
if data_width:
self.data = Signal(data_width, reset_less=True)
if address_width:
self.address = Signal(address_width, reset_less=True)
if fine_ts_width:
self.fine_ts = Signal(fine_ts_width, reset_less=True)
self.enable_replace = enable_replace
if delay < 0:
raise ValueError("only positive delays allowed", delay)
self.delay = delay
@classmethod
def like(cls, other):
return cls(get_data_width(other),
get_address_width(other),
get_fine_ts_width(other),
other.enable_replace,
other.delay)
class IInterface(object):
def __init__(self, data_width,
timestamped=True, fine_ts_width=0, delay=0):
self.stb = Signal()
assert 0 <= data_width <= 32
assert 0 <= fine_ts_width <= 4
if data_width:
self.data = Signal(data_width, reset_less=True)
if fine_ts_width:
self.fine_ts = Signal(fine_ts_width, reset_less=True)
assert(not fine_ts_width or timestamped)
self.timestamped = timestamped
if delay < 0:
raise ValueError("only positive delays")
self.delay = delay
@classmethod
def like(cls, other):
return cls(get_data_width(other),
other.timestamped,
get_fine_ts_width(other),
other.delay)
class Interface(object):
def __init__(self, o, i=None):
self.o = o
self.i = i
@classmethod
def like(cls, other):
if other.i is None:
return cls(OInterface.like(other.o))
else:
return cls(OInterface.like(other.o),
IInterface.like(other.i))
def _get_or_zero(interface, attr):
if interface is None:
return 0
assert isinstance(interface, (OInterface, IInterface))
if hasattr(interface, attr):
return len(getattr(interface, attr))
else:
return 0
def get_data_width(interface):
return _get_or_zero(interface, "data")
def get_address_width(interface):
return _get_or_zero(interface, "address")
def get_fine_ts_width(interface):
return _get_or_zero(interface, "fine_ts")