Let's start with the code that goes onto the FunHouse. This code can run with or without Home Assistant, and we'll go over the options.

MQTT Secrets Settings

Since the code publishes directly to the MQTT server, there are a few more secrets.py file settings that the code expects to find. If your MQTT server has no username and password, you can change those two values to None. In general, however, the Home Assistant MQTT broker is set up to be password protected by default.

'mqtt_broker': "192.168.1.1",
'mqtt_port': 1883,
'mqtt_username': 'myusername',
'mqtt_password': 'mypassword',
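
For context, a complete secrets.py might look like the sketch below. Only the four mqtt_ keys come from the settings above. The ssid and password keys are an assumption based on the usual CircuitPython WiFi secrets convention, every value is a placeholder, and REQUIRED_MQTT_KEYS with its missing check is a hypothetical helper for illustration, not part of the project code.

```python
# secrets.py (sketch) - every value here is a placeholder
secrets = {
    "ssid": "my_wifi_network",       # assumed WiFi key name
    "password": "my_wifi_password",  # assumed WiFi key name
    "mqtt_broker": "192.168.1.1",
    "mqtt_port": 1883,
    "mqtt_username": "myusername",   # use None if the broker has no login
    "mqtt_password": "mypassword",   # use None if the broker has no login
}

# Hypothetical sanity check: list any MQTT key the project code reads
# that is missing from the dictionary
REQUIRED_MQTT_KEYS = ("mqtt_broker", "mqtt_port", "mqtt_username", "mqtt_password")
missing = [key for key in REQUIRED_MQTT_KEYS if key not in secrets]
print("Missing MQTT settings:", missing or "none")
```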

To add code and libraries to your FunHouse, click the Download Project Bundle button to get the code and all of the libraries.

# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT

import time
import board
import digitalio
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse

OUTLET_STATE_TOPIC = "funhouse/outlet/state"
OUTLET_COMMAND_TOPIC = "funhouse/outlet/set"
MOTION_TIMEOUT = 300  # Timeout in seconds
USE_MQTT = False

if USE_MQTT:
    try:
        from secrets import secrets
    except ImportError:
        print("WiFi secrets are kept in secrets.py, please add them there!")
        raise

def set_outlet_state(value):
    global last_pir_timestamp
    if value:
        funhouse.peripherals.dotstars.fill(0x00FF00)
        last_pir_timestamp = time.monotonic()
    else:
        funhouse.peripherals.dotstars.fill(0xFF0000)
        last_pir_timestamp = time.monotonic() - MOTION_TIMEOUT

    outlet.value = value
    publish_outlet_state()

def publish_outlet_state():
    if USE_MQTT:
        funhouse.peripherals.led = True
        output = "on" if outlet.value else "off"
        # Publish the outlet state
        print("Publishing to {}".format(OUTLET_STATE_TOPIC))
        funhouse.network.mqtt_publish(OUTLET_STATE_TOPIC, output)
        funhouse.peripherals.led = False

def connected(client, userdata, result, payload):
    status.fill = 0x00FF00
    status.outline = 0x008800
    print("Connected to MQTT! Subscribing...")
    client.subscribe(OUTLET_COMMAND_TOPIC)

def disconnected(client):
    status.fill = 0xFF0000
    status.outline = 0x880000

def message(client, topic, payload):
    print("Topic {0} received new value: {1}".format(topic, payload))
    if topic == OUTLET_COMMAND_TOPIC:
        set_outlet_state(payload == "on")

def timeleft():
    seconds = int(last_pir_timestamp + MOTION_TIMEOUT - time.monotonic())
    if outlet.value and seconds >= 0:
        minutes = seconds // 60
        seconds -= minutes * 60
        return "{:01}:{:02}".format(minutes, seconds)
    return "Off"

# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
outlet = digitalio.DigitalInOut(board.A0)
outlet.direction = digitalio.Direction.OUTPUT
last_pir_timestamp = None
funhouse.display.show(None)
funhouse.add_text(
    text="Timeout Left:",
    text_position=(20, 60),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
countdown_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.show(funhouse.splash)

status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000)
funhouse.splash.append(status)

# Initialize a new MQTT Client object
if USE_MQTT:
    funhouse.network.init_mqtt(
        secrets["mqtt_broker"],
        secrets["mqtt_port"],
        secrets["mqtt_username"],
        secrets["mqtt_password"],
    )
    funhouse.network.on_mqtt_connect = connected
    funhouse.network.on_mqtt_disconnect = disconnected
    funhouse.network.on_mqtt_message = message

    print("Attempting to connect to {}".format(secrets["mqtt_broker"]))
    funhouse.network.mqtt_connect()
set_outlet_state(False)

while True:
    if funhouse.peripherals.pir_sensor:
        last_pir_timestamp = time.monotonic()
        if not outlet.value:
            set_outlet_state(True)
    if outlet.value and time.monotonic() >= last_pir_timestamp + MOTION_TIMEOUT:
        set_outlet_state(False)
    funhouse.set_text(timeleft(), countdown_label)
    # Check any topics we are subscribed to
    if USE_MQTT:
        funhouse.network.mqtt_loop(0.5)

Copy the code and libraries over to the root directory of the CIRCUITPY drive for your FunHouse board, along with your secrets.py file.

Code Walkthrough

Now to cover the code in sections. First are the library imports. These include the FunHouse library, the Circle shape used to show whether the board is connected, time for checking intervals, and finally board and digitalio for controlling the outlet itself.

import time
import board
import digitalio
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse

Next up are the MQTT topics, OUTLET_STATE_TOPIC and OUTLET_COMMAND_TOPIC. If you're not connecting to MQTT, you can leave these alone. The script will subscribe to the command topic to listen for any commands to turn the outlet on or off and will publish to the state topic to let Home Assistant know the state of the outlet.

MOTION_TIMEOUT is the amount of time in seconds for the FunHouse to wait before turning off the outlet. Any movement that triggers the PIR sensor will reset the timeout to this value.

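USE_MQTT should be set to False if you do not plan on using Home Assistant, which is how the full listing above ships. Set it to True, as shown here, to connect to the MQTT broker.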

OUTLET_STATE_TOPIC = "funhouse/outlet/state"
OUTLET_COMMAND_TOPIC = "funhouse/outlet/set"
MOTION_TIMEOUT = 300  # Timeout in seconds
USE_MQTT = True

Next, the script attempts to import secrets.py to get the MQTT login information. This step is skipped if you're not using MQTT.

if USE_MQTT:
    try:
        from secrets import secrets
    except ImportError:
        print("WiFi secrets are kept in secrets.py, please add them there!")
        raise

In this section, the function that sets the outlet state is defined. It does a few things besides just turning the outlet on or off. It changes the color of the DotStars (green for on, red for off) and sets the time left on the clock to either MOTION_TIMEOUT or 0, depending on whether the outlet is being turned on or off. It also publishes the state of the outlet to MQTT if MQTT is in use.

def set_outlet_state(value):
    global last_pir_timestamp
    if value:
        funhouse.peripherals.dotstars.fill(0x00FF00)
        last_pir_timestamp = time.monotonic()
    else:
        funhouse.peripherals.dotstars.fill(0xFF0000)
        last_pir_timestamp = time.monotonic() - MOTION_TIMEOUT

    outlet.value = value
    publish_outlet_state()
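
The timestamp rewind in the else branch can be checked with plain arithmetic. This is a minimal sketch that runs on any Python. The now value is a made-up stand-in for time.monotonic(), and remaining() is a hypothetical helper that mirrors the expression used later in timeleft().

```python
MOTION_TIMEOUT = 300  # seconds, same value as the project code

def remaining(last_pir_timestamp, now):
    # Seconds left before the outlet would switch off
    return last_pir_timestamp + MOTION_TIMEOUT - now

now = 1000.0  # made-up stand-in for time.monotonic()

# Turning the outlet on stamps the current time: a full timeout remains
print(remaining(now, now))  # 300.0

# Turning it off rewinds the stamp by MOTION_TIMEOUT: nothing remains
print(remaining(now - MOTION_TIMEOUT, now))  # 0.0
```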

This function publishes the current state of the outlet to MQTT, and thus to Home Assistant, but only if USE_MQTT is set to True. The onboard LED is lit while publishing. The output is a raw on or off string value that is published to the OUTLET_STATE_TOPIC.

def publish_outlet_state():
    if USE_MQTT:
        funhouse.peripherals.led = True
        output = "on" if outlet.value else "off"
        # Publish the outlet state
        print("Publishing to {}".format(OUTLET_STATE_TOPIC))
        funhouse.network.mqtt_publish(OUTLET_STATE_TOPIC, output)
        funhouse.peripherals.led = False

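The next few functions are the MQTT handlers. connected() turns the status circle green and subscribes to the OUTLET_COMMAND_TOPIC, disconnected() turns the circle back to red, and message() sets the outlet state whenever a command arrives on that topic.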

def connected(client, userdata, result, payload):
    status.fill = 0x00FF00
    status.outline = 0x008800
    print("Connected to MQTT! Subscribing...")
    client.subscribe(OUTLET_COMMAND_TOPIC)

def disconnected(client):
    status.fill = 0xFF0000
    status.outline = 0x880000

def message(client, topic, payload):
    print("Topic {0} received new value: {1}".format(topic, payload))
    if topic == OUTLET_COMMAND_TOPIC:
        set_outlet_state(payload == "on")
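
To see how the message handler maps a payload to an outlet state without any hardware, the handler can be exercised with stand-ins. In this sketch, set_outlet_state is replaced by a hypothetical stub that records its argument, and the client argument is passed as None because the handler never uses it. Neither stand-in is part of the project.

```python
OUTLET_COMMAND_TOPIC = "funhouse/outlet/set"
calls = []  # records every value the stub receives

def set_outlet_state(value):
    # Hypothetical stub standing in for the real function
    calls.append(value)

def message(client, topic, payload):
    # Same logic as the project handler, minus the print
    if topic == OUTLET_COMMAND_TOPIC:
        set_outlet_state(payload == "on")

message(None, OUTLET_COMMAND_TOPIC, "on")   # turns the outlet on
message(None, OUTLET_COMMAND_TOPIC, "off")  # turns the outlet off
message(None, OUTLET_COMMAND_TOPIC, "ON")   # not exactly "on", so treated as off
message(None, "some/other/topic", "on")     # ignored: wrong topic
print(calls)  # [True, False, False]
```

Note that the comparison is exact, so any payload other than lowercase on switches the outlet off.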

The timeleft() function returns either the amount of time left, formatted in minutes and seconds, or the string Off if the outlet is off or there is no time left.

def timeleft():
    seconds = int(last_pir_timestamp + MOTION_TIMEOUT - time.monotonic())
    if outlet.value and seconds >= 0:
        minutes = seconds // 60
        seconds -= minutes * 60
        return "{:01}:{:02}".format(minutes, seconds)
    return "Off"
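
The minutes-and-seconds formatting can be tried on a desktop Python. The sketch below is a hardware-free variant: format_timeleft is a hypothetical name, it takes the remaining seconds and the outlet state as arguments instead of reading globals, and it uses divmod in place of the separate division and subtraction.

```python
def format_timeleft(seconds, outlet_on):
    # seconds: whole seconds remaining; outlet_on: current outlet state
    if outlet_on and seconds >= 0:
        minutes, seconds = divmod(seconds, 60)
        return "{:01}:{:02}".format(minutes, seconds)
    return "Off"

print(format_timeleft(300, True))   # 5:00
print(format_timeleft(65, True))    # 1:05
print(format_timeleft(-1, True))    # Off
print(format_timeleft(120, False))  # Off
```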

The next bit of code creates a few of the variables with their initial states, including the funhouse object and the outlet Digital IO, and creates and draws the text labels. The DotStar LEDs start out off. Once initialization is done, they are lit red to indicate that motion sensing is now active but the outlet is off.

# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
outlet = digitalio.DigitalInOut(board.A0)
outlet.direction = digitalio.Direction.OUTPUT
last_pir_timestamp = None
funhouse.display.show(None)
funhouse.add_text(
    text="Timeout Left:",
    text_position=(20, 60),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
countdown_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.show(funhouse.splash)

If USE_MQTT is set to True, this section initializes MQTT using the secrets, sets up the handler functions that were defined earlier, and connects. Once that is through, the initial outlet state is set to False, which sets the DotStars red.

# Initialize a new MQTT Client object
if USE_MQTT:
    funhouse.network.init_mqtt(
        secrets["mqtt_broker"],
        secrets["mqtt_port"],
        secrets["mqtt_username"],
        secrets["mqtt_password"],
    )
    funhouse.network.on_mqtt_connect = connected
    funhouse.network.on_mqtt_disconnect = disconnected
    funhouse.network.on_mqtt_message = message

    print("Attempting to connect to {}".format(secrets["mqtt_broker"]))
    funhouse.network.mqtt_connect()
set_outlet_state(False)

Finally, there is the main loop. In the loop, the PIR sensor is checked. If it detects motion, the time is extended, and if the outlet was off, the outlet state is set to True, which also publishes to MQTT if MQTT is in use.

It also checks to see if enough time has elapsed and turns off the outlet if it is currently on. The reason for checking the outlet state first is to avoid flooding MQTT with messages, so a message is published only when the state changes.

The label is updated with the amount of time left and finally, if MQTT is in use, the MQTT loop is run with a half-second timeout. The MQTT loop has a default timeout of 1 second, but with the other tasks in the main loop, each pass can take more than that. That would make the display timer appear to skip seconds when it is just not updating often enough.

while True:
    if funhouse.peripherals.pir_sensor:
        last_pir_timestamp = time.monotonic()
        if not outlet.value:
            set_outlet_state(True)
    if outlet.value and time.monotonic() >= last_pir_timestamp + MOTION_TIMEOUT:
        set_outlet_state(False)
    funhouse.set_text(timeleft(), countdown_label)
    # Check any topics we are subscribed to
    if USE_MQTT:
        funhouse.network.mqtt_loop(0.5)
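
The timeout behavior of the loop can be simulated without a FunHouse. This is a rough sketch under stated assumptions: simulate is a hypothetical function, the clock is a plain counter that advances one tick per pass instead of time.monotonic(), and motion is supplied as a set of made-up tick numbers rather than read from the PIR sensor.

```python
MOTION_TIMEOUT = 5  # shortened from 300 so the trace stays small

def simulate(motion_ticks, total_ticks):
    """Return a list of (tick, outlet_on) entries, one per state change."""
    outlet_on = False
    last_motion = -MOTION_TIMEOUT  # countdown starts out already expired
    changes = []
    for now in range(total_ticks):
        if now in motion_ticks:
            last_motion = now  # any motion restarts the countdown
            if not outlet_on:
                outlet_on = True
                changes.append((now, True))
        if outlet_on and now >= last_motion + MOTION_TIMEOUT:
            outlet_on = False
            changes.append((now, False))
    return changes

# Motion at ticks 2 and 4: the second trigger pushes switch-off from 7 to 9
print(simulate({2, 4}, 15))  # [(2, True), (9, False)]
```

Only state changes are recorded here, which mirrors the real loop publishing to MQTT only when the outlet changes state.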

This guide was first published on May 05, 2021. It was last updated on 2021-05-05 09:42:54 -0400.

This page (Coding the Motion Sensor) was last updated on Sep 21, 2021.
