First the code starts off by importing all of the libraries that will be used. Ones to take note of are the rpc library which is project specific and the secrets which should have been set up in an earlier step.

import time
import displayio
import terminalio
from adafruit_display_shapes.rect import Rect
from rpc import RpcClient, RpcError
from adafruit_display_text import label
from adafruit_macropad import MacroPad
from secrets import secrets

Now to initialize the MacroPad and RpcClient libraries.

# Module-level objects shared by the rest of the script: the MacroPad
# hardware helper and the RPC client used to reach the host computer.
macropad = MacroPad()
rpc = RpcClient()

Next are the configurable settings:

  • COMMAND_TOPIC is what Home Assistant should listen to.
  • SUBSCRIBE_TOPICS are the MQTT topics that the code should subscribe to in order to get the current status of the lights. It is highly likely that you will need to change this in order to match your specific setup.
  • ENCODER_ITEM refers to the key_number that should be sent when pressing the encoder knob. If you don't want it to respond, change the value to None.
  • KEY_LABELS are just the labels that are displayed that correspond to the buttons.
  • UPDATE_DELAY is the amount of time in seconds that the code should wait after sending a command before checking the status of the light. If it often seems to be the wrong status, you may want to increase the value, but it will seem less snappy.
  • NEOPIXEL_COLORS refer to the value that the NeoPixels should light up corresponding to the value of the possible answers in SUBSCRIBE_TOPICS.
# Topic that key and encoder events are published to.
COMMAND_TOPIC = "macropad/peripheral"
# Status topics, one per key: the topic at index N drives the NeoPixel of key N.
SUBSCRIBE_TOPICS = ("stat/demoswitch/POWER", "stat/office-light/POWER")
# key_number sent when the encoder knob is pressed; None disables it.
ENCODER_ITEM = 0
# Text shown on the display for each key, in key order.
KEY_LABELS = ("Demo", "Office")
# Seconds to wait after publishing a key press before re-reading its status.
UPDATE_DELAY = 0.25
# NeoPixel color for each status payload received on SUBSCRIBE_TOPICS.
NEOPIXEL_COLORS = {
    "OFF": 0xff0000,
    "ON": 0x00ff00,
}

Next, define a custom MqttError exception so that MQTT failures can be told apart from other Python errors.

class MqttError(Exception):
    """Raised when the host reports an MQTT-specific failure."""

The next bit of code will draw the labels that display what the buttons do and was borrowed from the MACROPAD Hotkeys guide because of the nice aesthetic.

# Build the screen layout: a 3-column by 4-row grid of key captions with a
# white title bar across the top.
group = displayio.Group()
display_width = macropad.display.width
display_height = macropad.display.height
for key_index in range(12):
    row, column = divmod(key_index, 3)
    # Keys without an entry in KEY_LABELS get an empty caption.
    if key_index < len(KEY_LABELS):
        caption = str(KEY_LABELS[key_index])
    else:
        caption = ''
    key_label = label.Label(
        terminalio.FONT,
        text=caption,
        color=0xFFFFFF,
        anchored_position=(
            (display_width - 1) * column / 2,
            display_height - 1 - (3 - row) * 12,
        ),
        anchor_point=(column / 2, 1.0),
    )
    group.append(key_label)

# Title bar: a filled rectangle with black text drawn over it.
title_bar = Rect(0, 0, display_width, 12, fill=0xFFFFFF)
group.append(title_bar)
title_label = label.Label(
    terminalio.FONT,
    text='Home Assistant',
    color=0x000000,
    anchored_position=(display_width // 2, -2),
    anchor_point=(0.5, 0.0),
)
group.append(title_label)
macropad.display.show(group)

This next function is simple, but makes things much easier. It allows you to specify the function you would like to call remotely and pass in the parameters in the same way as you would pass them into the remote function. It also handles raising the appropriate kind of error or returning the Return Value if it was successful.

def rpc_call(function, *args, **kwargs):
    """Call a function on the host over RPC and return its return value.

    Positional and keyword arguments are forwarded unchanged to the
    remote function.

    Raises:
        MqttError: the response is an error whose type is "mqtt"
            (compared case-insensitively).
        RpcError: the response is any other kind of error.
    """
    response = rpc.call(function, *args, **kwargs)
    if not response["error"]:
        return response["return_val"]

    # The host handler tags MQTT failures with error_type="MQTT", so an exact
    # comparison with "mqtt" never matched and MqttError was never raised.
    error_type = response["error_type"]
    if isinstance(error_type, str) and error_type.lower() == "mqtt":
        raise MqttError(response["message"])
    raise RpcError(response["message"])

The next couple of functions use the rpc_call() function to connect to MQTT and update the key colors.

def mqtt_init():
    """Set up the host's MQTT client from the values in secrets, then connect it."""
    broker = secrets["mqtt_broker"]
    options = {
        "username": secrets["mqtt_username"],
        "password": secrets["mqtt_password"],
        "port": secrets["mqtt_port"],
    }
    rpc_call("mqtt_init", broker, **options)
    rpc_call("mqtt_connect")

def update_key(key_number):
    """Refresh one key's NeoPixel from the last status received for its topic.

    Keys with no entry in SUBSCRIBE_TOPICS are ignored. The pixel is turned
    off (0) when no status has been received yet or when the status has no
    entry in NEOPIXEL_COLORS.
    """
    # Only the first len(SUBSCRIBE_TOPICS) keys have a topic; without this
    # guard, pressing any other key raised IndexError.
    if key_number >= len(SUBSCRIBE_TOPICS):
        return
    switch_state = rpc_call("mqtt_get_last_value", SUBSCRIBE_TOPICS[key_number])
    # .get() covers both None and unexpected payloads instead of raising KeyError.
    macropad.pixels[key_number] = NEOPIXEL_COLORS.get(switch_state, 0)

This bit of code waits for the server to start running by repeatedly calling a simple remote function and retrying for as long as an RpcError is raised.

# Poll the host until the is_running RPC call succeeds and returns a truthy value.
print("Waiting for server...")
server_is_running = False
while not server_is_running:
    try:
        server_is_running = rpc_call("is_running")
    except RpcError:
        # Host not answering yet; try again.
        continue
    print("Connected")

Once it is all connected, one last bit of code is run before entering the main loop. It just connects to MQTT and then subscribes to all of the SUBSCRIBE_TOPICS.

# Have the host create its MQTT client and connect to the broker.
mqtt_init()
# Baseline used by the main loop to compute encoder deltas.
last_macropad_encoder_value = macropad.encoder

# Subscribe to every status topic and set the matching key's NeoPixel.
for key_number, topic in enumerate(SUBSCRIBE_TOPICS):
    rpc_call("mqtt_subscribe", topic)
    update_key(key_number)

The main loop just listens to the MacroPad library for button presses and encoder changes and if it detects them it will publish that change to MQTT.

# Main loop: collect key, encoder and encoder-switch input into one dict and
# publish it to COMMAND_TOPIC whenever anything changed.
while True:
    output = {}

    key_event = macropad.keys.events.get()
    if key_event and key_event.pressed:
        output["key_number"] = key_event.key_number

    # Read the encoder once per pass. Reading it separately for the comparison,
    # the delta and the new baseline could lose steps that arrive between reads.
    encoder_position = macropad.encoder
    if encoder_position != last_macropad_encoder_value:
        output["encoder"] = encoder_position - last_macropad_encoder_value
        last_macropad_encoder_value = encoder_position

    macropad.encoder_switch_debounced.update()
    # A real key press takes priority over the encoder switch.
    if (
        macropad.encoder_switch_debounced.pressed
        and "key_number" not in output
        and ENCODER_ITEM is not None
    ):
        output["key_number"] = ENCODER_ITEM

    if output:
        try:
            rpc_call("mqtt_publish", COMMAND_TOPIC, output)
            if "key_number" in output:
                # Wait UPDATE_DELAY seconds before re-reading the key's status.
                time.sleep(UPDATE_DELAY)
                update_key(output["key_number"])
            elif ENCODER_ITEM is not None:
                update_key(ENCODER_ITEM)
        except MqttError:
            # MQTT problem on the host: set up the connection again.
            mqtt_init()
        except RpcError as err_msg:
            print(err_msg)

Full Code Listing

# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: Unlicense
"""
Home Assistant Remote Procedure Call for MacroPad.
"""
import time
import displayio
import terminalio
from adafruit_display_shapes.rect import Rect
from rpc import RpcClient, RpcError
from adafruit_display_text import label
from adafruit_macropad import MacroPad
from secrets import secrets

# Module-level objects shared by the rest of the script: the MacroPad
# hardware helper and the RPC client used to reach the host computer.
macropad = MacroPad()
rpc = RpcClient()

# Topic that key and encoder events are published to.
COMMAND_TOPIC = "macropad/peripheral"
# Status topics, one per key: the topic at index N drives the NeoPixel of key N.
SUBSCRIBE_TOPICS = ("stat/demoswitch/POWER", "stat/office-light/POWER")
# key_number sent when the encoder knob is pressed; None disables it.
ENCODER_ITEM = 0
# Text shown on the display for each key, in key order.
KEY_LABELS = ("Demo", "Office")
# Seconds to wait after publishing a key press before re-reading its status.
UPDATE_DELAY = 0.25
# NeoPixel color for each status payload received on SUBSCRIBE_TOPICS.
NEOPIXEL_COLORS = {
    "OFF": 0xff0000,
    "ON": 0x00ff00,
}

class MqttError(Exception):
    """Raised when the host reports an MQTT-specific failure."""
# Build the screen layout: a 3-column by 4-row grid of key captions with a
# white title bar across the top.
group = displayio.Group()
display_width = macropad.display.width
display_height = macropad.display.height
for key_index in range(12):
    row, column = divmod(key_index, 3)
    # Keys without an entry in KEY_LABELS get an empty caption.
    if key_index < len(KEY_LABELS):
        caption = str(KEY_LABELS[key_index])
    else:
        caption = ''
    key_label = label.Label(
        terminalio.FONT,
        text=caption,
        color=0xFFFFFF,
        anchored_position=(
            (display_width - 1) * column / 2,
            display_height - 1 - (3 - row) * 12,
        ),
        anchor_point=(column / 2, 1.0),
    )
    group.append(key_label)

# Title bar: a filled rectangle with black text drawn over it.
title_bar = Rect(0, 0, display_width, 12, fill=0xFFFFFF)
group.append(title_bar)
title_label = label.Label(
    terminalio.FONT,
    text='Home Assistant',
    color=0x000000,
    anchored_position=(display_width // 2, -2),
    anchor_point=(0.5, 0.0),
)
group.append(title_label)
macropad.display.show(group)

def rpc_call(function, *args, **kwargs):
    """Call a function on the host over RPC and return its return value.

    Positional and keyword arguments are forwarded unchanged to the
    remote function.

    Raises:
        MqttError: the response is an error whose type is "mqtt"
            (compared case-insensitively).
        RpcError: the response is any other kind of error.
    """
    response = rpc.call(function, *args, **kwargs)
    if not response["error"]:
        return response["return_val"]

    # The host handler tags MQTT failures with error_type="MQTT", so an exact
    # comparison with "mqtt" never matched and MqttError was never raised.
    error_type = response["error_type"]
    if isinstance(error_type, str) and error_type.lower() == "mqtt":
        raise MqttError(response["message"])
    raise RpcError(response["message"])

def mqtt_init():
    """Set up the host's MQTT client from the values in secrets, then connect it."""
    broker = secrets["mqtt_broker"]
    options = {
        "username": secrets["mqtt_username"],
        "password": secrets["mqtt_password"],
        "port": secrets["mqtt_port"],
    }
    rpc_call("mqtt_init", broker, **options)
    rpc_call("mqtt_connect")

def update_key(key_number):
    """Refresh one key's NeoPixel from the last status received for its topic.

    Keys with no entry in SUBSCRIBE_TOPICS are ignored. The pixel is turned
    off (0) when no status has been received yet or when the status has no
    entry in NEOPIXEL_COLORS.
    """
    # Only the first len(SUBSCRIBE_TOPICS) keys have a topic to query.
    if key_number >= len(SUBSCRIBE_TOPICS):
        return
    switch_state = rpc_call("mqtt_get_last_value", SUBSCRIBE_TOPICS[key_number])
    # .get() covers both None and unexpected payloads; indexing the dict
    # directly raised KeyError for any payload other than the mapped ones.
    macropad.pixels[key_number] = NEOPIXEL_COLORS.get(switch_state, 0)

# Poll the host until the is_running RPC call succeeds and returns a truthy value.
print("Waiting for server...")
server_is_running = False
while not server_is_running:
    try:
        server_is_running = rpc_call("is_running")
    except RpcError:
        # Host not answering yet; try again.
        continue
    print("Connected")

# Have the host create its MQTT client and connect to the broker.
mqtt_init()
# Baseline used by the main loop to compute encoder deltas.
last_macropad_encoder_value = macropad.encoder

# Subscribe to every status topic and set the matching key's NeoPixel.
for key_number, topic in enumerate(SUBSCRIBE_TOPICS):
    rpc_call("mqtt_subscribe", topic)
    update_key(key_number)

# Main loop: collect key, encoder and encoder-switch input into one dict and
# publish it to COMMAND_TOPIC whenever anything changed.
while True:
    output = {}

    key_event = macropad.keys.events.get()
    if key_event and key_event.pressed:
        output["key_number"] = key_event.key_number

    # Read the encoder once per pass. Reading it separately for the comparison,
    # the delta and the new baseline could lose steps that arrive between reads.
    encoder_position = macropad.encoder
    if encoder_position != last_macropad_encoder_value:
        output["encoder"] = encoder_position - last_macropad_encoder_value
        last_macropad_encoder_value = encoder_position

    macropad.encoder_switch_debounced.update()
    # A real key press takes priority over the encoder switch.
    if (
        macropad.encoder_switch_debounced.pressed
        and "key_number" not in output
        and ENCODER_ITEM is not None
    ):
        output["key_number"] = ENCODER_ITEM

    if output:
        try:
            rpc_call("mqtt_publish", COMMAND_TOPIC, output)
            if "key_number" in output:
                # Wait UPDATE_DELAY seconds before re-reading the key's status.
                time.sleep(UPDATE_DELAY)
                update_key(output["key_number"])
            elif ENCODER_ITEM is not None:
                update_key(ENCODER_ITEM)
        except MqttError:
            # MQTT problem on the host: set up the connection again.
            mqtt_init()
        except RpcError as err_msg:
            print(err_msg)

Host Computer Code

Finally there is code that runs on the host computer and acts as a server. First are the imported libraries:

import time
import json
import ssl
import socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from rpc import RpcServer

Next are a few variables to keep track of the state of things:

# MQTT client instance; None until mqtt_init() creates it.
mqtt_client = None
# Connection flag maintained by the connect()/disconnect() callbacks.
mqtt_connected = False
# Most recent message received for each topic, keyed by topic.
last_mqtt_messages = {}

Next is a list of protected functions. The purpose of this list is to prevent these functions from being called remotely, which avoids infinite loops and other situations that would likely crash Python or be difficult to debug.

# For program flow purposes, we do not want these functions to be called remotely.
# handle_rpc() rejects any request whose function name appears in this list.
PROTECTED_FUNCTIONS = ["main", "handle_rpc"]

These functions are to keep track of our connection and the status of the topics that are being watched. These are used as callbacks when MQTT is initialized.

def connect(mqtt_client, userdata, flags, rc):
    """MQTT on_connect callback: record that the broker connection is up.

    The arguments are supplied by the MQTT client and are not used here.
    """
    global mqtt_connected
    mqtt_connected = True

def disconnect(mqtt_client, userdata, rc):
    """MQTT on_disconnect callback: record that the broker connection is down.

    The arguments are supplied by the MQTT client and are not used here.
    """
    global mqtt_connected
    mqtt_connected = False

def message(client, topic, message):
    """MQTT on_message callback: remember the latest message for its topic.

    The stored value is what mqtt_get_last_value() later returns.
    """
    last_mqtt_messages[topic] = message

Next, define a custom MqttError exception, as was done in the MacroPad code.

class MqttError(Exception):
    """Raised for MQTT-specific failures, such as using the client before it is initialized."""

Next there are all of the functions that are called by RPC and are just standard MQTT connection functions as used in the library examples with a few exceptions.

First in mqtt_publish(), if the connection has been dropped, it will attempt to reconnect automatically. This seemed to make the code overall more reliable.

mqtt_get_last_value() just returns a corresponding value from one of the topics it was watching if available, otherwise it just returns None.

Finally, there is the is_running() function, which is simply used to check that there is an RPC connection while the MacroPad is waiting for the server.

# Default to 1883 as SSL on CPython is not currently supported
def mqtt_init(broker, port=1883, username=None, password=None):
    """Create the module-level MQTT client and attach the status callbacks.

    This only builds the client; mqtt_connect() opens the connection.

    Args:
        broker: address of the MQTT broker.
        port: broker port.
        username: optional broker user name.
        password: optional broker password.
    """
    global mqtt_client, mqtt_connected
    mqtt_client = MQTT.MQTT(
        broker=broker,
        port=port,
        username=username,
        password=password,
        socket_pool=socket,
        ssl_context=ssl.create_default_context(),
    )
    # The new client is not connected yet. Clearing the flag stops main()
    # from calling loop() on it when a previous client had left the flag set.
    mqtt_connected = False

    mqtt_client.on_connect = connect
    mqtt_client.on_disconnect = disconnect
    mqtt_client.on_message = message

def mqtt_connect():
    """Connect the MQTT client created by mqtt_init() to its broker.

    Raises:
        MqttError: mqtt_init() has not been called yet.
    """
    # Same guard as mqtt_publish()/mqtt_subscribe(): report a clean MQTT error
    # instead of an AttributeError on None, which handle_rpc() does not catch.
    if mqtt_client is None:
        raise MqttError("MQTT is not initialized")
    mqtt_client.connect()

def mqtt_publish(topic, payload):
    """Publish payload, JSON-encoded, to topic and return the client's result.

    If the publish fails with BrokenPipeError, wait half a second,
    reconnect, and retry once.

    Raises:
        MqttError: mqtt_init() has not been called yet.
    """
    if mqtt_client is None:
        raise MqttError("MQTT is not initialized")
    try:
        return mqtt_client.publish(topic, json.dumps(payload))
    except BrokenPipeError:
        # Connection dropped: pause, reconnect, then fall through to the retry.
        time.sleep(0.5)
        mqtt_client.connect()
    return mqtt_client.publish(topic, json.dumps(payload))

def mqtt_subscribe(topic):
    """Subscribe the MQTT client to topic and return the client's result.

    Raises:
        MqttError: mqtt_init() has not been called yet.
    """
    if mqtt_client is not None:
        return mqtt_client.subscribe(topic)
    raise MqttError("MQTT is not initialized")

def mqtt_get_last_value(topic):
    """Return the most recent message stored for topic, or None if there is none."""
    return last_mqtt_messages.get(topic)

def is_running():
    """Always return True; a successful call shows the RPC server is answering."""
    return True

This is the handler function and where all the magic happens. It starts by making sure the called function isn't in the protected functions list. Then it checks that the function name exists in globals(), so that a call to something like the_function_that_doesn't_really_exist() is rejected with an error response instead of being attempted.

Assuming it gets this far, it will just call the function with all of the arguments and let Python handle any mismatched arguments. If everything happened like it was supposed to, there may be a return value. A response packet is created and returned. If not, an error response packet is created and returned.

def handle_rpc(packet):
    """Run the function named in an RPC packet and build the response packet.

    Args:
        packet: request with a 'function' name plus the 'args' and 'kwargs'
            to call it with.

    Returns:
        A response packet carrying the function's return value, or an error
        packet when the function is protected, is not a callable defined in
        this module, or raises MqttError. Any other exception raised by the
        called function propagates to the caller.
    """
    print("Received packet")
    func_name = packet['function']
    if func_name in PROTECTED_FUNCTIONS:
        return rpc.create_response_packet(error=True, message=f"{func_name}() is a protected function and can not be called.")
    target = globals().get(func_name)
    # Imported modules and state variables are also in globals(); treat
    # anything that is not callable as missing rather than failing the call.
    if not callable(target):
        return rpc.create_response_packet(error=True, message=f"Function {func_name}() not found")
    try:
        return_val = target(*packet['args'], **packet['kwargs'])
    except MqttError as err:
        return rpc.create_response_packet(error=True, error_type="MQTT", message=str(err))

    return rpc.create_response_packet(return_val=return_val)

Here is the main function that really just keeps calling the RpcServer loop() function and if MQTT is connected, it calls the MQTT loop() function.

def main():
    """Run the server forever: service RPC, then MQTT while it is connected."""
    global mqtt_connected
    while True:
        rpc.loop(0.25)
        # Skip MQTT servicing until a client exists and reports a connection.
        if not mqtt_connected or mqtt_client is None:
            continue
        try:
            mqtt_client.loop(0.5)
        except AttributeError:
            # Treat an AttributeError from loop() as a lost connection.
            mqtt_connected = False

Finally is the code that serves as the entry and exit points to the script.

if __name__ == '__main__':
    # handle_rpc() uses this module-level rpc object to build its responses.
    rpc = RpcServer(handle_rpc)
    try:
        print("Listening for RPC Calls, to stop press \"CTRL+C\"")
        main()
    except KeyboardInterrupt:
        print("")
        print("Caught interrupt, exiting...")
    finally:
        # Call close_serial() however the server stops, including when main()
        # fails with an unexpected exception.
        rpc.close_serial()

Full Code Listing

import time
import json
import ssl
import socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from rpc import RpcServer

# MQTT client instance; None until mqtt_init() creates it.
mqtt_client = None
# Connection flag maintained by the connect()/disconnect() callbacks.
mqtt_connected = False
# Most recent message received for each topic, keyed by topic.
last_mqtt_messages = {}

# For program flow purposes, we do not want these functions to be called remotely.
# handle_rpc() rejects any request whose function name appears in this list.
PROTECTED_FUNCTIONS = ["main", "handle_rpc"]

def connect(mqtt_client, userdata, flags, rc):
    """MQTT on_connect callback: record that the broker connection is up.

    The arguments are supplied by the MQTT client and are not used here.
    """
    global mqtt_connected
    mqtt_connected = True

def disconnect(mqtt_client, userdata, rc):
    """MQTT on_disconnect callback: record that the broker connection is down.

    The arguments are supplied by the MQTT client and are not used here.
    """
    global mqtt_connected
    mqtt_connected = False

def message(client, topic, message):
    """MQTT on_message callback: remember the latest message for its topic.

    The stored value is what mqtt_get_last_value() later returns.
    """
    last_mqtt_messages[topic] = message

class MqttError(Exception):
    """Raised for MQTT-specific failures, such as using the client before it is initialized."""

# Default to 1883 as SSL on CPython is not currently supported
def mqtt_init(broker, port=1883, username=None, password=None):
    """Create the module-level MQTT client and attach the status callbacks.

    This only builds the client; mqtt_connect() opens the connection.

    Args:
        broker: address of the MQTT broker.
        port: broker port.
        username: optional broker user name.
        password: optional broker password.
    """
    global mqtt_client, mqtt_connected
    mqtt_client = MQTT.MQTT(
        broker=broker,
        port=port,
        username=username,
        password=password,
        socket_pool=socket,
        ssl_context=ssl.create_default_context(),
    )
    # The new client is not connected yet. Clearing the flag stops main()
    # from calling loop() on it when a previous client had left the flag set.
    mqtt_connected = False

    mqtt_client.on_connect = connect
    mqtt_client.on_disconnect = disconnect
    mqtt_client.on_message = message

def mqtt_connect():
    """Connect the MQTT client created by mqtt_init() to its broker.

    Raises:
        MqttError: mqtt_init() has not been called yet.
    """
    # Same guard as mqtt_publish()/mqtt_subscribe(): report a clean MQTT error
    # instead of an AttributeError on None, which handle_rpc() does not catch.
    if mqtt_client is None:
        raise MqttError("MQTT is not initialized")
    mqtt_client.connect()

def mqtt_publish(topic, payload):
    """Publish payload, JSON-encoded, to topic and return the client's result.

    If the publish fails with BrokenPipeError, wait half a second,
    reconnect, and retry once.

    Raises:
        MqttError: mqtt_init() has not been called yet.
    """
    if mqtt_client is None:
        raise MqttError("MQTT is not initialized")
    try:
        return mqtt_client.publish(topic, json.dumps(payload))
    except BrokenPipeError:
        # Connection dropped: pause, reconnect, then fall through to the retry.
        time.sleep(0.5)
        mqtt_client.connect()
    return mqtt_client.publish(topic, json.dumps(payload))

def mqtt_subscribe(topic):
    """Subscribe the MQTT client to topic and return the client's result.

    Raises:
        MqttError: mqtt_init() has not been called yet.
    """
    if mqtt_client is not None:
        return mqtt_client.subscribe(topic)
    raise MqttError("MQTT is not initialized")

def mqtt_get_last_value(topic):
    """Return the most recent message stored for topic, or None if there is none."""
    return last_mqtt_messages.get(topic)

def is_running():
    """Always return True; a successful call shows the RPC server is answering."""
    return True

def handle_rpc(packet):
    """Run the function named in an RPC packet and build the response packet.

    Args:
        packet: request with a 'function' name plus the 'args' and 'kwargs'
            to call it with.

    Returns:
        A response packet carrying the function's return value, or an error
        packet when the function is protected, is not a callable defined in
        this module, or raises MqttError. Any other exception raised by the
        called function propagates to the caller.
    """
    print("Received packet")
    func_name = packet['function']
    if func_name in PROTECTED_FUNCTIONS:
        return rpc.create_response_packet(error=True, message=f"{func_name}() is a protected function and can not be called.")
    target = globals().get(func_name)
    # Imported modules and state variables are also in globals(); treat
    # anything that is not callable as missing rather than failing the call.
    if not callable(target):
        return rpc.create_response_packet(error=True, message=f"Function {func_name}() not found")
    try:
        return_val = target(*packet['args'], **packet['kwargs'])
    except MqttError as err:
        return rpc.create_response_packet(error=True, error_type="MQTT", message=str(err))

    return rpc.create_response_packet(return_val=return_val)

def main():
    """Run the server forever: service RPC, then MQTT while it is connected."""
    global mqtt_connected
    while True:
        rpc.loop(0.25)
        # Skip MQTT servicing until a client exists and reports a connection.
        if not mqtt_connected or mqtt_client is None:
            continue
        try:
            mqtt_client.loop(0.5)
        except AttributeError:
            # Treat an AttributeError from loop() as a lost connection.
            mqtt_connected = False

if __name__ == '__main__':
    # handle_rpc() uses this module-level rpc object to build its responses.
    rpc = RpcServer(handle_rpc)
    try:
        print("Listening for RPC Calls, to stop press \"CTRL+C\"")
        main()
    except KeyboardInterrupt:
        print("")
        print("Caught interrupt, exiting...")
    finally:
        # Call close_serial() however the server stops, including when main()
        # fails with an unexpected exception.
        rpc.close_serial()

This guide was first published on Aug 11, 2021. It was last updated on 2021-08-11 11:03:22 -0400.

This page (MacroPad Code) was last updated on Sep 15, 2021.

Text editor powered by tinymce.