Cooperative Multitasking With asyncio
The structure of this project's code follows principals discussed in the
Cooperative Multitasking in CircuitPython with asyncio Learn Guide. If you haven't already I would recommend that you read through at least the first 3 pages: Overview, Concurrent Tasks, and Communicating Between Tasks in order to gain a foundational understanding of this project's structure.
GameState Class
The GameState
class keeps track of all of the variables needed by the game, and facilitates "communication" between the other tasks by storing variables that each other task can access and update.
It holds following variables:
-
difficulty
- How many blinks there will be in the sequence. -
led_off_time
- How long the LEDs stay off during a blink. -
led_on_time
- How long the LEDs stay on during a blink. -
score
- The player's current score. -
current_state
- State Machine variable that controls the games behavior. The possible values areSTATE_WAITING_TO_START
,STATE_PLAYER_TURN
, andSTATE_BLINKA_TURN
. -
sequence
- List that holds the current sequence of colors. Colors are represented by the single letters: R, G, B, Y. -
index
- The current index within the blinking sequence for Blinka's turn. -
btn_cooldown_time
- A timestamp that will get set to a short time into the future and then used to ignore button presses that occur within the cool down period. It prevents unwanted effects from accidental double presses. -
highscore
- The high score value, initially read out of NVM storage and then updated if the player manages to beat it.
Asyncio Tasks
The core logic for the project is comprised of 3 asyncio
tasks:
- Main Task
- Player Action Task
- Blinka Action Task
The Main Task simply initializes the GameState
object and launches the other two tasks. The majority of the logic is implemented in the Player Action Task and Blinka Action Task.
Internally both the Player Action Task, and Blinka Action Task use a infinite loop to run forever which include call(s) to asyncio.sleep()
to let other tasks have a chance to run too.
Player Action Task
The async function player_action(game_state)
is the player action task. It's responsible for polling the buttons to determine if the player has pressed any of them, and if so to react as appropriate depending on the current game state and the button that was pressed.
This task checks the current game_state
in order to decide how to react to button presses. The following states are possible:
- Waiting to Start - If the game was in this state then a button press will cause the "ready set go" blinks to occur and then the game will begin with Blinka's first turn.
- Blinka's Turn - Button presses are ignored during Blinka's turn.
- Players Turn - The game checks if the button that was pressed matches the next color in the sequence, if so the player is awarded a point, and if not the game is over. If the sequence is empty, meaning the player has pressed all buttons correctly, then it goes back to Blinka's turn to blink a new sequence. When the game ends the state becomes Waiting to Start to allow the player to play again with the press of button.
Blinka Action Task
The async function blinka_action(game_state)
is the Blinka Action Task. It's responsible for the actions during Blinka's turn.
This task doesn't do anything if the current_state
is not Blinka's Turn. When it is Blinka's turn the task will randomly generate a sequence if one hasn't been generated already. Then it will loop through the sequence blinking each color LED as it comes up in the sequence.
Once the end of the sequence is reached the current_state
is changed back to Player's Turn.
Source Code
The source code is embedded below and has been commented thoroughly with notes describing the purpose for each piece of code.
# SPDX-FileCopyrightText: 2024 Tim Cocks # # SPDX-License-Identifier: MIT """ Blinka Says - A game inspired by Simon. Test your memory by following along to the pattern that Blinka puts forth. This project uses asyncio for cooperative multitasking through tasks. There is one task for the players actions and another for Blinka's actions. The player action reads input from the buttons being pressed by the player and reacts to them as appropriate. The Blinka action blinks the randomized sequence that the player must then try to follow and replicate. """ import random import time import asyncio import board from digitalio import DigitalInOut, Direction from displayio import Group import keypad import terminalio from adafruit_display_text.bitmap_label import Label import foamyguy_nvm_helper as nvm_helper # State Machine variables STATE_WAITING_TO_START = 0 STATE_PLAYER_TURN = 1 STATE_BLINKA_TURN = 2 # list of color shortcut letters COLORS = ("Y", "G", "R", "B") # keypad initialization to read the button pins buttons = keypad.Keys( (board.D5, board.D6, board.D9, board.D10), value_when_pressed=False, pull=True) # Init LED output pins leds = { "Y": DigitalInOut(board.A0), "G": DigitalInOut(board.A1), "R": DigitalInOut(board.A3), "B": DigitalInOut(board.A2) } for color in COLORS: leds[color].direction = Direction.OUTPUT # display setup display = board.DISPLAY main_group = Group() # Label to show the "High" score label highscore_lbl = Label(terminalio.FONT, text="High ", scale=2) highscore_lbl.anchor_point = (1.0, 0.0) highscore_lbl.anchored_position = (display.width - 4, 4) main_group.append(highscore_lbl) # Label to show the "Current" score label curscore_lbl = Label(terminalio.FONT, text="Current", scale=2) curscore_lbl.anchor_point = (0.0, 0.0) curscore_lbl.anchored_position = (4, 4) main_group.append(curscore_lbl) # Label to show the current score numerical value curscore_val = Label(terminalio.FONT, text="0", scale=4) curscore_val.anchor_point = (0.0, 0.0) curscore_val.anchored_position = (4, curscore_lbl.bounding_box[1] + (curscore_lbl.bounding_box[3] * curscore_lbl.scale) + 10) main_group.append(curscore_val) # Label to show the high score numerical value highscore_val = Label(terminalio.FONT, text="0", scale=4) highscore_val.anchor_point = (1.0, 0.0) highscore_val.anchored_position = (display.width - 4, highscore_lbl.bounding_box[1] + highscore_lbl.bounding_box[3] * curscore_lbl.scale + 10) main_group.append(highscore_val) # Label to show the "Game Over" message. game_over_lbl = Label(terminalio.FONT, text="Game Over", scale=3) game_over_lbl.anchor_point = (0.5, 1.0) game_over_lbl.anchored_position = (display.width // 2, display.height - 4) game_over_lbl.hidden = True main_group.append(game_over_lbl) # set the main_group to show on the display display.root_group = main_group class GameState: """ Class that stores all the information about the game state. Used for keeping track of everything and sharing it between the asyncio tasks. """ def __init__(self, difficulty: int, led_off_time: int, led_on_time: int): # how many blinks per sequence self.difficulty = difficulty # how long the LED should spend off during a blink self.led_off_time = led_off_time # how long the LED should spend on during a blink self.led_on_time = led_on_time # the player's current score self.score = 0 # the current state for the state machine that controls how the game behaves. self.current_state = STATE_WAITING_TO_START # list to hold the sequence of colors that have been chosen self.sequence = [] # the current index within the sequence self.index = 0 # a timestamp that will be used to ignore button presses for a short period of time # to avoid accidental double presses. self.btn_cooldown_time = -1 # a variable to hold the eventual high-score self.highscore = None try: # read data from NVM storage read_data = nvm_helper.read_data() # if we found data check if it's a high-score value if isinstance(read_data, list) and read_data[0] == "bls_hs": # it is a high-score so populate the label with its value self.highscore = read_data[1] except EOFError: # no high-score data pass async def player_action(game_state: GameState): """ Read the buttons to determine if the player has pressed any of them, and react appropriately if so. :param game_state: The GameState object that holds the current state of the game. :return: None """ # pylint: disable=too-many-branches, too-many-statements # loop forever inside of this task while True: # get any events that have occurred from the keypad object key_event = buttons.events.get() # if we're Waiting To Start if game_state.current_state == STATE_WAITING_TO_START: # if the buttons aren't locked out for cool down if game_state.btn_cooldown_time < time.monotonic(): # if there is a released event on any key if key_event and key_event.released: # hide the game over label game_over_lbl.hidden = True # show the starting score curscore_val.text = str(game_state.score) print("Starting game!") # ready set go blinks for _, led_obj in leds.items(): led_obj.value = True await asyncio.sleep(250 / 1000) for _, led_obj in leds.items(): led_obj.value = False await asyncio.sleep(250 / 1000) for _, led_obj in leds.items(): led_obj.value = True await asyncio.sleep(250 / 1000) for _, led_obj in leds.items(): led_obj.value = False await asyncio.sleep(250 / 1000) for _, led_obj in leds.items(): led_obj.value = True await asyncio.sleep(250 / 1000) for _, led_obj in leds.items(): led_obj.value = False # change the state to Blinka's Turn game_state.current_state = STATE_BLINKA_TURN # if it's Blinka's Turn elif game_state.current_state == STATE_BLINKA_TURN: # ignore buttons on Blinka's turn pass # if it's the Player's Turn elif game_state.current_state == STATE_PLAYER_TURN: # if a button has been pressed if key_event and key_event.pressed: # light up the corresponding LED in the button leds[COLORS[key_event.key_number]].value = True # if a button has been released if key_event and key_event.released: # turn off the corresponding LED in the button leds[COLORS[key_event.key_number]].value = False #print(key_event) #print(game_state.sequence) # if the color of the button pressed matches the current color in the sequence if COLORS[key_event.key_number] == game_state.sequence[0]: # remove the current color from the sequence game_state.sequence.pop(0) # increment the score value game_state.score += 1 # update the score label curscore_val.text = str(game_state.score) # if there are no colors left in the sequence # i.e. the level is complete if len(game_state.sequence) == 0: # give a bonus point for finishing the level game_state.score += 1 # increase the difficulty for next level game_state.difficulty += 1 # update the score label curscore_val.text = str(game_state.score) # change the state to Blinka's Turn game_state.current_state = STATE_BLINKA_TURN print(f"difficulty after lvl: {game_state.difficulty}") # The pressed button color does not match the current color in the sequence # i.e. player pressed the wrong button else: print("player lost!") # show the game over label game_over_lbl.hidden = False # if the player's current score is higher than the highscore if game_state.highscore is None or game_state.score > game_state.highscore: # save new high score value to NVM storage nvm_helper.save_data(("bls_hs", game_state.score), test_run=False) # update highscore variable to the players score game_state.highscore = game_state.score # update the high score label highscore_val.text = str(game_state.score) # change to Waiting to Start game_state.current_state = STATE_WAITING_TO_START # reset the current score to zero game_state.score = 0 # reset the difficulty to 1 game_state.difficulty = 1 # enable the button cooldown timer to ignore any button presses # in the near future to avoid double presses game_state.btn_cooldown_time = time.monotonic() + 1.5 # reset the sequence to an empty list game_state.sequence = [] # sleep, allowing other asyncio tasks to take action await asyncio.sleep(0) async def blinka_action(game_state: GameState): """ Choose colors randomly to add to the sequence. Blink the LEDs in accordance with the sequence. :param game_state: The GameState object that holds the current state of the game. :return: None """ # loop forever inside of this task while True: # if it's Blinka's Turn if game_state.current_state == STATE_BLINKA_TURN: print(f"difficulty start of blinka turn: {game_state.difficulty}") # if the sequence is empty if len(game_state.sequence) == 0: # loop for the current difficulty for _ in range(game_state.difficulty): # append a random color to the sequence game_state.sequence.append(random.choice(COLORS)) print(game_state.sequence) # wait for LED_OFF amount of time await asyncio.sleep(game_state.led_off_time / 1000) # turn on the LED for the current color in the sequence leds[game_state.sequence[game_state.index]].value = True # wait for LED_ON amount of time await asyncio.sleep(game_state.led_on_time / 1000) # turn off the LED for the current color in the sequence leds[game_state.sequence[game_state.index]].value = False # wait for LED_OFF amount of time await asyncio.sleep(game_state.led_off_time / 1000) # increment the index game_state.index += 1 # if the last index in the sequence has been passed if game_state.index >= len(game_state.sequence): # reset the index to zero game_state.index = 0 # change to the Players Turn game_state.current_state = STATE_PLAYER_TURN print("players turn!") # sleep, allowing other asyncio tasks to take action await asyncio.sleep(0) async def main(): """ Main asyncio task that will initialize the Game State and start the other tasks running. :return: None """ # initialize the Game State game_state = GameState(1, 500, 500) # if there is a saved highscore if game_state.highscore is not None: # set the highscore into it's label to show on the display highscore_val.text = str(game_state.highscore) # initialze player task player_task = asyncio.create_task(player_action(game_state)) # initialize blinka task blinka_task = asyncio.create_task(blinka_action(game_state)) # start the tasks running await asyncio.gather(player_task, blinka_task) # run the main task asyncio.run(main())
Text editor powered by tinymce.