Here we have a class that manages all interaction with AdafruitIO via the ESP32 AirLift.

The file starts with the imports, fetches the logger, defines some string constants for the time service, and imports the secrets file.

import gc
import time

import board
import busio
import rtc
from digitalio import DigitalInOut

import adafruit_logging as logging
from adafruit_esp32spi import adafruit_esp32spi
import adafruit_esp32spi.adafruit_esp32spi_requests as requests

# Module-wide logger; shared with the rest of the application under the name 'main'.
logger = logging.getLogger('main')

# URL template for the AdafruitIO time service; filled in with (username, key).
TIME_SERVICE = "https://io.adafruit.com/api/v2/%s/integrations/time/strftime?x-aio-key=%s"
# our strftime is %Y-%m-%d %H:%M:%S.%L %j %u %z %Z see http://strftime.net/ for decoding details
# See https://apidock.com/ruby/DateTime/strftime for full options
# (URL-encoded form of the format string above, appended to the request URL.)
TIME_SERVICE_STRFTIME = '&fmt=%25Y-%25m-%25d+%25H%3A%25M%3A%25S.%25L+%25j+%25u+%25z+%25Z'

# Get wifi details and more from a secrets.py file
try:
    from secrets import secrets
except ImportError:
    # The message names the file that is actually imported so the user knows where to look.
    logger.critical('WiFi settings are kept in secrets.py, please add them there!')
    raise

Constructing the class sets up the SPI interface to the AirLift and initializes the web interface.

    def __init__(self):
        """Set up the SPI link to the ESP32 AirLift and register it with the requests module.

        Pins D10, D9 and D6 are passed to the ESP32 driver as chip-select, ready and
        reset respectively. The firmware version and MAC address are logged for
        diagnostics.
        """
        esp32_cs = DigitalInOut(board.D10)
        esp32_ready = DigitalInOut(board.D9)
        esp32_reset = DigitalInOut(board.D6)

        spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
        self._esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

        if self._esp.status == adafruit_esp32spi.WL_IDLE_STATUS:
            logger.debug('ESP32 found and in idle mode')
        logger.info('Firmware vers. %s', self._esp.firmware_version)
        # Zero-pad every byte to two hex digits; hex(i)[2:4] would render 0x0a as 'a'
        # and produce a malformed MAC address.
        mac_text = ':'.join(['{:02x}'.format(octet) for octet in self._esp.MAC_address])
        logger.info('MAC addr: %s', mac_text)

        # Route all HTTP traffic from the requests module through this ESP32.
        requests.set_interface(self._esp)
  

There is a convenience method for connecting to WiFi.

    def connect(self):
        """Block until the ESP32 reports that it is connected to WiFi.

        Uses the ``ssid`` and ``password`` entries of the secrets dictionary.
        A RuntimeError from the connection attempt is logged and the attempt
        is repeated; there is no upper bound on the number of retries.
        """
        logger.debug("Connecting...")
        esp = self._esp
        while True:
            if esp.is_connected:
                return
            try:
                esp.connect_AP(secrets['ssid'], secrets['password'])
            except RuntimeError as err:
                logger.error("could not connect to AP, retrying: %s", err)

Now we come to the main methods. The first one posts a payload dictionary to AdafruitIO.

It ensures that there is a live connection to WiFi. Then it tries to post to AdafruitIO. It will make 5 attempts before giving up. If the post gets processed ok, it closes the response and returns whether the post resulted in a 200 status (i.e. OK). If a RuntimeError was raised by the post, the ESP32 is reset, it reconnects to WiFi, and tries again. In practice an exception is occasionally raised, but a reset and retry succeeds.

    def post(self, feed, payload, max_tries=5):
        """POST a payload to an AdafruitIO feed, retrying after ESP32 errors.

        :param feed: key of the AdafruitIO feed to post to
        :param payload: object sent as the JSON body of the request
        :param int max_tries: maximum number of POST attempts (default 5)
        :return: True if a response with status 200 was received, False if the
            response had any other status or every attempt raised RuntimeError
        """
        api_url = 'https://io.adafruit.com/api/v2/{0}/feeds/{1}/data'.format(secrets['aio_username'], feed)
        logger.info('POSTing to %s', api_url)
        logger.info('payload: %s', str(payload))
        auth_header = {'X-AIO-KEY': secrets['aio_key']}
        self.connect()
        for _ in range(max_tries):
            try:
                response = requests.post(api_url, headers=auth_header, json=payload)
                # Capture the status now so it is not read from a closed response later.
                status = response.status_code
                logger.info('Status: %d', status)
                if status == 200:
                    logger.debug('Headers: %s', str(response.headers))
                logger.debug('Text: %s', str(response.json()))
            except RuntimeError as err:
                # A reset followed by a reconnect is the recovery path before the next attempt.
                logger.error('Error posting: %s', str(err))
                logger.info('Resetting and reconnecting')
                self._esp.reset()
                self.connect()
                continue
            response.close()
            return status == 200
        # Every attempt raised RuntimeError.
        return False

Finally there is the method to update the time from AdafruitIO. This is copied from the PyPortal library.

First it makes sure an AdafruitIO username and key have been provided in the secrets file, and reads the timezone specification from the same file. The time fetch URL is then constructed and a GET is used to fetch the time information. If that returns successfully, the result text is torn apart to extract the date and time. That's used to set the system's software RTC as well as being returned.

def refresh_local_time(self):
    # pylint: disable=line-too-long
    """Fetch the local time from the AdafruitIO time service and store it in the RTC.

    Adapted from adafruit_pyportal. If the secrets dictionary has a non-empty
    ``timezone`` entry it is sent to the service; otherwise the request is made
    without one and the service is left to work the location out from the IP address.

    :return: the fetched time as a ``time.struct_time``
    :raises KeyError: if ``aio_username`` or ``aio_key`` is missing from the secrets
        dictionary, or if a KeyError occurs while connecting or fetching the time
    """
    # pylint: enable=line-too-long
    try:
        aio_username = secrets['aio_username']
        aio_key = secrets['aio_key']
    except KeyError:
        raise KeyError("\n\nOur time service requires a login/password to rate-limit. Please register for a free adafruit.io account and place the user/key in your secrets file under 'aio_username' and 'aio_key'")# pylint: disable=line-too-long

    # .get() keeps the timezone optional, so the IP-address fallback below is reachable
    # when the key is absent rather than only when it is present but empty.
    location = secrets.get('timezone')
    if location:
        logger.debug('Getting time for timezone %s', location)
        api_url = (TIME_SERVICE + "&tz=%s") % (aio_username, aio_key, location)
    else: # we'll try to figure it out from the IP address
        logger.debug("Getting time from IP address")
        api_url = TIME_SERVICE % (aio_username, aio_key)
    api_url += TIME_SERVICE_STRFTIME
    logger.debug('Requesting time from %s', api_url)

    response = None
    try:
        self.connect()
        response = requests.get(api_url)
        reply = response.text
        logger.debug('Time reply: %s', reply)
        # Reply layout follows TIME_SERVICE_STRFTIME: date, time, day of year, weekday, ...
        fields = reply.split(' ')
        the_date = fields[0]
        the_time = fields[1]
        year_day = int(fields[2])
        week_day = int(fields[3])
        is_dst = None  # no way to know yet
    except KeyError:
        raise KeyError("Was unable to lookup the time, try setting secrets['timezone'] according to http://worldtimeapi.org/timezones")  # pylint: disable=line-too-long
    finally:
        # Close the response even when parsing fails so it is not left open.
        if response is not None:
            response.close()
        response = None

    year, month, mday = [int(x) for x in the_date.split('-')]
    the_time = the_time.split('.')[0]  # drop the fractional seconds
    hours, minutes, seconds = [int(x) for x in the_time.split(':')]
    now = time.struct_time((year, month, mday, hours, minutes, seconds, week_day, year_day,
                            is_dst))
    rtc.RTC().datetime = now
    logger.debug('Fetched time: %s', str(now))

    # now clean up
    gc.collect()
    return now

This guide was first published on May 01, 2019. It was last updated on Nov 28, 2023.

This page (aio.py) was last updated on Apr 20, 2019.

Text editor powered by tinymce.