code.py
The code.py file for this project is responsible for the high-level behavior of the app. First, it initializes the displayio elements that make up the interface, and the Fruit Jam library which sets up the DAC and other hardware. It uses the sync_time() function to get the current time from NTP servers and updates the system time accordingly, which is required for the AWS authentication.
During the main loop, it polls sys.stdin for pressed keys as described in the USB Keyboard learn guide. When the pressed key is a letter, the display is updated. Arrow up/down keys raise and lower the volume. Pressing Enter will fetch the MP3 file for the current word and then start playing it.
After the speech MP3 finishes, the word is spelled aloud one letter at a time by playing the pre-loaded MP3 files located in spell_jam_assets/letter_mp3s/. There is one MP3 for each letter of the alphabet; the code loops over the word one letter at a time, playing each relevant letter MP3 file. After the spelling, the full word MP3 file is played once more.
If Enter is pressed when no new word has been typed, then the MP3 files for the previous word are played again.
# SPDX-FileCopyrightText: 2025 Tim Cocks for Adafruit Industries
# SPDX-License-Identifier: MIT
import sys
import os
import time
import supervisor
from adafruit_fruitjam import FruitJam
from adafruit_fruitjam.peripherals import request_display_config
from displayio import OnDiskBitmap, TileGrid, Group
from adafruit_bitmap_font import bitmap_font
from adafruit_display_text.bitmap_label import Label
try:
    from launcher_config import LauncherConfig
except ImportError:

    class LauncherConfig:
        """Stand-in settings object used when launcher_config cannot be imported."""

        def __init__(self):
            # audio defaults read later by the hardware setup
            self.audio_volume_override_danger = 0.75
            self.audio_output = "headphone"
            # empty settings mapping: no "audio" section is present
            self.data = {}
# comment out one of these imports depending on which TTS engine you want to use
from tts_aws import WordFetcherTTS
# from tts_local import WordFetcherTTS
# load the user settings
launcher_config = LauncherConfig()

# every character that is accepted as part of a word
LETTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

# state shared between the main loop and the playback helpers
curword = ""  # word currently being typed
lastword = ""  # most recently submitted word
sayword = False  # result of the most recent fetch_word() call

# display setup
request_display_config(320, 240)
display = supervisor.runtime.display

# the background image is the first element of the display group
main_group = Group()
background = OnDiskBitmap("spell_jam_assets/background.bmp")
background_tg = TileGrid(background, pixel_shader=background.pixel_shader)
main_group.append(background_tg)

# label drawn with the 14-segment font, used to show the typed word
font = bitmap_font.load_font("spell_jam_assets/14segment_16.bdf")
screen_lbl = Label(text="Type a word", font=font, color=0x00FF00)
screen_lbl.anchor_point = (0.5, 0)
screen_lbl.anchored_position = (display.width // 2, 100)
main_group.append(screen_lbl)

# Fruit Jam built-in hardware
fj = FruitJam()

# use the configured volume when the settings contain one, otherwise 0.5
volume_level = (
    launcher_config.audio_volume
    if "audio" in launcher_config.data and "volume" in launcher_config.data["audio"]
    else 0.5
)
fj.peripherals.audio_output = launcher_config.audio_output
fj.peripherals.safe_volume_limit = launcher_config.audio_volume_override_danger
fj.peripherals.volume = volume_level
# volume tracked as a percentage so the arrow keys can step it by 5
vol_int = round(volume_level * 100)
fj.neopixels.brightness = 0.1

word_fetcher = WordFetcherTTS(fj, launcher_config)
def play_sound():
    """
    Play a pre-recorded sound whose file name matches the last word, if one exists.

    A ``words`` folder is looked up in ``spell_jam_assets`` first and then in
    ``/sd/spell_jam_assets/``. File names are compared with ``lastword``
    case-insensitively after dropping the last four characters (the extension).
    ``.mp3`` files are played with ``fj.play_mp3_file`` and ``.wav`` files with
    ``fj.play_file``; the extension check is case-insensitive. Files with a matching
    name but another extension are skipped so a playable sibling can still be found.
    Nothing happens when no folder or no playable match is found.
    """
    sound_path = None
    # (folder to list, resulting words folder) - local assets first, then the SD card
    candidates = (
        ("spell_jam_assets", "spell_jam_assets/words/"),
        ("/sd/spell_jam_assets/", "/sd/spell_jam_assets/words/"),
    )
    for assets_dir, words_dir in candidates:
        try:
            if "words" in os.listdir(assets_dir):
                sound_path = words_dir
                break
        except OSError:
            # folder missing or storage unavailable: try the next location
            continue

    if sound_path is None:
        return

    target = lastword.upper()
    for sound in os.listdir(sound_path):
        if sound[:-4].upper() != target:
            continue
        extension = sound[-4:].lower()
        if extension == ".mp3":
            fj.play_mp3_file(f"{sound_path}{sound}")
            break
        if extension == ".wav":
            fj.play_file(f"{sound_path}{sound}")
            break
        # same name but unsupported type: keep looking for a playable file
def say_and_spell_lastword():
    """
    Speak the last word, spell it one letter at a time, then speak it once more.

    The fetched speech file is only played when ``sayword`` is truthy. After the
    second reading ``play_sound()`` is called and the NeoPixels are filled with
    0x000000.
    """
    # NOTE(review): the source's indentation was lost; the two pauses, play_sound()
    # and the NeoPixel fill are assumed to sit at function level - confirm.

    def speak_word():
        # play the fetched clip with the player that matches its extension
        if not sayword:
            return
        suffix = word_fetcher.output_path[-4:]
        if suffix == ".mp3":
            fj.play_mp3_file(word_fetcher.output_path)
        elif suffix == ".wav":
            fj.play_file(word_fetcher.output_path)

    speak_word()
    time.sleep(0.2)

    # one pre-loaded MP3 per letter of the word
    for char in lastword:
        fj.play_mp3_file(f"spell_jam_assets/letter_mp3s/{char.upper()}.mp3")
    time.sleep(0.2)

    speak_word()

    play_sound()
    fj.neopixels.fill(0x000000)
display.root_group = main_group

while True:
    # check how many bytes are available from the keyboard input
    available = supervisor.runtime.serial_bytes_available
    if not available:
        continue

    # read everything that is waiting and echo it
    c = sys.stdin.read(available)
    print(c, end="")

    # Split the chunk into keys. Plain characters are handled one at a time so that
    # keys arriving together in a single read (fast typing) are not dropped. From the
    # first escape character onward the rest of the chunk is kept as one key, because
    # arrow keys arrive as multi-character escape sequences.
    escape_at = c.find("\x1b")
    if escape_at < 0:
        keys = list(c)
    else:
        keys = list(c[:escape_at]) + [c[escape_at:]]

    for key in keys:
        if key in LETTERS:
            curword += key
            screen_lbl.text = curword
        elif key in {"\x7f", "\x08"}:  # backspace
            curword = curword[:-1]
            screen_lbl.text = curword
        elif key == "\n":
            if curword:
                lastword = curword
                sayword = word_fetcher.fetch_word(lastword)
                say_and_spell_lastword()
                curword = ""
            else:
                # repeat last word
                say_and_spell_lastword()
        elif key == "\x1b[B":
            # down arrow
            vol_int = max(0, vol_int - 5)
            fj.peripherals.volume = vol_int / 100
            print(f"Volume: {fj.peripherals.volume}")
        elif key == "\x1b[A":
            # up arrow
            vol_int = min(fj.peripherals.safe_volume_limit * 100, vol_int + 5)
            fj.peripherals.volume = vol_int / 100
            print(f"Volume: {fj.peripherals.volume}")
        else:
            print(f"unused key: {key.encode('utf-8')}")
aws_polly.py
This file provides two high level helper functions to download MP3 files from AWS Polly: text_to_speech_polly_http() and text_to_speech_with_ssml(). Only text_to_speech_polly_http() is used by the Spell Jam project. The functions accept a requests instance, the desired text to get a speech MP3 for, AWS keys, and some configuration options, which all have defaults used by the Spell Jam project. The ssml version of the function is the same, except that it accepts the text to speak in a format called SSML which stands for Speech Synthesis Markup Language. It's an XML based format that allows finer grained control over some aspects of the speech.
The majority of the AWS functionality is encapsulated within the PollyHTTPClient class. It handles creating authorized requests, and signing them according to the requirements of AWS Signature Version 4. The details of the authentication and request are somewhat gnarly, but if you're curious the code is embedded below.
# SPDX-FileCopyrightText: 2025 Tim Cocks for Adafruit Industries
# SPDX-License-Identifier: MIT
import hmac
import json
from adafruit_datetime import datetime
import adafruit_hashlib as hashlib
def url_encode(string, safe=""):
    """
    Minimal URL encoding function to replace urllib.parse.quote

    Characters outside the allowed set are converted to UTF-8 and every resulting
    byte is percent-encoded, so non-ASCII text produces valid ``%XX`` sequences.

    Args:
        string (str): String to encode
        safe (str): Characters that should not be encoded
    Returns:
        str: URL encoded string
    """
    # Unreserved characters (RFC 3986) are never encoded
    unreserved = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~"
    allowed = unreserved + safe

    pieces = []
    for char in string:
        if char in allowed:
            pieces.append(char)
        else:
            # one %XX group per UTF-8 byte of the character
            pieces.extend(f"%{byte:02X}" for byte in char.encode("utf-8"))
    return "".join(pieces)
def _zero_pad(num, count=2):
padded = str(num)
while len(padded) < count:
padded = "0" + padded
return padded
class PollyHTTPClient:
    """
    Small Amazon Polly client that signs its requests with AWS Signature Version 4.

    Args:
        requests_instance: object providing ``post(url, headers=..., data=...)``
        access_key (str): AWS Access Key ID
        secret_key (str): AWS Secret Access Key
        region (str): AWS region used for the endpoint host and the credential scope
    """

    # pylint: disable=no-self-use
    def __init__(self, requests_instance, access_key, secret_key, region="us-east-1"):
        self._requests = requests_instance
        self.access_key = access_key
        self.secret_key = secret_key
        self.region = region
        self.service = "polly"
        self.host = f"polly.{region}.amazonaws.com"
        self.endpoint = f"https://{self.host}"

    def _sign(self, key, msg):
        """Return the raw HMAC-SHA256 digest of ``msg`` (str) under ``key`` (bytes)."""
        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

    def _get_signature_key(self, date_stamp):
        """Derive the signing key by chaining HMACs over date, region, service and terminator."""
        k_date = self._sign(("AWS4" + self.secret_key).encode("utf-8"), date_stamp)
        k_region = self._sign(k_date, self.region)
        k_service = self._sign(k_region, self.service)
        k_signing = self._sign(k_service, "aws4_request")
        return k_signing

    def _canonicalize_headers(self, headers):
        """
        Build the canonical header block and the signed-header list.

        Header names are lower-cased first and then sorted, so the order is based on
        the lower-case names. Values are stripped of surrounding whitespace.

        Returns:
            tuple: (canonical_headers, signed_headers)
        """
        entries = sorted(
            (name.lower(), value.strip()) for name, value in headers.items()
        )
        canonical_headers = "".join(f"{name}:{value}\n" for name, value in entries)
        signed_headers = ";".join(name for name, _ in entries)
        return canonical_headers, signed_headers

    def _create_canonical_request(self, method, uri, query_string, headers, payload):
        """
        Create canonical request for AWS Signature V4

        Returns:
            tuple: (canonical_request, signed_headers)
        """
        canonical_uri = url_encode(uri, safe="/")
        canonical_querystring = query_string
        canonical_headers, signed_headers = self._canonicalize_headers(headers)
        # hex SHA-256 of the request body
        payload_hash = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        canonical_request = (
            f"{method}\n{canonical_uri}\n{canonical_querystring}\n"
            f"{canonical_headers}\n{signed_headers}\n{payload_hash}"
        )
        return canonical_request, signed_headers

    def _create_string_to_sign(self, timestamp, credential_scope, canonical_request):
        """Create string to sign for AWS Signature V4"""
        algorithm = "AWS4-HMAC-SHA256"
        canonical_request_hash = hashlib.sha256(
            canonical_request.encode("utf-8")
        ).hexdigest()
        string_to_sign = (
            f"{algorithm}\n{timestamp}\n{credential_scope}\n{canonical_request_hash}"
        )
        return string_to_sign

    def synthesize_speech(  # pylint: disable=too-many-locals
        self,
        text,
        voice_id="Joanna",
        output_format="mp3",
        engine="standard",
        text_type="text",
    ):
        """
        Synthesize speech using Amazon Polly via direct HTTP request

        Args:
            text (str): Text to convert to speech
            voice_id (str): Voice to use (e.g., 'Joanna', 'Matthew', 'Amy')
            output_format (str): Audio format ('mp3', 'ogg_vorbis', 'pcm')
            engine (str): Engine type ('standard' or 'neural')
            text_type (str): 'text' or 'ssml'
        Returns:
            bytes: Audio data if successful, None if failed
        """
        method = "POST"
        uri = "/v1/speech"

        request_body = {
            "Text": text,
            "OutputFormat": output_format,
            "VoiceId": voice_id,
            "Engine": engine,
            "TextType": text_type,
        }
        payload = json.dumps(request_body)

        # Timestamps are assembled by hand in the layouts '%Y%m%dT%H%M%SZ' and '%Y%m%d'.
        # NOTE(review): the trailing "Z" marks the time as UTC - confirm that the
        # device clock read by datetime.now() is set to UTC.
        now = datetime.now()
        datestamp = f"{now.year}{_zero_pad(now.month)}{_zero_pad(now.day)}"
        amzdate = (
            f"{datestamp}"
            f"T{_zero_pad(now.hour)}{_zero_pad(now.minute)}{_zero_pad(now.second)}Z"
        )

        headers = {
            "Content-Type": "application/x-amz-json-1.0",
            "Host": self.host,
            "X-Amz-Date": amzdate,
            "X-Amz-Target": "AWSPollyService.SynthesizeSpeech",
        }

        canonical_request, signed_headers = self._create_canonical_request(
            method, uri, "", headers, payload
        )

        credential_scope = f"{datestamp}/{self.region}/{self.service}/aws4_request"
        string_to_sign = self._create_string_to_sign(
            amzdate, credential_scope, canonical_request
        )

        signing_key = self._get_signature_key(datestamp)
        signature = hmac.new(
            signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
        ).hexdigest()

        # the Authorization header is added after signing, so it is not a signed header
        authorization_header = (
            f"AWS4-HMAC-SHA256 "
            f"Credential={self.access_key}/{credential_scope}, "
            f"SignedHeaders={signed_headers}, "
            f"Signature={signature}"
        )
        headers["Authorization"] = authorization_header

        # any failure is reported on the console and turned into a None result
        try:
            url = f"{self.endpoint}{uri}"
            response = self._requests.post(url, headers=headers, data=payload)
            if response.status_code == 200:
                return response.content
            print(f"Error: HTTP {response.status_code}")
            print(f"Response: {response.text}")
            return None
        except Exception as e:  # pylint: disable=broad-except
            print(f"Request failed: {e}")
            return None
def text_to_speech_polly_http(
    requests_instance,
    text,
    access_key,
    secret_key,
    output_file="/saves/tts_output.mp3",
    voice_id="Joanna",
    region="us-east-1",
    output_format="mp3",
):
    """
    Convert text to speech with the Polly HTTP API and store the audio in a file.

    Args:
        requests_instance: requests-style object handed to PollyHTTPClient
        text (str): Text to convert
        access_key (str): AWS Access Key ID
        secret_key (str): AWS Secret Access Key
        output_file (str): Output file path
        voice_id (str): Polly voice ID
        region (str): AWS region
        output_format (str): Audio format
    Returns:
        bool: True if successful, False otherwise
    """
    polly = PollyHTTPClient(requests_instance, access_key, secret_key, region)
    audio_bytes = polly.synthesize_speech(
        text=text, voice_id=voice_id, output_format=output_format
    )

    # nothing came back from the service
    if not audio_bytes:
        print("Failed to synthesize speech")
        return False

    # write the audio out; a failed write is reported, not raised
    try:
        with open(output_file, "wb") as out:
            out.write(audio_bytes)
        print(f"Audio saved to: {output_file}")
        return True
    except Exception as err:  # pylint: disable=broad-except
        print(f"Failed to save file: {err}")
        return False
def text_to_speech_with_ssml(
    requests_instance,
    text,
    access_key,
    secret_key,
    speech_rate="medium",
    output_file="output.mp3",
):
    """
    Example with SSML for speech rate control

    ``text`` is treated as plain text: XML-reserved characters in it (and in
    ``speech_rate``) are escaped before being placed into the SSML document, so
    input such as "salt & pepper" still produces well-formed markup.

    Args:
        requests_instance: requests-style object handed to PollyHTTPClient
        text (str): Plain text to speak
        access_key (str): AWS Access Key ID
        secret_key (str): AWS Secret Access Key
        speech_rate (str): Value for the prosody ``rate`` attribute
        output_file (str): Output file path
    Returns:
        bool: True if the audio was synthesized and saved, False otherwise
    """

    def xml_escape(value):
        # "&" must be replaced first so the entities added below are not re-escaped
        escaped = str(value)
        for raw, entity in (
            ("&", "&amp;"),
            ("<", "&lt;"),
            (">", "&gt;"),
            ('"', "&quot;"),
            ("'", "&apos;"),
        ):
            escaped = escaped.replace(raw, entity)
        return escaped

    # Wrap text in SSML
    ssml_text = (
        f'<speak><prosody rate="{xml_escape(speech_rate)}">'
        f"{xml_escape(text)}</prosody></speak>"
    )
    client = PollyHTTPClient(requests_instance, access_key, secret_key)
    audio_data = client.synthesize_speech(
        text=ssml_text, voice_id="Joanna", text_type="ssml"
    )
    if not audio_data:
        return False

    # report a failed write instead of raising, like text_to_speech_polly_http
    try:
        with open(output_file, "wb") as f:
            f.write(audio_data)
    except OSError as e:
        print(f"Failed to save file: {e}")
        return False
    print(f"SSML audio saved to: {output_file}")
    return True
# Example usage
# if __name__ == "__main__":
# # Replace with your actual AWS credentials
# AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
# AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
#
# # Basic usage
# success = text_to_speech_polly_http(
# text="Hello from CircuitPython! This is Amazon Polly speaking.",
# access_key=AWS_ACCESS_KEY,
# secret_key=AWS_SECRET_KEY,
# voice_id="Joanna"
# )
# SSML example
# if success:
# text_to_speech_with_ssml(
# text="This speech has a custom rate using SSML markup.",
# access_key=AWS_ACCESS_KEY,
# secret_key=AWS_SECRET_KEY,
# speech_rate="slow",
# output_file="ssml_example.mp3"
# )
tts_aws.py
This file contains a WordFetcherTTS class implementation that uses the aws_polly.py functions above. The object exposes a minimal interface for code.py to make use of. It's possible to change the TTS service from AWS to other cloud TTS providers, or even a local server on your own network running a free TTS model on a Raspberry Pi or PC. See this playground page for details on setting up a local TTS server and using it with Spell Jam.
# SPDX-FileCopyrightText: 2025 Tim Cocks for Adafruit Industries
# SPDX-License-Identifier: MIT
# tts_aws.py
import os
import adafruit_connection_manager
import adafruit_requests
from aws_polly import text_to_speech_polly_http
class WordFetcherTTS:
    """
    Fetches a spoken version of a word from AWS Polly and stores it at ``output_path``.

    Args:
        fj: FruitJam instance; required, it is used to sync the clock and to reach
            the network radio
        launcher_config: launcher settings object (stored, not otherwise used here)
        output_path (str): file the downloaded audio is written to
    Raises:
        ValueError: if ``fj`` is not provided
    """

    def __init__(self, fj=None, launcher_config=None, output_path="/saves/tts_output.mp3"):
        # fj is dereferenced below, so fail early with a clear message
        if fj is None:
            raise ValueError("WordFetcherTTS requires a FruitJam instance (fj)")

        self.output_path = output_path
        self.fj = fj
        self.launcher_config = launcher_config

        # AWS auth requires us to have accurate date/time
        fj.sync_time()

        # setup adafruit_requests session
        pool = adafruit_connection_manager.get_radio_socketpool(fj.network._wifi.esp)
        ssl_context = adafruit_connection_manager.get_radio_ssl_context(fj.network._wifi.esp)
        self.requests = adafruit_requests.Session(pool, ssl_context)

        # credentials are read with os.getenv
        self.AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
        self.AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")

    def fetch_word(self, word: str, voice: str = "Joanna") -> bool:
        """
        Download the speech audio for ``word`` to ``self.output_path``.

        The NeoPixels are filled yellow (0xFFFF00) while the request runs and green
        (0x00FF00) once it has returned.

        Returns:
            bool: True if the audio was fetched and saved, False otherwise
                (including when the AWS credentials are missing)
        """
        if not self.AWS_ACCESS_KEY or not self.AWS_SECRET_KEY:
            print("Missing AWS credentials.")
            return False

        if self.fj:
            self.fj.neopixels.fill(0xFFFF00)

        success = text_to_speech_polly_http(
            self.requests,
            text=word,
            access_key=self.AWS_ACCESS_KEY,
            secret_key=self.AWS_SECRET_KEY,
            output_file=self.output_path,
            voice_id=voice,
            region="us-east-1",
            output_format="mp3",
        )

        if self.fj:
            self.fj.neopixels.fill(0x00FF00)
        return success
hmac.py
The AWS signature process requires using a cryptographic function called HMAC. This file contains an implementation of that function for use by aws_polly.py. The file was originally open sourced as part of the micropython-lib project.
# SPDX-FileCopyrightText: 2015 Paul Sokolovsky
# SPDX-License-Identifier: MIT
# https://github.com/micropython/micropython-lib
# https://github.com/micropython/micropython-lib/blob/master/LICENSE
# Implements the hmac module from the Python standard library.
class HMAC:
    """
    Keyed-hash message authentication code object.

    Two hash objects are kept: an inner one that receives the key XORed with 0x36
    followed by the message, and an outer one that receives the key XORed with 0x5C
    followed by the inner digest.
    """

    # pylint: disable=protected-access, import-outside-toplevel
    def __init__(self, key, msg=None, digestmod=None):
        if not isinstance(key, (bytes, bytearray)):
            raise TypeError("key: expected bytes/bytearray")

        import adafruit_hashlib as hashlib

        # TODO: Default hash algorithm is now deprecated.
        chosen = hashlib.md5 if digestmod is None else digestmod

        if callable(chosen):
            # a hashlib constructor returning a new hash object
            hash_factory = chosen
        elif isinstance(chosen, str):
            # the name of a hash constructor found on the hashlib module
            def hash_factory(data=b""):
                return getattr(hashlib, chosen)(data)

        else:
            # a module supporting PEP 247
            hash_factory = chosen.new

        self._outer = hash_factory()
        self._inner = hash_factory()
        self.digest_size = getattr(self._inner, "digest_size", None)
        # Hash objects without a block_size attribute get 64, which is correct for
        # md5, sha1 and sha256.
        self.block_size = getattr(self._inner, "block_size", 64)

        # keys longer than one block are replaced by their digest ...
        if len(key) > self.block_size:
            key = hash_factory(key).digest()
        # ... and every key is zero-padded up to the block size
        padded_key = key + bytes(self.block_size - len(key))

        self._outer.update(bytes(byte ^ 0x5C for byte in padded_key))
        self._inner.update(bytes(byte ^ 0x36 for byte in padded_key))

        if msg is not None:
            self.update(msg)

    @property
    def name(self):
        """"hmac-" followed by the inner hash's name, or its type name if it has none."""
        inner_name = getattr(self._inner, "name", type(self._inner).__name__)
        return "hmac-" + inner_name

    def update(self, msg):
        """Feed more message bytes into the inner hash."""
        self._inner.update(msg)

    def copy(self):
        """Return an independent copy; needs hash objects that provide ``copy()``."""
        if not hasattr(self._inner, "copy"):
            # Not supported for built-in hash functions.
            raise NotImplementedError()
        # __new__ is called directly so the expensive __init__ is skipped
        cls = self.__class__
        clone = cls.__new__(cls)
        clone.block_size = self.block_size
        clone.digest_size = self.digest_size
        clone._inner = self._inner.copy()
        clone._outer = self._outer.copy()
        return clone

    def _current(self):
        """Return the outer hash with the current inner digest applied."""
        outer = self._outer
        if hasattr(outer, "copy"):
            # Without copy() (built-in hash functions) the outer hash itself is
            # updated, so digest() finalises the hmac and further calls to
            # update/digest will fail.
            outer = outer.copy()
        outer.update(self._inner.digest())
        return outer

    def digest(self):
        """Return the MAC as bytes."""
        return self._current().digest()

    def hexdigest(self):
        """Return the MAC as a hexadecimal string."""
        import binascii

        hex_bytes = binascii.hexlify(self.digest())
        return str(hex_bytes, "utf-8")
def new(key, msg=None, digestmod=None):
    """Create and return an HMAC object for ``key``, optionally fed with ``msg``."""
    mac = HMAC(key, msg, digestmod)
    return mac
Page last edited November 24, 2025
Text editor powered by tinymce.