
Installing Project Code
To use with CircuitPython, you need to first install a few libraries, into the lib folder on your CIRCUITPY drive. Then you need to update code.py with the example script.
Thankfully, we can do this in one go. In the example below, click the Download Project Bundle button below to download the necessary libraries and the code.py file in a zip file. Extract the contents of the zip file, open the directory CPB_Quick_Draw_Duo/quick_draw_duo/ and then click on the directory that matches the version of CircuitPython you're using and copy the contents of that directory to both of the CIRCUITPY drives.
Your CIRCUITPY drives should now look similar to the following image:

# SPDX-FileCopyrightText: 2020 Kevin J. Walters for Adafruit Industries # # SPDX-License-Identifier: MIT # cpb-quick-draw v1.11 # CircuitPython (on CPBs) Quick Draw reaction game # This is a two player game using two Circuit Playground Bluefruit boards # to test the reaction time of the players in a "quick draw" with the # synchronisation and draw times exchanged via Bluetooth Low Energy # The switches must be set to DIFFERENT positions on the two CPBs # Tested with Circuit Playground Bluefruit Alpha # and CircuitPython and 5.0.0-beta.2 # Needs recent adafruit_ble and adafruit_circuitplayground.bluefruit libraries # copy this file to CPB board as code.py # MIT License # Copyright (c) 2020 Kevin J. Walters # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import time import gc import struct import random # On a CPB this seeds from a hardware RNG in the CPU # This is the new cp object which works on CPX and CPB from adafruit_circuitplayground import cp from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement from adafruit_ble.services.nordic import UARTService from adafruit_bluefruit_connect.packet import Packet debug = 3 # Bluetooth scanning timeout BLESCAN_TIMEOUT = 5 TURNS = 10 # Integer number of seconds SHORTEST_DELAY = 1 LONGEST_DELAY = 5 # This was 10 in the original game # Misdraw time (100ms) IMPOSSIBLE_DUR = 0.1 # The duration of the short blue flashes (in seconds) during time delay # measurement in ping_for_rtt() and the long one at the end SYNC_FLASH_DUR = 0.1 SYNCED_LONGFLASH_DUR = 2 # The pause between displaying each pixel in the result summary SUMMARY_DUR = 0.5 # The number of "pings" sent by ping_for_rtt() NUM_PINGS = 8 # Special values used to indicate failed exchange of reaction times # and no value ERROR_DUR = -1.0 TIME_NONE = -2.0 # A timeout value for the protocol protocol_timeout = 14.0 # These application specific packets could be placed in an another file # and then import'ed class TimePacket(Packet): """A packet for exchanging time information, duration (last rtt) and time.monotonic() and lastrtt.""" _FMT_PARSE = '<xxffx' PACKET_LENGTH = struct.calcsize(_FMT_PARSE) # _FMT_CONSTRUCT doesn't include the trailing checksum byte. _FMT_CONSTRUCT = '<2sff' # Using lower case in attempt to avoid clashing with standard packets _TYPE_HEADER = b'!z' # number of args must match _FMT_PARSE # for Packet.parse_private() to work, hence the sendtime parameter def __init__(self, duration, sendtime): """Construct a TimePacket.""" self._duration = duration self._sendtime = sendtime # over-written later in to_bytes() def to_bytes(self): """Return the bytes needed to send this packet. Unusually this also sets the sendtime to the current time indicating when the data was serialised. """ self._sendtime = time.monotonic() # refresh _sendtime partial_packet = struct.pack(self._FMT_CONSTRUCT, self._TYPE_HEADER, self._duration, self._sendtime) return self.add_checksum(partial_packet) @property def duration(self): """The last rtt value or a negative number if n/a.""" return self._duration @property def sendtime(self): """The time packet was sent (when to_bytes() was last called).""" return self._sendtime TimePacket.register_packet_type() class StartGame(Packet): # pylint: disable=too-few-public-methods """A packet to indicate the receiver must start the game immediately.""" _FMT_PARSE = '<xxx' PACKET_LENGTH = struct.calcsize(_FMT_PARSE) # _FMT_CONSTRUCT doesn't include the trailing checksum byte. _FMT_CONSTRUCT = '<2s' # Using lower case in attempt to avoid clashing with standard packets _TYPE_HEADER = b'!y' def to_bytes(self): """Return the bytes needed to send this packet. """ partial_packet = struct.pack(self._FMT_CONSTRUCT, self._TYPE_HEADER) return self.add_checksum(partial_packet) StartGame.register_packet_type() # This board's role is determine by the switch, the two CPBs must have # the switch in different positions # left is the master / client / central device # right is the slave / server / peripheral device master_device = cp.switch # True when switch is left (near ear symbol) # The default brightness is 1.0 - leaving at that as it # improves performance by removing need for a second buffer in memory # 10 is number of NeoPixels on CPX/CPB numpixels = 10 halfnumpixels = numpixels // 2 pixels = cp.pixels faint_red = (1, 0, 0) red = (40, 0, 0) green = (0, 30, 0) blue = (0, 0, 10) brightblue = (0, 0, 100) yellow = (40, 20, 0) white = (30, 30, 30) black = (0, 0, 0) win_colour = green win_pixels = [win_colour] * halfnumpixels opponent_misdraw_colour = faint_red misdraw_colour = red misdraw_pixels = [misdraw_colour] * halfnumpixels draw_colour = yellow draw_pixels = [draw_colour] * halfnumpixels lose_colour = black if master_device: # button A is on left (usb at top player_button = lambda: cp.button_a # player_button.switch_to_input(pull=digitalio.Pull.DOWN) player_px = (0, halfnumpixels) opponent_px = (halfnumpixels, numpixels) else: # button B is on right player_button = lambda: cp.button_b # player_button.switch_to_input(pull=digitalio.Pull.DOWN) player_px = (halfnumpixels, numpixels) opponent_px = (0, halfnumpixels) def d_print(level, *args, **kwargs): """A simple conditional print for debugging based on global debug level.""" if not isinstance(level, int): print(level, *args, **kwargs) elif debug >= level: print(*args, **kwargs) def read_packet(timeout=None): """Read a packet with an optional locally implemented timeout. This is a workaround due to the timeout not being configurable.""" if timeout is None: return Packet.from_stream(uart) # Current fixed timeout is 1s packet = None read_start_t = time.monotonic() while packet is None and time.monotonic() - read_start_t < timeout: packet = Packet.from_stream(uart) return packet def connect(): """Connect two boards using the first Nordic UARTService the client finds over Bluetooth Low Energy. No timeouts, will wait forever.""" new_conn = None new_uart = None if master_device: # Master code while new_uart is None: d_print("Disconnected, scanning") for advertisement in ble.start_scan(ProvideServicesAdvertisement, timeout=BLESCAN_TIMEOUT): d_print(2, advertisement.address, advertisement.rssi, "dBm") if UARTService not in advertisement.services: continue d_print(1, "Connecting to", advertisement.address) ble.connect(advertisement) break for conns in ble.connections: if UARTService in conns: d_print("Found UARTService") new_conn = conns new_uart = conns[UARTService] break ble.stop_scan() else: # Slave code new_uart = UARTService() advertisement = ProvideServicesAdvertisement(new_uart) d_print("Advertising") ble.start_advertising(advertisement) # Is there a conn object somewhere here?? while not ble.connected: pass return (new_conn, new_uart) def ping_for_rtt(): # pylint: disable=too-many-branches,too-many-statements """Calculate the send time for Bluetooth Low Energy based from a series of round-trip time measurements and assuming that half of that is the send time. This code must be run at approximately the same time on each device as the timeout per packet is one second.""" # The rtt is sent to server but for first packet client # sent there's no value to send, -1.0 is specal first packet value rtt = TIME_NONE rtts = [] offsets = [] if master_device: # Master code while True: gc.collect() # an opportune moment request = TimePacket(rtt, TIME_NONE) d_print(2, "TimePacket TX") uart.write(request.to_bytes()) response = Packet.from_stream(uart) t2 = time.monotonic() if isinstance(response, TimePacket): d_print(2, "TimePacket RX", response.sendtime) rtt = t2 - request.sendtime rtts.append(rtt) time_remote_cpb = response.sendtime + rtt / 2.0 offset = time_remote_cpb - t2 offsets.append(offset) d_print(3, "RTT plus a bit={:f},".format(rtt), "remote_time={:f},".format(time_remote_cpb), "offset={:f}".format(offset)) if len(rtts) >= NUM_PINGS: break pixels.fill(blue) time.sleep(SYNC_FLASH_DUR) pixels.fill(black) # This second sleep is very important to ensure that the # server is already awaiting the next packet before client # sends it to avoid server instantly reading buffered packets time.sleep(SYNC_FLASH_DUR) else: responses = 0 while True: gc.collect() # an opportune moment packet = Packet.from_stream(uart) if isinstance(packet, TimePacket): d_print(2, "TimePacket RX", packet.sendtime) # Send response uart.write(TimePacket(TIME_NONE, TIME_NONE).to_bytes()) responses += 1 rtts.append(packet.duration) pixels.fill(blue) time.sleep(SYNC_FLASH_DUR) pixels.fill(black) elif packet is None: # This could be a timeout or an indication of a disconnect d_print(2, "None from from_stream()") else: print("Unexpected packet type", packet) if responses >= NUM_PINGS: break # indicate a good rtt calculate, skip first one # as it's not present on slave if debug >= 3: print("RTTs:", rtts) if master_device: rtt_start = 1 rtt_end = len(rtts) - 1 else: rtt_start = 2 rtt_end = len(rtts) # Use quickest ones and hope any outlier times don't reoccur! quicker_rtts = sorted(rtts[rtt_start:rtt_end])[0:(NUM_PINGS // 2) + 1] mean_rtt = sum(quicker_rtts) / len(quicker_rtts) # Assuming symmetry between send and receive times # this may not be perfectly true, parsing is one factor here send_time = mean_rtt / 2.0 d_print(2, "send_time=", send_time) # Indicate sync with a longer 2 second blue flash pixels.fill(brightblue) time.sleep(SYNCED_LONGFLASH_DUR) pixels.fill(black) return send_time def random_pause(): """This is the pause before the players draw. It only runs on the master (BLE client) as it should be followed by a synchronising barrier.""" if master_device: time.sleep(random.randint(SHORTEST_DELAY, LONGEST_DELAY)) def barrier(packet_send_time): """Master send a Start message and then waits for a reply. Slave waits for Start message, then sends reply, then pauses for packet_send_time so both master and slave return from barrier() at the same time.""" if master_device: uart.write(StartGame().to_bytes()) d_print(2, "StartGame TX") packet = read_packet(timeout=protocol_timeout) if isinstance(packet, StartGame): d_print(2, "StartGame RX") else: print("Unexpected packet type", packet) else: packet = read_packet(timeout=protocol_timeout) if isinstance(packet, StartGame): d_print(2, "StartGame RX") uart.write(StartGame().to_bytes()) d_print(2, "StartGame TX") else: print("Unexpected packet type", packet) print("Sleeping to sync up", packet_send_time) time.sleep(packet_send_time) def sync_test(): """For testing synchronisation. Warning - this is flashes a lot!""" for _ in range(40): pixels.fill(white) time.sleep(0.1) pixels.fill(black) time.sleep(0.1) def get_opponent_reactiontime(player_reaction): """Send reaction time data to the other player and receive theirs. Reusing the TimePacket() for this.""" opponent_reaction = ERROR_DUR if master_device: uart.write(TimePacket(player_reaction, TIME_NONE).to_bytes()) print("TimePacket TX") packet = read_packet(timeout=protocol_timeout) if isinstance(packet, TimePacket): d_print(2, "TimePacket RX") opponent_reaction = packet.duration else: d_print(2, "Unexpected packet type", packet) else: packet = read_packet(timeout=protocol_timeout) if isinstance(packet, TimePacket): d_print(2, "TimePacket RX") opponent_reaction = packet.duration uart.write(TimePacket(player_reaction, TIME_NONE).to_bytes()) d_print(2, "TimePacket TX") else: print("Unexpected packet type", packet) return opponent_reaction def show_winner(player_reaction, opponent_reaction): """Show the winner on the appropriate set of NeoPixels. Returns win, misdraw, draw, colour) - 3 booleans and a result colour.""" l_win = False l_misdraw = False l_draw = False l_colour = lose_colour if player_reaction < IMPOSSIBLE_DUR or opponent_reaction < IMPOSSIBLE_DUR: if opponent_reaction != ERROR_DUR and opponent_reaction < IMPOSSIBLE_DUR: pixels[opponent_px[0]:opponent_px[1]] = misdraw_pixels l_colour = opponent_misdraw_colour # This must come after previous if to get the most appropriate colour if player_reaction != ERROR_DUR and player_reaction < IMPOSSIBLE_DUR: l_misdraw = True pixels[player_px[0]:player_px[1]] = misdraw_pixels l_colour = misdraw_colour # overwrite any opponent_misdraw_colour else: if player_reaction < opponent_reaction: l_win = True pixels[player_px[0]:player_px[1]] = win_pixels l_colour = win_colour elif opponent_reaction < player_reaction: pixels[opponent_px[0]:opponent_px[1]] = win_pixels else: # Equality! Very unlikely to reach here l_draw = False pixels[player_px[0]:player_px[1]] = draw_pixels pixels[opponent_px[0]:opponent_px[1]] = draw_pixels l_colour = draw_colour return (l_win, l_misdraw, l_draw, l_colour) def show_summary(result_colours): """Show the results on the NeoPixels.""" # trim anything beyond 10 for idx, p_colour in enumerate(result_colours[0:numpixels]): pixels[idx] = p_colour time.sleep(SUMMARY_DUR) # CPB auto-seeds from hardware random number generation on the nRF52840 chip # Note: original code for CPX uses A4-A7 analog inputs, # CPB cannot use A7 for analog in wins = 0 misdraws = 0 losses = 0 draws = 0 # default timeout is 1.0 and on latest library with UARTService this # cannot be changed ble = BLERadio() # Connect the two boards over Bluetooth Low Energy # Switch on left for master / client, switch on right for slave / server d_print("connect()") (conn, uart) = connect() # Calculate round-trip time (rtt) delay between the two CPB boards # flashing blue to indicate the packets and longer 2s flash when done ble_send_time = None d_print("ping_for_rtt()") ble_send_time = ping_for_rtt() my_results = [] # play the game for a number of TURNS then show results for _ in range(TURNS): # This is an attempt to force a reconnection but may not take into # account all disconnection scenarios if uart is None: (conn, uart) = connect() # This is a good time to garbage collect gc.collect() # Random pause to stop players preempting the draw random_pause() try: # Synchronise the two boards by exchanging a Start message d_print("barrier()") barrier(ble_send_time) if debug >= 4: sync_test() # Show white on all NeoPixels to indicate draw now # This will execute at the same time on both boards pixels.fill(white) # Wait for and time how long it takes for player to press button start_t = time.monotonic() while not player_button(): pass finish_t = time.monotonic() # Turn-off NeoPixels pixels.fill(black) # Play the shooting sound # 16k mono 8bit normalised version of # https://freesound.org/people/Diboz/sounds/213925/ cp.play_file("PistolRicochet.wav") # The CPBs are no longer synchronised due to reaction time varying # per player # Exchange draw times player_reaction_dur = finish_t - start_t opponent_reaction_dur = get_opponent_reactiontime(player_reaction_dur) # Show green for winner and red for any misdraws (win, misdraw, draw, colour) = show_winner(player_reaction_dur, opponent_reaction_dur) my_results.append(colour) if misdraw: misdraw += 1 elif draw: draws += 1 elif win: wins += 1 else: losses += 1 # Output reaction times to serial console in Mu friendly format print("({:d}, {:d}, {:f}, {:f})".format(wins, misdraws, player_reaction_dur, opponent_reaction_dur)) # Keep NeoPixel result colour for 5 seconds then turn-off and repeat time.sleep(5) except Exception as err: # pylint: disable=broad-except print("Caught exception", err) if conn is not None: conn.disconnect() conn = None uart = None pixels.fill(black) # show results summary on NeoPixels show_summary(my_results) # infinite pause to stop the code completing which would turn off NeoPixels while True: pass
Playing the Game
When the code starts on both boards the NeoPixels should indicate:
- Eight brief blue flashes - measurement of round-trip time (rtt).
- One longer blue flash - successful conclusion of rtt measurements.
- White - start of round one, time to press the button!
The player's button is the left button, if the switch is set to the left, and the right button if the switch is set to the right. The winner of each round is indicated by a green flash, which is on the finger side for the winner. At the end of the ten rounds, the player's board will indicate their score.
- Green - win.
- No colour - opponent won.
- Red - misdraw.
- Faint red - opponent misdrew.
- Amber - a draw, same reaction time for both players, very rare!
If one board is reset during the game, then the other must also be reset to start the game again.
The maximum Bluetooth range between two CPB boards is approximately 5m (17ft). The range will decrease if obstacles are in the path.
The video below shows both boards being reset and the code starting with the blue flashing for synchronisation. A tense ten round game follows. As per the original game, green on the finger side indicates the winner and at the end the local summary gradually appears on the NeoPixels.
Code Discussion
For this application the core game code is the same for both boards therefore it's reasonable for the implementation to be a single program which contains the communication code for both the Bluetooth LE central device and peripheral device. There are conditionals in the code based on a master_device
variable for when the code needs to behave differently.
The first significant part of the communication code is creating the connection and then measuring the round-trip time, as per the design, to calculate the send time.
ble = BLERadio() (conn, uart) = connect() ble_send_time = ping_for_rtt()
For each round of the game the following code runs.
# Code from the main for loop with most comments removed gc.collect() random_pause() try: barrier(ble_send_time) pixels.fill(white) start_t = time.monotonic() while not player_button(): pass finish_t = time.monotonic() pixels.fill(black) cp.play_file("PistolRicochet.wav") player_reaction_dur = finish_t - start_t opponent_reaction_dur = get_opponent_reactiontime(player_reaction_dur) # Show green for winner and red for any misdraws (win, misdraw, draw, colour) = show_winner(player_reaction_dur, opponent_reaction_dur) my_results.append(colour) if misdraw: misdraw += 1 elif draw: draws += 1 elif win: wins += 1 else: losses += 1 # Output reaction times to serial console in Mu friendly format print("({:d}, {:d}, {:f}, {:f})".format(wins, misdraws, player_reaction_dur, opponent_reaction_dur)) time.sleep(5) except Exception as err: # pylint: disable=broad-except print("Caught exception", err) if conn is not None: conn.disconnect() conn = None uart = None pixels.fill(black)
The code is a unusual in requesting that the memory is tidied up before the player reacts to the NeoPixels - this is an attempt to prevent it from occurring during the reaction timing. Generally, scheduling a garbage collection (GC) is best left to the interpreter or runtime system. In this specific case, the program is designed to be idle for a period of time and then needs to accurately time a small critical section of the code. These three factors are a reasonable justification for the programmer using a carefully placed gc.collect()
. In some languages, like Java, the programmer can only request a GC, it's not guaranteed to immediately occur.
The code between the time.monotonic()
calls is kept to a minimum to try to ensure only the reaction time is measured. The player is reacting to the prior execution of pixels.fill(white)
. All statements take a certain amount of time to execute and in the case of the fill
method it's not documented precisely when the RGB LEDs change. In this case any small advantage the player gets in reaction time is the same for each player so it makes no competitive difference.
The cp.play_file("PistolRicochet.wav")
is playing a shooting sound immediately after the player has pressed the button. This statement returns when the sound has completed playing which isn't ideal here but it's the only option from the convenient cp
object. This is a new combined object from the adafruit_circuitplayground library that can be used interchangeably on the CPB and CPX boards.
The get_opponent_reactiontime
()
function exchanges reaction times between the boards. The boards trust the reaction time is a genuine, accurate measurement. There's clearly scope here for the remote code being altered to cheat!
The boolean variable draw
is used to represent the unlikely outcome of both players pressing their button at the same time. This will become more likely if the boards have been powered on for several hours. Python and CircuitPython's time.monotonic()
returns time as a float variable. This return value increases over time reducing the resolution available to represent the fractional part of the value. The effect is far more significant for the 30bit storage representation used by CircuitPython (based on single precision floating point) in combination with the epoch time of 0.0 at power-up. This lowers the precision and granularity of the millisecond portion as time passes making draws more likely.
Page last edited January 22, 2025
Text editor powered by tinymce.