The background_write feature added in CircuitPython 7.3 is especially handy for taking full advantage of the Servo 2040 board from Pimoroni. (Not familiar with servos yet? Check out this guide to learn about servos and how to use them in CircuitPython.)


This type of servo motor needs a control pulse of around 1ms to 2ms in length, repeated every 20ms. A standard PWM output can control one motor, but the RP2040 has only 8 PWM slices with two outputs each, or 16 PWM outputs in total. This presents a challenge: how to control all 18 at once?
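As a rough illustration of the numbers involved, the sketch below converts a pulse width into a 16-bit duty cycle value for a 20ms period. The helper name pulse_to_duty_cycle is made up for this example and is not part of any library; the 1ms, 1.5ms and 2ms pulse widths are the nominal figures from the paragraph above, and real servos may want slightly different limits.

```python
PERIOD_US = 20_000   # servo frame period: 20 ms, expressed in microseconds
MAXVAL = 65535       # full-scale value of a 16-bit duty cycle


def pulse_to_duty_cycle(pulse_us, period_us=PERIOD_US, maxval=MAXVAL):
    """Return the duty cycle value that keeps the output high for pulse_us.

    Hypothetical helper for illustration only. Integer arithmetic is used
    so that the result is the same on every platform.
    """
    if not 0 <= pulse_us <= period_us:
        raise ValueError("pulse must fit inside one period")
    return maxval * pulse_us // period_us


for label, pulse_us in (("minimum", 1000), ("center", 1500), ("maximum", 2000)):
    print(label, pulse_us, "us ->", pulse_to_duty_cycle(pulse_us))
```

Only about 5% to 10% of the 16-bit range is used for servo control, which is why the full 65535-step resolution is worth keeping.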

Pimoroni has their own library for Arduino and MicroPython. Since it's open source, the author of this guide peeked inside and saw that the same approach could work in CircuitPython too, with the addition of StateMachine.background_write. Here's example code which will control 18 servo motors in oscillating fashion (it is also perfectly OK to run it without a motor plugged into every position).

To reduce the potential for electrical damage, only plug or un-plug servo motors when the power is off!
# SPDX-FileCopyrightText: 2022 Jeff Epler, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
#
# Heavy inspiration from Pimoroni's "PWM Cluster":
# https://github.com/pimoroni/pimoroni-pico/blob/main/drivers/pwm/pwm_cluster.cpp
# https://github.com/pimoroni/pimoroni-pico/blob/main/drivers/pwm/pwm_cluster.pio

import array

import board
import rp2pio
import adafruit_ticks
import ulab.numpy as np
from adafruit_motor import servo


import adafruit_pioasm

_cycle_count = 3
_program = adafruit_pioasm.Program(
    """
.wrap_target
    out pins, 32            ; Immediately set the pins to their new state
    out y, 32               ; Set the counter
count_check:
    jmp y-- delay           ; Check if the counter is 0, and if so wrap around.
                            ; If not decrement the counter and jump to the delay
.wrap

delay:
    jmp count_check [1]     ; Wait a few cycles then jump back to the loop
"""
)


class PulseItem:
    def __init__(self, group, index, phase, maxval):
        self._group = group
        self._index = index
        self._phase = phase
        self._value = 0
        self._maxval = maxval
        self._turn_on = self._turn_off = None
        self._mask = 1 << index

    @property
    def frequency(self):
        return self._group.frequency

    @property
    def duty_cycle(self):
        return self._value

    @duty_cycle.setter
    def duty_cycle(self, value):
        if value < 0 or value > self._maxval:
            raise ValueError(f"value must be in the range(0, {self._maxval+1})")
        self._value = value
        self._recalculate()

    @property
    def phase(self):
        return self._phase

    @phase.setter
    def phase(self, phase):
        if phase < 0 or phase >= self._maxval:
            raise ValueError(f"phase must be in the range(0, {self._maxval})")
        self._phase = phase
        self._recalculate()

    def _recalculate(self):
        self._turn_on = self._get_turn_on()
        self._turn_off = self._get_turn_off()
        self._group._maybe_update()  # pylint: disable=protected-access

    def _get_turn_on(self):
        maxval = self._maxval
        if self._value == 0:
            return None
        if self._value == self._maxval:
            return 0
        return self.phase % maxval

    def _get_turn_off(self):
        maxval = self._maxval
        if self._value == 0:
            return None
        if self._value == self._maxval:
            return None
        return (self._value + self.phase) % maxval

    def __str__(self):
        return f"<PulseItem: {self.duty_cycle=} {self.phase=} {self._turn_on=} {self._turn_off=}>"


class PulseGroup:
    def __init__(
        self,
        first_pin,
        pin_count,
        period=0.02,
        maxval=65535,
        stagger=False,
        auto_update=True,
    ):  # pylint: disable=too-many-arguments
        """Create a pulse group with the given characteristics"""
        self._frequency = round(1 / period)
        pio_frequency = round((1 + maxval) * _cycle_count / period)
        self._sm = rp2pio.StateMachine(
            _program.assembled,
            frequency=pio_frequency,
            first_out_pin=first_pin,
            out_pin_count=pin_count,
            auto_pull=True,
            pull_threshold=32,
            **_program.pio_kwargs,
        )
        self._auto_update = auto_update
        self._items = [
            PulseItem(self, i, round(maxval * i / pin_count) if stagger else 0, maxval)
            for i in range(pin_count)
        ]
        self._maxval = maxval

    @property
    def frequency(self):
        return self._frequency

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.deinit()

    def deinit(self):
        self._sm.deinit()
        del self._items[:]

    def __getitem__(self, i):
        """Get an individual pulse generator"""
        return self._items[i]

    def __len__(self):
        return len(self._items)

    def update(self):
        changes = {0: [0, 0]}

        for i in self._items:
            turn_on = i._turn_on  # pylint: disable=protected-access
            turn_off = i._turn_off  # pylint: disable=protected-access
            mask = i._mask  # pylint: disable=protected-access

            if turn_on is not None:
                this_change = changes.get(turn_on)
                if this_change:
                    this_change[0] |= mask
                else:
                    changes[turn_on] = [mask, 0]

                # start the cycle 'on'
                if turn_off is not None and turn_off < turn_on:
                    changes[0][0] |= mask

            if turn_off is not None:
                this_change = changes.get(turn_off)
                if this_change:
                    this_change[1] |= mask
                else:
                    changes[turn_off] = [0, mask]

        def make_sequence():
            sorted_changes = sorted(changes.items())
            # Note that the first change time is always 0, because changes
            # is seeded with an entry for time 0.
            old_time = 0
            value = 0
            for time, (turn_on, turn_off) in sorted_changes:
                if time != 0:  # never occurs on the first iteration
                    yield time - old_time - 1
                old_time = time

                value = (value | turn_on) & ~turn_off
                yield value

            # the final delay value
            yield self._maxval - old_time

        buf = array.array("L", make_sequence())

        self._sm.background_write(loop=buf)

    def _maybe_update(self):
        if self._auto_update:
            self.update()

    @property
    def auto_update(self):
        return self._auto_update

    @auto_update.setter
    def auto_update(self, value):
        self._auto_update = bool(value)

    def __str__(self):
        return f"<PulseGroup({len(self)})>"


class CyclicSignal:
    def __init__(self, data, phase=0):
        self._data = data
        self._phase = 0
        self.phase = phase
        self._scale = len(self._data) - 1

    @property
    def phase(self):
        return self._phase

    @phase.setter
    def phase(self, value):
        self._phase = value % 1

    @property
    def value(self):
        idxf = self._phase * len(self._data)
        idx = int(idxf)
        frac = idxf % 1
        idx1 = (idx + 1) % len(self._data)
        val = self._data[idx]
        val1 = self._data[idx1]
        return val + (val1 - val) * frac

    def advance(self, delta):
        self._phase = (self._phase + delta) % 1


if __name__ == "__main__":
    pulsers = PulseGroup(board.SERVO_1, 18, auto_update=False)
    # Set the phase of each servo so that servo 0 starts at offset 0ms, servo 1
    # at offset 2.5ms, ...
    # For up to 8 servos, this means their pulses do not overlap. Otherwise,
    # servo 8 is also at offset 0ms, etc.
    for j, p in enumerate(pulsers):
        p.phase = 8192 * (j % 8)

    servos = [servo.Servo(p) for p in pulsers]

    sine = np.sin(np.linspace(0, 2 * np.pi, 50, endpoint=False)) * 0.5 + 0.5
    print(sine)

    signals = [CyclicSignal(sine, j / len(servos)) for j in range(len(servos))]

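    t0 = adafruit_ticks.ticks_ms()
    while True:
        t1 = adafruit_ticks.ticks_ms()
        # ticks_ms() wraps around, so measure elapsed time with ticks_diff()
        elapsed = adafruit_ticks.ticks_diff(t1, t0)
        # "motor" avoids shadowing the imported adafruit_motor "servo" module
        for motor, signal in zip(servos, signals):
            signal.advance(elapsed / 8000)  # one full sine cycle every 8 seconds
            motor.fraction = signal.value
        pulsers.update()
        print(elapsed, "ms")
        t0 = t1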

The key technique, which the author became aware of through Pimoroni's open source code, is to organize the PIO's data as pairs of numbers: first, 32 bits giving the new value of up to 32 output pins; second, another 32 bits giving the length of time the pins should hold that value.

The Python code needs to consider all the individual PWM signals in turn and assemble a list of steps that mean something like "Turn off everything. Wait 1.5ms, then turn on output 1. Wait 1ms, then turn off output 1 and turn on output 2. Wait 1.75ms", and so on. To meet the requirements of servo motors, the whole list of steps is constructed to take exactly 20 milliseconds to follow. Then, Python sends the list of steps to the PIO module to be looped "forever", or until the next list of steps is calculated and sent.
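The step-building idea can be tried out on a desktop Python without any hardware. The sketch below is a simplified restatement of the logic in PulseGroup.update from the listing above, not a replacement for it; the function name build_steps and the (phase, width) tuple format are assumptions made for this example. All times are in ticks, where one frame is maxval + 1 ticks.

```python
def build_steps(channels, maxval=65535):
    """Turn per-output (phase, width) pairs into a flat list of
    [pins, delay, pins, delay, ...] words for one frame.

    Hypothetical helper for illustration; channels[i] describes output i.
    """
    changes = {0: [0, 0]}  # time -> [mask to turn on, mask to turn off]
    for index, (phase, width) in enumerate(channels):
        mask = 1 << index
        if width == 0:
            continue  # this output stays off for the whole frame
        if width >= maxval:
            changes[0][0] |= mask  # this output stays on for the whole frame
            continue
        turn_on = phase % maxval
        turn_off = (phase + width) % maxval
        changes.setdefault(turn_on, [0, 0])[0] |= mask
        changes.setdefault(turn_off, [0, 0])[1] |= mask
        if turn_off < turn_on:
            changes[0][0] |= mask  # the pulse wraps, so start the frame high

    steps = []
    value = 0
    old_time = 0
    for time, (on_mask, off_mask) in sorted(changes.items()):
        if time != 0:
            # A delay word of N holds the previous pin state for N + 1 ticks
            steps.append(time - old_time - 1)
        old_time = time
        value = (value | on_mask) & ~off_mask
        steps.append(value)
    steps.append(maxval - old_time)  # the final delay fills out the frame
    return steps


# Output 0: 1.5 ms pulse at the start. Output 1: 1 ms pulse starting at 2.5 ms.
steps = build_steps([(0, 4915), (8192, 3277)])
print(steps)
print("ticks per frame:", sum(delay + 1 for delay in steps[1::2]))
```

Whatever the individual pulse settings are, the delays always add up to the same frame length, which is what keeps the servo refresh rate constant.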

Note that the "list of steps" is sent as data to the PIO. This is distinct from the PIO program, which in effect interprets the list of steps:

_program = adafruit_pioasm.Program(
    """
.wrap_target
    out pins, 32            ; Immediately set the pins to their new state
    out y, 32               ; Set the counter
count_check:
    jmp y-- delay           ; Check if the counter is 0, and if so wrap around.
                            ; If not decrement the counter and jump to the delay
.wrap

delay:
    jmp count_check [1]     ; Wait a few cycles then jump back to the loop
"""
)
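To see why the listing uses _cycle_count = 3 and a PIO frequency of (1 + maxval) * 3 / period, it can help to count clock cycles by hand. The sketch below is a plain Python model of the PIO program above, under the assumption that the FIFO always has data ready so the out instructions never stall; the function names cycles_for_step and frame_cycles are invented for this example.

```python
def cycles_for_step(delay):
    """Count the PIO clock cycles spent on one (pins, delay) pair.

    Hypothetical model for illustration; assumes `out` never stalls.
    """
    cycles = 2  # `out pins, 32` and `out y, 32` take one cycle each
    y = delay
    while y != 0:
        cycles += 1  # `jmp y-- delay` is taken because y was non-zero
        cycles += 2  # `jmp count_check [1]`: one cycle plus one delay cycle
        y -= 1
    cycles += 1  # the last `jmp y-- delay` sees y == 0 and falls through to .wrap
    return cycles


def frame_cycles(steps):
    """Total cycles for a flat [pins, delay, pins, delay, ...] list."""
    return sum(cycles_for_step(delay) for delay in steps[1::2])


maxval = 65535
period = 0.02
pio_frequency = round((1 + maxval) * 3 / period)

# An example frame: one output high for 4915 ticks, then low for the rest
steps = [1, 4914, 0, 60620]
print("cycles per frame:", frame_cycles(steps))
print("frame time:", frame_cycles(steps) / pio_frequency, "seconds")
```

Each pair costs 3 * (delay + 1) cycles in this model, so a frame of 65536 ticks takes 196608 cycles, which at the computed PIO frequency comes out to the 20ms servo period.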

To try out the code, copy it to a Pimoroni Servo 2040 board loaded with CircuitPython 7.3.0 or newer and hook up some RC servo motors to the appropriate headers. On another RP2040 board, change the code to use a different starting pin instead of board.SERVO_1.

The code is quite lengthy, but it provides a class that can be useful in other code. A PulseGroup acts like a collection of PWM objects, each of which behaves similarly enough to pwmio.PWMOut to work with the Adafruit-CircuitPython-Motor library. Even better, by setting the phase of each PWM output separately, the overlap of the pulses for different motors can be minimized or eliminated, which may decrease peak current usage.
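The effect of staggering the phases can be estimated without hardware. The sketch below counts how many outputs are high at the same moment for a given set of (phase, width) pairs in ticks; the helper name peak_overlap is an assumption made for this example, and it only models the control pulses, not the actual current drawn by the motors.

```python
def peak_overlap(channels, maxval=65535):
    """Return the largest number of outputs that are high at the same time.

    Hypothetical helper for illustration. channels is a list of
    (phase, width) pairs in ticks, with every width below maxval.
    """
    events = []
    for phase, width in channels:
        if width == 0:
            continue
        start = phase % maxval
        end = start + width
        if end <= maxval:
            events.append((start, 1))
            events.append((end, -1))
        else:
            # The pulse wraps past the end of the frame: split it in two
            events.append((start, 1))
            events.append((maxval, -1))
            events.append((0, 1))
            events.append((end - maxval, -1))

    active = peak = 0
    # At equal times, -1 sorts before +1, so back-to-back pulses
    # are not counted as overlapping
    for _, delta in sorted(events):
        active += delta
        peak = max(peak, active)
    return peak


width = 4915  # about 1.5 ms at 65536 ticks per 20 ms frame
same_phase = [(0, width) for _ in range(18)]
staggered = [(8192 * (j % 8), width) for j in range(18)]
print("all at phase 0:", peak_overlap(same_phase))
print("staggered by 8192:", peak_overlap(staggered))
```

With the 8192-tick stagger used in the example program, 18 outputs fall into 8 time slots, so at most three pulses coincide instead of all eighteen.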

PulseGroup could be used for other tasks as well, like controlling the brightness of multiple LEDs.

There are bound to be other uses for the background write capability. Why not code one up and submit a new example on our GitHub?

This guide was first published on Mar 03, 2021. It was last updated on May 10, 2022.

This page (Advanced: Using PIO to control Servos with Background Writes) was last updated on May 23, 2023.
