driver: add roi & gate drivers

This commit is contained in:
morgan 2025-01-17 11:18:35 +08:00
parent dbc00b1bcb
commit 060141cf45

View File

@ -8,12 +8,11 @@ Non-realtime drivers for CXP.
from artiq.language.core import syscall, kernel
from artiq.language.types import TBool, TInt32, TNone
from artiq.coredevice.rtio import rtio_output
from artiq.coredevice.rtio import rtio_output,rtio_input_timestamped_data
from artiq.experiment import *
import numpy as np
import math
# TODO: change this to read bytes and accept TBytearray
@syscall(flags={"nounwind", "nowrite"})
def cxp_read_words(addr: TInt32, val: TList(TInt32), with_tag: TBool) -> TInt32:
@ -70,15 +69,15 @@ _MAX_WORD_SIZE = _MAX_BYTE_SIZE // 4 # in bytes
class CoaXPress:
def __init__(self, channel, core_device="core", xml_url_len=_MAX_WORD_SIZE):
def __init__(self, channel_base, core_device="core", xml_url_len=_MAX_WORD_SIZE):
# __device_mgr is private
# self.core = dmgr.get(core_device)
# you can get the channel via `print(len(rtio_channels))` before calling
# `rtio_channels.append(rtio.Channel.from_phy(cxp_interface))`
self.channel = channel
self.channel_base = channel_base
# the first 8 bits is reserved for the rtlink.OInterface.addr not for channel no.
self.target_o = channel << 8
self.target_o = channel_base << 8
self.with_tag = False
@ -100,7 +99,33 @@ class CoaXPress:
@kernel
def trigger(self, linktrig, trigdelay):
rtio_output(self.target_o, linktrig | trigdelay << 1)
rtio_output(self.channel_base << 8, linktrig | trigdelay << 1)
@kernel
def setup_roi(self, n, x0, y0, x1, y1):
# DEBUG:
# c = int64(self.core.ref_multiplier)
c = int64(8)
rtio_output(((self.channel_base + 1) << 8) | (4*n+0), x0)
delay_mu(c)
rtio_output(((self.channel_base + 1) << 8) | (4*n+1), y0)
delay_mu(c)
rtio_output(((self.channel_base + 1) << 8) | (4*n+2), x1)
delay_mu(c)
rtio_output(((self.channel_base + 1) << 8) | (4*n+3), y1)
delay_mu(c)
# TODO: add gate
@kernel
def input_mu(self, data, tt, timeout_mu=-1):
assert len(data) == len(tt)
channel = self.channel_base + 2
for i in range(len(data)):
timestamp, roi_output = rtio_input_timestamped_data(timeout_mu, channel)
data[i] = roi_output
tt[i] = timestamp
@kernel
def init(self):
@ -227,7 +252,7 @@ _TRIG_SRC_INDEX_3 = 0x100081ac # d_479
_TRIG_ACT_INDEX_3 = 0x1000293c # d_502
_TRIG_SOFTWARE_INDEX_3 = 0x10000c34 # d_525
CXP_TRIG = False
CXP_TRIG = True
class IdleKernel(EnvExperiment):
def build(self):
@ -240,10 +265,18 @@ class IdleKernel(EnvExperiment):
xml_word_size = math.ceil(0x11ab3/4)
self.vals = [0] * xml_word_size
self.cnt = [0]
self.timestamp = [np.int64(0)]
@kernel
def camera_setup(self):
def camera_init(self):
self.cxp.init()
@kernel
def camera_trigger_setup(self):
# self.cxp.init()
# DEBUG: get xml data
# self.cxp.get_xml_data(0xc0000000, self.vals)
@ -282,16 +315,21 @@ class IdleKernel(EnvExperiment):
return self.cxp.read_u32(_BSL_POWER_MODE)
@kernel
def kernel(self):
def camera_trigger(self):
# reset mu for rtio
self.core.reset()
self.core.break_realtime()
self.cxp.setup_roi(0, 1, 1, 10, 10)
delay_mu(1000000)
if CXP_TRIG:
self.cxp.trigger(0 ,0x00)
else:
self.cxp.write_u32(_TRIG_SOFTWARE_INDEX_3, 0) # software trigger via register write
delay_mu(100)
# self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end
# self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT
# self.cxp.write_u32(_BSL_SENSOR_STAND_BY, 1)
@ -299,7 +337,12 @@ class IdleKernel(EnvExperiment):
# NOTE: This may not print when using CXP hardware TRIG
# As the write_u32 trigger a packet printout that delays the CPU enough that the frame arrive
# But using hw trigger, the print is not necessory i.e. not enough time delay for the zc706 to receive the frame
cxp_debug_frame_print()
# cxp_debug_frame_print()
self.cxp.input_mu(self.cnt, self.timestamp, 100)
for _ in range(10):
cxp_debug_frame_print()
return 0
@ -307,7 +350,11 @@ class IdleKernel(EnvExperiment):
print("[{}]".format(", ".join(hex(np.uint32(x)) for x in arr)))
def run(self):
print(f"power mode = {self.camera_setup()}")
self.kernel()
self.camera_init()
print(f"power mode = {self.camera_trigger_setup()}")
self.camera_trigger()
print(f"count = {self.cnt} | timestamp = {self.timestamp}")
# self.cxp.print_xml_url()
# self.cxp.write_xml_data(self.vals, "genicam_16e13898.zip")