This file contains a class that provides an interface to the air quality sensor. The constructor saves the passed in UART interface and initializes instance variables for caching the measurements.
def __init__(self, uart):
self._uart = uart
self._buffer = []
self._pm10_standard = 0
self._pm25_standard = 0
self._pm100_standard = 0
self._pm10_env = 0
self._pm25_env = 0
self._pm100_env = 0
self._particles_03um = 0
self._particles_05um = 0
self._particles_10um = 0
self._particles_25um = 0
self._particles_50um = 0
self._particles_100um = 0
The other significant method reads the measurement data from the sensor and caches it in the instance variables that were initialized in the constructor.
Up to 32 bytes are read from the sensor through the UART, and appended to the buffer.
Then bytes are popped off the front of the buffer (index 0) until one with value 0x42 is found. Now the buffer size is checked. If it's too big, there's a problem and we throw out the buffer and return False. If the buffer doesn't have enough in it, it simply returns False. More data will be read into the buffer next time a read is requested.
If everything is good, readings are extracted from the raw data in the first 32 bytes of the buffer. These readings are placed into corresponding instance variables.
The checksum is checked. If it doesn't match, the entire buffer is discarded and False is returned. Otherwise everything is good and True is returned
def read(self):
data = self._uart.read(32) # read up to 32 bytes
data = list(data)
self._buffer += data
while self._buffer and self._buffer[0] != 0x42:
self._buffer.pop(0)
if len(self._buffer) > 200:
self._buffer = [] # avoid an overrun if all bad data
if len(self._buffer) < 32:
return False
if self._buffer[1] != 0x4d:
self._buffer.pop(0)
return False
frame_len = struct.unpack(">H", bytes(self._buffer[2:4]))[0]
if frame_len != 28:
self._buffer = []
return False
logger.debug('buffer length: %d', len(self._buffer) - 4)
frame = struct.unpack(">HHHHHHHHHHHHHH", bytes(self._buffer[4:32]))
self._pm10_standard, self._pm25_standard, self._pm100_standard, self._pm10_env, \
self._pm25_env, self._pm100_env, self._particles_03um, self._particles_05um, self._particles_10um, \
self._particles_25um, self._particles_50um, self._particles_100um, skip, checksum = frame
check = sum(self._buffer[0:30])
if check != checksum:
self._buffer = []
return False
return True
Once the measurements have been cached, the core code can then read them values through a set of properties.
@property
def pm10_standard(self):
return self._pm10_standard
@property
def pm25_standard(self):
return self._pm25_standard
@property
def pm100_standard(self):
return self._pm100_standard
@property
def pm10_env(self):
return self._pm10_env
@property
def pm25_env(self):
return self._pm25_env
@property
def pm100_env(self):
return self._pm100_env
@property
def particles_03um(self):
return self._particles_03um
@property
def particles_05um(self):
return self._particles_05um
@property
def particles_10um(self):
return self._particles_10um
@property
def particles_25um(self):
return self._particles_25um
@property
def particles_50um(self):
return self._particles_50um
@property
def particles_100um(self):
return self._particles_100um
Page last edited March 08, 2024
Text editor powered by tinymce.