Finally there is code that runs on the host computer and acts as a server. First are the imported libraries:

import time
import json
import ssl
import socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from rpc import RpcServer, MqttError

Next are a couple variables to keep track of the state of things. The dict is used to avoid using the global keyword, since it remains in the same place in memory.

mqtt_status = {
    "connected": False,
    "client": None,
}
last_mqtt_messages = {}

Next is a list of protected functions. The purpose of this list is to prevent calling these function to avoid memory loops or other situations that would likely crash Python or may result in some difficult to debug situations.

# For program flow purposes, we do not want these functions to be called remotely
PROTECTED_FUNCTIONS = ["main", "handle_rpc"]

These functions are to keep track of our connection and the statuses of the topics that are being watched. These are used as callbacks when MQTT is initialized.

def connect(mqtt_client, userdata, flags, rc):
    mqtt_status["connected"] = True

def disconnect(mqtt_client, userdata, rc):
    mqtt_status["connected"] = False

def message(_client, topic, payload):
    last_mqtt_messages[topic] = payload

Next there are all of the functions that are called by RPC and are just standard MQTT connection functions as used in the library examples with a few exceptions.

First in mqtt_publish(), if the connection has been dropped, it will attempt to reconnect automatically. This seemed to make the code overall more reliable.

mqtt_get_last_value() just returns a corresponding value from one of the topics it was watching if available, otherwise it just returns None.

Finally is the is_running() function which is simply used to check that there is an RPC connection when the MacroPad is waiting for the server.

# Default to 1883 since SSL on CPython is not currently supported
def mqtt_init(broker, port=1883, username=None, password=None):
    mqtt_status["client"] = MQTT.MQTT(
        broker=broker,
        port=port,
        username=username,
        password=password,
        socket_pool=socket,
        ssl_context=ssl.create_default_context(),
    )

    mqtt_status["client"].on_connect = connect
    mqtt_status["client"].on_disconnect = disconnect
    mqtt_status["client"].on_message = message

def mqtt_connect():
    mqtt_status["client"].connect()

def mqtt_publish(topic, payload):
    if mqtt_status["client"] is None:
        raise MqttError("MQTT is not initialized")
    try:
        return_val = mqtt_status["client"].publish(topic, json.dumps(payload))
    except BrokenPipeError:
        time.sleep(0.5)
        mqtt_status["client"].connect()
        return_val = mqtt_status["client"].publish(topic, json.dumps(payload))
    return return_val

def mqtt_subscribe(topic):
    if mqtt_status["client"] is None:
        raise MqttError("MQTT is not initialized")
    return mqtt_status["client"].subscribe(topic)

def mqtt_get_last_value(topic):
    """Return the last value we have received regarding a topic"""
    if topic in last_mqtt_messages.keys():
        return last_mqtt_messages[topic]
    return None

def is_running():
    return True

This is the handler function and where all the magic happens. It starts by making sure the called function isn't in the protected functions list. Then it checks to make sure the function is in the globals() list just to make sure something like some_function_that_does_not_exist() was called.

Assuming it gets this far, it will just call the function with all of the arguments and let Python handle any mismatched arguments. If everything happened like it was supposed to, there may be a return value. A response packet is created and returned. If not, an error response packet is created and returned.

def handle_rpc(packet):
    """This function will verify good data in packet,
    call the method with parameters, and generate a response
    packet as the return value"""
    print("Received packet")
    func_name = packet["function"]
    if func_name in PROTECTED_FUNCTIONS:
        return rpc.create_response_packet(
            error=True,
            message=f"{func_name}'() is a protected function and can not be called."
        )
    if func_name not in globals():
        return rpc.create_response_packet(
            error=True,
            message=f"Function {func_name}() not found"
        )
    try:
        return_val = globals()[func_name](*packet['args'], **packet['kwargs'])
    except MqttError as err:
        return rpc.create_response_packet(error=True, error_type="MQTT", message=str(err))

    packet = rpc.create_response_packet(return_val=return_val)
    return packet

Here is the main function that really just keeps calling the RpcServer loop() function and if MQTT is connected, it calls the MQTT loop() function.

def main():
    """Command line, entry point"""
    while True:
        rpc.loop(0.25)
        if mqtt_status["connected"] and mqtt_status["client"] is not None:
            try:
                mqtt_status["client"].loop(0.5)
            except AttributeError:
                mqtt_status["connected"] = False

Finally is the code that serves as the entry and exit points to the script.

if __name__ == '__main__':
    rpc = RpcServer(handle_rpc)
    try:
        print('Listening for RPC Calls, to stop press "CTRL+C"')
        main()
    except KeyboardInterrupt:
        print("")
        print("Caught interrupt, exiting...")
    rpc.close_serial()

Full Code Listing

# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT

import time
import json
import ssl
import socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from rpc import RpcServer, MqttError

mqtt_status = {
    "connected": False,
    "client": None,
}
last_mqtt_messages = {}

# For program flow purposes, we do not want these functions to be called remotely
PROTECTED_FUNCTIONS = ["main", "handle_rpc"]

def connect(_mqtt_client, _userdata, _flags, _rc):
    mqtt_status["connected"] = True

def disconnect(_mqtt_client, _userdata, _rc):
    mqtt_status["connected"] = False

def message(_client, topic, payload):
    last_mqtt_messages[topic] = payload

# Default to 1883 since SSL on CPython is not currently supported
def mqtt_init(broker, port=1883, username=None, password=None):
    mqtt_status["client"] = MQTT.MQTT(
        broker=broker,
        port=port,
        username=username,
        password=password,
        socket_pool=socket,
        ssl_context=ssl.create_default_context(),
    )

    mqtt_status["client"].on_connect = connect
    mqtt_status["client"].on_disconnect = disconnect
    mqtt_status["client"].on_message = message

def mqtt_connect():
    mqtt_status["client"].connect()

def mqtt_publish(topic, payload):
    if mqtt_status["client"] is None:
        raise MqttError("MQTT is not initialized")
    try:
        return_val = mqtt_status["client"].publish(topic, json.dumps(payload))
    except BrokenPipeError:
        time.sleep(0.5)
        mqtt_status["client"].connect()
        return_val = mqtt_status["client"].publish(topic, json.dumps(payload))
    return return_val

def mqtt_subscribe(topic):
    if mqtt_status["client"] is None:
        raise MqttError("MQTT is not initialized")
    return mqtt_status["client"].subscribe(topic)

def mqtt_get_last_value(topic):
    """Return the last value we have received regarding a topic"""
    if topic in last_mqtt_messages.keys():
        return last_mqtt_messages[topic]
    return None

def is_running():
    return True

def handle_rpc(packet):
    """This function will verify good data in packet,
    call the method with parameters, and generate a response
    packet as the return value"""
    print("Received packet")
    func_name = packet["function"]
    if func_name in PROTECTED_FUNCTIONS:
        return rpc.create_response_packet(
            error=True,
            message=f"{func_name}'() is a protected function and can not be called.",
        )
    if func_name not in globals():
        return rpc.create_response_packet(
            error=True, message=f"Function {func_name}() not found"
        )
    try:
        return_val = globals()[func_name](*packet["args"], **packet["kwargs"])
    except MqttError as err:
        return rpc.create_response_packet(
            error=True, error_type="MQTT", message=str(err)
        )

    packet = rpc.create_response_packet(return_val=return_val)
    return packet

def main():
    """Command line, entry point"""
    while True:
        rpc.loop(0.25)
        if mqtt_status["connected"] and mqtt_status["client"] is not None:
            try:
                mqtt_status["client"].loop(0.5)
            except AttributeError:
                mqtt_status["connected"] = False

if __name__ == "__main__":
    rpc = RpcServer(handle_rpc)
    try:
        print('Listening for RPC Calls, to stop press "CTRL+C"')
        main()
    except KeyboardInterrupt:
        print("")
        print("Caught interrupt, exiting...")
    rpc.close_serial()

This guide was first published on Aug 11, 2021. It was last updated on Jun 21, 2024.

This page (Host Computer Code) was last updated on Jun 21, 2024.

Text editor powered by tinymce.