"""CodeAIR: Flight Data Recorder
"""
import time
import traceback
import flight
from codeair_drivers import console_bytes
from ansi_term import *
def _emergency_landing(logger):
    """The default 'abort' function. Make an emergency landing on unhandled exceptions.

    This prevents the drone from simply "stopping mid-air" and crashing when an
    exception occurs. Does nothing if the drone is not currently flying.

    The landing command is issued from a ``finally`` clause, so it is still
    sent if writing the log entry or setting the pixels raises; that
    exception then propagates after the landing command has been issued.

    Args:
        logger (FlightDataRecorder): The flight data recorder instance.
    """
    if not flight.fly.is_flying():
        return

    try:
        import codeair  # Late import to avoid circular dependency

        logger.log(ANSI_FG_YELLOW + "Emergency Landing" + ANSI_RESET)
        codeair.pixels.fill(codeair.RED)
    finally:
        # Landing is the one step that must not be skipped: a failure in the
        # cosmetic steps above must never leave the drone in the air.
        flight.fly.land()
class FlightDataRecorder:
    """A class to record flight data and handle logging.

    Intended for use as a context manager: the log file is opened on entry to
    the ``with`` block and closed on exit. If an exception escapes the block
    it is written to the log and the ``abort`` function is called.

    Attributes:
        filename (str): The name of the log file. None for default.
        abort (function): The function to call on an unhandled exception.
        log_file: The open log file object, or None while not logging.
        file_mode (int): One of FMODE_OVERWRITE, FMODE_APPEND, FMODE_AUTO.
        do_flush (bool): True to flush the file after every write.
        reported (bool): Set by print_report(), cleared when logging starts.
            Used by FMODE_AUTO to choose between overwrite and append.
    """

    FMODE_OVERWRITE = 0
    FMODE_APPEND = 1
    FMODE_AUTO = 2

    def __init__(self, filename=None, abort=_emergency_landing):
        """Initializes the FlightDataRecorder with a log file and abort function.

        Args:
            filename (str): The name of the log file. If None, a default name
                is chosen the first time the file is needed.
            abort (function): Called with this recorder as its only argument
                when an exception escapes the ``with`` block. Pass None to
                disable.
        """
        self.filename = filename
        self.abort = abort
        self.log_file = None
        self.file_mode = self.FMODE_AUTO
        self.do_flush = False
        self.reported = False

    def _check_filename(self):
        """Checks if the filename is set, and sets a default if not."""
        if self.filename:
            return

        import codeair  # Imported lazily, like the other codeair imports here

        self.filename = "flight_log.txt"
        # Log to SD card if available (user must mount first)
        if codeair.sdcard.is_ready():
            self.filename = "/sd/" + self.filename

    def _get_fmode_str(self):
        """Returns the string representation of the file mode.

        Returns:
            str: "wb" to overwrite or "ab" to append. The file is always
            opened in binary mode.
        """
        if self.file_mode == self.FMODE_OVERWRITE:
            return "wb"
        if self.file_mode == self.FMODE_APPEND:
            return "ab"
        # FMODE_AUTO (and any unrecognised value): overwrite only once the
        # previous log has been reported, otherwise keep appending to it.
        return "wb" if self.reported else "ab"

    def set_file_mode(self, mode, do_flush=False):
        """Sets the file mode for logging. Default is auto mode, which appends unless the log has been reported.

        Args:
            mode (int): The file mode (self.FMODE_OVERWRITE, self.FMODE_APPEND, self.FMODE_AUTO).
            do_flush (bool): Set True to aggressively flush every log write to disk.
        """
        self.file_mode = mode
        self.do_flush = do_flush

    def print_report(self, check_connection=True):
        """Prints the contents of the previous log file to the console.

        Refuses to run while a log file is open. Returns silently if
        ``check_connection`` is True and the ping fails.

        Args:
            check_connection (bool): If True, check if CodeSpace is connected before reporting.
        """
        import codeair

        if self.log_file:
            print("Can't report while logging.")
            return

        # Ping CodeSpace to see if we can report.
        if check_connection and not codeair.pipe.ping():
            return

        # Marked as reported even if the file turns out to be missing, so
        # FMODE_AUTO starts a fresh file on the next logging session.
        self.reported = True
        try:
            self._check_filename()
            with open(self.filename, "rb") as f:
                print("\n==== Begin Log Report ====")
                # Use console_bytes() to print the log file in binary chunks.
                # This enables us to handle binary embedded data (images, charts).
                while True:
                    buf = f.read(128)
                    if not buf:
                        break
                    console_bytes(buf)
                print("==== End Log Report ====\n")
        except OSError:
            print("No flight log found.")

    def __enter__(self):
        """Enters the context for flight data recording.

        If the log file cannot be opened the error is printed and the
        recorder is still returned; log() and write() then do nothing.

        Returns:
            FlightDataRecorder: The instance of the flight data recorder.
        """
        try:
            self._check_filename()
            self.log_file = open(self.filename, self._get_fmode_str())
            self.reported = False
            self.log(ANSI_FG_CYAN + "--- Flight Log Started ---" + ANSI_RESET)
        except OSError as e:
            print(f"Error opening log file: {e}")
        return self

    def _log_unhandled(self, exc_type, exc_value):
        """Writes the log entries describing an exception that escaped the ``with`` block."""
        if issubclass(exc_type, KeyboardInterrupt):
            self.log(ANSI_FG_YELLOW + "User Stop (KeyboardInterrupt)" + ANSI_RESET)
        else:
            self.log(ANSI_FG_RED + ">>> Unhandled Exception Occurred <<<" + ANSI_RESET)
            self.log_exception(exc_value)
            self.log(ANSI_FG_RED + "<<< End of Unhandled Exception >>>" + ANSI_RESET)

    def __exit__(self, exc_type, exc_value, tb):
        """Exits the context, logging unhandled exceptions.

        When an exception escaped the ``with`` block, the ``abort`` function
        is called even if the log file could not be opened or writing the
        log entry fails. The log file is always closed on the way out.

        Args:
            exc_type (type): The exception type.
            exc_value (Exception): The exception instance.
            tb (traceback): The traceback object.

        Returns:
            bool: False to indicate that exceptions should not be suppressed.
        """
        try:
            if exc_type is not None:  # An unhandled exception occurred
                try:
                    self._log_unhandled(exc_type, exc_value)
                finally:
                    # Abort must not depend on logging: it runs even when the
                    # log file is missing or the writes above raised.
                    if self.abort:
                        self.abort(self)
            self.log(ANSI_FG_CYAN + "--- Flight Log Ended ---" + ANSI_RESET)
        finally:
            # Detach the handle first so the recorder is never left pointing
            # at a file that failed to close.
            log_file, self.log_file = self.log_file, None
            if log_file:
                log_file.close()
        return False  # Do not suppress exceptions

    def log(self, message):
        """Logs a message to the file with a timestamp.

        The timestamp is time.monotonic_ns() converted to seconds and shown
        with three decimals. Does nothing when no log file is open.

        Args:
            message (str): The message to log.
        """
        if not self.log_file:
            return

        timestamp = time.monotonic_ns() / 1000000000
        log_message = f"[{timestamp:.3f}] {message}\n"
        # The file is opened in binary mode, so write bytes rather than str.
        self.log_file.write(log_message.encode("utf-8"))
        if self.do_flush:
            self.log_file.flush()

    def log_exception(self, e):
        """Logs an exception with a traceback.

        Args:
            e (Exception): The exception to log.
        """
        self.log(f"Exception: {e}")
        formatted_traceback = "".join(traceback.format_exception(type(e), e, e.__traceback__))
        self.log(formatted_traceback)

    def write(self, data):
        """Writes raw data to the log file.

        No timestamp or newline is added. Does nothing when no log file is
        open.

        Args:
            data (bytes): The data to write.
        """
        if self.log_file:
            self.log_file.write(data)
            if self.do_flush:
                self.log_file.flush()
# Example Usage
if __name__ == "__main__":
    fdr = FlightDataRecorder()

    # Report the previous log before a new logging session starts.
    print("Previous Log:")
    fdr.print_report()

    # Record a short "flight". The exception raised inside the block is
    # logged by the recorder on exit and is not suppressed.
    with fdr:
        for note in ("Starting flight...", "Altitude reached 1 meter"):
            fdr.log(note)
        result = 10 / 0  # Deliberate ZeroDivisionError to demonstrate exception logging
        fdr.log("Ending flight.")  # Never reached

    # Only reached when the block above exits without an exception.
    print("Program finished.")