nac3artiq: get closer to original ARTIQ semantics in demo

Currently crashes the compiler with:
thread '<unnamed>' panicked at 'called `Option::unwrap()` on a `None` value', nac3core/src/codegen/expr.rs:395:58
escape-analysis
Sebastien Bourdeauducq 2021-10-08 23:41:41 +08:00
parent 82efb0e720
commit 3b10172810
5 changed files with 144 additions and 175 deletions

View File

@ -1,27 +0,0 @@
from numpy import int32, int64
from language import *
@extern
def rtio_init():
raise NotImplementedError("syscall not simulated")
@extern
def rtio_get_counter() -> int64:
raise NotImplementedError("syscall not simulated")
@extern
def rtio_output(target: int32, data: int32):
raise NotImplementedError("syscall not simulated")
@extern
def rtio_input_timestamp(timeout_mu: int64, channel: int32) -> int64:
raise NotImplementedError("syscall not simulated")
@extern
def rtio_input_data(channel: int32) -> int32:
raise NotImplementedError("syscall not simulated")

View File

@ -1,66 +1,26 @@
from language import *
from artiq_builtins import *
from min_artiq import *
from numpy import int32, int64
@kernel
class Core:
@kernel
def reset(self):
rtio_init()
at_mu(rtio_get_counter() + int64(125000))
@kernel
def break_realtime(self):
min_now = rtio_get_counter() + int64(125000)
if now_mu() < min_now:
at_mu(min_now)
@kernel
class TTLOut:
channel: int32
target_o: int32
@kernel
def __init__(self, channel: int32):
self.channel = channel
self.target_o = channel << 8
@kernel
def output(self):
pass
@kernel
def set_o(self, o: bool):
rtio_output(self.target_o, 1 if o else 0)
@kernel
def on(self):
self.set_o(True)
@kernel
def off(self):
self.set_o(False)
@kernel
def pulse_mu(self, duration: int64):
self.on()
delay_mu(duration)
self.off()
@kernel
class Demo:
@kernel
def run(self):
core = Core()
led = TTLOut(0)
core: Core
led: TTLOut
core.reset()
def __init__(self):
self.core = Core()
self.led = TTLOut(0)
@kernel
def main_kernel(self):
self.core.reset()
while True:
led.pulse_mu(int64(100000000))
self.led.pulse_mu(int64(100000000))
delay_mu(int64(100000000))
def run(self):
self.core.run(self.main_kernel)
if __name__ == "__main__":
run_on_core(Demo().run)
Demo().run()

View File

@ -1,43 +0,0 @@
from language import *
from artiq_builtins import *
from numpy import int32, int64
test_int = 123
test_float = 123.456
test_list = [1] * 1
test_list2 = [[1.1], [], [3.0]]
test_list_fail = [1, 2, 3.0]
test_tuple = (1, 2, 3.0)
@kernel
class Test:
a: int32
@kernel
def __init__(self, a: int32):
self.a = a
test = Test(1)
print(test.a)
@kernel
class Demo:
@kernel
def run(self):
while True:
delay_mu(round64(test_float * 2.0))
delay_mu(int64(test_int))
delay_mu(int64(test_list[0]))
# delay_mu(int64(test_list_fail[0]))
delay_mu(int64(test_tuple[0]))
delay_mu(int64(test_tuple[2]))
delay_mu(int64(test_list2[2][0]))
delay_mu(int64(test.a))
test.a = 10
delay_mu(int64(test.a))
if __name__ == "__main__":
run_on_core(Demo().run)

View File

@ -1,50 +0,0 @@
from inspect import isclass, getmodule
from functools import wraps
import sys
import nac3artiq
import device_db
__all__ = ["extern", "kernel", "run_on_core"]
nac3 = nac3artiq.NAC3(device_db.device_db["core"]["arguments"]["target"])
allow_module_registration = True
registered_ids = set()
def extern(function):
global registered_ids
assert allow_module_registration
module = getmodule(function)
module_id = id(module)
if module_id not in registered_ids:
nac3.register_module(module)
registered_ids.add(module_id)
return function
def kernel(class_or_function):
global allow_module_registration
global registered_ids
assert allow_module_registration
module = getmodule(class_or_function)
module_id = id(module)
if module_id not in registered_ids:
nac3.register_module(module)
registered_ids.add(module_id)
return class_or_function
def get_defined_class(method):
return vars(sys.modules[method.__module__])[method.__qualname__.split('.')[0]]
def run_on_core(method, *args, **kwargs):
global allow_module_registration
if allow_module_registration:
nac3.analyze()
allow_module_registration = False
nac3.compile_method(id(get_defined_class(method)), method.__name__)

129
nac3artiq/min_artiq.py Normal file
View File

@ -0,0 +1,129 @@
from inspect import isclass, getmodule
from functools import wraps
import sys
from numpy import int32, int64
import nac3artiq
import device_db
__all__ = ["extern", "kernel", "portable", "Core", "TTLOut"]
nac3 = nac3artiq.NAC3(device_db.device_db["core"]["arguments"]["target"])
allow_module_registration = True
registered_ids = set()
def register_module_of(obj):
global registered_ids
assert allow_module_registration
module = getmodule(obj)
module_id = id(module)
if module_id not in registered_ids:
nac3.register_module(module)
registered_ids.add(module_id)
def extern(function):
register_module_of(function)
return function
def kernel(class_or_function):
register_module_of(class_or_function)
if isclass(class_or_function):
return class_or_function
else:
@wraps(class_or_function)
def device_only(*args, **kwargs):
raise RuntimeError("Kernels must not be called directly, use core.run(kernel_function) instead")
return device_only
def portable(function):
register_module_of(function)
return function
def get_defined_class(method):
return vars(sys.modules[method.__module__])[method.__qualname__.split('.')[0]]
@extern
def rtio_init():
raise NotImplementedError("syscall not simulated")
@extern
def rtio_get_counter() -> int64:
raise NotImplementedError("syscall not simulated")
@extern
def rtio_output(target: int32, data: int32):
raise NotImplementedError("syscall not simulated")
@extern
def rtio_input_timestamp(timeout_mu: int64, channel: int32) -> int64:
raise NotImplementedError("syscall not simulated")
@extern
def rtio_input_data(channel: int32) -> int32:
raise NotImplementedError("syscall not simulated")
@kernel
class Core:
def run(self, method, *args, **kwargs):
global allow_module_registration
if allow_module_registration:
nac3.analyze()
allow_module_registration = False
nac3.compile_method(id(get_defined_class(method)), method.__name__)
@kernel
def reset(self):
rtio_init()
at_mu(rtio_get_counter() + int64(125000))
@kernel
def break_realtime(self):
min_now = rtio_get_counter() + int64(125000)
if now_mu() < min_now:
at_mu(min_now)
@kernel
class TTLOut:
channel: int32
target_o: int32
def __init__(self, channel: int32):
self.channel = channel
self.target_o = channel << 8
@kernel
def output(self):
pass
@kernel
def set_o(self, o: bool):
rtio_output(self.target_o, 1 if o else 0)
@kernel
def on(self):
self.set_o(True)
@kernel
def off(self):
self.set_o(False)
@kernel
def pulse_mu(self, duration: int64):
self.on()
delay_mu(duration)
self.off()