To use Pyloton with CircuitPython, you first need to install a few libraries into the lib folder on your CIRCUITPY drive. Then you need to update code.py with the example script.

Thankfully, we can do this in one go. Click the Download Project Bundle button in the example below to download the necessary libraries and the code.py file as a zip file. Extract the contents of the zip file, open the directory CircuitPython_Pyloton/, open the directory that matches the version of CircuitPython you're using, and copy the contents of that directory to your CIRCUITPY drive.

Your CIRCUITPY drive should now look similar to the following image:

CIRCUITPY

Code.py

# SPDX-FileCopyrightText: 2020 Eva Herrada for Adafruit Industries
#
# SPDX-License-Identifier: MIT

from time import time
import adafruit_ble
import board
import pyloton

ble = adafruit_ble.BLERadio()    # pylint: disable=no-member

CONNECTION_TIMEOUT = 45

HEART = True
SPEED = True
CADENCE = True
AMS = True
DEBUG = False

# 84.229 is wheel circumference (700x23 in my case)
pyloton = pyloton.Pyloton(ble, board.DISPLAY, 84.229, HEART, SPEED, CADENCE, AMS, DEBUG)

pyloton.show_splash()

ams = pyloton.ams_connect()


start = time()
hr_connection = None
speed_cadence_connections = []
while True:
    if HEART:
        if not hr_connection:
            print("Attempting to connect to a heart rate monitor")
            hr_connection = pyloton.heart_connect()
            ble.stop_scan()
    if SPEED or CADENCE:
        if not speed_cadence_connections:
            print("Attempting to connect to speed and cadence monitors")
            speed_cadence_connections = pyloton.speed_cadence_connect()

    if time()-start >= CONNECTION_TIMEOUT:
        pyloton.timeout()
        break
    # Stop scanning whether or not we are connected.
    ble.stop_scan()

    #pylint: disable=too-many-boolean-expressions
    if ((not HEART or (hr_connection and hr_connection.connected)) and
            ((not SPEED and not CADENCE) or
             (speed_cadence_connections and speed_cadence_connections[0].connected)) and
            (not AMS or (ams and ams.connected))):
        break

pyloton.setup_display()

while ((not HEART or hr_connection.connected) and
       ((not SPEED and not CADENCE) or
        (speed_cadence_connections and speed_cadence_connections[0].connected)) and
       (not AMS or ams.connected)):
    pyloton.update_display()
    pyloton.ams_remote()

print("\n\nNot all sensors are connected. Please reset to try again\n\n")

Then, you can click on the Download: Project Zip link in the window below to download the code file and .STL file.

Code Run Through

First, the code loads all the required libraries.

from time import time
import adafruit_ble
import board
import pyloton

Pyloton Setup

Then, the variables required to run Pyloton are defined. To disable a specific sensor, set that sensor's variable to False. For example, if I wanted to use AppleMediaServices and a speed and cadence sensor, but didn't want to use a heart rate monitor, I would set HEART to False and this section of code.py would look like this:

HEART = False
SPEED = True
CADENCE = True
AMS = True
DEBUG = False

After that, Pyloton is instantiated and all the required variables are passed into it. My bike has 700x23 tires on it, so my wheel circumference is 84.229 inches. To find your wheel circumference, use the chart here and then convert the circumference to inches. There are 2.54 centimeters in one inch if you need to convert centimeters to inches.

ble = adafruit_ble.BLERadio()    # pylint: disable=no-member
 
CONNECTION_TIMEOUT = 45
 
HEART = True
SPEED = True
CADENCE = True
AMS = True
DEBUG = False

# 84.229 is wheel circumference (700x23 in my case)
pyloton = pyloton.Pyloton(ble, board.DISPLAY, 84.229, HEART, SPEED, CADENCE, AMS, DEBUG)
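
If your chart lists the circumference in millimeters, the conversion to inches can be sketched as follows. This is only an illustration: the helper name mm_to_inches and the 2100 mm figure are made up for the example and are not taken from the chart or from Pyloton.

```python
MM_PER_INCH = 25.4  # 2.54 centimeters per inch


def mm_to_inches(length_mm):
    """Convert a length in millimeters to inches."""
    return length_mm / MM_PER_INCH


# Hypothetical chart value, used only for illustration.
circumference_mm = 2100.0
circumference_in = round(mm_to_inches(circumference_mm), 3)
print(circumference_in)
```

The rounded result is the number you would pass as the third argument to Pyloton.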

Preparing to Connect

At this stage, we first tell Pyloton to show the splash screen so we can see the status of Pyloton easily.

Then, we tell Pyloton to connect to an Apple device. Simply comment out that line and disable AMS if you don't intend on using one. 

After that, we set the variable start to the current time so that the code can tell when it started and time out if it’s taking too long.

Then, hr_connection is set to None and speed_cadence_connections is set to an empty list so that they don't cause a NameError when the setup loop checks the status of the various sensors.

pyloton.show_splash()
 
ams = pyloton.ams_connect()

start = time()
hr_connection = None
speed_cadence_connections = []

Connecting loop

This while loop first checks if specific sensors are already connected. If they aren't, and that sensor is enabled, it will run the functions in pyloton.py to try to connect to those sensors.

Then, if more time than the desired timeout length has passed, 45 seconds by default, it will break out of the loop and code.py will finish running.

After that, each sensor is checked for a connection, but only if it is enabled. If all enabled sensors are connected, the code breaks out of the loop and moves on to the next section.

while True:
    if HEART:
        if not hr_connection:
            print("Attempting to connect to a heart rate monitor")
            hr_connection = pyloton.heart_connect()
            ble.stop_scan()
    if SPEED or CADENCE:
        if not speed_cadence_connections:
            print("Attempting to connect to speed and cadence monitors")
            speed_cadence_connections = pyloton.speed_cadence_connect()
 
    if time()-start >= CONNECTION_TIMEOUT:
        pyloton.timeout()
        break
    # Stop scanning whether or not we are connected.
    ble.stop_scan()
 
    #pylint: disable=too-many-boolean-expressions
    if ((not HEART or (hr_connection and hr_connection.connected)) and
            ((not SPEED and not CADENCE) or
             (speed_cadence_connections and speed_cadence_connections[0].connected)) and
            (not AMS or (ams and ams.connected))):
        break

pyloton.setup_display()
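
The exit condition at the end of the connecting loop can be easier to follow when it is split into one check per sensor type. The sketch below is a plain-Python restatement of that condition, not part of code.py: FakeConnection and all_enabled_connected are hypothetical names, and FakeConnection only imitates the connected attribute of a real BLE connection.

```python
class FakeConnection:
    """Hypothetical stand-in for a BLE connection with a .connected flag."""

    def __init__(self, connected):
        self.connected = connected


def all_enabled_connected(heart, speed, cadence, ams, hr_conn, sc_conns, ams_conn):
    """Return True when every enabled sensor reports a live connection."""
    # A disabled sensor never blocks the check.
    heart_ok = not heart or bool(hr_conn and hr_conn.connected)
    # Speed and cadence share one list of connections.
    cycling_ok = (not speed and not cadence) or bool(
        sc_conns and sc_conns[0].connected
    )
    ams_ok = not ams or bool(ams_conn and ams_conn.connected)
    return heart_ok and cycling_ok and ams_ok


# Only the heart rate monitor is enabled, and it is connected.
print(all_enabled_connected(True, False, False, False, FakeConnection(True), [], None))
```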

Main loop

This loop uses the same logic as the lines above to determine if all enabled sensors are connected. If they are, it then updates the display and runs the ams_remote function. Comment out that line if you aren't using a device with AppleMediaServices.

If one of the sensors disconnects, it breaks the loop and ends the file, notifying the user that not all sensors are connected.

while ((not HEART or hr_connection.connected) and
       ((not SPEED and not CADENCE) or
        (speed_cadence_connections and speed_cadence_connections[0].connected)) and
       (not AMS or ams.connected)):
    pyloton.update_display()
    pyloton.ams_remote()
 
print("\n\nNot all sensors are connected. Please reset to try again\n\n")

Pyloton.py Run Through

To start off, we import all of Pyloton's dependencies.

import time
import adafruit_ble
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.advertising.standard import SolicitServicesAdvertisement
import board
import digitalio
import displayio
import adafruit_imageload
from adafruit_ble_cycling_speed_and_cadence import CyclingSpeedAndCadenceService
from adafruit_ble_heart_rate import HeartRateService
from adafruit_bitmap_font import bitmap_font
from adafruit_display_shapes.rect import Rect
from adafruit_display_text import label
from adafruit_ble_apple_media import AppleMediaService
from adafruit_ble_apple_media import UnsupportedCommand
import gamepad
import touchio

Clue library

Pyloton contains a small section of the Adafruit_CircuitPython_CLUE library. We do this because the CLUE library has a lot of great tools for getting data from sensors, but for Pyloton we only need functions to get data from the buttons and capacitive touch pads. The CLUE library is documented here.

class Clue:
    """
    A very minimal version of the CLUE library.
    The library requires the use of many sensor-specific
    libraries this project doesn't use, and they were
    taking up a lot of RAM.
    """
    def __init__(self):
        self._i2c = board.I2C()
        self._touches = [board.D0, board.D1, board.D2]
        self._touch_threshold_adjustment = 0
        self._a = digitalio.DigitalInOut(board.BUTTON_A)
        self._a.switch_to_input(pull=digitalio.Pull.UP)
        self._b = digitalio.DigitalInOut(board.BUTTON_B)
        self._b.switch_to_input(pull=digitalio.Pull.UP)
        self._gamepad = gamepad.GamePad(self._a, self._b)
 
    @property
    def were_pressed(self):
        """
        Returns a set of buttons that have been pressed since the last time were_pressed was run.
        """
        ret = set()
        pressed = self._gamepad.get_pressed()
        for button, mask in (('A', 0x01), ('B', 0x02)):
            if mask & pressed:
                ret.add(button)
        return ret
 
    def _touch(self, i):
        if not isinstance(self._touches[i], touchio.TouchIn):
            self._touches[i] = touchio.TouchIn(self._touches[i])
            self._touches[i].threshold += self._touch_threshold_adjustment
        return self._touches[i].value
 
    @property
    def touch_0(self):
        """
        Returns True when capacitive touchpad 0 is currently being pressed.
        """
        return self._touch(0)
 
    @property
    def touch_1(self):
        """
        Returns True when capacitive touchpad 1 is currently being pressed.
        """
        return self._touch(1)
 
    @property
    def touch_2(self):
        """
        Returns True when capacitive touchpad 2 is currently being pressed.
        """
        return self._touch(2)

Pyloton Class

Here, Pyloton is defined. We then create some class variables used for setting the colors in the UI and create a Clue object.

class Pyloton:
    """
    Contains the various functions necessary for doing the Pyloton learn guide.
    """
    #pylint: disable=too-many-instance-attributes
 
    YELLOW = 0xFCFF00
    PURPLE = 0x64337E
    WHITE = 0xFFFFFF
 
    clue = Clue()

__init__

Init first defines all of the required instance variables. It then loads fonts, loads the sprite sheet, and sets up the group used to display status update text.

    def __init__(self, ble, display, circ, heart=True, speed=True, cad=True, ams=True, debug=False): #pylint: disable=too-many-arguments
        self.debug = debug
        self.ble = ble
        self.display = display
        self.circumference = circ
 
        self.heart_enabled = heart
        self.speed_enabled = speed
        self.cadence_enabled = cad
        self.ams_enabled = ams
        self.hr_connection = None
 
        self.num_enabled = heart + speed + cad + ams
 
        self._previous_wheel = 0
        self._previous_crank = 0
        self._previous_revolutions = 0
        self._previous_rev = 0
        self._previous_speed = 0
        self._previous_cadence = 0
        self._previous_heart = 0
        self._speed_failed = 0
        self._cadence_failed = 0
        self._setup = 0
        self._hr_label = None
        self._sp_label = None
        self._cadence_label = None
        self._ams_label = None
        self._hr_service = None
        self._heart_y = None
        self._speed_y = None
        self._cadence_y = None
        self._ams_y = None
        self.ams = None
        self.cyc_connections = None
        self.cyc_services = None
        self.track_artist = True
 
        self.start = time.time()
 
        self.splash = displayio.Group()
        self.loading_group = displayio.Group()
 
        self._load_fonts()
 
        self.sprite_sheet, self.palette = adafruit_imageload.load("/sprite_sheet.bmp",
                                                                  bitmap=displayio.Bitmap,
                                                                  palette=displayio.Palette)
 
        self.text_group = displayio.Group()
        self.status = label.Label(font=self.arial12, x=10, y=200,
                                  text='', color=self.YELLOW)
        self.status1 = label.Label(font=self.arial12, x=10, y=220,
                                   text='', color=self.YELLOW)
 
        self.text_group.append(self.status)
        self.text_group.append(self.status1)

show_splash

Show splash is used to load and display the splash screen (essentially a loading screen) using displayio. If debug is enabled, the splash screen will not be displayed. 

    def show_splash(self):
        """
        Shows the loading screen
        """
        if self.debug:
            return

        blinka_bitmap = "blinka-pyloton.bmp"

        # Compatible with CircuitPython 6 & 7
        with open(blinka_bitmap, 'rb') as bitmap_file:
            bitmap1 = displayio.OnDiskBitmap(bitmap_file)
            tile_grid = displayio.TileGrid(bitmap1, pixel_shader=getattr(bitmap1, 'pixel_shader', displayio.ColorConverter()))
            self.loading_group.append(tile_grid)
            self.display.show(self.loading_group)
            status_heading = label.Label(font=self.arial16, x=80, y=175,
                                         text="Status", color=self.YELLOW)
            rect = Rect(0, 165, 240, 75, fill=self.PURPLE)
            self.loading_group.append(rect)
            self.loading_group.append(status_heading)

        # # Compatible with CircuitPython 7+
        # bitmap1 = displayio.OnDiskBitmap(blinka_bitmap)
        # tile_grid = displayio.TileGrid(bitmap1, pixel_shader=bitmap1.pixel_shader)
        # self.loading_group.append(tile_grid)
        # self.display.show(self.loading_group)
        # status_heading = label.Label(font=self.arial16, x=80, y=175,
        #                              text="Status", color=self.YELLOW)
        # rect = Rect(0, 165, 240, 75, fill=self.PURPLE)
        # self.loading_group.append(rect)
        # self.loading_group.append(status_heading)

_load_fonts

_load_fonts does two things. First, it loads the three fonts we will be using: 12 pt. Arial, 16 pt. Arial, and 24 pt. Arial bold. Then, it preloads the glyphs for a bytestring of commonly used characters. Loading these characters ahead of time is not required, but it does speed up the display process quite a bit.

    def _load_fonts(self):
        """
        Loads fonts
        """
        self.arial12 = bitmap_font.load_font("/fonts/Arial-12.bdf")
        self.arial16 = bitmap_font.load_font("/fonts/Arial-16.bdf")
        self.arial24 = bitmap_font.load_font("/fonts/Arial-Bold-24.bdf")
 
        glyphs = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-!,. "\'?!'
        self.arial12.load_glyphs(glyphs)
        self.arial16.load_glyphs(glyphs)
        self.arial24.load_glyphs(glyphs)

_status_update

_status_update edits the text of self.status and self.status1 to display the current status update on the CLUE's screen. When message is longer than 25 characters, it gets sliced, with the first 25 characters appearing on the first line and the 26th through 50th characters appearing on the second line; anything past 50 characters is not shown. If debug is enabled, it will simply print the full message to the REPL.

    def _status_update(self, message):
        """
        Displays status updates
        """
        if self.debug:
            print(message)
            return
        if self.text_group not in self.loading_group:
            self.loading_group.append(self.text_group)
        self.status.text = message[:25]
        self.status1.text = message[25:50]
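
To see what the two slices do to a real status message, the same slicing can be run on its own. This is a small sketch; split_status is a hypothetical helper that mirrors the two slice expressions above and is not part of pyloton.py.

```python
def split_status(message, width=25):
    """Split a message into two display lines of at most `width` characters."""
    return message[:width], message[width:2 * width]


line1, line2 = split_status("AppleMediaService: Connect your phone now")
print(repr(line1))
print(repr(line2))
```

Because the split is purely by character count, a word can be cut in the middle, which is what happens to "Connect" in this message.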

timeout

This function is called by code.py when the CONNECTION_TIMEOUT has been reached. It displays a timeout message for three seconds and then returns control to code.py, which breaks out of the connecting loop.

    def timeout(self):
        """
        Displays Timeout on screen when pyloton has been searching for a sensor for too long
        """
        self._status_update("Pyloton: Timeout")
        time.sleep(3)

heart_connect

heart_connect uses the adafruit_ble_heart_rate library to connect to a heart rate monitor. This is a modified version of the ble_heart_rate_simpletest.py file.

    def heart_connect(self):
        """
        Connects to heart rate sensor
        """
        self._status_update("Heart Rate: Scanning...")
        for adv in self.ble.start_scan(ProvideServicesAdvertisement, timeout=5):
            if HeartRateService in adv.services:
                self._status_update("Heart Rate: Found an advertisement")
                self.hr_connection = self.ble.connect(adv)
                self._status_update("Heart Rate: Connected")
                break
        self.ble.stop_scan()
        if self.hr_connection:
            self._hr_service = self.hr_connection[HeartRateService]
        return self.hr_connection

ams_connect

This function is used to connect to an Apple device using the ble_apple_media library. It is based on the example code from ble_apple_media_simpletest.py.

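    def ams_connect(self, start=None, timeout=30):
        """
        Connect to an Apple device using the ble_apple_media library
        """
        # Take the start time when called; a time.time() default would be fixed at import
        if start is None:
            start = time.time()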
        self._status_update("AppleMediaService: Connect your phone now")
        radio = adafruit_ble.BLERadio()
        a = SolicitServicesAdvertisement()
        a.solicited_services.append(AppleMediaService)
        radio.start_advertising(a)
 
        while not radio.connected and not self._has_timed_out(start, timeout):
            pass
 
        self._status_update("AppleMediaService: Connected")
        for connection in radio.connections:
            if not connection.paired:
                connection.pair()
                self._status_update("AppleMediaService: Paired")
            self.ams = connection[AppleMediaService]
 
        return radio

speed_cadence_connect

This function uses the ble_cycling_speed_and_cadence library to connect to speed and cadence sensors. It works with both discrete speed and cadence sensors and combined speed and cadence sensors. It will not work if you connect two sensors of the same type, such as two speed sensors, or a combined speed and cadence sensor plus a separate speed sensor. This is based on the example file, ble_cycling_speed_and_cadence_simpletest.py.

    def speed_cadence_connect(self):
        """
        Connects to speed and cadence sensor
        """
        self._status_update("Speed and Cadence: Scanning...")
        # Save advertisements, indexed by address
        advs = {}
        for adv in self.ble.start_scan(ProvideServicesAdvertisement, timeout=5):
            if CyclingSpeedAndCadenceService in adv.services:
                self._status_update("Speed and Cadence: Found an advertisement")
                # Save advertisement. Overwrite duplicates from same address (device).
                advs[adv.address] = adv
 
        self.ble.stop_scan()
        self._status_update("Speed and Cadence: Stopped scanning")
        if not advs:
            # Nothing found. Go back and keep looking.
            return []
 
        # Connect to all available CSC sensors.
        self.cyc_connections = []
        for adv in advs.values():
            self.cyc_connections.append(self.ble.connect(adv))
            self._status_update("Speed and Cadence: Connected {}".format(len(self.cyc_connections)))
 
        self.cyc_services = []
        for conn in self.cyc_connections:
            self.cyc_services.append(conn[CyclingSpeedAndCadenceService])
        self._status_update("Pyloton: Finishing up...")
 
        return self.cyc_connections
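
The scan loop relies on a dictionary keyed by address to keep only one advertisement per device. The sketch below shows that idea with made-up data: the Advertisement tuple, its rssi field, and the addresses are hypothetical stand-ins for what a real BLE scan would yield.

```python
from collections import namedtuple

# Hypothetical stand-in for a BLE advertisement.
Advertisement = namedtuple("Advertisement", ["address", "rssi"])


def dedupe_by_address(advertisements):
    """Keep only the most recent advertisement seen from each address."""
    latest = {}
    for adv in advertisements:
        # A later advertisement from the same address overwrites the earlier one.
        latest[adv.address] = adv
    return latest


scan = [
    Advertisement("AA:01", -70),
    Advertisement("BB:02", -60),
    Advertisement("AA:01", -65),
]
found = dedupe_by_address(scan)
print(len(found))
```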

_compute_speed

_compute_speed turns the cumulative wheel revolution count and the time of the last wheel event into speed in miles per hour. It takes the revolutions since the last report, rev_diff, and the time since the last report, wheel_diff (in units of 1/1024 of a second), converts them into rotations per minute, and then converts that into miles per hour using the wheel circumference in inches (63,360 inches per mile, 60 minutes per hour).

    def _compute_speed(self, values, speed):
        wheel_diff = values.last_wheel_event_time - self._previous_wheel
        rev_diff = values.cumulative_wheel_revolutions - self._previous_revolutions
        if wheel_diff:
            # Rotations per minute is 60 times the amount of revolutions since
            # the last update over the time since the last update
            rpm = 60*(rev_diff/(wheel_diff/1024))
            # We then multiply it by the wheel's circumference and convert it to mph
            speed = round((rpm * self.circumference) * (60/63360), 1)
            if speed < 0:
                speed = self._previous_speed
            self._previous_speed = speed
            self._previous_revolutions = values.cumulative_wheel_revolutions
            self._speed_failed = 0
        else:
            self._speed_failed += 1
            if self._speed_failed >= 3:
                speed = 0
        self._previous_wheel = values.last_wheel_event_time
 
        return speed
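
The unit conversion can be checked by hand with a standalone version of the arithmetic. This sketch assumes the same units the function above uses (event times in 1/1024 of a second, circumference in inches); speed_mph and the constant names are hypothetical and not part of pyloton.py.

```python
INCHES_PER_MILE = 63360
TICKS_PER_SECOND = 1024


def speed_mph(rev_diff, wheel_diff_ticks, circumference_in):
    """Convert wheel revolutions over a tick interval into miles per hour."""
    seconds = wheel_diff_ticks / TICKS_PER_SECOND
    rpm = 60 * (rev_diff / seconds)
    # inches per minute -> inches per hour -> miles per hour
    return round((rpm * circumference_in) * (60 / INCHES_PER_MILE), 1)


# Two revolutions in one second (1024 ticks) on an 84.229 inch wheel.
print(speed_mph(2, 1024, 84.229))
```

Two revolutions per second is 120 rpm, which on this wheel is about 10,107 inches per minute, or roughly 9.6 mph.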

_compute_cadence

This works in a very similar way to _compute_speed, except since cadence is measured in rotations per minute, it doesn't need as much unit conversion. Also similar to the previous function, most of the function is related to preventing undesired results being displayed.

    def _compute_cadence(self, values, cadence):
        crank_diff = values.last_crank_event_time - self._previous_crank
        crank_rev_diff = values.cumulative_crank_revolutions-self._previous_rev
 
        if crank_rev_diff:
            # Rotations per minute is 60 times the amount of revolutions since the
            # last update over the time since the last update
            cadence = round(60*(crank_rev_diff/(crank_diff/1024)), 1)
            if cadence < 0:
                cadence = self._previous_cadence
            self._previous_cadence = cadence
            self._previous_rev = values.cumulative_crank_revolutions
            self._cadence_failed = 0
        else:
            self._cadence_failed += 1
            if self._cadence_failed >= 3:
                cadence = 0
        self._previous_crank = values.last_crank_event_time
 
        return cadence
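
The "hold the last value, then drop to zero" behavior is easier to see when the function is run against a sequence of readings. The class below is a simplified, hardware-free sketch of that logic: CadenceTracker is a hypothetical name, the readings are invented, and the extra check on time_diff is an added guard against division by zero that the function above does not have.

```python
class CadenceTracker:
    """Simplified stand-in for the cadence bookkeeping, for illustration only."""

    def __init__(self):
        self.previous_crank_time = 0
        self.previous_revs = 0
        self.previous_cadence = 0
        self.failed = 0

    def update(self, crank_time, revs):
        """Return cadence in rpm for a reading (time in 1/1024 s, total revs)."""
        cadence = self.previous_cadence
        time_diff = crank_time - self.previous_crank_time
        rev_diff = revs - self.previous_revs
        if rev_diff and time_diff:  # time_diff check is an added guard
            cadence = round(60 * (rev_diff / (time_diff / 1024)), 1)
            if cadence < 0:
                cadence = self.previous_cadence
            self.previous_cadence = cadence
            self.previous_revs = revs
            self.failed = 0
        else:
            # No new revolutions: keep the old value until the third miss.
            self.failed += 1
            if self.failed >= 3:
                cadence = 0
        self.previous_crank_time = crank_time
        return cadence


tracker = CadenceTracker()
readings = [(1024, 1), (2048, 3), (2048, 3), (2048, 3), (2048, 3)]
results = [tracker.update(t, r) for t, r in readings]
print(results)
```

The repeated reading at the end stands for a rider who has stopped pedaling: the displayed cadence stays at its last value for two updates and only then falls to zero.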

read_s_and_c

read_s_and_c reads speed and cadence. It sets speed and/or cadence to 0 if they haven't changed in around the last 2 seconds. It also keeps them at their previous values if it receives None in svc.measurement_values. Additionally, it truncates speed and cadence to ensure they don't fill up their labels.

    def read_s_and_c(self):
        """
        Reads data from the speed and cadence sensor
        """
        speed = self._previous_speed
        cadence = self._previous_cadence
        for conn, svc in zip(self.cyc_connections, self.cyc_services):
            if not conn.connected:
                speed = cadence = 0
                continue
            values = svc.measurement_values
            if not values:
                if self._cadence_failed >= 3 or self._speed_failed >= 3:
                    if self._cadence_failed > 3:
                        cadence = 0
                    if self._speed_failed > 3:
                        speed = 0
                continue
            if not values.last_wheel_event_time:
                continue
            speed = self._compute_speed(values, speed)
            if not values.last_crank_event_time:
                continue
            cadence = self._compute_cadence(values, cadence)
 
        if speed:
            speed = str(speed)[:8]
        if cadence:
            cadence = str(cadence)[:8]
 
        return speed, cadence

read_heart

Reads data from the heart rate monitor. Occasionally, it sends None as the current heart rate, and in that case, we set the current heart rate to the previous heart rate and return it. 

    def read_heart(self):
        """
        Reads data from the heart rate sensor
        """
        measurement = self._hr_service.measurement_values
        if measurement is None:
            heart = self._previous_heart
        else:
            heart = measurement.heart_rate
            self._previous_heart = measurement.heart_rate
 
        if heart:
            heart = str(heart)[:4]
 
        return heart

read_ams

Gets data about the currently playing track from AppleMediaServices. Every 3 seconds, the returned value alternates between the artist and the track title. If either of these is longer than 16 characters, it gets truncated and '..' is appended to prevent the label from filling up.

    def read_ams(self):
        """
        Reads data from AppleMediaServices
        """
        current = time.time()
        try:
            if current - self.start > 3:
                self.track_artist = not self.track_artist
                self.start = time.time()
            if self.track_artist:
                data = self.ams.artist
            if not self.track_artist:
                data = self.ams.title
        except (RuntimeError, UnicodeError):
            data = None
 
        if data:
            data = data[:16] + (data[16:] and '..')
 
        return data
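
The truncation line uses a compact idiom: the slice past the limit is an empty string (falsy) for short text, so the `and` expression only yields '..' when something was actually cut off. A standalone sketch, with truncate as a hypothetical helper name:

```python
def truncate(text, limit=16):
    """Cut text to `limit` characters, adding '..' only if it was shortened."""
    return text[:limit] + (text[limit:] and "..")


print(truncate("Blinka"))
print(truncate("A Very Long Track Title Indeed"))
```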

icon_maker

icon_maker creates a sprite in one line instead of three.

    def icon_maker(self, n, icon_x, icon_y):
        """
        Generates icons as sprites
        """
        sprite = displayio.TileGrid(self.sprite_sheet, pixel_shader=self.palette, width=1,
                                    height=1, tile_width=40, tile_height=40, default_tile=n,
                                    x=icon_x, y=icon_y)
        return sprite

_label_maker

_label_maker makes creating labels slightly more readable.

    def _label_maker(self, text, x, y, font=None):
        """
        Generates labels
        """
        if not font:
            font = self.arial24
        return label.Label(font=font, x=x, y=y, text=text, color=self.WHITE)

_get_y

This uses the number of enabled devices, num_enabled, to determine the placement of labels and sprites in the UI.

    def _get_y(self):
        """
        Helper function for setup_display. Gets the y values used for sprites and labels.
        """
        enabled = self.num_enabled
 
        if self.heart_enabled:
            self._heart_y = 45*(self.num_enabled - enabled) + 75
            enabled -= 1
        if self.speed_enabled:
            self._speed_y = 45*(self.num_enabled - enabled) + 75
            enabled -= 1
        if self.cadence_enabled:
            self._cadence_y = 45*(self.num_enabled - enabled) + 75
            enabled -= 1
        if self.ams_enabled:
            self._ams_y = 45*(self.num_enabled - enabled) + 75
            enabled -= 1
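
The effect of this countdown is that each enabled sensor takes the next free row, starting at y = 75 and moving down 45 pixels per row. The sketch below reproduces that layout with a simple row counter instead of the countdown; row_positions is a hypothetical helper and returns a dictionary rather than setting attributes.

```python
def row_positions(heart=True, speed=True, cadence=True, ams=True):
    """Return the y position of each enabled sensor's row."""
    positions = {}
    row = 0
    for name, enabled in (
        ("heart", heart),
        ("speed", speed),
        ("cadence", cadence),
        ("ams", ams),
    ):
        if enabled:
            # Rows start at y=75 and are spaced 45 pixels apart.
            positions[name] = 45 * row + 75
            row += 1
    return positions


print(row_positions())
print(row_positions(heart=False))
```

With the heart rate monitor disabled, the remaining rows move up so that no gap is left at the top of the screen.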

setup_display

This creates all the UI elements that do not change. First it creates a rectangle at the top of the screen to hold the heading, then it creates the heading. Then it places the sprites from the sprite sheet at locations that correspond with how many devices are enabled.

    def setup_display(self):
        """
        Prepares the display to show sensor values: Adds a header, a heading, and various sprites.
        """
        self._get_y()
        sprites = displayio.Group()
 
        rect = Rect(0, 0, 240, 50, fill=self.PURPLE)
        self.splash.append(rect)
 
        heading = label.Label(font=self.arial24, x=55, y=25, text="Pyloton", color=self.YELLOW)
        self.splash.append(heading)
 
        if self.heart_enabled:
            heart_sprite = self.icon_maker(0, 2, self._heart_y - 20)
            sprites.append(heart_sprite)
 
        if self.speed_enabled:
            speed_sprite = self.icon_maker(1, 2, self._speed_y - 20)
            sprites.append(speed_sprite)
 
        if self.cadence_enabled:
            cadence_sprite = self.icon_maker(2, 2, self._cadence_y - 20)
            sprites.append(cadence_sprite)
 
        if self.ams_enabled:
            ams_sprite = self.icon_maker(3, 2, self._ams_y - 20)
            sprites.append(ams_sprite)
 
        self.splash.append(sprites)
 
        self.display.show(self.splash)
        while self.loading_group:
            self.loading_group.pop()

update_display

update_display is used to display the most recent values from the sensors on the screen. If it is running for the first time, it will create the labels, and set _setup to True upon finishing. From then on, the text of the labels is edited, which increases the refresh rate.

    def update_display(self): #pylint: disable=too-many-branches
        """
        Updates the display to display the most recent values
        """
        if self.speed_enabled or self.cadence_enabled:
            speed, cadence = self.read_s_and_c()
 
        if self.heart_enabled:
            heart = self.read_heart()
            if not self._setup:
                self._hr_label = self._label_maker('{} bpm'.format(heart), 50, self._heart_y)
                self.splash.append(self._hr_label)
            else:
                self._hr_label.text = '{} bpm'.format(heart)
 
        if self.speed_enabled:
            if not self._setup:
                self._sp_label = self._label_maker('{} mph'.format(speed), 50, self._speed_y)
                self.splash.append(self._sp_label)
            else:
                self._sp_label.text = '{} mph'.format(speed)
 
        if self.cadence_enabled:
            if not self._setup:
                self._cadence_label = self._label_maker('{} rpm'.format(cadence), 50,
                                                        self._cadence_y)
                self.splash.append(self._cadence_label)
            else:
                self._cadence_label.text = '{} rpm'.format(cadence)
 
        if self.ams_enabled:
            ams = self.read_ams()
            if not self._setup:
                self._ams_label = self._label_maker('{}'.format(ams), 50, self._ams_y,
                                                    font=self.arial16)
                self.splash.append(self._ams_label)
            else:
                self._ams_label.text = '{}'.format(ams)
 
        self._setup = True
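
The create-once, update-afterwards pattern can be shown without a display. In this sketch, FakeLabel is a hypothetical stand-in for a display label that only stores text and counts how many times it has been constructed, and Dashboard keeps just the heart rate branch of the logic.

```python
class FakeLabel:
    """Hypothetical stand-in for a display label; counts constructions."""

    created = 0

    def __init__(self, text):
        FakeLabel.created += 1
        self.text = text


class Dashboard:
    """Keeps one label and reuses it after the first update."""

    def __init__(self):
        self._setup = False
        self._hr_label = None

    def update(self, heart):
        text = "{} bpm".format(heart)
        if not self._setup:
            # First call: build the label object.
            self._hr_label = FakeLabel(text)
        else:
            # Later calls: only change the text of the existing label.
            self._hr_label.text = text
        self._setup = True
        return self._hr_label


dashboard = Dashboard()
first = dashboard.update(70)
second = dashboard.update(72)
print(second.text, FakeLabel.created)
```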

ams_remote

ams_remote uses the built-in buttons and capacitive touch pads to control media playing on an Apple device using AppleMediaServices. Button 'A' is on the left and button 'B' is on the right. The three capacitive touch pads are at the bottom and are labeled 0, 1, and 2.

    def ams_remote(self):
        """
        Allows the 2 buttons and 3 capacitive touch pads in the CLUE to function as a media remote.
        """
        try:
            # Capacitive touch pad marked 0 goes to the previous track
            if self.clue.touch_0:
                self.ams.previous_track()
                time.sleep(0.25)
 
            # Capacitive touch pad marked 1 toggles pause/play
            if self.clue.touch_1:
                self.ams.toggle_play_pause()
                time.sleep(0.25)
 
            # Capacitive touch pad marked 2 advances to the next track
            if self.clue.touch_2:
                self.ams.next_track()
                time.sleep(0.25)
 
            # Read the buttons once, since reading were_pressed clears the pressed state
            pressed = self.clue.were_pressed

            # If button B (on the right) is pressed, it increases the volume
            if 'B' in pressed:
                self.ams.volume_up()
                time.sleep(0.1)

            # If button A (on the left) is pressed, the volume decreases
            if 'A' in pressed:
                self.ams.volume_down()
                time.sleep(0.1)
        except (RuntimeError, UnsupportedCommand, AttributeError):
            return
# SPDX-FileCopyrightText: 2020 Eva Herrada for Adafruit Industries
#
# SPDX-License-Identifier: MIT

"""
A library for completing the Pyloton bike computer learn guide utilizing the Adafruit CLUE.
"""

import time
import adafruit_ble
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.advertising.standard import SolicitServicesAdvertisement
import board
import digitalio
import displayio
import adafruit_imageload
from adafruit_ble_cycling_speed_and_cadence import CyclingSpeedAndCadenceService
from adafruit_ble_heart_rate import HeartRateService
from adafruit_bitmap_font import bitmap_font
from adafruit_display_shapes.rect import Rect
from adafruit_display_text import label
from adafruit_ble_apple_media import AppleMediaService
from adafruit_ble_apple_media import UnsupportedCommand
import gamepad
import touchio

class Clue:
    """
    A very minimal version of the CLUE library.
    The library requires the use of many sensor-specific
    libraries this project doesn't use, and they were
    taking up a lot of RAM.
    """
    def __init__(self):
        self._i2c = board.I2C()
        self._touches = [board.D0, board.D1, board.D2]
        self._touch_threshold_adjustment = 0
        self._a = digitalio.DigitalInOut(board.BUTTON_A)
        self._a.switch_to_input(pull=digitalio.Pull.UP)
        self._b = digitalio.DigitalInOut(board.BUTTON_B)
        self._b.switch_to_input(pull=digitalio.Pull.UP)
        self._gamepad = gamepad.GamePad(self._a, self._b)

    @property
    def were_pressed(self):
        """
        Returns a set of buttons that have been pressed since the last time were_pressed was run.
        """
        ret = set()
        pressed = self._gamepad.get_pressed()
        for button, mask in (('A', 0x01), ('B', 0x02)):
            if mask & pressed:
                ret.add(button)
        return ret

    @property
    def button_a(self):
        """``True`` when Button A is pressed. ``False`` if not."""
        return not self._a.value

    @property
    def button_b(self):
        """``True`` when Button B is pressed. ``False`` if not."""
        return not self._b.value

    def _touch(self, i):
        if not isinstance(self._touches[i], touchio.TouchIn):
            self._touches[i] = touchio.TouchIn(self._touches[i])
            self._touches[i].threshold += self._touch_threshold_adjustment
        return self._touches[i].value

    @property
    def touch_0(self):
        """
        Returns True when capacitive touchpad 0 is currently being pressed.
        """
        return self._touch(0)

    @property
    def touch_1(self):
        """
        Returns True when capacitive touchpad 1 is currently being pressed.
        """
        return self._touch(1)

    @property
    def touch_2(self):
        """
        Returns True when capacitive touchpad 2 is currently being pressed.
        """
        return self._touch(2)


class Pyloton:
    """
    Contains the various functions necessary for doing the Pyloton learn guide.
    """
    #pylint: disable=too-many-instance-attributes

    YELLOW = 0xFCFF00
    PURPLE = 0x64337E
    WHITE = 0xFFFFFF

    clue = Clue()

    def __init__(self, ble, display, circ, heart=True, speed=True, cad=True, ams=True, debug=False): #pylint: disable=too-many-arguments
        self.debug = debug
        self.ble = ble
        self.display = display
        self.circumference = circ

        self.heart_enabled = heart
        self.speed_enabled = speed
        self.cadence_enabled = cad
        self.ams_enabled = ams
        self.hr_connection = None

        self.num_enabled = heart + speed + cad + ams

        self._previous_wheel = 0
        self._previous_crank = 0
        self._previous_revolutions = 0
        self._previous_rev = 0
        self._previous_speed = 0
        self._previous_cadence = 0
        self._previous_heart = 0
        self._speed_failed = 0
        self._cadence_failed = 0
        self._setup = 0
        self._hr_label = None
        self._sp_label = None
        self._cadence_label = None
        self._ams_label = None
        self._hr_service = None
        self._heart_y = None
        self._speed_y = None
        self._cadence_y = None
        self._ams_y = None
        self.ams = None
        self.cyc_connections = None
        self.cyc_services = None
        self.track_artist = True

        self.start = time.time()

        self.splash = displayio.Group()
        self.loading_group = displayio.Group()

        self._load_fonts()

        self.sprite_sheet, self.palette = adafruit_imageload.load("/sprite_sheet.bmp",
                                                                  bitmap=displayio.Bitmap,
                                                                  palette=displayio.Palette)

        self.text_group = displayio.Group()
        self.status = label.Label(font=self.arial12, x=10, y=200,
                                  text='', color=self.YELLOW)
        self.status1 = label.Label(font=self.arial12, x=10, y=220,
                                   text='', color=self.YELLOW)

        self.text_group.append(self.status)
        self.text_group.append(self.status1)


    def show_splash(self):
        """
        Shows the loading screen
        """
        if self.debug:
            return

        blinka_bitmap = "blinka-pyloton.bmp"

        # Compatible with CircuitPython 6 & 7
        with open(blinka_bitmap, 'rb') as bitmap_file:
            bitmap1 = displayio.OnDiskBitmap(bitmap_file)
            tile_grid = displayio.TileGrid(bitmap1, pixel_shader=getattr(bitmap1, 'pixel_shader', displayio.ColorConverter()))
            self.loading_group.append(tile_grid)
            self.display.show(self.loading_group)
            status_heading = label.Label(font=self.arial16, x=80, y=175,
                                         text="Status", color=self.YELLOW)
            rect = Rect(0, 165, 240, 75, fill=self.PURPLE)
            self.loading_group.append(rect)
            self.loading_group.append(status_heading)

        # # Compatible with CircuitPython 7+
        # bitmap1 = displayio.OnDiskBitmap(blinka_bitmap)
        # tile_grid = displayio.TileGrid(bitmap1, pixel_shader=bitmap1.pixel_shader)
        # self.loading_group.append(tile_grid)
        # self.display.show(self.loading_group)
        # status_heading = label.Label(font=self.arial16, x=80, y=175,
        #                              text="Status", color=self.YELLOW)
        # rect = Rect(0, 165, 240, 75, fill=self.PURPLE)
        # self.loading_group.append(rect)
        # self.loading_group.append(status_heading)

    def _load_fonts(self):
        """
        Loads fonts
        """
        self.arial12 = bitmap_font.load_font("/fonts/Arial-12.bdf")
        self.arial16 = bitmap_font.load_font("/fonts/Arial-16.bdf")
        self.arial24 = bitmap_font.load_font("/fonts/Arial-Bold-24.bdf")

        glyphs = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-!,. "\'?!'
        self.arial12.load_glyphs(glyphs)
        self.arial16.load_glyphs(glyphs)
        self.arial24.load_glyphs(glyphs)


    def _status_update(self, message):
        """
        Displays status updates
        """
        if self.debug:
            print(message)
            return
        if self.text_group not in self.loading_group:
            self.loading_group.append(self.text_group)
        self.status.text = message[:25]
        self.status1.text = message[25:50]


    def timeout(self):
        """
        Displays Timeout on screen when pyloton has been searching for a sensor for too long
        """
        self._status_update("Pyloton: Timeout")
        time.sleep(3)


    def heart_connect(self):
        """
        Connects to heart rate sensor
        """
        self._status_update("Heart Rate: Scanning...")
        for adv in self.ble.start_scan(ProvideServicesAdvertisement, timeout=5):
            if HeartRateService in adv.services:
                self._status_update("Heart Rate: Found an advertisement")
                self.hr_connection = self.ble.connect(adv)
                self._status_update("Heart Rate: Connected")
                break
        self.ble.stop_scan()
        if self.hr_connection:
            self._hr_service = self.hr_connection[HeartRateService]
        return self.hr_connection


    @staticmethod
    def _has_timed_out(start, timeout):
        return time.time() - start >= timeout

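    def ams_connect(self, start=None, timeout=30):
        """
        Connect to an Apple device using the ble_apple_media library
        """
        # Take the timestamp at call time. A time.time() default argument
        # would be evaluated only once, when the class is defined.
        if start is None:
            start = time.time()
        self._status_update("AppleMediaService: Connect your phone now")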
        radio = adafruit_ble.BLERadio()
        a = SolicitServicesAdvertisement()
        a.solicited_services.append(AppleMediaService)
        radio.start_advertising(a)

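        while not radio.connected and not self._has_timed_out(start, timeout):
            pass

        if not radio.connected:
            # Timed out without a phone connecting; self.ams stays None
            self._status_update("AppleMediaService: Timeout")
            return radio

        self._status_update("AppleMediaService: Connected")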
        for connection in radio.connections:
            if not connection.paired:
                connection.pair()
                self._status_update("AppleMediaService: Paired")
            self.ams = connection[AppleMediaService]

        return radio


    def speed_cadence_connect(self):
        """
        Connects to speed and cadence sensor
        """
        self._status_update("Speed and Cadence: Scanning...")
        # Save advertisements, indexed by address
        advs = {}
        for adv in self.ble.start_scan(ProvideServicesAdvertisement, timeout=5):
            if CyclingSpeedAndCadenceService in adv.services:
                self._status_update("Speed and Cadence: Found an advertisement")
                # Save advertisement. Overwrite duplicates from same address (device).
                advs[adv.address] = adv

        self.ble.stop_scan()
        self._status_update("Speed and Cadence: Stopped scanning")
        if not advs:
            # Nothing found. Go back and keep looking.
            return []

        # Connect to all available CSC sensors.
        self.cyc_connections = []
        for adv in advs.values():
            self.cyc_connections.append(self.ble.connect(adv))
            self._status_update("Speed and Cadence: Connected {}".format(len(self.cyc_connections)))

        self.cyc_services = []
        for conn in self.cyc_connections:
            self.cyc_services.append(conn[CyclingSpeedAndCadenceService])
        self._status_update("Pyloton: Finishing up...")

        return self.cyc_connections


    def _compute_speed(self, values, speed):
        wheel_diff = values.last_wheel_event_time - self._previous_wheel
        rev_diff = values.cumulative_wheel_revolutions - self._previous_revolutions
        if wheel_diff:
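            # Event times are in units of 1/1024 s, so wheel_diff/1024 is seconds.
            # Rotations per minute is 60 times the number of revolutions since
            # the last update over the time since the last update
            rpm = 60*(rev_diff/(wheel_diff/1024))
            # We then multiply it by the wheel's circumference (in inches) and convert
            # inches per minute to mph (60 minutes per hour, 63360 inches per mile)
            speed = round((rpm * self.circumference) * (60/63360), 1)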
            if speed < 0:
                speed = self._previous_speed
            self._previous_speed = speed
            self._previous_revolutions = values.cumulative_wheel_revolutions
            self._speed_failed = 0
        else:
            self._speed_failed += 1
            if self._speed_failed >= 3:
                speed = 0
        self._previous_wheel = values.last_wheel_event_time

        return speed


    def _compute_cadence(self, values, cadence):
        crank_diff = values.last_crank_event_time - self._previous_crank
        crank_rev_diff = values.cumulative_crank_revolutions-self._previous_rev

        if crank_rev_diff and crank_diff:
            # Rotations per minute is 60 times the number of revolutions since the
            # last update over the time since the last update. Requiring a nonzero
            # crank_diff avoids dividing by zero.
            cadence = round(60*(crank_rev_diff/(crank_diff/1024)), 1)
            if cadence < 0:
                cadence = self._previous_cadence
            self._previous_cadence = cadence
            self._previous_rev = values.cumulative_crank_revolutions
            self._cadence_failed = 0
        else:
            self._cadence_failed += 1
            if self._cadence_failed >= 3:
                cadence = 0
        self._previous_crank = values.last_crank_event_time

        return cadence


    def read_s_and_c(self):
        """
        Reads data from the speed and cadence sensor
        """
        speed = self._previous_speed
        cadence = self._previous_cadence
        for conn, svc in zip(self.cyc_connections, self.cyc_services):
            if not conn.connected:
                speed = cadence = 0
                continue
            values = svc.measurement_values
            if not values:
                if self._cadence_failed >= 3 or self._speed_failed >= 3:
                    if self._cadence_failed > 3:
                        cadence = 0
                    if self._speed_failed > 3:
                        speed = 0
                continue
            if not values.last_wheel_event_time:
                continue
            speed = self._compute_speed(values, speed)
            if not values.last_crank_event_time:
                continue
            cadence = self._compute_cadence(values, cadence)

        if speed:
            speed = str(speed)[:8]
        if cadence:
            cadence = str(cadence)[:8]

        return speed, cadence


    def read_heart(self):
        """
        Reads data from the heart rate sensor
        """
        measurement = self._hr_service.measurement_values
        if measurement is None:
            heart = self._previous_heart
        else:
            heart = measurement.heart_rate
            self._previous_heart = measurement.heart_rate

        if heart:
            heart = str(heart)[:4]

        return heart


    def read_ams(self):
        """
        Reads data from AppleMediaServices
        """
        current = time.time()
        try:
            if current - self.start > 3:
                self.track_artist = not self.track_artist
                self.start = time.time()
            if self.track_artist:
                data = self.ams.artist
            if not self.track_artist:
                data = self.ams.title
        except (RuntimeError, UnicodeError, AttributeError):
            # AttributeError covers self.ams being None (no phone connected)
            data = None

        if data:
            data = data[:16] + (data[16:] and '..')

        return data


    def icon_maker(self, n, icon_x, icon_y):
        """
        Generates icons as sprites
        """
        sprite = displayio.TileGrid(self.sprite_sheet, pixel_shader=self.palette, width=1,
                                    height=1, tile_width=40, tile_height=40, default_tile=n,
                                    x=icon_x, y=icon_y)
        return sprite


    def _label_maker(self, text, x, y, font=None):
        """
        Generates labels
        """
        if not font:
            font = self.arial24
        return label.Label(font=font, x=x, y=y, text=text, color=self.WHITE)


    def _get_y(self):
        """
        Helper function for setup_display. Gets the y values used for sprites and labels.
        """
        enabled = self.num_enabled

        if self.heart_enabled:
            self._heart_y = 45*(self.num_enabled - enabled) + 75
            enabled -= 1
        if self.speed_enabled:
            self._speed_y = 45*(self.num_enabled - enabled) + 75
            enabled -= 1
        if self.cadence_enabled:
            self._cadence_y = 45*(self.num_enabled - enabled) + 75
            enabled -= 1
        if self.ams_enabled:
            self._ams_y = 45*(self.num_enabled - enabled) + 75
            enabled -= 1


    def setup_display(self):
        """
        Prepares the display to show sensor values: Adds a header, a heading, and various sprites.
        """
        self._get_y()
        sprites = displayio.Group()

        rect = Rect(0, 0, 240, 50, fill=self.PURPLE)
        self.splash.append(rect)

        heading = label.Label(font=self.arial24, x=55, y=25, text="Pyloton", color=self.YELLOW)
        self.splash.append(heading)

        if self.heart_enabled:
            heart_sprite = self.icon_maker(0, 2, self._heart_y - 20)
            sprites.append(heart_sprite)

        if self.speed_enabled:
            speed_sprite = self.icon_maker(1, 2, self._speed_y - 20)
            sprites.append(speed_sprite)

        if self.cadence_enabled:
            cadence_sprite = self.icon_maker(2, 2, self._cadence_y - 20)
            sprites.append(cadence_sprite)

        if self.ams_enabled:
            ams_sprite = self.icon_maker(3, 2, self._ams_y - 20)
            sprites.append(ams_sprite)

        self.splash.append(sprites)

        self.display.show(self.splash)
        while self.loading_group:
            self.loading_group.pop()


    def update_display(self): #pylint: disable=too-many-branches
        """
        Updates the display to display the most recent values
        """
        if self.speed_enabled or self.cadence_enabled:
            speed, cadence = self.read_s_and_c()

        if self.heart_enabled:
            heart = self.read_heart()
            if not self._setup:
                self._hr_label = self._label_maker('{} bpm'.format(heart), 50, self._heart_y) # 75
                self.splash.append(self._hr_label)
            else:
                self._hr_label.text = '{} bpm'.format(heart)

        if self.speed_enabled:
            if not self._setup:
                self._sp_label = self._label_maker('{} mph'.format(speed), 50, self._speed_y) # 120
                self.splash.append(self._sp_label)
            else:
                self._sp_label.text = '{} mph'.format(speed)

        if self.cadence_enabled:
            if not self._setup:
                self._cadence_label = self._label_maker('{} rpm'.format(cadence), 50,
                                                        self._cadence_y)
                self.splash.append(self._cadence_label)
            else:
                self._cadence_label.text = '{} rpm'.format(cadence)

        if self.ams_enabled:
            ams = self.read_ams()
            if not self._setup:
                self._ams_label = self._label_maker('{}'.format(ams), 50, self._ams_y,
                                                    font=self.arial16)
                self.splash.append(self._ams_label)
            else:
                self._ams_label.text = '{}'.format(ams)

        self._setup = True


    def ams_remote(self):
        """
        Allows the 2 buttons and 3 capacitive touch pads on the CLUE to function as a media remote.
        """
        try:
            # Capacitive touch pad marked 0 goes to the previous track
            if self.clue.touch_0:
                self.ams.previous_track()
                time.sleep(0.25)

            # Capacitive touch pad marked 1 toggles pause/play
            if self.clue.touch_1:
                self.ams.toggle_play_pause()
                time.sleep(0.25)

            # Capacitive touch pad marked 2 advances to the next track
            if self.clue.touch_2:
                self.ams.next_track()
                time.sleep(0.25)

            # If button B (on the right) is pressed, it increases the volume
            if self.clue.button_b:
                self.ams.volume_up()
                time.sleep(0.1)

            # If button A (on the left) is pressed, the volume decreases
            if self.clue.button_a:
                self.ams.volume_down()
                time.sleep(0.1)
        except (RuntimeError, UnsupportedCommand, AttributeError):
            return
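
The arithmetic in `_compute_speed` and `_compute_cadence` can be tried off-device. The sketch below is a standalone illustration, not part of the Pyloton library: the names `elapsed_seconds`, `wheel_speed_mph` and `crank_cadence_rpm` are hypothetical, and the wrap handling assumes the sensor reports event times as a 16-bit counter. It also assumes event times count in units of 1/1024 s and that the circumference is given in inches, as the 63360 inches-per-mile factor in the library implies.

```python
TICKS_PER_SECOND = 1024   # event times count in units of 1/1024 s
INCHES_PER_MILE = 63360
EVENT_TIME_WRAP = 65536   # assumption: 16-bit event-time counter


def elapsed_seconds(prev_time, curr_time):
    """Seconds between two event times, tolerating one counter wrap."""
    return ((curr_time - prev_time) % EVENT_TIME_WRAP) / TICKS_PER_SECOND


def wheel_speed_mph(rev_diff, prev_time, curr_time, circumference_in):
    """Speed in mph from the wheel revolutions counted between two events."""
    seconds = elapsed_seconds(prev_time, curr_time)
    if not seconds:
        return None  # no new wheel event, nothing to compute
    rpm = 60 * rev_diff / seconds
    # inches per minute -> inches per hour -> miles per hour
    return round(rpm * circumference_in * 60 / INCHES_PER_MILE, 1)


def crank_cadence_rpm(rev_diff, prev_time, curr_time):
    """Cadence in rpm from the crank revolutions counted between two events."""
    seconds = elapsed_seconds(prev_time, curr_time)
    if not seconds:
        return None  # no new crank event, nothing to compute
    return round(60 * rev_diff / seconds, 1)


# Two wheel revolutions in one second on a 700x23 wheel (84.229 in)
print(wheel_speed_mph(2, 0, 1024, 84.229))      # 9.6
# The same one-second interval straddling a counter wrap
print(wheel_speed_mph(2, 65024, 512, 84.229))   # 9.6
# Three crank revolutions in two seconds
print(crank_cadence_rpm(3, 0, 2048))            # 90.0
```

The library itself takes a different route to the wrap problem: when a computed value comes out negative, it falls back to the previous reading.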

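The wait-with-timeout pattern in `ams_connect` can be exercised without a radio. The following is a minimal sketch, assuming a hypothetical helper `wait_until` and an injectable `clock` argument (neither exists in the Pyloton library). It records the start time when the call is made and reports whether the condition was met or the wait timed out.

```python
import time


def wait_until(condition, timeout=30, clock=time.time):
    """Poll condition() until it is true or timeout seconds have passed.

    Returns True if the condition was met, False on timeout.
    """
    start = clock()  # captured at call time, not at definition time
    while not condition():
        if clock() - start >= timeout:
            return False
    return True


class FakeClock:
    """Stand-in clock for testing that advances one second per reading."""

    def __init__(self):
        self.now = 0

    def __call__(self):
        self.now += 1
        return self.now


# A condition that never becomes true gives up once the timeout elapses
print(wait_until(lambda: False, timeout=5, clock=FakeClock()))   # False

# A condition that becomes true on the third poll succeeds
polls = iter([False, False, True])
print(wait_until(lambda: next(polls), timeout=5, clock=FakeClock()))   # True
```

On the CLUE, the condition could be something like `lambda: radio.connected`, with the return value deciding which status message to show.
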
This guide was first published on Feb 26, 2020. It was last updated on Feb 26, 2020.

This page (Code the Pyloton in CircuitPython for CLUE) was last updated on Jun 01, 2023.
