Let's start out with the code that goes onto the FunHouse.
MQTT Secrets Settings
Since the code publishes directly to the MQTT server, it expects to find a few more settings in your settings.toml file, which it reads with os.getenv(). If your MQTT server has no username and password, you can leave MQTT_USERNAME and MQTT_PASSWORD out so that os.getenv() returns None for them. In general, however, the Home Assistant MQTT broker is set up to be password protected by default.
MQTT_BROKER = "192.168.1.1"
MQTT_PORT = 1883
MQTT_USERNAME = "myusername"
MQTT_PASSWORD = "mypassword"
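As a rough illustration of how these four values are consumed, the sketch below gathers them through a lookup function and complains if a required one is missing. The helper name load_mqtt_settings and the stand-in dictionary are hypothetical and not part of the project; on the board, the lookup would be os.getenv(), as in the project code.

```python
import os

REQUIRED_KEYS = ("MQTT_BROKER", "MQTT_PORT")
OPTIONAL_KEYS = ("MQTT_USERNAME", "MQTT_PASSWORD")


def load_mqtt_settings(getenv=os.getenv):
    """Return a dict of MQTT settings; raise if a required one is missing.

    Hypothetical helper for illustration only.
    """
    settings = {key: getenv(key) for key in REQUIRED_KEYS + OPTIONAL_KEYS}
    missing = [key for key in REQUIRED_KEYS if settings[key] is None]
    if missing:
        raise ValueError("Missing settings: " + ", ".join(missing))
    return settings


# Stand-in for the settings file so the sketch runs on desktop Python.
# The username and password are left out on purpose.
example = {"MQTT_BROKER": "192.168.1.1", "MQTT_PORT": 1883}
settings = load_mqtt_settings(example.get)
print(settings["MQTT_BROKER"], settings["MQTT_USERNAME"])
```

With the username and password absent, the corresponding entries come back as None, which is the value the text above describes for a broker without authentication.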
To add code and libraries to your FunHouse, click the Download Project Bundle button to get the code and all of the libraries.
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import os
import time
import board
import digitalio
import analogio
from displayio import CIRCUITPYTHON_TERMINAL
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse

BOWL_STATE_TOPIC = "funhouse/catbowl/state"
LOW_VALUE = 4000
EMPTY_VALUE = 2000
UPDATE_INTERVAL = 1800  # Every 30 minutes

# Text labels for the Display
states = {
    "empty": "Add Water",
    "low": "Low",
    "full": "Full",
}


def publish_bowl_state(state):
    funhouse.peripherals.led = True
    # Publish the Bowl Level State
    print("Publishing to {}".format(BOWL_STATE_TOPIC))
    funhouse.network.mqtt_publish(BOWL_STATE_TOPIC, state)
    funhouse.peripherals.led = False


def connected(_client, _userdata, _result, _payload):
    status.fill = 0x00FF00
    status.outline = 0x008800


def disconnected(_client):
    status.fill = 0xFF0000
    status.outline = 0x880000


def get_bowl_reading():
    water_enable.value = True
    level = water_level_sensor.value
    water_enable.value = False
    return level


def get_bowl_state(level):
    if level <= EMPTY_VALUE:
        return "empty"
    elif level <= LOW_VALUE:
        return "low"
    return "full"


def bowl_level_display(level):
    if funhouse.peripherals.button_sel:
        return level
    return states[get_bowl_state(level)]


# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
water_enable = digitalio.DigitalInOut(board.A0)
water_enable.switch_to_output()
water_level_sensor = analogio.AnalogIn(board.A1)
funhouse.display.root_group = CIRCUITPYTHON_TERMINAL
funhouse.add_text(
    text="Bowl Level:",
    text_position=(120, 60),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
level_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.root_group = funhouse.splash

status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000)
funhouse.splash.append(status)

# Initialize a new MQTT Client object
funhouse.network.init_mqtt(
    os.getenv("MQTT_BROKER"),
    os.getenv("MQTT_PORT"),
    os.getenv("MQTT_USERNAME"),
    os.getenv("MQTT_PASSWORD"),
)
funhouse.network.on_mqtt_connect = connected
funhouse.network.on_mqtt_disconnect = disconnected

print("Attempting to connect to {}".format(os.getenv("MQTT_BROKER")))
funhouse.network.mqtt_connect()

last_reading_timestamp = None
last_bowl_state = None

while True:
    if (
        last_reading_timestamp is None
        or time.monotonic() > last_reading_timestamp + UPDATE_INTERVAL
    ):
        # Take Reading
        water_level = get_bowl_reading()
        # Update Display
        funhouse.set_text(bowl_level_display(water_level), level_label)
        # If changed, publish new result
        bowl_state = get_bowl_state(water_level)
        if bowl_state != last_bowl_state:
            publish_bowl_state(bowl_state)
            last_bowl_state = bowl_state
        last_reading_timestamp = time.monotonic()
Copy these over to the root directory of the CIRCUITPY drive for your FunHouse board, alongside your settings.toml file. The files on your board should look like this:
Code Walkthrough
Now to cover the code in sections. First are the library imports. These include os for reading the settings, the FunHouse library, Circle to display whether the board is connected, time for checking at intervals, and finally board, analogio, and digitalio for enabling and reading the sensor itself.
import os
import time
import board
import digitalio
import analogio
from displayio import CIRCUITPYTHON_TERMINAL
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse
Next up is the MQTT topic, BOWL_STATE_TOPIC. The script publishes to this state topic whenever the state of the bowl changes, so that Home Assistant knows the new state.
LOW_VALUE is the raw analog reading for the sensor when the water is low. Anything above this value is considered full. EMPTY_VALUE is the raw analog reading for the sensor when the bowl is empty. Anything at or below this value is considered empty, and readings between the two thresholds are considered low.
UPDATE_INTERVAL is the amount of time, in seconds, that the FunHouse waits between readings. By default this is once every half hour, which should be sufficient for something that changes very little over the course of a day.
BOWL_STATE_TOPIC = "funhouse/catbowl/state"
LOW_VALUE = 4000
EMPTY_VALUE = 2000
UPDATE_INTERVAL = 1800  # Every 30 minutes
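To make the threshold boundaries concrete, here is a small sketch that runs on desktop Python and applies the same two values to a handful of readings. The function name classify and the sample readings are made up for illustration; real readings depend on your sensor and bowl.

```python
LOW_VALUE = 4000
EMPTY_VALUE = 2000


def classify(level):
    """Map a raw reading to a state name using the two thresholds."""
    if level <= EMPTY_VALUE:
        return "empty"
    if level <= LOW_VALUE:
        return "low"
    return "full"


# Hypothetical readings, chosen to sit on and around each boundary.
for reading in (0, 2000, 2001, 4000, 4001, 52000):
    print(reading, classify(reading))
```

Note that a reading exactly equal to a threshold falls into the lower state, so 2000 counts as empty and 4000 counts as low.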
This section contains a dict of different states. This is a convenient way to change the text that is shown on the display for each of these states.
# Text labels for the Display
states = {
    "empty": "Add Water",
    "low": "Low",
    "full": "Full",
}
This function publishes the current state of the bowl to MQTT, and thus to Home Assistant. The payload is a plain text string matching one of the keys in the dict above, and it is published to BOWL_STATE_TOPIC. The onboard LED is switched on while publishing and off again afterwards.
def publish_bowl_state(state):
    funhouse.peripherals.led = True
    # Publish the Bowl Level State
    print("Publishing to {}".format(BOWL_STATE_TOPIC))
    funhouse.network.mqtt_publish(BOWL_STATE_TOPIC, state)
    funhouse.peripherals.led = False
The next couple of functions are used for changing the circle to red or green depending on the connection status.
def connected(_client, _userdata, _result, _payload):
    status.fill = 0x00FF00
    status.outline = 0x008800


def disconnected(_client):
    status.fill = 0xFF0000
    status.outline = 0x880000
The get_bowl_reading() function briefly enables the sensor, takes a reading, and then shuts the sensor off again. This is done to reduce electrolysis that may occur with the metal being in the water.
The get_bowl_state() function looks at the level value given to it, compares it to the thresholds that were set above, and returns the state of the bowl.
The bowl_level_display() function checks whether the select button is held down and returns either the raw value or the text label for the current state.
def get_bowl_reading():
    water_enable.value = True
    level = water_level_sensor.value
    water_enable.value = False
    return level


def get_bowl_state(level):
    if level <= EMPTY_VALUE:
        return "empty"
    elif level <= LOW_VALUE:
        return "low"
    return "full"


def bowl_level_display(level):
    if funhouse.peripherals.button_sel:
        return level
    return states[get_bowl_state(level)]
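The enable, read, disable pattern used by get_bowl_reading() can be tried out without any hardware. The sketch below is only a simulation: FakePin, FakeSensor, and read_pulsed are hypothetical stand-ins invented for this example, not CircuitPython classes, and the reading of 3500 is arbitrary.

```python
class FakePin:
    """Stand-in for a digital output; records every value written to it."""

    def __init__(self):
        self.history = []
        self._value = False

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        self.history.append(new_value)


class FakeSensor:
    """Stand-in for an analog input that only reads while it is powered."""

    def __init__(self, enable_pin, reading):
        self._enable_pin = enable_pin
        self._reading = reading

    @property
    def value(self):
        return self._reading if self._enable_pin.value else 0


def read_pulsed(enable_pin, sensor):
    """Power the sensor, take one reading, then power it back down."""
    enable_pin.value = True
    level = sensor.value
    enable_pin.value = False
    return level


enable = FakePin()
sensor = FakeSensor(enable, reading=3500)
print(read_pulsed(enable, sensor))  # the reading taken while powered
print(enable.history)  # the sequence of values written to the enable pin
```

The recorded history shows that the sensor is powered only for the duration of the single read, which is the point of the pattern.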
The next bit of code creates a few of the variables with their initial states, including the funhouse object, the water_enable digital output, and the water_level_sensor analog input. It then creates and draws the text labels.
# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
water_enable = digitalio.DigitalInOut(board.A0)
water_enable.switch_to_output()
water_level_sensor = analogio.AnalogIn(board.A1)
funhouse.display.root_group = CIRCUITPYTHON_TERMINAL
funhouse.add_text(
    text="Bowl Level:",
    text_position=(120, 60),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
level_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.root_group = funhouse.splash
This section initializes MQTT using the settings, sets up the handler functions that were defined earlier, and connects. The os.getenv() function is used to get the settings from settings.toml. Finally, last_reading_timestamp and last_bowl_state are initialized to None.
# Initialize a new MQTT Client object
funhouse.network.init_mqtt(
    os.getenv("MQTT_BROKER"),
    os.getenv("MQTT_PORT"),
    os.getenv("MQTT_USERNAME"),
    os.getenv("MQTT_PASSWORD"),
)
funhouse.network.on_mqtt_connect = connected
funhouse.network.on_mqtt_disconnect = disconnected

print("Attempting to connect to {}".format(os.getenv("MQTT_BROKER")))
funhouse.network.mqtt_connect()

last_reading_timestamp = None
last_bowl_state = None
Finally, there is the main loop. It runs continuously but does nothing until UPDATE_INTERVAL has passed since the last reading, or until the very first pass, when no reading has been taken yet. Once the interval has passed, it gets a reading and stores it in a variable so the sensor only has to be read once. It then sets the display to show the appropriate message for that reading.
Next, it works out the state of the bowl, and if that state differs from the last published state, it publishes the result to MQTT.
while True:
    if (
        last_reading_timestamp is None
        or time.monotonic() > last_reading_timestamp + UPDATE_INTERVAL
    ):
        # Take Reading
        water_level = get_bowl_reading()
        # Update Display
        funhouse.set_text(bowl_level_display(water_level), level_label)
        # If changed, publish new result
        bowl_state = get_bowl_state(water_level)
        if bowl_state != last_bowl_state:
            publish_bowl_state(bowl_state)
            last_bowl_state = bowl_state
        last_reading_timestamp = time.monotonic()
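The combination of interval gating and publish-on-change can be checked on desktop Python by replaying a list of timestamps and readings instead of waiting on a real clock. This is a simplified sketch under stated assumptions: simulate and classify are hypothetical names, the samples are invented, and publishing is replaced by appending to a list.

```python
UPDATE_INTERVAL = 1800  # seconds, as in the project code
LOW_VALUE = 4000
EMPTY_VALUE = 2000


def classify(level):
    """Same threshold logic as get_bowl_state() in the project code."""
    if level <= EMPTY_VALUE:
        return "empty"
    if level <= LOW_VALUE:
        return "low"
    return "full"


def simulate(samples, interval=UPDATE_INTERVAL):
    """Replay (timestamp, level) pairs; return what would be published."""
    published = []
    last_reading_timestamp = None
    last_state = None
    for now, level in samples:
        if last_reading_timestamp is None or now > last_reading_timestamp + interval:
            state = classify(level)
            # Publish only when the state differs from the previous one.
            if state != last_state:
                published.append((now, state))
                last_state = state
            last_reading_timestamp = now
    return published


# Invented samples: the one at t=900 arrives too early and is ignored.
samples = [(0, 5000), (900, 1000), (1801, 5200), (3602, 3000), (5403, 3100), (7204, 1500)]
for timestamp, state in simulate(samples):
    print(timestamp, state)
```

Of the six samples, only three lead to a publish: the first reading, the change from full to low, and the change from low to empty. Readings that arrive before the interval has elapsed, or that leave the state unchanged, produce no MQTT traffic.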