forked from M-Labs/artiq-zynq
morgan
492d7888c3
driver: add api init driver: add cxp syscall driver: add xml download driver: change to using linktrigger (hw trig) driver: add docs on explaining trigger -> packet receive delay driver: add roi & gate drivers driver: save power after trig & add rio timeout
365 lines
12 KiB
Python
365 lines
12 KiB
Python
"""
|
|
Non-realtime drivers for CXP.
|
|
"""
|
|
|
|
# TODO: add api calls for CTRL packet similar i2c
|
|
# TODO: add timing critical trigger ack
|
|
|
|
|
|
from artiq.language.core import syscall, kernel
|
|
from artiq.language.types import TBool, TInt32, TNone
|
|
from artiq.coredevice.rtio import rtio_output,rtio_input_timestamped_data
|
|
from artiq.experiment import *
|
|
import numpy as np
|
|
import math
|
|
|
|
# TODO: change this to read bytes and accept TBytearray
|
|
@syscall(flags={"nounwind", "nowrite"})
|
|
def cxp_read_words(addr: TInt32, val: TList(TInt32), with_tag: TBool) -> TInt32:
|
|
raise NotImplementedError("syscall not simulated")
|
|
|
|
|
|
@syscall(flags={"nounwind", "nowrite"})
|
|
def cxp_readu32(addr: TInt32, with_tag: TBool) -> TInt32:
|
|
raise NotImplementedError("syscall not simulated")
|
|
|
|
|
|
@syscall(flags={"nounwind", "nowrite"})
|
|
def cxp_readu64(addr: TInt32, with_tag: TBool) -> TInt64:
|
|
raise NotImplementedError("syscall not simulated")
|
|
|
|
|
|
@syscall(flags={"nounwind", "nowrite"})
|
|
def cxp_writeu32(addr: TInt32, val: TInt32, with_tag: TBool) -> TNone:
|
|
raise NotImplementedError("syscall not simulated")
|
|
|
|
|
|
@syscall(flags={"nounwind", "nowrite"})
|
|
def cxp_writeu64(addr: TInt32, val: TInt64, with_tag: TBool) -> TNone:
|
|
raise NotImplementedError("syscall not simulated")
|
|
|
|
|
|
@syscall(flags={"nounwind", "nowrite"})
|
|
def cxp_setup() -> TBool:
|
|
raise NotImplementedError("syscall not simulated")
|
|
|
|
|
|
@syscall(flags={"nounwind", "nowrite"})
|
|
def cxp_debug_frame_print() -> TNone:
|
|
raise NotImplementedError("syscall not simulated")
|
|
|
|
|
|
# Bootstrap register
|
|
_XML_MANIFEST_SIZE = 0x0008
|
|
_XML_MANIFEST_SEL = 0x000C
|
|
_XML_VER = 0x0010
|
|
_XML_SCHEMA_VER = 0x0014
|
|
_XML_URL_ADDR = 0x0018
|
|
_WIDTH_ADDR = 0x3000
|
|
_HEIGHT_ADDR = 0x3004
|
|
_ACQUISITION_MODE_ADDR = 0x3008
|
|
_ACQUISITION_START_ADDR = 0x300C
|
|
_ACQUISITION_STOP_ADDR = 0x3010
|
|
_PIXEL_FORMAT_ADDR = 0x3014
|
|
_DEVICE_TAP_GEOG_ADDR = 0x3018
|
|
_IMG_1_STREAMID_ADDR = 0x301C
|
|
|
|
_MAX_BYTE_SIZE = 100 # in bytes
|
|
_MAX_WORD_SIZE = _MAX_BYTE_SIZE // 4 # in bytes
|
|
|
|
|
|
class CoaXPress:
|
|
def __init__(self, channel_base, core_device="core", xml_url_len=_MAX_WORD_SIZE):
|
|
# __device_mgr is private
|
|
# self.core = dmgr.get(core_device)
|
|
|
|
# you can get the channel via `print(len(rtio_channels))` before calling
|
|
# `rtio_channels.append(rtio.Channel.from_phy(cxp_interface))`
|
|
self.channel_base = channel_base
|
|
# the first 8 bits is reserved for the rtlink.OInterface.addr not for channel no.
|
|
self.target_o = channel_base << 8
|
|
|
|
self.with_tag = False
|
|
|
|
self.xml_addr = 0
|
|
self.width_addr = 0
|
|
self.height_addr = 0
|
|
self.acq_mode_addr = 0
|
|
self.acq_start_addr = 0
|
|
self.acq_stop_addr = 0
|
|
self.pixel_fmt_addr = 0
|
|
self.device_tap_geog_addr = 0
|
|
self.img_1_streamid_addr = 0
|
|
|
|
self.xml_url = [0] * xml_url_len
|
|
|
|
@staticmethod
|
|
def get_rtio_channels(channel, **kwargs):
|
|
return [(channel, None)]
|
|
|
|
@kernel
|
|
def trigger(self, linktrig, trigdelay):
|
|
rtio_output(self.channel_base << 8, linktrig | trigdelay << 1)
|
|
|
|
@kernel
|
|
def setup_roi(self, n, x0, y0, x1, y1):
|
|
# DEBUG:
|
|
# c = int64(self.core.ref_multiplier)
|
|
c = int64(8)
|
|
rtio_output(((self.channel_base + 1) << 8) | (4*n+0), x0)
|
|
delay_mu(c)
|
|
rtio_output(((self.channel_base + 1) << 8) | (4*n+1), y0)
|
|
delay_mu(c)
|
|
rtio_output(((self.channel_base + 1) << 8) | (4*n+2), x1)
|
|
delay_mu(c)
|
|
rtio_output(((self.channel_base + 1) << 8) | (4*n+3), y1)
|
|
delay_mu(c)
|
|
|
|
# TODO: add gate
|
|
|
|
@kernel
|
|
def input_mu(self, data, tt, timeout_mu=-1):
|
|
assert len(data) == len(tt)
|
|
channel = self.channel_base + 2
|
|
|
|
for i in range(len(data)):
|
|
timestamp, roi_output = rtio_input_timestamped_data(timeout_mu, channel)
|
|
data[i] = roi_output
|
|
tt[i] = timestamp
|
|
|
|
@kernel
|
|
def init(self):
|
|
self.with_tag = cxp_setup()
|
|
|
|
self.xml_addr = self.read_u32(_XML_URL_ADDR)
|
|
# self.width_addr = self.read_u32(_WIDTH_ADDR)
|
|
# self.height_addr = self.read_u32(_HEIGHT_ADDR)
|
|
self.acq_mode_addr = self.read_u32(_ACQUISITION_MODE_ADDR)
|
|
self.acq_start_addr = self.read_u32(_ACQUISITION_START_ADDR)
|
|
self.acq_stop_addr = self.read_u32(_ACQUISITION_STOP_ADDR)
|
|
self.pixel_fmt_addr = self.read_u32(_PIXEL_FORMAT_ADDR)
|
|
# self.device_tap_geog_addr = self.read_u32(_DEVICE_TAP_GEOG_ADDR)
|
|
self.img_1_streamid_addr = self.read_u32(_IMG_1_STREAMID_ADDR)
|
|
|
|
self.read_words(self.xml_addr, self.xml_url)
|
|
|
|
@kernel
|
|
def read_u32(self, addr: TInt32) -> TInt32:
|
|
return cxp_readu32(addr, self.with_tag)
|
|
|
|
@kernel
|
|
def read_u64(self, addr: TInt32) -> TInt64:
|
|
return cxp_readu64(addr, self.with_tag)
|
|
|
|
@kernel
|
|
def read_words(self, addr: TInt32, val: TList(TInt32)):
|
|
cxp_read_words(addr, val, self.with_tag)
|
|
|
|
@kernel
|
|
def write_u32(self, addr: TInt32, val: TInt32):
|
|
cxp_writeu32(addr, val, self.with_tag)
|
|
|
|
@kernel
|
|
def write_u64(self, addr: TInt32, val: TInt64):
|
|
cxp_writeu64(addr, val, self.with_tag)
|
|
|
|
@kernel
|
|
def write_wide(self, addr: TInt32, vals: TList(TInt32)):
|
|
for i in range(len(vals)):
|
|
cxp_writeu32(addr + i * 4, vals[i], self.with_tag)
|
|
|
|
@kernel
|
|
def read_width(self) -> TInt32:
|
|
return self.read_u32(self.width_addr)
|
|
|
|
@kernel
|
|
def read_height(self) -> TInt32:
|
|
return self.read_u32(self.height_addr)
|
|
|
|
@kernel
|
|
def read_acq_mode(self) -> TInt64:
|
|
return self.read_u64(self.acq_mode_addr)
|
|
|
|
@kernel
|
|
def write_acq_mode(self, val: TInt64):
|
|
self.write_u64(self.acq_mode_addr, val)
|
|
|
|
@kernel
|
|
def start(self):
|
|
self.write_u32(self.acq_start_addr, 0x00000001)
|
|
|
|
@kernel
|
|
def stop(self):
|
|
self.write_u32(self.acq_stop_addr, 0x00000001)
|
|
|
|
@kernel
|
|
def get_frameid(self) -> TInt32:
|
|
return self.read_u32(self.img_1_streamid_addr)
|
|
|
|
@kernel
|
|
def get_pixel_format(self) -> TInt32:
|
|
return self.read_u32(self.pixel_fmt_addr)
|
|
|
|
@host_only
|
|
def print_xml_url(self):
|
|
url = ""
|
|
for x in self.xml_url:
|
|
url += x.to_bytes(4, byteorder="big").decode("ascii")
|
|
print(f"url = {url}")
|
|
if "Local:" in url:
|
|
file_name, start_addr, size = url.split(";", 2)
|
|
print(
|
|
f"file name: {file_name.replace("Local:", "")}, start addr: 0x{start_addr}, size; 0x{size.split("?", 1)[0]} bytes"
|
|
)
|
|
|
|
@kernel
|
|
def get_xml_data(self, xml_start_addr, xml_data):
|
|
i = -1
|
|
addr_offset = 0
|
|
for i in range(len(xml_data) // _MAX_WORD_SIZE):
|
|
buf = [0] * _MAX_WORD_SIZE
|
|
self.read_words(xml_start_addr + addr_offset, buf)
|
|
for j in range(len(buf)):
|
|
xml_data[j+i*_MAX_WORD_SIZE] = buf[j]
|
|
|
|
addr_offset += _MAX_WORD_SIZE * 4
|
|
|
|
buf = [0]*(len(xml_data) % _MAX_WORD_SIZE)
|
|
self.read_words(xml_start_addr + addr_offset, buf)
|
|
for j in range(len(buf)):
|
|
xml_data[j+(i+1)*_MAX_WORD_SIZE] = buf[j]
|
|
|
|
@host_only
|
|
def write_xml_data(self, xml_data, file_path):
|
|
byte_arr = bytearray()
|
|
for d in xml_data:
|
|
byte_arr += d.to_bytes(4, "big", signed=True)
|
|
with open(file_path, "wb") as binary_file:
|
|
binary_file.write(byte_arr)
|
|
|
|
|
|
# From the camera XML files
|
|
_USER_SET_SELECTOR = 0x10000050
|
|
_REAL_ACQ_MODE = 0x10000bb4
|
|
_REAL_ACQ_START = 0x10000498
|
|
_REAL_ACQ_STOP = 0x100004a4
|
|
_REAL_ACQ_ABORT = 0x100004b0 #stop all acq immediately
|
|
_BSL_SENSOR_STAND_BY = 0x100004c8 # put sensor in standby mode, certain parameter can only be change during standby
|
|
_BSL_POWER_MODE = 0x100000b4
|
|
# strange d_470 -> d_469 (= 3) perhaps a obscure security trick? So I guess looking @ Index"3" is fine?
|
|
_TRIG_MODE_INDEX_3 = 0x10001424 # d_87
|
|
_TRIG_SRC_INDEX_3 = 0x100081ac # d_479
|
|
_TRIG_ACT_INDEX_3 = 0x1000293c # d_502
|
|
_TRIG_SOFTWARE_INDEX_3 = 0x10000c34 # d_525
|
|
|
|
CXP_TRIG = True
|
|
|
|
class IdleKernel(EnvExperiment):
|
|
def build(self):
|
|
self.setattr_device("core")
|
|
self.setattr_device("led0")
|
|
|
|
# declare the class before using it in kernel
|
|
self.cxp = CoaXPress(0x0)
|
|
# self.vals = [0]*0x11ab3
|
|
|
|
xml_word_size = math.ceil(0x11ab3/4)
|
|
self.vals = [0] * xml_word_size
|
|
|
|
self.cnt = [0]
|
|
self.timestamp = [np.int64(0)]
|
|
|
|
@kernel
|
|
def camera_init(self):
|
|
self.cxp.init()
|
|
|
|
@kernel
|
|
def camera_trigger_setup(self):
|
|
# self.cxp.init()
|
|
|
|
# DEBUG: get xml data
|
|
# self.cxp.get_xml_data(0xc0000000, self.vals)
|
|
|
|
|
|
# DEBUG: Try to setup trigger
|
|
# All address below is from the XML, what's the point of bootstrap anyway?
|
|
# NOTE: setting is persistent over ConnectionReset but NOT power cycle
|
|
# self.cxp.write_u32(_REAL_ACQ_MODE, 1) # single frame mode
|
|
# self.cxp.write_u32(_REAL_ACQ_START, 1) # single acq start
|
|
# self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end
|
|
# self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT
|
|
|
|
# boA2448-250cm is area scan camera:
|
|
# see https://docs.baslerweb.com/triggered-image-acquisition#hardware-and-software-triggering to setup triggering properly
|
|
|
|
# TRIGGER: setup
|
|
# self.cxp.write_u32(_TRIG_SELECTOR, 3) # FrameStart by default, boA xml b_469 don't have an address for some reason
|
|
self.cxp.write_u32(_TRIG_MODE_INDEX_3, 1) # ON
|
|
if CXP_TRIG:
|
|
self.cxp.write_u32(_TRIG_SRC_INDEX_3, 7) # CXPTrigger0
|
|
else:
|
|
self.cxp.write_u32(_TRIG_SRC_INDEX_3, 0) # use software trigger
|
|
self.cxp.write_u32(_TRIG_ACT_INDEX_3, 2) # trig on anyedge
|
|
|
|
cxp_debug_frame_print()
|
|
|
|
# TAKING PICTURE
|
|
self.cxp.write_u32(_REAL_ACQ_MODE, 1) # single frame mode
|
|
self.cxp.write_u32(_REAL_ACQ_START, 1) # single acq start
|
|
|
|
# STOP acq
|
|
# self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end
|
|
# self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT
|
|
|
|
# self.cxp.write_u32(_BSL_SENSOR_STAND_BY, 1)
|
|
return self.cxp.read_u32(_BSL_POWER_MODE)
|
|
|
|
@kernel
|
|
def camera_trigger(self):
|
|
# reset mu for rtio
|
|
self.core.reset()
|
|
self.core.break_realtime()
|
|
|
|
self.cxp.setup_roi(0, 1, 1, 10, 10)
|
|
delay_mu(1000000)
|
|
|
|
if CXP_TRIG:
|
|
self.cxp.trigger(0 ,0x00)
|
|
else:
|
|
self.cxp.write_u32(_TRIG_SOFTWARE_INDEX_3, 0) # software trigger via register write
|
|
|
|
delay_mu(100)
|
|
|
|
# self.cxp.write_u32(_REAL_ACQ_STOP, 1) # single acq end
|
|
# self.cxp.write_u32(_REAL_ACQ_ABORT, 1) # single acq ABORT
|
|
# self.cxp.write_u32(_BSL_SENSOR_STAND_BY, 1)
|
|
|
|
# NOTE: This may not print when using CXP hardware TRIG
|
|
# As the write_u32 trigger a packet printout that delays the CPU enough that the frame arrive
|
|
# But using hw trigger, the print is not necessory i.e. not enough time delay for the zc706 to receive the frame
|
|
# cxp_debug_frame_print()
|
|
|
|
self.cxp.input_mu(self.cnt, self.timestamp, 10000)
|
|
|
|
for _ in range(4):
|
|
cxp_debug_frame_print()
|
|
|
|
# reduce power draw & temperature
|
|
# overtemperature can cause unstable connection
|
|
self.cxp.write_u32(_BSL_SENSOR_STAND_BY, 1)
|
|
|
|
return self.cxp.read_u32(_BSL_POWER_MODE)
|
|
|
|
def print_hex(self, arr: TList(TInt32)):
|
|
print("[{}]".format(", ".join(hex(np.uint32(x)) for x in arr)))
|
|
|
|
def run(self):
|
|
self.camera_init()
|
|
print(f"power mode before trigger = {self.camera_trigger_setup()}")
|
|
print(f"power mode after trigger = {self.camera_trigger()}")
|
|
print(f"count = {np.uint32(self.cnt)} | timestamp = {self.timestamp}")
|
|
|
|
|
|
# self.cxp.print_xml_url()
|
|
# self.cxp.write_xml_data(self.vals, "genicam_16e13898.zip")
|