Source code for botservices

"""Event-driven Services for CodeBot.

The module provides the `BotServices` object, which lets you connect callback functions
to react to events.

Example of hooking a callback function::

    from botservices import BotServices
    bot = BotServices()

    # Define callback function
    def handle_button(buttons):
        if buttons[0]:
            # Do something amazing when BTN-0 is pressed!

    # Hook the callback.
    bot.on_button(handle_button)
"""
import pyb
import botcore

# Bring some funcs local for speed
check_lines = botcore.ls.check
check_buttons = botcore.buttons.was_pressed
check_prox = botcore.prox.range

[docs]class BotServices: """CodeBot *event loop* built on :mod:`botcore` module. Todo: * Traverse _copy_ of callback lists, to enable safely hooking from within cb(). * Add a param to on_xx() to allow remove cb from list. * Add config of prox settings. """ def __init__(self): self.tmr_ev_list = [] self.last_buttons = check_buttons() self.last_lines = check_lines() self.last_prox = check_prox(1) self.accel_int = None # is set to axes[] on interrupt self.cb_button = [] self.cb_line = [] self.cb_prox = [] self.cb_accel = [] self.do_run = True self.prox_pwr = 2 self.prox_sens = 100 self.ls_thresh = 1000 self.ls_reflective_line = False self.stat_poll = 0 # Useful for performance benchmarking self.start = pyb.millis()
[docs] def loop(self): """Enter event loop. Call this after registering 'on_EVENT' event callback functions.""" while self.do_run: self.poll()
[docs] def stop(self): """Exit the event loop.""" self.do_run = False
# --- Register event callbacks ---
[docs] def on_button(self, cb): """Attach callback to Button Press events. Args: cb : Callback *function* to be called when event happens. The function will be called with a single parameter, a `list` of 2 bools. """ self.cb_button.append(cb)
[docs] def on_prox(self, cb): """Attach callback to Proximity Sensor events. Args: cb : Callback *function* to be called when event happens. The function will be called with a single parameter, a `list` of 2 bools. """ self.cb_prox.append(cb)
[docs] def on_line(self, cb): """Attach callback to Line Sensor events. Args: cb : Callback *function* to be called when event happens. The function will be called with a single parameter, a `list` of 5 bools. """ self.cb_line.append(cb)
[docs] def on_accel(self, cb): """Attach callback to Accelerometer events. Args: cb : Callback *function* to be called when event happens. The function will be called with a single parameter, a `list` of 3 bools corresponding to X, Y, Z axes of acceleration. """ if not self.cb_accel: botcore.accel.enable_int1(cb=self.handle_accel_irq) self.cb_accel.append(cb)
[docs] def on_timeout(self, cb, timeout_ms, arg=None): """Schedule callback to occur after specified timeout. Args: cb: Callback *function* to be called after time period expires. **Note:** For a repeating timer, the callback function can use this function to *re-register* itself. timeout_ms (int): Timeout in milliseconds, after which *callback* will be called. arg: Optional argument that will be passed to callback function ``cb(arg)``. """ self.tmr_ev_list.append((cb, timeout_ms, pyb.millis(), arg))
# ---
[docs] def clear_last(self): """Clear last-value for all sensors, forcing event callbacks on next poll with new current values""" self.last_buttons = None self.last_lines = None self.last_prox = None
[docs] def config_prox(self, power, sensitivity): """Configure proximity sensors. Args: power (int): Emitter power (1-8). sensitivity (int): Sensor sensitivity (0-100). Percent range from 0=least to 100=most sensitive. """ self.prox_pwr = power self.prox_sens = sensitivity
[docs] def config_ls(self, thresh, reflective_line): """Configure line sensors. Args: thresh (int): Sensor level for boundary between line and background. Sensor range is 0 (bright) to 4095 (dark). reflective_line (bool): Set to True for a bright line on dark background, False for dark line against light background. """ self.ls_thresh = thresh self.ls_reflective_line = reflective_line
def handle_accel_irq(self, axes): self.accel_int = axes #@micropython.native def poll(self): self.stat_poll += 1 # if pyb.elapsed_millis(self.start) >= 1000: # print("bench:{}".format(self.stat_poll)) # self.stat_poll = 0 # self.start = pyb.millis() # Check buttons new_buttons = check_buttons() if new_buttons != self.last_buttons: for cb in self.cb_button: cb(new_buttons) self.last_buttons = new_buttons # Check line sensors if self.cb_line: new_lines = check_lines(self.ls_thresh, self.ls_reflective_line) if new_lines != self.last_lines: for cb in self.cb_line: cb(new_lines) self.last_lines = new_lines # Check prox sensors if self.cb_prox: new_prox = check_prox(1, self.prox_pwr, self.prox_sens) if new_prox != self.last_prox: for cb in self.cb_prox: cb(new_prox) self.last_prox = new_prox # Check accelerometer if self.accel_int: for cb in self.cb_accel: cb(self.accel_int) self.accel_int = None # Check scheduled events # Traverse event list, filter/fire timeout events. cb_list = [] rem_list = [] for ev in self.tmr_ev_list: (cb, timeout, start, arg) = ev if pyb.elapsed_millis(start) >= timeout: cb_list.append((cb, arg)) else: rem_list.append(ev) self.tmr_ev_list = rem_list for cb,arg in cb_list: if arg is not None: cb(arg) else: cb()
[docs]class CancelableCallback: """Wrapper for callbacks you'd like to be able to cancel. Example:: cb = CancelableCallback(my_event) bot.on_timeout(cb, 1000) ... cb.cancel() """ def __init__(self, cb): self.cb = cb def __call__(self, *args): self.cb(*args) def _nop(self, *args): pass
[docs] def cancel(self): """Cancel this callback. *Note: callback remains on event list, just does nothing when called.* """ self.cb = self._nop
class TestBotSvc: # Simple test class to check out BotServices operation. def __init__(self, botservices): self.user_led_pos = 0 self.svc = botservices self.svc.on_line(self.handle_line) self.svc.on_prox(self.handle_prox) self.svc.on_button(self.handle_button) self.handle_tick100ms() self.svc.clear_last() # Pulse the USB LED and say hello at startup botcore.leds.usb(True) self.svc.on_timeout(botcore.leds.usb, 1000, arg=False) self.say_hello() def say_hello(self): for i in range(300,1500): botcore.spkr.pitch(i) pyb.delay(10) botcore.spkr.off() pyb.delay(50) for i in range(1500,1000, -1): botcore.spkr.pitch(i) pyb.delay(20) botcore.spkr.off() def handle_line(self, sensors): """A line sensor has changed!""" botcore.leds.ls(sensors) def handle_prox(self, sensors): """A prox sensor has changed!""" botcore.leds.prox(sensors) def handle_button(self, buttons): if buttons[0] or buttons[1]: self.svc.stop() def handle_tick100ms(self): botcore.leds.user(1 << self.user_led_pos) self.user_led_pos = (self.user_led_pos + 1) % 8 self.svc.on_timeout(self.handle_tick100ms, 100) if __name__ == '__main__': # Run tests when run standalone: >>> execfile('botservices.py') svc = BotServices() test = TestBotSvc(svc) print("Entering event loop") svc.loop() print("Program ended.")