Let's start out with the code that goes onto the FunHouse.

MQTT Secrets Settings

Since the code publishes directly to the MQTT server, it expects to find a few more settings in the secrets.py file. If your MQTT server does not require a username and password, you can set those two values to None. In general, though, the Home Assistant MQTT broker is set up to be password protected by default.

'mqtt_broker': "192.168.1.1",
'mqtt_port': 1883,
'mqtt_username': 'myusername',
'mqtt_password': 'mypassword',
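For context, here is a minimal sketch of how a complete secrets.py might look once the MQTT settings are added. Every value is a placeholder, and the ssid and password key names for WiFi are an assumption based on the usual CircuitPython secrets layout, so check them against your existing file.

```python
# secrets.py - a sketch only; every value below is a placeholder.
secrets = {
    # WiFi settings (key names assumed from the usual CircuitPython layout)
    "ssid": "my_wifi_network",
    "password": "my_wifi_password",
    # MQTT settings described above
    "mqtt_broker": "192.168.1.1",
    "mqtt_port": 1883,
    "mqtt_username": "myusername",  # use None if the broker has no login
    "mqtt_password": "mypassword",  # use None if the broker has no login
}
```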

To add code and libraries to your FunHouse, click the Download Project Bundle button to get the code and all of the libraries.

# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT

import time
import board
import digitalio
import analogio
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse

BOWL_STATE_TOPIC = "funhouse/catbowl/state"
LOW_VALUE = 4000
EMPTY_VALUE = 2000
UPDATE_INTERVAL = 1800  # Every 30 minutes

try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

# Text labels for the Display
states = {
    "empty": "Add Water",
    "low": "Low",
    "full": "Full",
}

def publish_bowl_state(bowl_state):
    funhouse.peripherals.led = True
    # Publish the Bowl Level State
    print("Publishing to {}".format(BOWL_STATE_TOPIC))
    funhouse.network.mqtt_publish(BOWL_STATE_TOPIC, bowl_state)
    funhouse.peripherals.led = False

def connected(client, userdata, result, payload):
    status.fill = 0x00FF00
    status.outline = 0x008800

def disconnected(client):
    status.fill = 0xFF0000
    status.outline = 0x880000

def get_bowl_reading():
    water_enable.value = True
    level = water_level_sensor.value
    water_enable.value = False
    return level

def get_bowl_state(level):
    if level <= EMPTY_VALUE:
        return "empty"
    elif level <= LOW_VALUE:
        return "low"
    return "full"

def bowl_level_display(water_level):
    if funhouse.peripherals.button_sel:
        return water_level
    return states[get_bowl_state(water_level)]

# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
water_enable = digitalio.DigitalInOut(board.A0)
water_enable.switch_to_output()
water_level_sensor = analogio.AnalogIn(board.A1)
funhouse.display.show(None)
funhouse.add_text(
    text="Bowl Level:",
    text_position=(120, 60),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
level_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.show(funhouse.splash)

status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000)
funhouse.splash.append(status)

# Initialize a new MQTT Client object
funhouse.network.init_mqtt(
    secrets["mqtt_broker"],
    secrets["mqtt_port"],
    secrets["mqtt_username"],
    secrets["mqtt_password"],
)
funhouse.network.on_mqtt_connect = connected
funhouse.network.on_mqtt_disconnect = disconnected

print("Attempting to connect to {}".format(secrets["mqtt_broker"]))
funhouse.network.mqtt_connect()

last_reading_timestamp = None
last_bowl_state = None

while True:
    if last_reading_timestamp is None or time.monotonic() > last_reading_timestamp + UPDATE_INTERVAL:
        # Take Reading
        water_level = get_bowl_reading()
        # Update Display
        funhouse.set_text(bowl_level_display(water_level), level_label)
        # If changed, publish new result
        bowl_state = get_bowl_state(water_level)
        if bowl_state != last_bowl_state:
            publish_bowl_state(bowl_state)
            last_bowl_state = bowl_state
        last_reading_timestamp = time.monotonic()

Copy these to the root directory of the CIRCUITPY drive on your FunHouse board, along with your secrets.py file.

Code Walkthrough

The rest of this page covers the code in sections. First are the library imports: the FunHouse library, Circle for drawing the indicator that shows whether the board is connected, time for checking intervals, and finally board, digitalio, and analogio for enabling and reading the sensor itself.

import time
import board
import digitalio
import analogio
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse

Next up is the MQTT topic, BOWL_STATE_TOPIC. The script publishes to this state topic whenever the state of the bowl changes, which lets Home Assistant know the new state.

LOW_VALUE is the raw analog reading for the sensor when the water is low. Anything above this value is considered full. EMPTY_VALUE is the raw analog reading for the sensor when the bowl is empty. Anything at or below this value is considered empty, and anything between the two (above EMPTY_VALUE, up to and including LOW_VALUE) is considered low.

UPDATE_INTERVAL is the number of seconds the FunHouse waits between readings. By default this is once every half hour, which should be sufficient for something that changes very little over the course of a day.

BOWL_STATE_TOPIC = "funhouse/catbowl/state"
LOW_VALUE = 4000
EMPTY_VALUE = 2000
UPDATE_INTERVAL = 1800  # Every 30 minutes
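To see where the boundaries fall, the sketch below applies the same comparisons that get_bowl_state() uses to a few sample readings. It runs on a desktop Python with no hardware attached. The classify() and raw_to_volts() helpers are illustrative names that are not part of the project code, and the 3.3 V reference is an assumption. CircuitPython scales AnalogIn.value to the range 0 to 65535, which is why the default thresholds are such small fractions of full scale.

```python
LOW_VALUE = 4000
EMPTY_VALUE = 2000


def classify(level):
    """Hypothetical helper using the same comparison order as get_bowl_state()."""
    if level <= EMPTY_VALUE:
        return "empty"
    if level <= LOW_VALUE:
        return "low"
    return "full"


def raw_to_volts(raw, reference_voltage=3.3):
    """Convert a 0-65535 reading to volts (3.3 V reference is an assumption)."""
    return raw * reference_voltage / 65535


# Sample readings chosen to land on and around both thresholds
for sample in (1500, 2000, 2001, 4000, 4001):
    print(sample, classify(sample), round(raw_to_volts(sample), 3))
```

If your sensor reads differently in your bowl, hold the select button to see the raw value on the display and adjust the two thresholds to match.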

Next, the script attempts to import secrets.py to get the MQTT login information.

try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

This section contains a dict of different states. This is a convenient way to change the text that is shown on the display for each of these states.

# Text labels for the Display
states = {
    "empty": "Add Water",
    "low": "Low",
    "full": "Full",
}

This function publishes the current state of the bowl to MQTT, and thus to Home Assistant. The payload is a plain text string matching one of the keys in the dict above, and it is published to BOWL_STATE_TOPIC. The LED is switched on while the publish is in progress and off again afterward.

def publish_bowl_state(bowl_state):
    funhouse.peripherals.led = True
    # Publish the Bowl Level State
    print("Publishing to {}".format(BOWL_STATE_TOPIC))
    funhouse.network.mqtt_publish(BOWL_STATE_TOPIC, bowl_state)
    funhouse.peripherals.led = False

The next two functions change the status circle to green when the board connects to the MQTT server and back to red when it disconnects.

def connected(client, userdata, result, payload):
    status.fill = 0x00FF00
    status.outline = 0x008800

def disconnected(client):
    status.fill = 0xFF0000
    status.outline = 0x880000

The code in the get_bowl_reading() function is meant to briefly enable the sensor, take a reading, and then shut it off. This is done to reduce electrolysis that may occur with the metal being in the water.

The code in the get_bowl_state() function just looks at the level value given to it, compares it to the thresholds that were set above and returns the state of the bowl.

The code in the bowl_level_display() function checks whether the select button is held down. If it is, the function returns the raw reading; otherwise it returns the text label for the current state.

def get_bowl_reading():
    water_enable.value = True
    level = water_level_sensor.value
    water_enable.value = False
    return level
  
def get_bowl_state(level):
    if level <= EMPTY_VALUE:
        return "empty"
    elif level <= LOW_VALUE:
        return "low"
    return "full"
  
def bowl_level_display(water_level):
    if funhouse.peripherals.button_sel:
        return water_level
    return states[get_bowl_state(water_level)]
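The enable, read, disable sequence in get_bowl_reading() can be tried on a desktop with stand-in objects, as in the sketch below. FakeEnablePin and FakeSensor are hypothetical classes written only for this illustration. They mimic the value property of digitalio.DigitalInOut and analogio.AnalogIn, and the 5200 reading is an arbitrary sample.

```python
class FakeEnablePin:
    """Stand-in for digitalio.DigitalInOut that records every write."""

    def __init__(self):
        self._value = False
        self.history = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        self.history.append(new_value)


class FakeSensor:
    """Stand-in for analogio.AnalogIn that reads 0 while unpowered."""

    def __init__(self, enable_pin, wet_reading):
        self._enable_pin = enable_pin
        self._wet_reading = wet_reading

    @property
    def value(self):
        return self._wet_reading if self._enable_pin.value else 0


water_enable = FakeEnablePin()
water_level_sensor = FakeSensor(water_enable, wet_reading=5200)


def get_bowl_reading():
    # Same body as the project function: power on, sample, power off
    water_enable.value = True
    level = water_level_sensor.value
    water_enable.value = False
    return level


reading = get_bowl_reading()
print(reading, water_enable.history, water_enable.value)
```

The recorded history shows the pin going high and then low again around a single sample, so the sensor is only powered for the moment it takes to read it.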

The next bit of code sets up the initial state. It creates the funhouse object, the water_enable digital output, and the water_level_sensor analog input, then creates and draws the text labels.

# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
water_enable = digitalio.DigitalInOut(board.A0)
water_enable.switch_to_output()
water_level_sensor = analogio.AnalogIn(board.A1)
funhouse.display.show(None)
funhouse.add_text(
    text="Bowl Level:",
    text_position=(120, 60),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
level_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.show(funhouse.splash)

This section initializes MQTT using the secrets, sets up the handler functions that were defined earlier, and connects. The last_reading_timestamp and last_bowl_state are then initialized and set to None.

# Initialize a new MQTT Client object
funhouse.network.init_mqtt(
    secrets["mqtt_broker"],
    secrets["mqtt_port"],
    secrets["mqtt_username"],
    secrets["mqtt_password"],
)
funhouse.network.on_mqtt_connect = connected
funhouse.network.on_mqtt_disconnect = disconnected

print("Attempting to connect to {}".format(secrets["mqtt_broker"]))
funhouse.network.mqtt_connect()

last_reading_timestamp = None
last_bowl_state = None

Finally, there is the main loop. On the first pass, and then each time UPDATE_INTERVAL has elapsed, it takes a reading and stores it in a variable so the sensor only has to be read once. It then sets the display to show the appropriate message for that reading.

Next, it determines the state of the bowl, and if that state differs from the last published state, it publishes the new state to MQTT.

while True:
    if last_reading_timestamp is None or time.monotonic() > last_reading_timestamp + UPDATE_INTERVAL:
        # Take Reading
        water_level = get_bowl_reading()
        # Update Display
        funhouse.set_text(bowl_level_display(water_level), level_label)
        # If changed, publish new result
        bowl_state = get_bowl_state(water_level)
        if bowl_state != last_bowl_state:
            publish_bowl_state(bowl_state)
            last_bowl_state = bowl_state
        last_reading_timestamp = time.monotonic()
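The timing and publish-on-change logic can be exercised without hardware by swapping time.monotonic() for a fake clock, as in the sketch below. run_simulation() is a hypothetical helper written for this illustration, the one-minute clock step is arbitrary, and the list of readings is made-up sample data.

```python
LOW_VALUE = 4000
EMPTY_VALUE = 2000
UPDATE_INTERVAL = 1800  # seconds


def get_bowl_state(level):
    if level <= EMPTY_VALUE:
        return "empty"
    elif level <= LOW_VALUE:
        return "low"
    return "full"


def run_simulation(readings):
    """Feed one raw reading per interval; return (time, state) for each publish."""
    published = []
    pending = list(readings)
    last_reading_timestamp = None
    last_bowl_state = None
    now = 0  # fake clock standing in for time.monotonic()
    while pending:
        if last_reading_timestamp is None or now > last_reading_timestamp + UPDATE_INTERVAL:
            water_level = pending.pop(0)
            bowl_state = get_bowl_state(water_level)
            # Only publish when the state differs from the last one sent
            if bowl_state != last_bowl_state:
                published.append((now, bowl_state))
                last_bowl_state = bowl_state
            last_reading_timestamp = now
        now += 60  # advance the fake clock by one minute
    return published


result = run_simulation([5000, 4800, 3500, 3400, 1500])
print(result)
```

Five readings go in, but only three messages are published, one for each change of state. Repeated readings in the same state produce no MQTT traffic.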

This guide was first published on May 14, 2021. It was last updated on May 14, 2021.

This page (Coding the Water Sensor) was last updated on May 10, 2023.
