coredevice/analyzer: basic VCD writing

This commit is contained in:
Sebastien Bourdeauducq 2015-12-20 19:32:52 +08:00
parent e4615e7b37
commit cdcb57effe
2 changed files with 116 additions and 6 deletions

View File

@ -1,8 +1,13 @@
from enum import Enum
from operator import itemgetter
from collections import namedtuple
from itertools import count
from enum import Enum
import struct
import importlib
import logging
from artiq.coredevice import ttl
logger = logging.getLogger(__name__)
@ -75,3 +80,100 @@ def decode_dump(data):
position += 32
return messages
def vcd_codes():
codechars = [chr(i) for i in range(33, 127)]
for n in count():
q, r = divmod(n, len(codechars))
code = codechars[r]
while q > 0:
q, r = divmod(q, len(codechars))
code = codechars[r] + code
yield code
class VCDChannel:
def __init__(self, out, code):
self.out = out
self.code = code
def set_value(self, value):
if len(value) > 1:
self.out.write("b" + value + " " + self.code + "\n")
else:
self.out.write(value + self.code + "\n")
class VCDManager:
def __init__(self, filename):
self.out = open(filename, "w")
self.codes = vcd_codes()
self.current_time = None
def get_channel(self, name, width):
code = next(self.codes)
self.out.write("$var wire {width} {code} {name} $end\n"
.format(name=name, code=code, width=width))
return VCDChannel(self.out, code)
def set_time(self, time):
if time != self.current_time:
self.out.write("#{}\n".format(time))
self.current_time = time
def close(self):
self.out.close()
class TTLHandler:
def __init__(self, vcd_manager, name):
self.channel_value = vcd_manager.get_channel(name, 1)
self.last_value = "X"
self.oe = True
def process_message(self, message):
if isinstance(message, OutputMessage):
if message.address == 0:
self.last_value = str(message.data)
if self.oe:
self.channel_value.set_value(self.last_value)
elif messages.address == 1:
self.oe = bool(message.data)
if self.oe:
self.channel_value.set_value(self.last_value)
else:
self.channel_value.set_value("X")
def create_channel_handlers(vcd_manager, devices):
channel_handlers = dict()
for name, desc in sorted(devices.items(), key=itemgetter(0)):
if isinstance(desc, dict) and desc["type"] == "local":
module = importlib.import_module(desc["module"])
device_class = getattr(module, desc["class"])
if device_class in {ttl.TTLOut, ttl.TTLInOut}:
channel = desc["arguments"]["channel"]
channel_handlers[channel] = TTLHandler(vcd_manager, name)
return channel_handlers
def get_message_time(message):
return getattr(message, "timestamp", message.rtio_counter)
def messages_to_vcd(filename, devices, messages):
messages = [m for m in messages if get_message_time(m)] # TODO: remove this hack
messages = sorted(messages, key=get_message_time)
vcd_manager = VCDManager(filename)
try:
channel_handlers = create_channel_handlers(vcd_manager, devices)
vcd_manager.set_time(0)
if messages:
start_time = get_message_time(messages[0])
for message in messages:
if message.channel in channel_handlers:
vcd_manager.set_time(
get_message_time(message) - start_time)
channel_handlers[message.channel].process_message(message)
finally:
vcd_manager.close()

View File

@ -5,7 +5,7 @@ import struct
from artiq.master.databases import DeviceDB
from artiq.master.worker_db import DeviceManager
from artiq.coredevice.analyzer import decode_dump
from artiq.coredevice.analyzer import decode_dump, messages_to_vcd
def get_argparser():
@ -46,7 +46,12 @@ def get_argparser():
subparsers.add_parser("cfg-erase", help="erase core device config")
subparsers.add_parser("analyzer-dump")
p_analyzer = subparsers.add_parser("analyzer-dump",
help="dump analyzer contents")
p_analyzer.add_argument("-m", default=False, action="store_true",
help="print raw messages")
p_analyzer.add_argument("-f", type=str, default="",
help="format and write contents to VCD file")
return parser
@ -79,9 +84,12 @@ def main():
elif args.action == "cfg-erase":
comm.flash_storage_erase()
elif args.action == "analyzer-dump":
dump = comm.get_analyzer_dump()
for msg in decode_dump(dump):
print(msg)
messages = decode_dump(comm.get_analyzer_dump())
if args.m:
for message in messages:
print(message)
if args.f:
messages_to_vcd(args.f, device_mgr.get_device_db(), messages)
finally:
device_mgr.close_devices()