import typing as tp

import numpy as np
from numba.core.types import *
from numba.experimental import jitclass

from wrpll_simulation.config import Timesim_Config
from wrpll_simulation.timesim_node import *


@jitclass
class WRPLL_Timesim(object):
    """Time-domain WRPLL simulation that runs on construction.

    Creating an instance allocates one trace array per logged quantity
    (each of length ``cfg.sim_length``) and immediately calls
    :meth:`simulate`, which fills those arrays step by step.
    """

    # Configuration object the simulation was built from.
    cfg: Timesim_Config
    # Time axis: np.linspace(0, timestep_size * sim_length, sim_length).
    time: float32[:]
    # Per-step value of ``main.freq - gtx.freq``.
    freq_diff: float32[:]
    # Per-step value of ``(main.phase - gtx.phase) % 360``, shifted down by
    # 360 when above 180 (presumably degrees -- confirm against the node code).
    phase_diff: float32[:]
    # ``tag_collector.get_period_error()`` captured on steps where
    # ``helper_PLL.i2c_is_due(...)`` is true; otherwise the previous sample.
    helper_error: int16[:]
    # ``tag_collector.get_phase_error()`` captured on steps where
    # ``main_PLL.i2c_is_due(...)`` is true; otherwise the previous sample.
    main_error: int16[:]

    def __init__(self, cfg: Timesim_Config, rng: tp.Generator):
        """Allocate the trace arrays and run the simulation.

        Args:
            cfg: Simulation configuration; ``sim_length`` and
                ``timestep_size`` determine the size and span of the traces.
            rng: Random source forwarded to :meth:`simulate`.
                NOTE(review): ``tp.Generator`` is ``typing.Generator``, but
                ``simulate`` calls ``rng.normal(...)``, which looks like a
                ``numpy.random.Generator`` -- confirm the intended type.
        """
        # subclass/inheritance is not supported by numba jitclass
        # https://github.com/numba/numba/issues/1694
        self.cfg = cfg

        sim_length = cfg.sim_length
        stop_time = cfg.timestep_size * sim_length

        # NOTE(review): linspace includes the endpoint, so consecutive samples
        # are stop_time / (sim_length - 1) apart rather than exactly
        # cfg.timestep_size -- confirm this is intended.
        self.time = np.linspace(0, stop_time, sim_length).astype(np.float32)
        self.freq_diff = np.zeros(sim_length, dtype=np.float32)
        self.phase_diff = np.zeros(sim_length, dtype=np.float32)
        self.helper_error = np.zeros(sim_length, dtype=np.int16)
        self.main_error = np.zeros(sim_length, dtype=np.int16)

        # __post_init__ is not supported by numba jitclass
        # https://github.com/numba/numba/issues/4037
        self.simulate(rng)

    def simulate(self, rng: tp.Generator):
        """Run the step loop and fill the trace arrays in place.

        Each of the ``cfg.sim_length`` iterations advances the three
        oscillators by one timestep, runs the gateware and firmware models,
        applies any due frequency updates, and logs one sample per trace.

        Args:
            rng: Random source; ``rng.normal(0, sigma)`` is added to the
                timestep of each oscillator when ``cfg.has_jitter`` is true.
        """
        cfg = self.cfg
        timestep_size = cfg.timestep_size
        # Delays are configured in seconds and used below as step counts.
        irq_delay = self.seconds_to_step(cfg.irq_delay)
        i2c_comm_delay = self.seconds_to_step(cfg.i2c_comm_delay)

        # simulation node
        gtx = Phase_Accumlator(cfg.gtx_init_freq, cfg.gtx_init_phase)
        helper = Phase_Accumlator(cfg.helper_init_freq, cfg.helper_init_phase)
        main = Phase_Accumlator(cfg.main_init_freq, cfg.main_init_phase)

        ddmtd_gtx = DDMTD(cfg.blind_period)
        ddmtd_main = DDMTD(cfg.blind_period)

        gtx_tag_irq = EventManager_IRQ()
        main_tag_irq = EventManager_IRQ()

        tag_collector = Tag_Collector(cfg.beating_period)

        helper_PLL = PI_loop(cfg.helper_PI, cfg.helper_init_freq, 0, cfg.adpll_limit)
        main_PLL = PI_loop(cfg.main_PI, cfg.main_init_freq, 0, cfg.adpll_limit)

        # Incremented once per helper rising edge; passed to both DDMTDs.
        counter = 0

        print("Running...")
        # NOTE(review): the block nesting inside this loop was restored from a
        # whitespace-collapsed source -- confirm it against the original file.
        for i in range(cfg.sim_length):
            # Advance every oscillator by one timestep, optionally perturbed
            # by an independent normal sample per oscillator.
            if cfg.has_jitter:
                gtx.update(timestep_size + rng.normal(0, cfg.gtx_jitter))
                helper.update(timestep_size + rng.normal(0, cfg.dcxo_jitter))
                main.update(timestep_size + rng.normal(0, cfg.dcxo_jitter))
            else:
                gtx.update(timestep_size)
                helper.update(timestep_size)
                main.update(timestep_size)

            # GATEWARE
            # Work done on a rising edge of the helper oscillator.
            if helper.is_rising():
                ddmtd_gtx.sync_update(gtx.o, counter)
                ddmtd_main.sync_update(main.o, counter)

                # for clock domain crossing
                gtx_tag_irq.multireg(ddmtd_gtx.tag_ready, ddmtd_gtx.tag)
                main_tag_irq.multireg(ddmtd_main.tag_ready, ddmtd_main.tag)

                counter += 1

            # Work done on a rising edge of the main oscillator.
            if main.is_rising():
                # Generate interrupt request
                gtx_tag_irq.sync_update(i)
                main_tag_irq.sync_update(i)

            # FIRMWARE
            # GTX tag interrupt: collect the tag and update the helper loop
            # with the period error, then the main loop if a phase error
            # is ready.
            if gtx_tag_irq.is_due(i, irq_delay):
                tag_collector.collect_gtx_tag(gtx_tag_irq.tag_csr)
                helper_PLL.update(i, tag_collector.get_period_error())

                if tag_collector.is_phase_error_ready():
                    # Clear the flag before consuming the phase error.
                    tag_collector.set_phase_error_ready(False)
                    main_PLL.update(i, tag_collector.get_phase_error())

            # Main tag interrupt: collect the tag, then update the main loop
            # if a phase error is ready.
            if main_tag_irq.is_due(i, irq_delay):
                tag_collector.collect_main_tag(main_tag_irq.tag_csr)

                if tag_collector.is_phase_error_ready():
                    # Clear the flag before consuming the phase error.
                    tag_collector.set_phase_error_ready(False)
                    main_PLL.update(i, tag_collector.get_phase_error())

            # Apply new oscillator frequencies once the PI loops report
            # their I2C transfer as due.
            if helper_PLL.i2c_is_due(i, i2c_comm_delay):
                helper.set_freq(helper_PLL.get_new_freq())

            if main_PLL.i2c_is_due(i, i2c_comm_delay):
                main.set_freq(main_PLL.get_new_freq())

            # Data Logging
            self.freq_diff[i] = np.float32(main.freq - gtx.freq)

            # Store the phase difference modulo 360, then shift values above
            # 180 down by 360. The comparison uses the stored float32 value.
            self.phase_diff[i] = np.float32((main.phase - gtx.phase) % 360)
            if self.phase_diff[i] > 180:
                self.phase_diff[i] -= 360

            # NOTE(review): i2c_is_due() is queried a second time here for the
            # same step; this relies on the call having no side effects --
            # confirm against PI_loop.
            # On steps without a due transfer the previous sample is held;
            # index 0 keeps its zero initial value in that case.
            if helper_PLL.i2c_is_due(i, i2c_comm_delay):
                self.helper_error[i] = tag_collector.get_period_error()
            elif i > 0:
                self.helper_error[i] = self.helper_error[i - 1]

            if main_PLL.i2c_is_due(i, i2c_comm_delay):
                self.main_error[i] = tag_collector.get_phase_error()
            elif i > 0:
                self.main_error[i] = self.main_error[i - 1]

    def seconds_to_step(self, seconds: float64):
        """Convert a duration in seconds to a number of simulation steps.

        Args:
            seconds: Duration to convert.

        Returns:
            ``seconds / cfg.timestep_size`` converted with ``int()``, i.e.
            truncated rather than rounded.
        """
        return int(seconds / self.cfg.timestep_size)