""" Non-realtime drivers for CXP. """ # TODO: add api calls for CTRL packet similar i2c # TODO: add timing critical trigger ack from artiq.language.core import syscall, kernel from artiq.language.types import TBool, TInt32, TNone from artiq.coredevice.rtio import rtio_output,rtio_input_timestamped_data from artiq.experiment import * import numpy as np import math # TODO: change this to read bytes and accept TBytearray @syscall(flags={"nounwind", "nowrite"}) def cxp_read_words(addr: TInt32, val: TList(TInt32), with_tag: TBool) -> TInt32: raise NotImplementedError("syscall not simulated") @syscall(flags={"nounwind", "nowrite"}) def cxp_readu32(addr: TInt32, with_tag: TBool) -> TInt32: raise NotImplementedError("syscall not simulated") @syscall(flags={"nounwind", "nowrite"}) def cxp_readu64(addr: TInt32, with_tag: TBool) -> TInt64: raise NotImplementedError("syscall not simulated") @syscall(flags={"nounwind", "nowrite"}) def cxp_writeu32(addr: TInt32, val: TInt32, with_tag: TBool) -> TNone: raise NotImplementedError("syscall not simulated") @syscall(flags={"nounwind", "nowrite"}) def cxp_writeu64(addr: TInt32, val: TInt64, with_tag: TBool) -> TNone: raise NotImplementedError("syscall not simulated") @syscall(flags={"nounwind", "nowrite"}) def cxp_setup() -> TBool: raise NotImplementedError("syscall not simulated") @syscall(flags={"nounwind", "nowrite"}) def cxp_debug_frame_print() -> TNone: raise NotImplementedError("syscall not simulated") # Bootstrap register _XML_MANIFEST_SIZE = 0x0008 _XML_MANIFEST_SEL = 0x000C _XML_VER = 0x0010 _XML_SCHEMA_VER = 0x0014 _XML_URL_ADDR = 0x0018 _WIDTH_ADDR = 0x3000 _HEIGHT_ADDR = 0x3004 _ACQUISITION_MODE_ADDR = 0x3008 _ACQUISITION_START_ADDR = 0x300C _ACQUISITION_STOP_ADDR = 0x3010 _PIXEL_FORMAT_ADDR = 0x3014 _DEVICE_TAP_GEOG_ADDR = 0x3018 _IMG_1_STREAMID_ADDR = 0x301C _MAX_BYTE_SIZE = 100 # in bytes _MAX_WORD_SIZE = _MAX_BYTE_SIZE // 4 # in bytes class CoaXPress: def __init__(self, channel_base, core_device="core", xml_url_len=_MAX_WORD_SIZE): # __device_mgr is private # self.core = dmgr.get(core_device) # you can get the channel via `print(len(rtio_channels))` before calling # `rtio_channels.append(rtio.Channel.from_phy(cxp_interface))` self.channel_base = channel_base # the first 8 bits is reserved for the rtlink.OInterface.addr not for channel no. self.target_o = channel_base << 8 self.with_tag = False self.xml_addr = 0 self.width_addr = 0 self.height_addr = 0 self.acq_mode_addr = 0 self.acq_start_addr = 0 self.acq_stop_addr = 0 self.pixel_fmt_addr = 0 self.device_tap_geog_addr = 0 self.img_1_streamid_addr = 0 self.xml_url = [0] * xml_url_len @staticmethod def get_rtio_channels(channel, **kwargs): return [(channel, None)] @kernel def trigger(self, linktrig, trigdelay): rtio_output(self.channel_base << 8, linktrig | trigdelay << 1) @kernel def setup_roi(self, n, x0, y0, x1, y1): # DEBUG: # c = int64(self.core.ref_multiplier) c = int64(8) rtio_output(((self.channel_base + 1) << 8) | (4*n+0), x0) delay_mu(c) rtio_output(((self.channel_base + 1) << 8) | (4*n+1), y0) delay_mu(c) rtio_output(((self.channel_base + 1) << 8) | (4*n+2), x1) delay_mu(c) rtio_output(((self.channel_base + 1) << 8) | (4*n+3), y1) delay_mu(c) # TODO: add gate @kernel def input_mu(self, data, tt, timeout_mu=-1): assert len(data) == len(tt) channel = self.channel_base + 2 for i in range(len(data)): timestamp, roi_output = rtio_input_timestamped_data(timeout_mu, channel) data[i] = roi_output tt[i] = timestamp @kernel def init(self): self.with_tag = cxp_setup() self.xml_addr = self.read_u32(_XML_URL_ADDR) # self.width_addr = self.read_u32(_WIDTH_ADDR) # self.height_addr = self.read_u32(_HEIGHT_ADDR) self.acq_mode_addr = self.read_u32(_ACQUISITION_MODE_ADDR) self.acq_start_addr = self.read_u32(_ACQUISITION_START_ADDR) self.acq_stop_addr = self.read_u32(_ACQUISITION_STOP_ADDR) self.pixel_fmt_addr = self.read_u32(_PIXEL_FORMAT_ADDR) # self.device_tap_geog_addr = self.read_u32(_DEVICE_TAP_GEOG_ADDR) self.img_1_streamid_addr = self.read_u32(_IMG_1_STREAMID_ADDR) self.read_words(self.xml_addr, self.xml_url) @kernel def read_u32(self, addr: TInt32) -> TInt32: return cxp_readu32(addr, self.with_tag) @kernel def read_u64(self, addr: TInt32) -> TInt64: return cxp_readu64(addr, self.with_tag) @kernel def read_words(self, addr: TInt32, val: TList(TInt32)): cxp_read_words(addr, val, self.with_tag) @kernel def write_u32(self, addr: TInt32, val: TInt32): cxp_writeu32(addr, val, self.with_tag) @kernel def write_u64(self, addr: TInt32, val: TInt64): cxp_writeu64(addr, val, self.with_tag) @kernel def write_wide(self, addr: TInt32, vals: TList(TInt32)): for i in range(len(vals)): cxp_writeu32(addr + i * 4, vals[i], self.with_tag) @kernel def read_width(self) -> TInt32: return self.read_u32(self.width_addr) @kernel def read_height(self) -> TInt32: return self.read_u32(self.height_addr) @kernel def read_acq_mode(self) -> TInt64: return self.read_u64(self.acq_mode_addr) @kernel def write_acq_mode(self, val: TInt64): self.write_u64(self.acq_mode_addr, val) @kernel def start(self): self.write_u32(self.acq_start_addr, 0x00000001) @kernel def stop(self): self.write_u32(self.acq_stop_addr, 0x00000001) @kernel def get_frameid(self) -> TInt32: return self.read_u32(self.img_1_streamid_addr) @kernel def get_pixel_format(self) -> TInt32: return self.read_u32(self.pixel_fmt_addr) @host_only def print_xml_url(self): url = "" for x in self.xml_url: url += x.to_bytes(4, byteorder="big").decode("ascii") print(f"url = {url}") if "Local:" in url: file_name, start_addr, size = url.split(";", 2) print( f"file name: {file_name.replace("Local:", "")}, start addr: 0x{start_addr}, size; 0x{size.split("?", 1)[0]} bytes" ) @kernel def get_xml_data(self, xml_start_addr, xml_data): i = -1 addr_offset = 0 for i in range(len(xml_data) // _MAX_WORD_SIZE): buf = [0] * _MAX_WORD_SIZE self.read_words(xml_start_addr + addr_offset, buf) for j in range(len(buf)): xml_data[j+i*_MAX_WORD_SIZE] = buf[j] addr_offset += _MAX_WORD_SIZE * 4 buf = [0]*(len(xml_data) % _MAX_WORD_SIZE) self.read_words(xml_start_addr + addr_offset, buf) for j in range(len(buf)): xml_data[j+(i+1)*_MAX_WORD_SIZE] = buf[j] @host_only def write_xml_data(self, xml_data, file_path): byte_arr = bytearray() for d in xml_data: byte_arr += d.to_bytes(4, "big", signed=True) with open(file_path, "wb") as binary_file: binary_file.write(byte_arr) # From the camera XML files _USER_SET_SELECTOR = 0x10000050 _REAL_ACQ_MODE = 0x10000bb4 _REAL_ACQ_START = 0x10000498 _REAL_ACQ_STOP = 0x100004a4 _REAL_ACQ_ABORT = 0x100004b0 #stop all acq immediately _BSL_SENSOR_STAND_BY = 0x100004c8 # put sensor in standby mode, certain parameter can only be change during standby _BSL_POWER_MODE = 0x100000b4 # strange d_470 -> d_469 (= 3) perhaps a obscure security trick? So I guess looking @ Index"3" is fine? _TRIG_MODE_INDEX_3 = 0x10001424 # d_87 _TRIG_SRC_INDEX_3 = 0x100081ac # d_479 _TRIG_ACT_INDEX_3 = 0x1000293c # d_502 _TRIG_SOFTWARE_INDEX_3 = 0x10000c34 # d_525 CXP_TRIG = True class IdleKernel(EnvExperiment): def build(self): self.setattr_device("core") self.setattr_device("led0") # declare the class before using it in kernel self.cxp = CoaXPress(0x0) # self.vals = [0]*0x11ab3 xml_word_size = math.ceil(0x11ab3/4) self.vals = [0] * xml_word_size self.cnt = [0] self.timestamp = [np.int64(0)] @kernel def camera_init(self): self.cxp.init() @kernel def camera_trigger_setup(self): # self.cxp.init() # DEBUG: get xml data # self.cxp.get_xml_data(0xc0000000, self.vals) # DEBUG: Try to setup trigger # All address below is from the XML, what's the point of bootstrap anyway? # NOTE: setting is persistent over ConnectionReset but NOT power cycle # self.cxp.write_u32(_REAL_ACQ_MODE, 1) # single frame mode # self.cxp.write_u32(_REAL_ACQ_START, 1) # single acq start # self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end # self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT # boA2448-250cm is area scan camera: # see https://docs.baslerweb.com/triggered-image-acquisition#hardware-and-software-triggering to setup triggering properly # TRIGGER: setup # self.cxp.write_u32(_TRIG_SELECTOR, 3) # FrameStart by default, boA xml b_469 don't have an address for some reason self.cxp.write_u32(_TRIG_MODE_INDEX_3, 1) # ON if CXP_TRIG: self.cxp.write_u32(_TRIG_SRC_INDEX_3, 7) # CXPTrigger0 else: self.cxp.write_u32(_TRIG_SRC_INDEX_3, 0) # use software trigger self.cxp.write_u32(_TRIG_ACT_INDEX_3, 2) # trig on anyedge cxp_debug_frame_print() # TAKING PICTURE self.cxp.write_u32(_REAL_ACQ_MODE, 1) # single frame mode self.cxp.write_u32(_REAL_ACQ_START, 1) # single acq start # STOP acq # self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end # self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT # self.cxp.write_u32(_BSL_SENSOR_STAND_BY, 1) return self.cxp.read_u32(_BSL_POWER_MODE) @kernel def camera_trigger(self): # reset mu for rtio self.core.reset() self.core.break_realtime() self.cxp.setup_roi(0, 1, 1, 10, 10) delay_mu(1000000) if CXP_TRIG: self.cxp.trigger(0 ,0x00) else: self.cxp.write_u32(_TRIG_SOFTWARE_INDEX_3, 0) # software trigger via register write delay_mu(100) # self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end # self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT # self.cxp.write_u32(_BSL_SENSOR_STAND_BY, 1) # NOTE: This may not print when using CXP hardware TRIG # As the write_u32 trigger a packet printout that delays the CPU enough that the frame arrive # But using hw trigger, the print is not necessory i.e. not enough time delay for the zc706 to receive the frame # cxp_debug_frame_print() self.cxp.input_mu(self.cnt, self.timestamp, 100) for _ in range(10): cxp_debug_frame_print() return 0 def print_hex(self, arr: TList(TInt32)): print("[{}]".format(", ".join(hex(np.uint32(x)) for x in arr))) def run(self): self.camera_init() print(f"power mode = {self.camera_trigger_setup()}") self.camera_trigger() print(f"count = {self.cnt} | timestamp = {self.timestamp}") # self.cxp.print_xml_url() # self.cxp.write_xml_data(self.vals, "genicam_16e13898.zip")