The code walkthrough starts with the shared RPC library, because this library is the foundation that the rest of the code relies on. This library was written in such a way that it can be used with both CPython and CircuitPython, but the Server component relies on a CPython specific library and the Client library is expecting the CDC data device to be enabled, so to make them truly work on either would require additional code.
First, it tries to import some CPython specific libraries and uses that to determine the environment that the library is running in:
import time import json try: import serial import adafruit_board_toolkit.circuitpython_serial json_decode_exception = json.decoder.JSONDecodeError except ImportError: import usb_cdc as serial json_decode_exception = ValueError
Next are a couple of adjustable parameters for timeout values. These values seemed to work well, but feel free to adjust them if it improves performance for you.
RESPONSE_TIMEOUT = 5 DATA_TIMEOUT = 0.5
Next the custom errors RpcError
and MqttError
are defined to differentiate them from other Python errors that are specific to this library or MQTT.
class RpcError(Exception): """For RPC Specific Errors""" class MqttError(Exception): """For MQTT Specific Errors"""
Next up is the code that is shared between the libraries which is called _Rpc
and has the underscore because the base class is not meant to be directly instantiated.
class _Rpc: def __init__(self): self._serial = None
This code will create a response packet which makes it so the receiving component will know the structure of what to expect. By having it in a function, the code can pass just the minimum of what it needs to and get a full packet out.
@staticmethod def create_response_packet( error=False, error_type="RPC", message=None, return_val=None ): return { "error": error, "error_type": error_type if error else None, "message": message, "return_val": return_val, }
The other kind of packet is the request packet to request an RPC operation.
@staticmethod def create_request_packet(function, args=[], kwargs={}): return { "function": function, "args": args, "kwargs": kwargs }
The _wait_for_packet()
function behaves slightly differently depending on whether a timeout was given or not. If timeout is None
, it will continue to wait indefinitely until a packet is received. Otherwise it will exit the function with an error response packet if it times out.
If it doesn't time out and a packet is received, the received packet will be returned to the calling function. One other thing that this function is responsible for is understanding the type of packet it is listening for and whether it has received the entire thing.
def _wait_for_packet(self, timeout=None): incoming_packet = b"" if timeout is not None: response_start_time = time.monotonic() while True: if incoming_packet: data_start_time = time.monotonic() while not self._serial.in_waiting: if ( incoming_packet and (time.monotonic() - data_start_time) >= DATA_TIMEOUT ): incoming_packet = b"" if not incoming_packet and timeout is not None: if (time.monotonic() - response_start_time) >= timeout: return self.create_response_packet( error=True, message="Timed out waiting for response" ) time.sleep(0.001) data = self._serial.read(self._serial.in_waiting) if data: try: incoming_packet += data packet = json.loads(incoming_packet) # json can try to be clever with missing braces, so make sure we have everything if sorted(tuple(packet.keys())) == sorted(self._packet_format()): return packet except json_decode_exception: pass # Incomplete packet
The first kind of class that can be created from this library is the RpcClient
. The RpcClient
is the component that will make the calls and listen for responses from the RpcServer
and is fairly straightforward because it makes use of much of the shared code covered above and call()
is really the only unique public function.
_packet_format()
just helps the _wait_for_packet()
function know what type of packet it is listening for.
class RpcClient(_Rpc): def __init__(self): super().__init__() self._serial = serial.data def _packet_format(self): return self.create_response_packet().keys() def call(self, function, *args, **kwargs): packet = self.create_request_packet(function, args, kwargs) self._serial.write(bytes(json.dumps(packet), "utf-8")) # Wait for response packet to indicate success return self._wait_for_packet(RESPONSE_TIMEOUT)
RpcServer
is a bit more involved because it needs PySerial to handle initializing the serial connection whereas the RpcClient
, which is intended to be run on a CircuitPython device, has already taken care of that. The RpcServer
starts off with needing a handler
function passed in, which is called whenever a packet is received. The reason for using this strategy is because of function scope. If the handler were built into the library, only the library functions would be accessible.
One of the nice things about the expected setup is that the RpcServer
is expecting a CircuitPython device, so it makes use of the Adafruit_Board_Toolkit to automatically detect which port the MacroPad is connected to. It is also able to return only the CDC Data devices, further simplifying things.
The loop()
function is intended to be called regularly to listen for and process request packets by sending then to the handler
function specified when the library was instantiated.
class RpcServer(_Rpc): def __init__(self, handler, baudrate=9600): super().__init__() self._serial = self.init_serial(baudrate) self._handler = handler def _packet_format(self): return self.create_request_packet(None).keys() def init_serial(self, baudrate): port = self.detect_port() return serial.Serial( port, baudrate, parity="N", rtscts=False, xonxoff=False, exclusive=True, ) @staticmethod def detect_port(self): """ Detect the port automatically """ comports = adafruit_board_toolkit.circuitpython_serial.data_comports() ports = [comport.device for comport in comports] if len(ports) >= 1: if len(ports) > 1: print("Multiple devices detected, using the first detected port.") return ports[0] raise RuntimeError( "Unable to find any CircuitPython Devices with the CDC Data port enabled." ) def loop(self, timeout=None): packet = self._wait_for_packet(timeout) if "error" not in packet: response_packet = self._handler(packet) self._serial.write(bytes(json.dumps(response_packet), "utf-8")) def close_serial(self): if self._serial is not None: self._serial.close()
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries # # SPDX-License-Identifier: Unlicense """ USB CDC Remote Procedure Call class """ import time import json try: import serial import adafruit_board_toolkit.circuitpython_serial json_decode_exception = json.decoder.JSONDecodeError except ImportError: import usb_cdc as serial json_decode_exception = ValueError RESPONSE_TIMEOUT = 5 DATA_TIMEOUT = 0.5 class RpcError(Exception): """For RPC Specific Errors""" class MqttError(Exception): """For MQTT Specific Errors""" class _Rpc: def __init__(self): self._serial = None @staticmethod def create_response_packet( error=False, error_type="RPC", message=None, return_val=None ): return { "error": error, "error_type": error_type if error else None, "message": message, "return_val": return_val, } @staticmethod def create_request_packet(function, args=[], kwargs={}): # pylint: disable=dangerous-default-value return {"function": function, "args": args, "kwargs": kwargs} def _wait_for_packet(self, timeout=None): incoming_packet = b"" if timeout is not None: response_start_time = time.monotonic() while True: if incoming_packet: data_start_time = time.monotonic() while not self._serial.in_waiting: if ( incoming_packet and (time.monotonic() - data_start_time) >= DATA_TIMEOUT ): incoming_packet = b"" if not incoming_packet and timeout is not None: if (time.monotonic() - response_start_time) >= timeout: return self.create_response_packet( error=True, message="Timed out waiting for response" ) time.sleep(0.001) data = self._serial.read(self._serial.in_waiting) if data: try: incoming_packet += data packet = json.loads(incoming_packet) # json can try to be clever with missing braces, so make sure we have everything if sorted(tuple(packet.keys())) == sorted(self._packet_format()): return packet except json_decode_exception: pass # Incomplete packet class RpcClient(_Rpc): def __init__(self): super().__init__() self._serial = serial.data def _packet_format(self): return self.create_response_packet().keys() def call(self, function, *args, **kwargs): packet = self.create_request_packet(function, args, kwargs) self._serial.write(bytes(json.dumps(packet), "utf-8")) # Wait for response packet to indicate success return self._wait_for_packet(RESPONSE_TIMEOUT) class RpcServer(_Rpc): def __init__(self, handler, baudrate=9600): super().__init__() self._serial = self.init_serial(baudrate) self._handler = handler def _packet_format(self): return self.create_request_packet(None).keys() def init_serial(self, baudrate): port = self.detect_port() return serial.Serial( port, baudrate, parity="N", rtscts=False, xonxoff=False, exclusive=True, ) @staticmethod def detect_port(): """ Detect the port automatically """ comports = adafruit_board_toolkit.circuitpython_serial.data_comports() ports = [comport.device for comport in comports] if len(ports) >= 1: if len(ports) > 1: print("Multiple devices detected, using the first detected port.") return ports[0] raise RuntimeError( "Unable to find any CircuitPython Devices with the CDC Data port enabled." ) def loop(self, timeout=None): packet = self._wait_for_packet(timeout) if "error" not in packet: response_packet = self._handler(packet) self._serial.write(bytes(json.dumps(response_packet), "utf-8")) def close_serial(self): if self._serial is not None: self._serial.close()
Page last edited January 21, 2025
Text editor powered by tinymce.