Install the Required Libraries

You will need to have a few libraries installed before the script will run on your Raspberry Pi.

Install the required CircuitPython libraries with the terminal:

pip3 install adafruit-circuitpython-bitmap-font
pip3 install adafruit-circuitpython-display-shapes
pip3 install adafruit-circuitpython-display-text
pip3 install adafruit-circuitpython-imageload
pip3 install adafruit-blinka-displayio
pip3 install blinka-displayio-pygamedisplay

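Then, install the required Python libraries with the terminal. The subprocess and datetime modules that the script imports are part of the Python standard library, so they do not need to be installed with pip:

pip3 install pygame
pip3 install recordclass
pip3 install requests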

Finally, install the required Azure IoT Python libraries with the terminal:

pip3 install azure-iot-hub
pip3 install azure-eventhub
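
Before running the script, it can help to confirm that Python can find every module it imports. The following is a minimal sketch and not part of the project code: the helper name missing_modules is made up for illustration, and the module list is copied from the import statements in the project's code.py.

```python
import importlib.util


def missing_modules(names):
    """Return the module names from `names` that Python cannot locate."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # A dotted name raises if its parent package is not installed
            found = False
        if not found:
            missing.append(name)
    return missing


# Third-party modules imported by code.py
REQUIRED = [
    "requests",
    "displayio",
    "adafruit_imageload",
    "adafruit_display_text",
    "adafruit_bitmap_font",
    "adafruit_display_shapes",
    "azure.iot.hub",
    "azure.eventhub",
    "blinka_displayio_pygamedisplay",
]

if __name__ == "__main__":
    for module in missing_modules(REQUIRED):
        print("Not installed:", module)
```

If the script prints nothing, every listed module was found. Any name it prints points to a pip3 install step that needs to be repeated.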

Download the Code, Image and Font Files

Once you've finished setting up your Raspberry Pi with Blinka and the library dependencies, you can access the Python code, bitmap image file and two font files by downloading the Project Bundle.

To do this, click on the Download Project Bundle button in the window below. It will download as a zipped folder.

#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT

# pylint: disable=unused-import
import json
from subprocess import Popen, PIPE
from datetime import datetime, timezone
import requests
import displayio
import adafruit_imageload
from adafruit_display_text import label
from adafruit_bitmap_font import bitmap_font
from adafruit_display_shapes.circle import Circle
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import Twin, TwinProperties, QuerySpecification, QueryResult
from azure.eventhub import EventHubConsumerClient, TransportType
from blinka_displayio_pygamedisplay import PyGameDisplay

# ***

#  UPDATE THESE VARIABLES WITH YOUR IOT HUB INFORMATION

#  event hub compatible end point from the built-in endpoints page
event_connection = "YOUR-EVENT-HUB-CONNECTION-STRING-HERE"
#  Primary Connection String with registry read and service connect rights
#  managed on shared access policies page
status_connection = "YOUR-STATUS-CONNECTION-STRING-HERE"

#  iot hub subscription ID
#  found on the overview of your iot hub
#  format: ########-####-####-####-############
subscription_id = "YOUR-SUBSCRIPTION-ID-HERE"

#  device IDs (device names in iot hub)
qt_py = "YOUR-QT-PY-DEVICE-HERE"
tft_feather = "YOUR-TFT-FEATHER-DEVICE-HERE"
s3_feather = "YOUR-S3-FEATHER-DEVICE-HERE"

# ***

#  array of devices
devices = [qt_py, tft_feather, s3_feather]

#  create display
display = PyGameDisplay(width=1920, height=1080)

#  open bitmap background with imageload
bitmap, palette = adafruit_imageload.load(
    "/home/pi/azure_pi/piBG_0.bmp", bitmap=displayio.Bitmap, palette=displayio.Palette
)

background = displayio.TileGrid(bitmap, pixel_shader=palette)

#  load font files
font_file = "/home/pi/azure_pi/OstrichSans-Heavy-88.bdf"
smallFont_file = "/home/pi/azure_pi/OstrichSans-Heavy-60.bdf"
font = bitmap_font.load_font(font_file)
small_font = bitmap_font.load_font(smallFont_file)

#  text objects
#  co2 data
co2_text = label.Label(font, text = "500", color = 0xFFFFFF, x = 611, y =800)

#  living room data
lr_temp = label.Label(font, text = "72°F", color = 0xFFFFFF, x = 90, y = 424)
lr_humid = label.Label(font, text = "52%", color = 0xFFFFFF, x = 405, y = 424)
lr_press = label.Label(font, text = "1005", color = 0xFFFFFF, x = 715, y = 424)

#  bedroom data
bd_temp = label.Label(font, text = "72°F", color = 0xFFFFFF, x = 1050, y =424)
bd_humid = label.Label(font, text = "52%", color = 0xFFFFFF, x = 1365, y =424)
bd_press = label.Label(font, text = "1005", color = 0xFFFFFF, x = 1675, y =424)

#  cost, timestamp and last device check-in text
timestamp_text = label.Label(small_font, text = "00:00AM 00/00/00",
                             color = 0xFFFFFF, x = 1216, y =886)
lastDevice_text = label.Label(small_font, text = "device",
                              color = 0xFFFFFF, x = 1450, y =805)
cost_text = label.Label(small_font, text = "$0.00",
                        color = 0xFFFFFF, x = 1511, y =734)

#  status circles, defaults to white
qt_status = Circle(908, 695, 22, fill=0xFFFFFF)
lr_status = Circle(908, 52, 22, fill=0xFFFFFF)
bd_status = Circle(1869, 52, 22, fill=0xFFFFFF)

#  array of status circles
status_circles = [qt_status, lr_status, bd_status]

#  array of display objects
display_objects = [background, co2_text, lr_temp, lr_humid, lr_press,
                   bd_temp, bd_humid, bd_press, timestamp_text, lastDevice_text,
                   cost_text, qt_status, lr_status, bd_status]

#  display group
main_group = displayio.Group()

#  add all display objects from array to the display group
for x in display_objects:
    main_group.append(x)

display.root_group = main_group

#  convert UTC time to local timezone
def utc_to_local(utc_dt):
    return utc_dt.replace(tzinfo=timezone.utc).astimezone(tz=None)

#  function for an incoming event from one of the devices
# pylint: disable=too-many-locals
def on_event_batch(partition_context, events):
    #  gets bearer token from azure cli in terminal
    cost_json_index = 0
    output = Popen(['az', 'account', 'get-access-token'], stdout=PIPE)
    #  parses the output into a json and grabs the bearer token
    bear = json.loads(output.communicate()[0].decode('utf-8').strip())
    #  creates headers to include with GET HTTP request for cost data
    headers = {'Authorization': 'Bearer ' + bear['accessToken'], 'Content-Type': 'application/json'}
    #  updates request URL with your subscription ID
    # pylint: disable=line-too-long
    url = "https://management.azure.com/subscriptions/{}/providers/Microsoft.Consumption/usageDetails?api-version=2021-10-01".format(subscription_id)
    #  makes HTTP request
    response = requests.get(url, headers=headers)
    #  packs the HTTP response into a JSON
    feed = response.json()
    #  grabs the cost per day from the JSON feed
    cost = feed['value'][cost_json_index]['properties']['costInBillingCurrency']
    #  the JSON index for cost can move depending on the data received
    #  adjusts index until a non-zero value is received
    while cost == 0:
        cost_json_index += 1
        cost = feed['value'][cost_json_index]['properties']['costInBillingCurrency']
    #  iterates through incoming events
    for event in events:
        #  gets timestamp for event
        clock = utc_to_local(event.enqueued_time)
        #  calculates the month to date cost based on the date
        month_cost = cost * (clock.day - 1)
        #  round() returns a new value, so keep the result
        month_cost = round(month_cost, 2)
        #  updates cost text
        cost_text.text = "$%.2f" % month_cost
        #  updates timestamp text
        timestamp_text.text = clock.strftime("%I:%M%p %m/%d/%y")
        #  gets the incoming event as a JSON feed
        telemetry = event.body_as_json()
        #  grabs the device ID from the JSON
        device = event.system_properties[b'iothub-connection-device-id']
        #  converts the device ID to a string
        string_device = device.decode("utf-8")
        #  updates last device text
        lastDevice_text.text = string_device
        #  if the device is the qt_py
        if string_device == qt_py:
            #  update co2 text
            co2_text.text = str(telemetry['CO2'])
        #  if the device is the tft feather
        if string_device == tft_feather:
            #  update living room sensor text
            lr_temp.text = "%s°F" % str(telemetry['Temperature'])
            lr_humid.text = "%s%%" % str(telemetry['Humidity'])
            lr_press.text = str(telemetry['Pressure'])
        #  if the device is the s3 feather
        if string_device == s3_feather:
            #  update bedroom sensor text
            bd_temp.text = "%s°F" % str(telemetry['Temperature'])
            bd_humid.text = "%s%%" % str(telemetry['Humidity'])
            bd_press.text = str(telemetry['Pressure'])
    #  iterate through the status circles
    for d in range(0, 3):
        #  get connection status of all 3 devices using device twin
        twin = status_client.get_twin(devices[d])
        #  if a device is connected, make the status circle green
        if twin.connection_state == 'Connected':
            status_circles[d].fill = 0x00FF00
        #  if a device is disconnected, make the status circle red
        if twin.connection_state == 'Disconnected':
            status_circles[d].fill = 0xFF0000
    #  update the read partition from event hub
    partition_context.update_checkpoint()
#  in case of error with an incoming event
def on_error(partition_context, error):
    if partition_context:
        #  arguments follow the order of the placeholders: error, then partition
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            error,
            partition_context.partition_id
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))
#  connect to event hub
client = EventHubConsumerClient.from_connection_string(
    conn_str = event_connection,
    consumer_group = "$default",
)
#  connect to device twin
status_client = IoTHubRegistryManager(status_connection)

#  while the display is active
while display.running:
    try:
        #  receive incoming events
        client.receive_batch(
            on_event_batch = on_event_batch,
            on_error = on_error
        )
    #  keyboard exception
    except KeyboardInterrupt:
        print("Receiving has ")

After downloading the Project Bundle, move the folder to your /home/pi directory. Then, unzip the folder. Your Raspberry Pi should have the following files in the /home/pi/azure_pi folder:

  • code.py
  • piBG_0.bmp
  • OstrichSans-Heavy-88.bdf
  • OstrichSans-Heavy-60.bdf
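
Because code.py loads the bitmap and fonts from absolute paths, a missing or misplaced file will stop the script at startup. The check below is a small sketch for confirming that the four files listed above are in place. The helper name missing_files is hypothetical and the folder path is the one given above.

```python
from pathlib import Path

PROJECT_DIR = Path("/home/pi/azure_pi")
EXPECTED_FILES = [
    "code.py",
    "piBG_0.bmp",
    "OstrichSans-Heavy-88.bdf",
    "OstrichSans-Heavy-60.bdf",
]


def missing_files(folder, names):
    """Return the names that are not regular files inside `folder`."""
    folder = Path(folder)
    return [name for name in names if not (folder / name).is_file()]


if __name__ == "__main__":
    absent = missing_files(PROJECT_DIR, EXPECTED_FILES)
    if absent:
        print("Missing from {}: {}".format(PROJECT_DIR, ", ".join(absent)))
    else:
        print("All project files found.")
```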

Add Your IoT Hub Information

At the top of the code.py file, you'll see six variables that need to be updated with your IoT hub information.

#  UPDATE THESE VARIABLES WITH YOUR IOT HUB INFORMATION

#  event hub compatible end point from the built-in endpoints page
event_connection = "YOUR-EVENT-HUB-CONNECTION-STRING-HERE"
#  Primary Connection String with registry read and service connect rights
#  managed on shared access policies page
status_connection = "YOUR-STATUS-CONNECTION-STRING-HERE"

#  iot hub subscription ID
#  found on the overview of your iot hub
#  format: ########-####-####-####-############
subscription_id = "YOUR-SUBSCRIPTION-ID-HERE"

#  device IDs (device names in iot hub)
qt_py = "YOUR-QT-PY-DEVICE-HERE"
tft_feather = "YOUR-TFT-FEATHER-DEVICE-HERE"
s3_feather = "YOUR-S3-FEATHER-DEVICE-HERE"

event_connection is your Event Hub compatible endpoint. It is located on the Built-in endpoints page of your IoT Hub.

status_connection is the Primary connection string from a Shared Access Policy (SAP) that has Registry Read and Service Connect permissions.

You can set up this SAP by going to the Shared access policies page of your IoT Hub and clicking on + Add shared access policy. Then, name the SAP in the Access policy name box and check off Registry Read and Service Connect. Click Add to create the SAP.

To access the Primary connection string, click on the SAP's name to view the security keys. Then, copy and paste the Primary connection string into the code.py file as status_connection.
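
Both connection strings are semicolon-separated Key=Value pairs, so a quick parse can catch a string that was only partly copied. This is a rough sketch rather than anything the project code does. The function names are hypothetical, and the expected key names are an assumption based on what these strings typically contain, so compare them against what your portal shows.

```python
# Assumed key names; verify against the strings shown in the Azure portal
STATUS_KEYS = ["HostName", "SharedAccessKeyName", "SharedAccessKey"]
EVENT_KEYS = ["Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath"]


def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' text into a dict."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue
        # Split on the first '=' only, since key values may end in '='
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError("Segment without '=': " + segment)
        parts[key] = value
    return parts


def missing_keys(conn_str, expected):
    """Return the expected keys that are absent or empty in conn_str."""
    parsed = parse_connection_string(conn_str)
    return [key for key in expected if not parsed.get(key)]


if __name__ == "__main__":
    # Made-up example value, not a real credential
    example = "HostName=example.azure-devices.net;SharedAccessKeyName=dash;SharedAccessKey=abc="
    print(missing_keys(example, STATUS_KEYS))
```

An unedited placeholder such as YOUR-STATUS-CONNECTION-STRING-HERE contains no '=' at all, so parse_connection_string raises ValueError for it.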

subscription_id is located at the top of your IoT Hub's Overview page.

Finally, your Device IDs are the plain text names of the devices set up in your IoT hub. These can be found on the Devices page in your IoT hub.
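
Once all six variables are filled in, a short sanity check can flag anything that still holds its placeholder and confirm that the subscription ID has the usual GUID shape of 8-4-4-4-12 hexadecimal digits. The sketch below is illustrative only: the function names and the sample values are made up, and it is not part of code.py.

```python
import re

# 8-4-4-4-12 hexadecimal digits, the standard GUID layout
GUID_PATTERN = re.compile(r"[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}")


def unfilled_settings(settings):
    """Return names whose value is empty or still a YOUR-...-HERE placeholder."""
    return [
        name
        for name, value in settings.items()
        if not value or (value.startswith("YOUR-") and value.endswith("-HERE"))
    ]


def looks_like_guid(text):
    """True if text is laid out like a subscription ID."""
    return GUID_PATTERN.fullmatch(text) is not None


if __name__ == "__main__":
    # Made-up sample values for illustration
    sample = {
        "subscription_id": "01234567-89ab-cdef-0123-456789abcdef",
        "qt_py": "YOUR-QT-PY-DEVICE-HERE",
    }
    print("Still to fill in:", unfilled_settings(sample))
    print("Subscription ID OK:", looks_like_guid(sample["subscription_id"]))
```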

How the Code Works

The Blinka_Displayio_PyGameDisplay library lets you use CircuitPython displayio elements over HDMI on the Raspberry Pi's main display. There are text elements that update with the incoming device telemetry from IoT Hub.

#  living room data
lr_temp = label.Label(font, text = "72°F", color = 0xFFFFFF, x = 90, y = 424)
lr_humid = label.Label(font, text = "52%", color = 0xFFFFFF, x = 405, y = 424)
lr_press = label.Label(font, text = "1005", color = 0xFFFFFF, x = 715, y = 424)

...

#  if the device is the tft feather
if string_device == tft_feather:
    #  update living room sensor text
    lr_temp.text = "%s°F" % str(telemetry['Temperature'])
    lr_humid.text = "%s%%" % str(telemetry['Humidity'])
    lr_press.text = str(telemetry['Pressure'])
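
To try the routing and string formatting without a display or an IoT Hub connection, the same logic can be written as a plain function that returns the text each label would receive. This is a simplified sketch: the device IDs and the sample telemetry are made up, and label_updates is a hypothetical helper, not a function from the project.

```python
# Hypothetical device IDs standing in for the values set in code.py
QT_PY = "qt-py"
TFT_FEATHER = "tft-feather"
S3_FEATHER = "s3-feather"

# Label name prefix used for each room's device
ROOM_PREFIX = {TFT_FEATHER: "lr", S3_FEATHER: "bd"}


def label_updates(device_id, telemetry):
    """Return {label_name: text} for one event, formatted as in the listing."""
    if device_id == QT_PY:
        return {"co2_text": str(telemetry["CO2"])}
    prefix = ROOM_PREFIX.get(device_id)
    if prefix is None:
        # Unknown device: nothing to update
        return {}
    return {
        prefix + "_temp": "%s°F" % str(telemetry["Temperature"]),
        # '%%' produces a literal percent sign
        prefix + "_humid": "%s%%" % str(telemetry["Humidity"]),
        prefix + "_press": str(telemetry["Pressure"]),
    }


if __name__ == "__main__":
    reading = {"Temperature": 72, "Humidity": 52, "Pressure": 1005}
    print(label_updates(TFT_FEATHER, reading))
```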

The code accesses your IoT Hub information with three different methods: built-in endpoints, the Azure REST API and device twin. The following pages detail how each of these methods is used.
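
As one illustration of the REST API portion, the listing takes the first non-zero costInBillingCurrency value from the usage feed and multiplies it by the number of completed days in the month. The sketch below restates that arithmetic as two small functions. The function names and the sample feed are made up for illustration. Unlike the loop in the listing, which would raise an IndexError if every entry were zero, this version falls back to 0.0.

```python
def first_nonzero_cost(feed):
    """Return the first non-zero costInBillingCurrency in the feed, or 0.0."""
    for entry in feed.get("value", []):
        cost = entry["properties"]["costInBillingCurrency"]
        if cost != 0:
            return cost
    return 0.0


def month_to_date(daily_cost, day_of_month):
    """Daily cost multiplied by the number of completed days this month."""
    return round(daily_cost * (day_of_month - 1), 2)


if __name__ == "__main__":
    # Made-up feed with the same nesting the listing reads
    sample_feed = {
        "value": [
            {"properties": {"costInBillingCurrency": 0}},
            {"properties": {"costInBillingCurrency": 0.25}},
        ]
    }
    daily = first_nonzero_cost(sample_feed)
    print("$%.2f" % month_to_date(daily, 5))
```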

This guide was first published on Aug 23, 2022. It was last updated on Nov 27, 2023.

This page (Code the Raspberry Pi IoT Dashboard) was last updated on Nov 27, 2023.
