Here we have a class that manages all interaction with AdafruitIO via the ESP32 AirLift.
The file starts with imports, logger fetching, some string constants for the time service and the fetching of WIFI credentials, and AIO username and token from the settings.toml file with os.getenv()
from os import getenv
import time
import gc
import board
import busio
from digitalio import DigitalInOut
from adafruit_esp32spi import adafruit_esp32spi
import adafruit_esp32spi.adafruit_esp32spi_requests as requests
import adafruit_logging as logging
import rtc
logger = logging.getLogger('main')
if not logger.hasHandlers():
logger.addHandler(logging.StreamHandler())
TIME_SERVICE = "https://io.adafruit.com/api/v2/%s/integrations/time/strftime?x-aio-key=%s"
# our strftime is %Y-%m-%d %H:%M:%S.%L %j %u %z %Z see http://strftime.net/ for decoding details
# See https://apidock.com/ruby/DateTime/strftime for full options
TIME_SERVICE_STRFTIME = '&fmt=%25Y-%25m-%25d+%25H%3A%25M%3A%25S.%25L+%25j+%25u+%25z+%25Z'
# Get WiFi details and Adafruit IO keys, ensure these are setup in settings.toml
# (visit io.adafruit.com if you need to create an account, or if you need your Adafruit IO key.)
ssid = getenv("CIRCUITPY_WIFI_SSID")
password = getenv("CIRCUITPY_WIFI_PASSWORD")
aio_username = getenv("ADAFRUIT_AIO_USERNAME")
aio_key = getenv("ADAFRUIT_AIO_KEY")
if None in [ssid, password, aio_username, aio_key]:
raise RuntimeError(
"WiFi and Adafruit IO settings are kept in settings.toml, "
"please add them there. The settings file must contain "
"'CIRCUITPY_WIFI_SSID', 'CIRCUITPY_WIFI_PASSWORD', "
"'ADAFRUIT_AIO_USERNAME' and 'ADAFRUIT_AIO_KEY' at a minimum."
)
Constructing the class sets up the SPI interface to the AirLift and initializes the web interface.
def __init__(self):
try:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_busy = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)
self._onboard_esp = True
except AttributeError:
esp32_cs = DigitalInOut(board.D10)
esp32_busy = DigitalInOut(board.D9)
esp32_reset = DigitalInOut(board.D6)
self._onboard_esp = False
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
self._esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_busy, esp32_reset)
if self._esp.status == adafruit_esp32spi.WL_IDLE_STATUS:
logger.debug('ESP32 found and in idle mode')
logger.info('Firmware vers. %s', self._esp.firmware_version)
logger.info('MAC addr: %s', ':'.join([hex(i)[2:4] for i in self._esp.MAC_address]))
requests.set_interface(self._esp)
There is a convenience method for connecting to WiFi.
def connect(self):
logger.debug("Connecting...")
while not self._esp.is_connected:
try:
self._esp.connect_AP(ssid, password)
except RuntimeError as e:
logger.error("could not connect to AP, retrying: %s", e)
continue
Now we have the main methods. First one to post a payload dictionary to AdafruitIO.
It ensures that there is a live connection to WiFi. Then it tries to post to AdafruitIO. It will make 5 attempts before giving up. If the post gets processed ok, it closes the response and returns whether the post resulted in a 200 status (i.e. OK). If a RuntimeError was raised by the post, the ESP32 is reset, it reconnects to WiFi, and tries again. In practice an exception is occasionally raised, but a reset and retry succeeds.
def post(self, feed, payload):
api_url = 'https://io.adafruit.com/api/v2/{0}/feeds/{1}/data'.format(secrets['aio_username'], feed)
logger.info('POSTing to %s', api_url)
logger.info('payload: %s', str(payload))
auth_header = {'X-AIO-KEY':secrets['aio_key']}
self.connect()
r = None
tries = 0
while True:
if tries == 5:
return False
tries += 1
try:
r = requests.post(api_url, headers=auth_header, json=payload)
logger.info('Status: %d', r.status_code)
if r.status_code == 200:
logger.debug('Headers: %s', str(r.headers))
logger.debug('Text: %s', str(r.json()))
else:
logger.debug('Text: %s', str(r.json()))
break
except RuntimeError as err:
logger.error('Error posting: %s', str(err))
logger.info('Resetting and reconnecting')
self._esp.reset()
self.connect()
r.close()
return r.status_code == 200
Finally there is the method to update the time from AdafruitIO. This is copied from the PyPortal library.
First it makes sure an AdafruitIO username and password as well a timezone specification have been provided in the secrets file. If so, the time fetch URL is constructed and a GET is used to fetch the time information. If that returns successfully, the result text is torn apart to extract the date and time. That's used to set the system's software RTC as well as being returned.
def refresh_local_time(self):
# pylint: disable=line-too-long
"""Fetch and "set" the local time of this microcontroller to the local time at the location, using an internet time API.
Copied from adafruit_pyportal
:param str location: Your city and country, e.g. ``"New York, US"``.
"""
# pylint: enable=line-too-long
api_url = None
try:
aio_username = secrets['aio_username']
aio_key = secrets['aio_key']
except KeyError:
raise KeyError("\n\nOur time service requires a login/password to rate-limit. Please register for a free adafruit.io account and place the user/key in your secrets file under 'aio_username' and 'aio_key'")# pylint: disable=line-too-long
location = secrets['timezone']
if location:
logger.debug('Getting time for timezone %s', location)
api_url = (TIME_SERVICE + "&tz=%s") % (aio_username, aio_key, location)
else: # we'll try to figure it out from the IP address
logger.debug("Getting time from IP address")
api_url = TIME_SERVICE % (aio_username, aio_key)
api_url += TIME_SERVICE_STRFTIME
logger.debug('Requesting time from %s', api_url)
try:
self.connect()
response = requests.get(api_url)
logger.debug('Time reply: %s', response.text)
times = response.text.split(' ')
the_date = times[0]
the_time = times[1]
year_day = int(times[2])
week_day = int(times[3])
is_dst = None # no way to know yet
except KeyError:
raise KeyError("Was unable to lookup the time, try setting secrets['timezone'] according to http://worldtimeapi.org/timezones") # pylint: disable=line-too-long
year, month, mday = [int(x) for x in the_date.split('-')]
the_time = the_time.split('.')[0]
hours, minutes, seconds = [int(x) for x in the_time.split(':')]
now = time.struct_time((year, month, mday, hours, minutes, seconds, week_day, year_day,
is_dst))
rtc.RTC().datetime = now
logger.debug('Fetched time: %s', str(now))
# now clean up
response.close()
response = None
gc.collect()
return now
Page last edited March 25, 2025
Text editor powered by tinymce.