Hardware Setup and Configuration

The first chunk of code imports the secrets.py file containing WiFi details, Adafruit IO credentials, and location metadata.

# Get wifi details and more from a secrets.py file
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise
# AirLift FeatherWing
esp32_cs = DigitalInOut(board.D13)
esp32_reset = DigitalInOut(board.D12)
esp32_ready = DigitalInOut(board.D11)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)

The PM2.5 sensor is initialized with UART. The BME280 sensor is also initialized with I2C.

# Connect to a PM2.5 sensor over UART
reset_pin = DigitalInOut(board.G0)
reset_pin.direction = Direction.OUTPUT
reset_pin.value = False
uart = busio.UART(board.TX, board.RX, baudrate=9600)
pm25 = PM25_UART(uart, reset_pin)

# Connect to a BME280 sensor over I2C
i2c = busio.I2C(board.SCL, board.SDA)
bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c)

Reading and Calculating AQI

The calculate_aqi function calculates a real-time qualitative Air Quality Index (AQI). This function does not use the quantitative EPA NOWCast Real-Time AQI function since this code does not store values over time. Values are stored and processed by Adafruit IO.

The function takes a sensor reading of PM2.5 size particles and returns both the AQI value and the AQI category. 

The Air Quality Index developed by the EPA is divided into six categories. Each of the six category names has an index. These categories range from the least amount of health concern ("Good") to immediate health concern where you'll need to don a respirator ("Hazardous"). 

The breakpoints (CLow, CHigh, ILow, IHigh) in this function are provided by the United State's Environmental Protection Agency (EPA), for more information on the AQI calculation check out this Wikipedia article.

### Sensor Functions ###
def calculate_aqi(pm_sensor_reading):
    """Returns a calculated air quality index (AQI)
    and category as a tuple.
    NOTE: The AQI returned by this function should ideally be measured
    using the 24-hour concentration average. Calculating a AQI without
    averaging will result in higher AQI values than expected.
    :param float pm_sensor_reading: Particulate matter sensor value.

    """
    # Check sensor reading using EPA breakpoint (Clow-Chigh)
    if 0.0 <= pm_sensor_reading <= 12.0:
        # AQI calculation using EPA breakpoints (Ilow-IHigh)
        aqi_val = map_range(int(pm_sensor_reading), 0, 12, 0, 50)
        aqi_cat = "Good"
    elif 12.1 <= pm_sensor_reading <= 35.4:
        aqi_val = map_range(int(pm_sensor_reading), 12, 35, 51, 100)
        aqi_cat = "Moderate"
    elif 35.5 <= pm_sensor_reading <= 55.4:
        aqi_val = map_range(int(pm_sensor_reading), 36, 55, 101, 150)
        aqi_cat = "Unhealthy for Sensitive Groups"
    elif 55.5 <= pm_sensor_reading <= 150.4:
        aqi_val = map_range(int(pm_sensor_reading), 56, 150, 151, 200)
        aqi_cat = "Unhealthy"
    elif 150.5 <= pm_sensor_reading <= 250.4:
        aqi_val = map_range(int(pm_sensor_reading), 151, 250, 201, 300)
        aqi_cat = "Very Unhealthy"
    elif 250.5 <= pm_sensor_reading <= 350.4:
        aqi_val = map_range(int(pm_sensor_reading), 251, 350, 301, 400)
        aqi_cat = "Hazardous"
    elif 350.5 <= pm_sensor_reading <= 500.4:
        aqi_val = map_range(int(pm_sensor_reading), 351, 500, 401, 500)
        aqi_cat = "Hazardous"
    else:
        print("Invalid PM2.5 concentration")
        aqi_val = -1
        aqi_cat = None
    return aqi_val, aqi_cat

The sample_aq_sensor function samples a PM2.5 sensor. Since the PlanTower updates the counts every 2.3 seconds, yet outputs data every 1 second. The sensor could possibly take two successive, identical, samples. This function samples the sensor for 2.3 seconds and averages the amount of samples over the number of samples obtained in that time interval.

def sample_aq_sensor():
    """Samples PM2.5 sensor
    over a 2.3 second sample rate.

    """
    aq_reading = 0
    aq_samples = []

    # initial timestamp
    time_start = time.monotonic()
    # sample pm2.5 sensor over 2.3 sec sample rate
    while time.monotonic() - time_start <= 2.3:
        try:
            aqdata = pm25.read()
            aq_samples.append(aqdata["pm25 env"])
        except RuntimeError:
            print("Unable to read from sensor, retrying...")
            continue
        # pm sensor output rate of 1s
        time.sleep(1)
    # average sample reading / # samples
    for sample in range(len(aq_samples)):
        aq_reading += aq_samples[sample]
    aq_reading = aq_reading / len(aq_samples)
    aq_samples.clear()
    return aq_reading

The read_bme_280 function reads the BME280 sensor temperature and humidity and returns it as a tuple. If you set USE_CELSIUS at the top of the code to True, the temperature value will be returned in Celsius instead of Fahrenheit.

def read_bme280(is_celsius=False):
    """Returns temperature and humidity
    from BME280 environmental sensor, as a tuple.

    :param bool is_celsius: Returns temperature in degrees celsius
                            if True, otherwise fahrenheit.
    """
    humid = bme280.humidity
    temp = bme280.temperature
    if not is_celsius:
        temp = temp * 1.8 + 32
    return temperature, humid

Adafruit IO Setup and Configuration

Next is the Adafruit IO initialization and configuration. An instance of the Adafruit IO HTTP client is created.

# Create an instance of the Adafruit IO HTTP client
io = IO_HTTP(secrets['aio_user'], secrets['aio_key'], wifi)

The feeds you created earlier are initialized using calls to get_feed

# Describes feeds used to hold Adafruit IO data
feed_aqi = io.get_feed("air-quality-sensor.aqi")
feed_aqi_category = io.get_feed("air-quality-sensor.category")
feed_humidity = io.get_feed("air-quality-sensor.humidity")
feed_temperature = io.get_feed("air-quality-sensor.temperature")

The location values (latitude, longitude, elevation) are pulled from your secrets file and initialized as a tuple, location_metadata.

# Set up location metadata from secrets.py file
location_metadata = (secrets['latitude'], secrets['longitude'], secrets['elevation'])

Main Loop

The while True loop fetches the current time from Adafruit IO's time API (we don't need a real-time-clock or timezone calculations) and reads/publishes sensor data to Adafruit IO when a time interval elapses. 

Obtaining time from Adafruit IO

The Adafruit IO time service does not replace a time-synchronization service like NTP, but it can help you figure out your local time on an Internet of Things device that doesn't have a built in clock.

Instead of using a software-based timer, this code will fetch the current time from Adafruit IO using a call to receive_time() every 30 seconds. Then, it will keep track of the minutes elapsed.

try:
        print("Fetching time...")
        cur_time = io.receive_time()
        print("Time fetched OK!")
        # Hourly reset
        if cur_time.tm_min == 0:
            prv_mins = 0
    except (ValueError, RuntimeError) as e:
        print("Failed to fetch time, retrying\n", e)
        wifi.reset()
        wifi.connect()
        continue

    if cur_time.tm_min >= prv_mins:
        print("%d min elapsed.."%elapsed_minutes)
        prv_mins = cur_time.tm_min
        elapsed_minutes += 1

Sample and Publish Data to Adafruit IO

When PUBLISH_INTERVAL elapses, the loop will sample the air quality sensor and environmental sensor. Values are printed to the REPL.

Once values are obtained, each value is published to its respective Adafruit IO Feed using calls to io.send_data

Finally, the sensor sleeps 30 seconds before running again.

if elapsed_minutes >= PUBLISH_INTERVAL:
        print("Sampling AQI...")
        aqi_reading = sample_aq_sensor()
        aqi, aqi_category = calculate_aqi(aqi_reading)
        print("AQI: %d"%aqi)
        print("Category: %s"%aqi_category)

        # temp and humidity
        print("Sampling environmental sensor...")
        temperature, humidity = read_bme280(USE_CELSIUS)
        print("Temperature: %0.1f F" % temperature)
        print("Humidity: %0.1f %%" % humidity)

        # Publish all values to Adafruit IO
        print("Publishing to Adafruit IO...")
        try:
            io.send_data(feed_aqi["key"], str(aqi), location_metadata)
            io.send_data(feed_aqi_category["key"], aqi_category)
            io.send_data(feed_temperature["key"], str(temperature))
            io.send_data(feed_humidity["key"], str(humidity))
            print("Published!")
        except (ValueError, RuntimeError) as e:
            print("Failed to send data to IO, retrying\n", e)
            wifi.reset()
            wifi.connect()
            continue
        # Reset timer
        elapsed_minutes = 0
      time.sleep(30)

This guide was first published on Oct 16, 2020. It was last updated on Oct 16, 2020.

This page (Code Walkthrough) was last updated on Oct 09, 2020.

Text editor powered by tinymce.