This is the code that will run on the bear. Since the API keys are stored in your environment variables, there's no need to modify the code.
# SPDX-FileCopyrightText: 2023 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import threading
import os
import sys
import time
import random
import configparser
from tempfile import NamedTemporaryFile
import azure.cognitiveservices.speech as speechsdk
from openai import OpenAI
import board
import digitalio
from adafruit_motorkit import MotorKit
from listener import Listener
API_KEYS_FILE = "~/keys.txt"  # INI file with [openai] and [azure] key sections

# ChatGPT Parameters
SYSTEM_ROLE = (
    "You are a helpful voice assistant in the form of a talking teddy bear"
    " that answers questions and gives information"
)
CHATGPT_MODEL = "gpt-3.5-turbo"
WHISPER_MODEL = "whisper-1"

# Azure Parameters
AZURE_SPEECH_VOICE = "en-GB-OliverNeural"
DEVICE_ID = None  # None means use the default audio output device

# Speech Recognition Parameters
ENERGY_THRESHOLD = 1000  # Energy level for mic to detect
RECORD_TIMEOUT = 30  # Seconds of listening after the foot button is pressed

# Motor Parameters
ARM_MOVEMENT_TIME = 0.5
BASE_MOUTH_DURATION = 0.2  # A higher number means slower mouth movement
SPEECH_VARIANCE = 0.1  # Higher allows more mouth movement variance.
# It pauses for BASE_MOUTH_DURATION ± SPEECH_VARIANCE
MOTOR_DUTY_CYCLE = 1.0  # Lower provides less power to the motors

# Do some checks and Import API keys from API_KEYS_FILE
config = configparser.ConfigParser()
# Resolve "~" from the USER environment variable rather than via plain
# expanduser — presumably so the path stays the invoking user's home even
# when the script is run with elevated privileges (TODO confirm).
username = os.environ["USER"]
user_homedir = os.path.expanduser(f"~{username}")
API_KEYS_FILE = API_KEYS_FILE.replace("~", user_homedir)
def get_config_value(section, key, min_length=None):
    """Fetch a value from the module-level ``config`` parser, exiting on error.

    Parameters:
        section: INI section expected in the API keys file (e.g. "openai").
        key: option name inside that section.
        min_length: optional minimum length the value must have; guards
            against placeholder/empty keys left in the file.

    Returns:
        The configured string value.

    Exits the process with status 1 (after printing a human-readable hint)
    when the section or key is missing, or the value is too short.
    """
    if not config.has_section(section):
        print("Please make sure API_KEYS_FILE points to "
              f"a valid file and has an [{section}] section.")
        sys.exit(1)
    if key not in config[section]:
        print(
            f"Please make sure your API keys file contains an {key} under the {section} section."
        )
        sys.exit(1)
    value = config[section][key]
    if min_length and len(value) < min_length:
        print(f"Please set {key} in your API keys file with a valid key.")
        sys.exit(1)
    # Return the value fetched above instead of performing a second,
    # redundant config lookup (the original re-indexed config[section][key]).
    return value
# Show which keys file is being used, then load it into the parser.
print(os.path.expanduser(API_KEYS_FILE))
config.read(os.path.expanduser(API_KEYS_FILE))

# OpenAI client used for the ChatGPT (and Whisper) requests below.
openai = OpenAI(
    # This is the default and can be omitted
    api_key=get_config_value("openai", "OPENAI_API_KEY", 10)
)

# Azure Speech Services configuration for text-to-speech output.
speech_key = get_config_value("azure", "SPEECH_KEY", 15)
service_region = get_config_value("azure", "SPEECH_REGION")
speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=service_region)
speech_config.speech_synthesis_voice_name = AZURE_SPEECH_VOICE
def sendchat(prompt):
    """Send the heard text to ChatGPT and return the assembled reply.

    The completion is requested as a stream; the delta fragments are
    collected and joined into a single response string.
    """
    stream = openai.chat.completions.create(
        model=CHATGPT_MODEL,
        messages=[
            {"role": "system", "content": SYSTEM_ROLE},
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )
    # Gather the streamed fragments; a chunk's delta content may be None
    # (e.g. the final chunk), so skip those.
    fragments = []
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta is not None:
            fragments.append(delta)
    return "".join(fragments)
def transcribe(wav_data):
    """Transcribe raw WAV bytes to English text via the Whisper API.

    Retries up to three times on API errors, sleeping 3 seconds between
    attempts; returns a spoken-friendly fallback sentence if all fail.
    """
    # Local import so no change to the top-of-file imports is required.
    from openai import APIError

    print("Transcribing...")
    attempts = 0
    while attempts < 3:
        try:
            # ``openai`` here is a v1 OpenAI *client* (see its construction
            # above), which has no legacy ``openai.Audio`` helper — the
            # original ``openai.Audio.translate_raw`` / ``openai.error``
            # calls could never work. Write the audio to a temp file and
            # use the v1 audio.translations endpoint instead.
            with NamedTemporaryFile(suffix=".wav") as temp_file:
                temp_file.write(wav_data)
                temp_file.seek(0)
                result = openai.audio.translations.create(
                    model=WHISPER_MODEL, file=temp_file
                )
            return result.text.strip()
        except APIError:
            # Transient service problem — back off briefly and retry.
            time.sleep(3)
            attempts += 1
    return "I wasn't able to understand you. Please repeat that."
class Bear:
    """Drives the bear hardware: arm and mouth motors, the foot button,
    and Azure text-to-speech with mouth movement synced to synthesis."""

    def __init__(self, azure_speech_config):
        kit = MotorKit(i2c=board.I2C())
        self._arms_motor = kit.motor1
        self._mouth_motor = kit.motor2

        # Setup Foot Button (pull-up wiring: pressed reads as False)
        self._foot_button = digitalio.DigitalInOut(board.D16)
        self._foot_button.direction = digitalio.Direction.INPUT
        self._foot_button.pull = digitalio.Pull.UP

        self.do_mouth_movement = False
        # _running lets deinit() actually stop the mouth thread. The
        # original thread looped ``while True`` forever, so the join()
        # in deinit() blocked indefinitely and shutdown hung.
        self._running = True
        self._mouth_thread = threading.Thread(target=self.move_mouth, daemon=True)
        self._mouth_thread.start()

        if DEVICE_ID is None:
            audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=True)
        else:
            audio_config = speechsdk.audio.AudioOutputConfig(device_name=DEVICE_ID)
        self._speech_synthesizer = speechsdk.SpeechSynthesizer(
            speech_config=azure_speech_config, audio_config=audio_config
        )
        # Flap the mouth while audio is being synthesized, stop when done.
        self._speech_synthesizer.synthesizing.connect(self.start_moving_mouth)
        self._speech_synthesizer.synthesis_completed.connect(self.stop_moving_mouth)

    def start_moving_mouth(self, _event):
        # Azure event callback: synthesis produced audio, start the mouth.
        self.do_mouth_movement = True

    def stop_moving_mouth(self, _event):
        # Azure event callback: synthesis finished, stop the mouth.
        self.do_mouth_movement = False

    def deinit(self):
        """Stop the mouth thread, de-energize motors, detach Azure events."""
        self.do_mouth_movement = False
        self._running = False  # let move_mouth() fall out of its loop
        self._mouth_thread.join()
        self._arms_motor.throttle = None
        self._mouth_motor.throttle = None
        # Disconnect the events that __init__ actually connected (the
        # original disconnected synthesis_started, which was never used).
        self._speech_synthesizer.synthesizing.disconnect_all()
        self._speech_synthesizer.synthesis_completed.disconnect_all()

    def _move_arms_motor(self, dir_up=True):
        direction = -1 if dir_up else 1
        self._arms_motor.throttle = MOTOR_DUTY_CYCLE * direction
        time.sleep(ARM_MOVEMENT_TIME)
        # Remove Power from the motor to avoid overheating
        self._arms_motor.throttle = None

    def _move_mouth_motor(self, dir_open=True):
        # Randomize the step duration so the movement looks less mechanical:
        # BASE_MOUTH_DURATION ± SPEECH_VARIANCE / 2
        duration = (
            BASE_MOUTH_DURATION
            + random.random() * SPEECH_VARIANCE
            - (SPEECH_VARIANCE / 2)
        )
        # Only power the motor while opening and let the spring close it
        self._mouth_motor.throttle = MOTOR_DUTY_CYCLE if dir_open else None
        time.sleep(duration)
        # Remove Power from the motor and let close to avoid overheating
        self._mouth_motor.throttle = None

    def foot_pressed(self):
        # Active-low button (pull-up), so pressed means value is False.
        return not self._foot_button.value

    def move_mouth(self):
        """Thread target: open/close the mouth while do_mouth_movement is set."""
        print("Starting mouth movement thread")
        while self._running:
            if self.do_mouth_movement:
                self._move_mouth_motor(dir_open=True)
                self._move_mouth_motor(dir_open=False)

    def move_arms(self, hide=True):
        self._move_arms_motor(dir_up=hide)

    def speak(self, text):
        """Synthesize *text* through Azure TTS and report the outcome."""
        result = self._speech_synthesizer.speak_text_async(text).get()
        # Check result
        if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
            print("Speech synthesized for text [{}]".format(text))
        elif result.reason == speechsdk.ResultReason.Canceled:
            cancellation_details = result.cancellation_details
            print("Speech synthesis canceled: {}".format(cancellation_details.reason))
            if cancellation_details.reason == speechsdk.CancellationReason.Error:
                print("Error details: {}".format(cancellation_details.error_details))
def _redraw_transcript(lines):
    """Clear the terminal and print the conversation transcript so far."""
    os.system("clear")
    for line in lines:
        print(line)
    print("", end="", flush=True)


def _handle_interaction(bear, listener, transcription):
    """One button press: listen, transcribe, query ChatGPT, speak the reply."""
    bear.speak("How may I help you?")
    listener.listen()
    if not listener.speech_waiting():
        return
    bear.speak("Let me think about that")
    bear.move_arms(hide=True)
    heard = listener.recognize()
    if not heard:
        return
    transcription.append(heard)
    print(f"Phrase Complete. Sent '{heard}' to ChatGPT.")
    reply = sendchat(heard)
    transcription.append(f"> {reply}")
    print("Got response from ChatGPT. Beginning speech synthesis.")
    bear.move_arms(hide=False)
    bear.speak(reply)


def main():
    """Run the bear's interaction loop until Ctrl-C, then release hardware."""
    listener = Listener(openai.api_key, ENERGY_THRESHOLD, RECORD_TIMEOUT)
    bear = Bear(speech_config)
    transcription = [""]
    bear.speak(
        "Hello there! Just give my left foot a squeeze if you would like to get my attention."
    )
    while True:
        try:
            # If button is pressed, start listening
            if bear.foot_pressed():
                _handle_interaction(bear, listener, transcription)
            _redraw_transcript(transcription)
            time.sleep(0.25)
        except KeyboardInterrupt:
            break
    bear.deinit()


if __name__ == "__main__":
    main()
cd ~
wget https://raw.githubusercontent.com/adafruit/Adafruit_Learning_System_Guides/main/ChatGPT_Bear/assistant.py
How the Python Code Works
This is an overview about how the code works. It's mostly a combination of code to interface between:
- Azure Speech Services API
- OpenAI ChatGPT API
- OpenAI Whisper API
- SpeechRecognition Library
- CircuitPython Motor Library
The code uses threading and events in order to be able to move the mouth motors at the same time the audio output from the Azure Speech Services is playing.
Parameters
The code starts out with a number of parameters that can be altered. Here are the main parameters that you might want to change:
- The `SYSTEM_ROLE` parameter is a description to ChatGPT about how it should act.
- The `CHATGPT_MODEL` and `WHISPER_MODEL` parameters can be updated as newer versions of the API become available.
- The `AZURE_SPEECH_VOICE` is the voice name that can be altered to change how the bear sounds. See the Create an account with Azure page for more details on selecting a different voice in Azure Speech Services.
- The `RECORD_TIMEOUT` is the amount of time in seconds that the bear will listen after pushing the button on the foot.
- The `DEVICE_ID` is for specifying the ID of a particular Audio Output Device. See the Troubleshooting section on the Usage page of this guide to find the right value. Leaving it set to `None` will try and use the system default device.
The remaining parameters have already been tuned pretty well, but you are welcome to experiment with them.
# ChatGPT Parameters
SYSTEM_ROLE = (
"You are a helpful voice assistant in the form of a talking teddy bear"
" that answers questions and gives information"
)
CHATGPT_MODEL = "gpt-3.5-turbo"
WHISPER_MODEL = "whisper-1"
# Azure Parameters
AZURE_SPEECH_VOICE = "en-GB-OliverNeural"
DEVICE_ID = None
# Speech Recognition Parameters
ENERGY_THRESHOLD = 1000 # Energy level for mic to detect
PHRASE_TIMEOUT = 3.0 # Space between recordings for separating phrases
RECORD_TIMEOUT = 30
# Motor Parameters
ARM_MOVEMENT_TIME = 0.5
BASE_MOUTH_DURATION = 0.2 # A higher number means slower mouth movement
SPEECH_VARIANCE = 0.1 # Higher allows more mouth movement variance.
# It pauses for BASE_MOUTH_DURATION ± SPEECH_VARIANCE
MOTOR_DUTY_CYCLE = 1.0 # Lower provides less power to the motors
# Do some checks and Import API keys from API_KEYS_FILE
config = configparser.ConfigParser()
username = os.environ["USER"]
user_homedir = os.path.expanduser(f"~{username}")
API_KEYS_FILE = API_KEYS_FILE.replace("~", user_homedir)
def get_config_value(section, key, min_length=None):
if not config.has_section(section):
print("Please make sure API_KEYS_FILE points to "
f"a valid file and has an [{section}] section.")
sys.exit(1)
if key not in config[section]:
print(
f"Please make sure your API keys file contains an {key} under the {section} section."
)
sys.exit(1)
value = config[section][key]
if min_length and len(value) < min_length:
print(f"Please set {key} in your API keys file with a valid key.")
sys.exit(1)
return config[section][key]
print(os.path.expanduser(API_KEYS_FILE))
config.read(os.path.expanduser(API_KEYS_FILE))
openai = OpenAI(
# This is the default and can be omitted
api_key=get_config_value("openai", "OPENAI_API_KEY", 10)
)
speech_key = get_config_value("azure", "SPEECH_KEY", 15)
service_region = get_config_value("azure", "SPEECH_REGION")
speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=service_region)
speech_config.speech_synthesis_voice_name = AZURE_SPEECH_VOICE
OpenAI API Interface Helpers
The sendchat and transcribe functions are for packaging and sending data to the ChatGPT and Whisper APIs respectively.
def sendchat(prompt):
response = ""
stream = openai.chat.completions.create(
model=CHATGPT_MODEL,
messages=[
{"role": "system", "content": SYSTEM_ROLE},
{"role": "user", "content": prompt},
],
stream=True,
)
# Send the heard text to ChatGPT and return the result
for chunk in stream:
if chunk.choices[0].delta.content is not None:
response += chunk.choices[0].delta.content
# Send the heard text to ChatGPT and return the result
return response
def transcribe(wav_data):
# Read the transcription.
print("Transcribing...")
attempts = 0
while attempts < 3:
try:
with NamedTemporaryFile(suffix=".wav") as temp_file:
result = openai.Audio.translate_raw(
WHISPER_MODEL, wav_data, temp_file.name
)
return result["text"].strip()
except (openai.error.ServiceUnavailableError, openai.error.APIError):
time.sleep(3)
attempts += 1
return "I wasn't able to understand you. Please repeat that."
The Listener Class
The Listener class is now in its own file and interfaces with the SpeechRecognition library to handle listening for speech and returning the prepared audio data.
# SPDX-FileCopyrightText: 2023 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import time
import speech_recognition as sr
class Listener:
    """Captures speech via the SpeechRecognition library's background
    listener and transcribes it with the Whisper API."""

    def __init__(
        self, api_key, energy_threshold=300, record_timeout=30
    ):
        self.listener_handle = None
        self.microphone = sr.Microphone()
        self.recognizer = sr.Recognizer()
        self.recognizer.energy_threshold = energy_threshold
        # Keep the threshold fixed; don't let the library adapt it over time.
        self.recognizer.dynamic_energy_threshold = False
        self.recognizer.pause_threshold = 1
        self.phrase_time = time.monotonic()
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(
                source
            )  # we only need to calibrate once, before we start listening
        self.record_timeout = record_timeout
        # Most recently captured audio; cleared after a successful recognize().
        self._audio = None
        self.listener_handle = None
        self.api_key = api_key

    def listen(self, ready_callback=None):
        """Block until a phrase has been captured by the background listener.

        ready_callback, if given, is invoked right after listening starts.
        """
        print("Start listening...")
        self._start_listening()
        if ready_callback:
            ready_callback()
        # Poll until the background thread has stored audio for us.
        while (
            self.listener_handle and not self.speech_waiting()
        ):
            time.sleep(0.1)
        self.stop_listening()

    def _save_audio_callback(self, _, audio):
        # Runs on the background listener thread; stashes the captured audio.
        print("Saving audio")
        self._audio = audio

    def _start_listening(self):
        # Start the background listener once; each phrase is limited to
        # record_timeout seconds.
        if not self.listener_handle:
            self.listener_handle = self.recognizer.listen_in_background(
                self.microphone,
                self._save_audio_callback,
                phrase_time_limit=self.record_timeout,
            )

    def stop_listening(self, wait_for_stop=False):
        # listen_in_background returns a stopper callable; invoking it
        # shuts down the background thread.
        if self.listener_handle:
            self.listener_handle(wait_for_stop=wait_for_stop)
            self.listener_handle = None
        print("Stop listening...")

    def is_listening(self):
        """Return True while the background listener is active."""
        return self.listener_handle is not None

    def speech_waiting(self):
        """Return True when captured audio is waiting to be recognized."""
        return self._audio is not None

    def recognize(self):
        """Transcribe the waiting audio with the Whisper API.

        Retries up to three times on request errors. Returns the stripped
        text, or None when no audio is waiting or all attempts fail.
        """
        if self._audio:
            # Transcribe the audio data to text using Whisper
            print("Recognizing...")
            attempts = 0
            while attempts < 3:
                try:
                    result = self.recognizer.recognize_whisper_api(
                        self._audio, api_key=self.api_key
                    )
                    self._audio = None
                    return result.strip()
                except sr.RequestError as e:
                    print(f"Error: {e}")
                    time.sleep(3)
                    attempts += 1
                    print("Retry attempt: ", attempts)
            print("Failed to recognize")
            return None
        return None
The Bear Class
The Bear class initializes and sets up the motors and button. It handles the timing of the motors and the speech synthesis.
One interesting function inside of this class is the move_mouth function which is threaded. It is in a constant loop and will constantly move the mouth while the do_mouth_movement variable is true. This makes turning the mouth movement on and off very easy.
In the initialization, there are also a couple of events that are triggered by the Azure Speech Services API to time the mouth movement to the speech much better. The way these events are set up is by using the connect function and passing in the function that will be called when triggered.
class Bear:
def __init__(self, azure_speech_config):
kit = MotorKit(i2c=board.I2C())
self._arms_motor = kit.motor1
self._mouth_motor = kit.motor2
# Setup Foot Button
self._foot_button = digitalio.DigitalInOut(board.D16)
self._foot_button.direction = digitalio.Direction.INPUT
self._foot_button.pull = digitalio.Pull.UP
self.do_mouth_movement = False
self._mouth_thread = threading.Thread(target=self.move_mouth, daemon=True)
self._mouth_thread.start()
if DEVICE_ID is None:
audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=True)
else:
audio_config = speechsdk.audio.AudioOutputConfig(device_name=DEVICE_ID)
self._speech_synthesizer = speechsdk.SpeechSynthesizer(
speech_config=azure_speech_config, audio_config=audio_config
)
self._speech_synthesizer.synthesizing.connect(self.start_moving_mouth)
self._speech_synthesizer.synthesis_completed.connect(self.stop_moving_mouth)
def start_moving_mouth(self, _event):
self.do_mouth_movement = True
def stop_moving_mouth(self, _event):
self.do_mouth_movement = False
def deinit(self):
self.do_mouth_movement = False
self._mouth_thread.join()
self._arms_motor.throttle = None
self._mouth_motor.throttle = None
self._speech_synthesizer.synthesis_started.disconnect_all()
self._speech_synthesizer.synthesis_completed.disconnect_all()
def _move_arms_motor(self, dir_up=True):
direction = -1 if dir_up else 1
self._arms_motor.throttle = MOTOR_DUTY_CYCLE * direction
time.sleep(ARM_MOVEMENT_TIME)
# Remove Power from the motor to avoid overheating
self._arms_motor.throttle = None
def _move_mouth_motor(self, dir_open=True):
duration = (
BASE_MOUTH_DURATION
+ random.random() * SPEECH_VARIANCE
- (SPEECH_VARIANCE / 2)
)
# Only power the motor while opening and let the spring close it
self._mouth_motor.throttle = MOTOR_DUTY_CYCLE if dir_open else None
time.sleep(duration)
# Remove Power from the motor and let close to avoid overheating
self._mouth_motor.throttle = None
def foot_pressed(self):
return not self._foot_button.value
def move_mouth(self):
print("Starting mouth movement thread")
while True:
if self.do_mouth_movement:
self._move_mouth_motor(dir_open=True)
self._move_mouth_motor(dir_open=False)
def move_arms(self, hide=True):
self._move_arms_motor(dir_up=hide)
def speak(self, text):
result = self._speech_synthesizer.speak_text_async(text).get()
# Check result
if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
print("Speech synthesized for text [{}]".format(text))
elif result.reason == speechsdk.ResultReason.Canceled:
cancellation_details = result.cancellation_details
print("Speech synthesis canceled: {}".format(cancellation_details.reason))
if cancellation_details.reason == speechsdk.CancellationReason.Error:
print("Error details: {}".format(cancellation_details.error_details))
The Main Function Loop
The main function starts off by setting up instances of the Bear and Listener classes. It also sets up a transcript that it maintains throughout interactions. This allows for you to see what is being heard and what the bear is returning in a text format.
The Main Loop waits for the foot to be pressed. Once pressed, it listens for up to 30 seconds or until it determines a phrase has been spoken. Once it has a phrase, it is processed and the bear moves and responds appropriately.
Once you exit the main loop, a few things are deinitialized in order to free up resources and hardware so that the bear isn't drawing excess power.
def main():
listener = Listener(openai.api_key, ENERGY_THRESHOLD, RECORD_TIMEOUT)
bear = Bear(speech_config)
transcription = [""]
bear.speak(
"Hello there! Just give my left foot a squeeze if you would like to get my attention."
)
while True:
try:
# If button is pressed, start listening
if bear.foot_pressed():
bear.speak("How may I help you?")
listener.listen()
if listener.speech_waiting():
bear.speak("Let me think about that")
bear.move_arms(hide=True)
text = listener.recognize()
if text:
transcription.append(text)
print(f"Phrase Complete. Sent '{text}' to ChatGPT.")
chat_response = sendchat(text)
transcription.append(f"> {chat_response}")
print("Got response from ChatGPT. Beginning speech synthesis.")
bear.move_arms(hide=False)
bear.speak(chat_response)
os.system("clear")
for line in transcription:
print(line)
print("", end="", flush=True)
time.sleep(0.25)
except KeyboardInterrupt:
break
bear.deinit()
Page last edited January 22, 2025
Text editor powered by tinymce.