diff --git a/cxp_kernel.py b/cxp_kernel.py new file mode 100644 index 0000000..a1c3136 --- /dev/null +++ b/cxp_kernel.py @@ -0,0 +1,310 @@ +""" +Non-realtime drivers for CXP. +""" + +# TODO: add api calls for CTRL packet similar i2c +# TODO: add timing critical trigger ack + + +from artiq.language.core import syscall, kernel +from artiq.language.types import TBool, TInt32, TNone +from artiq.coredevice.rtio import rtio_output +from artiq.experiment import * +import numpy as np +import math + + +# TODO: change this to read bytes and accept TBytearray +@syscall(flags={"nounwind", "nowrite"}) +def cxp_read_words(addr: TInt32, val: TList(TInt32), with_tag: TBool) -> TInt32: + raise NotImplementedError("syscall not simulated") + + +@syscall(flags={"nounwind", "nowrite"}) +def cxp_readu32(addr: TInt32, with_tag: TBool) -> TInt32: + raise NotImplementedError("syscall not simulated") + + +@syscall(flags={"nounwind", "nowrite"}) +def cxp_readu64(addr: TInt32, with_tag: TBool) -> TInt64: + raise NotImplementedError("syscall not simulated") + + +@syscall(flags={"nounwind", "nowrite"}) +def cxp_writeu32(addr: TInt32, val: TInt32, with_tag: TBool) -> TNone: + raise NotImplementedError("syscall not simulated") + + +@syscall(flags={"nounwind", "nowrite"}) +def cxp_writeu64(addr: TInt32, val: TInt64, with_tag: TBool) -> TNone: + raise NotImplementedError("syscall not simulated") + + +@syscall(flags={"nounwind", "nowrite"}) +def cxp_setup() -> TBool: + raise NotImplementedError("syscall not simulated") + + +@syscall(flags={"nounwind", "nowrite"}) +def cxp_debug_frame_print() -> TNone: + raise NotImplementedError("syscall not simulated") + + +# Bootstrap register +_XML_MANIFEST_SIZE = 0x0008 +_XML_MANIFEST_SEL = 0x000C +_XML_VER = 0x0010 +_XML_SCHEMA_VER = 0x0014 +_XML_URL_ADDR = 0x0018 +_WIDTH_ADDR = 0x3000 +_HEIGHT_ADDR = 0x3004 +_ACQUISITION_MODE_ADDR = 0x3008 +_ACQUISITION_START_ADDR = 0x300C +_ACQUISITION_STOP_ADDR = 0x3010 +_PIXEL_FORMAT_ADDR = 0x3014 +_DEVICE_TAP_GEOG_ADDR = 0x3018 +_IMG_1_STREAMID_ADDR = 0x301C + +_MAX_BYTE_SIZE = 100 # in bytes +_MAX_WORD_SIZE = _MAX_BYTE_SIZE // 4 # in bytes + + +class CoaXPress: + def __init__(self, channel, core_device="core", xml_url_len=_MAX_WORD_SIZE): + # __device_mgr is private + # self.core = dmgr.get(core_device) + + # you can get the channel via `print(len(rtio_channels))` before calling + # `rtio_channels.append(rtio.Channel.from_phy(cxp_interface))` + self.channel = channel + # the first 8 bits is reserved for the rtlink.OInterface.addr not for channel no. + self.target_o = channel << 8 + + self.with_tag = False + + self.xml_addr = 0 + self.width_addr = 0 + self.height_addr = 0 + self.acq_mode_addr = 0 + self.acq_start_addr = 0 + self.acq_stop_addr = 0 + self.pixel_fmt_addr = 0 + self.device_tap_geog_addr = 0 + self.img_1_streamid_addr = 0 + + self.xml_url = [0] * xml_url_len + + @staticmethod + def get_rtio_channels(channel, **kwargs): + return [(channel, None)] + + @kernel + def trigger(self, linktrig, trigdelay): + rtio_output(self.target_o, linktrig | trigdelay << 1) + + @kernel + def init(self): + self.with_tag = cxp_setup() + + self.xml_addr = self.read_u32(_XML_URL_ADDR) + # self.width_addr = self.read_u32(_WIDTH_ADDR) + # self.height_addr = self.read_u32(_HEIGHT_ADDR) + self.acq_mode_addr = self.read_u32(_ACQUISITION_MODE_ADDR) + self.acq_start_addr = self.read_u32(_ACQUISITION_START_ADDR) + self.acq_stop_addr = self.read_u32(_ACQUISITION_STOP_ADDR) + self.pixel_fmt_addr = self.read_u32(_PIXEL_FORMAT_ADDR) + # self.device_tap_geog_addr = self.read_u32(_DEVICE_TAP_GEOG_ADDR) + self.img_1_streamid_addr = self.read_u32(_IMG_1_STREAMID_ADDR) + + self.read_words(self.xml_addr, self.xml_url) + + @kernel + def read_u32(self, addr: TInt32) -> TInt32: + return cxp_readu32(addr, self.with_tag) + + @kernel + def read_u64(self, addr: TInt32) -> TInt64: + return cxp_readu64(addr, self.with_tag) + + @kernel + def read_words(self, addr: TInt32, val: TList(TInt32)): + cxp_read_words(addr, val, self.with_tag) + + @kernel + def write_u32(self, addr: TInt32, val: TInt32): + cxp_writeu32(addr, val, self.with_tag) + + @kernel + def write_u64(self, addr: TInt32, val: TInt64): + cxp_writeu64(addr, val, self.with_tag) + + @kernel + def write_wide(self, addr: TInt32, vals: TList(TInt32)): + for i in range(len(vals)): + cxp_writeu32(addr + i * 4, vals[i], self.with_tag) + + @kernel + def read_width(self) -> TInt32: + return self.read_u32(self.width_addr) + + @kernel + def read_height(self) -> TInt32: + return self.read_u32(self.height_addr) + + @kernel + def read_acq_mode(self) -> TInt64: + return self.read_u64(self.acq_mode_addr) + + @kernel + def write_acq_mode(self, val: TInt64): + self.write_u64(self.acq_mode_addr, val) + + @kernel + def start(self): + self.write_u32(self.acq_start_addr, 0x00000001) + + @kernel + def stop(self): + self.write_u32(self.acq_stop_addr, 0x00000001) + + @kernel + def get_frameid(self) -> TInt32: + return self.read_u32(self.img_1_streamid_addr) + + @kernel + def get_pixel_format(self) -> TInt32: + return self.read_u32(self.pixel_fmt_addr) + + @host_only + def print_xml_url(self): + url = "" + for x in self.xml_url: + url += x.to_bytes(4, byteorder="big").decode("ascii") + print(f"url = {url}") + if "Local:" in url: + file_name, start_addr, size = url.split(";", 2) + print( + f"file name: {file_name.replace("Local:", "")}, start addr: 0x{start_addr}, size; 0x{size.split("?", 1)[0]} bytes" + ) + + @kernel + def get_xml_data(self, xml_start_addr, xml_data): + i = -1 + addr_offset = 0 + for i in range(len(xml_data) // _MAX_WORD_SIZE): + buf = [0] * _MAX_WORD_SIZE + self.read_words(xml_start_addr + addr_offset, buf) + for j in range(len(buf)): + xml_data[j+i*_MAX_WORD_SIZE] = buf[j] + + addr_offset += _MAX_WORD_SIZE * 4 + + buf = [0]*(len(xml_data) % _MAX_WORD_SIZE) + self.read_words(xml_start_addr + addr_offset, buf) + for j in range(len(buf)): + xml_data[j+(i+1)*_MAX_WORD_SIZE] = buf[j] + + @host_only + def write_xml_data(self, xml_data, file_path): + byte_arr = bytearray() + for d in xml_data: + byte_arr += d.to_bytes(4, "big", signed=True) + with open(file_path, "wb") as binary_file: + binary_file.write(byte_arr) + + +# From the camera XML files +_USER_SET_SELECTOR = 0x10000050 +_REAL_ACQ_MODE = 0x10000bb4 +_REAL_ACQ_START = 0x10000498 +_REAL_ACQ_STOP = 0x100004a4 +_REAL_ACQ_ABORT = 0x100004b0 #stop all acq immediately +_BSL_SENSOR_STAND_BY = 0x100004c8 # put sensor in standby mode, certain parameter can only be change during standby +_BSL_POWER_MODE = 0x100000b4 +# strange d_470 -> d_469 (= 3) perhaps a obscure security trick? So I guess looking @ Index"3" is fine? +_TRIG_MODE_INDEX_3 = 0x10001424 # d_87 +_TRIG_SRC_INDEX_3 = 0x100081ac # d_479 +_TRIG_ACT_INDEX_3 = 0x1000293c # d_502 +_TRIG_SOFTWARE_INDEX_3 = 0x10000c34 # d_525 + +CXP_TRIG = True + +class IdleKernel(EnvExperiment): + def build(self): + self.setattr_device("core") + self.setattr_device("led0") + + # declare the class before using it in kernel + self.cxp = CoaXPress(0x0) + # self.vals = [0]*0x11ab3 + + xml_word_size = math.ceil(0x11ab3/4) + self.vals = [0] * xml_word_size + @kernel + def camera_setup(self): + self.cxp.init() + + # DEBUG: get xml data + # self.cxp.get_xml_data(0xc0000000, self.vals) + + + # DEBUG: Try to setup trigger + # All address below is from the XML, what's the point of bootstrap anyway? + # NOTE: setting is persistent over ConnectionReset but NOT power cycle + # self.cxp.write_u32(_REAL_ACQ_MODE, 1) # single frame mode + # self.cxp.write_u32(_REAL_ACQ_START, 1) # single acq start + # self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end + # self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT + + # boA2448-250cm is area scan camera: + # see https://docs.baslerweb.com/triggered-image-acquisition#hardware-and-software-triggering to setup triggering properly + + # TRIGGER: setup + # self.cxp.write_u32(_TRIG_SELECTOR, 3) # FrameStart by default, boA xml b_469 don't have an address for some reason + self.cxp.write_u32(_TRIG_MODE_INDEX_3, 1) # ON + if CXP_TRIG: + self.cxp.write_u32(_TRIG_SRC_INDEX_3, 7) # CXPTrigger0 + else: + self.cxp.write_u32(_TRIG_SRC_INDEX_3, 0) # use software trigger + self.cxp.write_u32(_TRIG_ACT_INDEX_3, 2) # trig on anyedge + + cxp_debug_frame_print() + + # TAKING PICTURE + self.cxp.write_u32(_REAL_ACQ_MODE, 1) # single frame mode + self.cxp.write_u32(_REAL_ACQ_START, 1) # single acq start + + # STOP acq + # self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end + # self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT + + # self.cxp.write_u32(_BSL_SENSOR_STAND_BY, 1) + return self.cxp.read_u32(_BSL_POWER_MODE) + + @kernel + def kernel(self): + # reset mu for rtio + self.core.reset() + self.core.break_realtime() + + if CXP_TRIG: + self.cxp.trigger(0 ,0x00) + else: + self.cxp.write_u32(_TRIG_SOFTWARE_INDEX_3, 0) # software trigger via register write + + # self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end + # self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT + # self.cxp.write_u32(_BSL_SENSOR_STAND_BY, 1) + for _ in range(4): + cxp_debug_frame_print() + + return 0 + + def print_hex(self, arr: TList(TInt32)): + print("[{}]".format(", ".join(hex(np.uint32(x)) for x in arr))) + + def run(self): + print(f"power mode = {self.camera_setup()}") + self.kernel() + # self.cxp.print_xml_url() + # self.cxp.write_xml_data(self.vals, "genicam_16e13898.zip")