Files
asi_usb_driver/driver.py

645 lines
19 KiB
Python

import array
from time import sleep, time_ns
from regmap import REGISTER_MAP
# To view a raw 8-bit 1920x1080 frame, convert it with ImageMagick, e.g.:
# magick -size 1920x1080 -depth 8 GRAY:image_001.raw image_001.jpg

# Request codes passed as ``bRequest`` to send_cmd() (not every code is used
# in this file).
GET_FW_VERS = 0xAD
IS_USB3 = 0xB4
GET_TEMP = 0xB3
READ_FPGA_REG = 0xBC
WRITE_FPGA_REG = 0xBD
READ_CAM_REG = 0xA7
WRITE_CAM_REG = 0xA6
WRITE_CAM_REG_BYTE = 0xB6
READ_CAM_REG_BYTE = 0xB7
TOGGLE_GPI = 0xBE
READ_SPI_FLASH = 0xC3
WRITE_SPI_FLASH = 0xC2
ERASE_SPI_FLASH = 0xC4
GET_SERIAL_NO = 0xC8
RESET_OR_START = 0xAF
RESET_OR_END = 0xAA
START_STREAM = 0xA9
# Values for the ``direction`` argument of send_cmd(): truthy selects
# bmRequestType 0xC0, falsy selects 0x40.
DIR_READ = True
DIR_WRITE = False
# ``bit_depth`` values; the driver computes bytes per pixel as 1 + bit_depth.
BITDEPTH8 = 0
BITDEPTH16 = 1
# Constant number of lines added to (height * binning) when computing the
# frame line count.
BLANK_LINE_OFFSET = 0x3C
# Initial value of ``ASI662MM.pkg`` (the value written to the FPGA HMAX).
REG_FRAME_LENGTH_PKG_MIN = 0xE6
# Written to the FPGA VBLK register and added to the sensor window height.
FPGA_SKIP_LINE = 0x2
def send_cmd(dev, bRequest, wValue, wIndex, direction, data_or_wLength, timeout=500):
    """Issue one control transfer on ``dev`` and return whatever it returns.

    Args:
        dev: Object exposing ``ctrl_transfer`` (presumably a pyusb
            ``usb.core.Device`` -- confirm against the caller).
        bRequest: Request code.
        wValue: Value field of the setup packet.
        wIndex: Index field of the setup packet.
        direction: Truthy selects bmRequestType 0xC0, falsy selects 0x40.
        data_or_wLength: Payload to send, or number of bytes to read.
        timeout: Passed through to ``ctrl_transfer`` unchanged.
    """
    if direction:
        request_type = 0xC0
    else:
        request_type = 0x40
    return dev.ctrl_transfer(
        request_type, bRequest, wValue, wIndex, data_or_wLength, timeout
    )
class CameraFPGA:
    """Byte-register access to the camera FPGA over USB control transfers.

    Every public setter ends up in :meth:`read` / :meth:`write`, which move a
    single byte to or from one FPGA register address.  ``status_0`` and
    ``status_1`` cache the last value this object read from / wrote to
    registers 0 and 10 respectively.
    """

    def __init__(self, dev):
        """Store ``dev`` and initialise the cached state; performs no I/O."""
        self.dev = dev
        self.model = ""
        self.version = [0, 0, 0, 0]
        self.status_0 = 0
        self.status_1 = 0
        self.ddr = True

    def read(self, addr):
        """Return the single byte read from FPGA register ``addr``."""
        return send_cmd(self.dev, READ_FPGA_REG, addr, 0, DIR_READ, 1)[0]

    def write(self, addr, data):
        """Write ``data`` to FPGA register ``addr`` (value travels in wIndex)."""
        send_cmd(self.dev, WRITE_FPGA_REG, addr, data, DIR_WRITE, [])

    def _write_latched(self, writes):
        """Perform ``writes`` (addr, value pairs) bracketed by reg 1 = 1 / 0.

        Presumably register 1 holds/latches the update until it is cleared --
        TODO confirm against the FPGA documentation.
        """
        self.write(1, 1)
        for addr, value in writes:
            self.write(addr, value)
        self.write(1, 0)

    def _write_le(self, base_addr, value, num_bytes):
        """Write ``value`` little-endian into ``num_bytes`` consecutive registers."""
        self._write_latched(
            (base_addr + i, (value >> (8 * i)) & 0xFF) for i in range(num_bytes)
        )

    def _update_bits(self, addr, mask, enable):
        """Read register ``addr``, set or clear ``mask``, write back, return it."""
        value = self.read(addr)
        if enable:
            value |= mask
        else:
            value &= ~mask & 0xFF
        self.write(addr, value)
        return value

    def get_version(self):
        """Read model and version registers; return ``(model, version)``.

        Register 0x1C selects the layout: below 3 the model is one byte at
        0x1D and the version bytes are at 0x1E/0x1F; otherwise the model is
        16 bits at 0x1D/0x1E and the version bytes are at 0x20/0x21.  Each
        version byte is split into its high and low nibble.
        """
        if self.read(0x1C) < 3:
            self.model = self.read(0x1D)
            version_addr = 0x1E
        else:
            low = self.read(0x1D)
            high = self.read(0x1E)
            self.model = (high << 8) | low
            # Register 0x1F is read and its value discarded, as before.
            self.read(0x1F)
            version_addr = 0x20
        ver1 = self.read(version_addr)
        ver2 = self.read(version_addr + 1)
        # Update in place so existing references to the list stay valid.
        self.version[:] = [ver1 >> 4, ver1 & 0xF, ver2 >> 4, ver2 & 0xF]
        return (self.model, self.version)

    def start(self):
        """Clear bit 0x10 of register 0."""
        self.status_0 = self._update_bits(0, 0x10, False)

    def stop(self):
        """Set bit 0x10 of register 0."""
        self.status_0 = self._update_bits(0, 0x10, True)

    def reset(self):
        """Write the cached ``status_0`` with bit 0 cleared to register 0.

        NOTE(review): bit 0 is then set again only in the cached value and is
        never written back here, and the cache is not refreshed from the
        device first.  Behaviour is kept exactly as it was -- confirm whether
        a second write is intended.
        """
        self.status_0 &= 0xFE
        self.write(0, self.status_0)
        self.status_0 |= 1

    def enable_ddr(self, enable):
        """Clear (enable) or set (disable) bit 0x40 of register 10."""
        self.status_1 = self._update_bits(10, 0x40, not enable)
        self.ddr = enable

    def addr_test(self):
        """Poll register 0x23 up to five times; return True once it is non-zero.

        After each zero reading, 0x80 is written to register 0x18 and the
        call sleeps 20 ms before retrying.
        """
        for _ in range(5):
            if self.read(0x23) != 0:
                return True
            self.write(0x18, 0x80)
            sleep(0.02)
        return False

    def set_as_master(self, enable):
        """Set or clear bit 0x20 of register 0."""
        self.status_0 = self._update_bits(0, 0x20, enable)

    def set_adc_width_output_width(self, adc_width, output_width):
        """Update bits 0x01 (``adc_width == 1``) and 0x10 (``output_width == 1``)
        of register 10 with a single read and a single write."""
        status = self.read(10)
        status = (status | 0x01) if adc_width == 1 else (status & 0xFE)
        status = (status | 0x10) if output_width == 1 else (status & 0xEF)
        self.status_1 = status
        self.write(10, status)

    def set_gain(self, gains):
        """Write four gain bytes to registers 0x0C..0x0F.

        Raises:
            ValueError: if ``gains`` does not contain exactly four values.
        """
        if len(gains) != 4:
            raise ValueError(f"expected 4 gain values, got {len(gains)}")
        self._write_latched((0xC + i, g) for i, g in enumerate(gains))

    def set_vmax(self, vmax):
        """Write ``vmax`` (clamped to 24 bits) to registers 0x10..0x12."""
        self._write_le(0x10, min(0xFFFFFF, vmax), 3)

    def set_hmax(self, hmax):
        """Write ``hmax`` (clamped to 16 bits) to registers 0x13..0x14."""
        self._write_le(0x13, min(0xFFFF, hmax), 2)

    def set_height(self, height):
        """Write the low 16 bits of ``height`` to registers 8..9."""
        self._write_le(8, height, 2)

    def set_width(self, width):
        """Write the low 16 bits of ``width`` to registers 4..5."""
        self._write_le(4, width, 2)

    def set_bandwidth(self, fps_perc):
        """Write ``25600 / fps_perc - 256`` (clamped to 1..0xFFFE) to 0x24..0x25.

        Raises:
            ValueError: if ``fps_perc`` is not positive (previously zero
                surfaced as a ZeroDivisionError).
        """
        if fps_perc <= 0:
            raise ValueError(f"fps_perc must be positive, got {fps_perc}")
        bandwidth = max(1, min(int((25600 / fps_perc) - 256), 0xFFFE))
        self._write_le(0x24, bandwidth, 2)

    def set_bin_data_len(self, length):
        """Write ``length`` as 32 bits to registers 0x40..0x43.

        The value 0xFFFFFFFF is replaced by 0xFFFF00FE before writing.
        """
        if length == 0xFFFFFFFF:
            length = 0xFFFF00FE
        self._write_le(0x40, length, 4)

    def set_hblk(self, hblk):
        """Write the low 16 bits of ``hblk`` to registers 2..3."""
        self._write_le(2, hblk, 2)

    def set_vblk(self, vblk):
        """Write the low 16 bits of ``vblk`` to registers 6..7."""
        self._write_le(6, vblk, 2)

    def set_wait_mode(self, enable):
        """Set or clear bit 0x40 of register 0."""
        self.status_0 = self._update_bits(0, 0x40, enable)

    def set_trigger_mode(self, enable):
        """Set or clear bit 0x80 of register 0."""
        self.status_0 = self._update_bits(0, 0x80, enable)

    def set_trigger_signal(self, enable):
        """Set or clear bit 0x01 of register 0x0B."""
        self._update_bits(0xB, 0x01, enable)

    def set_xhs_stop(self, enable):
        """Set or clear bit 0x10 of register 0x0B."""
        self._update_bits(0xB, 0x10, enable)

    def set_low_power(self, enable):
        """Set or clear bit 0x01 of register 0x19."""
        self._update_bits(0x19, 0x01, enable)
class IMX662:
    """Byte-register access to the image sensor over USB control transfers.

    Every setter brackets its register writes with :meth:`stop` and
    :meth:`start`, which write 1 and 0 to register 0x3001.  (On Sony IMX
    sensors 0x3001 is presumably the register-hold bit -- TODO confirm
    against the datasheet.)
    """

    def __init__(self, dev):
        """Store ``dev``; performs no I/O."""
        self.dev = dev

    def write(self, addr, data):
        """Write ``data`` to sensor register ``addr`` (value travels in wIndex)."""
        send_cmd(self.dev, WRITE_CAM_REG_BYTE, addr, data, DIR_WRITE, [])

    def read(self, addr):
        """Return the single byte read from sensor register ``addr``."""
        return send_cmd(self.dev, READ_CAM_REG_BYTE, addr, 0, DIR_READ, 1)[0]

    def _write_held(self, writes):
        """Perform ``writes`` (addr, value pairs) between stop() and start()."""
        self.stop()
        for addr, value in writes:
            self.write(addr, value)
        self.start()

    @staticmethod
    def _round_up_16(value):
        """Return ``value`` rounded up to the next multiple of 16."""
        remainder = value % 0x10
        if remainder != 0:
            return (value + 0x10) - remainder
        return value

    def write_register_map(self):
        """Write every (addr, data) pair of ``REGISTER_MAP`` in order."""
        for addr, data in REGISTER_MAP:
            self.write(addr, data)

    def setup_registers(self):
        """Load ``REGISTER_MAP`` followed by four fixed register values."""
        self.stop()
        self.write_register_map()
        for addr, value in ((0x3018, 0x14), (0x301B, 0), (0x3022, 1), (0x3023, 1)):
            self.write(addr, value)
        self.start()

    def start(self):
        """Write 0 to register 0x3001."""
        self.write(0x3001, 0)

    def stop(self):
        """Write 1 to register 0x3001."""
        self.write(0x3001, 1)

    def set_brightness(self, brightness):
        """Write ``brightness`` as 16 bits to registers 0x30DC..0x30DD."""
        self._write_held(
            [
                (0x30DC, brightness & 0xFF),
                # High byte is now masked like every other 16-bit setter.
                (0x30DD, (brightness >> 8) & 0xFF),
            ]
        )

    def set_ssh1(self, ssh1):
        """Write ``ssh1`` as 24 bits to registers 0x3050..0x3052."""
        self._write_held(
            [
                (0x3050, ssh1 & 0xFF),
                (0x3051, (ssh1 >> 8) & 0xFF),
                (0x3052, (ssh1 >> 16) & 0xFF),
            ]
        )

    def set_hmax(self, hmax):
        """Write ``hmax`` as 16 bits to registers 0x302C..0x302D."""
        self._write_held([(0x302C, hmax & 0xFF), (0x302D, (hmax >> 8) & 0xFF)])

    def set_gain(self, mode, gain):
        """Write ``mode`` to 0x3030, ``gain`` to 0x3070 and 0 to 0x3071."""
        self._write_held([(0x3030, mode), (0x3070, gain), (0x3071, 0)])

    def set_height_width(self, height, width):
        """Write the window size to 0x303E..0x303F (width) and 0x3046..0x3047.

        Both dimensions are rounded up to a multiple of 16; the height then
        gets ``FPGA_SKIP_LINE`` extra lines.
        """
        width_rounded = self._round_up_16(width)
        # this is not multiple of 4, but seems to work
        height_rounded = self._round_up_16(height) + FPGA_SKIP_LINE
        self._write_held(
            [
                (0x303E, width_rounded & 0xFF),
                (0x303F, (width_rounded >> 8) & 0xFF),
                (0x3046, height_rounded & 0xFF),
                (0x3047, (height_rounded >> 8) & 0xFF),
            ]
        )

    def set_start_pos(self, start_x, start_y):
        """Write the window origin to 0x303C..0x303D (x) and 0x3044..0x3045 (y)."""
        self._write_held(
            [
                (0x303C, start_x & 0xFF),
                (0x303D, (start_x >> 8) & 0xFF),
                # Fix: the low byte of start_y used to be written unmasked, so
                # any start_y above 255 sent a 16-bit value to a byte register.
                (0x3044, start_y & 0xFF),
                (0x3045, (start_y >> 8) & 0xFF),
            ]
        )
class ASI662MM:
    """High-level camera driver combining :class:`CameraFPGA` and :class:`IMX662`.

    ``exposure`` and ``frame_time`` are in microseconds (``get_frame`` divides
    the exposure by 1e6 to obtain seconds).  Binning other than 1 and the
    hot-pixel-correction hooks are unfinished; see the TODO notes.
    """

    def __init__(self, dev):
        """Create the FPGA/sensor helpers and set default state; no I/O."""
        self.dev = dev
        self.fpga = CameraFPGA(dev)
        self.sensor = IMX662(dev)
        self.bit_depth = BITDEPTH8
        # can be 1, 2 or 4
        self.binning = 1
        self.high_speed = 0
        self.brightness = 50
        # cmos clock
        self.fclk = 20000
        self.max_width = 1920
        self.max_height = 1080
        self.width = self.max_width
        self.height = self.max_height
        self.start_x = 0
        self.start_y = 0
        self.exposure = 100000
        self.fps_perc = 40
        # also the HMAX
        self.pkg = REG_FRAME_LENGTH_PKG_MIN
        self.log2_width = 4
        self.gain = 0
        self.long_exposure = False
        # Set by start_capture(); initialised here so the attribute always exists.
        self.num_transfers = 0
        self.calc_frame_time()

    def send_cmd(self, *args, **kwargs):
        """Forward to the module-level ``send_cmd`` with this camera's device."""
        return send_cmd(self.dev, *args, **kwargs)

    def get_serial_no(self):
        """Return the 8 bytes of the GET_SERIAL_NO reply as a list."""
        return list(self.send_cmd(GET_SERIAL_NO, 0, 0, DIR_READ, 8))

    def get_firmware_vers(self):
        """Return the firmware version byte.

        If the reply does not start with ``"V"`` the result is 0x10.
        Otherwise it is the second byte, shifted left by four bits when it is
        0x0F or smaller.
        """
        ver = self.send_cmd(GET_FW_VERS, 0, 0, DIR_READ, 2)
        if chr(ver[0]) != "V":
            return 0x10
        version = ver[1]
        if version <= 0xF:
            version <<= 4
        return version

    def is_usb3_host(self):
        """Return True when the first byte of the IS_USB3 reply is non-zero."""
        return bool(self.send_cmd(IS_USB3, 0, 0, DIR_READ, 1)[0])

    def set_gpif32dq(self, enable):
        """Send TOGGLE_GPI with wValue ``int(enable)``."""
        self.send_cmd(TOGGLE_GPI, int(enable), 0, DIR_WRITE, [])

    def get_temperature(self):
        """Return the temperature reading as a float.

        The two reply bytes form a 12-bit two's-complement value scaled by
        0.0625 (presumably degrees Celsius -- TODO confirm).
        """
        raw = self.send_cmd(GET_TEMP, 0, 0, DIR_READ, 2)
        val = (raw[0] >> 4) + (raw[1] << 4)
        if val >= 0x800:
            return (0x1000 - val) * -0.0625
        return val * 0.0625

    def read_spi_from_flash(self, addr, length):
        """Read ``length`` bytes from SPI flash; ``addr >> 8`` is sent as wIndex."""
        return self.send_cmd(READ_SPI_FLASH, 0, addr >> 8, DIR_READ, length)

    def write_to_spi_flash(self, addr, data):
        """Write ``data`` to SPI flash; ``addr >> 8`` is sent as wIndex."""
        self.send_cmd(WRITE_SPI_FLASH, 0, addr >> 8, DIR_WRITE, data)

    def erase_sector_spi_flash(self, sector):
        """Erase ``sector`` and poll until the status read returns no bytes.

        NOTE(review): the poll has no delay and no upper bound, as before.
        """
        # Payload is sent as bytes rather than a text string.
        self.send_cmd(ERASE_SPI_FLASH, 1, sector, DIR_WRITE, b"\x01")
        while True:
            ret = self.send_cmd(ERASE_SPI_FLASH, 0, sector, DIR_READ, 1)
            if len(ret) == 0:
                break

    def read_spi(self, addr, length):
        """Return ``length`` bytes at offset ``addr`` of the 0x100-byte flash
        page at 0x70000.

        GPIF is switched off for the transfer and is re-enabled even if the
        transfer raises.
        """
        self.set_gpif32dq(False)
        try:
            page = self.read_spi_from_flash(0x70000, 0x100)
        finally:
            self.set_gpif32dq(True)
        return page[addr : addr + length]

    def write_spi(self, addr, data):
        """Patch ``data`` into the flash page at 0x70000 at offset ``addr``.

        ``data`` may be any iterable of byte values; it is converted to an
        ``array('B')`` because array slices only accept arrays of the same
        type.  GPIF is re-enabled even if a transfer raises.
        """
        self.set_gpif32dq(False)
        try:
            flash = self.read_spi_from_flash(0x70000, 0x100)
            flash[addr : addr + len(data)] = array.array("B", data)
            self.write_to_spi_flash(0x70000, flash)
        finally:
            self.set_gpif32dq(True)

    # TODO: needed by hpc
    def read_from_flah_uncompress(self):
        """Placeholder; not implemented."""

    # binning, bit_depth and high_speed modes
    def camera_mode_check(self):
        """Map FPGA register 0x1C to a 3-tuple of mode flags.

        Values in [0xA0, 0xC0) give (1, 1, 0), values below 0xA0 give
        (0, 0, 0) and values of 0xC0 or more give (1, 1, 1).
        """
        val = self.fpga.read(0x1C)
        if 0xC0 > val >= 0xA0:
            return (1, 1, 0)
        if val < 0xC0:
            return (0, 0, 0)
        return (1, 1, 1)

    # TODO: hot pixel correlation
    def set_hpc_state(self, enable):
        """Placeholder; not implemented."""

    def start_cmd(self):
        """Send RESET_OR_START with a 200 ms timeout."""
        self.send_cmd(RESET_OR_START, 0, 0, DIR_WRITE, [], timeout=200)

    def stop_sensor_streaming(self):
        """Stop the FPGA, then write 1 to sensor register 0x3000."""
        self.fpga.stop()
        self.sensor.write(0x3000, 1)

    def start_sensor_streaming(self):
        """Write 0 to sensor registers 0x3004 and 0x3000, wait 30 ms, start
        the FPGA."""
        self.sensor.write(0x3004, 0)
        self.sensor.write(0x3000, 0)
        sleep(0.03)
        self.fpga.start()

    # TODO: additional details based on usb fps percentage
    def calc_frame_time(self):
        """Recompute ``frame_time`` from height, binning, ``pkg`` and ``fclk``."""
        num_lines = BLANK_LINE_OFFSET + (self.height * self.binning)
        line_time = (self.pkg * 1000) / self.fclk
        self.frame_time = num_lines * line_time

    def init_sensor_mode(self, binning, bit_depth, high_speed):
        """Record the mode and program the ADC width on FPGA and sensor.

        NOTE(review): the source this was recovered from had lost its
        indentation.  Only the 0x301B write is treated as conditional on
        ``binning``; both readings behave the same for binning 1, the only
        supported mode -- confirm.
        """
        self.binning = binning
        self.bit_depth = bit_depth
        self.high_speed = high_speed
        adc_width = 0
        self.sensor.stop()
        if binning not in [2, 4]:
            self.sensor.write(0x301B, 0)
        if high_speed == 0 or bit_depth == BITDEPTH16:
            adc_width = 1
        self.fpga.set_adc_width_output_width(adc_width, bit_depth)
        self.sensor.write(0x3022, adc_width)
        self.sensor.write(0x3023, 0x01)
        self.sensor.write(0x3C38, 0x21)
        self.sensor.start()

    def get_image_bytes(self):
        """Return the number of bytes in one frame for the current settings."""
        binning = self.binning
        if binning in [2, 4]:
            binning >>= 1
        return self.width * self.height * (1 + self.bit_depth) * (binning**2)

    def set_brightness(self, brightness):
        """Store ``brightness`` and forward it to the sensor.

        Raises:
            ValueError: if ``brightness`` is outside 0..300.
        """
        if not 0 <= brightness <= 300:
            raise ValueError(f"brightness must be in 0..300, got {brightness}")
        self.brightness = brightness
        self.sensor.set_brightness(brightness)

    def calc_max_fps(self):
        """Placeholder; not implemented."""

    # TODO: complete long exposure mode, assumes no hardware bin atm
    # exposure default value is 100000
    def set_exposure(self, exposure):
        """Set the exposure (microseconds) and program VMAX / SSH1.

        Exposures of one second or more switch the FPGA into wait + trigger
        mode (``long_exposure``); shorter ones switch it back.

        Raises:
            ValueError: if ``exposure`` is outside 32..2000000000.
        """
        if not 32 <= exposure <= 2000000000:
            raise ValueError(f"exposure must be in 32..2000000000, got {exposure}")
        self.exposure = exposure
        if exposure >= 1000000:
            if not self.long_exposure:
                self.fpga.set_wait_mode(True)
                self.fpga.set_trigger_mode(True)
                self.long_exposure = True
        elif self.long_exposure:
            self.fpga.set_trigger_mode(False)
            self.fpga.set_wait_mode(False)
            self.long_exposure = False
        self.calc_max_fps()
        self.calc_frame_time()
        line_time = (self.pkg * 1000) / self.fclk
        if self.long_exposure:
            # In long-exposure mode the registers are programmed for a fixed
            # value just above one frame time instead of the requested one.
            exposure = int(self.frame_time + 10000)
        if exposure > self.frame_time:
            # increase vertical span, minimal storage time adjustment
            const = 8 if self.binning not in [2, 4] else 0x224
            vmax = min(int(exposure / line_time) + const, 0xFFFFFF)
            ssh1 = 8
        else:
            # increase storage time adjustment, keep vertical span to a frame
            num_lines = BLANK_LINE_OFFSET + (self.height * self.binning)
            if self.binning in [2, 4]:
                num_lines *= 2
            integration_lines = int(exposure / line_time)
            ssh1 = max(8, min(num_lines - integration_lines, num_lines - 8))
            vmax = min(0xFFFFFF, num_lines)
        ssh1 = min(0x1FFFF, ssh1)
        self.fpga.set_vmax(vmax)
        self.sensor.set_ssh1(ssh1)

    def set_gain(self, gain, auto_gain=False):
        """Store ``gain`` and program the sensor gain mode and value.

        Gains above 251 use mode 1 with value ``(gain - 150) // 3``; others
        use mode 0 with value ``gain // 3``.  ``auto_gain`` is unused.

        Raises:
            ValueError: if ``gain`` is outside 0..600.
        """
        if not 0 <= gain <= 600:
            raise ValueError(f"gain must be in 0..600, got {gain}")
        self.gain = gain
        if gain > 251:
            mode, value = 1, (gain - 150) // 3
        else:
            mode, value = 0, gain // 3
        self.sensor.set_gain(mode, value)

    # TODO: finish
    def set_fps_perc(self, fps_perc, auto_fps=False):
        """Store ``fps_perc`` and program HMAX and the FPGA bandwidth.

        ``auto_fps`` is unused.

        Raises:
            ValueError: if ``fps_perc`` is outside 40..100.
        """
        if not 40 <= fps_perc <= 100:
            raise ValueError(f"fps_perc must be in 40..100, got {fps_perc}")
        self.fps_perc = fps_perc
        # TODO: DDR / non-DDR specific handling (self.fpga.ddr) is not
        # implemented yet; both cases currently do nothing extra.
        hmax = int(self.pkg * 1.85625)
        # according to datasheet, this is only relevant with sensor in master mode
        self.sensor.set_hmax(hmax)
        self.fpga.set_hmax(self.pkg)
        self.fpga.set_bandwidth(fps_perc)
        self.calc_frame_time()
        self.calc_max_fps()

    # TODO: dark buffer, hpc table support, ensure multiple of 2 hst, multiple of 4 vst, assert in height width
    def set_start_pos(self, start_x, start_y):
        """Store the window origin and program FPGA blanking and sensor origin."""
        self.start_x = start_x
        self.start_y = start_y
        self.fpga.set_hblk(0)
        self.fpga.set_vblk(FPGA_SKIP_LINE)
        self.sensor.set_start_pos(self.start_x, self.start_y)

    def set_bit_depth(self, bit_depth):
        """Store ``bit_depth`` and update the FPGA ADC/output width bits."""
        self.bit_depth = bit_depth
        adc_width = 1 if self.high_speed == 0 or bit_depth == BITDEPTH16 else 0
        self.fpga.set_adc_width_output_width(adc_width, bit_depth)

    # TODO: support binning
    def set_roi(self, width, height, start_x, start_y):
        """Program the region of interest on both the FPGA and the sensor."""
        self.set_start_pos(start_x, start_y)
        self.width = width * self.binning
        self.height = height * self.binning
        data_len = width * height * (1 + self.bit_depth) * (self.binning**2) + 3
        # The FPGA receives the length divided by four; the +3 above rounds up.
        self.fpga.set_bin_data_len(data_len >> 2)
        self.sensor.set_height_width(height, width)
        self.fpga.set_height(height)
        self.fpga.set_width(width)

    def init_camera(self):
        """Run the power-up sequence; return False if the FPGA address test fails."""
        self.fpga.get_version()
        self.sensor.setup_registers()
        self.fpga.reset()
        sleep(0.02)
        self.start_cmd()
        if not self.fpga.addr_test():
            return False
        self.fpga.set_as_master(True)
        self.fpga.stop()
        self.fpga.enable_ddr(True)
        self.fpga.set_adc_width_output_width(1, 0)
        self.fpga.set_gain([0x80 for _ in range(4)])
        self.set_brightness(0)
        # TODO: if auto_fps, perform adjustment
        self.fclk = 20000
        self.init_sensor_mode(1, 0, 0)
        self.set_fps_perc(40)
        self.set_gain(0)
        self.set_exposure(100000)
        self.stop_sensor_streaming()
        return True

    def take_picture(self):
        """Capture and return one frame; the capture is always stopped after."""
        self.start_capture()
        try:
            return self.get_frame()
        finally:
            self.stop_capture()

    def start_capture(self):
        """Reset the stream, start streaming and clear the halt on endpoint 0x81."""
        self.num_transfers = (self.get_image_bytes() // 0x100000) + 1
        sleep(0.05)
        self.send_cmd(RESET_OR_END, 0, 0, DIR_WRITE, [], 200)
        self.stop_sensor_streaming()
        self.send_cmd(START_STREAM, 0, 0, DIR_WRITE, [], 200)
        self.start_sensor_streaming()
        self.dev.clear_halt(0x81)

    def get_frame(self):
        """Time the exposure (long-exposure mode only) and read one frame.

        Returns:
            ``array('B')`` built from reads of at most 1 MiB on endpoint 0x81.

        NOTE(review): the source this was recovered from had lost its
        indentation.  The low-power / 200 ms / XHS-stop release is placed
        after both wait branches because the 225 ms subtracted from the
        exposure equals the 25 ms + 200 ms sleeps -- confirm.
        """
        if self.long_exposure:
            sleep(0.025)
            self.fpga.set_trigger_signal(True)
            start_ns = time_ns()
            counter = 0
            if self.exposure < 1001000:
                sleep((self.exposure - 225000) / 1e6)
            else:
                while True:
                    if counter == 0x3C:
                        self.fpga.set_low_power(True)
                    elif counter == 0x50:
                        self.fpga.set_xhs_stop(True)
                    curr_ns = time_ns()
                    sleep(0.01)
                    if self.exposure * 1000 - 225000000 <= curr_ns - start_ns:
                        break
                    counter += 1
            self.fpga.set_low_power(False)
            sleep(0.2)
            self.fpga.set_xhs_stop(False)
            self.fpga.set_trigger_signal(False)
        data = array.array("B", [])
        remaining_bytes = self.get_image_bytes()
        while remaining_bytes > 0:
            read_bytes = min(remaining_bytes, 0x100000)
            data += self.dev.read(0x81, read_bytes)
            remaining_bytes -= read_bytes
        return data

    def stop_capture(self):
        """Stop streaming, send RESET_OR_END and clear the halt on endpoint 0x81."""
        self.stop_sensor_streaming()
        self.send_cmd(RESET_OR_END, 0, 0, DIR_WRITE, [], 200)
        self.dev.clear_halt(0x81)