Download the Project Bundle
Your project will use a specific set of CircuitPython libraries and the code.py file. To get everything you need, click on the Download Project Bundle link below, and uncompress the .zip file.
Drag the contents of the uncompressed bundle directory onto your board's CIRCUITPY drive, replacing any existing files or directories with the same names, and adding any new ones that are necessary.
# SPDX-FileCopyrightText: 2025 John Park for Adafruit Industries
#
# SPDX-License-Identifier: MIT
'''
Espresso Tank Meter
Feather ESP32-S2 with RCWL-1601 Ultrasonic distance sensor
'''
import time
import os
import ssl
import microcontroller
import supervisor
import socketpool
import wifi
import board
import digitalio
import alarm
import neopixel
import adafruit_hcsr04
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT
import adafruit_requests
import adafruit_max1704x
# Initialize the power pin for the sensor
sensor_power = digitalio.DigitalInOut(board.A2)
sensor_power.direction = digitalio.Direction.OUTPUT
sensor_power.value = False # Start with sensor powered off
def power_sensor_on():
"""Turn on power to the ultrasonic sensor and wait for it to stabilize."""
sensor_power.value = True
time.sleep(0.55) # Give sensor time to power up and stabilize
def power_sensor_off():
"""Turn off power to the ultrasonic sensor."""
sensor_power.value = False
# Initialize the sonar sensor
sonar = adafruit_hcsr04.HCSR04(trigger_pin=board.A0, echo_pin=board.A1)
# Initialize the battery monitor
i2c = board.I2C() # uses board.SCL and board.SDA
battery_monitor = adafruit_max1704x.MAX17048(i2c)
# Define colors (hex values)
WHITE = 0xFFFFFF
BLUE = 0x0000FF
GREEN = 0x00FF00
YELLOW = 0xFFFF00
RED = 0xFF0000
PINK = 0xbb00bb
CYAN = 0x00bbbb
OFF = 0x000000
# Initialize the NeoPixel
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.25)
# Show yellow on startup
pixel.fill(YELLOW)
# Operating hours (24-hour format with minutes, e.g., "6:35" and "16:00")
OPENING_TIME = "6:00"
CLOSING_TIME = "15:30"
# Normal operation check interval
NORMAL_CHECK_MINUTES = 10
# Sleep duration in seconds during operating hours
SLEEP_DURATION = 60 * NORMAL_CHECK_MINUTES
# Display duration in seconds
DISPLAY_DURATION = 1
# Number of samples to average
NUM_SAMPLES = 5
def parse_time(time_str):
"""Convert time string (HH:MM format) to hours and minutes."""
# pylint: disable=redefined-outer-name
parts = time_str.split(':')
return int(parts[0]), int(parts[1])
def get_average_distance():
"""Take multiple distance readings and return the average."""
power_sensor_on() # Power on the sensor before taking measurements
distances = []
for _ in range(NUM_SAMPLES):
try:
distance = sonar.distance
distances.append(distance)
time.sleep(0.1) # Short delay between readings
except RuntimeError:
print("Error reading distance")
continue
power_sensor_off() # Power off the sensor after measurements
# Only average valid readings
if distances:
return sum(distances) / len(distances)
return None
def set_pixel_color(distance):
"""Set NeoPixel color based on distance."""
if distance is None:
pixel.fill(OFF)
return
if distance < 2:
pixel.fill(WHITE)
elif 2 <= distance < 10:
pixel.fill(BLUE)
elif 10 <= distance < 16:
pixel.fill(GREEN)
elif 18 <= distance < 20:
pixel.fill(YELLOW)
else: # distance >= 22
pixel.fill(RED)
# Wait for things to settle before reading sonar
time.sleep(0.1)
# Get average distance
avg_distance = get_average_distance()
if avg_distance is not None:
if avg_distance >= 22:
# pylint: disable=invalid-name
avg_distance = 22
print(f"Average distance: {avg_distance:.1f} cm")
# Set color based on average distance
set_pixel_color(avg_distance)
# Check battery status
battery_voltage = battery_monitor.cell_voltage
battery_percent = battery_monitor.cell_percent
print(f"Battery: {battery_percent:.1f}% ({battery_voltage:.2f}V)")
# Try connecting to WiFi
try:
print("Connecting to %s" % os.getenv("CIRCUITPY_WIFI_SSID"))
# Show pink while attempting to connect
pixel.fill(PINK)
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
print("Connected to %s" % os.getenv("CIRCUITPY_WIFI_SSID"))
# Show cyan on successful connection
pixel.fill(CYAN)
time.sleep(1) # Brief pause to show the connection success
# pylint: disable=broad-except
except Exception as e:
print("Failed to connect to WiFi. Error:", e, "\nBoard will hard reset in 30 seconds.")
pixel.fill(OFF)
time.sleep(10)
microcontroller.reset()
# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
# Initialize a new MQTT Client object
mqtt_client = MQTT.MQTT(
broker="io.adafruit.com",
username=os.getenv("ADAFRUIT_AIO_USERNAME"),
password=os.getenv("ADAFRUIT_AIO_KEY"),
socket_pool=pool,
ssl_context=ssl.create_default_context(),
)
# Initialize Adafruit IO MQTT "helper"
io = IO_MQTT(mqtt_client)
try:
# If Adafruit IO is not connected...
if not io.is_connected:
print("Connecting to Adafruit IO...")
io.connect()
# Get current time from AIO time service
aio_username = os.getenv("ADAFRUIT_AIO_USERNAME")
aio_key = os.getenv("ADAFRUIT_AIO_KEY")
timezone = os.getenv("TIMEZONE")
# pylint: disable=line-too-long
TIME_URL = f"https://io.adafruit.com/api/v2/{aio_username}/integrations/time/strftime?x-aio-key={aio_key}&tz={timezone}"
TIME_URL += "&fmt=%25Y-%25m-%25d+%25H%3A%25M%3A%25S.%25L+%25j+%25u+%25z+%25Z"
print("Getting time from Adafruit IO...")
response = requests.get(TIME_URL)
time_str = response.text.strip() # Remove any leading/trailing whitespace
print("Current time:", time_str)
# Parse the current time from the time string
time_parts = time_str.split()
current_time = time_parts[1].split(':')
current_hour = int(current_time[0])
current_minute = int(current_time[1])
# Get opening and closing hours and minutes
opening_hour, opening_minute = parse_time(OPENING_TIME)
closing_hour, closing_minute = parse_time(CLOSING_TIME)
# Convert all times to minutes for easier comparison
current_minutes = current_hour * 60 + current_minute
opening_minutes = opening_hour * 60 + opening_minute
closing_minutes = closing_hour * 60 + closing_minute
# Check if we're within operating hours
if opening_minutes <= current_minutes < closing_minutes:
print(f"Within operating hours ({OPENING_TIME} to {CLOSING_TIME}), proceeding with measurement")
# Explicitly pump the message loop
io.loop()
# Send the distance data
print(f"Publishing {avg_distance:.1f} to espresso water level feed")
io.publish("espresso-water-tank-level", f"{avg_distance:.1f}")
# Send the battery data
print(f"Publishing {battery_percent:.1f} to battery level feed")
io.publish("espresso-water-sensor-battery", f"{battery_percent:.1f}")
# Make sure the message gets sent
io.loop()
print("Water level sent successfully")
# Keep NeoPixel lit for DISPLAY_DURATION seconds
time.sleep(DISPLAY_DURATION)
# Use normal check interval during operating hours
# # pylint: disable=invalid-name
sleep_seconds = SLEEP_DURATION
print(f"Next check in {NORMAL_CHECK_MINUTES} minutes")
else:
print(f"Outside operating hours ({OPENING_TIME} to {CLOSING_TIME}), going back to sleep")
# Calculate time until next opening
if current_minutes >= closing_minutes:
# After closing, calculate time until opening tomorrow
minutes_until_open = (24 * 60 - current_minutes) + opening_minutes
else:
# Before opening, calculate time until opening today
minutes_until_open = opening_minutes - current_minutes
# Convert minutes to seconds for sleep duration
sleep_seconds = minutes_until_open * 60
hours_until_open = minutes_until_open // 60
minutes_remaining = minutes_until_open % 60
if minutes_remaining:
print(f"Sleeping until {OPENING_TIME} ({hours_until_open} hours, {minutes_remaining} minutes)")
else:
print(f"Sleeping until {OPENING_TIME} ({hours_until_open} hours)")
response.close()
# pylint: disable=broad-except
except Exception as e:
print("Failed to get or send data, or connect. Error:", e,
"\nBoard will hard reset in 30 seconds.")
pixel.fill(OFF)
time.sleep(30)
microcontroller.reset()
else:
print("Failed to get valid distance readings")
pixel.fill(OFF)
# pylint: disable=invalid-name
sleep_seconds = SLEEP_DURATION # Use normal interval if we couldn't get readings
# Prepare for deep sleep
power_sensor_off() # Make sure sensor is powered off before sleep
pixel.brightness = 0 # Turn off NeoPixel
# Flush the serial output before sleep
# pylint: disable=pointless-statement
supervisor.runtime.serial_bytes_available
time.sleep(0.05)
# Create time alarm
time_alarm = alarm.time.TimeAlarm(monotonic_time=time.monotonic() + sleep_seconds)
# Enter deep sleep
alarm.exit_and_deep_sleep_until_alarms(time_alarm)
How it Works
Setup and Imports
import time, os, ssl, wifi, board, alarm, neopixel
import adafruit_hcsr04, adafruit_minimqtt, adafruit_max1704x
The code uses several libraries to handle hardware interfaces, WiFi connectivity, and power management. Key components are initialized:
- Ultrasonic sensor (HCSR04) on pins A0 and A1
- Battery monitor (MAX17048) via I2C
- NeoPixel LED for status indication
Configuration Constants
OPENING_TIME = "6:00"
CLOSING_TIME = "15:30"
NORMAL_CHECK_MINUTES = 5
SLEEP_DURATION = 60 * NORMAL_CHECK_MINUTES
NUM_SAMPLES = 5
These define the operating schedule and behavior:
- Operating hours from 6 AM to 3:30 PM
- Takes measurements every 5 minutes during operation
- Averages 5 samples per measurement
Distance Measurement
def get_average_distance():
distances = []
for _ in range(NUM_SAMPLES):
try:
distance = sonar.distance
distances.append(distance)
time.sleep(0.1)
except RuntimeError:
print("Error reading distance")
continue
This function:
- Takes multiple readings from the sensor
- Handles potential read errors
- Returns the average of successful readings
Visual Feedback
def set_pixel_color(distance):
if distance < 2:
pixel.fill(WHITE)
elif 2 <= distance < 10:
pixel.fill(BLUE)
# ... etc
The NeoPixel changes color based on the water level:
- Uses different colors to indicate various water levels
- Provides immediate visual feedback about tank status
WiFi and Data Transmission
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
mqtt_client = MQTT.MQTT(
broker="io.adafruit.com",
username=os.getenv("ADAFRUIT_AIO_USERNAME"),
password=os.getenv("ADAFRUIT_AIO_KEY"),
socket_pool=pool,
ssl_context=ssl.create_default_context(),
)
The code:
- Connects to WiFi using credentials from environment variables
- Sets up MQTT connection to Adafruit IO
- Uses SSL for secure data transmission
Time Management
TIME_URL = f"https://io.adafruit.com/api/v2/{aio_username}/integrations/time/strftime?..."
response = requests.get(TIME_URL)
time_str = response.text.strip()
The device:
- Gets current time from Adafruit IO
- Parses it to determine if it's within operating hours
- Calculates appropriate sleep duration
Data Reporting
io.publish("espresso-water-tank-level", f"{avg_distance:.1f}")
io.publish("espresso-water-sensor-battery", f"{battery_percent:.1f}")
Sends two pieces of data to Adafruit IO:
- Water level (distance measurement)
- Battery percentage
Power Management
time_alarm = alarm.time.TimeAlarm(monotonic_time=time.monotonic() + sleep_seconds)
alarm.exit_and_deep_sleep_until_alarms(time_alarm)
The code implements these power-saving features:
- Uses deep sleep between measurements
- Calculates sleep duration based on operating hours
- Turns off NeoPixel during sleep
Page last edited February 11, 2025
Text editor powered by tinymce.