diff --git a/software/glasgow/applet/all.py b/software/glasgow/applet/all.py
index 35b8960..3f37b9f 100644
--- a/software/glasgow/applet/all.py
+++ b/software/glasgow/applet/all.py
@@ -44,3 +44,5 @@ from .video.rgb_input import VideoRGBInputApplet
 from .video.vga_output import VGAOutputApplet
 from .video.vga_terminal import VGATerminalApplet
 from .video.ws2812_output import VideoWS2812OutputApplet
+
+from .logic import LogicApplet
diff --git a/software/glasgow/applet/logic.py b/software/glasgow/applet/logic.py
new file mode 100644
index 0000000..c721acd
--- /dev/null
+++ b/software/glasgow/applet/logic.py
@@ -0,0 +1,115 @@
+import sys
+import logging
+import os
+import asyncio
+from nmigen.compat import *
+from nmigen.compat.genlib.cdc import MultiReg
+
+from . import *
+
+
+class LogicSubtarget(Module):
+    """Gateware that packs two consecutive 4-bit pin samples into each FIFO byte.
+
+    ``pads.d_t.i`` is passed through a ``MultiReg`` before it is used. The FSM
+    alternates between latching one sample (``CAPTURE-1``) and writing a byte
+    (``CAPTURE-2``) whose high nibble is the latched sample and whose low nibble is
+    the sample present during the write cycle. If ``in_fifo`` is not writable in
+    ``CAPTURE-2`` the FSM enters ``OVERFLOW`` and never leaves it, so capture stops.
+    """
+
+    def __init__(self, pads, in_fifo):
+        # Named `sample` rather than `input` to avoid shadowing the builtin.
+        sample = Signal.like(pads.d_t.i)
+        latch = Signal.like(pads.d_t.i)
+        self.submodules += MultiReg(pads.d_t.i, sample)
+
+        self.comb += [
+            in_fifo.din[0:4].eq(sample[0:4]),
+            in_fifo.din[4:8].eq(latch[0:4]),
+        ]
+
+        self.submodules.fsm = FSM()
+        self.fsm.act("CAPTURE-1",
+            NextValue(latch, sample),
+            NextState("CAPTURE-2")
+        )
+        self.fsm.act("CAPTURE-2",
+            in_fifo.we.eq(1),
+            If(in_fifo.writable,
+                NextState("CAPTURE-1")
+            ).Else(
+                NextState("OVERFLOW")
+            )
+        )
+        # Terminal state: nothing is written to the FIFO once it is reached.
+        self.fsm.act("OVERFLOW",
+            NextState("OVERFLOW")
+        )
+
+
+async def async_stdout(limit=asyncio.streams._DEFAULT_LIMIT, loop=None):
+    """Return an ``asyncio`` ``StreamWriter`` that writes raw bytes to stdout.
+
+    ``loop`` defaults to ``asyncio.get_event_loop()``. ``limit`` is accepted but is
+    not used by this function.
+    """
+    if loop is None:
+        loop = asyncio.get_event_loop()
+
+    # FlowControlMixin is the protocol that backs ``StreamWriter.drain()``.
+    writer_transport, writer_protocol = await loop.connect_write_pipe(
+        lambda: asyncio.streams.FlowControlMixin(loop=loop),
+        os.fdopen(sys.stdout.fileno(), "wb"))
+    writer = asyncio.streams.StreamWriter(
+        writer_transport, writer_protocol, None, loop)
+
+    return writer
+
+
+class LogicApplet(GlasgowApplet, name="logic"):
+    """Preview applet that copies the raw capture byte stream to stdout."""
+
+    logger = logging.getLogger(__name__)
+    preview = True
+
+    @classmethod
+    def add_build_arguments(cls, parser, access):
+        super().add_build_arguments(parser, access)
+
+        access.add_pin_set_argument(parser, "d", required=True, width=range(5))
+
+    def build(self, target, args):
+        self.mux_interface = iface = target.multiplexer.claim_interface(self, args)
+        iface.add_subtarget(LogicSubtarget(
+            pads=iface.get_pads(args, pin_sets=("d",)),
+            in_fifo=iface.get_in_fifo(auto_flush=False, depth=8192),
+        ))
+
+    @classmethod
+    def add_run_arguments(cls, parser, access):
+        super().add_run_arguments(parser, access)
+
+    async def run(self, device, args):
+        return await device.demultiplexer.claim_interface(self, self.mux_interface, args)
+
+    @classmethod
+    def add_interact_arguments(cls, parser):
+        pass
+
+    async def interact(self, device, args, iface):
+        """Forward everything read from ``iface`` to stdout until cancelled."""
+        output_stream = await async_stdout()
+        while True:
+            data = await iface.read(65536)
+            output_stream.write(data)
+            # write() only buffers; without drain() a slow stdout consumer lets
+            # the buffer grow without bound.
+            await output_stream.drain()
+
+# ------------------------------------------------------------------------------------------------
+
+class LogicAppletTestCase(GlasgowAppletTestCase, applet=LogicApplet):
+    @synthesis_test
+    def test_build(self):
+        self.assertBuilds()