from nmigen import * from nmigen_stdio import serial from nmigen_soc import csr from nmigen.utils import * __all__ = ["UARTCore"] class UARTCore(Elaboratable): def __init__(self, uart_pins, *, sys_clk_freq, bus_data_width, baudrate=115200, divisor_bits=32, bus_granularity=None): self.uart_pins = uart_pins if divisor_bits > 32: raise NotImplementedError("Currently only supports divisor values " "not wider than 32 bits") self.div_bits = divisor_bits self.div = int(sys_clk_freq/baudrate) - 1 bus_dw = bus_data_width if bus_granularity is None: bus_granularity = bus_data_width bus_gr = bus_granularity with csr.Bank(name="uart", addr_width=log2_int(4*bus_dw//bus_gr), data_width=bus_gr) as self.csr: self.csr.r += [ csr.Register( "control", "rw", width=bus_dw, fields=[csr.Field("divisor", "rw", width=self.div_bits, reset_value=self.div)] ), csr.Register( "flags", "rw", width=bus_dw, fields=[csr.Field("rx_ready", "r", startbit=0, reset_value=0), csr.Field("tx_ready", "r", startbit=4, reset_value=1), csr.Field("rx_enable", "rw", startbit=8)] ), csr.Register( "txdata", "w", width=bus_dw, fields=[csr.Field("value", width=8)] ), csr.Register( "rxdata", "r", width=bus_dw, fields=[csr.Field("value", width=8)] ) ] self.wb2csr = csr.WishboneCSRBridge(self.csr.dec.bus, data_width=bus_data_width) self.csr_bus = self.wb2csr.wb_bus def elaborate(self, platform): m = Module() m.submodules.rxtx = rxtx = serial.AsyncSerial( divisor=self.div, divisor_bits=self.div_bits, pins=self.uart_pins ) rx = rxtx.rx tx = rxtx.tx m.submodules += self.csr, self.wb2csr # CSR - Divisor m.d.comb += rxtx.divisor.eq(self.csr.r.control.f.divisor.s) # CSR - RX/TX tx_ack_now = Signal() m.d.sync += tx_ack_now.eq(self.csr.r.txdata.bus.w_stb) # RX Ready Flag is read-once-and-clear with m.If(self.csr.r.flags.bus.r_stb): m.d.comb += [ self.csr.r.flags.f.rx_ready.set_stb.eq(1), self.csr.r.flags.f.rx_ready.set_val.eq(0) ] with m.Elif(rx.r_rdy): m.d.comb += [ self.csr.r.flags.f.rx_ready.set_stb.eq(1), self.csr.r.flags.f.rx_ready.set_val.eq(1) ] m.d.comb += [ # TX self.csr.r.flags.f.tx_ready.set_stb.eq(1), self.csr.r.flags.f.tx_ready.set_val.eq(~tx.busy), tx.ack.eq(tx_ack_now), tx.data.eq(self.csr.r.txdata.f.value.s), # RX self.csr.r.rxdata.f.value.set_stb.eq(1), self.csr.r.rxdata.f.value.set_val.eq(rx.data), rx.ack.eq(self.csr.r.flags.f.rx_enable.s) ] return m