Source code for codeair.embedded_data

"""Embedding data in console output for visualization"""

import struct
import time
from codeair_drivers import console_bytes
from img_utils import get_bitmap_bytes
import sys
import supervisor

def _check_ping_resp():
    buf = ''
    for _ in range(3):
        while supervisor.runtime.serial_bytes_available:
            b = sys.stdin.read(1)
            buf += b
            if "OK" in buf or len(buf) > 32:
                return True
        time.sleep(0.01)
    return False

[docs]class ConsoleWriter: """A class to adapt console_bytes to the write method like a file.""" write = console_bytes
console_writer = ConsoleWriter()
[docs]class EmbeddedDataPipe: """Encode supported data types for output to selected destination. Embedded Data Protocol: C0DECAFE + LEN_4bytes_u32 + descriptor_16bytes + data -- LEN is big endian -- descriptor is object-dependent string (see EdataViewer.js) Attributes: destination (file): The destination for the encoded data. MAGIC (bytes): The magic bytes to identify the protocol. """ def __init__(self): """Initializes the EmbeddedDataPipe and sets the default destination. """ self.destination = console_writer self.MAGIC = b'\xc0\xde\xca\xfe'
[docs] def connect(self, destination=None): """Connects to a new destination. Args: destination (file object): The new destination. Defaults to console_writer. """ if destination is None: destination = console_writer self.destination = destination
[docs] def ping(self): """Send a ping message to CodeSpace. Return True if connected and responding. """ # Will reply with "OK\r" if successful. self.destination.write(self.MAGIC + b'\x01\x00\x00\x00') return _check_ping_resp()
[docs] def chart(self, chart_name, value, series=None, chart_type="line"): """Send chart data. A unique chart_name will create a new chart. New series also work this way. Args: chart_name (str): Identifies which chart we're talking about ("temp", "sensors", etc.). value (float or iterable): The data to plot. Must be a single number or an iterable of (val, millisecond) points. series (str, optional): Optional string for labeling multiple data series in the same chart. chart_type (str, optional): Optional. Can support line vs. bar vs. scatter, etc. Defaults to "line". """ # descriptor="chartBNAME" | B=type(0=line), NAME=chart_name(max 10 chars) chart_type_code = 0 if chart_type == "line" else 1 # Assuming 0=line, 1=other types descriptor = struct.pack('5sB10s', 'chart', chart_type_code, chart_name[:10]) # After descriptor comes series name (null-terminated) if series: series_name = series.encode('utf-8') + b'\x00' else: series_name = b'\x00' # default series # Followed by data (8-bytes per point) if isinstance(value, (int, float)): data = struct.pack('>fI', value, time.ticks_ms()) # single point, current timestamp elif isinstance(value, (list, tuple)): data = b"" for val, timestamp in value: data += struct.pack('>fI', val, int(timestamp)) else: raise ValueError("Invalid value type. Must be int, float, list, or tuple.") # Construct data packet pkt = self.MAGIC + struct.pack('>I16s', 16 + len(series_name) + len(data), descriptor) + series_name + data self.destination.write(pkt)
[docs] def image(self, bitmap): """Send image to data panel. Only 16-bit RGB565 images are supported. Args: bitmap (Bitmap): The bitmap image to send. """ # descriptor="imageBWWHH" | B=bits per pixel, WW=width16, HH=height16 total_pixels = bitmap.width * bitmap.height hdr = self.MAGIC hdr += struct.pack('>I5sBHH6s', 16 + total_pixels * 2, "image", 16, bitmap.width, bitmap.height, "") bmp = get_bitmap_bytes(bitmap) self.destination.write(hdr) self.destination.write(bmp) # Write separately to maintain zero copy
[docs] def text(self, value): """Send text message to data panel. This is a simple type, used primarily for testing the embedded data protocol. Args: value (str): The text message to send. """ hdr = self.MAGIC + struct.pack('>I16s', 16 + len(value), "text") + value self.destination.write(hdr)