This file contains a class that provides an interface to the air quality sensor. The constructor saves the passed-in UART interface and initializes the instance variables that cache the measurements.

    def __init__(self, uart):
        self._uart = uart
        self._buffer = []
        self._pm10_standard = 0
        self._pm25_standard = 0
        self._pm100_standard = 0
        self._pm10_env = 0
        self._pm25_env = 0
        self._pm100_env = 0
        self._particles_03um = 0
        self._particles_05um = 0
        self._particles_10um = 0
        self._particles_25um = 0
        self._particles_50um = 0
        self._particles_100um = 0

The other significant method reads the measurement data from the sensor and caches it in the instance variables that were initialized in the constructor.

Up to 32 bytes are read from the sensor through the UART and appended to the buffer.

Bytes are then popped off the front of the buffer (index 0) until one with the value 0x42, the first byte of a frame header, is found. Next the buffer size is checked. If it has grown past 200 bytes, something is wrong, so the buffer is thrown out and False is returned. If the buffer holds fewer than 32 bytes, the method simply returns False; more data will be read into the buffer the next time a read is requested. The header is then validated: if the second byte is not 0x4d, False is returned, and if the frame length field is not 28, the buffer is discarded and False is returned.

If everything is good, the readings are extracted from the raw data in the first 32 bytes of the buffer and placed into the corresponding instance variables.

Finally, the checksum is verified: the sum of the first 30 bytes must equal the value stored in the last two bytes of the frame. If it doesn't match, the entire buffer is discarded and False is returned. Otherwise everything is good and True is returned.

    def read(self):
        data = self._uart.read(32)  # read up to 32 bytes
        data = list(data)

        self._buffer += data

        while self._buffer and self._buffer[0] != 0x42:
            self._buffer.pop(0)  # discard bytes until the 0x42 start byte is at the front

        if len(self._buffer) > 200:
            self._buffer = []  # avoid an overrun if all bad data
        if len(self._buffer) < 32:
            return False

        if self._buffer[1] != 0x4d:
            self._buffer.pop(0)  # not a real header; drop the 0x42 so the next call can resync
            return False

        frame_len = struct.unpack(">H", bytes(self._buffer[2:4]))[0]
        if frame_len != 28:
            self._buffer = []
            return False

        logger.debug('buffer length: %d', len(self._buffer) - 4)
        frame = struct.unpack(">HHHHHHHHHHHHHH", bytes(self._buffer[4:32]))

        self._pm10_standard, self._pm25_standard, self._pm100_standard, self._pm10_env, \
          self._pm25_env, self._pm100_env, self._particles_03um, self._particles_05um, self._particles_10um, \
          self._particles_25um, self._particles_50um, self._particles_100um, skip, checksum = frame

        check = sum(self._buffer[0:30])

        if check != checksum:
            self._buffer = []
            return False

        self._buffer = self._buffer[32:]  # drop the frame just parsed so the next call sees fresh data
        return True
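
To see the framing rules in isolation, here is a minimal sketch that builds a synthetic 32-byte frame and validates it with the same checks the `read` method above performs. The helper names `build_frame` and `parse_frame` are hypothetical and are not part of the guide's code. The layout they assume (a 0x42 0x4d header, a big-endian length of 28, twelve readings, one skipped word, and a checksum equal to the sum of the first 30 bytes) is taken from that method.

```python
import struct


def build_frame(readings):
    """Build a synthetic 32-byte frame from 12 readings (hypothetical test helper)."""
    if len(readings) != 12:
        raise ValueError("expected 12 readings")
    # Header bytes, frame length (28), the 12 readings, and one skipped word.
    body = struct.pack(">BBH12HH", 0x42, 0x4D, 28, *readings, 0)
    # The checksum is the sum of the first 30 bytes, stored big-endian.
    return body + struct.pack(">H", sum(body))


def parse_frame(frame):
    """Return the 12 readings as a tuple, or None if the frame is invalid."""
    if len(frame) != 32 or frame[0] != 0x42 or frame[1] != 0x4D:
        return None
    if struct.unpack(">H", frame[2:4])[0] != 28:
        return None
    *readings, _skip, checksum = struct.unpack(">14H", frame[4:32])
    if sum(frame[0:30]) != checksum:
        return None
    return tuple(readings)


frame = build_frame(range(1, 13))
print(parse_frame(frame))

# Flipping the bits of a single data byte should make the checksum fail.
bad = bytearray(frame)
bad[10] ^= 0xFF
print(parse_frame(bytes(bad)))
```

A valid frame yields the twelve readings in the order they are unpacked in `read`; the corrupted copy is rejected because its byte sum no longer matches the stored checksum.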

Once the measurements have been cached, the core code can read the values through a set of properties.

    @property
    def pm10_standard(self):
        return self._pm10_standard

    @property
    def pm25_standard(self):
        return self._pm25_standard

    @property
    def pm100_standard(self):
        return self._pm100_standard

    @property
    def pm10_env(self):
        return self._pm10_env

    @property
    def pm25_env(self):
        return self._pm25_env

    @property
    def pm100_env(self):
        return self._pm100_env

    @property
    def particles_03um(self):
        return self._particles_03um

    @property
    def particles_05um(self):
        return self._particles_05um

    @property
    def particles_10um(self):
        return self._particles_10um

    @property
    def particles_25um(self):
        return self._particles_25um

    @property
    def particles_50um(self):
        return self._particles_50um

    @property
    def particles_100um(self):
        return self._particles_100um
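
Because `read` returns False until a complete, valid frame has been buffered, calling code will typically poll it. Here is a minimal sketch of such a loop. The names `wait_for_reading` and `FakeSensor` are hypothetical: the stand-in class exists only so the example runs without hardware, and its reading is a made-up value. The attribute-style access assumes the accessors are exposed as properties, as the text describes.

```python
import time


def wait_for_reading(sensor, attempts=10, delay=0.0):
    """Poll sensor.read() until it returns True or the attempts run out."""
    for _ in range(attempts):
        if sensor.read():
            return True
        time.sleep(delay)
    return False


class FakeSensor:
    """Hypothetical stand-in that succeeds on its third read() call."""

    def __init__(self):
        self._calls = 0
        self._pm25_standard = 0

    def read(self):
        self._calls += 1
        if self._calls < 3:
            return False  # not enough data buffered yet
        self._pm25_standard = 12  # made-up value for illustration
        return True

    @property
    def pm25_standard(self):
        return self._pm25_standard


sensor = FakeSensor()
if wait_for_reading(sensor):
    print("PM2.5 (standard):", sensor.pm25_standard)
else:
    print("no valid frame received")
```

With real hardware, a non-zero `delay` between attempts would give the sensor time to send more data before the next read.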

This guide was first published on May 01, 2019. It was last updated on May 01, 2019.

This page was last updated on May 13, 2021.
