The introduction screen with button guide for the Advanced rock, paper, scissors game.

This is a multi-player version of rock, paper, scissors with:

  • Simple displayio graphics.
  • An announcer and sound effects implemented with sound samples in the wav format.
  • A dynamic group for the players, formed after the button guide appears when the program starts at power-up (or reset).
  • A more sophisticated data exchange protocol providing anti-cheat features and a more responsive feel.
  • Configurable name for each player.
  • A sorted score table per game for the players.

This has been tested with up to six players.

Example Video

The video below shows the game being played on four devices. Each player would normally hide their choice. Bluetooth facilitates this as the game works well with the boards up to 4m (13ft) apart. The boards have been placed near each other in this demonstration to ease the video recording.

The sequence of events in the video:

  • 00:00 The program has started running on three of the boards as those boards were reset about 2 seconds before the video starts.
  • 00:04 The introduction screen on the three boards with screens.
  • 00:08 One click on the reset button of the Circuit Playground Bluefruit with no display. The introduction is shorter without a display, hence the staggered start.
  • 00:10 The button guide scrolls into view.
  • 00:21 The join game phase, the boards send and receive packets to establish the group for the game.
  • 00:23 The names of the players and the RSSI of the first advertising packet received are shown on the devices with screens. All devices briefly flash blue on the NeoPixels when an advertising packet is received in the join game phase (this flashing is disabled by default in the code).
  • 00:44 The boards conclude their search for other players and then commence a four player game with three rounds per game.
  • 01:00 All four players have made their choices and are exchanging them. The game announces "Ready" and hums during transmission.
  • 01:14 Exchange of data concludes and results from the first round are shown and announced on each device.
  • 01:56 Second round begins.
  • 02:38 Third and final round begins.
  • 02:53 The Circuit Playground Bluefruit shows the scores on the NeoPixels.
  • 03:07 The other three boards with displays show the scores for the game and sort them into descending order.

Installing Project Code

To use this with CircuitPython, you first need to install a few libraries into the lib folder on your CIRCUITPY drive. Then you need to update code.py with the example script.

Thankfully, we can do this in one go. In the example below, click the Download Project Bundle button to download the necessary libraries and the code.py file in a zip file. Extract the contents of the zip file, open the directory CLUE_Rock_Paper_Scissors/advanced/ and then click on the directory that matches the version of CircuitPython you're using. Copy the contents of that directory to your CIRCUITPY drive.

Your CIRCUITPY drive should now look similar to the following image:

CIRCUITPY
# SPDX-FileCopyrightText: 2020 Kevin J Walters for Adafruit Industries
#
# SPDX-License-Identifier: MIT

# clue-multi-rpsgame v1.20
# CircuitPython massively multiplayer rock paper scissors game over Bluetooth LE

# Tested with CLUE and Circuit Playground Bluefruit Alpha with TFT Gizmo
# using CircuitPython and 5.3.0

# copy this file to CLUE/CPB board as code.py

# MIT License

# Copyright (c) 2020 Kevin J. Walters

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import gc
import os
import random

from micropython import const
import board
import digitalio

import neopixel
from adafruit_ble import BLERadio

# These imports work on CLUE, CPB (and CPX on 5.x)
try:
    from audioio import AudioOut
except ImportError:
    from audiopwmio import PWMAudioOut as AudioOut

# RPS module files
from rps_advertisements import JoinGameAdvertisement, \
                               RpsEncDataAdvertisement, \
                               RpsKeyDataAdvertisement, \
                               RpsRoundEndAdvertisement
from rps_audio import SampleJukebox
from rps_comms import broadcastAndReceive, addrToText, MIN_AD_INTERVAL
from rps_crypto import bytesPad, strUnpad, generateOTPadKey, \
                       enlargeKey, encrypt, decrypt
from rps_display import RPSDisplay, blankScreen


# Look for our name in secrets.py file if present
ble_name = None
try:
    from secrets import secrets
    ble_name = secrets.get("rps_name")
    if ble_name is None:
        ble_name = secrets.get("ble_name")
        if ble_name is None:
            print("INFO: No rps_name or ble_name entry found in secrets dict")
except ImportError:
    pass   # File is optional, reaching here is not a program error


debug = 1

def d_print(level, *args, **kwargs):
    """A simple conditional print for debugging based on global debug level."""
    if not isinstance(level, int):
        print(level, *args, **kwargs)
    elif debug >= level:
        print(*args, **kwargs)


def tftGizmoPresent():
    """Determine if the TFT Gizmo is attached.
       The TFT's Gizmo circuitry for backlight features a 10k pull-down resistor.
       This attempts to verify the presence of the pull-down to determine
       if TFT Gizmo is present.
       This is likely to get confused if anything else is connected to pad A3.
       Only use this on Circuit Playground Express (CPX)
       or Circuit Playground Bluefruit (CPB) boards."""
    present = True
    try:
        with digitalio.DigitalInOut(board.A3) as backlight_pin:
            backlight_pin.pull = digitalio.Pull.UP
            present = not backlight_pin.value
    except ValueError:
        # The Gizmo is already initialised, i.e. showing console output
        pass

    return present


# Assuming CLUE if it's not a Circuit Playground (Bluefruit)
clue_less = "Circuit Playground" in os.uname().machine

# Note: difference in pull-up and pull-down
#       and logical not use for buttons
if clue_less:
    # CPB with TFT Gizmo (240x240)
    # from adafruit_circuitplayground import cp  # Avoiding to save memory

    # Outputs
    if tftGizmoPresent():
        from adafruit_gizmo import tft_gizmo
        display = tft_gizmo.TFT_Gizmo()
        JG_RX_COL = 0x0000ff
        BUTTON_Y_POS = 120
    else:
        display = None
        JG_RX_COL = 0x000030  # dimmer blue for upward facing CPB NeoPixels
        BUTTON_Y_POS = None

    audio_out = AudioOut(board.SPEAKER)
    #pixels = cp.pixels
    pixels = neopixel.NeoPixel(board.NEOPIXEL, 10)

    # Enable the onboard amplifier for speaker
    #cp._speaker_enable.value = True  # pylint: disable=protected-access
    speaker_enable = digitalio.DigitalInOut(board.SPEAKER_ENABLE)
    speaker_enable.switch_to_output(value=False)
    speaker_enable.value = True

    # Inputs
    # buttons reversed if it is used upside-down with Gizmo
    _button_a = digitalio.DigitalInOut(board.BUTTON_A)
    _button_a.switch_to_input(pull=digitalio.Pull.DOWN)
    _button_b = digitalio.DigitalInOut(board.BUTTON_B)
    _button_b.switch_to_input(pull=digitalio.Pull.DOWN)
    if display is None:
        def button_left():
            return _button_a.value
        def button_right():
            return _button_b.value
    else:
        def button_left():
            return _button_b.value
        def button_right():
            return _button_a.value

else:
    # CLUE with builtin screen (240x240)
    # from adafruit_clue import clue  # Avoiding to save memory

    # Outputs
    display = board.DISPLAY
    audio_out = AudioOut(board.SPEAKER)
    #pixels = clue.pixel
    pixels = neopixel.NeoPixel(board.NEOPIXEL, 1)
    JG_RX_COL = 0x0000ff
    BUTTON_Y_POS = 152

    # Inputs
    _button_a = digitalio.DigitalInOut(board.BUTTON_A)
    _button_a.switch_to_input(pull=digitalio.Pull.UP)
    _button_b = digitalio.DigitalInOut(board.BUTTON_B)
    _button_b.switch_to_input(pull=digitalio.Pull.UP)
    def button_left():
        return not _button_a.value
    def button_right():
        return not _button_b.value


blankScreen(display, pixels)

# Set to True for blue flashing when devices are joining the playing group
JG_FLASH = False

IMAGE_DIR = "rps/images"
AUDIO_DIR = "rps/audio"

audio_files = (("searching", "welcome-to", "arena", "ready")
               + ("rock", "paper", "scissors")
               + ("start-tx", "end-tx", "txing")
               + ("rock-scissors", "paper-rock", "scissors-paper")
               + ("you-win", "draw", "you-lose", "error")
               + ("humiliation", "excellent"))

gc.collect()
d_print(2, "GC before SJ", gc.mem_free())
sample = SampleJukebox(audio_out, audio_files,
                       directory=AUDIO_DIR)
del audio_files  # not needed anymore
gc.collect()
d_print(2, "GC after SJ", gc.mem_free())

# A lookup table in Dict form for win/lose, each value is a sample name
# Does not need to cater for draw (tie) condition
WAV_VICTORY_NAME = { "rp": "paper-rock",
                     "pr": "paper-rock",
                     "ps": "scissors-paper",
                     "sp": "scissors-paper",
                     "sr": "rock-scissors",
                     "rs": "rock-scissors"}

# This limit is based on displaying names on screen with scale=2 font
MAX_PLAYERS = 8
# Some code is dependent on these being lower-case
CHOICES = ("rock", "paper", "scissors")

rps_display = RPSDisplay(display, pixels,
                         CHOICES, sample, WAV_VICTORY_NAME,
                         MAX_PLAYERS, BUTTON_Y_POS,
                         IMAGE_DIR + "/rps-sprites-ind4.bmp",
                         ble_color=JG_RX_COL)

# Transmit maximum times in seconds
JG_MSG_TIME_S = 20
FIRST_MSG_TIME_S = 12
STD_MSG_TIME_S = 4
LAST_ACK_TIME_S = 1.5


# Intro screen with audio
rps_display.introductionScreen()

# Enable the Bluetooth LE radio and set player's name (from secrets.py)
ble = BLERadio()
if ble_name is not None:
    ble.name = ble_name


game_no = 1
round_no = 1
wins = losses = draws = voids = 0

# TOTAL_ROUNDS = 5
TOTAL_ROUNDS = 3

CRYPTO_ALGO = "chacha20"
KEY_SIZE = 8  # in bytes
KEY_ENLARGE = 256 // KEY_SIZE // 8

# Scoring values
POINTS_WIN = 2
POINTS_DRAW = 1

WIN = const(1)
DRAW = const(2)  # AKA tie
LOSE = const(3)
INVALID = const(4)

def evaluateRound(mine, yours):
    """Determine who won the round in this game based on the two strings mine and yours.
       Returns WIN, DRAW, LOSE or INVALID for bad input."""
    # Return INVALID if any input is None
    try:
        mine_lc = mine.lower()
        yours_lc = yours.lower()
    except AttributeError:
        return INVALID

    if mine_lc not in CHOICES or yours_lc not in CHOICES:
        return INVALID

    # Both inputs are valid choices if we got this far
    # pylint: disable=too-many-boolean-expressions
    if mine_lc == yours_lc:
        return DRAW
    elif (mine_lc == "rock" and yours_lc == "scissors"
          or mine_lc == "paper" and yours_lc == "rock"
          or mine_lc == "scissors" and yours_lc == "paper"):
        return WIN

    return LOSE


rps_display.playerListScreen()

def addPlayer(name, addr_text, address, ad):
    # pylint: disable=unused-argument
    # address is part of call back
    """Add the player name and mac address to players global variable
       and the name and rssi (if present) to on-screen list."""

    rssi = ad.rssi if ad else None

    players.append((name, addr_text))
    rps_display.addPlayer(name, rssi=rssi)


# Make a list of all the player's (name, mac address as text)
# where both are strings with this player as first entry
players = []
my_name = ble.name
rps_display.fadeUpDown("down")
addPlayer(my_name, addrToText(ble.address_bytes), None, None)


# These two functions mainly serve to adapt the call back arguments
# to the called functions which do not use them
def jgAdCallbackFlashBLE(_a, _b, _c):
    """Used in broadcastAndReceive to flash the NeoPixels
       when advertising messages are received."""
    return rps_display.flashBLE()

def jgEndscanCallback(_a, _b, _c):
    """Used in broadcastAndReceive to allow early termination of the scanning
       when the left button is pressed.
       Button may need to be held down for a second."""
    return button_left()

# Join Game
gc.collect()
d_print(2, "GC before JG", gc.mem_free())

sample.play("searching", loop=True)
rps_display.fadeUpDown("up")
jg_msg = JoinGameAdvertisement(game="RPS")
(_, _, _) = broadcastAndReceive(ble,
                                jg_msg,
                                scan_time=JG_MSG_TIME_S,
                                scan_response_request=True,
                                ad_cb=(jgAdCallbackFlashBLE
                                       if JG_FLASH
                                       else None),
                                endscan_cb=jgEndscanCallback,
                                name_cb=addPlayer)
del _  # To clean-up with GC below
sample.stop()
gc.collect()
d_print(2, "GC after JG", gc.mem_free())

# Wait for button release - this stops a long press
# being acted upon in the main loop further down
while button_left():
    pass

scores = [0] * len(players)
num_other_players = len(players) - 1

# Set the advertising interval to the minimum for four or fewer players
# and above that extend value by players multiplied by 7ms
ad_interval = MIN_AD_INTERVAL if len(players) <= 4 else len(players) * 0.007

d_print(1, "PLAYERS", players)

# Sequence numbers - real packets start range between 1-255 inclusive
seq_tx = [1]  # The next number to send

new_round_init = True

# A nonce by definition must not be reused but here a random key is
# generated per round and this is used once per round so this is ok
static_nonce = bytes(range(12, 0, -1))

while True:
    if round_no > TOTAL_ROUNDS:
        print("Summary: ",
              "wins {:d}, losses {:d},"
              " draws {:d}, void {:d}\n\n".format(wins, losses, draws, voids))

        rps_display.showGameResult(players, scores, rounds_tot=TOTAL_ROUNDS)

        # Reset variables for another game
        round_no = 1
        wins = losses = draws = voids = 0
        scores = [0] * len(players)
        game_no += 1

    if new_round_init:
        rps_display.showGameRound(game_no=game_no, round_no=round_no, rounds_tot=TOTAL_ROUNDS)
        # Make a new initial random choice for the player and show it
        my_choice_idx = random.randrange(len(CHOICES))
        rps_display.fadeUpDown("down")
        rps_display.showChoice(my_choice_idx,
                               game_no=game_no, round_no=round_no, rounds_tot=TOTAL_ROUNDS,
                               won_sf=wins, drew_sf=draws, lost_sf=losses)
        rps_display.fadeUpDown("up")
        new_round_init = False

    if button_left():
        while button_left():  # Wait for button release
            pass
        my_choice_idx = (my_choice_idx + 1) % len(CHOICES)
        rps_display.showChoice(my_choice_idx,
                               game_no=game_no, round_no=round_no, rounds_tot=TOTAL_ROUNDS,
                               won_sf=wins, drew_sf=draws, lost_sf=losses)

    if button_right():
        gc.collect()
        d_print(2, "GC before comms", gc.mem_free())

        # This sound cue is really for other players
        sample.play("ready")

        my_choice = CHOICES[my_choice_idx]
        player_choices = [my_choice]

        # Repeating key four times to make key for ChaCha20
        short_key = generateOTPadKey(KEY_SIZE)
        key = enlargeKey(short_key, KEY_ENLARGE)
        d_print(3, "KEY", key)

        plain_bytes = bytesPad(my_choice, size=8, pad=0)
        cipher_bytes = encrypt(plain_bytes, key, CRYPTO_ALGO,
                               nonce=static_nonce)
        enc_data_msg = RpsEncDataAdvertisement(enc_data=cipher_bytes,
                                               round_no=round_no)

        # Wait for ready sound sample to stop playing
        sample.wait()
        sample.play("start-tx")
        sample.wait()
        sample.play("txing", loop=True)
        # Players will not be synchronised at this point as they do not
        # have to make their choices simultaneously - much longer 12 second
        # time to accommodate this
        _, enc_data_by_addr, _ = broadcastAndReceive(ble,
                                                     enc_data_msg,
                                                     RpsEncDataAdvertisement,
                                                     RpsKeyDataAdvertisement,
                                                     scan_time=FIRST_MSG_TIME_S,
                                                     ad_interval=ad_interval,
                                                     receive_n=num_other_players,
                                                     seq_tx=seq_tx)

        key_data_msg = RpsKeyDataAdvertisement(key_data=short_key, round_no=round_no)
        # All of the programs will be loosely synchronised now
        _, key_data_by_addr, _ = broadcastAndReceive(ble,
                                                     key_data_msg,
                                                     RpsEncDataAdvertisement,
                                                     RpsKeyDataAdvertisement,
                                                     RpsRoundEndAdvertisement,
                                                     scan_time=STD_MSG_TIME_S,
                                                     ad_interval=ad_interval,
                                                     receive_n=num_other_players,
                                                     seq_tx=seq_tx,
                                                     ads_by_addr=enc_data_by_addr)
        del enc_data_by_addr

        # Play end transmit sound while doing next decrypt bit
        sample.play("end-tx")

        re_msg = RpsRoundEndAdvertisement(round_no=round_no)
        # The round end message is really about acknowledging receipt of
        # the key_data_msg by sending a non-critical message with the ack
        _, re_by_addr, _ = broadcastAndReceive(ble,
                                               re_msg,
                                               RpsEncDataAdvertisement,
                                               RpsKeyDataAdvertisement,
                                               RpsRoundEndAdvertisement,
                                               scan_time=LAST_ACK_TIME_S,
                                               ad_interval=ad_interval,
                                               receive_n=num_other_players,
                                               seq_tx=seq_tx,
                                               ads_by_addr=key_data_by_addr)
        del key_data_by_addr, _  # To allow GC

        # This will have accumulated all the messages for this round
        allmsg_by_addr = re_by_addr
        del re_by_addr

        # Decrypt results
        # If any data is incorrect the opponent_choice is left as None
        for p_idx1 in range(1, len(players)):
            print("DECRYPT GC", p_idx1, gc.mem_free())
            opponent_name = players[p_idx1][0]
            opponent_macaddr = players[p_idx1][1]
            opponent_choice = None
            opponent_msgs = allmsg_by_addr.get(opponent_macaddr)
            if opponent_msgs is None:
                opponent_msgs = []
            cipher_ad = cipher_bytes = cipher_round = None
            key_ad = key_bytes = key_round = None
            # There should be either one or two messages per type
            # two occurs when there
            for msg_idx in range(len(opponent_msgs)):
                if (cipher_ad is None
                        and isinstance(opponent_msgs[msg_idx][0],
                                       RpsEncDataAdvertisement)):
                    cipher_ad = opponent_msgs[msg_idx][0]
                    cipher_bytes = cipher_ad.enc_data
                    cipher_round = cipher_ad.round_no
                elif (key_ad is None
                      and isinstance(opponent_msgs[msg_idx][0],
                                     RpsKeyDataAdvertisement)):
                    key_ad = opponent_msgs[msg_idx][0]
                    key_bytes = key_ad.key_data
                    key_round = key_ad.round_no

            if cipher_ad and key_ad:
                if round_no == cipher_round == key_round:
                    key = enlargeKey(key_bytes, KEY_ENLARGE)
                    plain_bytes = decrypt(cipher_bytes, key, CRYPTO_ALGO,
                                          nonce=static_nonce)
                    opponent_choice = strUnpad(plain_bytes)
                else:
                    print("Received wrong round for {} {:d}: {:d} {:d}".format(
                          opponent_name, round_no, cipher_round, key_round))
            else:
                print("Missing packets: RpsEncDataAdvertisement "
                      "and RpsKeyDataAdvertisement:", cipher_ad, key_ad)
            player_choices.append(opponent_choice)

        # Free up some memory by deleting any data that's no longer needed
        del allmsg_by_addr
        gc.collect()
        d_print(2, "GC after comms", gc.mem_free())

        sample.wait()  # Ensure end-tx has completed

        # Chalk up wins and losses - checks this player but also has to
        # check other players against each other to calculate all the
        # scores for the high score table at the end of game
        for p_idx0, (p0_name, _) in enumerate(players[:len(players) - 1]):
            for p_idx1, (p1_name, _) in enumerate(players[p_idx0 + 1:], p_idx0 + 1):
                # evaluateRound takes text strings for RPS
                result = evaluateRound(player_choices[p_idx0],
                                       player_choices[p_idx1])

                # this_player is used to control incrementing the summary
                # for the tally for this local player
                this_player = 0
                void = False
                if p_idx0 == 0:
                    this_player = 1
                    p0_ch_idx = None
                    p1_ch_idx = None
                    try:
                        p0_ch_idx = CHOICES.index(player_choices[p_idx0])
                        p1_ch_idx = CHOICES.index(player_choices[p_idx1])
                    except ValueError:
                        void = True  # Ensure this is marked void
                        print("ERROR", "failed to decode",
                              player_choices[p_idx0], player_choices[p_idx1])

                    # showPlayerVPlayer takes int index values for RPS
                    rps_display.showPlayerVPlayer(p0_name, p1_name, p_idx1,
                                                  p0_ch_idx, p1_ch_idx,
                                                  result == WIN,
                                                  result == DRAW,
                                                  result == INVALID or void)

                if result == INVALID or void:
                    voids += this_player
                elif result == DRAW:
                    draws += this_player
                    scores[p_idx0] += POINTS_DRAW
                    scores[p_idx1] += POINTS_DRAW
                elif result == WIN:
                    wins += this_player
                    scores[p_idx0] += POINTS_WIN
                else:
                    losses += this_player
                    scores[p_idx1] += POINTS_WIN

                d_print(2,
                        p0_name, player_choices[p_idx0], "vs",
                        p1_name, player_choices[p_idx1],
                        "result", result)

        print("Game {:d}, round {:d}, wins {:d}, losses {:d}, draws {:d}, "
              "void {:d}".format(game_no, round_no, wins, losses, draws, voids))

        round_no += 1
        new_round_init = True

Configuration

If you wish, the player's name can be set by adding an entry to the secrets dict in an optional secrets.py file. This can be either "rps_name" or "ble_name", for example:

secrets = {
           "rps_name": "Huey"
          }

The file is typically used by projects for confidential information like credentials. Here, it's just a useful file to reuse as it keeps the configuration separate from the code.
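
The lookup order that code.py applies to this file ("rps_name" first, then "ble_name") can be sketched as a small function. This is only an illustration: pick_player_name is a made-up helper name and plain dicts stand in for the contents of secrets.py.

```python
def pick_player_name(secrets):
    """Return the "rps_name" entry if present, else "ble_name", else None.
       pick_player_name is a hypothetical helper, not part of the game code."""
    name = secrets.get("rps_name")
    if name is None:
        name = secrets.get("ble_name")
    return name


# Plain dicts standing in for the secrets dict in secrets.py
print(pick_player_name({"rps_name": "Huey"}))
print(pick_player_name({"ble_name": "Player2"}))
print(pick_player_name({}))
```

When neither entry is found the game leaves the radio's name untouched, which corresponds to the None result here.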

Code Discussion

This is a much larger program. It's split into a number of files with related functions and there's some limited use of classes.

The clue and cpb Objects

This program does not use the clue and cpb objects from the libraries adafruit_clue and adafruit_circuitplayground.bluefruit, respectively. These are very useful for interactive experimentation and small programs but they do import a lot of libraries. For larger programs like the Advanced game that do not use most of the functionality, it is more economical on memory to import the specific libraries used in the program.
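
One way to see what an import costs is to compare free memory before and after it, in the same style as the gc.mem_free() debug prints in code.py. The sketch below is an assumption-laden illustration: free_bytes is a made-up helper, the random module is only a stand-in for a heavier library, and gc.mem_free() exists on CircuitPython but not on desktop Python, so the helper returns None there.

```python
import gc


def free_bytes():
    """Return free heap bytes where gc.mem_free() exists (CircuitPython),
       otherwise None. free_bytes is a hypothetical helper."""
    gc.collect()
    mem_free = getattr(gc, "mem_free", None)
    return mem_free() if mem_free is not None else None


before = free_bytes()
import random  # stand-in for a larger library such as adafruit_clue
after = free_bytes()

if before is None:
    print("gc.mem_free() is not available on this Python")
else:
    print("import used about", before - after, "bytes")
```

A module that was already imported earlier in the program will show little or no extra cost, so measurements like this are best taken right after a reset.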

Join Game

After the introduction screen and button guide, the game establishes the group playing with a JoinGameAdvertisement message sent amongst all the boards that want to play the game. The code is shown below.

def addPlayer(name, addr_text, address, ad):
    # pylint: disable=unused-argument
    # address is part of call back
    """Add the player name and mac address to players global variable
       and the name and rssi (if present) to on-screen list."""

    rssi = ad.rssi if ad else None

    players.append((name, addr_text))
    rps_display.addPlayer(name, rssi=rssi)


# Make a list of all the player's (name, mac address as text)
# where both are strings with this player as first entry
players = []
my_name = ble.name
rps_display.fadeUpDown("down")
addPlayer(my_name, addrToText(ble.address_bytes), None, None)


# These two functions mainly serve to adapt the call back arguments
# to the called functions which do not use them
def jgAdCallbackFlashBLE(_a, _b, _c):
    """Used in broadcastAndReceive to flash the NeoPixels
       when advertising messages are received."""
    return rps_display.flashBLE()

def jgEndscanCallback(_a, _b, _c):
    """Used in broadcastAndReceive to allow early termination of the scanning
       when the left button is pressed.
       Button may need to be held down for a second."""
    return button_left()

# Join Game
gc.collect()
d_print(2, "GC before JG", gc.mem_free())

sample.play("searching", loop=True)
rps_display.fadeUpDown("up")
jg_msg = JoinGameAdvertisement(game="RPS")
(_, _, _) = broadcastAndReceive(ble,
                                jg_msg,
                                scan_time=JG_MSG_TIME_S,
                                scan_response_request=True,
                                ad_cb=(jgAdCallbackFlashBLE
                                       if JG_FLASH
                                       else None),
                                endscan_cb=jgEndscanCallback,
                                name_cb=addPlayer)
del _  # To clean-up with GC below
sample.stop()
gc.collect()
d_print(2, "GC after JG", gc.mem_free())

The addPlayer() function adds a player to the players list, a global variable, and calls the addPlayer() method on the (global) rps_display object to add the player's details to the list on the display. It is called once directly to add the local player and is then passed as a callback to broadcastAndReceive() to add the remote players as the device receives a JoinGameAdvertisement from each of them. Because the callback does the work, the return values from broadcastAndReceive() are not needed. A Python convention is to assign unwanted values to the _ variable. Here all three elements of the tuple are assigned to _ in turn, so only the third survives and the others are overwritten. That last value isn't needed either, so it is deleted to minimise the memory in use. This makes the variable's previous value available for garbage collection by the subsequent gc.collect().
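
The callback pattern and the use of _ can be tried out without any Bluetooth hardware. In the sketch below, fake_broadcast_and_receive is a hypothetical stand-in for broadcastAndReceive(): its three return values are placeholders and the player names and addresses are made up.

```python
players = []


def add_player(name, addr_text, address, ad):
    """Callback with the same argument order as addPlayer() in the game."""
    players.append((name, addr_text))


def fake_broadcast_and_receive(name_cb):
    """Hypothetical stand-in for broadcastAndReceive(): reports two
       made-up remote players via the callback, returns three placeholders."""
    name_cb("Huey", "aa:bb:cc:dd:ee:01", None, None)
    name_cb("Louie", "aa:bb:cc:dd:ee:02", None, None)
    return ([], {}, {})


# All three return values land in _ one after another,
# only the last one is still referenced afterwards
(_, _, _) = fake_broadcast_and_receive(add_player)
del _  # drop that last reference so the object can be collected

print(players)
```

The list of players is built entirely by the callback, which is why nothing from the returned tuple needs to be kept.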

Some other callbacks are used here:

  • ad_cb is called whenever an Advertisement is received and is used for flashing the NeoPixels blue if JG_FLASH is True (the code currently has it set to False);
  • endscan_cb is called periodically and will terminate the scanning if it returns a True value; a left button press returns True here.
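
The early termination idea behind endscan_cb can be sketched with a simple polling loop. This is not the implementation of broadcastAndReceive(): scan_until, its parameters and the None values passed to the callback are assumptions for illustration, and a scripted sequence stands in for the left button.

```python
import time


def scan_until(endscan_cb, scan_time, poll_interval=0.01):
    """Hypothetical polling loop: returns True if endscan_cb ended the
       scan early, False if scan_time seconds elapsed first."""
    deadline = time.monotonic() + scan_time
    while time.monotonic() < deadline:
        # The real callback receives three arguments which the game ignores
        if endscan_cb(None, None, None):
            return True
        time.sleep(poll_interval)
    return False


# Scripted "button": not pressed for two polls, then pressed
presses = iter([False, False, True])

def fake_button_cb(_a, _b, _c):
    return next(presses, True)


print(scan_until(fake_button_cb, scan_time=2.0))
```

Because the callback is only polled between other work, a real button may need to be held down for a moment, as the docstring of jgEndscanCallback() notes.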

A sound effect plays continuously during this part of the program. The sample object is a SampleJukebox, which is described further down.

Main Loop

The main loop has the same three conditional statements as the previous game plus one extra one to deal with rounds per game.

The first if checks for the end of each game, displaying the score on the display or NeoPixels using rps_display.showGameResult() and then resetting all the variables for the next game.

# Main loop code excerpt 1/8 - end of a game
while True:
    if round_no > TOTAL_ROUNDS:
        print("Summary: ",
              "wins {:d}, losses {:d},"
              " draws {:d}, void {:d}\n\n".format(wins, losses,
                                                  draws, voids))

        rps_display.showGameResult(players, scores,
                                   rounds_tot=TOTAL_ROUNDS)

        # Reset variables for another game
        round_no = 1
        wins = 0
        losses = 0
        draws = 0
        voids = 0
        scores = [0] * len(players)
        game_no += 1
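
The scores passed to showGameResult() are built up each round by comparing every pair of players once, using POINTS_WIN and POINTS_DRAW from code.py. How the table is drawn is internal to rps_display and not shown here. The sketch below only illustrates the scoring and a descending sort; score_round and the BEATS lookup are made-up names, the player names are examples and invalid choices are not handled.

```python
POINTS_WIN = 2
POINTS_DRAW = 1
# Each key beats its value (hypothetical lookup, not from code.py)
BEATS = {"rock": "scissors", "paper": "rock", "scissors": "paper"}


def score_round(choices):
    """Return a list of points for one round, comparing each pair once."""
    scores = [0] * len(choices)
    for i in range(len(choices) - 1):
        for j in range(i + 1, len(choices)):
            if choices[i] == choices[j]:
                scores[i] += POINTS_DRAW
                scores[j] += POINTS_DRAW
            elif BEATS[choices[i]] == choices[j]:
                scores[i] += POINTS_WIN
            else:
                scores[j] += POINTS_WIN
    return scores


names = ["Huey", "Dewey", "Louie"]
round_scores = score_round(["rock", "scissors", "rock"])
# Highest score first, ties keep their original order
table = sorted(zip(names, round_scores), key=lambda ns: ns[1], reverse=True)
print(table)
```

With these choices the two rock players each beat scissors and draw with each other, so both end the round on 3 points.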

The second if checks a boolean to see if it is the start of a new round. The game and round number are updated with rps_display.showGameRound() and then a random starting choice is made for the player and displayed on screen using the rps_display.showChoice() with surrounding fades for a visually pleasant transition.

# Main loop code excerpt 2/8 - start of a round
    if new_round_init:
        rps_display.showGameRound(game_no=game_no, round_no=round_no,
                                  rounds_tot=TOTAL_ROUNDS)
        # Make a new initial random choice for the player and show it
        my_choice_idx = random.randrange(len(CHOICES))
        rps_display.fadeUpDown("down")
        rps_display.showChoice(my_choice_idx,
                               game_no=game_no, round_no=round_no,
                               rounds_tot=TOTAL_ROUNDS,
                               won_sf=wins, drew_sf=draws,
                               lost_sf=losses)
        rps_display.fadeUpDown("up")
        new_round_init = False

The if for the left button is very similar to the Simple game. Here, rps_display.showChoice() takes the place of setCursor() from the Simple game.

# Main loop code excerpt 3/8 - left button press
    if button_left():
        while button_left():  ### Wait for button release
            pass
        my_choice_idx = (my_choice_idx + 1) % len(CHOICES)
        rps_display.showChoice(my_choice_idx,
                               game_no=game_no, round_no=round_no,
                               rounds_tot=TOTAL_ROUNDS,
                               won_sf=wins, drew_sf=draws,
                               lost_sf=losses)
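
The modulo arithmetic is what makes the selection wrap around from the last choice back to the first. A short standalone sketch of four simulated button presses, starting from the last entry:

```python
CHOICES = ("rock", "paper", "scissors")

idx = 2  # start on "scissors", the last entry
seen = []
for _press in range(4):
    # Same update as in the main loop: advance by one and wrap at the end
    idx = (idx + 1) % len(CHOICES)
    seen.append(CHOICES[idx])

print(seen)
```

The first press moves from "scissors" to "rock" rather than running off the end of the tuple.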

The final if for the right button contains a lot of code for the exchange of choices between all the players. This would be better if it was split into some functions to make the code easier to read and understand.

The first part creates a short, per-message 8 byte encryption key with generateOTPadKey(), which is then stretched with enlargeKey(). The player's choice is padded, encrypted and incorporated into the RpsEncDataAdvertisement object. The padding here just adds NUL characters to ensure the message is 8 bytes long.

Padding schemes like PKCS#5 are used for real applications using encryption.
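
The NUL padding described above can be sketched as follows. This is a minimal illustration only: bytes_pad() and str_unpad() are hypothetical stand-ins written for this example and are not the game's own bytesPad() and strUnpad() functions, whose implementations are not shown on this page.

```python
def bytes_pad(text, size=8, pad=0):
    """Encode text as UTF-8 and right-pad it with the pad byte up to size bytes."""
    text_bytes = text.encode("utf-8")
    if len(text_bytes) > size:
        raise ValueError("text is longer than size")
    return text_bytes + bytes([pad]) * (size - len(text_bytes))


def str_unpad(text_bytes, pad=0):
    """Strip trailing pad bytes and decode the remainder back to a string."""
    return text_bytes.rstrip(bytes([pad])).decode("utf-8")


padded = bytes_pad("paper", size=8, pad=0)
print(padded)             # b'paper\x00\x00\x00'
print(str_unpad(padded))  # paper
```

The longest choice, scissors, is already 8 characters long and needs no padding in this scheme.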

# Main loop code excerpt 4/8 - right button press i
    if button_right():
        gc.collect()
        d_print(2, "GC before comms", gc.mem_free())

        # This sound cue is really for other players
        sample.play("ready")

        my_choice = CHOICES[my_choice_idx]
        player_choices = [my_choice]

        # Repeating key four times to make key for ChaCha20
        short_key = generateOTPadKey(KEY_SIZE)
        key = enlargeKey(short_key, KEY_ENLARGE)
        d_print(3, "KEY", key)

        plain_bytes = bytesPad(my_choice, size=8, pad=0)
        cipher_bytes = encrypt(plain_bytes, key, CRYPTO_ALGO,
                               nonce=static_nonce)
        enc_data_msg = RpsEncDataAdvertisement(enc_data=cipher_bytes,
                                               round_no=round_no)

        # Wait for ready sound sample to stop playing
        sample.wait()

The next part, shown below, starts the humming sound sample which runs while the program is sending messages. The three messages, RpsEncDataAdvertisement, RpsKeyDataAdvertisement and RpsRoundEndAdvertisement, are sent in that order and only when the prior message has been received from all other players. The data from the other players' messages is accumulated and left in allmsg_by_addr.

# Main loop code excerpt 5/8 - right button press ii
        sample.play("start-tx")
        sample.wait()
        sample.play("txing", loop=True)
        # Players will not be synchronised at this point as they do not
        # have to make their choices simultaneously - much longer 12 second
        # time to accommodate this
        _, enc_data_by_addr, _ = broadcastAndReceive(ble,
                                                     enc_data_msg,
                                                     RpsEncDataAdvertisement,
                                                     RpsKeyDataAdvertisement,
                                                     scan_time=FIRST_MSG_TIME_S,
                                                     ad_interval=ad_interval,
                                                     receive_n=num_other_players,
                                                     seq_tx=seq_tx)

        key_data_msg = RpsKeyDataAdvertisement(key_data=short_key, round_no=round_no)
        # All of the programs will be loosely synchronised now
        _, key_data_by_addr, _ = broadcastAndReceive(ble,
                                                     key_data_msg,
                                                     RpsEncDataAdvertisement,
                                                     RpsKeyDataAdvertisement,
                                                     RpsRoundEndAdvertisement,
                                                     scan_time=STD_MSG_TIME_S,
                                                     ad_interval=ad_interval,
                                                     receive_n=num_other_players,
                                                     seq_tx=seq_tx,
                                                     ads_by_addr=enc_data_by_addr)
        del enc_data_by_addr

        # Play end transmit sound while doing next decrypt bit
        sample.play("end-tx")

        re_msg = RpsRoundEndAdvertisement(round_no=round_no)
        # The round end message is really about acknowledging receipt of
        # the key_data_msg by sending a non-critical message with the ack
        _, re_by_addr, _ = broadcastAndReceive(ble,
                                               re_msg,
                                               RpsEncDataAdvertisement,
                                               RpsKeyDataAdvertisement,
                                               RpsRoundEndAdvertisement,
                                               scan_time=LAST_ACK_TIME_S,
                                               ad_interval=ad_interval,
                                               receive_n=num_other_players,
                                               seq_tx=seq_tx,
                                               ads_by_addr=key_data_by_addr)
        del key_data_by_addr, _  # To allow GC

        # This will have accumulated all the messages for this round
        allmsg_by_addr = re_by_addr
        del re_by_addr

To free up as much memory as possible, any data structures not needed at this point are removed with del. The other players' messages are then decrypted.

# Main loop code excerpt 6/8 - right button press iii
        # Decrypt results
        # If any data is incorrect the opponent_choice is left as None
        for p_idx1 in range(1, len(players)):
            print("DECRYPT GC", p_idx1, gc.mem_free())
            opponent_name = players[p_idx1][0]
            opponent_macaddr = players[p_idx1][1]
            opponent_choice = None
            opponent_msgs = allmsg_by_addr.get(opponent_macaddr)
            if opponent_msgs is None:
                opponent_msgs = []
            cipher_ad = cipher_bytes = cipher_round = None
            key_ad = key_bytes = key_round = None
            # There should be either one or two messages per type
            # two occurs when there
            for msg_idx in range(len(opponent_msgs)):
                if (cipher_ad is None
                        and isinstance(opponent_msgs[msg_idx][0],
                                       RpsEncDataAdvertisement)):
                    cipher_ad = opponent_msgs[msg_idx][0]
                    cipher_bytes = cipher_ad.enc_data
                    cipher_round = cipher_ad.round_no
                elif (key_ad is None
                      and isinstance(opponent_msgs[msg_idx][0],
                                     RpsKeyDataAdvertisement)):
                    key_ad = opponent_msgs[msg_idx][0]
                    key_bytes = key_ad.key_data
                    key_round = key_ad.round_no

            if cipher_ad and key_ad:
                if round_no == cipher_round == key_round:
                    key = enlargeKey(key_bytes, KEY_ENLARGE)
                    plain_bytes = decrypt(cipher_bytes, key, CRYPTO_ALGO,
                                          nonce=static_nonce)
                    opponent_choice = strUnpad(plain_bytes)
                else:
                    print("Received wrong round for {:s} {:d}: {:d} {:d}".format(
                          opponent_name, round_no, cipher_round, key_round))
            else:
                print("Missing packets: RpsEncDataAdvertisement "
                      "and RpsKeyDataAdvertisement:", cipher_ad, key_ad)
            player_choices.append(opponent_choice)

        # Free up some memory by deleting any data that's no longer needed
        del allmsg_by_addr
        gc.collect()
        d_print(2, "GC after comms", gc.mem_free())
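
The ordering of the exchange (encrypted choice first, key second, decrypt last) can be shown in isolation. The sketch below is an assumption-laden illustration: a plain XOR stands in for whatever CRYPTO_ALGO selects, xor_bytes() is a hypothetical helper, and no Bluetooth communication takes place. It shows the sequence of steps only and says nothing about the strength of any particular algorithm.

```python
import os


def xor_bytes(data, key):
    """XOR each data byte with the key byte at the same position."""
    if len(key) < len(data):
        raise ValueError("key must be at least as long as the data")
    return bytes(d ^ k for d, k in zip(data, key))


# Step 1: the player encrypts the padded choice with a fresh random key
choice = b"rock\x00\x00\x00\x00"
key = os.urandom(len(choice))
cipher = xor_bytes(choice, key)  # this is what gets broadcast first

# Step 2: only after every player's cipher has arrived is the key broadcast

# Step 3: receivers decrypt, XOR with the same key reverses the encryption
revealed = xor_bytes(cipher, key)
print(revealed.rstrip(b"\x00").decode("utf-8"))  # rock
```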

The decrypted choices of the opponents are now checked against the local player's choice to show who has won. Perhaps surprisingly, if there's more than one opponent then they are checked against each other. This is required to calculate the complete score table for all the players shown at the end of each game.

# Main loop code excerpt 7/8 - right button press iv
        sample.wait()  # Ensure end-tx has completed

        # Chalk up wins and losses - checks this player but also has to
        # check other players against each other to calculate all the
        # scores for the high score table at the end of game
        for p_idx0, (p0_name, _) in enumerate(players[:len(players) - 1]):
            for p_idx1, (p1_name, _) in enumerate(players[p_idx0 + 1:], p_idx0 + 1):
                # evaluateRound takes text strings for RPS
                result = evaluateRound(player_choices[p_idx0],
                                       player_choices[p_idx1])

                # this_player is used to control incrementing the summary
                # for the tally for this local player
                this_player = 0
                void = False
                if p_idx0 == 0:
                    this_player = 1
                    p0_ch_idx = None
                    p1_ch_idx = None
                    try:
                        p0_ch_idx = CHOICES.index(player_choices[p_idx0])
                        p1_ch_idx = CHOICES.index(player_choices[p_idx1])
                    except ValueError:
                        void = True  # Ensure this is marked void
                        print("ERROR", "failed to decode",
                              player_choices[p_idx0], player_choices[p_idx1])

                    # showPlayerVPlayer takes int index values for RPS
                    rps_display.showPlayerVPlayer(p0_name, p1_name, p_idx1,
                                                  p0_ch_idx, p1_ch_idx,
                                                  result == WIN,
                                                  result == DRAW,
                                                  result == INVALID or void)

Finally the results from the round are added to the sub-totals.

# Main loop code excerpt 8/8 - right button press v
                if result == INVALID or void:
                    voids += this_player
                elif result == DRAW:
                    draws += this_player
                    scores[p_idx0] += POINTS_DRAW
                    scores[p_idx1] += POINTS_DRAW
                elif result == WIN:
                    wins += this_player
                    scores[p_idx0] += POINTS_WIN
                else:
                    losses += this_player
                    scores[p_idx1] += POINTS_WIN

                d_print(2,
                        p0_name, player_choices[p_idx0], "vs",
                        p1_name, player_choices[p_idx1],
                        "result", result)

        print("Game {:d}, round {:d}, wins {:d}, losses {:d}, draws {:d}, "
              "void {:d}".format(game_no, round_no, wins, losses, draws, voids))

        round_no += 1
        new_round_init = True
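
The all-pairs comparison used in excerpts 7/8 and 8/8 can be tried out separately from the display and communication code. This is a simplified sketch rather than the game's code: score_round() and beats() are hypothetical names, the POINTS_WIN and POINTS_DRAW values are assumed for illustration, and beats() uses index arithmetic in place of evaluateRound().

```python
CHOICES = ("rock", "paper", "scissors")
POINTS_WIN = 2   # assumed value for illustration
POINTS_DRAW = 1  # assumed value for illustration


def beats(choice_a, choice_b):
    """Return True if choice_a beats choice_b (each choice beats the one before it)."""
    return (CHOICES.index(choice_a) - CHOICES.index(choice_b)) % 3 == 1


def score_round(player_choices):
    """Compare every pair of players once and return the points per player.
    A choice of None (nothing decoded) voids every pairing it is part of."""
    scores = [0] * len(player_choices)
    for idx0 in range(len(player_choices) - 1):
        for idx1 in range(idx0 + 1, len(player_choices)):
            c0, c1 = player_choices[idx0], player_choices[idx1]
            if c0 not in CHOICES or c1 not in CHOICES:
                continue  # void pairing, no points awarded
            if c0 == c1:
                scores[idx0] += POINTS_DRAW
                scores[idx1] += POINTS_DRAW
            elif beats(c0, c1):
                scores[idx0] += POINTS_WIN
            else:
                scores[idx1] += POINTS_WIN
    return scores


# Player 0 is the local player, the fourth player's message was lost
print(score_round(["rock", "scissors", "paper", None]))  # [2, 2, 2, 0]
```

With n players there are n * (n - 1) / 2 pairings per round, so six players produce 15 comparisons.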

The evaluateRound() function has a different style of implementation compared to the Simple version.

def evaluateRound(mine, yours):
    """Determine who won the round in this game based on the two strings mine and yours.
       Returns WIN, DRAW, LOSE or INVALID for bad input."""
    # Return INVALID if any input is None
    try:
        mine_lc = mine.lower()
        yours_lc = yours.lower()
    except AttributeError:
        return INVALID

    if mine_lc not in CHOICES or yours_lc not in CHOICES:
        return INVALID

    # Both inputs are valid choices if we got this far
    if mine_lc == yours_lc:
        return DRAW
    elif (mine_lc == "rock" and yours_lc == "scissors"
          or mine_lc == "paper" and yours_lc == "rock"
          or mine_lc == "scissors" and yours_lc == "paper"):
        return WIN

    return LOSE

This more compact version makes use of Python's in operator to check if the inputs are present in the tuple of valid CHOICES for data validation. The CHOICES variable could be a list or a tuple. A tuple is used as a form of defensive programming since tuples are immutable (read-only) in Python. This prevents accidental modification of this constant sequence.

The return type has changed and is now a form of enumerated type using const() global variables. CircuitPython's const() can only be used for integers but that's sufficient here. CPython offers the useful Enum class for enumerations but this is not currently present in CircuitPython.

The long expression in elif uses a mixture of and and or boolean operators. The and expressions are evaluated first as the programmer intended due to Python's precedence rules, placing and higher than or.
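
The precedence rule can be checked directly. The short sketch below, using made-up variable names, compares the unbracketed form with an explicitly bracketed one for every combination of four boolean values and then shows a different grouping that changes the answer.

```python
VALUES = (False, True)

# Collect any combination where the two forms disagree (none are expected)
mismatches = []
for a in VALUES:
    for b in VALUES:
        for c in VALUES:
            for d in VALUES:
                unbracketed = a and b or c and d
                bracketed = (a and b) or (c and d)
                if unbracketed != bracketed:
                    mismatches.append((a, b, c, d))

print(len(mismatches))  # 0

# Grouping the other way gives a different answer for some inputs
a, b, c, d = True, True, False, False
print(a and b or c and d)    # True
print(a and (b or c) and d)  # False
```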

SampleJukebox class

Using a separate class for playing the sound samples tidies up the code a little, but the main motivation for creating this class was to add a workaround to reduce the chance of fatal MemoryError exceptions, possibly related to memory fragmentation. The PWMAudioOut library currently allocates memory dynamically under the covers. The SampleJukebox class attempts to use the library in a way where the 2048 byte buffer is re-allocated immediately after it is de-allocated. This means the code is almost guaranteed to be able to reuse the previous 2048 byte contiguous section of memory.

RPSDisplay class

This is a large class containing all the code to send output to the display and/or to the NeoPixels for each of the screens in the game. Its constructor takes the sample object to let it play samples during some of the simple animations.

Dynamic Advertising Interval

The advertising interval seen in the code above is set just before the main loop using:

ad_interval = MIN_AD_INTERVAL if len(players) <= 4 else len(players) * 0.007

This increases the interval for five or more players: 5 players use 35 milliseconds and 6 players use 42 milliseconds. This is an attempt to keep collisions at a low level to maintain good efficiency.
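
The same calculation can be wrapped in a function to see the interval for each group size. This is a sketch for illustration: ad_interval_for() is a hypothetical name and MIN_AD_INTERVAL is taken as 0.02001 seconds, the value this guide quotes for the code.

```python
MIN_AD_INTERVAL = 0.02001  # seconds, the value quoted in this guide


def ad_interval_for(num_players):
    """Return the advertising interval in seconds for a group of this size."""
    if num_players <= 4:
        return MIN_AD_INTERVAL
    return num_players * 0.007


# Show the interval in milliseconds for two to six players
for num_players in range(2, 7):
    interval_ms = round(ad_interval_for(num_players) * 1000, 2)
    print(num_players, "players:", interval_ms, "ms")
```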

MIN_AD_INTERVAL would be expected to be 0.02 seconds (20ms) for Bluetooth Low Energy. The actual value used in the code is 0.02001. This is needed to work around a minor bug in the adafruit_ble library's start_advertising() which relates to the fundamentally limited precision of floating-point numbers. Some general background on this can be found in the Number Representation section of Clue Sensor Plotter in CircuitPython.
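
One plausible way for this kind of problem to arise is sketched below. It rests on assumptions: as_float32() is a hypothetical helper that uses 32-bit single precision as a stand-in for a reduced-precision float, and the comparison against LIMIT is illustrative only. The actual check inside the library is not shown on this page.

```python
import struct


def as_float32(value):
    """Round a Python float to the nearest 32-bit single-precision value."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


LIMIT = 0.02  # the 20ms minimum interval, held at full precision

# 0.02 cannot be stored exactly and the nearest single-precision value
# is slightly below it, so a "not less than the minimum" test fails
print(as_float32(0.02) >= LIMIT)     # False

# The small margin in 0.02001 survives the loss of precision
print(as_float32(0.02001) >= LIMIT)  # True
```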

Advertisement Matching

The adafruit_ble library provides a feature for efficiently filtering advertising packets including any scan responses in start_scan(). If any classes are passed as arguments then only packets matching those classes will be returned. This is implemented with a simple prefix mechanism where a list of 1 or more prefix byte sequences are checked against the advertising data.

For RpsRoundEndAdvertisement the class sets the match_prefixes attribute which the library code then uses to construct the prefix.

# match_prefixes tuple replaces deprecated prefix
    match_prefixes = (
        struct.pack(
            _PREFIX_FMT,
            MANUFACTURING_DATA_ADT,
            ADAFRUIT_COMPANY_ID,
            struct.calcsize("<H" + _DATA_FMT_ROUND),
            RPS_ROUND_ID
        ),
    )

The MANUFACTURING_DATA_ADT has a value of 0xff, the ADAFRUIT_COMPANY_ID is 0x0822 and the RPS_ROUND_ID is 0xfe43. The prefix ends up as the bytes (in hex) 0bff22080343fe. The 0b is a length field and is automatically prepended by the library. Some values appear "reversed" because BLE values are encoded in little-endian order. Python's struct library uses "<" to represent this. The example below shows how this prefix will match the data in the RpsRoundEndAdvertisement packet regardless of the per-instance field values and correctly doesn't match a different packet.

circuitpython_ble-prefix-matching-example1-1800x1350.png
An example of prefix matching for two RpsRoundEndAdvertisement messages with different values and a JoinGameAdvertisement message which differs and does not match. The length fields for the Manufacturer's Data field and within it are shown with a darker background.

The order of the round_no and sequence_number is critical for this prefix matching to work as the identifier number associated with round_no (0xfe43) is being used as part of the prefix to identify the class.
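
The prefix bytes quoted above can be reproduced with struct on any Python. In this sketch the two format strings are assumptions (they are not shown in the excerpt), the 0x0b length byte is added by hand rather than by the library, matches() is a hypothetical helper, and the packets are truncated after the round number for brevity.

```python
import struct

# Assumed format strings, these are not shown in the excerpt above
_PREFIX_FMT = "<BHBH"
_DATA_FMT_ROUND = "B"

MANUFACTURING_DATA_ADT = 0xFF
ADAFRUIT_COMPANY_ID = 0x0822
RPS_ROUND_ID = 0xFE43

prefix = struct.pack(_PREFIX_FMT,
                     MANUFACTURING_DATA_ADT,
                     ADAFRUIT_COMPANY_ID,
                     struct.calcsize("<H" + _DATA_FMT_ROUND),
                     RPS_ROUND_ID)
print(prefix.hex())  # ff22080343fe

# The leading length byte (0x0b) is added by hand for this illustration
full_prefix = bytes([0x0B]) + prefix


def matches(packet, match_prefix):
    """A packet matches if its leading bytes are equal to the prefix."""
    return packet[:len(match_prefix)] == match_prefix


round_1 = full_prefix + bytes([1])  # same class, round_no of 1
round_7 = full_prefix + bytes([7])  # same class, round_no of 7
# A different message type using a made-up identifier of 0xfe41
other = bytes([0x0B, 0xFF, 0x22, 0x08, 0x03, 0x41, 0xFE, 1])

print(matches(round_1, full_prefix),
      matches(round_7, full_prefix),
      matches(other, full_prefix))  # True True False
```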

Current Issues

The game works well but in common with almost all large codebases there are a number of issues.

Flicker when Changing Player's Choice

There is a noticeable flicker when a player presses the left button to advance the choice between the rock, paper and scissors icons. This is a common problem with simple or naive graphics code and was left in the code on purpose to demonstrate the phenomenon.

The implementation of the showChoice() method replaces all the screen objects and then recreates them including ones that have not been updated like the text at the top and bottom of the display. The extra display updates from these unnecessary changes are slow enough for the player to see the transition as flicker. This flicker can be reduced or eliminated by only changing the displayio objects which need to be updated.

In general, if lots of changes are made to displayio objects and if these are best displayed at once then briefly turning auto_refresh off is an option. This will coalesce the changes into one display update.

Another occurrence of this type of flashing/flicker has been discussed in the Adafruit CircuitPython and MicroPython forum.

Dependency on dict Key Ordering - Now Fixed

The four custom Advertisement sub-classes were dependent on the behaviour of CircuitPython's dict type for the identifier numbers to match the prefixes for each message.

A simple example on CircuitPython 5.3.0's REPL below shows the nature of this issue.

>>> letters = {"a": 1, "b": 2, "c" :3}
>>> letters
{'c': 3, 'a': 1, 'b': 2}

The order of the keys is not maintained in CircuitPython: "a" is the first key in letters as it is constructed but "c" is returned as the first one. This is a common feature of data types built upon a rudimentary hash table. Depending on an order which is not specified is a common source of pernicious, latent bugs.

CPython started returning dict keys in insert-order from version 3.6 and this formed part of the specification in version 3.7.

A few languages also randomly vary their hash tables to counter denial-of-service attacks, see Daniel Lemire: Use random hashing if you care about security?

Improvement

This was fixed with a new feature in the adafruit_ble library to maintain the order of fields within a ManufacturerData field based on the order of assignment. CircuitPython offers an OrderedDict data type which was used to implement this. 
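
The behaviour of OrderedDict can be seen with a short example. The field names come from the messages described earlier and the values are made up for illustration. This is not the library's internal code.

```python
from collections import OrderedDict

fields = OrderedDict()
fields["round_no"] = 3          # assigned first, so it comes first
fields["sequence_number"] = 17  # assigned second

# Iteration follows the order of assignment, not the hash order
print(list(fields.keys()))  # ['round_no', 'sequence_number']
```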

CPB Only Needs adafruit_display_text Library

The RPSDisplay class uses Group and Label. This creates the unfortunate, unnecessary dependency on having the adafruit_display_text library even when the Circuit Playground Bluefruit is used without a display. This could be fixed with some sub-classes that implement the relevant code for each type of output in separate files and conditional import statements for those sub-classes.

Sequence Number Wraparound

The sequence number used in advertising messages starts at 1 and is transmitted as an unsigned 8-bit number, giving it a maximum of 255. The code currently does not deal with exceeding 255 and will probably break in the 28th game.

It's common for packet identifiers and sequence numbers to use a fairly small data size as the number only needs to span the maximum number of packets which are "in flight" and can wrap around back to 0. For comparison, IPv4 has a 16-bit identifier for each packet and TCP has a 32-bit sequence number per connection.
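
One possible way to keep the number inside the 8-bit range is sketched below. It is an assumption about how a fix could look, not the game's code: next_seq() is a hypothetical helper which wraps from 255 back to 1 because the game's numbering starts at 1. The receiving code would also need to accept 1 as the successor of 255, which is not covered here.

```python
SEQ_MAX = 255  # largest value that fits in an unsigned 8-bit field


def next_seq(seq):
    """Return the following sequence number, wrapping 255 back to 1."""
    return seq % SEQ_MAX + 1


print(next_seq(1))    # 2
print(next_seq(254))  # 255
print(next_seq(255))  # 1
```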

Protocol Versioning

The messages do not have a version number. For a simple game this isn't a serious problem. For any software where new features are expected, or where different versions may be in use at the same time, a version number in the protocol/messages makes it possible to detect or support old and new message formats, possibly concurrently.

BLE advertising packets are self-describing to some extent in the sense they have types in the fields so this is not as problematic as with some other formats.
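
A version field could be as small as one byte at the start of each message. The sketch below is hypothetical throughout: the layout, PROTOCOL_VERSION and both function names are invented for illustration and do not correspond to the game's existing message formats.

```python
import struct

PROTOCOL_VERSION = 1  # hypothetical version number
_MSG_FMT = "<BB"      # hypothetical layout: version then round_no


def pack_round_end(round_no):
    """Build a message with the version number in the first byte."""
    return struct.pack(_MSG_FMT, PROTOCOL_VERSION, round_no)


def unpack_round_end(data):
    """Return round_no, rejecting messages with an unknown version."""
    version, round_no = struct.unpack(_MSG_FMT, data)
    if version != PROTOCOL_VERSION:
        raise ValueError("unsupported protocol version {:d}".format(version))
    return round_no


message = pack_round_end(3)
print(unpack_round_end(message))  # 3
```

A receiver running older software can then ignore or report a newer message rather than misinterpreting its fields.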

Very Infrequent MemoryError Allocating 65536 or 42441 Bytes

A MemoryError exception is raised when a program runs out of memory from the heap. In CircuitPython the accompanying stack trace is printed to the serial console. There is an elusive bug somewhere that causes an allocation attempt for a relatively large amount of memory.

Traceback (most recent call last):
  File "code.py", line 448, in <module>
  File "rps_comms.py", line 340, in broadcastAndReceive
  File "rps_comms.py", line 115, in startScan
  File "adafruit_ble/__init__.py", line 263, in start_scan
MemoryError: memory allocation failed, allocating 65536 bytes

The program here does not do anything obvious which needs such a large amount of memory particularly around the call to start_scan(). The sizes are suspicious:

  • both values appear far larger than any piece of data in the program;
  • 42441 only has two factors, 53 * 797, suggesting it is not a simple repeated data type;
  • 65536 is 2^16.

There's no obvious pattern so far for when this occurs although it does appear to always happen on the third call to broadcastAndReceive(). The frequency of occurrence is approximately 1 round in 500. It may occur more frequently when many (5+) players are playing. This is logged as an issue in GitHub.

This guide was first published on Sep 02, 2020. It was last updated on 2023-12-05 11:34:53 -0500.

This page (Advanced Game) was last updated on Nov 28, 2023.
