First the code starts off by importing all of the libraries that will be used. One to take note of is the rpc
library which is project specific.
import os import time import displayio import terminalio from adafruit_display_shapes.rect import Rect from adafruit_display_text import label from adafruit_macropad import MacroPad from rpc import RpcClient, RpcError, MqttError
Now to initialize the MacroPad and RpcClient libraries.
# Handle for the MacroPad hardware (keys, encoder, display, pixels).
macropad = MacroPad()

# Client used by rpc_call() to invoke functions on the remote RPC server.
rpc = RpcClient()
Next are the configurable settings:
- `COMMAND_TOPIC` is the MQTT topic that Home Assistant should listen to.
- `SUBSCRIBE_TOPICS` are the MQTT topics that the code should subscribe to in order to get the current status of the lights. It is highly likely that you will need to change this in order to match your specific setup.
- `ENCODER_ITEM` refers to the `key_number` that should be sent when pressing the encoder knob. If you don't want it to respond, change the value to `None`.
- `KEY_LABELS` are just the labels that are displayed that correspond to the buttons.
- `UPDATE_DELAY` is the amount of time in seconds that the code should wait after sending a command before checking the status of the light. If it often seems to be the wrong status, you may want to increase the value, but it will seem less snappy.
- `NEOPIXEL_COLORS` refers to the colors that the NeoPixels should light up, keyed by the possible values reported on the `SUBSCRIBE_TOPICS`.
# Topic that key presses and encoder changes are published to.
COMMAND_TOPIC = "macropad/peripheral"

# Status topics, one per key: the topic at index N drives the pixel of key N.
SUBSCRIBE_TOPICS = (
    "stat/demoswitch/POWER",
    "stat/office-light/POWER",
)

# key_number reported when the encoder knob is pressed (None disables it).
ENCODER_ITEM = 0

# Text shown on the display for each key, in key order.
KEY_LABELS = (
    "Demo",
    "Office",
)

# Seconds to wait after publishing a key press before re-reading its status.
UPDATE_DELAY = 0.25

# Pixel color (0xRRGGBB) for each status value received on SUBSCRIBE_TOPICS.
NEOPIXEL_COLORS = {
    "OFF": 0xFF0000,  # red
    "ON": 0x00FF00,  # green
}
The next bit of code will draw the labels that display what the buttons do and was borrowed from the MACROPAD Hotkeys guide because of the nice aesthetic.
# Build the displayio group: a 4-row by 3-column grid of key labels
# underneath a white title bar.
group = displayio.Group()
display_width = macropad.display.width
display_height = macropad.display.height

for key_index in range(12):
    row, column = divmod(key_index, 3)

    # Keys without a configured label get an empty placeholder.
    if key_index < len(KEY_LABELS):
        key_text = str(KEY_LABELS[key_index])
    else:
        key_text = ""

    key_label = label.Label(
        terminalio.FONT,
        text=key_text,
        color=0xFFFFFF,
        # Columns are anchored left / center / right; rows are stacked
        # 12 pixels apart, measured up from the bottom of the display.
        anchored_position=(
            (display_width - 1) * column / 2,
            display_height - 1 - (3 - row) * 12,
        ),
        anchor_point=(column / 2, 1.0),
    )
    group.append(key_label)

# Title bar: a filled white rectangle with black text drawn on top of it.
title_bar = Rect(0, 0, display_width, 12, fill=0xFFFFFF)
group.append(title_bar)

title_label = label.Label(
    terminalio.FONT,
    text="Home Assistant",
    color=0x000000,
    anchored_position=(display_width // 2, -2),
    anchor_point=(0.5, 0.0),
)
group.append(title_label)

macropad.display.root_group = group
This next function is simple, but makes things much easier. It allows you to specify the function you would like to call remotely and pass in the parameters in the same way as you would pass them into the remote function. It also handles raising the appropriate kind of error or returning the Return Value if it was successful.
def rpc_call(function, *args, **kwargs):
    """Call ``function`` on the RPC server and return its result.

    Positional and keyword arguments are forwarded unchanged to
    ``rpc.call``.

    Returns:
        The ``"return_val"`` entry of the response.

    Raises:
        MqttError: the response reports an error whose ``"error_type"``
            is ``"mqtt"``.
        RpcError: the response reports any other error.
    """
    reply = rpc.call(function, *args, **kwargs)
    if not reply["error"]:
        return reply["return_val"]

    # Pick the exception class that matches the reported error type.
    error_class = MqttError if reply["error_type"] == "mqtt" else RpcError
    raise error_class(reply["message"])
The next couple of functions use the `rpc_call()` function to connect to MQTT and update the key colors. The `os.getenv()` function is used to get settings from settings.toml.
def mqtt_init():
    """Configure the remote MQTT client and connect it.

    The broker address, username, password and port are read with
    ``os.getenv()`` from ``MQTT_BROKER``, ``MQTT_USERNAME``,
    ``MQTT_PASSWORD`` and ``MQTT_PORT``, passed to the remote
    ``mqtt_init`` function, and then the remote ``mqtt_connect`` is called.

    Raises:
        MqttError, RpcError: propagated from ``rpc_call()``.
    """
    rpc_call(
        "mqtt_init",
        os.getenv("MQTT_BROKER"),
        username=os.getenv("MQTT_USERNAME"),
        password=os.getenv("MQTT_PASSWORD"),
        port=os.getenv("MQTT_PORT"),
    )
    rpc_call("mqtt_connect")


def update_key(key_id):
    """Set the pixel of key ``key_id`` from the last value on its topic.

    Keys that have no entry in ``SUBSCRIBE_TOPICS`` are left untouched.
    The pixel is switched off (color 0) when no value has been received
    yet, or when the value has no entry in ``NEOPIXEL_COLORS``.

    Raises:
        MqttError, RpcError: propagated from ``rpc_call()``.
    """
    if key_id >= len(SUBSCRIBE_TOPICS):
        return

    switch_state = rpc_call("mqtt_get_last_value", SUBSCRIBE_TOPICS[key_id])

    # Use .get() so an unexpected payload (anything other than the keys of
    # NEOPIXEL_COLORS) turns the pixel off instead of raising a KeyError
    # that nothing catches. None is not a key either, so "no value yet"
    # also maps to 0.
    macropad.pixels[key_id] = NEOPIXEL_COLORS.get(switch_state, 0)
This bit of code waits for the server to start running by repeatedly attempting to call a simple function and retrying for as long as an `RpcError` is raised.
# Block until the RPC server answers the "is_running" call with a truthy
# value.
server_is_running = False
print("Waiting for server...")
while not server_is_running:
    try:
        server_is_running = rpc_call("is_running")
    except RpcError:
        # Deliberate best-effort retry: the server is not answering yet,
        # so keep polling.
        pass
# Report success only once the loop has actually finished, rather than
# after every call that merely did not raise.
print("Connected")
Once it is all connected, one last bit of code is run before entering the main loop. It just connects to MQTT and then subscribes to all of the `SUBSCRIBE_TOPICS`.
# Connect to MQTT before anything is subscribed or published.
mqtt_init()

# Baseline encoder position; the main loop reports changes relative to it.
last_macropad_encoder_value = macropad.encoder

# Subscribe to every status topic and show its current state on the key
# that shares the topic's index.
for key_index, status_topic in enumerate(SUBSCRIBE_TOPICS):
    rpc_call("mqtt_subscribe", status_topic)
    update_key(key_index)
The main loop just listens to the MacroPad library for button presses and encoder changes and if it detects them it will publish that change to MQTT.
# Main loop: collect key presses and encoder changes, publish them to
# COMMAND_TOPIC, then refresh the affected key's pixel.
while True:
    output = {}

    key_event = macropad.keys.events.get()
    if key_event and key_event.pressed:
        output["key_number"] = key_event.key_number

    # Sample the encoder once per pass so the reported delta and the stored
    # position always agree, even if the knob moves while this runs.
    encoder_position = macropad.encoder
    if encoder_position != last_macropad_encoder_value:
        output["encoder"] = encoder_position - last_macropad_encoder_value
        last_macropad_encoder_value = encoder_position

    # A press of the encoder knob is reported as ENCODER_ITEM, unless a
    # real key press was already recorded on this pass.
    macropad.encoder_switch_debounced.update()
    if (
        macropad.encoder_switch_debounced.pressed
        and "key_number" not in output
        and ENCODER_ITEM is not None
    ):
        output["key_number"] = ENCODER_ITEM

    if output:
        try:
            rpc_call("mqtt_publish", COMMAND_TOPIC, output)
            if "key_number" in output:
                # Wait before reading the status back after a key press.
                time.sleep(UPDATE_DELAY)
                update_key(output["key_number"])
            elif ENCODER_ITEM is not None:
                update_key(ENCODER_ITEM)
        except MqttError:
            # Re-run the MQTT setup. If that fails as well, report it and
            # keep the loop alive so a later event can try again.
            try:
                mqtt_init()
            except (MqttError, RpcError) as err_msg:
                print(err_msg)
        except RpcError as err_msg:
            print(err_msg)
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: Unlicense
"""
Home Assistant Remote Procedure Call for MacroPad.
"""
import os
import time
import displayio
import terminalio
from adafruit_display_shapes.rect import Rect
from adafruit_display_text import label
from adafruit_macropad import MacroPad
from rpc import RpcClient, RpcError, MqttError

macropad = MacroPad()
rpc = RpcClient()

# Topic that key presses and encoder changes are published to.
COMMAND_TOPIC = "macropad/peripheral"
# Status topics, one per key: the topic at index N drives the pixel of key N.
SUBSCRIBE_TOPICS = ("stat/demoswitch/POWER", "stat/office-light/POWER")
# key_number reported when the encoder knob is pressed (None disables it).
ENCODER_ITEM = 0
# Text shown on the display for each key, in key order.
KEY_LABELS = ("Demo", "Office")
# Seconds to wait after publishing a key press before re-reading its status.
UPDATE_DELAY = 0.25
# Pixel color for each status value received on SUBSCRIBE_TOPICS.
NEOPIXEL_COLORS = {
    "OFF": 0xFF0000,
    "ON": 0x00FF00,
}

# Set up displayio group with all the labels
group = displayio.Group()
for key_index in range(12):
    x = key_index % 3
    y = key_index // 3
    group.append(
        label.Label(
            terminalio.FONT,
            text=(str(KEY_LABELS[key_index]) if key_index < len(KEY_LABELS) else ""),
            color=0xFFFFFF,
            anchored_position=(
                (macropad.display.width - 1) * x / 2,
                macropad.display.height - 1 - (3 - y) * 12,
            ),
            anchor_point=(x / 2, 1.0),
        )
    )
# Title bar: a filled white rectangle with black text drawn on top of it.
group.append(Rect(0, 0, macropad.display.width, 12, fill=0xFFFFFF))
group.append(
    label.Label(
        terminalio.FONT,
        text="Home Assistant",
        color=0x000000,
        anchored_position=(macropad.display.width // 2, -2),
        anchor_point=(0.5, 0.0),
    )
)
macropad.display.root_group = group


def rpc_call(function, *args, **kwargs):
    """Call ``function`` on the RPC server and return its result.

    Positional and keyword arguments are forwarded unchanged to
    ``rpc.call``.

    Raises:
        MqttError: the response reports an error whose ``"error_type"``
            is ``"mqtt"``.
        RpcError: the response reports any other error.
    """
    response = rpc.call(function, *args, **kwargs)
    if response["error"]:
        if response["error_type"] == "mqtt":
            raise MqttError(response["message"])
        raise RpcError(response["message"])
    return response["return_val"]


def mqtt_init():
    """Configure the remote MQTT client from os.getenv() values and connect."""
    rpc_call(
        "mqtt_init",
        os.getenv("MQTT_BROKER"),
        username=os.getenv("MQTT_USERNAME"),
        password=os.getenv("MQTT_PASSWORD"),
        port=os.getenv("MQTT_PORT"),
    )
    rpc_call("mqtt_connect")


def update_key(key_id):
    """Set the pixel of key ``key_id`` from the last value on its topic.

    Keys that have no entry in ``SUBSCRIBE_TOPICS`` are left untouched.
    The pixel is switched off (color 0) when no value has been received
    yet, or when the value has no entry in ``NEOPIXEL_COLORS``.
    """
    if key_id < len(SUBSCRIBE_TOPICS):
        switch_state = rpc_call("mqtt_get_last_value", SUBSCRIBE_TOPICS[key_id])
        # .get() keeps an unexpected payload from raising a KeyError that
        # nothing catches; None is not a key either, so it also maps to 0.
        macropad.pixels[key_id] = NEOPIXEL_COLORS.get(switch_state, 0)


# Block until the RPC server answers "is_running" with a truthy value.
server_is_running = False
print("Waiting for server...")
while not server_is_running:
    try:
        server_is_running = rpc_call("is_running")
    except RpcError:
        # Deliberate best-effort retry: the server is not answering yet.
        pass
# Report success only once the loop has actually finished.
print("Connected")

mqtt_init()
last_macropad_encoder_value = macropad.encoder

# Subscribe to every status topic and show its current state on its key.
for key_number, topic in enumerate(SUBSCRIBE_TOPICS):
    rpc_call("mqtt_subscribe", topic)
    update_key(key_number)

while True:
    output = {}

    key_event = macropad.keys.events.get()
    if key_event and key_event.pressed:
        output["key_number"] = key_event.key_number

    # Sample the encoder once per pass so the reported delta and the stored
    # position always agree, even if the knob moves while this runs.
    encoder_position = macropad.encoder
    if encoder_position != last_macropad_encoder_value:
        output["encoder"] = encoder_position - last_macropad_encoder_value
        last_macropad_encoder_value = encoder_position

    # A press of the encoder knob is reported as ENCODER_ITEM, unless a
    # real key press was already recorded on this pass.
    macropad.encoder_switch_debounced.update()
    if (
        macropad.encoder_switch_debounced.pressed
        and "key_number" not in output
        and ENCODER_ITEM is not None
    ):
        output["key_number"] = ENCODER_ITEM

    if output:
        try:
            rpc_call("mqtt_publish", COMMAND_TOPIC, output)
            if "key_number" in output:
                # Wait before reading the status back after a key press.
                time.sleep(UPDATE_DELAY)
                update_key(output["key_number"])
            elif ENCODER_ITEM is not None:
                update_key(ENCODER_ITEM)
        except MqttError:
            # Re-run the MQTT setup. If that fails as well, report it and
            # keep the loop alive so a later event can try again.
            try:
                mqtt_init()
            except (MqttError, RpcError) as err_msg:
                print(err_msg)
        except RpcError as err_msg:
            print(err_msg)
Page last edited January 21, 2025
Text editor powered by tinymce.