"""Flight Controller Database Interface
Access to the parameter and logging systems on CodeAIR's Crazyflie flight controller.
See also `flight_lib/flight_lib`
"""
import json
import crtp
import time
import struct
import flight_catalog
# Syslink packet type identifiers. The names and values fall into groups by
# prefix / high nibble: RADIO (0x0_), PM (0x1_), OW (0x2_), SYS (0x3_), DEBUG (0xF_).
# NOTE(review): presumably these mirror the packet types declared in the Crazyflie
# firmware's syslink header -- confirm against the firmware before relying on them.
# None of these constants is referenced by the code visible in this module.

# Radio group (0x00-0x0A)
SYSLINK_RADIO_RAW = 0x00
SYSLINK_RADIO_CHANNEL = 0x01
SYSLINK_RADIO_DATARATE = 0x02
SYSLINK_RADIO_CONTWAVE = 0x03
SYSLINK_RADIO_RSSI = 0x04
SYSLINK_RADIO_ADDRESS = 0x05
SYSLINK_RADIO_RAW_BROADCAST = 0x06
SYSLINK_RADIO_POWER = 0x07
SYSLINK_RADIO_P2P = 0x08
SYSLINK_RADIO_P2P_ACK = 0x09
SYSLINK_RADIO_P2P_BROADCAST = 0x0A
# PM group (0x10-0x18)
SYSLINK_PM_SOURCE = 0x10
SYSLINK_PM_ONOFF_SWITCHOFF = 0x11
SYSLINK_PM_BATTERY_VOLTAGE = 0x12
SYSLINK_PM_BATTERY_STATE = 0x13
SYSLINK_PM_BATTERY_AUTOUPDATE = 0x14
SYSLINK_PM_SHUTDOWN_REQUEST = 0x15
SYSLINK_PM_SHUTDOWN_ACK = 0x16
SYSLINK_PM_LED_ON = 0x17
SYSLINK_PM_LED_OFF = 0x18
# OW group (0x20-0x23)
SYSLINK_OW_SCAN = 0x20
SYSLINK_OW_GETINFO = 0x21
SYSLINK_OW_READ = 0x22
SYSLINK_OW_WRITE = 0x23
# SYS / DEBUG
SYSLINK_SYS_NRF_VERSION = 0x30
SYSLINK_DEBUG_PROBE = 0xF0
# CRTP port numbers. These are the values passed as the `port` argument of
# crtp.send() and compared against the port of packets returned by crtp.receive().
# Only CRTP_PORT_PARAM and CRTP_PORT_LOG are used by the code visible in this module.
CRTP_PORT_CONSOLE = 0x00
CRTP_PORT_PARAM = 0x02
CRTP_PORT_SETPOINT = 0x03
CRTP_PORT_MEM = 0x04
CRTP_PORT_LOG = 0x05
CRTP_PORT_LOCALIZATION = 0x06
CRTP_PORT_SETPOINT_GENERIC = 0x07
CRTP_PORT_SETPOINT_HL = 0x08
CRTP_PORT_PLATFORM = 0x0D
CRTP_PORT_LINK = 0x0F
# Log 'type' field: numeric type codes carried in log TOC items (1-based).
LOG_UINT8 = 1
LOG_UINT16 = 2
LOG_UINT32 = 3
LOG_INT8 = 4
LOG_INT16 = 5
LOG_INT32 = 6
LOG_FLOAT = 7
LOG_FP16 = 8
# Per-type decode info as (size_in_bytes, struct_format_char).
# Indexed with `log_type_ops[type_code - 1]`, so the entry order must match the
# LOG_* codes above (entry 0 <-> LOG_UINT8, ..., entry 7 <-> LOG_FP16).
log_type_ops = (
(1, 'B'),  # LOG_UINT8
(2, 'H'),  # LOG_UINT16
(4, 'I'),  # LOG_UINT32
(1, 'b'),  # LOG_INT8
(2, 'h'),  # LOG_INT16
(4, 'i'),  # LOG_INT32
(4, 'f'),  # LOG_FLOAT
(2, 'e'),  # LOG_FP16
)
#---- Utility functions ----
def read_crtp(port, channel, timeout_ms=100):
    """Pump raw rx queue from flight controller, return selected data or None if timeout(ms).

    Polls ``crtp.receive()`` up to ``timeout_ms // 10`` times. Packets whose
    port/channel do not match the requested pair are consumed and discarded.

    Args:
        port: CRTP port the wanted packet must arrive on.
        channel: CRTP channel the wanted packet must arrive on.
        timeout_ms: Poll budget in milliseconds (one poll per 10 ms of budget).

    Returns:
        The data payload of the first matching packet, or None if no matching
        packet was seen within the poll budget.
    """
    # Floor division: `int(timeout_ms) / 10` yields a float, which range() rejects.
    # NOTE(review): there is no delay between polls, so the effective timeout
    # depends on how long crtp.receive() itself waits -- confirm it blocks ~10 ms.
    for _ in range(int(timeout_ms) // 10):
        pkt = crtp.receive()
        if pkt:
            rport, rchan, rdat = pkt
            if rport == port and rchan == channel:
                return rdat
    return None
toc_cache = {}  # { crc : {group_dot_name: (item_id, item_type)} }


def toc_lookup(toc, crc, group_dot_name):
    """Look up a TOC entry by its dotted 'group.name', caching hits per CRC.

    Args:
        toc: Iterable of (item_id, item_type, group, name) entries.
        crc: CRC identifying this TOC; used as the key of the per-TOC cache.
        group_dot_name: Name of the form 'group.name'.

    Returns:
        (item_id, item_type) on success. None (after printing an error) when the
        name is malformed or not present in ``toc``.
    """
    dcache = toc_cache.setdefault(crc, {})
    param = dcache.get(group_dot_name)
    if param is not None:
        return param

    # A name without a '.' cannot match any entry; treat it as "not found"
    # instead of letting the tuple-unpack raise ValueError.
    parts = group_dot_name.split('.', 1)
    if len(parts) == 2:
        group, name = parts
        for index, ptype, pgroup, pname in toc:
            if pgroup == group and pname == name:
                param = (index, ptype)
                dcache[group_dot_name] = param
                return param

    print(f"Error: {group_dot_name} not found.")
    return None
class LogManager:
    """Interface to query the CF logging framework:

    * Retrieve the list of available logging items (TOC) : [(id, type, group, name), ...]
    * Register new logging configs (log blocks)
    * Start/Stop logging
    * Also handle fetching of Param TOC, since logic is identical.

    This class uses :doc:`crtp` to implement the `log interface <https://www.bitcraze.io/documentation/repository/crazyflie-firmware/master/functional-areas/crtp/crtp_log/>`_

    The instance is a small state machine driven by `pump_rx_queue()`:
    ``state`` is 'IDLE' when no request is outstanding, otherwise one of
    'WAIT_INFO', 'WAIT_ITEM', 'WAIT_ITEM_NEXT' or 'WAIT_CONTROL'.
    """
    # CRTP channels
    TOC_CH = 0
    CONTROL_CH = 1
    LOG_CH = 2

    # Logging system commands.
    CMD_GET_ITEM_V2 = 2  # V1 was 0
    CMD_GET_INFO_V2 = 3  # V1 was 1

    # CONTROL channel commands respond with data[0]==CONTROL_XX, data[2]==err_code (0=success)
    CONTROL_STOP_BLOCK = 4
    CONTROL_RESET = 5
    CONTROL_CREATE_BLOCK_V2 = 6  # V1 was 0
    CONTROL_APPEND_BLOCK_V2 = 7  # V1 was 1
    CONTROL_START_BLOCK_V2 = 8  # V1 was 3

    # Set True to print each log block registration in register_items().
    DEBUG_BLOCKS = False

    def __init__(self):
        self.load_tocs()
        self.expected_toc_count = 0
        self.state = 'IDLE'
        self.cache = {}  # most recent log data {"name": value}
        self.log_blocks = []  # up to 16 blocks of ([name0, name1,...], decode_fmt_str, val_sz, reg_fields)
        self.latest_update_ms = 0
        self.log_rate = 0.1
        # Request bookkeeping. Initialised here so the rx handler never hits an
        # AttributeError if a packet arrives before the first request is issued.
        self.fetch_toc_info_only = False
        self.toc_info = None  # (count, crc) from the most recent info-only fetch
        self.ctrl_cb = None
        self.ctrl_await_cmd = None
        self._failed = False  # set by _err(); lets callers tell "done" from "aborted"

    def load_tocs(self):
        """Point the working TOCs/CRCs at the ones baked into `flight_catalog`."""
        # These are references, not copies: a full fetch_toc() clears and refills
        # the very same list objects that flight_catalog exposes.
        self.log_toc = flight_catalog.log_toc
        self.log_crc = flight_catalog.log_crc
        self.param_toc = flight_catalog.param_toc
        self.param_crc = flight_catalog.param_crc

    def _err(self, msg):
        """Report a protocol error and abort the request in progress."""
        print("LOG Error: " + msg)
        self._failed = True
        self.state = 'IDLE'

    def crtp_rx(self, port, chan, data):
        """Feed one received CRTP packet into the state machine.

        Packets for ports other than LOG / PARAM, or that do not match the
        request currently awaited, are ignored.
        """
        if port != CRTP_PORT_LOG and port != CRTP_PORT_PARAM:
            return

        if chan == self.TOC_CH:
            # Table of Contents
            working_toc = self.log_toc if port == CRTP_PORT_LOG else self.param_toc
            if self.state == 'WAIT_INFO':
                self._rx_toc_info(port, data, working_toc)
            elif self.state.startswith('WAIT_ITEM'):
                self._rx_toc_item(port, data, working_toc)
        elif chan == self.CONTROL_CH:
            if self.state == 'WAIT_CONTROL':
                # Check if response to our send_ctrl_cmd(), invoke callback
                if data and data[0] == self.ctrl_await_cmd:
                    self.state = 'IDLE'
                    if self.ctrl_cb:
                        self.ctrl_cb(data[2])

    def _rx_toc_info(self, port, data, working_toc):
        """Handle a GET_INFO response: [cmd, count_u16, crc_u32, ...]."""
        if len(data) < 7:
            # Too short to hold cmd + "<HI"; unpack_from would raise.
            self._err(f"Short GET_INFO response, data={data}")
            return
        if data[0] != self.CMD_GET_INFO_V2:
            self._err("Unexpected command byte in GET_INFO response")
            return
        count, crc = struct.unpack_from("<HI", data, 1)
        self.expected_toc_count = count
        if self.fetch_toc_info_only:
            self.toc_info = (count, crc)
            self.state = 'IDLE'
            return
        if port == CRTP_PORT_LOG:
            self.log_crc = crc
        else:
            self.param_crc = crc
        # Begin requesting items
        working_toc.clear()
        self.get_toc_item(port, 0, True)

    def _rx_toc_item(self, port, data, working_toc):
        """Handle a GET_ITEM response: [cmd, id_lo, id_hi, type, group\\0name\\0]."""
        if len(data) < 6:
            self._err(f"Item ID not found in TOC, data={data}")
            return
        if data[0] != self.CMD_GET_ITEM_V2:
            self._err("Unexpected command byte in GET_ITEM response")
            return
        item_id = data[2] << 8 | data[1]
        item_type = data[3] & 0x0F
        strings = data[4:].split(b'\x00', 2)
        if len(strings) < 2:
            # No NUL separator between group and name.
            self._err(f"Malformed TOC item strings, data={data}")
            return
        item_group = strings[0].decode('utf-8')
        item_name = strings[1].decode('utf-8')
        if self.state == 'WAIT_ITEM':
            # Single-item request: nothing is stored, the request just completes.
            self.state = 'IDLE'
            return
        # WAIT_ITEM_NEXT: accumulate and chain the next request until complete.
        working_toc.append((item_id, item_type, item_group, item_name))
        next_id = len(working_toc)
        if next_id >= self.expected_toc_count:
            self.state = 'IDLE'
        else:
            self.get_toc_item(port, next_id, True)

    def pump_rx_queue(self, timeout_ms=100):
        """Process received packets until the state machine is idle.

        Returns:
            bool: True if the state reached 'IDLE' on its own, False if
            ``timeout_ms`` elapsed first (the state is then forced to 'IDLE').
        """
        t_start = time.ticks_ms()
        while self.state != 'IDLE':
            pkt = crtp.receive()
            if pkt:
                self.crtp_rx(*pkt)
            else:
                time.sleep(0.01)
            # Checked on every pass so a stream of unrelated packets cannot
            # postpone the timeout indefinitely.
            if time.ticks_diff(time.ticks_ms(), t_start) > timeout_ms:
                self.state = 'IDLE'
                return False
        return True

    def get_toc_item(self, port, id, get_next=False):
        """Request TOC item ``id``; with ``get_next`` the rx handler chains through the whole TOC."""
        self.state = 'WAIT_ITEM_NEXT' if get_next else 'WAIT_ITEM'
        crtp.send(port, self.TOC_CH, bytes([self.CMD_GET_ITEM_V2, id & 0xFF, id >> 8]))

    def fetch_toc(self, port, info_only=False):
        """Request TOC info, either PARAM or LOG based on port (CRTP_PORT_LOG or CRTP_PORT_PARAM)

        Args:
            port: CRTP_PORT_LOG or CRTP_PORT_PARAM.
            info_only: If True only update ``self.toc_info = (count, crc)``;
                otherwise replace the working TOC and CRC for that port.

        Returns:
            bool: True on success; False on bad port, timeout, or protocol error.
        """
        if port not in (CRTP_PORT_LOG, CRTP_PORT_PARAM):
            print("Error: port must be CRTP_PORT_LOG(5) or CRTP_PORT_PARAM(2).")
            return False
        self.fetch_toc_info_only = info_only
        if info_only:
            # Drop any previous result so a failed fetch can't be mistaken for fresh data.
            self.toc_info = None
        self._failed = False
        self.state = 'WAIT_INFO'
        crtp.send(port, self.TOC_CH, bytes([self.CMD_GET_INFO_V2]))
        if not self.pump_rx_queue(30000):
            print("TOC fetch timeout.")
            return False
        # _err() also ends the pump by going IDLE; don't report that as success.
        return not self._failed

    def _read_toc_info(self, port):
        """Return (count, crc) read from the flight controller, or None on failure."""
        if self.fetch_toc(port, True):
            return self.toc_info
        return None

    def check_catalog_crcs(self):
        """Read CRCs from flight controller, return True if they match catalog import

        A TOC whose info cannot be read (timeout / protocol error) counts as a mismatch.
        """
        log_info = self._read_toc_info(CRTP_PORT_LOG)
        param_info = self._read_toc_info(CRTP_PORT_PARAM)
        match = True
        if log_info != (len(flight_catalog.log_toc), flight_catalog.log_crc):
            print("Log TOC mismatch.")
            match = False
        if param_info != (len(flight_catalog.param_toc), flight_catalog.param_crc):
            print("Param TOC mismatch.")
            match = False
        return match

    def generate_catalog(self, filename='new_flight_catalog.py'):
        """Create Python file with log/param TOCs to bake into new CodeAIR builds

        Both TOCs are fetched before the file is opened; if either fetch fails
        the file is left untouched rather than written with a truncated TOC.
        """
        # Note: Using json strings because importing plain toc (list of tuples) causes "pystack exhausted" error.
        print("Fetching log TOC")
        if not self.fetch_toc(CRTP_PORT_LOG):
            print("Error: log TOC fetch failed, catalog not written.")
            return
        print("Fetching param TOC")
        if not self.fetch_toc(CRTP_PORT_PARAM):
            print("Error: param TOC fetch failed, catalog not written.")
            return
        with open(filename, 'w') as fp:
            fp.write('"""Generated Python Code: Flight Controller parameter and log info catalog"""\r\nimport json\r\n')
            fp.write(f"log_crc = 0x{self.log_crc:04X}\r\nlog_toc = '")
            print("Writing log TOC to file")
            json.dump(self.log_toc, fp)
            fp.write("'\r\nlog_toc = json.loads(log_toc)\r\n\r\n")
            fp.write(f"param_crc = 0x{self.param_crc:04X}\r\nparam_toc = '")
            print("Writing param TOC to file")
            json.dump(self.param_toc, fp)
            fp.write("'\r\nparam_toc = json.loads(param_toc)\r\n\r\n")
        print("Wrote catalog file.")

    def register_items(self, item_list, rate=0.1):
        """Register and start logging the given list of items. Removes all prior logging. Rate is seconds.

        item_list contains strings of form 'group.variable'.

        Internals:
            * The Crazyflie allows up to 16 "log blocks" to be registered.
            * You register a log block by supplying a list of [TYPE8:ID16] tuples referring to TOC items.
            * The resulting LOG_CH reports for a block must fit within a 30 byte packet, where 4 bytes are consumed
              with [BLK_ID_8, TIMESTAMP_24], leaving 26 bytes cumulative payload for log values. This means the
              max number of IDs you can register in a block depends on the size of the items (8/16/32 bits).
            * Ex: you could register up to 6 32-bit items in a log block.

        Our Python API exposes a single function to register an item_list, hiding the underlying "log block" system.
            * We don't support dynamically appending/deleting discrete blocks. All are created/removed at once.
            * Currently we don't support different rates for different params. The underlying system has a rate for each block.
            * Currently supports only a single log block.
            * TODO: Extend to create up to max (16) blocks to support more log items.

        Protocol:
            * CRTP(CRTP_PORT_LOG, CONTROL_CH)
            * Create block: ( CONTROL_CREATE_BLOCK_V2_8, BLK_ID_8, [TYPE_8, ID_16]*n )
            * Delete all: (CONTROL_RESET)

        Notes:
            Successful registration will initialize -

            .. code-block:: python

                self.cache # {'name': None,...}  # Dict keyed by names from all blocks
                self.log_blocks = [
                    (
                        [name0, name1,...],  # list of names in this block
                        decode_fmt_str,      # struct format str for decoding this log block
                        val_sz,              # = struct.calcsize(decode_fmt_str)
                        reg_fields,          # registration fields (packet payload)
                    ),
                ]

        Returns:
            bool: True if item_list can be registered, False if it would exceed logging payload limits
            or if the list contains an unknown name or an item with an unsupported type code.
        """
        # Remove old log blocks
        self.unregister_all()
        MAX_BLOCK_PAYLOAD = 26
        self.log_rate = rate
        self.log_blocks = []
        cur_block_reg = []  # registration fields ([TYPE_8, ID_16]*n)
        cur_block_val_sz = 0
        cur_block_names = []
        cur_block_fmt = '<'
        for name in item_list:
            item = toc_lookup(self.log_toc, self.log_crc, name)
            if not item:
                return False
            item_id, item_type = item
            # Type codes are 1-based; without this check 0 would silently index
            # the last entry and anything past the table would raise IndexError.
            if not 1 <= item_type <= len(log_type_ops):
                print(f"Error: {name} has unsupported log type ({item_type}).")
                return False
            # Found item. Add to cache and register log_block.
            self.cache[name] = None
            val_sz, val_fmt = log_type_ops[item_type - 1]
            if cur_block_val_sz + val_sz > MAX_BLOCK_PAYLOAD:
                # Advance to next block, or error if overflow.
                # TODO: Append cur_block_* to list of block registrations, through up to 16 blocks.
                print("Error: Exceeded max cumulative log variable size.")
                return False
            cur_block_names.append(name)
            cur_block_val_sz += val_sz
            cur_block_fmt += val_fmt
            cur_block_reg.extend((item_type, item_id & 0xFF, item_id >> 8))

        # Append last (or only) block
        self.log_blocks.append(
            (cur_block_names, cur_block_fmt, cur_block_val_sz, cur_block_reg)
        )

        # Register block(s)
        for index, (names, fmt, sz, reg) in enumerate(self.log_blocks):
            if self.DEBUG_BLOCKS:
                print(f"Create Log Block {index}:")
                print(f" names={names}")
                reg_record = ' '.join([f'{field:02X}' for field in reg])
                print(f" reg=[{reg_record}]")
                print(f" fmt({sz})={fmt}")
            # TODO: Initiate state machine to send CRTP packets, awaiting each response
            # For initial single-block case just send and pray!
            crtp.send(CRTP_PORT_LOG, self.CONTROL_CH, bytes([self.CONTROL_CREATE_BLOCK_V2, index] + reg))
        return True

    def enable(self, do_enable):
        """Resume / pause logging.

        NOTE: Must register_items() FIRST, before enable!
        """
        # The start command carries the period as milliseconds, 16-bit little-endian.
        rate_ms = int(self.log_rate * 1000)
        # TODO: iterate over self.log_blocks. Just one block for now
        index = 0
        if do_enable:
            self.send_ctrl_cmd(self.CONTROL_START_BLOCK_V2, [index, rate_ms & 0xFF, rate_ms >> 8], None)
        else:
            self.send_ctrl_cmd(self.CONTROL_STOP_BLOCK, [index], None)

    def unregister_all(self):
        """Delete all registered log blocks"""
        self.send_ctrl_cmd(self.CONTROL_RESET, [])

    def send_ctrl_cmd(self, cmd, data, callback=None):
        """Send control channel command, and callback(err_code) upon ACK. Zero means no error.

        Args:
            cmd: One of the CONTROL_* command bytes.
            data: List of ints appended after the command byte.
            callback: Optional callable taking the err_code from the response.

        Waits (via pump_rx_queue, default timeout) for the matching response.
        """
        self.ctrl_cb = callback
        self.ctrl_await_cmd = cmd
        self.state = 'WAIT_CONTROL'
        crtp.send(CRTP_PORT_LOG, self.CONTROL_CH, bytes([cmd] + data))
        self.pump_rx_queue()

    def decode_log(self, blk_id, timestamp_ms, data):
        """Decode received log values for this block, updating self.cache.

        Packets for an unknown block, or whose payload length does not match the
        block's registered size, are reported and dropped without touching the cache.
        """
        if blk_id >= len(self.log_blocks):
            print(f"Error: decoded blk_id={blk_id} out of range!")
            return
        # Get list of item names, and struct format string for this block
        block_items, block_fmt, val_sz, _ = self.log_blocks[blk_id]
        if len(data) != val_sz:
            print(f"Error: block {blk_id} wrong data length (val_sz={val_sz}, data={len(data)}).")
            # Bail out: unpacking a wrong-sized payload would raise.
            return
        self.latest_update_ms = timestamp_ms
        vals = struct.unpack(block_fmt, data)
        for index, name in enumerate(block_items):
            self.cache[name] = vals[index]

    def fetch_log_data(self, timeout_ms=100):
        """Ingest one log report from the crtp queue, decode it, and deposit it in self.cache.

        Polls ``crtp.log_read()`` every 10 ms for up to ``timeout_ms`` and stops
        after the first report received.
        """
        # Floor division: `/` would hand range() a float and raise TypeError.
        for _ in range(int(timeout_ms) // 10):
            log_dat = crtp.log_read()
            if log_dat:
                self.decode_log(*log_dat)
                break
            time.sleep(0.01)
class ParamManager:
    """Interface to query the CF parameter framework.

    Unlike logging, we don't register for events in the param system.
    Parameters are addressed by dotted 'group.name' strings, resolved through
    the param TOC held by the supplied LogManager.
    """
    # CRTP channels on CRTP_PORT_PARAM used by this class
    READ_CH = 1   # get()
    WRITE_CH = 2  # set()
    MISC_CH = 3   # set_by_name()

    # Ordered type format mapping: struct format char indexed by TOC type code.
    # 'x' (index 4) is a placeholder for a code with no value format.
    types = ('b', 'h', 'i', 'q', 'x', 'e', 'f', 'd', 'B', 'H', 'L', 'Q')

    # Type mapping for set_by_name(): name -> (type code, struct format char)
    types_by_name = {
        'U8': (0x08, 'B'),
        'U16': (0x09, 'H'),
        'U32': (0x0a, 'L'),
        'U64': (0x0b, 'Q'),
        'I8': (0x00, 'b'),
        'I16': (0x01, 'h'),
        'I32': (0x02, 'i'),
        'I64': (0x03, 'q'),
        'F16': (0x05, 'e'),
        'F32': (0x06, 'f'),
        'F64': (0x07, 'd'),
    }

    def __init__(self, log_mgr):
        self.log_mgr = log_mgr  # for the param_toc, param_crc
        self.cache = {}  # group_dot_name: (id, type)

    @classmethod
    def _type_format(cls, type_code):
        """Return the struct format char for a TOC type code, or None if unsupported."""
        if 0 <= type_code < len(cls.types):
            fmt = cls.types[type_code]
            if fmt != 'x':
                return fmt
        return None

    def _resolve(self, group_dot_name):
        """Return (param_id, struct_format_char) for a dotted name, or None (error already printed)."""
        param = toc_lookup(self.log_mgr.param_toc, self.log_mgr.param_crc, group_dot_name)
        if not param:
            return None
        found_id, found_type = param
        type_fmt = self._type_format(found_type)
        if type_fmt is None:
            print(f"Error: {group_dot_name} has unsupported param type ({found_type})")
            return None
        return found_id, type_fmt

    def set(self, group_dot_name, value):
        """Set parameter based on dotted name, ex: 'motorPowerSet.enable'

        The write is sent without waiting for a response. Unknown names and
        unsupported types are reported with a printed error and nothing is sent.
        """
        resolved = self._resolve(group_dot_name)
        if resolved is None:
            return
        found_id, type_fmt = resolved
        data = struct.pack("<H" + type_fmt, found_id, value)
        crtp.send(CRTP_PORT_PARAM, self.WRITE_CH, data)

    def get(self, group_dot_name):
        """Read parameter. Returns data or None.

        None is returned (with a printed error) for an unknown name, an
        unsupported type, a timeout, a truncated response, or a response for a
        different parameter id.
        """
        resolved = self._resolve(group_dot_name)
        if resolved is None:
            return None
        found_id, type_fmt = resolved

        # Read param. Flush first so a stale response isn't taken for ours.
        crtp.flush()
        crtp.send(CRTP_PORT_PARAM, self.READ_CH, struct.pack("<H", found_id))

        # Get response
        rdat = read_crtp(CRTP_PORT_PARAM, self.READ_CH)
        if not rdat:
            print("Error: timeout reading param")
            return None

        # Note: See CF:paramReadProcess(CRTPPacket *p) for packet format detail (differs from docs)
        # Layout unpacked here: [id_u16, one byte, value].
        # NOTE(review): the middle byte is presumably a status code -- it is not checked; confirm.
        fmt = "<HB" + type_fmt
        if len(rdat) < struct.calcsize(fmt):
            # Too short to carry a value; unpack_from would raise.
            print(f"Error: short param response ({len(rdat)} bytes)")
            return None
        vals = struct.unpack_from(fmt, rdat)
        if vals[0] != found_id:
            print(f"Error: unexpected param id ({vals[0]})")
            return None
        return vals[2]

    def set_by_name(self, group_dot_name, type_id, value):
        """Set param without using TOC lookup (lookup on STM32 instead)

        Args:
            group_dot_name: Dotted name, ex: 'motorPowerSet.enable'.
            type_id: Key of ``types_by_name``, ex: 'U8'.
            value: Value to write, packed according to ``type_id``.

        Raises:
            KeyError: if ``type_id`` is not a key of ``types_by_name``.
        """
        # EX: set_by_name('motorPowerSet.enable', 'U8', 1)
        # https://www.bitcraze.io/documentation/repository/crazyflie-firmware/master/functional-areas/crtp/crtp_parameters/#set-by-name
        # Payload: [0, group\0name\0, type_code, value]
        name_array = bytes(group_dot_name.replace('.', '\0') + '\0', 'utf-8')
        len_name = len(name_array)
        type_code, type_fmt = self.types_by_name[type_id]
        data = struct.pack(f'<B{len_name}sB{type_fmt}', 0, name_array, type_code, value)
        crtp.send(CRTP_PORT_PARAM, self.MISC_CH, data)