The code starts off by setting up a logger that can be used throughout the system.

logger = logging.getLogger('main')

It then sets up the various pieces of hardware and software it will be using to make readings and publish them to AdafruitIO.

gps_uart = busio.UART(board.TX, board.RX, baudrate=9600, timeout=3.000)
gps_interface = gps.Gps(gps_uart)

air_uart = busio.UART(board.A2, board.A3, baudrate=9600)
air = air_quality.AirQualitySensor(air_uart)

i2c = busio.I2C(board.SCL, board.SDA)
bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c)

aio_interface = aio.AIO()

Readings will be taken every 5 minutes (300 seconds) and the time will be refreshed hourly (3600 seconds) from AdafruitIO. To accomplish this, we'll set up an interval and scheduling time for each.

reading_interval = 300.0
reading_time = time.monotonic()

time_update_interval = 3600.0
time_update_time = time.monotonic()

Once everything is set up, the GPS is read to discover where the device is. Since the device is meant to stay in one place and collect data, the GPS location is only read once, when the system starts up. The loop keeps retrying until a fix is obtained.

while True:
    if gps_interface.get_fix():
        break
    logger.error('Failed getting fix... retrying')
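The retry-until-fix idea can be tried without any hardware. This is only a sketch: FakeGps is a hypothetical stand-in for the GPS interface, its coordinates are made up, and it assumes get_fix() returns a true value once a fix is available, as the loop above suggests.

```python
class FakeGps:
    """Hypothetical stand-in for the GPS interface: fails a set number of times."""

    def __init__(self, failures_before_fix):
        self.remaining_failures = failures_before_fix
        self.latitude = None
        self.longitude = None

    def get_fix(self):
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            return False
        self.latitude, self.longitude = 42.0, -81.0  # made-up coordinates
        return True


gps_stub = FakeGps(failures_before_fix=2)
attempts = 0
while True:
    attempts += 1
    if gps_stub.get_fix():
        break  # leave the loop as soon as a fix is available

print(attempts, gps_stub.latitude, gps_stub.longitude)  # 3 42.0 -81.0
```

The location attributes are only meaningful after the loop exits, which is why anything that uses them comes after it.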

Since only the data value and time will change for different posts to AdafruitIO, a template is made that can then be updated for each post.

payload = {'value' : 0, 'lat' : gps_interface.latitude, 'lon' : gps_interface.longitude, 'created_at' : ''}
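To see how one template serves every post, here is a small sketch. The helper name fill_payload and the coordinates are assumptions made for illustration; they are not part of the project's code.

```python
# Hypothetical coordinates stand in for gps_interface.latitude/longitude.
payload = {'value': 0, 'lat': 42.0, 'lon': -81.0, 'created_at': ''}


def fill_payload(template, value, timestamp):
    """Update the fields that change between posts and return the template."""
    template['value'] = value
    template['created_at'] = timestamp
    return template


first = fill_payload(payload, 12, '2019/05/01 12:00:00')
second = fill_payload(payload, 7, '2019/05/01 12:05:00')

print(second['value'], second['lat'], second['lon'])  # 7 42.0 -81.0
print(first is second)  # True: the same dictionary is reused each time
```

Because the same dictionary is reused, each value has to be sent before the next one is written into it.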

Now that everything is initialized and configured, it's time for the main loop.

The first thing that happens is to grab the current time. It's not the time of day so much as a time-related value: time.monotonic() returns a number of seconds that only ever increases, counting roughly from when the system started up. An always-increasing count of seconds is exactly what we need for scheduling.

now = time.monotonic()

That time value can be used to check against the scheduled times to see whether something needs to be done. First, is it time to refresh the system time? Since we can fetch this from a time server (AdafruitIO conveniently provides one), we don't need to worry about having a hardware RTC: we can just refresh the software RTC occasionally. For this use that's accurate enough.

This is a common approach to scheduling in a single-threaded environment: set the time at which something should happen (the scheduled time). When the current time equals or is later than the scheduled time, set the next scheduled time and do whatever needs doing.

    if now >= time_update_time:
        time_update_time = now + time_update_interval
        logger.info('refreshing time')
        try:
            ...  # refresh the system time from AdafruitIO
        except RuntimeError as e:
            logger.debug('Time refresh failed with: %s', str(e))
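The scheduling pattern can be tried out away from the hardware. The following is a minimal sketch, not the project's code: run_schedule and its arguments are hypothetical names, and the list of clock values stands in for successive calls to time.monotonic().

```python
def run_schedule(clock_values, interval):
    """Return the clock values at which the scheduled task fired.

    clock_values stands in for successive time.monotonic() readings
    (an assumption made so the sketch can run without waiting).
    """
    fired = []
    scheduled_time = clock_values[0]  # the first run is due immediately
    for now in clock_values:
        if now >= scheduled_time:
            # Set the next scheduled time first, then do the work.
            scheduled_time = now + interval
            fired.append(now)
    return fired


# Simulated clock readings checked against a 300 second interval.
fired_at = run_schedule([0.0, 100.0, 200.0, 300.0, 400.0, 650.0, 700.0], 300.0)
print(fired_at)  # [0.0, 300.0, 650.0]
```

Because the next time is computed from now rather than from the previous scheduled time, a late run pushes the following runs back by the same amount. For readings taken every five minutes, that drift is harmless.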

The next step is to make and report a set of readings if it's time to do so. It starts by formatting a timestamp and putting it in the payload.

    if now >= reading_time:
        reading_time = now + reading_interval
        logger.info('Taking a reading')

        st = time.localtime()
        timestamp = '{0}/{1:02}/{2:02} {3:2}:{4:02}:{5:02}'.format(st.tm_year, st.tm_mon, st.tm_mday,
                                                                   st.tm_hour, st.tm_min, st.tm_sec)
        payload['created_at'] = timestamp
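To check what that format string produces, here is a short sketch that can be run on a desktop Python. The helper name format_timestamp and the example date are assumptions for illustration only.

```python
import time


def format_timestamp(st):
    """Format a struct_time using the same pattern as the snippet above."""
    return '{0}/{1:02}/{2:02} {3:2}:{4:02}:{5:02}'.format(
        st.tm_year, st.tm_mon, st.tm_mday, st.tm_hour, st.tm_min, st.tm_sec)


# A fixed struct_time (values chosen for illustration) keeps the result
# independent of the local clock and time zone.
example = time.struct_time((2019, 5, 1, 9, 7, 3, 2, 121, 0))
print(format_timestamp(example))  # 2019/05/01  9:07:03
```

Note that the hour field is given a width of 2 without zero fill, so a single-digit hour is padded with a space rather than a zero.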

Next, the air quality measurements are read and reported. Because the sensor might not always read successfully, this step is conditional: if a fresh reading can't be taken for some reason, the air quality readings are skipped. If a reading was taken, each value in turn is placed in the payload and sent to AdafruitIO. If sending fails, this reading cycle is abandoned in the hope that it works the next time.

Note that the continue statement will cause the program to jump back to the start of the while True: loop.

        if ...:  # the check that a fresh air quality reading was taken
            logger.info('Air Quality pm10 standard: %d', air.pm10_standard)
            payload['value'] = air.pm10_standard
            if not aio_interface.post('environmental-sensor.pm10-std', payload):
                logger.critical('post of pm10 standard failed')
                continue

            logger.info('Air Quality pm25 standard: %d', air.pm25_standard)
            payload['value'] = air.pm25_standard
            if not aio_interface.post('environmental-sensor.pm25-std', payload):
                logger.critical('post of pm25 standard failed')
                continue
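The effect of continue on a reading cycle is easy to see in a small simulation. This is a sketch under stated assumptions: run_cycles and flaky_post are hypothetical names, the for loop stands in for while True:, and the stub fails one post on purpose.

```python
def run_cycles(cycles, post):
    """Run a fixed number of loop passes and return the feeds posted in each."""
    history = []
    for cycle in range(cycles):  # stands in for `while True:`
        posted = []
        history.append(posted)

        if not post(cycle, 'pm10-std'):
            continue  # skip the rest of this pass
        posted.append('pm10-std')

        if not post(cycle, 'pm25-std'):
            continue
        posted.append('pm25-std')

        if not post(cycle, 'temperature'):
            continue
        posted.append('temperature')
    return history


def flaky_post(cycle, feed):
    """Hypothetical stand-in for a network post: fails once, on pass 0."""
    return not (cycle == 0 and feed == 'pm25-std')


print(run_cycles(2, flaky_post))
# [['pm10-std'], ['pm10-std', 'pm25-std', 'temperature']]
```

On the first pass the failed post ends the cycle early, so nothing after it is sent. The second pass starts from the top and posts everything.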


Finally, the BME280 sensor's temperature, humidity, and pressure are read and sent to AdafruitIO in the same way. The loop then repeats.

        logger.info('Temperature: %f', bme280.temperature)
        payload['value'] = bme280.temperature
        if not aio_interface.post('environmental-sensor.temperature', payload):
            logger.critical('post of temperature failed')
            continue
        logger.info('Humidity: %f', bme280.humidity)
        payload['value'] = bme280.humidity
        if not aio_interface.post('environmental-sensor.humidity', payload):
            logger.critical('post of humidity failed')
            continue
        logger.info('Pressure: %f', bme280.pressure)
        payload['value'] = bme280.pressure
        if not aio_interface.post('environmental-sensor.pressure', payload):
            logger.critical('post of pressure failed')
            continue
        logger.info('Waiting for next reading')

This guide was first published on May 01, 2019. It was last updated on May 01, 2019.

This page was last updated on Mar 26, 2021.
