circuitpython_sf-cpb-quick-draw-duo.jpg
Quick Draw Duo played on two Circuit Playground Bluefruits with external audio at Sticky Fingers restaurant.

Installing Project Code

To use with CircuitPython, you need to first install a few libraries, into the lib folder on your CIRCUITPY drive. Then you need to update code.py with the example script.

Thankfully, we can do this in one go. In the example below, click the Download Project Bundle button below to download the necessary libraries and the code.py file in a zip file. Extract the contents of the zip file, open the directory CPB_Quick_Draw_Duo/quick_draw_duo/ and then click on the directory that matches the version of CircuitPython you're using and copy the contents of that directory to both of the CIRCUITPY drives.

Your CIRCUITPY drives should now look similar to the following image:

CIRCUITPY
# SPDX-FileCopyrightText: 2020 Kevin J. Walters for Adafruit Industries
#
# SPDX-License-Identifier: MIT

# cpb-quick-draw v1.11
# CircuitPython (on CPBs) Quick Draw reaction game
# This is a two player game using two Circuit Playground Bluefruit boards
# to test the reaction time of the players in a "quick draw" with the
# synchronisation and draw times exchanged via Bluetooth Low Energy
# The switches must be set to DIFFERENT positions on the two CPBs

# Tested with Circuit Playground Bluefruit Alpha
# and CircuitPython and 5.0.0-beta.2

# Needs recent adafruit_ble and adafruit_circuitplayground.bluefruit libraries

# copy this file to CPB board as code.py

# MIT License

# Copyright (c) 2020 Kevin J. Walters

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import time
import gc
import struct
import random  # On a CPB this seeds from a hardware RNG in the CPU

# This is the new cp object which works on CPX and CPB
from adafruit_circuitplayground import cp

from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.services.nordic import UARTService

from adafruit_bluefruit_connect.packet import Packet

debug = 3

# Bluetooth scanning timeout
BLESCAN_TIMEOUT = 5

TURNS = 10
# Integer number of seconds
SHORTEST_DELAY = 1
LONGEST_DELAY = 5  # This was 10 in the original game

# Misdraw time (100ms)
IMPOSSIBLE_DUR = 0.1

# The duration of the short blue flashes (in seconds) during time delay
# measurement in ping_for_rtt() and the long one at the end
SYNC_FLASH_DUR = 0.1
SYNCED_LONGFLASH_DUR = 2

# The pause between displaying each pixel in the result summary
SUMMARY_DUR = 0.5

# The number of "pings" sent by ping_for_rtt()
NUM_PINGS = 8

# Special values used to indicate failed exchange of reaction times
# and no value
ERROR_DUR = -1.0
TIME_NONE = -2.0

# A timeout value for the protocol
protocol_timeout = 14.0

# These application specific packets could be placed in an another file
# and then import'ed
class TimePacket(Packet):
    """A packet for exchanging time information,
       duration (last rtt) and time.monotonic() and lastrtt."""

    _FMT_PARSE = '<xxffx'
    PACKET_LENGTH = struct.calcsize(_FMT_PARSE)
    # _FMT_CONSTRUCT doesn't include the trailing checksum byte.
    _FMT_CONSTRUCT = '<2sff'

    # Using lower case in attempt to avoid clashing with standard packets
    _TYPE_HEADER = b'!z'

    # number of args must match _FMT_PARSE
    # for Packet.parse_private() to work, hence the sendtime parameter
    def __init__(self, duration, sendtime):
        """Construct a TimePacket."""
        self._duration = duration
        self._sendtime = sendtime  # over-written later in to_bytes()

    def to_bytes(self):
        """Return the bytes needed to send this packet.
           Unusually this also sets the sendtime to the current time indicating
           when the data was serialised.
        """
        self._sendtime = time.monotonic()  # refresh _sendtime
        partial_packet = struct.pack(self._FMT_CONSTRUCT, self._TYPE_HEADER,
                                     self._duration, self._sendtime)
        return self.add_checksum(partial_packet)

    @property
    def duration(self):
        """The last rtt value or a negative number if n/a."""
        return self._duration

    @property
    def sendtime(self):
        """The time packet was sent (when to_bytes() was last called)."""
        return self._sendtime

TimePacket.register_packet_type()


class StartGame(Packet):  # pylint: disable=too-few-public-methods
    """A packet to indicate the receiver must start the game immediately."""

    _FMT_PARSE = '<xxx'
    PACKET_LENGTH = struct.calcsize(_FMT_PARSE)
    # _FMT_CONSTRUCT doesn't include the trailing checksum byte.
    _FMT_CONSTRUCT = '<2s'

    # Using lower case in attempt to avoid clashing with standard packets
    _TYPE_HEADER = b'!y'

    def to_bytes(self):
        """Return the bytes needed to send this packet.
        """
        partial_packet = struct.pack(self._FMT_CONSTRUCT, self._TYPE_HEADER)
        return self.add_checksum(partial_packet)


StartGame.register_packet_type()

# This board's role is determine by the switch, the two CPBs must have
# the switch in different positions
# left is the master / client / central device
# right is the slave / server / peripheral device
master_device = cp.switch  # True when switch is left (near ear symbol)

# The default brightness is 1.0 - leaving at that as it
# improves performance by removing need for a second buffer in memory
# 10 is number of NeoPixels on CPX/CPB
numpixels = 10
halfnumpixels = numpixels // 2
pixels = cp.pixels

faint_red = (1, 0, 0)
red = (40, 0, 0)
green = (0, 30, 0)
blue = (0, 0, 10)
brightblue = (0, 0, 100)
yellow = (40, 20, 0)
white = (30, 30, 30)
black = (0, 0, 0)

win_colour = green
win_pixels = [win_colour] * halfnumpixels
opponent_misdraw_colour = faint_red
misdraw_colour = red
misdraw_pixels = [misdraw_colour] * halfnumpixels
draw_colour = yellow
draw_pixels = [draw_colour] * halfnumpixels
lose_colour = black

if master_device:
    # button A is on left (usb at top
    player_button = lambda: cp.button_a
    # player_button.switch_to_input(pull=digitalio.Pull.DOWN)

    player_px = (0, halfnumpixels)
    opponent_px = (halfnumpixels, numpixels)
else:
    # button B is on right
    player_button = lambda: cp.button_b
    # player_button.switch_to_input(pull=digitalio.Pull.DOWN)

    player_px = (halfnumpixels, numpixels)
    opponent_px = (0, halfnumpixels)


def d_print(level, *args, **kwargs):
    """A simple conditional print for debugging based on global debug level."""
    if not isinstance(level, int):
        print(level, *args, **kwargs)
    elif debug >= level:
        print(*args, **kwargs)


def read_packet(timeout=None):
    """Read a packet with an optional locally implemented timeout.
       This is a workaround due to the timeout not being configurable."""
    if timeout is None:
        return Packet.from_stream(uart)  # Current fixed timeout is 1s

    packet = None
    read_start_t = time.monotonic()
    while packet is None and time.monotonic() - read_start_t < timeout:
        packet = Packet.from_stream(uart)
    return packet


def connect():
    """Connect two boards using the first Nordic UARTService the client
       finds over Bluetooth Low Energy.
       No timeouts, will wait forever."""
    new_conn = None
    new_uart = None
    if master_device:
        # Master code
        while new_uart is None:
            d_print("Disconnected, scanning")
            for advertisement in ble.start_scan(ProvideServicesAdvertisement,
                                                timeout=BLESCAN_TIMEOUT):
                d_print(2, advertisement.address, advertisement.rssi, "dBm")
                if UARTService not in advertisement.services:
                    continue
                d_print(1, "Connecting to", advertisement.address)
                ble.connect(advertisement)
                break
            for conns in ble.connections:
                if UARTService in conns:
                    d_print("Found UARTService")
                    new_conn = conns
                    new_uart = conns[UARTService]
                    break
            ble.stop_scan()

    else:
        # Slave code
        new_uart = UARTService()
        advertisement = ProvideServicesAdvertisement(new_uart)
        d_print("Advertising")
        ble.start_advertising(advertisement)
        # Is there a conn object somewhere here??
        while not ble.connected:
            pass

    return (new_conn, new_uart)


def ping_for_rtt():  # pylint: disable=too-many-branches,too-many-statements
    """Calculate the send time for Bluetooth Low Energy based from
       a series of round-trip time measurements and assuming that
       half of that is the send time.
       This code must be run at approximately the same time
       on each device as the timeout per packet is one second."""
    # The rtt is sent to server but for first packet client
    # sent there's no value to send, -1.0 is specal first packet value
    rtt = TIME_NONE
    rtts = []
    offsets = []

    if master_device:
        # Master code
        while True:
            gc.collect()  # an opportune moment
            request = TimePacket(rtt, TIME_NONE)
            d_print(2, "TimePacket TX")
            uart.write(request.to_bytes())
            response = Packet.from_stream(uart)
            t2 = time.monotonic()
            if isinstance(response, TimePacket):
                d_print(2, "TimePacket RX", response.sendtime)
                rtt = t2 - request.sendtime
                rtts.append(rtt)
                time_remote_cpb = response.sendtime + rtt / 2.0
                offset = time_remote_cpb - t2
                offsets.append(offset)
                d_print(3,
                        "RTT plus a bit={:f},".format(rtt),
                        "remote_time={:f},".format(time_remote_cpb),
                        "offset={:f}".format(offset))
            if len(rtts) >= NUM_PINGS:
                break

            pixels.fill(blue)
            time.sleep(SYNC_FLASH_DUR)
            pixels.fill(black)
            # This second sleep is very important to ensure that the
            # server is already awaiting the next packet before client
            # sends it to avoid server instantly reading buffered packets
            time.sleep(SYNC_FLASH_DUR)

    else:
        responses = 0
        while True:
            gc.collect()  # an opportune moment
            packet = Packet.from_stream(uart)
            if isinstance(packet, TimePacket):
                d_print(2, "TimePacket RX", packet.sendtime)
                # Send response
                uart.write(TimePacket(TIME_NONE, TIME_NONE).to_bytes())
                responses += 1
                rtts.append(packet.duration)
                pixels.fill(blue)
                time.sleep(SYNC_FLASH_DUR)
                pixels.fill(black)
            elif packet is None:
                # This could be a timeout or an indication of a disconnect
                d_print(2, "None from from_stream()")
            else:
                print("Unexpected packet type", packet)
            if responses >= NUM_PINGS:
                break

    # indicate a good rtt calculate, skip first one
    # as it's not present on slave
    if debug >= 3:
        print("RTTs:", rtts)
    if master_device:
        rtt_start = 1
        rtt_end = len(rtts) - 1
    else:
        rtt_start = 2
        rtt_end = len(rtts)

    # Use quickest ones and hope any outlier times don't reoccur!
    quicker_rtts = sorted(rtts[rtt_start:rtt_end])[0:(NUM_PINGS // 2) + 1]
    mean_rtt = sum(quicker_rtts) / len(quicker_rtts)
    # Assuming symmetry between send and receive times
    # this may not be perfectly true, parsing is one factor here
    send_time = mean_rtt / 2.0

    d_print(2, "send_time=", send_time)

    # Indicate sync with a longer 2 second blue flash
    pixels.fill(brightblue)
    time.sleep(SYNCED_LONGFLASH_DUR)
    pixels.fill(black)
    return send_time


def random_pause():
    """This is the pause before the players draw.
       It only runs on the master (BLE client) as it should be followed
       by a synchronising barrier."""
    if master_device:
        time.sleep(random.randint(SHORTEST_DELAY, LONGEST_DELAY))


def barrier(packet_send_time):
    """Master send a Start message and then waits for a reply.
       Slave waits for Start message, then sends reply, then pauses
       for packet_send_time so both master and slave return from
       barrier() at the same time."""

    if master_device:
        uart.write(StartGame().to_bytes())
        d_print(2, "StartGame TX")
        packet = read_packet(timeout=protocol_timeout)
        if isinstance(packet, StartGame):
            d_print(2, "StartGame RX")
        else:
            print("Unexpected packet type", packet)

    else:
        packet = read_packet(timeout=protocol_timeout)
        if isinstance(packet, StartGame):
            d_print(2, "StartGame RX")
            uart.write(StartGame().to_bytes())
            d_print(2, "StartGame TX")
        else:
            print("Unexpected packet type", packet)

        print("Sleeping to sync up", packet_send_time)
        time.sleep(packet_send_time)


def sync_test():
    """For testing synchronisation. Warning - this is flashes a lot!"""
    for _ in range(40):
        pixels.fill(white)
        time.sleep(0.1)
        pixels.fill(black)
        time.sleep(0.1)


def get_opponent_reactiontime(player_reaction):
    """Send reaction time data to the other player and receive theirs.
       Reusing the TimePacket() for this."""
    opponent_reaction = ERROR_DUR
    if master_device:
        uart.write(TimePacket(player_reaction,
                              TIME_NONE).to_bytes())
        print("TimePacket TX")
        packet = read_packet(timeout=protocol_timeout)
        if isinstance(packet, TimePacket):
            d_print(2, "TimePacket RX")
            opponent_reaction = packet.duration
        else:
            d_print(2, "Unexpected packet type", packet)

    else:
        packet = read_packet(timeout=protocol_timeout)
        if isinstance(packet, TimePacket):
            d_print(2, "TimePacket RX")
            opponent_reaction = packet.duration
            uart.write(TimePacket(player_reaction,
                                  TIME_NONE).to_bytes())
            d_print(2, "TimePacket TX")
        else:
            print("Unexpected packet type", packet)
    return opponent_reaction


def show_winner(player_reaction, opponent_reaction):
    """Show the winner on the appropriate set of NeoPixels.
       Returns win, misdraw, draw, colour) - 3 booleans and a result colour."""
    l_win = False
    l_misdraw = False
    l_draw = False
    l_colour = lose_colour

    if player_reaction < IMPOSSIBLE_DUR or opponent_reaction < IMPOSSIBLE_DUR:
        if opponent_reaction != ERROR_DUR and opponent_reaction < IMPOSSIBLE_DUR:
            pixels[opponent_px[0]:opponent_px[1]] = misdraw_pixels
            l_colour = opponent_misdraw_colour

        # This must come after previous if to get the most appropriate colour
        if player_reaction != ERROR_DUR and player_reaction < IMPOSSIBLE_DUR:
            l_misdraw = True
            pixels[player_px[0]:player_px[1]] = misdraw_pixels
            l_colour = misdraw_colour  # overwrite any opponent_misdraw_colour

    else:
        if player_reaction < opponent_reaction:
            l_win = True
            pixels[player_px[0]:player_px[1]] = win_pixels
            l_colour = win_colour
        elif opponent_reaction < player_reaction:
            pixels[opponent_px[0]:opponent_px[1]] = win_pixels
        else:
            # Equality! Very unlikely to reach here
            l_draw = False
            pixels[player_px[0]:player_px[1]] = draw_pixels
            pixels[opponent_px[0]:opponent_px[1]] = draw_pixels
            l_colour = draw_colour

    return (l_win, l_misdraw, l_draw, l_colour)


def show_summary(result_colours):
    """Show the results on the NeoPixels."""
    # trim anything beyond 10
    for idx, p_colour in enumerate(result_colours[0:numpixels]):
        pixels[idx] = p_colour
        time.sleep(SUMMARY_DUR)

# CPB auto-seeds from hardware random number generation on the nRF52840 chip
# Note: original code for CPX uses A4-A7 analog inputs,
#       CPB cannot use A7 for analog in

wins = 0
misdraws = 0
losses = 0
draws = 0

# default timeout is 1.0 and on latest library with UARTService this
# cannot be changed
ble = BLERadio()

# Connect the two boards over Bluetooth Low Energy
# Switch on left for master / client, switch on right for slave / server
d_print("connect()")
(conn, uart) = connect()

# Calculate round-trip time (rtt) delay between the two CPB boards
# flashing blue to indicate the packets and longer 2s flash when done
ble_send_time = None
d_print("ping_for_rtt()")
ble_send_time = ping_for_rtt()

my_results = []

# play the game for a number of TURNS then show results
for _ in range(TURNS):
    # This is an attempt to force a reconnection but may not take into
    # account all disconnection scenarios
    if uart is None:
        (conn, uart) = connect()

    # This is a good time to garbage collect
    gc.collect()

    # Random pause to stop players preempting the draw
    random_pause()

    try:
        # Synchronise the two boards by exchanging a Start message
        d_print("barrier()")
        barrier(ble_send_time)

        if debug >= 4:
            sync_test()

        # Show white on all NeoPixels to indicate draw now
        # This will execute at the same time on both boards
        pixels.fill(white)

        # Wait for and time how long it takes for player to press button
        start_t = time.monotonic()
        while not player_button():
            pass
        finish_t = time.monotonic()

        # Turn-off NeoPixels
        pixels.fill(black)

        # Play the shooting sound
        # 16k mono 8bit normalised version of
        # https://freesound.org/people/Diboz/sounds/213925/
        cp.play_file("PistolRicochet.wav")

        # The CPBs are no longer synchronised due to reaction time varying
        # per player
        # Exchange draw times
        player_reaction_dur = finish_t - start_t
        opponent_reaction_dur = get_opponent_reactiontime(player_reaction_dur)

        # Show green for winner and red for any misdraws
        (win, misdraw, draw, colour) = show_winner(player_reaction_dur,
                                                   opponent_reaction_dur)
        my_results.append(colour)
        if misdraw:
            misdraw += 1
        elif draw:
            draws += 1
        elif win:
            wins += 1
        else:
            losses += 1

        # Output reaction times to serial console in Mu friendly format
        print("({:d}, {:d}, {:f}, {:f})".format(wins, misdraws,
                                                player_reaction_dur,
                                                opponent_reaction_dur))

        # Keep NeoPixel result colour for 5 seconds then turn-off and repeat
        time.sleep(5)
    except Exception as err:  # pylint: disable=broad-except
        print("Caught exception", err)
        if conn is not None:
            conn.disconnect()
            conn = None
        uart = None

    pixels.fill(black)

# show results summary on NeoPixels
show_summary(my_results)

# infinite pause to stop the code completing which would turn off NeoPixels
while True:
    pass
The switch on the two CPB boards must be in a different position to assign the correct role in the communication protocol.

Playing the Game

When the code starts on both boards the NeoPixels should indicate:

  1. Eight brief blue flashes - measurement of round-trip time (rtt).
  2. One longer blue flash - successful conclusion of rtt measurements.
  3. White - start of round one, time to press the button!

The player's button is the left button, if the switch is set to the left, and the right button if the switch is set to the right. The winner of each round is indicated by a green flash, which is on the finger side for the winner. At the end of the ten rounds, the player's board will indicate their score.

  • Green - win.
  • No colour - opponent won.
  • Red - misdraw.
  • Faint red - opponent misdrew.
  • Amber - a draw, same reaction time for both players, very rare!

If one board is reset during the game, then the other must also be reset to start the game again.

The maximum Bluetooth range between two CPB boards is approximately 5m (17ft). The range will decrease if obstacles are in the path.

Example Video

The video below shows both boards being reset and the code starting with the blue flashing for synchronisation. A tense ten round game follows. As per the original game, green on the finger side indicates the winner and at the end the local summary gradually appears on the NeoPixels.

Code Discussion

For this application the core game code is the same for both boards therefore it's reasonable for the implementation to be a single program which contains the communication code for both the Bluetooth LE central device and peripheral device. There are conditionals in the code based on a master_device variable for when the code needs to behave differently.

The first significant part of the communication code is creating the connection and then measuring the round-trip time, as per the design, to calculate the send time.

ble = BLERadio()

(conn, uart) = connect()

ble_send_time = ping_for_rtt()

For each round of the game the following code runs.

# Code from the main for loop with most comments removed
    gc.collect()
    random_pause()
    try:
        barrier(ble_send_time)
        pixels.fill(white)
        start_t = time.monotonic()
        while not player_button():
            pass
        finish_t = time.monotonic()

        pixels.fill(black)

        cp.play_file("PistolRicochet.wav")

        player_reaction_dur = finish_t - start_t
        opponent_reaction_dur = get_opponent_reactiontime(player_reaction_dur)

        # Show green for winner and red for any misdraws
        (win, misdraw, draw, colour) = show_winner(player_reaction_dur,
                                                   opponent_reaction_dur)
        my_results.append(colour)
        if misdraw:
            misdraw += 1
        elif draw:
            draws += 1
        elif win:
            wins += 1
        else:
            losses += 1

        # Output reaction times to serial console in Mu friendly format
        print("({:d}, {:d}, {:f}, {:f})".format(wins, misdraws,
                                                player_reaction_dur,
                                                opponent_reaction_dur))

        time.sleep(5)
    except Exception as err:  # pylint: disable=broad-except
        print("Caught exception", err)
        if conn is not None:
            conn.disconnect()
            conn = None
        uart = None

    pixels.fill(black)

The code is a unusual in requesting that the memory is tidied up before the player reacts to the NeoPixels - this is an attempt to prevent it from occurring during the reaction timing. Generally, scheduling a garbage collection (GC) is best left to the interpreter or runtime system. In this specific case, the program is designed to be idle for a period of time and then needs to accurately time a small critical section of the code. These three factors are a reasonable justification for the programmer using a carefully placed gc.collect(). In some languages, like Java, the programmer can only request a GC, it's not guaranteed to immediately occur.

The code between the time.monotonic() calls is kept to a minimum to try to ensure only the reaction time is measured. The player is reacting to the prior execution of pixels.fill(white). All statements take a certain amount of time to execute and in the case of the fill method it's not documented precisely when the RGB LEDs change. In this case any small advantage the player gets in reaction time is the same for each player so it makes no competitive difference.

The cp.play_file("PistolRicochet.wav") is playing a shooting sound immediately after the player has pressed the button. This statement returns when the sound has completed playing which isn't ideal here but it's the only option from the convenient cp object. This is a new combined object from the adafruit_circuitplayground library that can be used interchangeably on the CPB and CPX boards.

The get_opponent_reactiontime() function exchanges reaction times between the boards. The boards trust the reaction time is a genuine, accurate measurement. There's clearly scope here for the remote code being altered to cheat!

The boolean variable draw is used to represent the unlikely outcome of both players pressing their button at the same time. This will become more likely if the boards have been powered on for several hours. Python and CircuitPython's time.monotonic() returns time as a float variable. This return value increases over time reducing the resolution available to represent the fractional part of the value. The effect is far more significant for the 30bit storage representation used by CircuitPython (based on single precision floating point) in combination with the epoch time of 0.0 at power-up. This lowers the precision and granularity of the millisecond portion as time passes making draws more likely.

This guide was first published on Jan 20, 2020. It was last updated on Nov 10, 2019.

This page (Quick Draw Duo) was last updated on Jun 06, 2023.

Text editor powered by tinymce.