#!/usr/bin/env python3
"""ARTIQ device database template builder.

Reads JSON system descriptions (one primary node plus optional DRTIO
satellites) and prints a Python ``device_db`` skeleton describing the core
device and every peripheral, assigning RTIO channel numbers sequentially.
"""

import argparse
import sys
import textwrap
from collections import defaultdict
from itertools import count, filterfalse

from artiq import __version__ as artiq_version
from artiq.coredevice import jsondesc
from artiq.coredevice.phaser import PHASER_GW_MIQRO, PHASER_GW_BASE


def get_cpu_target(description):
    """Return the CPU target string for a node or DRTIO-over-EEM peripheral.

    ``description`` is either a system description (with ``target`` and
    ``hw_rev`` keys) or a peripheral description whose ``type`` is
    ``"shuttler"``.

    Raises ``NotImplementedError`` for an unknown ``target``.
    """
    if description.get("type", None) == "shuttler":
        return "rv32g"
    target = description["target"]
    if target == "kasli":
        if description["hw_rev"] in ("v1.0", "v1.1"):
            return "rv32ima"
        return "rv32g"
    if target == "kasli_soc":
        return "cortexa9"
    raise NotImplementedError(
        "Unsupported target: {!r}".format(target))


def get_num_leds(description):
    """Return the number of user LED channels available on a board.

    Raises ``KeyError`` for an unknown Kasli ``hw_rev`` and ``ValueError``
    for an unknown ``target``.
    """
    drtio_role = description["drtio_role"]
    target = description["target"]
    hw_rev = description["hw_rev"]
    kasli_board_leds = {
        "v1.0": 4,
        "v1.1": 6,
        "v2.0": 3
    }
    if target == "kasli":
        if hw_rev in ("v1.0", "v1.1") and drtio_role != "standalone":
            # LEDs are used for DRTIO status on v1.0 and v1.1
            return kasli_board_leds[hw_rev] - 3
        return kasli_board_leds[hw_rev]
    if target == "kasli_soc":
        return 2
    raise ValueError("Unsupported target: {!r}".format(target))


def process_header(output, description):
    """Write the fixed part of the device database to ``output``.

    This covers the core device, its controllers, cache/DMA and the two
    I2C switches. ``ref_period`` is computed as
    ``1/(8*rtio_frequency)`` from the description.
    """
    print(textwrap.dedent("""
        # Autogenerated for the {variant} variant
        core_addr = "{core_addr}"

        device_db = {{
            "core": {{
                "type": "local",
                "module": "artiq.coredevice.core",
                "class": "Core",
                "arguments": {{
                    "host": core_addr,
                    "ref_period": {ref_period},
                    "analyzer_proxy": "core_analyzer",
                    "target": "{cpu_target}",
                    "satellite_cpu_targets": {{}}
                }},
            }},
            "core_log": {{
                "type": "controller",
                "host": "::1",
                "port": 1068,
                "command": "aqctl_corelog -p {{port}} --bind {{bind}} " + core_addr
            }},
            "core_moninj": {{
                "type": "controller",
                "host": "::1",
                "port_proxy": 1383,
                "port": 1384,
                "command": "aqctl_moninj_proxy --port-proxy {{port_proxy}} --port-control {{port}} --bind {{bind}} " + core_addr
            }},
            "core_analyzer": {{
                "type": "controller",
                "host": "::1",
                "port_proxy": 1385,
                "port": 1386,
                "command": "aqctl_coreanalyzer_proxy --port-proxy {{port_proxy}} --port-control {{port}} --bind {{bind}} " + core_addr
            }},
            "core_cache": {{
                "type": "local",
                "module": "artiq.coredevice.cache",
                "class": "CoreCache"
            }},
            "core_dma": {{
                "type": "local",
                "module": "artiq.coredevice.dma",
                "class": "CoreDMA"
            }},

            "i2c_switch0": {{
                "type": "local",
                "module": "artiq.coredevice.i2c",
                "class": "I2CSwitch",
                "arguments": {{"address": 0xe0}}
            }},
            "i2c_switch1": {{
                "type": "local",
                "module": "artiq.coredevice.i2c",
                "class": "I2CSwitch",
                "arguments": {{"address": 0xe2}}
            }},
        }}
        """).format(
            variant=description["variant"],
            core_addr=description["core_addr"],
            ref_period=1/(8*description["rtio_frequency"]),
            cpu_target=get_cpu_target(description)),
        file=output)


class PeripheralManager:
    """Emit device database entries for peripherals and name them uniquely.

    Each ``process_<type>`` method writes the entries for one peripheral and
    returns the number of RTIO channels it consumed, so the caller can
    advance its running channel offset.
    """

    def __init__(self, output, primary_description):
        # Per-prefix counters used by get_name() to number devices.
        self.counts = defaultdict(int)
        self.output = output
        self.primary_description = primary_description

    def get_name(self, ty):
        """Return the next unused name ``<ty><n>`` for the prefix ``ty``."""
        # Named "index" so that itertools.count is not shadowed.
        index = self.counts[ty]
        self.counts[ty] = index + 1
        return "{}{}".format(ty, index)

    def gen(self, string, **kwargs):
        """Dedent ``string``, format it with ``kwargs`` and print it."""
        print(textwrap.dedent(string).format(**kwargs), file=self.output)

    def process_dio(self, rtio_offset, peripheral, num_channels=8):
        """Emit TTL channels (and optional edge counters) for a DIO card.

        The class of channel ``i`` is chosen from the low/high bank direction
        via ``i // 4``. Edge counters are only emitted for ``TTLInOut``
        channels and take their channel numbers after all TTL channels.
        """
        class_names = {
            "input": "TTLInOut",
            "output": "TTLOut",
            "clkgen": "TTLClockGen"
        }
        classes = [
            class_names[peripheral["bank_direction_low"]],
            class_names[peripheral["bank_direction_high"]]
        ]
        channel = count(0)
        name = [self.get_name("ttl") for _ in range(num_channels)]
        for i in range(num_channels):
            self.gen("""
                device_db["{name}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.ttl",
                    "class": "{class_name}",
                    "arguments": {{"channel": 0x{channel:06x}}},
                }}""",
                name=name[i],
                class_name=classes[i // 4],
                channel=rtio_offset + next(channel))
        if peripheral["edge_counter"]:
            for i in range(num_channels):
                class_name = classes[i // 4]
                if class_name == "TTLInOut":
                    self.gen("""
                        device_db["{name}_counter"] = {{
                            "type": "local",
                            "module": "artiq.coredevice.edge_counter",
                            "class": "EdgeCounter",
                            "arguments": {{"channel": 0x{channel:06x}}},
                        }}""",
                        name=name[i],
                        channel=rtio_offset + next(channel))
        # The next value of the counter equals the number of channels used.
        return next(channel)

    def process_dio_spi(self, rtio_offset, peripheral):
        """Emit SPI masters, then TTLs (with optional edge counters)."""
        channel = count(0)
        for spi in peripheral["spi"]:
            self.gen("""
                device_db["{name}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.spi2",
                    "class": "SPIMaster",
                    "arguments": {{"channel": 0x{channel:06x}}}
                }}""",
                name=self.get_name(spi["name"]),
                channel=rtio_offset + next(channel))
        ttl_class_names = {
            "input": "TTLInOut",
            "output": "TTLOut"
        }
        for ttl in peripheral["ttl"]:
            name = self.get_name(ttl["name"])
            self.gen("""
                device_db["{name}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.ttl",
                    "class": "{class_name}",
                    "arguments": {{"channel": 0x{channel:06x}}},
                }}""",
                name=name,
                class_name=ttl_class_names[ttl["direction"]],
                channel=rtio_offset + next(channel))
            if ttl["edge_counter"]:
                self.gen("""
                    device_db["{name}_counter"] = {{
                        "type": "local",
                        "module": "artiq.coredevice.edge_counter",
                        "class": "EdgeCounter",
                        "arguments": {{"channel": 0x{channel:06x}}},
                    }}""",
                    name=name,
                    channel=rtio_offset + next(channel))
        return next(channel)

    def process_urukul(self, rtio_offset, peripheral):
        """Emit EEPROM, SPI, TTL, CPLD and four DDS entries for an Urukul.

        The sync TTL is only emitted when ``synchronization`` is set, and the
        four RF switch TTLs only when the card uses more than one EEM port.

        Raises ``ValueError`` if ``dds`` is neither ``ad9910`` nor ``ad9912``.
        """
        urukul_name = self.get_name("urukul")
        synchronization = peripheral["synchronization"]
        has_sw = len(peripheral["ports"]) > 1
        channel = count(0)
        self.gen("""
            device_db["eeprom_{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.kasli_i2c",
                "class": "KasliEEPROM",
                "arguments": {{"port": "EEM{eem}"}}
            }}

            device_db["spi_{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.spi2",
                "class": "SPIMaster",
                "arguments": {{"channel": 0x{channel:06x}}}
            }}""",
            name=urukul_name,
            eem=peripheral["ports"][0],
            channel=rtio_offset+next(channel))
        if synchronization:
            self.gen("""
                device_db["ttl_{name}_sync"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.ttl",
                    "class": "TTLClockGen",
                    "arguments": {{"channel": 0x{channel:06x}, "acc_width": 4}}
                }}""",
                name=urukul_name,
                channel=rtio_offset+next(channel))
        self.gen("""
            device_db["ttl_{name}_io_update"] = {{
                "type": "local",
                "module": "artiq.coredevice.ttl",
                "class": "TTLOut",
                "arguments": {{"channel": 0x{channel:06x}}}
            }}""",
            name=urukul_name,
            channel=rtio_offset+next(channel))
        if has_sw:
            for i in range(4):
                self.gen("""
                    device_db["ttl_{name}_sw{uchn}"] = {{
                        "type": "local",
                        "module": "artiq.coredevice.ttl",
                        "class": "TTLOut",
                        "arguments": {{"channel": 0x{channel:06x}}}
                    }}""",
                    name=urukul_name,
                    uchn=i,
                    channel=rtio_offset+next(channel))
        self.gen("""
            device_db["{name}_cpld"] = {{
                "type": "local",
                "module": "artiq.coredevice.urukul",
                "class": "CPLD",
                "arguments": {{
                    "spi_device": "spi_{name}",
                    "sync_device": {sync_device},
                    "io_update_device": "ttl_{name}_io_update",
                    "refclk": {refclk},
                    "clk_sel": {clk_sel},
                    "clk_div": {clk_div}
                }}
            }}""",
            name=urukul_name,
            sync_device="\"ttl_{name}_sync\"".format(name=urukul_name) if synchronization else "None",
            refclk=peripheral.get("refclk", self.primary_description["rtio_frequency"]),
            clk_sel=peripheral["clk_sel"],
            clk_div=peripheral["clk_div"])
        dds = peripheral["dds"]
        pll_vco = peripheral.get("pll_vco")
        # Optional trailing arguments are spliced in after "cpld_device";
        # each fragment starts with ",\n" so the dict literal stays valid
        # whether or not it is present.
        pll_vco_arg = ",\n        \"pll_vco\": {}".format(pll_vco) if pll_vco is not None else ""
        for i in range(4):
            sw_arg = ",\n        \"sw_device\": \"ttl_{name}_sw{uchn}\"".format(
                name=urukul_name, uchn=i) if has_sw else ""
            if dds == "ad9910":
                # Both delay arguments refer to the same EEPROM offset.
                eeprom_ref = "eeprom_{}:{}".format(urukul_name, 64 + 4*i)
                self.gen("""
                    device_db["{name}_ch{uchn}"] = {{
                        "type": "local",
                        "module": "artiq.coredevice.ad9910",
                        "class": "AD9910",
                        "arguments": {{
                            "pll_n": {pll_n},
                            "pll_en": {pll_en},
                            "chip_select": {chip_select},
                            "cpld_device": "{name}_cpld"{sw}{pll_vco}{sync_delay_seed}{io_update_delay}
                        }}
                    }}""",
                    name=urukul_name,
                    chip_select=4 + i,
                    uchn=i,
                    sw=sw_arg,
                    pll_vco=pll_vco_arg,
                    pll_n=peripheral.get("pll_n", 32),
                    pll_en=peripheral["pll_en"],
                    sync_delay_seed=",\n        \"sync_delay_seed\": \"{}\"".format(eeprom_ref) if synchronization else "",
                    io_update_delay=",\n        \"io_update_delay\": \"{}\"".format(eeprom_ref) if synchronization else "")
            elif dds == "ad9912":
                self.gen("""
                    device_db["{name}_ch{uchn}"] = {{
                        "type": "local",
                        "module": "artiq.coredevice.ad9912",
                        "class": "AD9912",
                        "arguments": {{
                            "pll_n": {pll_n},
                            "pll_en": {pll_en},
                            "chip_select": {chip_select},
                            "cpld_device": "{name}_cpld"{sw}{pll_vco}
                        }}
                    }}""",
                    name=urukul_name,
                    chip_select=4 + i,
                    uchn=i,
                    sw=sw_arg,
                    pll_vco=pll_vco_arg,
                    pll_n=peripheral.get("pll_n", 8),
                    pll_en=peripheral["pll_en"])
            else:
                raise ValueError("Unsupported Urukul DDS: {!r}".format(dds))
        return next(channel)

    def process_mirny(self, rtio_offset, peripheral):
        """Emit SPI, switch TTLs, ADF5356 channels and CPLD for a Mirny.

        With an Almazny mezzanine, hardware revisions v1.0/v1.1 get a single
        ``AlmaznyLegacy`` device; other revisions get one ``AlmaznyChannel``
        per Mirny channel.
        """
        legacy_almazny = ("v1.0", "v1.1")
        mirny_name = self.get_name("mirny")
        channel = count(0)
        self.gen("""
            device_db["spi_{name}"]={{
                "type": "local",
                "module": "artiq.coredevice.spi2",
                "class": "SPIMaster",
                "arguments": {{"channel": 0x{channel:06x}}}
            }}""",
            name=mirny_name,
            channel=rtio_offset+next(channel))

        for i in range(4):
            self.gen("""
                device_db["ttl_{name}_sw{mchn}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.ttl",
                    "class": "TTLOut",
                    "arguments": {{"channel": 0x{ttl_channel:06x}}}
                }}""",
                name=mirny_name,
                mchn=i,
                ttl_channel=rtio_offset+next(channel))

        for i in range(4):
            self.gen("""
                device_db["{name}_ch{mchn}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.adf5356",
                    "class": "ADF5356",
                    "arguments": {{
                        "channel": {mchn},
                        "sw_device": "ttl_{name}_sw{mchn}",
                        "cpld_device": "{name}_cpld",
                    }}
                }}""",
                name=mirny_name,
                mchn=i)
            if peripheral["almazny"] and peripheral["almazny_hw_rev"] not in legacy_almazny:
                self.gen("""
                    device_db["{name}_almazny{i}"] = {{
                        "type": "local",
                        "module": "artiq.coredevice.almazny",
                        "class": "AlmaznyChannel",
                        "arguments": {{
                            "host_mirny": "{name}_cpld",
                            "channel": {i},
                        }},
                    }}""",
                    name=mirny_name,
                    i=i)

        # A string clk_sel must appear quoted in the generated Python.
        clk_sel = peripheral["clk_sel"]
        if isinstance(peripheral["clk_sel"], str):
            clk_sel = '"' + peripheral["clk_sel"] + '"'
        self.gen("""
            device_db["{name}_cpld"] = {{
                "type": "local",
                "module": "artiq.coredevice.mirny",
                "class": "Mirny",
                "arguments": {{
                    "spi_device": "spi_{name}",
                    "refclk": {refclk},
                    "clk_sel": {clk_sel}
                }},
            }}""",
            name=mirny_name,
            refclk=peripheral["refclk"],
            clk_sel=clk_sel)
        if peripheral["almazny"] and peripheral["almazny_hw_rev"] in legacy_almazny:
            self.gen("""
                device_db["{name}_almazny"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.almazny",
                    "class": "AlmaznyLegacy",
                    "arguments": {{
                        "host_mirny": "{name}_cpld",
                    }},
                }}""",
                name=mirny_name)

        return next(channel)

    def process_novogorny(self, rtio_offset, peripheral):
        """Emit ADC SPI, CNV TTL and the Novogorny device (2 channels)."""
        self.gen("""
            device_db["spi_{name}_adc"] = {{
                "type": "local",
                "module": "artiq.coredevice.spi2",
                "class": "SPIMaster",
                "arguments": {{"channel": 0x{adc_channel:06x}}}
            }}
            device_db["ttl_{name}_cnv"] = {{
                "type": "local",
                "module": "artiq.coredevice.ttl",
                "class": "TTLOut",
                "arguments": {{"channel": 0x{cnv_channel:06x}}},
            }}
            device_db["{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.novogorny",
                "class": "Novogorny",
                "arguments": {{
                    "spi_adc_device": "spi_{name}_adc",
                    "cnv_device": "ttl_{name}_cnv"
                }}
            }}""",
            name=self.get_name("novogorny"),
            adc_channel=rtio_offset,
            cnv_channel=rtio_offset + 1)
        return 2

    def process_sampler(self, rtio_offset, peripheral):
        """Emit ADC/PGIA SPI, CNV TTL and the Sampler device (3 channels).

        ``hw_rev`` defaults to ``"v2.2"`` when absent from the description.
        """
        self.gen("""
            device_db["spi_{name}_adc"] = {{
                "type": "local",
                "module": "artiq.coredevice.spi2",
                "class": "SPIMaster",
                "arguments": {{"channel": 0x{adc_channel:06x}}}
            }}
            device_db["spi_{name}_pgia"] = {{
                "type": "local",
                "module": "artiq.coredevice.spi2",
                "class": "SPIMaster",
                "arguments": {{"channel": 0x{pgia_channel:06x}}}
            }}
            device_db["ttl_{name}_cnv"] = {{
                "type": "local",
                "module": "artiq.coredevice.ttl",
                "class": "TTLOut",
                "arguments": {{"channel": 0x{cnv_channel:06x}}},
            }}
            device_db["{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.sampler",
                "class": "Sampler",
                "arguments": {{
                    "spi_adc_device": "spi_{name}_adc",
                    "spi_pgia_device": "spi_{name}_pgia",
                    "cnv_device": "ttl_{name}_cnv",
                    "hw_rev": "{hw_rev}"
                }}
            }}""",
            name=self.get_name("sampler"),
            hw_rev=peripheral.get("hw_rev", "v2.2"),
            adc_channel=rtio_offset,
            pgia_channel=rtio_offset + 1,
            cnv_channel=rtio_offset + 2)
        return 3

    def process_suservo(self, rtio_offset, peripheral):
        """Emit a SU-Servo: 8 servo channels, the servo, PGIA SPI, 2 Urukuls.

        The CPLD/DDS name lists are inserted as Python list literals (their
        ``repr``) into the generated code.
        """
        suservo_name = self.get_name("suservo")
        sampler_name = self.get_name("sampler")
        urukul_names = [self.get_name("urukul") for _ in range(2)]
        channel = count(0)
        for i in range(8):
            self.gen("""
                device_db["{suservo_name}_ch{suservo_chn}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.suservo",
                    "class": "Channel",
                    "arguments": {{"channel": 0x{suservo_channel:06x}, "servo_device": "{suservo_name}"}}
                }}""",
                suservo_name=suservo_name,
                suservo_chn=i,
                suservo_channel=rtio_offset+next(channel))
        self.gen("""
            device_db["{suservo_name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.suservo",
                "class": "SUServo",
                "arguments": {{
                    "channel": 0x{suservo_channel:06x},
                    "pgia_device": "spi_{sampler_name}_pgia",
                    "cpld_devices": {cpld_names_list},
                    "dds_devices": {dds_names_list},
                    "sampler_hw_rev": "{sampler_hw_rev}"
                }}
            }}""",
            suservo_name=suservo_name,
            sampler_name=sampler_name,
            sampler_hw_rev=peripheral["sampler_hw_rev"],
            cpld_names_list=[urukul_name + "_cpld" for urukul_name in urukul_names],
            dds_names_list=[urukul_name + "_dds" for urukul_name in urukul_names],
            suservo_channel=rtio_offset+next(channel))
        self.gen("""
            device_db["spi_{sampler_name}_pgia"] = {{
                "type": "local",
                "module": "artiq.coredevice.spi2",
                "class": "SPIMaster",
                "arguments": {{"channel": 0x{sampler_channel:06x}}}
            }}""",
            sampler_name=sampler_name,
            sampler_channel=rtio_offset+next(channel))
        pll_vco = peripheral.get("pll_vco")
        for urukul_name in urukul_names:
            self.gen("""
                device_db["spi_{urukul_name}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.spi2",
                    "class": "SPIMaster",
                    "arguments": {{"channel": 0x{urukul_channel:06x}}}
                }}
                device_db["{urukul_name}_cpld"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.urukul",
                    "class": "CPLD",
                    "arguments": {{
                        "spi_device": "spi_{urukul_name}",
                        "refclk": {refclk},
                        "clk_sel": {clk_sel}
                    }}
                }}
                device_db["{urukul_name}_dds"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.ad9910",
                    "class": "AD9910",
                    "arguments": {{
                        "pll_n": {pll_n},
                        "pll_en": {pll_en},
                        "chip_select": 3,
                        "cpld_device": "{urukul_name}_cpld"{pll_vco}
                    }}
                }}""",
                urukul_name=urukul_name,
                urukul_channel=rtio_offset+next(channel),
                refclk=peripheral.get("refclk", self.primary_description["rtio_frequency"]),
                clk_sel=peripheral["clk_sel"],
                pll_vco=",\n        \"pll_vco\": {}".format(pll_vco) if pll_vco is not None else "",
                pll_n=peripheral["pll_n"],
                pll_en=peripheral["pll_en"])
        return next(channel)

    def process_zotino(self, rtio_offset, peripheral):
        """Emit SPI, LDAC/CLR TTLs and the Zotino device (3 channels)."""
        self.gen("""
            device_db["spi_{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.spi2",
                "class": "SPIMaster",
                "arguments": {{"channel": 0x{spi_channel:06x}}}
            }}
            device_db["ttl_{name}_ldac"] = {{
                "type": "local",
                "module": "artiq.coredevice.ttl",
                "class": "TTLOut",
                "arguments": {{"channel": 0x{ldac_channel:06x}}}
            }}
            device_db["ttl_{name}_clr"] = {{
                "type": "local",
                "module": "artiq.coredevice.ttl",
                "class": "TTLOut",
                "arguments": {{"channel": 0x{clr_channel:06x}}}
            }}
            device_db["{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.zotino",
                "class": "Zotino",
                "arguments": {{
                    "spi_device": "spi_{name}",
                    "ldac_device": "ttl_{name}_ldac",
                    "clr_device": "ttl_{name}_clr"
                }}
            }}""",
            name=self.get_name("zotino"),
            spi_channel=rtio_offset,
            ldac_channel=rtio_offset + 1,
            clr_channel=rtio_offset + 2)
        return 3

    def process_grabber(self, rtio_offset, peripheral):
        """Emit a Grabber device (2 channels)."""
        self.gen("""
            device_db["{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.grabber",
                "class": "Grabber",
                "arguments": {{"channel_base": 0x{channel:06x}}}
            }}""",
            name=self.get_name("grabber"),
            channel=rtio_offset)
        return 2

    def process_fastino(self, rtio_offset, peripheral):
        """Emit a Fastino device (1 channel)."""
        self.gen("""
            device_db["{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.fastino",
                "class": "Fastino",
                "arguments": {{"channel": 0x{channel:06x}, "log2_width": {log2_width}}}
            }}""",
            name=self.get_name("fastino"),
            channel=rtio_offset,
            log2_width=peripheral["log2_width"])
        return 1

    def process_phaser(self, rtio_offset, peripheral):
        """Emit a Phaser device; 3 channels in ``miqro`` mode, otherwise 5."""
        mode = peripheral["mode"]
        # "dac" is already-rendered text, so the doubled braces here are
        # f-string escapes, not str.format() escapes.
        if mode == "miqro":
            dac = f', "dac": {{"pll_m": 16, "pll_n": 3, "interpolation": 2}}, "gw_rev": {PHASER_GW_MIQRO}'
            n_channels = 3
        else:
            dac = f', "gw_rev": {PHASER_GW_BASE}'
            n_channels = 5
        self.gen("""
            device_db["{name}"] = {{
                "type": "local",
                "module": "artiq.coredevice.phaser",
                "class": "Phaser",
                "arguments": {{
                    "channel_base": 0x{channel:06x},
                    "miso_delay": 1{dac}
                }}
            }}""",
            name=self.get_name("phaser"),
            dac=dac,
            channel=rtio_offset)
        return n_channels

    def process_hvamp(self, rtio_offset, peripheral):
        """Emit the eight switch TTLs of an HVAmp (8 channels)."""
        hvamp_name = self.get_name("hvamp")
        for i in range(8):
            self.gen("""
                device_db["ttl_{name}_sw{ch}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.ttl",
                    "class": "TTLOut",
                    "arguments": {{"channel": 0x{channel:06x}}}
                }}""",
                name=hvamp_name,
                ch=i,
                channel=rtio_offset+i)
        return 8

    def process_shuttler(self, shuttler_peripheral):
        """Emit all devices of a Shuttler, a DRTIO-over-EEM peripheral.

        Unlike the other ``process_*`` methods this takes no RTIO offset: the
        channel base is ``drtio_destination << 16``. Board LEDs come first,
        then config, trigger, 16 interleaved DCBias/DDS pairs and the relay
        and ADC SPI devices. Always returns 0.
        """
        shuttler_name = self.get_name("shuttler")
        rtio_offset = shuttler_peripheral["drtio_destination"] << 16
        rtio_offset += self.add_board_leds(rtio_offset, board_name=shuttler_name)

        channel = count(0)
        self.gen("""
            device_db["{name}_config"] = {{
                "type": "local",
                "module": "artiq.coredevice.shuttler",
                "class": "Config",
                "arguments": {{"channel": 0x{channel:06x}}},
            }}""",
            name=shuttler_name,
            channel=rtio_offset + next(channel))
        self.gen("""
            device_db["{name}_trigger"] = {{
                "type": "local",
                "module": "artiq.coredevice.shuttler",
                "class": "Trigger",
                "arguments": {{"channel": 0x{channel:06x}}},
            }}""",
            name=shuttler_name,
            channel=rtio_offset + next(channel))
        for i in range(16):
            self.gen("""
                device_db["{name}_dcbias{ch}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.shuttler",
                    "class": "DCBias",
                    "arguments": {{"channel": 0x{channel:06x}}},
                }}""",
                name=shuttler_name,
                ch=i,
                channel=rtio_offset + next(channel))
            self.gen("""
                device_db["{name}_dds{ch}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.shuttler",
                    "class": "DDS",
                    "arguments": {{"channel": 0x{channel:06x}}},
                }}""",
                name=shuttler_name,
                ch=i,
                channel=rtio_offset + next(channel))

        device_class_names = ["Relay", "ADC"]
        for i, device_name in enumerate(device_class_names):
            spi_name = "{name}_spi{ch}".format(
                name=shuttler_name,
                ch=i)
            self.gen("""
                device_db["{spi}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.spi2",
                    "class": "SPIMaster",
                    "arguments": {{"channel": 0x{channel:06x}}},
                }}""",
                spi=spi_name,
                channel=rtio_offset + next(channel))
            self.gen("""
                device_db["{name}_{device}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.shuttler",
                    "class": "{dev_class}",
                    "arguments": {{"spi_device": "{spi}"}},
                }}""",
                name=shuttler_name,
                device=device_name.lower(),
                dev_class=device_name,
                spi=spi_name)
        return 0

    def process(self, rtio_offset, peripheral):
        """Dispatch to ``process_<type>`` and return the channels it used."""
        processor = getattr(self, "process_"+str(peripheral["type"]))
        return processor(rtio_offset, peripheral)

    def add_board_leds(self, rtio_offset, board_name=None, num_leds=2):
        """Emit ``num_leds`` LED TTL outputs and return ``num_leds``.

        LEDs are named ``led<n>``, or ``<board_name>_led<n>`` when
        ``board_name`` is given.
        """
        led_prefix = "led" if board_name is None else "{}_led".format(board_name)
        for i in range(num_leds):
            self.gen("""
                device_db["{name}"] = {{
                    "type": "local",
                    "module": "artiq.coredevice.ttl",
                    "class": "TTLOut",
                    "arguments": {{"channel": 0x{channel:06x}}}
                }}""",
                name=self.get_name(led_prefix),
                channel=rtio_offset+i)
        return num_leds


def split_drtio_eem(peripherals):
    """Split peripherals into (local, DRTIO-over-EEM).

    The first element is a lazy, single-pass iterator; the second is a list.
    """
    def is_drtio_eem(peripheral):
        # Shuttler is the only peripheral that uses DRTIO-over-EEM at this moment
        return peripheral["type"] == "shuttler"

    return filterfalse(is_drtio_eem, peripherals), \
        list(filter(is_drtio_eem, peripherals))


def _default_drtio_destination(primary_description, index):
    """Return the default DRTIO destination for the ``index``-th EEM peripheral.

    The base destination depends on the primary node's target and hardware
    revision. Raises ``NotImplementedError`` for an unknown target.
    """
    target = primary_description["target"]
    if target == "kasli":
        if primary_description["hw_rev"] in ("v1.0", "v1.1"):
            return 3 + index
        return 4 + index
    if target == "kasli_soc":
        return 5 + index
    raise NotImplementedError(
        "Unsupported target: {!r}".format(target))


def process(output, primary_description, satellites):
    """Write the complete device database template to ``output``.

    ``satellites`` is a sequence of ``(destination, description)`` pairs.
    Satellite channels start at ``destination << 16``. DRTIO-over-EEM
    peripherals of all nodes are emitted last; those without a
    ``drtio_destination`` get a default stored back into their description.

    Raises ``ValueError`` for an invalid DRTIO role, or when a standalone
    primary is given satellites.
    """
    drtio_role = primary_description["drtio_role"]
    if drtio_role not in ("standalone", "master"):
        raise ValueError("Invalid primary node DRTIO role")

    if drtio_role == "standalone" and satellites:
        raise ValueError("A standalone system cannot have satellites")

    process_header(output, primary_description)

    pm = PeripheralManager(output, primary_description)

    local_peripherals, drtio_peripherals = split_drtio_eem(primary_description["peripherals"])

    print("# {} peripherals".format(drtio_role), file=output)
    rtio_offset = 0
    for peripheral in local_peripherals:
        n_channels = pm.process(rtio_offset, peripheral)
        rtio_offset += n_channels

    # Board LEDs take the channels right after the peripherals.
    num_leds = get_num_leds(primary_description)
    pm.add_board_leds(rtio_offset, num_leds=num_leds)
    rtio_offset += num_leds

    for destination, description in satellites:
        if description["drtio_role"] != "satellite":
            raise ValueError("Invalid DRTIO role for satellite at destination {}".format(destination))
        peripherals, satellite_drtio_peripherals = split_drtio_eem(description["peripherals"])
        drtio_peripherals.extend(satellite_drtio_peripherals)

        print(textwrap.dedent("""
            # DEST#{dest} peripherals

            device_db["core"]["arguments"]["satellite_cpu_targets"][{dest}] = \"{target}\"""").format(
                dest=destination,
                target=get_cpu_target(description)),
            file=output)
        rtio_offset = destination << 16
        for peripheral in peripherals:
            n_channels = pm.process(rtio_offset, peripheral)
            rtio_offset += n_channels

        num_leds = get_num_leds(description)
        pm.add_board_leds(rtio_offset, num_leds=num_leds)
        rtio_offset += num_leds

    for i, peripheral in enumerate(drtio_peripherals):
        if "drtio_destination" not in peripheral:
            peripheral["drtio_destination"] = _default_drtio_destination(
                primary_description, i)

        print(textwrap.dedent("""
            # DEST#{dest} peripherals

            device_db["core"]["arguments"]["satellite_cpu_targets"][{dest}] = \"{target}\"""").format(
                dest=peripheral["drtio_destination"],
                target=get_cpu_target(peripheral)),
            file=output)

        # DRTIO-over-EEM processors derive their own channel base, so they
        # are called without an RTIO offset.
        processor = getattr(pm, "process_"+str(peripheral["type"]))
        processor(peripheral)


def main():
    """Parse the command line and write the device database template."""
    parser = argparse.ArgumentParser(
        description="ARTIQ device database template builder")
    parser.add_argument("--version", action="version",
                        version="ARTIQ v{}".format(artiq_version),
                        help="print the ARTIQ version number")
    parser.add_argument("primary_description", metavar="PRIMARY_DESCRIPTION",
                        help="JSON system description file for the primary (standalone or master) node")
    parser.add_argument("-o", "--output",
                        help="output file, defaults to standard output if omitted")
    parser.add_argument("-s", "--satellite", nargs=2, action="append",
                        default=[], metavar=("DESTINATION", "DESCRIPTION"), type=str,
                        help="add DRTIO satellite at the given destination number with "
                             "devices from the given JSON description")

    args = parser.parse_args()

    primary_description = jsondesc.load(args.primary_description)

    satellites = []
    for destination, description_path in args.satellite:
        satellite_description = jsondesc.load(description_path)
        # Base 0 accepts decimal as well as 0x/0o/0b prefixed numbers.
        satellites.append((int(destination, 0), satellite_description))

    if args.output is not None:
        with open(args.output, "w") as f:
            process(f, primary_description, satellites)
    else:
        process(sys.stdout, primary_description, satellites)


if __name__ == "__main__":
    main()