Install the Required Libraries
You will need to have a few libraries installed before the script will run on your Raspberry Pi.
Install the required CircuitPython libraries with the terminal:
pip3 install adafruit-circuitpython-bitmap-font
pip3 install adafruit-circuitpython-display-shapes
pip3 install adafruit-circuitpython-display-text
pip3 install adafruit-circuitpython-imageload
pip3 install adafruit-blinka-displayio
pip3 install blinka-displayio-pygamedisplay
Then, install the required Python libraries with the terminal. The subprocess and datetime modules used by the script are part of Python's standard library, so they do not need to be installed with pip:
pip3 install pygame
pip3 install recordclass
pip3 install requests
Finally, install the required Azure IoT Python libraries with the terminal:
pip3 install azure-iot-hub
pip3 install azure-eventhub
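Before running the script, it can help to confirm that every import resolves. The following is a minimal sketch and not part of the project code: the helper name missing_modules is hypothetical, and the module list is taken from the import statements in the project script.

```python
import importlib.util

# Top-level modules imported by the project script (from its import statements).
REQUIRED_MODULES = [
    "requests",
    "displayio",
    "adafruit_imageload",
    "adafruit_display_text",
    "adafruit_bitmap_font",
    "adafruit_display_shapes",
    "azure.iot.hub",
    "azure.eventhub",
    "blinka_displayio_pygamedisplay",
]


def missing_modules(names):
    """Return the names from `names` that cannot be located for import."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # find_spec raises if a parent package (e.g. "azure") is absent.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    for name in missing_modules(REQUIRED_MODULES):
        print("Missing module:", name)
```

If the script prints nothing, all of the listed modules were found. Any name it prints points to a library that still needs to be installed.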
Download the Code, Image and Font Files
Once you've finished setting up your Raspberry Pi with Blinka and the library dependencies, you can access the Python code, bitmap image file and two font files by downloading the Project Bundle.
To do this, click on the Download Project Bundle button in the window below. It will download as a zipped folder.
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT

#!/usr/bin/env python3
# pylint: disable=unused-import

import json
from subprocess import Popen, PIPE
from datetime import datetime, timezone
import requests
import displayio
import adafruit_imageload
from adafruit_display_text import label
from adafruit_bitmap_font import bitmap_font
from adafruit_display_shapes.circle import Circle
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import Twin, TwinProperties, QuerySpecification, QueryResult
from azure.eventhub import EventHubConsumerClient, TransportType
from blinka_displayio_pygamedisplay import PyGameDisplay

# ***
# UPDATE THESE VARIABLES WITH YOUR IOT HUB INFORMATION

# event hub compatible end point from the built-in endpoints page
event_connection = "YOUR-EVENT-HUB-CONNECTION-STRING-HERE"
# Primary Connection String with registry read and service connect rights
# managed on shared access policies page
status_connection = "YOUR-STATUS-CONNECTION-STRING-HERE"
# iot hub subscription ID
# found on the overview of your iot hub
# format: ########-####-####-####-############
subscription_id = "YOUR-SUBSCRIPTION-ID-HERE"
# device id's (device names in iot hub)
qt_py = "YOUR-QT-PY-DEVICE-HERE"
tft_feather = "YOUR-TFT-FEATHER-DEVICE-HERE"
s3_feather = "YOUR-S3-FEATHER-DEVICE-HERE"
# ***

# array of devices
devices = [qt_py, tft_feather, s3_feather]

# create display
display = PyGameDisplay(width=1920, height=1080)

# open bitmap background with imageload
bitmap, palette = adafruit_imageload.load(
    "/home/pi/azure_pi/piBG_0.bmp", bitmap=displayio.Bitmap, palette=displayio.Palette
)
background = displayio.TileGrid(bitmap, pixel_shader=palette)

# load font files
font_file = "/home/pi/azure_pi/OstrichSans-Heavy-88.bdf"
smallFont_file = "/home/pi/azure_pi/OstrichSans-Heavy-60.bdf"
font = bitmap_font.load_font(font_file)
small_font = bitmap_font.load_font(smallFont_file)

# text objects
# co2 data
co2_text = label.Label(font, text = "500", color = 0xFFFFFF, x = 611, y =800)
# living room data
lr_temp = label.Label(font, text = "72°F", color = 0xFFFFFF, x = 90, y = 424)
lr_humid = label.Label(font, text = "52%", color = 0xFFFFFF, x = 405, y = 424)
lr_press = label.Label(font, text = "1005", color = 0xFFFFFF, x = 715, y = 424)
# bedroom data
bd_temp = label.Label(font, text = "72°F", color = 0xFFFFFF, x = 1050, y =424)
bd_humid = label.Label(font, text = "52%", color = 0xFFFFFF, x = 1365, y =424)
bd_press = label.Label(font, text = "1005", color = 0xFFFFFF, x = 1675, y =424)
# cost, timestamp and last device check-in text
timestamp_text = label.Label(small_font, text = "00:00AM 00/00/00",
                             color = 0xFFFFFF, x = 1216, y =886)
lastDevice_text = label.Label(small_font, text = "device",
                              color = 0xFFFFFF, x = 1450, y =805)
cost_text = label.Label(small_font, text = "$0.00", color = 0xFFFFFF, x = 1511, y =734)

# status circles, defaults to white
qt_status = Circle(908, 695, 22, fill=0xFFFFFF)
lr_status = Circle(908, 52, 22, fill=0xFFFFFF)
bd_status = Circle(1869, 52, 22, fill=0xFFFFFF)

# array of status circles
status_circles = [qt_status, lr_status, bd_status]

# array of display objects
display_objects = [background, co2_text, lr_temp, lr_humid, lr_press,
                   bd_temp, bd_humid, bd_press, timestamp_text, lastDevice_text,
                   cost_text, qt_status, lr_status, bd_status]

# display group
main_group = displayio.Group()

# add all display objects from array to the display group
for x in display_objects:
    main_group.append(x)

display.root_group = main_group

# convert UTC time to local timezone
def utc_to_local(utc_dt):
    return utc_dt.replace(tzinfo=timezone.utc).astimezone(tz=None)

# function for an incoming event from one of the devices
# pylint: disable=too-many-locals
def on_event_batch(partition_context, events):
    # gets bearer token from azure cli in terminal
    cost_json_index = 0
    output = Popen(['az', 'account', 'get-access-token'], stdout=PIPE)
    # parses the output into a json and grabs the bearer token
    bear = json.loads(output.communicate()[0].decode('utf-8').strip())
    # creates headers to include with GET HTTP request for cost data
    headers = {'Authorization': 'Bearer ' + bear['accessToken'],
               'Content-Type': 'application/json'}
    # updates request URL with your subscription ID
    # pylint: disable=line-too-long
    url = "https://management.azure.com/subscriptions/{}/providers/Microsoft.Consumption/usageDetails?api-version=2021-10-01".format(subscription_id)
    # makes HTTP request
    response = requests.get(url, headers=headers)
    # packs the HTTP response into a JSON
    feed = response.json()
    # grabs the cost per day from the JSON feed
    cost = feed['value'][cost_json_index]['properties']['costInBillingCurrency']
    # the JSON index for cost can move depending on the data received
    # adjusts index until a value is received
    while cost == 0:
        cost_json_index += 1
        cost = feed['value'][cost_json_index]['properties']['costInBillingCurrency']
    # iterates through incoming events
    for event in events:
        # gets timestamp for event
        clock = utc_to_local(event.enqueued_time)
        # calculates the month to date cost based on the date
        month_cost = cost*((float(clock.strftime("%d")) - 1))
        # keep the rounded value (the bare round() call discarded its result)
        month_cost = round(month_cost, 2)
        # updates cost text
        cost_text.text = "$%.2f" % month_cost
        # updates timestamp text
        timestamp_text.text = clock.strftime("%I:%M%p %m/%d/%y")
        # gets the incoming event as a JSON feed
        telemetry = event.body_as_json()
        # grabs the device ID from the JSON
        device = event.system_properties[b'iothub-connection-device-id']
        # converts the device ID to a string
        string_device = device.decode("utf-8")
        # updates last device text
        lastDevice_text.text = string_device
        # if the device is the qt_py
        if string_device == qt_py:
            # update co2 text
            co2_text.text = str(telemetry['CO2'])
        # if the device is the tft feather
        if string_device == tft_feather:
            # update living room sensor text
            lr_temp.text = "%s°F" % str(telemetry['Temperature'])
            lr_humid.text = "%s%%" % str(telemetry['Humidity'])
            lr_press.text = str(telemetry['Pressure'])
        # if the device is the s3 feather
        if string_device == s3_feather:
            # update bedroom sensor text
            bd_temp.text = "%s°F" % str(telemetry['Temperature'])
            bd_humid.text = "%s%%" % str(telemetry['Humidity'])
            bd_press.text = str(telemetry['Pressure'])
    # iterate through the status circles
    for d in range(0, 3):
        # get connection status of all 3 devices using device twin
        twin = status_client.get_twin(devices[d])
        # if a device is connected, make the status circle green
        if twin.connection_state == 'Connected':
            status_circles[d].fill = 0x00FF00
        # if a device is disconnected, make the status circle red
        if twin.connection_state == 'Disconnected':
            status_circles[d].fill = 0xFF0000
    # update the read partition from event hub
    partition_context.update_checkpoint()

# in case of error with an incoming event
def on_error(partition_context, error):
    if partition_context:
        # arguments ordered to match the message: exception first, then partition
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            error,
            partition_context.partition_id
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))

# connect to event hub
client = EventHubConsumerClient.from_connection_string(
    conn_str = event_connection,
    consumer_group = "$default",
)

# connect to device twin
status_client = IoTHubRegistryManager(status_connection)

# while the display is active
while display.running:
    try:
        # receive incoming events
        client.receive_batch(
            on_event_batch = on_event_batch,
            on_error = on_error
        )
    # keyboard exception
    except KeyboardInterrupt:
        print("Receiving has ")
After downloading the Project Bundle, move the zipped folder to your /home/pi directory. Then, unzip it. Your Raspberry Pi should have the following files in the /home/pi/azure_pi folder:
code.py
piBG_0.bmp
OstrichSans-Heavy-88.bdf
OstrichSans-Heavy-60.bdf
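One way to confirm the files are in place is a short check like the sketch below. It is not part of the project code. The helper name missing_files is hypothetical, and the file names are assumptions drawn from the paths that appear in the script and from the code.py name used in this guide, so adjust the list if your bundle differs.

```python
from pathlib import Path

# File names assumed from the paths used in the project script.
EXPECTED_FILES = [
    "code.py",
    "piBG_0.bmp",
    "OstrichSans-Heavy-88.bdf",
    "OstrichSans-Heavy-60.bdf",
]


def missing_files(folder, names=EXPECTED_FILES):
    """Return the entries of `names` that are not regular files in `folder`."""
    folder = Path(folder)
    return [name for name in names if not (folder / name).is_file()]


if __name__ == "__main__":
    for name in missing_files("/home/pi/azure_pi"):
        print("Not found:", name)
```

The script loads the bitmap and fonts by absolute path, so a file that sits in a different folder will cause the script to fail when it tries to load it.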
Add Your IoT Hub Information
At the top of the code.py file, you'll see six variables that need to be updated with your IoT hub information.
# UPDATE THESE VARIABLES WITH YOUR IOT HUB INFORMATION

# event hub compatible end point from the built-in endpoints page
event_connection = "YOUR-EVENT-HUB-CONNECTION-STRING-HERE"
# Primary Connection String with registry read and service connect rights
# managed on shared access policies page
status_connection = "YOUR-STATUS-CONNECTION-STRING-HERE"
# iot hub subscription ID
# found on the overview of your iot hub
# format: ########-####-####-####-############
subscription_id = "YOUR-SUBSCRIPTION-ID-HERE"
# device id's (device names in iot hub)
qt_py = "YOUR-QT-PY-DEVICE-HERE"
tft_feather = "YOUR-TFT-FEATHER-DEVICE-HERE"
s3_feather = "YOUR-S3-FEATHER-DEVICE-HERE"
event_connection is your Event Hub-compatible endpoint. It is located on the Built-in endpoints page of your IoT Hub.
status_connection is the Primary connection string from a Shared Access Policy (SAP) that has Registry Read and Service Connect permissions.
You can set up this SAP by going to the Shared access policies page of your IoT Hub and clicking + Add shared access policy. Name the SAP in the Access policy name box and check Registry Read and Service Connect. Click Add to create the SAP.
To access the Primary connection string, click on the SAP's name to view its security keys. Then, copy and paste the Primary connection string into the code.py file as status_connection.
subscription_id is your subscription ID. It is found on the Overview page of your IoT Hub.
Finally, your device IDs are the plain-text names of the devices set up in your IoT Hub. These can be found on the Devices page of your IoT Hub.
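A quick sanity check can catch a variable that was left at its placeholder value or a connection string that was pasted incompletely. The sketch below is an illustration, not part of the project code. The helper names are hypothetical, and it assumes connection strings are semicolon-separated Key=Value pairs whose values may themselves contain an equals sign. The sample values are made up.

```python
def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' into a dict, keeping '=' inside values."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue
        # Split on the first '=' only, since keys may end with '=' padding.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError("Segment has no '=': %r" % segment)
        parts[key] = value
    return parts


def unset_settings(settings):
    """Return the names whose values still look like 'YOUR-...-HERE'."""
    return [
        name
        for name, value in settings.items()
        if value.startswith("YOUR-") and value.endswith("-HERE")
    ]


if __name__ == "__main__":
    # Made-up example values, for illustration only.
    settings = {
        "status_connection": "HostName=example.invalid;SharedAccessKey=abc123=",
        "qt_py": "YOUR-QT-PY-DEVICE-HERE",
    }
    print("Still unset:", unset_settings(settings))
    print(parse_connection_string(settings["status_connection"]))
```

Running a check like this before starting the display gives a clearer message than a failed connection attempt.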
How the Code Works
The Blinka_Displayio_PyGameDisplay library lets you use CircuitPython displayio elements over HDMI on the Raspberry Pi's main display. There are text elements that update with the incoming device telemetry from IoT Hub.
# living room data
lr_temp = label.Label(font, text = "72°F", color = 0xFFFFFF, x = 90, y = 424)
lr_humid = label.Label(font, text = "52%", color = 0xFFFFFF, x = 405, y = 424)
lr_press = label.Label(font, text = "1005", color = 0xFFFFFF, x = 715, y = 424)
...
# if the device is the tft feather
if string_device == tft_feather:
    # update living room sensor text
    lr_temp.text = "%s°F" % str(telemetry['Temperature'])
    lr_humid.text = "%s%%" % str(telemetry['Humidity'])
    lr_press.text = str(telemetry['Pressure'])
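The update step can be tried without a display or an IoT Hub connection. The sketch below mirrors the string formatting from the excerpt above using plain dictionaries. The function name label_updates, the device IDs, and the sample telemetry are all hypothetical and exist only for illustration.

```python
# Hypothetical device IDs mapped to the label prefixes used in the script
# ("lr" for living room, "bd" for bedroom).
ROOM_FOR_DEVICE = {
    "tft-feather-demo": "lr",
    "s3-feather-demo": "bd",
}


def label_updates(device_id, telemetry):
    """Return the text each label would receive for one telemetry event."""
    room = ROOM_FOR_DEVICE.get(device_id)
    if room is None:
        # Unknown device: nothing to update.
        return {}
    return {
        room + "_temp": "%s°F" % telemetry["Temperature"],
        room + "_humid": "%s%%" % telemetry["Humidity"],
        room + "_press": str(telemetry["Pressure"]),
    }


if __name__ == "__main__":
    sample = {"Temperature": 72, "Humidity": 52, "Pressure": 1005}
    print(label_updates("tft-feather-demo", sample))
```

In the project script, the same strings are assigned directly to the text attribute of each label, which redraws that label on the display.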
The code accesses your IoT Hub information with three different methods: built-in endpoints, the Azure REST API and the device twin. The following pages detail how each of these methods is used.