The LED blinking tasks discussed on the previous page don't know about each other. In fact, that's almost the whole point: the tasks can run independently and still keep good time because they are using asyncio.sleep() to take turns running.

Control One Blinking LED

Now suppose you want to pass information to the LED tasks that affects what they do. For instance, suppose you want modify the blink rate, based on some button pushes.

You could monitor the buttons in the blink task, but that makes the blink task more complicated. This example makes a separate task that monitors the buttons. It will change a value in a shared object to tell the blink task what to do.

import asyncio
import board
import digitalio
import keypad


class Interval:
    """Simple class to hold an interval value. Use .value to to read or write."""

    def __init__(self, initial_interval):
        self.value = initial_interval


async def monitor_interval_buttons(pin_slower, pin_faster, interval):
    """Monitor two buttons: one lengthens the interval, the other shortens it.
    Change interval.value as appropriate.
    """
    # Assume buttons are active low.
    with keypad.Keys(
        (pin_slower, pin_faster), value_when_pressed=False, pull=True
    ) as keys:
        while True:
            key_event = keys.events.get()
            if key_event and key_event.pressed:
                if key_event.key_number == 0:
                    # Lengthen the interval.
                    interval.value += 0.1
                else:
                    # Shorten the interval.
                    interval.value = max(0.1, interval.value - 0.1)
                print("interval is now", interval.value)
            # Let another task run.
            await asyncio.sleep(0)


async def blink(pin, interval):
    """Blink the given pin forever. The blinking rate is controlled by the supplied Interval object."""
    with digitalio.DigitalInOut(pin) as led:
        led.switch_to_output()
        while True:
            led.value = not led.value
            await asyncio.sleep(interval.value)

async def main():
    # Start blinking 0.5 sec on, 0.5 sec off.
    interval = Interval(0.5)

    led_task = asyncio.create_task(blink(board.D1, interval))
    interval_task = asyncio.create_task(monitor_interval_buttons(board.D3, board.D4, interval))
    # This will run forever, because neither task ever exits.
    await asyncio.gather(led_task, interval_task)

asyncio.run(main())

In the program above, the led_task and the interval_task share the interval object, but otherwise don't know about each other. You can change the details of one without the other one having to change.

There is one new interesting thing here. In monitor_interval_buttons(), it waits for keypresses in an infinite loop. Regardless of what happens, each time around the loop, it does an await asyncio.sleep(0), which gives control back to the task scheduler. This is the standard way in asyncio of saying "I've run long enough, let other tasks run". If there's no other task ready to run, the scheduler will give back control immediately, since the sleep time is 0.

Control Two Blinking LEDs 

Now suppose you wanted to control two LEDs, with different buttons. That's easy: just create more tasks. Below is a main() to do that. The rest of the program stays the same.

async def main():
    interval1 = Interval(0.5)
    interval2 = Interval(1.0)

    led1_task = asyncio.create_task(blink(board.D1, interval1))
    led2_task = asyncio.create_task(blink(board.D2, interval2))
    interval1_task = asyncio.create_task(monitor_interval_buttons(board.D3, board.D4, interval1))
    interval2_task = asyncio.create_task(monitor_interval_buttons(board.D5, board.D6, interval2))

    await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task)

No Race Conditions

You might wonder if this technique of using shared data might run into race conditions, where data written by one task is in an inconsistent state when read by another task. As long as your task makes its data consistent before giving up control to the scheduler by using await, this won't happen. Since the tasks are cooperatively taking turns, one task cannot interrupt another to run.

To use jargon, the code between two await statements in a task is like a critical section: it can't be interrupted by another task.

(This is not true if you are using multiple event loops in multiple threads, but CircuitPython asyncio does not currently provide that.)

This guide was first published on Nov 23, 2021. It was last updated on 2021-11-23 19:34:33 -0500.

This page (Communicating Between Tasks) was last updated on Dec 01, 2021.

Text editor powered by tinymce.