The code walkthrough starts with the shared RPC library, because this library is the foundation that the rest of the code relies on. This library was written in such a way that it can be used with both CPython and CircuitPython, but the Server component relies on a CPython specific library and the Client library is expecting the CDC data device to be enabled, so to make them truly work on either would require additional code.

First, it tries to import some CPython specific libraries and uses that to determine the environment that the library is running in:

import time
import json
try:
    import serial
    import adafruit_board_toolkit.circuitpython_serial
    json_decode_exception = json.decoder.JSONDecodeError
except ImportError:
    import usb_cdc as serial
    json_decode_exception = ValueError

Next are a couple of adjustable parameters for timeout values. These values seemed to work well, but feel free to adjust them if it improves performance for you.

RESPONSE_TIMEOUT=5
DATA_TIMEOUT=0.5

Next a custom RpcError is defined  to differentiate it from other Python errors that are specific to this library.

class RpcError(Exception):
    """For RPC Specific Errors"""
    pass

Next up is the code that is shared between the libraries which is called _Rpc and has the underscore because the base class is not meant to be directly instantiated.

class _Rpc:
    def __init__(self):
        self._serial = None

This code will create a response packet which makes it so the receiving component will know the structure of what to expect. By having it in a function, the code can pass just the minimum of what it needs to and get a full packet out.

@staticmethod
def create_response_packet(error=False, error_type="RPC", message=None, return_val=None):
    return {
        "error": error,
        "error_type": error_type if error else None,
        "message": message,
        "return_val": return_val
    }

The other kind of packet is the request packet to request an RPC operation.

@staticmethod
def create_request_packet(function, args=[], kwargs={}):
    return {
        "function": function,
        "args": args,
        "kwargs": kwargs
    }

The _wait_for_packet() function behaves slightly differently depending on whether a timeout was given or not. If timeout is None, it will continue to wait indefinitely until a packet is received. Otherwise it will exit the function with an error response packet if it times out.

If it doesn't time out and a packet is received, the received packet will be returned to the calling function. One other thing that this function is responsible for is understanding the type of packet it is listening for and whether it has received the entire thing.

def _wait_for_packet(self, timeout=None):
    incoming_packet = b""
    if timeout is not None:
    	response_start_time = time.monotonic()
    while True:
        if incoming_packet:
        	data_start_time = time.monotonic()
        while not self._serial.in_waiting:
            if incoming_packet and (time.monotonic() - data_start_time) >= DATA_TIMEOUT:
            	incoming_packet = b""
            if not incoming_packet and timeout is not None:
                if (time.monotonic() - response_start_time) >= timeout:
                	return self.create_response_packet(error=True, message="Timed out waiting for response")
            time.sleep(0.001)
        data = self._serial.read(self._serial.in_waiting)
        if data:
            try:
                incoming_packet += data
                packet = json.loads(incoming_packet)
                # json can try to be clever with missing braces, so make sure we have everything
                if sorted(tuple(packet.keys())) == sorted(self._packet_format()):
                	return packet
            except json_decode_exception:
            	pass # Incomplete packet

The first kind of class that can be created from this library is the RpcClient. The RpcClient is the component that will make the calls and listen for responses from the RpcServer and is fairly straightforward because it makes use of much of the shared code covered above and call() is really the only unique public function.

_packet_format() just helps the _wait_for_packet() function know what type of packet it is listening for.

class RpcClient(_Rpc):
    def __init__(self):
        super().__init__()
        self._serial = serial.data
    
    def _packet_format(self):
        return self.create_response_packet().keys()

    def call(self, function, *args, **kwargs):
        packet = self.create_request_packet(function, args, kwargs)
        self._serial.write(bytes(json.dumps(packet), "utf-8"))
        # Wait for response packet to indicate success
        return self._wait_for_packet(RESPONSE_TIMEOUT)

RpcServer is a bit more involved because it needs PySerial to handle initializing the serial connection whereas the RpcClient, which is intended to be run on a CircuitPython device, has already taken care of that. The RpcServer starts off with needing a handler function passed in, which is called whenever a packet is received. The reason for using this strategy is because of function scope. If the handler were built into the library, only the library functions would be accessible.

One of the nice things about the expected setup is that the RpcServer is expecting a CircuitPython device, so it makes use of the Adafruit_Board_Toolkit to automatically detect which port the MacroPad is connected to. It is also able to return only the CDC Data devices, further simplifying things.

The loop() function is intended to be called regularly to listen for and process request packets by sending then to the handler function specified when the library was instantiated.

class RpcServer(_Rpc):
    def __init__(self, handler, baudrate=9600):
        super().__init__()
        self._serial = self.init_serial(baudrate)
        self._handler = handler

    def _packet_format(self):
        return self.create_request_packet(None).keys()

    def init_serial(self, baudrate):
        port = self.detect_port()

        return serial.Serial(
            port,
            baudrate,
            parity='N',
            rtscts=False,
            xonxoff=False,
            exclusive=True,
        )

    def detect_port(self):
        """
        Detect the port automatically
        """
        comports = adafruit_board_toolkit.circuitpython_serial.data_comports()
        ports = [comport.device for comport in comports]
        if len(ports) >= 1:
            if len(ports) > 1:
                print("Multiple devices detected, using the first detected port.")
            return ports[0]
        raise RuntimeError("Unable to find any CircuitPython Devices with the CDC Data port enabled.")

    def loop(self, timeout=None):
        packet = self._wait_for_packet(timeout)
        if "error" not in packet:
            response_packet = self._handler(packet)
            self._serial.write(bytes(json.dumps(response_packet), "utf-8"))
    
    def close_serial(self):
        if self._serial is not None:
            self._serial.close()

Full Code Listing

# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: Unlicense
"""
USB CDC Remote Procedure Call class
"""

import time
import json
try:
    import serial
    import adafruit_board_toolkit.circuitpython_serial
    json_decode_exception = json.decoder.JSONDecodeError
except ImportError:
    import usb_cdc as serial
    json_decode_exception = ValueError

RESPONSE_TIMEOUT=5
DATA_TIMEOUT=0.5

class RpcError(Exception):
    """For RPC Specific Errors"""
    pass

class _Rpc:
    def __init__(self):
        self._serial = None

    @staticmethod
    def create_response_packet(error=False, error_type="RPC", message=None, return_val=None):
        return {
            "error": error,
            "error_type": error_type if error else None,
            "message": message,
            "return_val": return_val
        }

    @staticmethod
    def create_request_packet(function, args=[], kwargs={}):
        return {
            "function": function,
            "args": args,
            "kwargs": kwargs
        }

    def _wait_for_packet(self, timeout=None):
        incoming_packet = b""
        if timeout is not None:
            response_start_time = time.monotonic()
        while True:
            if incoming_packet:
                data_start_time = time.monotonic()
            while not self._serial.in_waiting:
                if incoming_packet and (time.monotonic() - data_start_time) >= DATA_TIMEOUT:
                    incoming_packet = b""
                if not incoming_packet and timeout is not None:
                    if (time.monotonic() - response_start_time) >= timeout:
                        return self.create_response_packet(error=True, message="Timed out waiting for response")
                time.sleep(0.001)
            data = self._serial.read(self._serial.in_waiting)
            if data:
                try:
                    incoming_packet += data
                    packet = json.loads(incoming_packet)
                    # json can try to be clever with missing braces, so make sure we have everything
                    if sorted(tuple(packet.keys())) == sorted(self._packet_format()):
                        return packet
                except json_decode_exception:
                    pass # Incomplete packet

class RpcClient(_Rpc):
    def __init__(self):
        super().__init__()
        self._serial = serial.data
    
    def _packet_format(self):
        return self.create_response_packet().keys()

    def call(self, function, *args, **kwargs):
        packet = self.create_request_packet(function, args, kwargs)
        self._serial.write(bytes(json.dumps(packet), "utf-8"))
        # Wait for response packet to indicate success
        return self._wait_for_packet(RESPONSE_TIMEOUT)

class RpcServer(_Rpc):
    def __init__(self, handler, baudrate=9600):
        super().__init__()
        self._serial = self.init_serial(baudrate)
        self._handler = handler

    def _packet_format(self):
        return self.create_request_packet(None).keys()

    def init_serial(self, baudrate):
        port = self.detect_port()

        return serial.Serial(
            port,
            baudrate,
            parity='N',
            rtscts=False,
            xonxoff=False,
            exclusive=True,
        )

    def detect_port(self):
        """
        Detect the port automatically
        """
        comports = adafruit_board_toolkit.circuitpython_serial.data_comports()
        ports = [comport.device for comport in comports]
        if len(ports) >= 1:
            if len(ports) > 1:
                print("Multiple devices detected, using the first detected port.")
            return ports[0]
        raise RuntimeError("Unable to find any CircuitPython Devices with the CDC Data port enabled.")

    def loop(self, timeout=None):
        packet = self._wait_for_packet(timeout)
        if "error" not in packet:
            response_packet = self._handler(packet)
            self._serial.write(bytes(json.dumps(response_packet), "utf-8"))
    
    def close_serial(self):
        if self._serial is not None:
            self._serial.close()

This guide was first published on Aug 11, 2021. It was last updated on 2021-08-11 11:03:22 -0400.

This page (Shared RPC Library) was last updated on Sep 15, 2021.

Text editor powered by tinymce.