Here we have a class that manages all interaction with AdafruitIO via the ESP32 AirLift.
The file starts with imports, logger fetching, some string constants for the time service and the import of the secrets file.
import gc
import time

import board
import busio
import rtc
from digitalio import DigitalInOut

from adafruit_esp32spi import adafruit_esp32spi
import adafruit_esp32spi.adafruit_esp32spi_requests as requests
import adafruit_logging as logging

# Shared application logger; the rest of the file logs through it.
logger = logging.getLogger('main')

# AdafruitIO time endpoint; the two %s slots are filled with the AIO username and key.
TIME_SERVICE = "https://io.adafruit.com/api/v2/%s/integrations/time/strftime?x-aio-key=%s"
# our strftime is %Y-%m-%d %H:%M:%S.%L %j %u %z %Z see http://strftime.net/ for decoding details
# See https://apidock.com/ruby/DateTime/strftime for full options
TIME_SERVICE_STRFTIME = '&fmt=%25Y-%25m-%25d+%25H%3A%25M%3A%25S.%25L+%25j+%25u+%25z+%25Z'

# Get wifi details and more from a secrets.py file
try:
    from secrets import secrets
except ImportError:
    logger.critical('WiFi settings are kept in secrets.py, please add them there!')
    raise
Constructing the class sets up the SPI interface to the AirLift and initializes the web interface.
def __init__(self):
    """Set up the SPI link to the ESP32 AirLift and route HTTP requests through it.

    Chip select, ready and reset are taken from board pins D10, D9 and D6;
    the SPI bus uses the board's SCK/MOSI/MISO pins.
    """
    esp32_cs = DigitalInOut(board.D10)
    esp32_ready = DigitalInOut(board.D9)
    esp32_reset = DigitalInOut(board.D6)
    spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
    self._esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

    if self._esp.status == adafruit_esp32spi.WL_IDLE_STATUS:
        logger.debug('ESP32 found and in idle mode')
    logger.info('Firmware vers. %s', self._esp.firmware_version)
    # Zero-pad every byte: hex(i)[2:4] would print 0x0a as "a" and yield a
    # malformed address such as "a:1:ff" instead of "0a:01:ff".
    logger.info('MAC addr: %s',
                ':'.join(['{:02x}'.format(i) for i in self._esp.MAC_address]))

    # All subsequent requests.get/post calls go through this ESP32.
    requests.set_interface(self._esp)
There is a convenience method for connecting to WiFi.
def connect(self):
    """Block until the ESP32 reports that it is connected to WiFi.

    The access point credentials are read from ``secrets['ssid']`` and
    ``secrets['password']``. A ``RuntimeError`` from a connection attempt is
    logged and the attempt is repeated; there is no retry limit.
    """
    logger.debug("Connecting...")
    while True:
        if self._esp.is_connected:
            return
        try:
            self._esp.connect_AP(secrets['ssid'], secrets['password'])
        except RuntimeError as err:
            logger.error("could not connect to AP, retrying: %s", err)
Now we have the main methods. First one to post a payload dictionary to AdafruitIO.
It ensures that there is a live connection to WiFi. Then it tries to post to AdafruitIO. It will make 5 attempts before giving up. If the post gets processed ok, it closes the response and returns whether the post resulted in a 200 status (i.e. OK). If a RuntimeError
was raised by the post, the ESP32 is reset, it reconnects to WiFi, and tries again. In practice an exception is occasionally raised, but a reset and retry succeeds.
def post(self, feed, payload, max_tries=5):
    """POST a payload dictionary to an AdafruitIO feed.

    A WiFi connection is established first. If the request raises a
    ``RuntimeError`` the ESP32 is reset, WiFi is reconnected and the request
    is retried, up to ``max_tries`` attempts in total.

    :param feed: the feed key, inserted into the AdafruitIO data URL.
    :param payload: the data to send as the JSON request body.
    :param int max_tries: maximum number of POST attempts (default 5).
    :return: ``True`` if a response with status 200 was received, ``False``
        for any other status or once all attempts have failed.
    """
    api_url = 'https://io.adafruit.com/api/v2/{0}/feeds/{1}/data'.format(secrets['aio_username'], feed)
    logger.info('POSTing to %s', api_url)
    logger.info('payload: %s', str(payload))
    auth_header = {'X-AIO-KEY':secrets['aio_key']}
    self.connect()

    for _ in range(max_tries):
        response = None
        try:
            response = requests.post(api_url, headers=auth_header, json=payload)
            # Capture the status before the response is closed.
            status = response.status_code
            logger.info('Status: %d', status)
            if status == 200:
                logger.debug('Headers: %s', str(response.headers))
            logger.debug('Text: %s', str(response.json()))
            return status == 200
        except RuntimeError as err:
            logger.error('Error posting: %s', str(err))
        finally:
            # Always release the response, including when reading the body
            # failed after the request itself went through.
            if response is not None:
                try:
                    response.close()
                except RuntimeError as close_err:
                    # The link may already be dead; a failed close must not
                    # abort the retry or mask the result.
                    logger.debug('Error closing response: %s', str(close_err))

        # Only reached after a RuntimeError: recover the link and try again.
        logger.info('Resetting and reconnecting')
        self._esp.reset()
        self.connect()

    return False
Finally there is the method to update the time from AdafruitIO. This is copied from the PyPortal library.
First it makes sure an AdafruitIO username and key have been provided in the secrets file, and reads the timezone specification from there as well. The time fetch URL is then constructed and a GET is used to fetch the time information. If that returns successfully, the result text is torn apart to extract the date and time. That's used to set the system's software RTC as well as being returned.
def refresh_local_time(self):
    # pylint: disable=line-too-long
    """Fetch the local time from the AdafruitIO time service and set the RTC to it.

    Adapted from adafruit_pyportal. The timezone is read from
    ``secrets['timezone']``; when it is missing or empty the request is sent
    without a ``tz`` argument (the original comment says the service then
    works it out from the IP address).

    :return: the fetched time as a ``time.struct_time``; the same value is
        assigned to ``rtc.RTC().datetime``.
    :raises KeyError: if ``aio_username`` / ``aio_key`` are missing from the
        secrets file, or if the reply from the time service cannot be parsed.
    """
    # pylint: enable=line-too-long
    try:
        aio_username = secrets['aio_username']
        aio_key = secrets['aio_key']
    except KeyError:
        raise KeyError("\n\nOur time service requires a login/password to rate-limit. Please register for a free adafruit.io account and place the user/key in your secrets file under 'aio_username' and 'aio_key'")# pylint: disable=line-too-long

    # .get() keeps the fallback branch reachable when no timezone is configured.
    location = secrets.get('timezone')
    if location:
        logger.debug('Getting time for timezone %s', location)
        api_url = (TIME_SERVICE + "&tz=%s") % (aio_username, aio_key, location)
    else:  # we'll try to figure it out from the IP address
        logger.debug("Getting time from IP address")
        api_url = TIME_SERVICE % (aio_username, aio_key)
    api_url += TIME_SERVICE_STRFTIME
    # NOTE(review): this URL embeds the AIO key, so it ends up in debug logs.
    logger.debug('Requesting time from %s', api_url)

    self.connect()
    response = requests.get(api_url)
    try:
        reply = response.text
    finally:
        # Release the response even if reading the body fails.
        response.close()
        response = None
        gc.collect()
    logger.debug('Time reply: %s', reply)

    try:
        times = reply.split(' ')
        the_date = times[0]
        the_time = times[1].split('.')[0]  # drop the fractional seconds
        year_day = int(times[2])
        # %u is 1..7 with Monday = 1; struct_time's tm_wday is 0..6 with Monday = 0.
        week_day = int(times[3]) - 1
        is_dst = None  # no way to know yet
        year, month, mday = [int(x) for x in the_date.split('-')]
        hours, minutes, seconds = [int(x) for x in the_time.split(':')]
    except (IndexError, ValueError):
        # A reply that is not in the requested format (e.g. an error message
        # for an unknown timezone) fails here rather than with a KeyError.
        raise KeyError("Was unable to lookup the time, try setting secrets['timezone'] according to http://worldtimeapi.org/timezones")  # pylint: disable=line-too-long

    now = time.struct_time((year, month, mday, hours, minutes, seconds, week_day, year_day, is_dst))
    rtc.RTC().datetime = now
    logger.debug('Fetched time: %s', str(now))
    return now
Text editor powered by tinymce.