CircuitPython supports cooperative multitasking through the asyncio library and the async and await language keywords. Cooperative multitasking is a style of programming in which multiple tasks take turns running. Each task runs until it needs to wait for something, or until it decides it has run for long enough and should let another task run.

In cooperative multitasking, a scheduler manages the tasks. Only one task runs at a time. When a task gives up control and starts waiting, the scheduler starts another task that is ready to run. The scheduler runs an event loop which repeats this process over and over for all the tasks assigned to the event loop.
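
The turn-taking described above can be sketched with two small tasks. This is a minimal illustration, not part of the project code: the worker() function and the log list are hypothetical names made up for this sketch, and it was written against standard Python's asyncio, so behavior on a particular CircuitPython build is assumed rather than confirmed.

```python
import asyncio

log = []  # hypothetical list that records the order in which tasks run


async def worker(name, steps):
    """Hypothetical task: do one small step, then let another task run."""
    for step in range(steps):
        log.append(f"{name} step {step}")
        # Give up control so the scheduler can run another ready task.
        await asyncio.sleep(0)


async def main():
    task_a = asyncio.create_task(worker("A", 2))
    task_b = asyncio.create_task(worker("B", 2))
    await asyncio.gather(task_a, task_b)


asyncio.run(main())
print(log)
```

Only one task runs at a time, and the entries in log alternate between A and B because each task gives up control after every step.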

A task wraps a coroutine so that the scheduler can run it. A coroutine can stop in the middle of some code. When the coroutine is resumed, it continues from where it left off. A coroutine is declared with the keyword async, and the keyword await marks a point where the coroutine may give up control.
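
To see a coroutine stop and resume without a scheduler involved, it can be driven by hand. This is only a sketch for desktop Python: the Pause class and two_part_job() are hypothetical helpers invented here, and driving a coroutine with send() is normally the scheduler's job, not something your own code needs to do. It may not run unchanged on CircuitPython.

```python
class Pause:
    """Hypothetical awaitable: awaiting it hands control back to the caller."""

    def __await__(self):
        yield


steps = []  # hypothetical list recording what the coroutine has done so far


async def two_part_job():
    steps.append("first half")
    await Pause()  # the coroutine stops here...
    steps.append("second half")  # ...and continues here when resumed


job = two_part_job()  # calling the function creates a coroutine; nothing runs yet
print(steps)

job.send(None)  # run until the first await
print(steps)

try:
    job.send(None)  # resume where it left off
except StopIteration:
    pass  # raised when the coroutine runs to the end
print(steps)
```

The list is empty after the call, holds one entry after the first send(), and holds both entries after the second.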

This diagram shows the scheduler running an event loop with three tasks: Task 1 is running; Task 2 is ready to run and is waiting for Task 1 to give up control; Task 3 is waiting for something else and isn't ready to run yet.

asyncio Demonstration

The example on this page demonstrates a basic use of asyncio. This uses a microcontroller and a button to control two animations displayed on two different NeoPixel rings. One ring displays a rainbow swirl, and the other displays a blink animation at a 0.5 second interval. Pressing the button reverses the direction of the rainbow swirl, and speeds up the blink animation to a 0.1 second interval. Releasing the button returns both to their initial states.

Wiring

The first step is wiring up the NeoPixel rings to your microcontroller.

  • NeoPixel Rings

    • NeoPixel ring one: data in (DIN) to microcontroller A1
    • NeoPixel ring one: ground to microcontroller GND
    • NeoPixel ring one: V+ to microcontroller 3V
    • NeoPixel ring two: data in (DIN) to microcontroller A2
    • NeoPixel ring two: ground to microcontroller GND
    • NeoPixel ring two: V+ to microcontroller 3V

  • Button

    • The built-in Boot button (highlighted in magenta in the wiring diagram) is located along the bottom of the board, towards the center.

asyncio Example Code

Once everything is wired up, the next step is to load the example code onto your microcontroller.

To run this example, you'll need to copy a few libraries to your CIRCUITPY drive. Then you need to update code.py with the example code.

Thankfully, this can be done in one go. In the example below, click the Download Project Bundle button to download the necessary libraries and the code.py file in a zip file. Extract the contents of the zip file, and copy the entire lib folder and the code.py file to your CIRCUITPY drive.

# SPDX-FileCopyrightText: Copyright (c) 2022 Dan Halbert for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2022 Kattni Rembor for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
CircuitPython asyncio example for two NeoPixel rings and one button.
"""
import asyncio
import board
import neopixel
import keypad
from rainbowio import colorwheel

button_pin = board.BUTTON  # The pin the button is connected to.
num_pixels = 16  # The number of NeoPixels on a single ring.
brightness = 0.2  # The LED brightness.

# Set up NeoPixel rings.
ring_one = neopixel.NeoPixel(board.A1, num_pixels, brightness=brightness, auto_write=False)
ring_two = neopixel.NeoPixel(board.A2, num_pixels, brightness=brightness, auto_write=False)


class AnimationControls:
    """The controls to allow you to vary the rainbow and blink animations."""
    def __init__(self):
        self.reverse = False
        self.wait = 0.0
        self.delay = 0.5


async def rainbow_cycle(controls):
    """Rainbow cycle animation on ring one."""
    while True:
        for j in range(255, -1, -1) if controls.reverse else range(0, 256, 1):
            for i in range(num_pixels):
                rc_index = (i * 256 // num_pixels) + j
                ring_one[i] = colorwheel(rc_index & 255)
            ring_one.show()
            await asyncio.sleep(controls.wait)


async def blink(controls):
    """Blink animation on ring two."""
    while True:
        ring_two.fill((0, 0, 255))
        ring_two.show()
        await asyncio.sleep(controls.delay)
        ring_two.fill((0, 0, 0))
        ring_two.show()
        await asyncio.sleep(controls.delay)
        await asyncio.sleep(controls.wait)


async def monitor_button(button, controls):
    """Monitor button that reverses rainbow direction and changes blink speed.
    Assume button is active low.
    """
    with keypad.Keys((button,), value_when_pressed=False, pull=True) as key:
        while True:
            key_event = key.events.get()
            if key_event:
                if key_event.pressed:
                    controls.reverse = True
                    controls.delay = 0.1
                elif key_event.released:
                    controls.reverse = False
                    controls.delay = 0.5
            await asyncio.sleep(0)


async def main():
    animation_controls = AnimationControls()
    button_task = asyncio.create_task(monitor_button(button_pin, animation_controls))
    animation_task = asyncio.create_task(rainbow_cycle(animation_controls))
    blink_task = asyncio.create_task(blink(animation_controls))

    # This will run forever, because no tasks ever finish.
    await asyncio.gather(button_task, animation_task, blink_task)

asyncio.run(main())

Your CIRCUITPY drive contents should resemble the image below.

You should have at least the following file in the top level of the CIRCUITPY drive:

  • code.py

Your CIRCUITPY/lib folder should contain at least the following folder and files:

  • asyncio/
  • adafruit_ticks.mpy
  • neopixel.mpy

Ring one will light up in a rainbow swirl. Ring two will begin blinking blue at a 0.5 second interval.

Now, press the button. The rainbow swirl on ring one will reverse direction, and the blinking on ring two will speed up!

Now release the button. The rainbow swirl on ring one returns to its original direction, and the blinking on ring two returns to its original speed!

The microcontroller in this image isn't the same as yours, but the concept is identical!

Code Walkthrough

First you import the necessary modules and libraries.

import asyncio
import board
import neopixel
import keypad
from rainbowio import colorwheel

Then, you specify the button pin, the number of LEDs in each NeoPixel ring, and the LED brightness.

button_pin = board.BUTTON  # The pin the button is connected to.
num_pixels = 16  # The number of NeoPixels on a single ring.
brightness = 0.2  # The LED brightness.

Next you set up the two NeoPixel rings on pins A1 and A2, using the number of pixels and brightness specified above, and setting auto_write=False.

ring_one = neopixel.NeoPixel(board.A1, num_pixels, brightness=brightness, auto_write=False)
ring_two = neopixel.NeoPixel(board.A2, num_pixels, brightness=brightness, auto_write=False)

Following setup, you create a class called AnimationControls. An instance of this class holds the settings shared between the tasks: the button task changes them, and the animation tasks read them.

class AnimationControls:
    def __init__(self):
        self.reverse = False
        self.wait = 0.0
        self.delay = 0.5

Then, you have the rainbow and blink animation code. This is where the asyncio-specific code begins.

In terms of the animation parts of the code, the first function is the rainbow cycle animation code. This is pretty standard except for the line beginning with for j in. That line uses a conditional expression to choose the direction: if controls.reverse is True, range(255, -1, -1) counts down for the reversed rainbow cycle; otherwise range(0, 256, 1) counts up for the standard forward rainbow cycle.

async def rainbow_cycle(controls):
    """Rainbow cycle animation on ring one."""
    while True:
        for j in range(255, -1, -1) if controls.reverse else range(0, 256, 1):
            for i in range(num_pixels):
                rc_index = (i * 256 // num_pixels) + j
                ring_one[i] = colorwheel(rc_index & 255)
            ring_one.show()
            await asyncio.sleep(controls.wait)
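
The direction choice and the color index arithmetic can be tried on their own, without any NeoPixels attached. In this sketch, steps() and rainbow_indices() are hypothetical helper names introduced only for illustration; the expressions inside them are copied from rainbow_cycle().

```python
num_pixels = 16


def steps(reverse):
    """Hypothetical helper: the same conditional expression as the for loop."""
    return range(255, -1, -1) if reverse else range(0, 256, 1)


def rainbow_indices(j, num_pixels):
    """Hypothetical helper: the colorwheel position for each pixel at step j."""
    # "& 255" wraps the position back into the 0-255 range colorwheel expects.
    return [((i * 256 // num_pixels) + j) & 255 for i in range(num_pixels)]


print(list(steps(False))[:3])  # [0, 1, 2]
print(list(steps(True))[:3])  # [255, 254, 253]
print(rainbow_indices(0, num_pixels)[:4])  # [0, 16, 32, 48]
print(rainbow_indices(250, num_pixels)[:4])  # [250, 10, 26, 42]
```

Note that Python evaluates the conditional expression once, when the for statement starts. In rainbow_cycle(), a change to controls.reverse therefore takes effect at the start of the next pass through the 256 steps, not in the middle of one.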

The second function is the blink animation code. This is typical. You fill all the NeoPixel LEDs blue, delay for a specified amount of time, then turn all of the LEDs off, and delay for the same specified amount of time.

async def blink(controls):
    """Blink animation on ring two."""
    while True:
        ring_two.fill((0, 0, 255))
        ring_two.show()
        await asyncio.sleep(controls.delay)
        ring_two.fill((0, 0, 0))
        ring_two.show()
        await asyncio.sleep(controls.delay)
        await asyncio.sleep(controls.wait)

In both functions, you must call show() on the NeoPixel ring object to get the animations to run because you set auto_write=False in the NeoPixel ring setup.

Notice that the controls object provides the animation direction (controls.reverse), the time each blink state is held (controls.delay), and an extra pause after each rainbow step and after each complete blink cycle (controls.wait).
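
Sharing one object between tasks works because every task holds a reference to the same instance. The sketch below strips the idea down to one attribute. Controls, reader(), writer(), and seen are hypothetical names for this illustration only, and sleep(0) stands in for the real delays so that the result does not depend on timing. It was written against standard Python's asyncio.

```python
import asyncio


class Controls:
    """Hypothetical stand-in for AnimationControls with a single setting."""

    def __init__(self):
        self.delay = 0.5


seen = []  # the delay value the reader found on each pass


async def reader(controls, passes):
    for _ in range(passes):
        seen.append(controls.delay)  # read the current value on every pass
        await asyncio.sleep(0)


async def writer(controls):
    await asyncio.sleep(0)
    controls.delay = 0.1  # plays the role of a button press


async def main():
    controls = Controls()
    reader_task = asyncio.create_task(reader(controls, 3))
    writer_task = asyncio.create_task(writer(controls))
    await asyncio.gather(reader_task, writer_task)


asyncio.run(main())
print(seen)
```

The reader never has to be told about the change. It picks up the new value the next time it looks at controls.delay.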

In terms of the asyncio-specific parts of this code, you'll notice that both of these functions begin with async def. Every function that contains an await must be defined as async def, to indicate that it's a coroutine. 

Both functions contain one or more await lines. What does await mean? await means "I need to wait for something; let other tasks run until I'm ready to resume." Both include await asyncio.sleep(). Basically, when the code goes to "sleep", another task can be run. When the sleep() is over, this coroutine will resume.

The blink() function includes the following line of code twice, which utilizes the .delay attribute of the AnimationControls object.
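
One way to convince yourself that sleeping tasks overlap is to time them. In this sketch, nap() is a hypothetical coroutine and the 0.2 second duration is an arbitrary choice. It was written against standard Python, and the exact timing on a microcontroller is an assumption, not a measurement.

```python
import asyncio
import time


async def nap(seconds):
    """Hypothetical task that does nothing except sleep."""
    await asyncio.sleep(seconds)


async def main():
    start = time.monotonic()
    first = asyncio.create_task(nap(0.2))
    second = asyncio.create_task(nap(0.2))
    await asyncio.gather(first, second)
    return time.monotonic() - start


elapsed = asyncio.run(main())
print(f"Two 0.2 second sleeps took about {elapsed:.2f} seconds")
```

Because each task gives up control while it sleeps, the two sleeps run side by side and the total is close to 0.2 seconds, not 0.4.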

await asyncio.sleep(controls.delay)

Both functions end with the following line of code which utilizes the .wait attribute of the AnimationControls object.

await asyncio.sleep(controls.wait)

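The monitor_button() function is the third coroutine. It uses keypad.Keys to watch the button, and on each pass through its loop it checks for a new key event. A press sets controls.reverse to True and controls.delay to 0.1; a release restores them to False and 0.5. The await asyncio.sleep(0) at the end of each pass gives the other tasks a chance to run.

The last function is called main().

In main(), first create a task. For the button_task, instantiate the monitor_button() coroutine by calling it with the arguments desired, and then pass that coroutine to asyncio.create_task(). create_task() wraps the coroutine in a task, and then schedules the task to run "soon". "Soon" means it will get a turn to run as soon as other existing tasks have given up control.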

Then the program uses await asyncio.gather(), which waits for all the tasks it's passed to finish.

async def main():
    animation_controls = AnimationControls()
    button_task = asyncio.create_task(monitor_button(button_pin, animation_controls))
    animation_task = asyncio.create_task(rainbow_cycle(animation_controls))
    blink_task = asyncio.create_task(blink(animation_controls))

    await asyncio.gather(button_task, animation_task, blink_task)

Finally, run the main() function to execute the code within.

asyncio.run(main())
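
The meaning of "soon" and the way gather() waits can be seen with tasks that actually finish. This is a sketch under assumptions: job() and order are hypothetical names, the tasks here end after two steps unlike the ones in the project, and the code was written against standard Python's asyncio.

```python
import asyncio

order = []  # hypothetical list recording what happened, in sequence


async def job(name):
    """Hypothetical short task that finishes and returns a value."""
    order.append(f"{name} started")
    await asyncio.sleep(0)
    order.append(f"{name} finished")
    return name.upper()


async def main():
    first = asyncio.create_task(job("first"))
    second = asyncio.create_task(job("second"))
    # Neither task has run yet: main() has not given up control.
    order.append("tasks created")
    results = await asyncio.gather(first, second)
    order.append("gather returned")
    return results


results = asyncio.run(main())
print(order)
print(results)
```

"tasks created" is recorded before either task starts, and "gather returned" is recorded only after both have finished. The return values come back in the order the tasks were passed to gather().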

My program ended? What happened?

await asyncio.gather(...) runs until all the listed tasks have finished. If gather completes, that means all the tasks listed have finished.

The most common causes of a task ending are:

  • an exception occurred causing the task to end
  • the task function finished

If you want to ensure the task executes forever, have a loop in your task function (e.g. a while True loop).

The following example is greatly oversimplified, but demonstrates what including a loop in your task function might look like.

async def never_ending_task():
    while True:
        print("I'm looping!")
        await asyncio.sleep(0)
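
The other cause listed above, an exception, can be handled inside the task so that one bad value does not end it. The sketch below compares two hypothetical tasks, fragile() and sturdy(), that process the same made-up readings. It uses finite loops so that the example terminates, and it was written against standard Python's asyncio.

```python
import asyncio


async def fragile(readings):
    """Hypothetical task with no error handling."""
    results = []
    for value in readings:
        results.append(10 / value)  # raises ZeroDivisionError when value is 0
        await asyncio.sleep(0)
    return results


async def sturdy(readings):
    """Hypothetical task that catches the error and keeps going."""
    results = []
    for value in readings:
        try:
            results.append(10 / value)
        except ZeroDivisionError:
            results.append(None)  # note the failure, then carry on
        await asyncio.sleep(0)
    return results


async def main():
    outcome = {}
    fragile_task = asyncio.create_task(fragile([5, 0, 2]))
    try:
        outcome["fragile"] = await fragile_task
    except ZeroDivisionError:
        # The exception ended the task; awaiting the task re-raises it here.
        outcome["fragile"] = "ended early"
    sturdy_task = asyncio.create_task(sturdy([5, 0, 2]))
    outcome["sturdy"] = await sturdy_task
    return outcome


outcome = asyncio.run(main())
print(outcome)
```

Catching only the exceptions you expect, as close as possible to where they occur, keeps the task alive without hiding unrelated bugs.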

This guide was first published on Apr 20, 2022. It was last updated on 2022-04-20 14:41:15 -0400.

This page (Multitasking with asyncio) was last updated on May 24, 2022.
