This is the code that will run on the bear. Since the API keys are read from the keys.txt file in your home directory, there's no need to modify the code.
# SPDX-FileCopyrightText: 2023 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT

import threading
import os
import sys
import time
import random
import configparser
from tempfile import NamedTemporaryFile

import azure.cognitiveservices.speech as speechsdk
from openai import OpenAI

import board
import digitalio
from adafruit_motorkit import MotorKit

from listener import Listener

API_KEYS_FILE = "~/keys.txt"

# ChatGPT Parameters
SYSTEM_ROLE = (
    "You are a helpful voice assistant in the form of a talking teddy bear"
    " that answers questions and gives information"
)
CHATGPT_MODEL = "gpt-3.5-turbo"
WHISPER_MODEL = "whisper-1"

# Azure Parameters
AZURE_SPEECH_VOICE = "en-GB-OliverNeural"
DEVICE_ID = None

# Speech Recognition Parameters
ENERGY_THRESHOLD = 1000  # Energy level for mic to detect
RECORD_TIMEOUT = 30

# Motor Parameters
ARM_MOVEMENT_TIME = 0.5
BASE_MOUTH_DURATION = 0.2  # A higher number means slower mouth movement
SPEECH_VARIANCE = 0.1  # Higher allows more mouth movement variance.
# It pauses for BASE_MOUTH_DURATION ± (SPEECH_VARIANCE / 2)
MOTOR_DUTY_CYCLE = 1.0  # Lower provides less power to the motors

# Do some checks and Import API keys from API_KEYS_FILE
config = configparser.ConfigParser()

username = os.environ["USER"]
user_homedir = os.path.expanduser(f"~{username}")
API_KEYS_FILE = API_KEYS_FILE.replace("~", user_homedir)


def get_config_value(section, key, min_length=None):
    """Return ``config[section][key]`` or exit with a helpful message.

    The process exits with status 1 when the section is missing, the key is
    missing, or (when ``min_length`` is given) the value is shorter than
    ``min_length`` characters.
    """
    if not config.has_section(section):
        print("Please make sure API_KEYS_FILE points to "
              f"a valid file and has an [{section}] section.")
        sys.exit(1)
    if key not in config[section]:
        print(
            f"Please make sure your API keys file contains an {key} under the {section} section."
        )
        sys.exit(1)
    value = config[section][key]
    if min_length and len(value) < min_length:
        print(f"Please set {key} in your API keys file with a valid key.")
        sys.exit(1)
    return value


print(os.path.expanduser(API_KEYS_FILE))
config.read(os.path.expanduser(API_KEYS_FILE))
openai = OpenAI(
    # The key is taken from the keys file rather than the environment
    api_key=get_config_value("openai", "OPENAI_API_KEY", 10)
)

speech_key = get_config_value("azure", "SPEECH_KEY", 15)
service_region = get_config_value("azure", "SPEECH_REGION")

speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=service_region)
speech_config.speech_synthesis_voice_name = AZURE_SPEECH_VOICE


def sendchat(prompt):
    """Send the heard text to ChatGPT and return the complete reply text.

    The reply is requested as a stream and the streamed pieces are joined
    into a single string before returning.
    """
    stream = openai.chat.completions.create(
        model=CHATGPT_MODEL,
        messages=[
            {"role": "system", "content": SYSTEM_ROLE},
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )
    pieces = []
    for chunk in stream:
        # A streamed chunk may carry no choices at all; skip it instead of
        # failing on choices[0].
        if not chunk.choices:
            continue
        piece = chunk.choices[0].delta.content
        if piece is not None:
            pieces.append(piece)
    return "".join(pieces)


def transcribe(wav_data):
    """Send WAV bytes to the Whisper translation endpoint and return the text.

    Makes up to three attempts, pausing three seconds after an API error.
    Returns a spoken-style apology string when every attempt fails.
    """
    # ``openai`` at module level is the client instance, so the exception
    # class has to come from the package itself.
    from openai import APIError  # pylint: disable=import-outside-toplevel

    print("Transcribing...")
    for _ in range(3):
        try:
            # The (filename, bytes) tuple gives the upload a .wav name so the
            # service can identify the audio format.
            result = openai.audio.translations.create(
                model=WHISPER_MODEL, file=("speech.wav", wav_data)
            )
            return result.text.strip()
        except APIError:
            time.sleep(3)
    return "I wasn't able to understand you. Please repeat that."
class Bear:
    """Drives the bear's arm and mouth motors, foot button and speech output.

    A background thread animates the mouth whenever ``do_mouth_movement`` is
    true; the flag is switched by events from the Azure speech synthesizer.
    """

    # How long the mouth thread sleeps between checks while the mouth is idle,
    # so that waiting does not spin a CPU core.
    _IDLE_POLL_SECONDS = 0.05

    def __init__(self, azure_speech_config):
        kit = MotorKit(i2c=board.I2C())
        self._arms_motor = kit.motor1
        self._mouth_motor = kit.motor2

        # Setup Foot Button
        self._foot_button = digitalio.DigitalInOut(board.D16)
        self._foot_button.direction = digitalio.Direction.INPUT
        self._foot_button.pull = digitalio.Pull.UP

        self.do_mouth_movement = False
        # Must be set before the thread starts; cleared by deinit() so the
        # thread can finish and be joined.
        self._running = True
        self._mouth_thread = threading.Thread(target=self.move_mouth, daemon=True)
        self._mouth_thread.start()

        if DEVICE_ID is None:
            audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=True)
        else:
            audio_config = speechsdk.audio.AudioOutputConfig(device_name=DEVICE_ID)
        self._speech_synthesizer = speechsdk.SpeechSynthesizer(
            speech_config=azure_speech_config, audio_config=audio_config
        )

        self._speech_synthesizer.synthesizing.connect(self.start_moving_mouth)
        self._speech_synthesizer.synthesis_completed.connect(self.stop_moving_mouth)

    def start_moving_mouth(self, _event):
        """Event handler: begin animating the mouth."""
        self.do_mouth_movement = True

    def stop_moving_mouth(self, _event):
        """Event handler: stop animating the mouth."""
        self.do_mouth_movement = False

    def deinit(self):
        """Stop the mouth thread, remove motor power and detach the handlers."""
        self.do_mouth_movement = False
        self._running = False
        self._mouth_thread.join()
        self._arms_motor.throttle = None
        self._mouth_motor.throttle = None
        # Detach the same signals that __init__ connected.
        self._speech_synthesizer.synthesizing.disconnect_all()
        self._speech_synthesizer.synthesis_completed.disconnect_all()

    def _move_arms_motor(self, dir_up=True):
        direction = -1 if dir_up else 1
        self._arms_motor.throttle = MOTOR_DUTY_CYCLE * direction
        time.sleep(ARM_MOVEMENT_TIME)
        # Remove Power from the motor to avoid overheating
        self._arms_motor.throttle = None

    def _move_mouth_motor(self, dir_open=True):
        # Jitter the pause by up to half of SPEECH_VARIANCE either way
        duration = (
            BASE_MOUTH_DURATION
            + random.random() * SPEECH_VARIANCE
            - (SPEECH_VARIANCE / 2)
        )
        # Only power the motor while opening and let the spring close it
        self._mouth_motor.throttle = MOTOR_DUTY_CYCLE if dir_open else None
        time.sleep(duration)
        # Remove Power from the motor and let close to avoid overheating
        self._mouth_motor.throttle = None

    def foot_pressed(self):
        """Return True while the foot button is held (the input is pulled up)."""
        return not self._foot_button.value

    def move_mouth(self):
        """Thread body: open and close the mouth while the flag is set."""
        print("Starting mouth movement thread")
        while self._running:
            if self.do_mouth_movement:
                self._move_mouth_motor(dir_open=True)
                self._move_mouth_motor(dir_open=False)
            else:
                time.sleep(self._IDLE_POLL_SECONDS)

    def move_arms(self, hide=True):
        """Raise the arms when ``hide`` is true, lower them otherwise."""
        self._move_arms_motor(dir_up=hide)

    def speak(self, text):
        """Synthesize ``text`` and block until the synthesizer returns a result."""
        result = self._speech_synthesizer.speak_text_async(text).get()

        # Check result
        if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
            print(f"Speech synthesized for text [{text}]")
        elif result.reason == speechsdk.ResultReason.Canceled:
            # Only synthesis_completed clears the flag, so clear it here too
            # or the mouth would keep moving after a cancelled request.
            self.do_mouth_movement = False
            cancellation_details = result.cancellation_details
            print(f"Speech synthesis canceled: {cancellation_details.reason}")
            if cancellation_details.reason == speechsdk.CancellationReason.Error:
                print(f"Error details: {cancellation_details.error_details}")


def main():
    """Run the assistant: wait for the foot button, listen, ask ChatGPT, speak."""
    listener = Listener(openai.api_key, ENERGY_THRESHOLD, RECORD_TIMEOUT)
    bear = Bear(speech_config)

    transcription = [""]

    bear.speak(
        "Hello there! Just give my left foot a squeeze if you would like to get my attention."
    )

    while True:
        try:
            # If button is pressed, start listening
            if bear.foot_pressed():
                bear.speak("How may I help you?")
                listener.listen()

            if listener.speech_waiting():
                bear.speak("Let me think about that")
                bear.move_arms(hide=True)
                text = listener.recognize()
                if text:
                    transcription.append(text)
                    print(f"Phrase Complete. Sent '{text}' to ChatGPT.")
                    chat_response = sendchat(text)
                    transcription.append(f"> {chat_response}")
                    print("Got response from ChatGPT. Beginning speech synthesis.")
                # Lower the arms even when nothing was recognized, so the
                # bear is never left with its arms raised.
                bear.move_arms(hide=False)
                if text:
                    bear.speak(chat_response)

                # Reprint the running transcript on a cleared console
                os.system("clear")
                for line in transcription:
                    print(line)
                print("", end="", flush=True)

            # Brief pause on every pass so button polling does not spin
            time.sleep(0.25)
        except KeyboardInterrupt:
            break
    bear.deinit()


if __name__ == "__main__":
    main()
cd ~
wget https://raw.githubusercontent.com/adafruit/Adafruit_Learning_System_Guides/main/ChatGPT_Bear/assistant.py
How the Python Code Works
This is an overview of how the code works. It's mostly a combination of code to interface between:
- Azure Speech Services API
- OpenAI ChatGPT API
- OpenAI Whisper API
- SpeechRecognition Library
- CircuitPython Motor Library
The code uses threading and events in order to be able to move the mouth motors at the same time the audio output from the Azure Speech Services is playing.
Parameters
The code starts out with a number of parameters that can be altered. Here are the main parameters that you might want to change:
- The SYSTEM_ROLE parameter is a description to ChatGPT about how it should act.
- The CHATGPT_MODEL and WHISPER_MODEL parameters can be updated as newer versions of the API become available.
- The AZURE_SPEECH_VOICE is the voice name that can be altered to change how the bear sounds. See the Create an account with Azure page for more details on selecting a different voice in Azure Speech Services.
- The RECORD_TIMEOUT is the amount of time in seconds that the bear will listen after pushing the button on the foot.
- The DEVICE_ID is for specifying the ID of a particular Audio Output Device. See the Troubleshooting section on the Usage page of this guide to find the right value. Leaving it set to None will try and use the system default device.
The remaining parameters have already been tuned pretty well, but you are welcome to experiment with them.
# ChatGPT Parameters
SYSTEM_ROLE = (
    "You are a helpful voice assistant in the form of a talking teddy bear"
    " that answers questions and gives information"
)
CHATGPT_MODEL = "gpt-3.5-turbo"
WHISPER_MODEL = "whisper-1"

# Azure Parameters
AZURE_SPEECH_VOICE = "en-GB-OliverNeural"
DEVICE_ID = None

# Speech Recognition Parameters
ENERGY_THRESHOLD = 1000  # Energy level for mic to detect
PHRASE_TIMEOUT = 3.0  # Space between recordings for separating phrases
RECORD_TIMEOUT = 30

# Motor Parameters
ARM_MOVEMENT_TIME = 0.5
BASE_MOUTH_DURATION = 0.2  # A higher number means slower mouth movement
SPEECH_VARIANCE = 0.1  # Higher allows more mouth movement variance.
# It pauses for BASE_MOUTH_DURATION ± (SPEECH_VARIANCE / 2)
MOTOR_DUTY_CYCLE = 1.0  # Lower provides less power to the motors
Initialization
Grab the API parameters from the keys.txt file and create the speech configuration.
# Do some checks and Import API keys from API_KEYS_FILE
config = configparser.ConfigParser()

username = os.environ["USER"]
user_homedir = os.path.expanduser(f"~{username}")
API_KEYS_FILE = API_KEYS_FILE.replace("~", user_homedir)


def get_config_value(section, key, min_length=None):
    """Return ``config[section][key]`` or exit with a helpful message.

    The process exits with status 1 when the section is missing, the key is
    missing, or (when ``min_length`` is given) the value is shorter than
    ``min_length`` characters.
    """
    if not config.has_section(section):
        print("Please make sure API_KEYS_FILE points to "
              f"a valid file and has an [{section}] section.")
        sys.exit(1)
    if key not in config[section]:
        print(
            f"Please make sure your API keys file contains an {key} under the {section} section."
        )
        sys.exit(1)
    value = config[section][key]
    if min_length and len(value) < min_length:
        print(f"Please set {key} in your API keys file with a valid key.")
        sys.exit(1)
    return value


print(os.path.expanduser(API_KEYS_FILE))
config.read(os.path.expanduser(API_KEYS_FILE))
openai = OpenAI(
    # The key is taken from the keys file rather than the environment
    api_key=get_config_value("openai", "OPENAI_API_KEY", 10)
)

speech_key = get_config_value("azure", "SPEECH_KEY", 15)
service_region = get_config_value("azure", "SPEECH_REGION")

speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=service_region)
speech_config.speech_synthesis_voice_name = AZURE_SPEECH_VOICE
OpenAI API Interface Helpers
The sendchat and transcribe functions are for packaging and sending data to the ChatGPT and Whisper APIs respectively.
def sendchat(prompt):
    """Send the heard text to ChatGPT and return the complete reply text.

    The reply is requested as a stream and the streamed pieces are joined
    into a single string before returning.
    """
    stream = openai.chat.completions.create(
        model=CHATGPT_MODEL,
        messages=[
            {"role": "system", "content": SYSTEM_ROLE},
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )
    pieces = []
    for chunk in stream:
        # A streamed chunk may carry no choices at all; skip it instead of
        # failing on choices[0].
        if not chunk.choices:
            continue
        piece = chunk.choices[0].delta.content
        if piece is not None:
            pieces.append(piece)
    return "".join(pieces)


def transcribe(wav_data):
    """Send WAV bytes to the Whisper translation endpoint and return the text.

    Makes up to three attempts, pausing three seconds after an API error.
    Returns a spoken-style apology string when every attempt fails.
    """
    # ``openai`` at module level is the client instance, so the exception
    # class has to come from the package itself.
    from openai import APIError  # pylint: disable=import-outside-toplevel

    print("Transcribing...")
    for _ in range(3):
        try:
            # The (filename, bytes) tuple gives the upload a .wav name so the
            # service can identify the audio format.
            result = openai.audio.translations.create(
                model=WHISPER_MODEL, file=("speech.wav", wav_data)
            )
            return result.text.strip()
        except APIError:
            time.sleep(3)
    return "I wasn't able to understand you. Please repeat that."
The Listener Class
The Listener
class is now in its own file and interfaces with the SpeechRecognition library to handle listening for speech and returning the prepared audio data.
# SPDX-FileCopyrightText: 2023 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT

import time
import speech_recognition as sr


class Listener:
    """Records one phrase from the microphone and transcribes it with Whisper.

    Wraps the SpeechRecognition library: audio is captured by a background
    listener, held in ``_audio``, and sent to the Whisper API by
    ``recognize()``.
    """

    def __init__(
        self, api_key, energy_threshold=300, record_timeout=30
    ):
        self.listener_handle = None
        self.microphone = sr.Microphone()
        self.recognizer = sr.Recognizer()
        self.recognizer.energy_threshold = energy_threshold
        self.recognizer.dynamic_energy_threshold = False
        self.recognizer.pause_threshold = 1
        self.phrase_time = time.monotonic()
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(
                source
            )  # we only need to calibrate once, before we start listening
        self.record_timeout = record_timeout
        self._audio = None
        self.api_key = api_key

    def listen(self, ready_callback=None):
        """Start the background listener and block until audio is captured.

        ``ready_callback``, when given, is called once listening has started.
        """
        print("Start listening...")
        self._start_listening()
        if ready_callback:
            ready_callback()
        while (
            self.listener_handle and not self.speech_waiting()
        ):
            time.sleep(0.1)
        self.stop_listening()

    def _save_audio_callback(self, _, audio):
        print("Saving audio")
        self._audio = audio

    def _start_listening(self):
        if not self.listener_handle:
            self.listener_handle = self.recognizer.listen_in_background(
                self.microphone,
                self._save_audio_callback,
                phrase_time_limit=self.record_timeout,
            )

    def stop_listening(self, wait_for_stop=False):
        """Stop the background listener if one is running."""
        if self.listener_handle:
            self.listener_handle(wait_for_stop=wait_for_stop)
            self.listener_handle = None
            print("Stop listening...")

    def is_listening(self):
        """Return True while a background listener is active."""
        return self.listener_handle is not None

    def speech_waiting(self):
        """Return True when captured audio is waiting to be recognized."""
        return self._audio is not None

    def recognize(self):
        """Transcribe the captured audio and return the text, or None.

        Makes up to three attempts against the Whisper API. The captured
        audio is consumed whether or not recognition succeeds.
        """
        if not self._audio:
            return None
        # Take the audio out of the buffer first: if every attempt fails, a
        # leftover clip would make the next listen() return immediately.
        audio, self._audio = self._audio, None
        # Transcribe the audio data to text using Whisper
        print("Recognizing...")
        attempts = 0
        while attempts < 3:
            try:
                result = self.recognizer.recognize_whisper_api(
                    audio, api_key=self.api_key
                )
                return result.strip()
            except sr.RequestError as e:
                print(f"Error: {e}")
                time.sleep(3)
                attempts += 1
                print("Retry attempt: ", attempts)
        print("Failed to recognize")
        return None
The Bear Class
The Bear
class initializes and sets up the motors and button. It handles the timing of the motors and the speech synthesis.
One interesting function inside of this class is the move_mouth
function which is threaded. It is in a constant loop and will constantly move the mouth while the do_mouth_movement
variable is true. This makes turning the mouth movement on and off very easy.
In the initialization, there are also a couple of events that are triggered by the Azure Speech Services API to time the mouth movement to the speech much better. The way these events are set up is by using the connect
function and passing in the function that will be called when triggered.
class Bear:
    """Drives the bear's arm and mouth motors, foot button and speech output.

    A background thread animates the mouth whenever ``do_mouth_movement`` is
    true; the flag is switched by events from the Azure speech synthesizer.
    """

    # How long the mouth thread sleeps between checks while the mouth is idle,
    # so that waiting does not spin a CPU core.
    _IDLE_POLL_SECONDS = 0.05

    def __init__(self, azure_speech_config):
        kit = MotorKit(i2c=board.I2C())
        self._arms_motor = kit.motor1
        self._mouth_motor = kit.motor2

        # Setup Foot Button
        self._foot_button = digitalio.DigitalInOut(board.D16)
        self._foot_button.direction = digitalio.Direction.INPUT
        self._foot_button.pull = digitalio.Pull.UP

        self.do_mouth_movement = False
        # Must be set before the thread starts; cleared by deinit() so the
        # thread can finish and be joined.
        self._running = True
        self._mouth_thread = threading.Thread(target=self.move_mouth, daemon=True)
        self._mouth_thread.start()

        if DEVICE_ID is None:
            audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=True)
        else:
            audio_config = speechsdk.audio.AudioOutputConfig(device_name=DEVICE_ID)
        self._speech_synthesizer = speechsdk.SpeechSynthesizer(
            speech_config=azure_speech_config, audio_config=audio_config
        )

        self._speech_synthesizer.synthesizing.connect(self.start_moving_mouth)
        self._speech_synthesizer.synthesis_completed.connect(self.stop_moving_mouth)

    def start_moving_mouth(self, _event):
        """Event handler: begin animating the mouth."""
        self.do_mouth_movement = True

    def stop_moving_mouth(self, _event):
        """Event handler: stop animating the mouth."""
        self.do_mouth_movement = False

    def deinit(self):
        """Stop the mouth thread, remove motor power and detach the handlers."""
        self.do_mouth_movement = False
        self._running = False
        self._mouth_thread.join()
        self._arms_motor.throttle = None
        self._mouth_motor.throttle = None
        # Detach the same signals that __init__ connected.
        self._speech_synthesizer.synthesizing.disconnect_all()
        self._speech_synthesizer.synthesis_completed.disconnect_all()

    def _move_arms_motor(self, dir_up=True):
        direction = -1 if dir_up else 1
        self._arms_motor.throttle = MOTOR_DUTY_CYCLE * direction
        time.sleep(ARM_MOVEMENT_TIME)
        # Remove Power from the motor to avoid overheating
        self._arms_motor.throttle = None

    def _move_mouth_motor(self, dir_open=True):
        # Jitter the pause by up to half of SPEECH_VARIANCE either way
        duration = (
            BASE_MOUTH_DURATION
            + random.random() * SPEECH_VARIANCE
            - (SPEECH_VARIANCE / 2)
        )
        # Only power the motor while opening and let the spring close it
        self._mouth_motor.throttle = MOTOR_DUTY_CYCLE if dir_open else None
        time.sleep(duration)
        # Remove Power from the motor and let close to avoid overheating
        self._mouth_motor.throttle = None

    def foot_pressed(self):
        """Return True while the foot button is held (the input is pulled up)."""
        return not self._foot_button.value

    def move_mouth(self):
        """Thread body: open and close the mouth while the flag is set."""
        print("Starting mouth movement thread")
        while self._running:
            if self.do_mouth_movement:
                self._move_mouth_motor(dir_open=True)
                self._move_mouth_motor(dir_open=False)
            else:
                time.sleep(self._IDLE_POLL_SECONDS)

    def move_arms(self, hide=True):
        """Raise the arms when ``hide`` is true, lower them otherwise."""
        self._move_arms_motor(dir_up=hide)

    def speak(self, text):
        """Synthesize ``text`` and block until the synthesizer returns a result."""
        result = self._speech_synthesizer.speak_text_async(text).get()

        # Check result
        if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
            print(f"Speech synthesized for text [{text}]")
        elif result.reason == speechsdk.ResultReason.Canceled:
            # Only synthesis_completed clears the flag, so clear it here too
            # or the mouth would keep moving after a cancelled request.
            self.do_mouth_movement = False
            cancellation_details = result.cancellation_details
            print(f"Speech synthesis canceled: {cancellation_details.reason}")
            if cancellation_details.reason == speechsdk.CancellationReason.Error:
                print(f"Error details: {cancellation_details.error_details}")
The Main Function Loop
The main function starts off by setting up instances of the Bear
and Listener
classes. It also sets up a transcript that it maintains throughout interactions. This allows for you to see what is being heard and what the bear is returning in a text format.
The main loop waits for the foot to be pressed. Once pressed, it listens for up to 30 seconds or until it determines a phrase has been spoken. Once it has a phrase, it is processed and the bear moves and responds appropriately.
Once you exit the main loop, a few things are deinitialized in order to free up resources and hardware so that the bear isn't drawing excess power.
def main():
    """Run the assistant: wait for the foot button, listen, ask ChatGPT, speak."""
    listener = Listener(openai.api_key, ENERGY_THRESHOLD, RECORD_TIMEOUT)
    bear = Bear(speech_config)

    transcription = [""]

    bear.speak(
        "Hello there! Just give my left foot a squeeze if you would like to get my attention."
    )

    while True:
        try:
            # If button is pressed, start listening
            if bear.foot_pressed():
                bear.speak("How may I help you?")
                listener.listen()

            if listener.speech_waiting():
                bear.speak("Let me think about that")
                bear.move_arms(hide=True)
                text = listener.recognize()
                if text:
                    transcription.append(text)
                    print(f"Phrase Complete. Sent '{text}' to ChatGPT.")
                    chat_response = sendchat(text)
                    transcription.append(f"> {chat_response}")
                    print("Got response from ChatGPT. Beginning speech synthesis.")
                # Lower the arms even when nothing was recognized, so the
                # bear is never left with its arms raised.
                bear.move_arms(hide=False)
                if text:
                    bear.speak(chat_response)

                # Reprint the running transcript on a cleared console
                os.system("clear")
                for line in transcription:
                    print(line)
                print("", end="", flush=True)

            # Brief pause on every pass so button polling does not spin
            time.sleep(0.25)
        except KeyboardInterrupt:
            break
    bear.deinit()
Text editor powered by tinymce.