To demonstrate cooperative multitasking, the place to start is with simple examples of implementing one or two independently blinking LEDs. First there will be examples without asyncio, and then the guide will show how to use asyncio tasks to code the same examples.
You'll find it helpful to try these examples yourself, and experiment with modifying them a little. You just need to wire up a couple of LEDs with resistors to two pins. Later, you'll need some pushbuttons. Here's a typical wiring diagram for the Adafruit Metro M4 Express board, but you can use almost any board. See the Hardware section on the previous page.
Blinking LEDs without asyncio
One LED
Suppose you want to make one LED blink, without using asyncio. The program below does that. It is fancier than the simplest blink example you may have seen, because it will be built upon as more examples are presented.
The example uses pin board.D1
, because later examples use another pin for another LED. But use whichever pin you want, such as board.LED
(if the board has such).
The program uses with
for automatic deinit()
of DigitalInOut
. It blinks the LED 10 times, and then it prints "done".
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries # # SPDX-License-Identifier: MIT import time import board import digitalio def blink(pin, interval, count): with digitalio.DigitalInOut(pin) as led: led.switch_to_output(value=False) for _ in range(count): led.value = True time.sleep(interval) led.value = False time.sleep(interval) def main(): blink(board.D1, 0.25, 10) print("done") main()
Two LEDs
Now suppose you want to add another LED, blinking at a different rate, and blinking 20 times instead of 10. Both LEDs should start blinking at the same time, though they'll blink at different rates.
But if the program above is expanded in a thoughtless way, it won't work. Suppose just calling blink()
a second time, after the first. The first blink()
takes control and keeps control, even when it's sleeping with time.sleep()
. The second blink()
, for the second LED, only gets to run after the first blink()
is all done. So this doesn't work.
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries # # SPDX-License-Identifier: MIT # DOESN'T WORK import time import board import digitalio def blink(pin, interval, count): with digitalio.DigitalInOut(pin) as led: led.switch_to_output(value=False) for _ in range(count): led.value = True time.sleep(interval) led.value = False time.sleep(interval) def main(): blink(board.D1, 0.25, 10) # DOESN'T WORK # Second LED blinks only after the first one is finished. blink(board.D2, 0.1, 20) main()
It turns out, it's quite a bit more complicated to get both LEDs to blink at the same time without using asyncio
. The example below is just one way to solve the problem: there are many other ways. The program needs to check time constantly to see whether it's time for the LED to change state. A class was added to keep the necessary state. In essence, the program is using its own special-purpose task and event loop mechanism, just for this particular example.
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries # # SPDX-License-Identifier: MIT import time import board import digitalio class Blinker: def __init__(self, led, interval, count): self.led = led self.interval = interval # Count both on and off. self.count2 = count * 2 self.last_transition = 0 def blink(self): """Return False when blinking is finished.""" if self.count2 <= 0: return False now = time.monotonic() if now > self.last_transition + self.interval: self.led.value = not self.led.value self.last_transition = now self.count2 -= 1 return True def main(): with digitalio.DigitalInOut(board.D1) as led1, digitalio.DigitalInOut( board.D2 ) as led2: led1.switch_to_output(value=False) led2.switch_to_output(value=False) blinker1 = Blinker(led1, 0.25, 10) blinker2 = Blinker(led2, 0.1, 20) running1 = True running2 = True while running1 or running2: running1 = blinker1.blink() running2 = blinker2.blink() print("done") main()
As you can see doing two tasks together, in harmony, is not nearly as straightforward as one might like. Now to see how asyncio can make this easier.
Blinking LEDs with asyncio
Now try the same examples like the ones above, but this time with asyncio.
One LED
Asyncio isn't needed to blink just one LED, but this example is written in asyncio style anyway. Notice that this example looks similar to the non-asyncio one-LED example above, but there are significant differences:
- The
time.sleep()
calls are replaced withawait asyncio.sleep()
. - The
blink()
andmain()
functions are defined asasync def
instead of justdef
. - A
Task
object is created (which starts it running) Thenawait asyncio.gather()
will wait for the task to complete. - Instead of just calling
main()
, callasyncio.run(main())
.
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries # # SPDX-License-Identifier: MIT import asyncio import board import digitalio async def blink(pin, interval, count): # Don't forget the async! with digitalio.DigitalInOut(pin) as led: led.switch_to_output(value=False) for _ in range(count): led.value = True await asyncio.sleep(interval) # Don't forget the await! led.value = False await asyncio.sleep(interval) # Don't forget the await! async def main(): # Don't forget the async! led_task = asyncio.create_task(blink(board.D1, 0.25, 10)) await asyncio.gather(led_task) # Don't forget the await! print("done") asyncio.run(main())
So what's going on here? First, every function or method that contains an await
must be defined as async def
, to indicate that it's a coroutine. Second, you can't call an async
function directly from non-async code. Instead you must use asyncio.run()
or a similar special function to bridge the gap between the non-async code (the mainline code in code.py) and async code.
What does await
mean, anyway? It indicates a point in the code where the coroutine or task that is running gives up control to the scheduler, and waits for another async routine to complete. await
means "I need to wait for something; let other tasks run until I'm ready to resume." In blink()
above, it uses await asyncio.sleep()
. When when the code goes to sleep, another task can be run. When the sleep()
is over, this coroutine will resume.
In main()
, we first create a Task
. We instantiate the blink()
coroutine by calling it with the arguments we want, and then we pass that coroutine to asyncio.create_task()
. create_task()
wraps the coroutine in a Task
, and then schedules the task to run "soon". "Soon" means it will get a turn to run as soon other existing tasks have given up control.
Then the program uses await asyncio.gather()
, which waits for all the tasks it's passed to finish. In this case, there's only one task to wait for.
Tasks and coroutines are both Awaitable
objects, which means they can be await
-ed. In fact, to get them to run, you have to await them.
Note that you have to use await
to get a coroutine or a Task
to do something. If you forget an await
, nothing will happen, and you don't necessarily get an error immediately. Hence the cautionary comments above.
Two LEDs
The next example blinks two LEDs, again using asyncio. Note that this code is almost the same as the one-LED example above. The blink()
function is exactly the same. This time two tasks in main()
are created, one for each LED. await asyncio.gather()
is used, but it is passed two tasks instead of one.
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries # # SPDX-License-Identifier: MIT import asyncio import board import digitalio async def blink(pin, interval, count): with digitalio.DigitalInOut(pin) as led: led.switch_to_output(value=False) for _ in range(count): led.value = True await asyncio.sleep(interval) # Don't forget the "await"! led.value = False await asyncio.sleep(interval) # Don't forget the "await"! async def main(): led1_task = asyncio.create_task(blink(board.D1, 0.25, 10)) led2_task = asyncio.create_task(blink(board.D2, 0.1, 20)) await asyncio.gather(led1_task, led2_task) # Don't forget "await"! print("done") asyncio.run(main())
Try this example, and see how the tasks appear to start at the same time. Both run to completion, and then you'll see "done" printed out.
Summary
Here are the key things to remember from these examples:
- Define a coroutine with
async def
. - Give up control in a coroutine with
await
. - Sleep in a coroutine with
await asyncio.sleep(interval)
. - Create a task that will run soon with with
asyncio.create_task(some_coroutine(arg1, arg2, ...))
. - Wait for tasks to finish with await
asyncio.gather(task1, task2, ...)
. - Don't forget
await
.
Text editor powered by tinymce.