The first chunk of code imports the secrets.py file containing WiFi details, Adafruit IO credentials, and location metadata.
# Get wifi details and more from a secrets.py file try: from secrets import secrets except ImportError: print("WiFi secrets are kept in secrets.py, please add them there!") raise
# AirLift FeatherWing esp32_cs = DigitalInOut(board.D13) esp32_reset = DigitalInOut(board.D12) esp32_ready = DigitalInOut(board.D11) spi = busio.SPI(board.SCK, board.MOSI, board.MISO) esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2) wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
The PM2.5 sensor is initialized with UART. The BME280 sensor is also initialized with I2C.
# Connect to a PM2.5 sensor over UART reset_pin = DigitalInOut(board.G0) reset_pin.direction = Direction.OUTPUT reset_pin.value = False uart = busio.UART(board.TX, board.RX, baudrate=9600) pm25 = PM25_UART(uart, reset_pin) # Connect to a BME280 sensor over I2C i2c = busio.I2C(board.SCL, board.SDA) bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c)
The calculate_aqi
function calculates a real-time qualitative Air Quality Index (AQI). This function does not use the quantitative EPA NOWCast Real-Time AQI function since this code does not store values over time. Values are stored and processed by Adafruit IO.
The function takes a sensor reading of PM2.5 size particles and returns both the AQI value and the AQI category.
The Air Quality Index developed by the EPA is divided into six categories. Each of the six category names has an index. These categories range from the least amount of health concern ("Good") to immediate health concern where you'll need to don a respirator ("Hazardous").
The breakpoints (CLow, CHigh, ILow, IHigh) in this function are provided by the United State's Environmental Protection Agency (EPA), for more information on the AQI calculation check out this Wikipedia article.
- If you are in a country other than the United States, this Wikipedia article contains air quality indices for locations all over the world.
### Sensor Functions ### def calculate_aqi(pm_sensor_reading): """Returns a calculated air quality index (AQI) and category as a tuple. NOTE: The AQI returned by this function should ideally be measured using the 24-hour concentration average. Calculating a AQI without averaging will result in higher AQI values than expected. :param float pm_sensor_reading: Particulate matter sensor value. """ # Check sensor reading using EPA breakpoint (Clow-Chigh) if 0.0 <= pm_sensor_reading <= 12.0: # AQI calculation using EPA breakpoints (Ilow-IHigh) aqi_val = map_range(int(pm_sensor_reading), 0, 12, 0, 50) aqi_cat = "Good" elif 12.1 <= pm_sensor_reading <= 35.4: aqi_val = map_range(int(pm_sensor_reading), 12, 35, 51, 100) aqi_cat = "Moderate" elif 35.5 <= pm_sensor_reading <= 55.4: aqi_val = map_range(int(pm_sensor_reading), 36, 55, 101, 150) aqi_cat = "Unhealthy for Sensitive Groups" elif 55.5 <= pm_sensor_reading <= 150.4: aqi_val = map_range(int(pm_sensor_reading), 56, 150, 151, 200) aqi_cat = "Unhealthy" elif 150.5 <= pm_sensor_reading <= 250.4: aqi_val = map_range(int(pm_sensor_reading), 151, 250, 201, 300) aqi_cat = "Very Unhealthy" elif 250.5 <= pm_sensor_reading <= 350.4: aqi_val = map_range(int(pm_sensor_reading), 251, 350, 301, 400) aqi_cat = "Hazardous" elif 350.5 <= pm_sensor_reading <= 500.4: aqi_val = map_range(int(pm_sensor_reading), 351, 500, 401, 500) aqi_cat = "Hazardous" else: print("Invalid PM2.5 concentration") aqi_val = -1 aqi_cat = None return aqi_val, aqi_cat
The sample_aq_sensor
function samples a PM2.5 sensor. Since the PlanTower updates the counts every 2.3 seconds, yet outputs data every 1 second. The sensor could possibly take two successive, identical, samples. This function samples the sensor for 2.3 seconds and averages the amount of samples over the number of samples obtained in that time interval.
- For more information about how the PM2.5 sensor calculates and outputs data, check out this page of the PM2.5 sensor's learn guide featuring StanJ's analysis report.
def sample_aq_sensor(): """Samples PM2.5 sensor over a 2.3 second sample rate. """ aq_reading = 0 aq_samples = [] # initial timestamp time_start = time.monotonic() # sample pm2.5 sensor over 2.3 sec sample rate while time.monotonic() - time_start <= 2.3: try: aqdata = pm25.read() aq_samples.append(aqdata["pm25 env"]) except RuntimeError: print("Unable to read from sensor, retrying...") continue # pm sensor output rate of 1s time.sleep(1) # average sample reading / # samples for sample in range(len(aq_samples)): aq_reading += aq_samples[sample] aq_reading = aq_reading / len(aq_samples) aq_samples.clear() return aq_reading
The read_bme_280
function reads the BME280 sensor temperature and humidity and returns it as a tuple. If you set USE_CELSIUS
at the top of the code to True
, the temperature value will be returned in Celsius instead of Fahrenheit.
def read_bme280(is_celsius=False): """Returns temperature and humidity from BME280 environmental sensor, as a tuple. :param bool is_celsius: Returns temperature in degrees celsius if True, otherwise fahrenheit. """ humid = bme280.humidity temp = bme280.temperature if not is_celsius: temp = temp * 1.8 + 32 return temperature, humid
Next is the Adafruit IO initialization and configuration. An instance of the Adafruit IO HTTP client is created.
# Create an instance of the Adafruit IO HTTP client io = IO_HTTP(secrets['aio_user'], secrets['aio_key'], wifi)
The feeds you created earlier are initialized using calls to get_feed
.
# Describes feeds used to hold Adafruit IO data feed_aqi = io.get_feed("air-quality-sensor.aqi") feed_aqi_category = io.get_feed("air-quality-sensor.category") feed_humidity = io.get_feed("air-quality-sensor.humidity") feed_temperature = io.get_feed("air-quality-sensor.temperature")
The location values (latitude, longitude, elevation) are pulled from your secrets file and initialized as a tuple, location_metadata
.
# Set up location metadata from secrets.py file location_metadata = (secrets['latitude'], secrets['longitude'], secrets['elevation'])
Main Loop
The while True
loop fetches the current time from Adafruit IO's time API (we don't need a real-time-clock or timezone calculations) and reads/publishes sensor data to Adafruit IO when a time interval elapses.
Obtaining time from Adafruit IO
The Adafruit IO time service does not replace a time-synchronization service like NTP, but it can help you figure out your local time on an Internet of Things device that doesn't have a built in clock.
Instead of using a software-based timer, this code will fetch the current time from Adafruit IO using a call to receive_time()
every 30 seconds. Then, it will keep track of the minutes elapsed.
try: print("Fetching time...") cur_time = io.receive_time() print("Time fetched OK!") # Hourly reset if cur_time.tm_min == 0: prv_mins = 0 except (ValueError, RuntimeError) as e: print("Failed to fetch time, retrying\n", e) wifi.reset() wifi.connect() continue if cur_time.tm_min >= prv_mins: print("%d min elapsed.."%elapsed_minutes) prv_mins = cur_time.tm_min elapsed_minutes += 1
Sample and Publish Data to Adafruit IO
When PUBLISH_INTERVAL
elapses, the loop will sample the air quality sensor and environmental sensor. Values are printed to the REPL.
Once values are obtained, each value is published to its respective Adafruit IO Feed using calls to io.send_data
.
Finally, the sensor sleeps 30 seconds before running again.
if elapsed_minutes >= PUBLISH_INTERVAL: print("Sampling AQI...") aqi_reading = sample_aq_sensor() aqi, aqi_category = calculate_aqi(aqi_reading) print("AQI: %d"%aqi) print("Category: %s"%aqi_category) # temp and humidity print("Sampling environmental sensor...") temperature, humidity = read_bme280(USE_CELSIUS) print("Temperature: %0.1f F" % temperature) print("Humidity: %0.1f %%" % humidity) # Publish all values to Adafruit IO print("Publishing to Adafruit IO...") try: io.send_data(feed_aqi["key"], str(aqi), location_metadata) io.send_data(feed_aqi_category["key"], aqi_category) io.send_data(feed_temperature["key"], str(temperature)) io.send_data(feed_humidity["key"], str(humidity)) print("Published!") except (ValueError, RuntimeError) as e: print("Failed to send data to IO, retrying\n", e) wifi.reset() wifi.connect() continue # Reset timer elapsed_minutes = 0 time.sleep(30)
Text editor powered by tinymce.