"""CodeAIR factory test and remote flight interface.
This program allows a CodeX to be paired with the CodeAIR for remote controlled flight and telemetry.
Pairing is done via close-range infrared (IR), while remote flight is via radio link.
"""
import radio
from codeair import *
from flight import *
from time import sleep
import time
import infrared
import json
import os
import microcontroller
import selftest
PAIRING_SAVE_FILENAME = 'pairing.json'
radio.on()
ir = infrared.Infrared()
rf_chan = 0
rf_id = 0
[docs]def radio_send(op, args=()):
"""Transmit a CodeAIR packet (ch, id, op, args) to paired CodeAIR"""
msg = json.dumps((rf_chan, rf_id, op, args))
radio.send(msg)
[docs]def radio_poll(timeout=0.1):
"""Wait up to timeout secs to receive a CodeAIR packet. Return (ch, id, op, args) or None"""
timeout *= 1000
t_start = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t_start) < timeout:
msg = radio.receive()
if msg:
# print("RX: ", msg)
try:
rch, rnid, op, args = json.loads(msg)
except:
continue
if rch == rf_chan and rnid == rf_id:
return (op, args)
else:
sleep(0.01)
return None
blue_brightness = 50
i_led = 0
dir_led = +1
[docs]def cycle_blue():
"""Call periodically to cycle blue LEDs in KITT pattern"""
global i_led, dir_led
leds.set(i_led, 0)
i_led = i_led + dir_led
if i_led == 0 or i_led == 7:
dir_led *= -1
leds.set(i_led, blue_brightness)
[docs]def save_pairing(ch, nid):
"""Save pairing info to local file."""
rec = (ch, nid)
try:
with open(PAIRING_SAVE_FILENAME, 'wb') as fp:
json.dump(rec, fp)
except:
print("Unable to save pairing.")
[docs]def get_saved_pairing():
"""Return (ch, nid) or None if no save file found."""
try:
with open(PAIRING_SAVE_FILENAME) as fp:
ch, nid = json.load(fp)
return (ch, nid)
except:
print("No pairing found.")
return None
def forget_pairing():
try:
os.remove(PAIRING_SAVE_FILENAME)
except:
pass
os.sync()
time.sleep(0.2)
microcontroller.reset()
[docs]def seek_pairing():
"""Wait to receive IR pairing message over infrared.
Return paired (ch, nid).
"""
while True:
cycle_blue()
buf = ir.receive()
if buf:
# print("rx(%d)" % len(buf), buf)
if buf[0] == 'P' and len(buf) == 3:
ch = ord(buf[1])
nid = ord(buf[2])
# print("tx: %d:%d" % (ch, nid))
ir.send('X' + buf[1] + buf[2])
ir.flush()
speaker.beep(440, 100)
elif buf == "ACK":
# print("got ACK!")
break
sleep(0.05)
leds.set_mask(0xFF, 0)
speaker.beep(1000, 100)
for _ in range(3):
pixels.fill(GREEN)
sleep(0.1)
pixels.off()
sleep(0.05)
return (ch, nid)
# Controller info bitfields
CF_INFO_CAN_ARM = 0x01
CF_INFO_IS_ARMED = 0x02
CF_INFO_AUTO_ARM = 0x04
CF_INFO_CAN_FLY = 0x08
CF_INFO_FLYING = 0x10
CF_INFO_TUMBLED = 0x20
CF_INFO_LOCKED = 0x40
CF_INFO_CRASHED = 0x80
[docs]class FlightControl:
"""Manage flying commands from remote paired CodeX"""
# States
IDLE = 0
CAL1 = 1
CAL2 = 2
TAKINGOFF = 3
FLYING = 4
LANDING = 5
statenames = ("IDLE", "CAL1", "CAL2", "TAKINGOFF", "FLYING", "LANDING", )
def __init__(self):
self.TAKEOFF_CEILING = 0.3 # m (initial height)
self.TOO_CLOSE = 0.3 # m (for collision avoidance)
self.HEIGHT_CEILING = 3.0 # m (max height allowed)
self.BATT_MIN = 3.2 # V (lower and we land automatically)
self.WARN_FREQ = 880 # Warning tone pre-takeoff
self.CAL_DELAY = 2000 # ms
self.LANDED_ALT_MM = 25 # mm height considered "landed"
self.ZDIFF_ADJ = 0.2 # m : attempt to correct height if measured vs predicted Z exceeds this.
# Telemetry
self.rng_front = 0
self.rng_up = 0
self.rng_down = 0
self.heading = 0
self.batt = 0
self.cf_info = 0
self.debug_lvl = 1 # 0-5 => none -> verbose
# Tracking poll rate (windowed average)
self.last_poll_ms = 0
self.poll_win = [0] * 10
self.poll_sum = 0
self.poll_rate = 1.0 # Init long, don't fly till we see fast (~100ms) polling
self.z_calc = 0.0 # calculated height
self.t_cal = 0
self.setpoint_override = None
self.takeoff_sound = 0
self.set_state(self.IDLE)
def set_state(self, new_state):
self.state = new_state
leds.set_mask(1 << self.state, blue_brightness)
if self.debug_lvl:
debug = "state=%s, poll_rate=%1.2f" % (self.statenames[self.state], self.poll_rate)
radio_send("prt", (debug,))
print("%9d : state=%d, poll_rate=%1.2f" % (time.ticks_ms(), self.state, self.poll_rate))
[docs] def takeoff(self):
"""Ascend autonomously to self.TAKEOFF_CEILING"""
if self.state != self.IDLE or \
self.rng_down > 50 or \
self.poll_rate > 0.3 or \
not (self.cf_info & CF_INFO_CAN_FLY) or \
self.batt < self.BATT_MIN:
return # Can't take off!
# Reset position estimator
set_param('kalman.resetEstimation', 1)
self.t_cal = time.ticks_ms()
self.z_calc = 0.0
# Autonomous ascent to self.TAKEOFF_CEILING
self.setpoint_override = (0, +0.2, 0)
self.set_state(self.CAL1)
# Begin warning tones
self.takeoff_sound = 0
self.cycle_takeoff_sound()
# Flying colors! Right=GREEN, Left=RED, Rear=WHITE
pixels.set((GREEN, GREEN, RED, RED, WHITE, WHITE, WHITE, WHITE))
[docs] def land(self):
"""Descend gracefully to ground"""
if not self.state == self.FLYING:
return
self.landing_sound = 10
speaker.beep(300 * self.landing_sound * 100, 0)
# Descend to ground
self.setpoint_override = (0, -0.2, 0)
self.set_state(self.LANDING)
def cycle_takeoff_sound(self):
self.takeoff_sound = (self.takeoff_sound + 1) % 5
if self.takeoff_sound < 3:
speaker.beep(self.WARN_FREQ, 0)
else:
speaker.off()
[docs] def update_setpoint(self, x_vel, z_vel, yaw):
"""Handle new OTA setpoint received from radio controller"""
if self.state == self.CAL1:
ms = time.ticks_ms()
if time.ticks_diff(ms, self.t_cal) > 100:
set_param('kalman.resetEstimation', 0)
self.set_state(self.CAL2)
self.t_cal = ms
self.cycle_takeoff_sound()
return
elif self.state == self.CAL2:
ms = time.ticks_ms()
self.cycle_takeoff_sound()
if time.ticks_diff(ms, self.t_cal) > self.CAL_DELAY:
speaker.off()
self.set_state(self.TAKINGOFF)
self.t_cal = ms
return
elif self.state == self.LANDING:
if self.landing_sound:
self.landing_sound -= 1
if self.landing_sound == 0:
speaker.off()
else:
speaker.beep(300 + self.landing_sound * 100, 0)
if self.rng_down < self.LANDED_ALT_MM:
self.stop()
return
elif self.state == self.IDLE:
return
if self.setpoint_override:
x_vel, z_vel, yaw = self.setpoint_override
# Collision avoidance
if self.rng_up < (self.TOO_CLOSE * 1000):
z_vel = min(z_vel, 0) # restrict to down movement
radio_send("prt", ("up limit!",))
if self.rng_front < (self.TOO_CLOSE * 1000):
x_vel = min(x_vel, 0) # restrict to backward movement
radio_send("prt", ("front limit!",))
if self.rng_down < (self.TOO_CLOSE * 1000):
z_vel = max(z_vel, -0.2) # up or slowly descending
radio_send("prt", ("down limit!",))
# Calculate zdistance
zdistance = self.calc_target_height(z_vel)
# Enforce a ceiling
if zdistance > self.HEIGHT_CEILING:
zdistance = self.HEIGHT_CEILING
radio_send("prt", ("enforcing max z!",))
if self.state == self.TAKINGOFF:
zdistance = min(zdistance, self.TAKEOFF_CEILING)
ms = time.ticks_ms()
if time.ticks_diff(ms, self.t_cal) > 2000: # 2 sec should get us to initial height
self.setpoint_override = None
self.set_state(self.FLYING)
# Make it fly!
cf_commander.send_hover_setpoint(x_vel, 0, yaw, zdistance)
if self.debug_lvl > 4:
debug = "hover(v=%1.1f, yr=%1.1f, zabs=%1.3f) zvel=%1.2f, zrng=%3.1f" % (x_vel, yaw, zdistance, z_vel, self.rng_down)
radio_send("prt", (debug,))
[docs] def calc_target_height(self, z_vel):
"""Return the target height at given z_vel considering poll rate"""
self.z_calc += z_vel * self.poll_rate
# Test: detect flying over objects
z_meas = self.rng_down / 1000
z_diff = z_meas - self.z_calc
if z_vel == 0 and abs(z_diff) > self.ZDIFF_ADJ:
# Attempt to correct height estimate
self.z_calc = z_meas
if self.debug_lvl > 1:
radio_send("prt", ("z_diff=%2.3f" % z_diff ,))
return self.z_calc
[docs] def stop(self):
"""Called after landing, or to catastrophically Abort flight"""
# Stop using low level setpoints and hand responsibility over to the high level commander to
# avoid time out when no setpoints are received any more
cf_commander.send_stop_setpoint()
cf_commander.send_notify_setpoint_stop()
self.setpoint_override = None
speaker.off()
pixels.off()
self.set_state(self.IDLE)
[docs] def abort(self):
"""Stop immediately, plummet to the ground! Also, recover from 'CRASHED' state."""
radio_send("prt", ("**ABORT!",))
self.stop()
speaker.off()
if self.cf_info & CF_INFO_CRASHED:
# If we're in crashed state, ABORT command will reboot the STM32
reset_controller()
sleep(0.1)
microcontroller.reset() # The big hammer. Should be able to recover without this :-/
[docs] def track_poll_rate(self):
"""Call each time we're polled, maintains accurate poll rate"""
# Maintaining a windowed average of the poll rate over the last len(self.poll_win) polls.
# Possibly overkill, since we could just hardcode the rate.
ms = time.ticks_ms()
dt = time.ticks_diff(ms, self.last_poll_ms)
self.last_poll_ms = ms
old = self.poll_win.pop()
self.poll_win.insert(0, dt)
self.poll_sum += dt - old
self.poll_rate = (self.poll_sum / len(self.poll_win)) / 1000 # seconds
def check_low_batt(self):
if self.batt < self.BATT_MIN:
self.land()
[docs] def update_dat(self, front, up, down, heading, thrust, info, batt):
"""Called from radio controller poll loop. Should be called at 100ms rate."""
self.track_poll_rate()
self.rng_front = front
self.rng_up = up
self.rng_down = down
self.heading = heading
self.cf_info = info
self.batt = batt
self.check_low_batt()
# Basic telemetry data report is "d1"
# d1 => (front, up, down, heading, thrust, info, batt) -- bat from ESP32 side
# Logging: rangers(2+2+2)+yaw(4)+thrust(4)+info(2) = 16 bytes (26 byte limit)
D1_GROUP = ('range.front', 'range.up', 'range.zrange', 'stateEstimate.yaw', 'stabilizer.thrust', 'supervisor.info')
[docs]def enter_ctrl_mode():
"""Control loop - receive and execute remote commands"""
global rv
print("Ctrl mode: ch=%d, nid=%d" % (rf_chan, rf_id))
radio.config(channel=rf_chan)
# Loop taking commands via radio!
while True:
ret = radio_poll()
if ret:
op, args = ret
if op == "ver":
ver = version()
radio_send(op, (ver,))
elif op == "d1":
# Primary flight telemetry data
dat = get_data(D1_GROUP) + (power.battery_voltage(1), )
fcon.update_dat(*dat)
if args:
x_vel, z_vel, yaw = args
fcon.update_setpoint(x_vel / 100, z_vel / 100, yaw) # Convert cm/s to m/s
radio_send(op, dat)
elif op == "d2":
# Diagnostics for self-test, etc.
dat = get_data(FLOW) # (dX, dY)
radio_send(op, dat)
elif op == "led":
i_led, color = args
n_led = i_led * 2
pixels.set(n_led, color)
pixels.set(n_led + 1, color)
elif op == "fly":
do_fly = args[0]
if do_fly:
fcon.takeoff()
else:
fcon.land()
elif op == "abt":
fcon.abort()
elif op == "py":
try:
rv = None
exec(args[0], locals(), globals()) # explicit locals/globals reqd for frozen
if fcon.debug_lvl:
print(f"exec({args[0]}) => {rv}")
if not rv is None:
radio_send(op, (rv,))
except Exception as err:
print("Error executing py command: ", err)
if buttons.was_pressed(BTN_0):
leds.set_mask(0xFF, 0)
sleep(0.1)
buttons.was_pressed()
for count in range(20):
leds.set_mask(0x80, 70)
sleep(0.1)
leds.set_mask(0xFF, 0)
sleep(0.1)
if buttons.was_pressed(BTN_1):
forget_pairing()
[docs]def run():
"""Main program"""
global fcon, rf_chan, rf_id
print("--- Flight Controller ---")
fcon = FlightControl()
# Check flight controller (STM32) firmware version
selftest.check_firmware()
# Allow remote CodeX to pair
sync = get_saved_pairing()
if not sync:
sync = seek_pairing()
save_pairing(*sync)
# Use paired controller for flying!
rf_chan, rf_id = sync
enter_ctrl_mode()
if __name__ == '__main__':
run()