This file contains a class that provides an interface to the air quality sensor. The constructor saves the UART interface passed to it and initializes the instance variables that cache the measurements.
def __init__(self, uart):
    self._uart = uart
    self._buffer = []
    self._pm10_standard = 0
    self._pm25_standard = 0
    self._pm100_standard = 0
    self._pm10_env = 0
    self._pm25_env = 0
    self._pm100_env = 0
    self._particles_03um = 0
    self._particles_05um = 0
    self._particles_10um = 0
    self._particles_25um = 0
    self._particles_50um = 0
    self._particles_100um = 0
The other significant method reads the measurement data from the sensor and caches it in the instance variables that were initialized in the constructor.
Up to 32 bytes are read from the sensor through the UART, and appended to the buffer.
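Then bytes are popped off the front of the buffer (index 0) until one with value 0x42, the first start byte of a sensor frame, is found. Now the buffer size is checked. If it has grown past 200 bytes, there's a problem, so the buffer is thrown out and the method returns False. If the buffer holds fewer than 32 bytes, it simply returns False. More data will be read into the buffer next time a read is requested. Two header checks follow: if the second byte isn't 0x4d, the leading 0x42 is dropped and False is returned, and if the frame length field isn't 28, the entire buffer is discarded and False is returned.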
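The resynchronization and size checks described above can be tried out on a plain list, without a sensor attached. The following is a minimal sketch, not the class's own code: the function names and constants are hypothetical, chosen here for illustration, and the 200-byte and 32-byte limits are taken from the description above.

```python
START_BYTE = 0x42   # first start byte of a sensor frame
MAX_BUFFER = 200    # overrun limit described above
FRAME_SIZE = 32     # bytes in one complete frame


def resync(buffer):
    """Drop leading bytes until the buffer starts with START_BYTE or is empty."""
    while buffer and buffer[0] != START_BYTE:
        buffer.pop(0)
    return buffer


def ready_for_parsing(buffer):
    """Return True only if the buffer starts at a frame and holds a full frame."""
    resync(buffer)
    if len(buffer) > MAX_BUFFER:
        buffer.clear()  # throw everything away if the buffer has run away
    return len(buffer) >= FRAME_SIZE


# Garbage ahead of the start byte is discarded; the partial frame is kept.
partial = [0x00, 0x13, 0x42, 0x4D]
print(ready_for_parsing(partial), partial)
```

Because a short buffer is left in place rather than cleared, a frame that arrives split across two UART reads is completed by the next call.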
If everything is good, readings are extracted from the raw data in the first 32 bytes of the buffer. These readings are placed into corresponding instance variables.
The checksum, which is the sum of the first 30 bytes of the frame, is then checked. If it doesn't match, the entire buffer is discarded and False is returned. Otherwise everything is good and True is returned.
def read(self):
    data = self._uart.read(32)  # read up to 32 bytes
    data = list(data)

    self._buffer += data

    while self._buffer and self._buffer[0] != 0x42:
        self._buffer.pop(0)

    if len(self._buffer) > 200:
        self._buffer = []  # avoid an overrun if all bad data
    if len(self._buffer) < 32:
        return False

    if self._buffer[1] != 0x4d:
        self._buffer.pop(0)
        return False

    frame_len = struct.unpack(">H", bytes(self._buffer[2:4]))[0]
    if frame_len != 28:
        self._buffer = []
        return False

    logger.debug('buffer length: %d', len(self._buffer) - 4)
    frame = struct.unpack(">HHHHHHHHHHHHHH", bytes(self._buffer[4:32]))

    self._pm10_standard, self._pm25_standard, self._pm100_standard, self._pm10_env, \
        self._pm25_env, self._pm100_env, self._particles_03um, self._particles_05um, self._particles_10um, \
        self._particles_25um, self._particles_50um, self._particles_100um, skip, checksum = frame

    check = sum(self._buffer[0:30])

    if check != checksum:
        self._buffer = []
        return False

    return True
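To see the frame layout that read() expects, it can help to build a synthetic frame on a desktop Python and run it through the same validation steps. The sketch below is an illustration only: build_frame and parse_frame are hypothetical helpers that are not part of the class, and the layout (two start bytes, a length field of 28, thirteen 16-bit data words, and a 16-bit checksum over the first 30 bytes) is inferred from the code above.

```python
import struct


def build_frame(words):
    """Build a 32-byte test frame from 13 big-endian 16-bit data words."""
    if len(words) != 13:
        raise ValueError("expected 13 data words")
    body = bytes([0x42, 0x4D]) + struct.pack(">H", 28)
    body += struct.pack(">" + "H" * 13, *words)
    checksum = sum(body) & 0xFFFF  # sum of the first 30 bytes
    return body + struct.pack(">H", checksum)


def parse_frame(frame):
    """Return the 12 readings as a tuple, or None if the frame is invalid."""
    if len(frame) < 32 or frame[0] != 0x42 or frame[1] != 0x4D:
        return None
    if struct.unpack(">H", bytes(frame[2:4]))[0] != 28:
        return None
    words = struct.unpack(">" + "H" * 14, bytes(frame[4:32]))
    if sum(frame[0:30]) != words[13]:  # last word is the checksum
        return None
    return words[:12]  # word 12 is the skipped field


# Twelve readings plus the skipped word, round-tripped through both helpers.
frame = build_frame(list(range(1, 13)) + [0])
print(parse_frame(frame))
```

A frame built this way can also be handed to a stand-in UART object to exercise read() without hardware.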
Once the measurements have been cached, the core code can then read the values through a set of properties.
@property
def pm10_standard(self):
    return self._pm10_standard

@property
def pm25_standard(self):
    return self._pm25_standard

@property
def pm100_standard(self):
    return self._pm100_standard

@property
def pm10_env(self):
    return self._pm10_env

@property
def pm25_env(self):
    return self._pm25_env

@property
def pm100_env(self):
    return self._pm100_env

@property
def particles_03um(self):
    return self._particles_03um

@property
def particles_05um(self):
    return self._particles_05um

@property
def particles_10um(self):
    return self._particles_10um

@property
def particles_25um(self):
    return self._particles_25um

@property
def particles_50um(self):
    return self._particles_50um

@property
def particles_100um(self):
    return self._particles_100um
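Since read() returns False whenever a complete, valid frame isn't available yet, calling code will typically retry before touching the properties. Here is a rough sketch of that pattern. The class name isn't shown in this section, so the example uses a hypothetical StubSensor stand-in with made-up values; wait_for_reading is likewise an illustrative helper, not part of the project.

```python
import time


def wait_for_reading(sensor, attempts=10, delay=0.0):
    """Call sensor.read() until it succeeds; return some readings or None."""
    for _ in range(attempts):
        if sensor.read():
            return {
                "pm10_standard": sensor.pm10_standard,
                "pm25_standard": sensor.pm25_standard,
                "pm100_standard": sensor.pm100_standard,
            }
        time.sleep(delay)  # give the sensor time to send more bytes
    return None


class StubSensor:
    """Hypothetical stand-in: read() fails twice, then succeeds."""

    def __init__(self):
        self._calls = 0

    def read(self):
        self._calls += 1
        return self._calls >= 3

    @property
    def pm10_standard(self):
        return 5  # made-up value for illustration

    @property
    def pm25_standard(self):
        return 8  # made-up value for illustration

    @property
    def pm100_standard(self):
        return 9  # made-up value for illustration


print(wait_for_reading(StubSensor()))
```

With real hardware, a nonzero delay between attempts gives the UART time to receive the rest of a frame.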
Page last edited March 08, 2024