Let's start out with the code that goes onto the FunHouse. This code can run with or without Home Assistant and we'll go over the options.
MQTT Secrets Settings
Since the code publishes directly to the MQTT server, there are a few more settings that the code expects to find in your settings.toml file. If your MQTT server has no username and password, you can leave those two entries out so that os.getenv() returns None for them; however, in general, the Home Assistant MQTT broker is set up to be password protected by default.
MQTT_BROKER = "192.168.1.1"
MQTT_PORT = 1883
MQTT_USERNAME = "myusername"
MQTT_PASSWORD = "mypassword"
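As a rough illustration of how these four values are consumed, the sketch below gathers them with os.getenv(). The helper name load_mqtt_settings, its getenv parameter, and the fallback to port 1883 are assumptions made only so the example can run on desktop Python; on the FunHouse, os.getenv() reads from settings.toml and returns None for a missing key.

```python
import os


def load_mqtt_settings(getenv=os.getenv):
    """Collect the MQTT settings; missing keys come back as None."""
    port = getenv("MQTT_PORT")
    return {
        "broker": getenv("MQTT_BROKER"),
        # The port may arrive as an int or a string depending on how it was
        # stored. Falling back to 1883 is an assumption made for this sketch.
        "port": int(port) if port is not None else 1883,
        "username": getenv("MQTT_USERNAME"),
        "password": getenv("MQTT_PASSWORD"),
    }


# Stand-in for settings that omit the username and password.
fake_settings = {"MQTT_BROKER": "192.168.1.1", "MQTT_PORT": "1883"}
settings = load_mqtt_settings(fake_settings.get)
print(settings)
```

Passing a lookup function in makes it easy to check the result without touching real environment variables or the board.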
To add code and libraries to your FunHouse, click the Download Project Bundle button to get the code and all of the libraries.
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import os
import time
import board
import digitalio
from displayio import CIRCUITPYTHON_TERMINAL
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse

OUTLET_STATE_TOPIC = "funhouse/outlet/state"
OUTLET_COMMAND_TOPIC = "funhouse/outlet/set"
MOTION_TIMEOUT = 300  # Timeout in seconds
USE_MQTT = True

# Use dict to avoid reassigning the variable
timestamps = {
    "last_pir": None
}

def set_outlet_state(value):
    if value:
        funhouse.peripherals.dotstars.fill(0x00FF00)
        timestamps["last_pir"] = time.monotonic()
    else:
        funhouse.peripherals.dotstars.fill(0xFF0000)
        timestamps["last_pir"] = time.monotonic() - MOTION_TIMEOUT

    outlet.value = value
    publish_outlet_state()

def publish_outlet_state():
    if USE_MQTT:
        funhouse.peripherals.led = True
        output = "on" if outlet.value else "off"
        # Publish the Dotstar State
        print("Publishing to {}".format(OUTLET_STATE_TOPIC))
        funhouse.network.mqtt_publish(OUTLET_STATE_TOPIC, output)
        funhouse.peripherals.led = False

def connected(client, _userdata, _result, _payload):
    status.fill = 0x00FF00
    status.outline = 0x008800
    print("Connected to MQTT! Subscribing...")
    client.subscribe(OUTLET_COMMAND_TOPIC)

def disconnected(_client):
    status.fill = 0xFF0000
    status.outline = 0x880000

def message(_client, topic, payload):
    print("Topic {0} received new value: {1}".format(topic, payload))
    if topic == OUTLET_COMMAND_TOPIC:
        set_outlet_state(payload == "on")

def timeleft():
    seconds = int(timestamps["last_pir"] + MOTION_TIMEOUT - time.monotonic())
    if outlet.value and seconds >= 0:
        minutes = seconds // 60
        seconds -= minutes * 60
        return "{:01}:{:02}".format(minutes, seconds)
    return "Off"

# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
outlet = digitalio.DigitalInOut(board.A0)
outlet.direction = digitalio.Direction.OUTPUT
funhouse.display.root_group = CIRCUITPYTHON_TERMINAL
funhouse.add_text(
    text="Timeout Left:",
    text_position=(20, 60),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
countdown_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.root_group = funhouse.splash

status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000)
funhouse.splash.append(status)

# Initialize a new MQTT Client object
if USE_MQTT:
    funhouse.network.init_mqtt(
        os.getenv("MQTT_BROKER"),
        os.getenv("MQTT_PORT"),
        os.getenv("MQTT_USERNAME"),
        os.getenv("MQTT_PASSWORD"),
    )
    funhouse.network.on_mqtt_connect = connected
    funhouse.network.on_mqtt_disconnect = disconnected
    funhouse.network.on_mqtt_message = message

    print("Attempting to connect to {}".format(os.getenv("MQTT_BROKER")))
    funhouse.network.mqtt_connect()
set_outlet_state(False)

while True:
    if funhouse.peripherals.pir_sensor:
        timestamps["last_pir"] = time.monotonic()
        if not outlet.value:
            set_outlet_state(True)
    if outlet.value and time.monotonic() >= timestamps["last_pir"] + MOTION_TIMEOUT:
        set_outlet_state(False)
    funhouse.set_text(timeleft(), countdown_label)
    # Check any topics we are subscribed to
    if USE_MQTT:
        funhouse.network.mqtt_loop(0.5)
Copy these over to the CIRCUITPY drive for your FunHouse board in the root directory along with your settings.toml file. The files on your board should look like this:
Code Walkthrough
Now to cover the code in sections. First are library imports. This includes the FunHouse library, the Circle shape used to show whether the board is connected, os for reading the settings, time for checking in intervals, and finally board and digitalio for controlling the outlet itself.
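import os
import time
import board
import digitalio
# CIRCUITPYTHON_TERMINAL is used later when the text labels are set up
from displayio import CIRCUITPYTHON_TERMINAL
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse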
Next up are the MQTT topics, OUTLET_STATE_TOPIC and OUTLET_COMMAND_TOPIC. If you're not connecting to MQTT, you can leave these alone. The script will subscribe to the command topic to listen for any commands to turn the outlet on or off and will publish to the state topic to let Home Assistant know the state of the outlet.
MOTION_TIMEOUT is the amount of time in seconds for the FunHouse to wait before turning off the outlet. Any movement that triggers the PIR sensor will reset the timeout to this value.
USE_MQTT should be set to False if you do not plan on using Home Assistant.
OUTLET_STATE_TOPIC = "funhouse/outlet/state"
OUTLET_COMMAND_TOPIC = "funhouse/outlet/set"
MOTION_TIMEOUT = 300  # Timeout in seconds
USE_MQTT = True
In order to allow changing a value inside of a function and avoid using the global keyword, a dict key is used. Assigning to a key changes the contents of the existing dict rather than rebinding the variable name, so global isn't needed.
# Use dict to avoid reassigning the variable
timestamps = {
    "last_pir": None
}
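The difference can be seen in a small desktop Python sketch. The two function names and the plain last_pir variable are hypothetical and exist only for comparison; they are not part of the guide's code.

```python
import time

# Module-level dict, mirroring the guide's timestamps variable.
timestamps = {"last_pir": None}

# A plain module-level variable, for comparison.
last_pir = None


def record_motion_with_dict():
    # Mutates the existing dict; the name "timestamps" is never rebound,
    # so no global statement is needed.
    timestamps["last_pir"] = time.monotonic()


def record_motion_without_global():
    # This assignment creates a new local variable and leaves the
    # module-level last_pir untouched.
    last_pir = time.monotonic()
    return last_pir


record_motion_with_dict()
record_motion_without_global()

print(timestamps["last_pir"] is not None)  # the dict entry was updated
print(last_pir is None)                    # the module-level variable was not
```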
In this section, the function to set the outlet state is defined. It does a few things besides just turning the outlet on or off. It changes the color of the DotStars and changes the time left on the clock to either the value of MOTION_TIMEOUT or 0, depending on whether the outlet is being turned on or off. It also publishes the state of the outlet to MQTT if USE_MQTT is enabled.
def set_outlet_state(value):
    if value:
        funhouse.peripherals.dotstars.fill(0x00FF00)
        timestamps["last_pir"] = time.monotonic()
    else:
        funhouse.peripherals.dotstars.fill(0xFF0000)
        timestamps["last_pir"] = time.monotonic() - MOTION_TIMEOUT

    outlet.value = value
    publish_outlet_state()
This function will publish the current state of the outlet to MQTT, and thus Home Assistant. It will only do so if USE_MQTT is set to True. The output in this case is a raw on or off string value that is published to the OUTLET_STATE_TOPIC.
def publish_outlet_state():
    if USE_MQTT:
        funhouse.peripherals.led = True
        output = "on" if outlet.value else "off"
        # Publish the Dotstar State
        print("Publishing to {}".format(OUTLET_STATE_TOPIC))
        funhouse.network.mqtt_publish(OUTLET_STATE_TOPIC, output)
        funhouse.peripherals.led = False
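The next few functions are the MQTT handlers. The connected() and disconnected() functions change the circle to green or red depending on the connection status, and connected() also subscribes to the OUTLET_COMMAND_TOPIC. The message() function sets the outlet state whenever a command arrives on that topic.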
def connected(client, _userdata, _result, _payload):
    status.fill = 0x00FF00
    status.outline = 0x008800
    print("Connected to MQTT! Subscribing...")
    client.subscribe(OUTLET_COMMAND_TOPIC)

def disconnected(_client):
    status.fill = 0xFF0000
    status.outline = 0x880000

def message(_client, topic, payload):
    print("Topic {0} received new value: {1}".format(topic, payload))
    if topic == OUTLET_COMMAND_TOPIC:
        set_outlet_state(payload == "on")
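To see how the message handler treats different payloads, here is a minimal sketch that can run on desktop Python. The set_outlet_state() stub and the requested_states list are stand-ins invented for this example; the real function also drives the pin and the DotStars.

```python
OUTLET_COMMAND_TOPIC = "funhouse/outlet/set"

# Records each requested state so the handler can be checked without hardware.
requested_states = []


def set_outlet_state(value):
    # Stand-in for the guide's function of the same name.
    requested_states.append(value)


def message(_client, topic, payload):
    if topic == OUTLET_COMMAND_TOPIC:
        set_outlet_state(payload == "on")


message(None, OUTLET_COMMAND_TOPIC, "on")   # exact match turns the outlet on
message(None, OUTLET_COMMAND_TOPIC, "off")  # any other payload turns it off
message(None, OUTLET_COMMAND_TOPIC, "ON")   # different casing also counts as off
message(None, "some/other/topic", "on")     # ignored because the topic differs

print(requested_states)
```

Because the comparison is an exact string match, whatever sends the command needs to use the lowercase on and off payloads.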
The code in the timeleft() function is meant to return either the amount of time left formatted in minutes and seconds or return the string Off if there is no time left.
def timeleft():
    seconds = int(timestamps["last_pir"] + MOTION_TIMEOUT - time.monotonic())
    if outlet.value and seconds >= 0:
        minutes = seconds // 60
        seconds -= minutes * 60
        return "{:01}:{:02}".format(minutes, seconds)
    return "Off"
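The arithmetic is easier to follow with the inputs passed in explicitly. The sketch below uses a hypothetical format_time_left() helper that applies the same calculation as timeleft(), but takes the timestamps and outlet state as arguments so it can run without the board.

```python
MOTION_TIMEOUT = 300  # seconds, as in the guide


def format_time_left(last_pir, now, outlet_on, timeout=MOTION_TIMEOUT):
    """Same arithmetic as timeleft(), but with the inputs passed in."""
    seconds = int(last_pir + timeout - now)
    if outlet_on and seconds >= 0:
        minutes = seconds // 60
        seconds -= minutes * 60
        return "{:01}:{:02}".format(minutes, seconds)
    return "Off"


full = format_time_left(last_pir=1000.0, now=1000.0, outlet_on=True)     # motion just seen
partial = format_time_left(last_pir=1000.0, now=1235.0, outlet_on=True)  # 65 seconds remain
expired = format_time_left(last_pir=1000.0, now=1301.0, outlet_on=True)  # timeout has passed
off = format_time_left(last_pir=1000.0, now=1000.0, outlet_on=False)     # outlet is off

print(full, partial, expired, off)  # 5:00 1:05 Off Off
```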
The next bit of code creates a few of the variables with their initial states, including the funhouse object and the outlet Digital IO, and creates and draws the text labels. The DotStar LEDs are set to off and are lit up red once initialization is done to indicate that motion sensing will now work, but the outlet is off.
# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
outlet = digitalio.DigitalInOut(board.A0)
outlet.direction = digitalio.Direction.OUTPUT
funhouse.display.root_group = CIRCUITPYTHON_TERMINAL
funhouse.add_text(
    text="Timeout Left:",
    text_position=(20, 60),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
countdown_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.root_group = funhouse.splash
If USE_MQTT is set to True, this section initializes MQTT using the stored settings, sets up the handler functions that were defined earlier, and connects. Once that is through, the initial outlet state is set to False, which sets the DotStars red. The os.getenv() function is used to get settings from settings.toml.
# Initialize a new MQTT Client object
if USE_MQTT:
    funhouse.network.init_mqtt(
        os.getenv("MQTT_BROKER"),
        os.getenv("MQTT_PORT"),
        os.getenv("MQTT_USERNAME"),
        os.getenv("MQTT_PASSWORD"),
    )
    funhouse.network.on_mqtt_connect = connected
    funhouse.network.on_mqtt_disconnect = disconnected
    funhouse.network.on_mqtt_message = message

    print("Attempting to connect to {}".format(os.getenv("MQTT_BROKER")))
    funhouse.network.mqtt_connect()
set_outlet_state(False)
Finally, there is the main loop. In the loop, the PIR sensor is checked. If it detects motion, the timeout is reset, and if the outlet was off, the outlet state is set to True, which also publishes to MQTT if USE_MQTT is enabled.
The loop also checks whether enough time has elapsed and turns off the outlet if it is currently on. The outlet state is checked first so that a message is published only when the state actually changes, which avoids flooding MQTT with messages.
The label is updated with the amount of time left, and finally, if MQTT is in use, the MQTT loop is run with a half second timeout. The MQTT loop has a default timeout of 1 second, but with the other tasks in the loop, each pass can take more than that. This results in the display timer appearing to skip seconds, when it is just not updating often enough.
while True:
    if funhouse.peripherals.pir_sensor:
        timestamps["last_pir"] = time.monotonic()
        if not outlet.value:
            set_outlet_state(True)
    if outlet.value and time.monotonic() >= timestamps["last_pir"] + MOTION_TIMEOUT:
        set_outlet_state(False)
    funhouse.set_text(timeleft(), countdown_label)
    # Check any topics we are subscribed to
    if USE_MQTT:
        funhouse.network.mqtt_loop(0.5)
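The timeout behavior of the loop can be tried out without hardware by replaying the same decisions against a fake clock. This is only a sketch: the simulate() function, its parameters, and the one-second step are assumptions made for illustration, and the PIR sensor is replaced by a set of times at which motion is reported.

```python
MOTION_TIMEOUT = 300  # seconds, as in the guide


def simulate(motion_times, duration, step=1):
    """Replay the loop logic against a fake clock.

    motion_times: set of clock values (seconds) at which motion is reported.
    Returns the list of (time, state) changes that would be published.
    """
    outlet_on = False
    last_pir = -MOTION_TIMEOUT  # no time left at startup, as after set_outlet_state(False)
    published = []

    for now in range(0, duration + 1, step):
        if now in motion_times:
            last_pir = now  # any motion resets the timeout
            if not outlet_on:
                outlet_on = True
                published.append((now, "on"))
        if outlet_on and now >= last_pir + MOTION_TIMEOUT:
            outlet_on = False
            published.append((now, "off"))
    return published


# Motion at 10 s and again at 100 s; the second event extends the timeout.
changes = simulate(motion_times={10, 100}, duration=600)
print(changes)
```

Only two state changes are recorded here, even though motion was seen twice, because the second motion event extends the timeout without changing the outlet state.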