code.py

The program starts by setting up a logger that can be used throughout the system.

logger = logging.getLogger('main')
logger.setLevel(logging.INFO)
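
As a rough illustration of what the level setting does, the sketch below uses CPython's standard logging module. On a CircuitPython board the `logging` name would normally come from a compatible library; that is an assumption here, since the import isn't shown. With the level at INFO, debug messages are filtered out while info and above get through.

```python
import logging

logger = logging.getLogger('main')
logger.setLevel(logging.INFO)

# Messages below the configured level are filtered out.
debug_enabled = logger.isEnabledFor(logging.DEBUG)  # False at INFO level
info_enabled = logger.isEnabledFor(logging.INFO)    # True at INFO level
```

Under these semantics, a `logger.debug(...)` call, such as the one used later when a time refresh fails, would produce no output unless the level were lowered to `logging.DEBUG`.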

It then sets up the hardware and software interfaces it will use to take readings and publish them to AdafruitIO.

gps_uart = busio.UART(board.TX, board.RX, baudrate=9600, timeout=3.000)
gps_interface = gps.Gps(gps_uart)
gps_interface.begin()

air_uart = busio.UART(board.A2, board.A3, baudrate=9600)
air = air_quality.AirQualitySensor(air_uart)

i2c = busio.I2C(board.SCL, board.SDA)
bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c)

aio_interface = aio.AIO()

Readings will be taken every 5 minutes (300 seconds) and the time will be refreshed hourly (3600 seconds) from AdafruitIO. To accomplish this, we'll set up an interval and scheduling time for each.

reading_interval = 300.0
reading_time = time.monotonic()

time_update_interval = 3600.0
time_update_time = time.monotonic()

Once everything is set up, the GPS is read to find out where the device is. Because the device is meant to sit in one place and collect data, the GPS location is read only once, when the system starts up.

while True:
    if gps_interface.get_fix():
        break
    logger.error('Failed getting fix... retrying')

gps_interface.read()
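
The wait-for-fix loop can be tried without GPS hardware. In the sketch below, `wait_for_fix` is a hypothetical helper and the lambda standing in for `gps_interface.get_fix` is an assumption made for demonstration; it fails twice and then succeeds.

```python
def wait_for_fix(get_fix, on_retry):
    """Call get_fix() until it returns True; return the number of failed attempts."""
    failures = 0
    while not get_fix():
        failures += 1
        on_retry()
    return failures

# Stand-in for gps_interface.get_fix: fails twice, then succeeds.
answers = iter([False, False, True])
messages = []
failures = wait_for_fix(lambda: next(answers),
                        lambda: messages.append('Failed getting fix... retrying'))
```

Like the loop above, this sketch retries indefinitely, so a device with no view of the sky would never get past this point.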

Since only the data value and time will change for different posts to AdafruitIO, a template is made that can then be updated for each post.

payload = {'value' : 0, 'lat' : gps_interface.latitude, 'lon' : gps_interface.longitude, 'created_at' : ''}
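
To make the template idea concrete, here is a minimal sketch with made-up coordinates standing in for the GPS values. The numbers and the `update_payload` helper are hypothetical and not part of the guide's code. Only `value` and `created_at` change between posts; the location keys are left alone.

```python
# Hypothetical coordinates standing in for gps_interface.latitude / longitude.
payload = {'value': 0, 'lat': 43.0, 'lon': -81.0, 'created_at': ''}

def update_payload(template, value, timestamp):
    """Fill in the per-post fields of the shared template (hypothetical helper)."""
    template['value'] = value
    template['created_at'] = timestamp
    return template

update_payload(payload, 21.5, '2019/05/01 09:05:07')
```

Reusing one dictionary avoids rebuilding the location fields for every post, which suits a device that never moves.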

Now that everything is initialized and configured, it's time for the main loop.

The first step in each pass through the loop is to grab the current time. It's not the time of day so much as a time-related value: time.monotonic() returns the number of seconds since the system started up. Because that number only ever increases, it's exactly what we need for scheduling.

now = time.monotonic()

That time value can be checked against the scheduled times to see whether something needs to be done. First, is it time to refresh the system time? Since we can fetch the time from a time server (AdafruitIO conveniently provides one), we don't need a hardware RTC; we can just refresh the software RTC occasionally. For this use, that's accurate enough.

This is a common approach to scheduling in a single-threaded environment: set the time at which something should happen (the scheduled time). When the current time equals or is later than the scheduled time, set the next scheduled time and do whatever needs doing.

    if now >= time_update_time:
        time_update_time = now + time_update_interval
        logger.info('refreshing time')
        try:
            aio_interface.refresh_local_time()
        except RuntimeError as e:
            logger.debug('Time refresh failed with: %s', str(e))
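
The scheduling pattern can be tried on its own. The sketch below is an illustration rather than the guide's code: `Task` is a hypothetical helper class, and the clock values are passed in explicitly so the behavior can be followed without waiting.

```python
class Task:
    """Hypothetical helper: tracks when a recurring job is next due."""

    def __init__(self, interval, start):
        self.interval = interval
        self.next_time = start  # due as soon as the clock reaches `start`

    def due(self, now):
        """Return True and reschedule if the scheduled time has been reached."""
        if now >= self.next_time:
            self.next_time = now + self.interval
            return True
        return False

reading = Task(interval=300.0, start=0.0)
# Simulated clock values in seconds; a real loop would use time.monotonic().
results = [reading.due(t) for t in (0.0, 10.0, 299.9, 300.0, 450.0, 600.0)]
```

Because the next scheduled time is computed from `now` rather than from the previous scheduled time, a late pass through the loop pushes every later run back by the same amount. For readings taken every few minutes, that drift is unlikely to matter.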

The next step is to make and report a set of readings if it's time to do so. It starts by formatting a timestamp and putting it in the payload.

    if now >= reading_time:
        reading_time = now + reading_interval
        logger.info('Taking a reading')

        st = time.localtime()
        timestamp = '{0}/{1:02}/{2:02} {3:02}:{4:02}:{5:02}'.format(st.tm_year,
                                                                   st.tm_mon,
                                                                   st.tm_mday,
                                                                   st.tm_hour,
                                                                   st.tm_min,
                                                                   st.tm_sec)
        payload['created_at'] = timestamp
        ...
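
To see what the format string produces, the sketch below applies it to a fixed `time.struct_time` instead of the current time. The `format_timestamp` name and the sample date are illustrative assumptions, and the hour field is zero-padded here.

```python
import time

def format_timestamp(st):
    """Format a struct_time as 'YYYY/MM/DD HH:MM:SS' (hypothetical helper)."""
    return '{0}/{1:02}/{2:02} {3:02}:{4:02}:{5:02}'.format(
        st.tm_year, st.tm_mon, st.tm_mday, st.tm_hour, st.tm_min, st.tm_sec)

# Fixed sample: 1 May 2019, 09:05:07 (a Wednesday, day 121 of the year).
sample = time.struct_time((2019, 5, 1, 9, 5, 7, 2, 121, 0))
stamp = format_timestamp(sample)  # '2019/05/01 09:05:07'
```

A width of `02` pads single-digit fields with a leading zero, so every timestamp has the same length and layout.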
  

Next, the air quality measurements are read and reported. To handle times when the sensor might not read successfully, this step is conditional: if a fresh reading can't be taken for some reason, the air quality readings are skipped. If a reading was taken, each value in turn is placed in the payload and sent to AdafruitIO. If sending fails, this reading cycle is abandoned and we hope it works the next time.

Note that the continue statement will cause the program to jump back to the start of the while True: loop.

        if air.read():
            logger.info('Air Quality pm10 standard: %d', air.pm10_standard)
            payload['value'] = air.pm10_standard
            if not aio_interface.post('environmental-sensor.pm10-std', payload):
                logger.critical('post of pm10 standard failed')
                continue

            logger.info('Air Quality pm25 standard: %d', air.pm25_standard)
            payload['value'] = air.pm25_standard
            if not aio_interface.post('environmental-sensor.pm25-std', payload):
                logger.critical('post of pm25 standard failed')
                continue

            ...
  

Finally, the BME280 sensors are read and their readings sent to AdafruitIO in the same way. The loop then repeats.

        logger.info('Temperature: %f', bme280.temperature)
        payload['value'] = bme280.temperature
        if not aio_interface.post('environmental-sensor.temperature', payload):
            logger.critical('post of temperature failed')
            continue

        logger.info('Humidity: %f', bme280.humidity)
        payload['value'] = bme280.humidity
        if not aio_interface.post('environmental-sensor.humidity', payload):
            logger.critical('post of humidity failed')
            continue

        logger.info('Pressure: %f', bme280.pressure)
        payload['value'] = bme280.pressure
        if not aio_interface.post('environmental-sensor.pressure', payload):
            logger.critical('post of pressure failed')
            continue

        logger.info('Waiting for next reading')
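
The three BME280 blocks follow one pattern: fill in the payload, post it, and stop on failure. As a sketch of how that pattern could be written once, the code below loops over feed/value pairs. `post_all` and `fake_post` are hypothetical stand-ins, and the real `aio_interface.post` is assumed, as in the code above, to return a truthy value on success.

```python
def post_all(post, payload, readings):
    """Post each (feed, value) pair; stop at the first failure.

    Returns the feed that failed, or None if every post succeeded.
    """
    for feed, value in readings:
        payload['value'] = value
        if not post(feed, payload):
            return feed
    return None

# Stand-in for aio_interface.post that rejects one feed, for demonstration only.
sent = []

def fake_post(feed, payload):
    if feed == 'environmental-sensor.humidity':
        return False
    sent.append((feed, payload['value']))
    return True

failed = post_all(fake_post, {'value': 0}, [
    ('environmental-sensor.temperature', 21.5),
    ('environmental-sensor.humidity', 40.0),
    ('environmental-sensor.pressure', 1013.2),
])
```

In this run the humidity post fails, so the pressure reading is never sent. That mirrors what `continue` does in the code above.
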
This guide was first published on May 01, 2019. It was last updated on May 01, 2019. This page (code.py) was last updated on Jun 12, 2019.