The LED blinking tasks discussed on the previous page don't know about each other. In fact, that's almost the whole point: the tasks can run independently and still keep good time because they are using asyncio.sleep() to take turns running.
Control One Blinking LED
Now suppose you want to pass information to the LED tasks that affects what they do. For instance, suppose you want to modify the blink rate based on button presses.
You could monitor the buttons in the blink task, but that makes the blink task more complicated. This example makes a separate task that monitors the buttons. It will change a value in a shared object to tell the blink task what to do.
    # SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
    #
    # SPDX-License-Identifier: MIT

    import asyncio
    import board
    import digitalio
    import keypad


    class Interval:
        """Simple class to hold an interval value. Use .value to read or write."""

        def __init__(self, initial_interval):
            self.value = initial_interval


    async def monitor_interval_buttons(pin_slower, pin_faster, interval):
        """Monitor two buttons: one lengthens the interval, the other shortens it.
        Change interval.value as appropriate.
        """
        # Assume buttons are active low.
        with keypad.Keys(
            (pin_slower, pin_faster), value_when_pressed=False, pull=True
        ) as keys:
            while True:
                key_event = keys.events.get()
                if key_event and key_event.pressed:
                    if key_event.key_number == 0:
                        # Lengthen the interval.
                        interval.value += 0.1
                    else:
                        # Shorten the interval.
                        interval.value = max(0.1, interval.value - 0.1)
                    print("interval is now", interval.value)
                # Let another task run.
                await asyncio.sleep(0)


    async def blink(pin, interval):
        """Blink the given pin forever.
        The blinking rate is controlled by the supplied Interval object.
        """
        with digitalio.DigitalInOut(pin) as led:
            led.switch_to_output()
            while True:
                led.value = not led.value
                await asyncio.sleep(interval.value)


    async def main():
        # Start blinking 0.5 sec on, 0.5 sec off.
        interval = Interval(0.5)

        led_task = asyncio.create_task(blink(board.D1, interval))
        interval_task = asyncio.create_task(
            monitor_interval_buttons(board.D3, board.D4, interval)
        )
        # This will run forever, because neither task ever exits.
        await asyncio.gather(led_task, interval_task)


    asyncio.run(main())
In the program above, led_task and interval_task share the interval object, but otherwise don't know about each other. You can change the details of one without having to change the other.
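To see the sharing pattern on its own, here is a minimal sketch that runs under standard desktop Python asyncio with no board attached. The names fake_blink and fake_buttons are hypothetical stand-ins for blink() and monitor_interval_buttons(): they only read and write interval.value instead of touching pins, and the timings are arbitrary.

```python
import asyncio


class Interval:
    """Holds one mutable value that several tasks can share."""

    def __init__(self, initial_interval):
        self.value = initial_interval


async def fake_blink(interval, seen, toggles):
    """Stand-in for blink(): record the interval in effect at each toggle."""
    for _ in range(toggles):
        seen.append(interval.value)
        await asyncio.sleep(interval.value)


async def fake_buttons(interval, new_value, delay):
    """Stand-in for the button monitor: change the shared interval once."""
    await asyncio.sleep(delay)
    interval.value = new_value


async def demo():
    interval = Interval(0.02)
    seen = []
    # Both tasks receive the same Interval object; neither knows the other.
    await asyncio.gather(
        fake_blink(interval, seen, toggles=6),
        fake_buttons(interval, new_value=0.01, delay=0.05),
    )
    return seen


seen = asyncio.run(demo())
print(seen)
```

The first toggles use the initial interval and the later ones pick up the new value, even though fake_blink never hears from fake_buttons directly. The only link between them is the shared object.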
There is one interesting new thing here. monitor_interval_buttons() checks for key presses in an infinite loop. Each time around the loop, whether or not a key event arrived, it does an await asyncio.sleep(0), which gives control back to the task scheduler. This is the standard way in asyncio of saying "I've run long enough, let other tasks run". If no other task is ready to run, the scheduler gives control back immediately, since the sleep time is 0.
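The effect of a zero-length sleep can be seen in a small hardware-free sketch. It assumes standard desktop Python asyncio, and worker and trace are hypothetical names used only for illustration.

```python
import asyncio


async def worker(name, steps, trace):
    """Do `steps` units of work, yielding to the scheduler after each one."""
    for step in range(steps):
        trace.append((name, step))
        # Zero-length sleep: give other ready tasks a turn, then resume.
        await asyncio.sleep(0)


async def demo():
    trace = []
    await asyncio.gather(worker("a", 3, trace), worker("b", 3, trace))
    return trace


trace = asyncio.run(demo())
print(trace)
```

On desktop CPython the trace alternates between a and b, because each sleep(0) hands control to the other ready task. Without the await, the first worker would finish all of its steps before the second one started.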
Control Two Blinking LEDs
Now suppose you want to control two LEDs, each with its own pair of buttons. That's easy: just create more tasks. Below is an example that does that. Only main() has changed. The rest of the program stays the same.
    # SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
    #
    # SPDX-License-Identifier: MIT

    import asyncio
    import board
    import digitalio
    import keypad


    class Interval:
        """Simple class to hold an interval value. Use .value to read or write."""

        def __init__(self, initial_interval):
            self.value = initial_interval


    async def monitor_interval_buttons(pin_slower, pin_faster, interval):
        """Monitor two buttons: one lengthens the interval, the other shortens it.
        Change interval.value as appropriate.
        """
        # Assume buttons are active low.
        with keypad.Keys(
            (pin_slower, pin_faster), value_when_pressed=False, pull=True
        ) as keys:
            while True:
                key_event = keys.events.get()
                if key_event and key_event.pressed:
                    if key_event.key_number == 0:
                        # Lengthen the interval.
                        interval.value += 0.1
                    else:
                        # Shorten the interval.
                        interval.value = max(0.1, interval.value - 0.1)
                    print("interval is now", interval.value)
                # Let another task run.
                await asyncio.sleep(0)


    async def blink(pin, interval):
        """Blink the given pin forever.
        The blinking rate is controlled by the supplied Interval object.
        """
        with digitalio.DigitalInOut(pin) as led:
            led.switch_to_output()
            while True:
                led.value = not led.value
                await asyncio.sleep(interval.value)


    async def main():
        interval1 = Interval(0.5)
        interval2 = Interval(1.0)

        led1_task = asyncio.create_task(blink(board.D1, interval1))
        led2_task = asyncio.create_task(blink(board.D2, interval2))
        interval1_task = asyncio.create_task(
            monitor_interval_buttons(board.D3, board.D4, interval1)
        )
        interval2_task = asyncio.create_task(
            monitor_interval_buttons(board.D5, board.D6, interval2)
        )

        await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task)


    asyncio.run(main())
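With more than a couple of LEDs, the tasks could also be built in a loop. The sketch below is hardware-free and assumes desktop Python asyncio; blink_stub, the channel labels, and the interval values are hypothetical placeholders. On a board, each row would instead carry the LED pin and the two button pins passed to blink() and monitor_interval_buttons().

```python
import asyncio


class Interval:
    """Holds one mutable interval value."""

    def __init__(self, initial_interval):
        self.value = initial_interval


async def blink_stub(name, interval, log, toggles=3):
    """Hypothetical stand-in for blink(): log a toggle instead of driving a pin."""
    for _ in range(toggles):
        log.append(name)
        await asyncio.sleep(interval.value)


async def main():
    # One row per LED: (label, initial interval in seconds). Made-up values.
    channels = [("led1", 0.01), ("led2", 0.02), ("led3", 0.03)]
    log = []
    tasks = []
    for name, seconds in channels:
        interval = Interval(seconds)  # each LED gets its own shared object
        tasks.append(asyncio.create_task(blink_stub(name, interval, log)))
    await asyncio.gather(*tasks)
    return log


log = asyncio.run(main())
print(log)
```

Each channel gets its own Interval, so changing one rate would not affect the others, just as interval1 and interval2 are independent in the program above.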
No Race Conditions
You might wonder whether sharing data this way could lead to race conditions, where data written by one task is in an inconsistent state when read by another task. As long as your task makes its data consistent before giving up control to the scheduler with await, this won't happen. Since the tasks are cooperatively taking turns, one task cannot interrupt another to run.
To use jargon, the code between two await statements in a task is like a critical section: it can't be interrupted by another task.
(This is not true if you are using multiple event loops in multiple threads, but CircuitPython asyncio does not currently provide that.)
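The condition "consistent before await" can be checked with a small experiment. This is a sketch under assumptions: it runs on desktop Python asyncio with a single event loop, and Pair, writer, reader, and the sum-to-100 invariant are all made up for the illustration.

```python
import asyncio


class Pair:
    """Two fields that should always add up to 100 (a made-up invariant)."""

    def __init__(self):
        self.left = 50
        self.right = 50


async def writer(pair, rounds, yield_midway):
    """Move one unit from left to right, `rounds` times."""
    for _ in range(rounds):
        pair.left -= 1
        if yield_midway:
            # Careless: gives up control while the pair is inconsistent.
            await asyncio.sleep(0)
        pair.right += 1
        # Safe point to yield: the invariant holds again.
        await asyncio.sleep(0)


async def reader(pair, rounds, bad):
    """Check the invariant on every turn and record any violation."""
    for _ in range(rounds):
        if pair.left + pair.right != 100:
            bad.append((pair.left, pair.right))
        await asyncio.sleep(0)


async def run_case(yield_midway):
    pair = Pair()
    bad = []
    await asyncio.gather(
        writer(pair, 5, yield_midway),
        reader(pair, 10, bad),
    )
    return bad


safe_bad = asyncio.run(run_case(yield_midway=False))
careless_bad = asyncio.run(run_case(yield_midway=True))
print("violations when consistent before await:", len(safe_bad))
print("violations when yielding midway:", len(careless_bad))
```

When both fields are updated with no await in between, the reader never observes a broken pair. Once an await is placed between the two updates, the reader gets a turn in the middle and sees the inconsistent state at least once.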