To set everything up, plug a keyboard, mouse and monitor into one of the Raspberry Pi Zero 2 Ws. You will need a USB hub to connect more than one USB device.
Connect your Raspberry Pi Zero 2 W to your WiFi network and set it up with details of all the networks that it might need to access (as the device might be used at another location).
The first software step is to install Blinka, the compatibility layer between Python on the Pi Zero 2 W and CircuitPython. The simplest option is to run the following commands in a terminal, but there's a full description of the process at https://learn.adafruit.com/circuitpython-on-raspberrypi-linux/installing-circuitpython-on-raspberry-pi.
cd ~
sudo apt install python3-venv
python3 -m venv env --system-site-packages
source env/bin/activate
pip3 install --upgrade adafruit-python-shell
wget https://raw.githubusercontent.com/adafruit/Raspberry-Pi-Installer-Scripts/master/raspi-blinka.py
sudo -E env PATH=$PATH python3 raspi-blinka.py
Among other things, this sets up a virtual environment in a folder called env in your home directory. You have to activate this in a terminal session that you're going to use to run this project. You can activate it by running:
source ~/env/bin/activate
You can tell when it's activated because you'll see (env) at the start of the prompt.
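If you would rather confirm this from Python than from the prompt, a minimal sketch like the one below should do it. It relies on the standard-library behaviour that sys.prefix and sys.base_prefix differ inside a virtual environment; the function name in_virtualenv is only illustrative and is not part of this project.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter is running inside a virtual environment."""
    # Inside a venv, sys.prefix points at the environment, while
    # sys.base_prefix still points at the system Python installation.
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print("Virtual environment active:", in_virtualenv())
```

Run it with the same python3 you intend to use for the project; if it reports False, activate the environment first.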
With this virtual environment activated, you'll also need to install some dependencies with:
pip install adafruit-circuitpython-rgb-display pillow adafruit-circuitpython-neopixel
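Before going further, it can be worth checking that the modules the project's code imports are visible from the active environment. The sketch below is one way to do that with the standard library only; the REQUIRED_MODULES list is an assumption taken from the import lines of the script in this guide, and missing_modules is a hypothetical helper name.

```python
import importlib.util

# Top-level module names imported by this guide's script (assumed from its import lines).
REQUIRED_MODULES = [
    "requests",
    "PIL",
    "board",
    "neopixel",
    "digitalio",
    "adafruit_rgb_display",
    "picamera2",
]


def missing_modules(names):
    """Return the names from `names` that the import system cannot find."""
    # find_spec returns None for a top-level module that is not installed.
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    print("Missing modules:", ", ".join(missing) if missing else "none")
```

Anything listed as missing needs installing into the virtual environment (or as a system package) before the main script will start.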
With all the prerequisites installed, you can now copy the code below and save it as coffee.py.
import base64
import io
import json
import os
import time
from typing import Optional
import argparse
import requests
from PIL import Image, ImageDraw, ImageFont
import board
import neopixel
import digitalio
import adafruit_rgb_display.st7789 as st7789
from picamera2 import Picamera2
USERNAME = "AIO_USERNAME"
AIO_KEY = "AIO_KEY"
FEED_NAME_IN = "image1"
FEED_NAME_OUT = "image2"
HOTNOT_FEED_IN = "hotnot1"
HOTNOT_FEED_OUT = "hotnot2"
# Button pins
BUTTON_CAPTURE = 26 # capture/hot button
BUTTON_NOT = 16 # "not" vote button
# NeoPixel settings
NEOPIXEL_PIN = 21
NUM_PIXELS = 8
pixels = None
# Display globals
display = None
backlight = None
button_hot = None
button_not = None
STATE_FILE = ".adafruit_last_image.json"
def setup_pixels() -> None:
    """Initialise the NeoPixel strip."""
    global pixels
    pixels = neopixel.NeoPixel(
        board.D21,
        NUM_PIXELS,
        bpp=4,
        pixel_order=getattr(neopixel, "GRBW", None) or neopixel.RGBW,
        auto_write=False,
    )

def set_color(color) -> None:
    pixels.fill(color)
    pixels.show()
def setup_display() -> None:
    """Initialise the attached Mini Pi TFT display."""
    global display, backlight
    cs_pin = digitalio.DigitalInOut(board.CE0)
    dc_pin = digitalio.DigitalInOut(board.D25)
    rst_pin = digitalio.DigitalInOut(board.D24) if hasattr(board, "D24") else None
    spi = board.SPI()
    display = st7789.ST7789(
        spi,
        cs=cs_pin,
        dc=dc_pin,
        rst=rst_pin,
        baudrate=24000000,
        width=240,
        height=240,
        rotation=270,
        x_offset=0,
        y_offset=80,
    )
    backlight = digitalio.DigitalInOut(board.D22)
    backlight.switch_to_output()
    backlight.value = True

def display_image_on_tft(path: str) -> None:
    backlight.value = True
    img = Image.open(path).convert("RGB")
    img = img.resize((display.width, display.height))
    display.image(img)
def display_image_with_text(path: str, text: str, text_color=(255, 255, 255)) -> None:
    """Show an image with centered overlay text on the Mini Pi TFT."""
    backlight.value = True
    img = Image.open(path).convert("RGB")
    img = img.resize((display.width, display.height))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None
    # draw.textsize() was removed in Pillow 10, so measure with textbbox() instead
    left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
    text_width, text_height = right - left, bottom - top
    x = (display.width - text_width) // 2
    y = (display.height - text_height) // 2
    draw.text((x, y), text, fill=text_color, font=font)
    display.image(img)

def display_text_on_tft(text: str, text_color=(255, 255, 255), bg_color=(0, 0, 0)) -> None:
    """Show centered text on a plain background on the Mini Pi TFT."""
    backlight.value = True
    img = Image.new("RGB", (display.width, display.height), color=bg_color)
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None
    # draw.textsize() was removed in Pillow 10, so measure with textbbox() instead
    left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
    text_width, text_height = right - left, bottom - top
    x = (display.width - text_width) // 2
    y = (display.height - text_height) // 2
    draw.text((x, y), text, fill=text_color, font=font)
    display.image(img)
def resize_to_target_size(input_path: str, output_path: str, target_size_kb=50) -> Optional[bytes]:
    target_bytes = target_size_kb * 1024
    img = Image.open(input_path)
    width, height = img.size
    for scale in [1.0, 0.5, 0.4, 0.3, 0.2, 0.1, 0.05, 0.02, 0.01]:
        resized = img.resize((int(width * scale), int(height * scale)), Image.LANCZOS)
        for quality in range(95, 1, -5):
            buffer = io.BytesIO()
            resized.save(buffer, format="JPEG", quality=quality)
            size = buffer.tell()
            if size <= target_bytes:
                with open(output_path, "wb") as f:
                    f.write(buffer.getvalue())
                print(f"Saved at {size} bytes, scale={scale}, quality={quality}")
                return buffer.getvalue()
    print("Could not reach target size. Try reducing the original image size or increasing compression.")
    return None
def upload_to_adafruit_io(image_bytes: bytes) -> None:
    b64_data = base64.b64encode(image_bytes).decode("utf-8")
    url = (
        f"https://io.adafruit.com/api/v2/{USERNAME}/feeds/{FEED_NAME_OUT}/data"
    )
    headers = {"X-AIO-Key": AIO_KEY, "Content-Type": "application/json"}
    payload = {"value": b64_data}
    resp = requests.post(url, headers=headers, json=payload)
    if resp.status_code in (200, 201):
        print("Upload successful!")
        set_color((0, 255, 0))
    else:
        print(f"Failed to upload: {resp.status_code}, {resp.text}")

def _fetch_last_feed_item(feed_name: str) -> Optional[dict]:
    url = f"https://io.adafruit.com/api/v2/{USERNAME}/feeds/{feed_name}/data/last"
    headers = {"X-AIO-Key": AIO_KEY}
    try:
        resp = requests.get(url, headers=headers)
        if resp.status_code == 200:
            return resp.json()
        if resp.status_code == 404:
            # No data has been posted to this feed yet
            return None
    except Exception as exc:
        print(f"Request error for feed {feed_name}: {exc}")
    return None
def poll_for_new_hotnot() -> Optional[str]:
    print("Polling hotnot feed for vote ...")
    initial = _fetch_last_feed_item(HOTNOT_FEED_IN)
    last_created = initial.get("created_at") if initial else None
    while True:
        time.sleep(2)
        data = _fetch_last_feed_item(HOTNOT_FEED_IN)
        if not data:
            continue
        created_at = data.get("created_at")
        if created_at and (last_created is None or created_at != last_created):
            last_created = created_at
            value = data.get("value")
            print(f"Received new item from {HOTNOT_FEED_IN}: {value}")
            if isinstance(value, str):
                if value.strip().lower() == "hot":
                    set_color((255, 0, 0))
                else:
                    set_color((0, 255, 0))
            return value
def capture_image(path: str = "input.jpg") -> str:
    camera = Picamera2()
    config = camera.create_still_configuration()
    camera.configure(config)
    camera.start()
    time.sleep(2)
    camera.capture_file(path)
    camera.close()
    return path

def take_photo_and_upload() -> None:
    try:
        capture_image("input.jpg")
        display_image_on_tft("input.jpg")
        image_bytes = resize_to_target_size("input.jpg", "output.jpg")
        if image_bytes:
            upload_to_adafruit_io(image_bytes)
            display_image_on_tft("output.jpg")
            poll_for_new_hotnot()
        display_text_on_tft("Press button or wait for new image")
    except Exception as exc:
        print(f"Failed to capture or upload image: {exc}")
# -------- Functions from check.py ----------
def setup_vote_buttons() -> None:
    """Configure GPIO pins for voting buttons using Blinka."""
    global button_hot, button_not
    button_hot = digitalio.DigitalInOut(getattr(board, f"D{BUTTON_CAPTURE}"))
    button_hot.direction = digitalio.Direction.INPUT
    button_hot.pull = digitalio.Pull.UP
    button_not = digitalio.DigitalInOut(getattr(board, f"D{BUTTON_NOT}"))
    button_not.direction = digitalio.Direction.INPUT
    button_not.pull = digitalio.Pull.UP

def upload_hotnot(value: str) -> None:
    url = (
        f"https://io.adafruit.com/api/v2/{USERNAME}/feeds/{HOTNOT_FEED_OUT}/data"
    )
    headers = {"X-AIO-Key": AIO_KEY, "Content-Type": "application/json"}
    payload = {"value": value}
    try:
        resp = requests.post(url, headers=headers, json=payload)
        if resp.status_code in (200, 201):
            print(f"Uploaded '{value}' to hotnot feed")
        else:
            print(f"Failed to upload '{value}': {resp.status_code} - {resp.text}")
    except Exception as exc:
        print(f"Request failed: {exc}")

def poll_for_button_press() -> None:
    if not (button_hot and button_not):
        return
    print("Waiting for button press (hot/not)...")
    while True:
        # Buttons use pull-ups, so a pressed button reads False
        if not button_hot.value:
            display_text_on_tft("sending hot")
            upload_hotnot("hot")
            # Wait for the button to be released
            while not button_hot.value:
                time.sleep(0.05)
            break
        if not button_not.value:
            display_text_on_tft("sending not")
            upload_hotnot("not")
            # Wait for the button to be released
            while not button_not.value:
                time.sleep(0.05)
            break
        time.sleep(0.05)
def load_last_seen_timestamp() -> Optional[str]:
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, "r") as f:
                data = json.load(f)
            return data.get("created_at")
        except Exception:
            return None
    return None

def save_last_seen_timestamp(created_at: str) -> None:
    try:
        with open(STATE_FILE, "w") as f:
            json.dump({"created_at": created_at}, f)
    except Exception as exc:
        print(f"Failed to save timestamp: {exc}")

def fetch_latest_data() -> Optional[dict]:
    url = (
        f"https://io.adafruit.com/api/v2/{USERNAME}/feeds/{FEED_NAME_IN}/data/last"
    )
    headers = {"X-AIO-Key": AIO_KEY}
    try:
        resp = requests.get(url, headers=headers)
        if resp.status_code == 200:
            return resp.json()
        if resp.status_code == 404:
            # Feed may not have any data yet
            return None
        print(f"Error: {resp.status_code} - {resp.text}")
    except Exception as exc:
        print(f"Request failed: {exc}")
    return None
def process_incoming_image(value: str, created_at: str) -> str:
    filename = f"received_{created_at.replace(':', '-').replace('T', '_').replace('Z', '')}.jpg"
    try:
        img_data = base64.b64decode(value)
        img = Image.open(io.BytesIO(img_data))
        img.save(filename)
        print(f"Image saved as: {filename}")
    except Exception as exc:
        print(f"Failed to process image: {exc}")
    return filename
def main() -> None:
    global FEED_NAME_IN, FEED_NAME_OUT, HOTNOT_FEED_IN, HOTNOT_FEED_OUT
    parser = argparse.ArgumentParser(description="Combined image uploader")
    parser.add_argument(
        "--user",
        choices=["1", "2"],
        default="1",
        help="Select user feed (1 or 2)",
    )
    args = parser.parse_args()
    # Each user reads from their own feeds and writes to the other user's feeds
    if args.user == "1":
        FEED_NAME_IN = "image1"
        FEED_NAME_OUT = "image2"
        HOTNOT_FEED_IN = "hotnot1"
        HOTNOT_FEED_OUT = "hotnot2"
    else:
        FEED_NAME_IN = "image2"
        FEED_NAME_OUT = "image1"
        HOTNOT_FEED_IN = "hotnot2"
        HOTNOT_FEED_OUT = "hotnot1"
    setup_pixels()
    setup_display()
    setup_vote_buttons()
    display_text_on_tft("Press button or wait for new image")
    last_seen = load_last_seen_timestamp()
    prev_state = button_hot.value
    try:
        while True:
            # Check capture/hot button for capture action
            current_state = button_hot.value
            if not current_state and prev_state:
                display_text_on_tft("taking a photo")
                take_photo_and_upload()
                time.sleep(0.2)
            prev_state = current_state
            # Check Adafruit IO for new image
            data = fetch_latest_data()
            if data:
                created_at = data.get("created_at")
                value = data.get("value")
                if created_at and value and (last_seen is None or created_at > last_seen):
                    save_last_seen_timestamp(created_at)
                    last_seen = created_at
                    path = process_incoming_image(value, created_at)
                    display_image_with_text(path, "Hot or not?")
                    poll_for_button_press()
                    display_text_on_tft("Press button or wait for new image")
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        if button_hot:
            button_hot.deinit()
        if button_not:
            button_not.deinit()

if __name__ == "__main__":
    main()
You'll just need to change AIO_USERNAME to your Adafruit IO username and AIO_KEY to your Adafruit IO key (which you can get from adafruit.io).
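If you want to check the username and key before wiring everything up, something along these lines may help. It is only a sketch: it reuses the same URL pattern and X-AIO-Key header that coffee.py uses, but the helper names last_item_url and check_credentials are made up for this example, and it uses urllib from the standard library rather than requests.

```python
import urllib.error
import urllib.request

AIO_BASE = "https://io.adafruit.com/api/v2"


def last_item_url(username: str, feed: str) -> str:
    """Build the same 'last item' URL that coffee.py requests for a feed."""
    return f"{AIO_BASE}/{username}/feeds/{feed}/data/last"


def check_credentials(username: str, key: str, feed: str = "image1") -> int:
    """Request the last item of a feed and return the HTTP status code."""
    request = urllib.request.Request(
        last_item_url(username, feed), headers={"X-AIO-Key": key}
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # 4xx/5xx responses arrive as exceptions; report their status code.
        # A missing network connection raises URLError, which is not caught here.
        return err.code


# Example (needs a network connection and your real details):
# print(check_credentials("your_username", "your_key"))
```

coffee.py itself treats a status of 200 as success and 404 as a feed with no data yet; any other code suggests the username, key or feed name is worth double-checking.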
At this point, you'll need two Raspberry Pis set up with all this installed, so you'll need to repeat the process on the second Raspberry Pi Zero 2 W.
Because this code uses NeoPixels, which require low-level hardware access, it needs to run with sudo. You also need to make sure that the virtual environment's PATH is passed through to sudo. This means that the full command to run this is:
sudo -E env PATH=$PATH python3 coffee.py --user 1
One Raspberry Pi needs to be user 1 and the other needs to be user 2. There's no difference between them other than the Adafruit IO feeds they use, so it doesn't make a difference which is which.
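To make the symmetry concrete, here is a small sketch of the feed assignment that the --user option performs in coffee.py. The function feeds_for_user and its dictionary keys are illustrative only; the feed names are the ones the script uses.

```python
def feeds_for_user(user: str) -> dict:
    """Return the feed names for user "1" or "2", mirroring coffee.py's --user logic."""
    if user not in ("1", "2"):
        raise ValueError("user must be '1' or '2'")
    # Each user reads their own numbered feeds and writes to the other user's.
    me, other = ("1", "2") if user == "1" else ("2", "1")
    return {
        "image_in": f"image{me}",
        "image_out": f"image{other}",
        "hotnot_in": f"hotnot{me}",
        "hotnot_out": f"hotnot{other}",
    }


if __name__ == "__main__":
    for user in ("1", "2"):
        print(user, feeds_for_user(user))
```

Whatever user 1 writes to, user 2 reads from, and vice versa, which is why running both devices as the same user would leave them unable to see each other's photos and votes.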
If you run this command on both Raspberry Pi Zero 2 Ws, everything should work. However, you won't want to type this command in every time you want it to run (and you may not have two sets of monitor, keyboard and mouse to log in and run it), so I set it up to run automatically when the Raspberry Pi powers on.
Save the below as combined.service in your home directory (it'll be moved to the final location in the next step).
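[Unit]
Description=Coffee sharer
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/home/ben_e/coffee
ExecStart=/home/ben_e/env/bin/python /home/ben_e/coffee/coffee.py --user 1
Restart=on-failure

[Install]
WantedBy=multi-user.target

You will need to change the paths in the WorkingDirectory and ExecStart lines to the location of your virtual environment and the Python code. Use full paths here: in a system service running as root, the %h shortcut points to root's home directory rather than yours. On the second Raspberry Pi, change --user 1 to --user 2.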
When you've made those changes, you can run the code below in a terminal to start the service. Enter this and the code will run whenever you reboot the Raspberry Pi (and it will restart if it crashes for any reason).
sudo cp combined.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now combined.service
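Once the service is enabled, you may want a quick way to confirm that it is actually running, especially on a Pi with no monitor attached. The sketch below wraps the systemctl is-active command from Python; service_state is a hypothetical helper name, and it returns "unknown" if systemctl is not available or prints nothing.

```python
import subprocess


def service_state(unit: str = "combined.service") -> str:
    """Return the state systemctl reports for a unit, or "unknown" if unavailable."""
    try:
        result = subprocess.run(
            ["systemctl", "is-active", unit],
            capture_output=True,
            text=True,
            check=False,  # a non-zero exit just means the unit is not active
        )
    except FileNotFoundError:
        # systemctl is not installed on this machine
        return "unknown"
    return result.stdout.strip() or "unknown"


if __name__ == "__main__":
    print("combined.service is", service_state())
```

If the state is anything other than active, the service's log output is the place to look for the reason.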
All the software is now set up. Now to build the final structure to keep everything together.
Page last edited August 06, 2025