You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
<spandata-code-cell="annotated-cell-3" data-code-lines="12,13,14,15,16" data-code-annotation="4">Wait for data to appear on the queue, and then pass the data to <code>log_packet</code></span>
<p>The <code>StatusPacket</code> class takes a bytearray returned from the Status GATT characteristic and an optional timestamp and creates a data structure with fields corresponding to the different status flags:</p>
<p>The <code>SPS30Packet</code> takes a bytearray returned from the SPS30 GATT characteristic with an optional timestamp and creates a data structure with fields corresponding to the PM measurements in ug/m^3</p>
<p>The <code>BME280Packet</code> takes a bytearray returned from the BME280 GATT characteristic with an optional timestamp and creates a datastructure with fields corresponding the temperature, pressure, and humidity.</p>
<p>The <code>SGPC3Packet</code> takes a bytearray returned from the SGPC3 GATT characteristic with an optional timestamp and creates a datastructure with a single field for the VOC.</p>
Copy file name to clipboardExpand all lines: search.json
-7Lines changed: 0 additions & 7 deletions
Original file line number
Diff line number
Diff line change
@@ -804,13 +804,6 @@
804
804
"section": "A Logging Example",
805
805
"text": "A Logging Example\nAs an example of how to use this, consider the case where you want to connect to your AtmoTube from a PC, log data from it over a pre-defined period, then exit. This will be done asynchronously and the retrieved data put into an asynchronous queue for processing.\nThe first step is to create a function which connects to the AtmoTube, collects data, and then puts that data into a queue.\nfrom bleak import BleakClient, BleakScanner\nfrom atmotube import start_gatt_notifications, get_available_services\nimport asyncio\n\n\nasync def collect_data(mac, queue, collection_time):\n1 async def callback_queue(packet):\n await queue.put(packet)\n\n2 device = await BleakScanner.find_device_by_address(mac)\n if not device:\n raise Exception(\"Device not found\")\n\n3 async with BleakClient(device) as client:\n if not client.is_connected:\n raise Exception(\"Failed to connect to device\")\n4 packet_list = get_available_services(client)\n5 await start_gatt_notifications(client, callback_queue,\n packet_list=packet_list)\n6 await asyncio.sleep(collection_time)\n7 await queue.put(None)\n\n1\n\nDefine a callback function which takes a packet of data and does something with it. In this case it puts it in an asynchronous queue.\n\n2\n\nFind the device using Bleak, in this case by mac address\n\n3\n\nConnect to the device using Bleak\n\n4\n\nCall the get_available_services function with the connected device, this generates a list of GATT services that both the atmotube library knows about and the AtmoTube device supports.\n\n5\n\nStart the GATT notifications for the list of available services. If no list is provided, it will attempt to start GATT notifications for all services supported by the atmotube library.\n\n6\n\nWait while data is collected, for the pre-defined collection time (in seconds)\n\n7\n\nPut a None on the queue to indicate that the collection has ended.\n\n\nWhat ends up on the queue is a series of AtmoTubePacket objects representing the different types of data packets the AtmoTube returns. Each packet has an associated datetime object representing the time when the packet was received. As a somewhat silly example, this takes those packets and logs them to the logger – a more realistic thing to do might be to put the data in a database or append the data to a CSV.\nfrom atmotube import SPS30Packet, StatusPacket, BME280Packet, SGPC3Packet\n\nimport logging\n\n\ndef log_packet(packet):\n1 match packet:\n case StatusPacket():\n2 logging.info(f\"{str(packet.date_time)} - Status Packet - \"\n f\"Battery: {packet.battery_level}%, \"\n f\"PM Sensor: {packet.pm_sensor_status}, \"\n f\"Pre-heating: {packet.pre_heating}, \"\n f\"Error: {packet.error_flag}\")\n case SPS30Packet():\n logging.info(f\"{str(packet.date_time)} - SPS30 Packet - \"\n f\"PM1: {packet.pm1} µg/m³, \"\n f\"PM2.5: {packet.pm2_5} µg/m³, \"\n f\"PM4: {packet.pm4} µg/m³, \"\n f\"PM10: {packet.pm10} µg/m³\")\n case BME280Packet():\n logging.info(f\"{str(packet.date_time)} - BME280 Packet - \"\n f\"Humidity: {packet.humidity}%, \"\n f\"Temperature: {packet.temperature}°C, \"\n f\"Pressure: {packet.pressure} mbar\")\n case SGPC3Packet():\n logging.info(f\"{str(packet.date_time)} - SGPC3 Packet - \"\n f\"TVOC: {packet.tvoc} ppb\")\n case _:\n logging.info(\"Unknown packet type\")\n\n1\n\nUse structural pattern matching to identify which data has been returned.\n\n2\n\nSend some information about it to the logger\n\n\nFinally, an asynchronous event loop is created which runs the collector and then logs the data.\nATMOTUBE = \"00:00:00:00:00:00\" # the mac address of the ATMOTUBE\n\ndef main():\n mac = ATMOTUBE\n collection_time = 60 # seconds\n1 queue = asyncio.Queue()\n\n2 async def runner():\n3 collector = asyncio.create_task(\n collect_data(mac, queue, collection_time)\n )\n4 while True:\n item = await queue.get()\n if item is None:\n break\n log_packet(item)\n await collector\n\n asyncio.run(runner())\n\n\nif __name__ == \"__main__\":\n logging.basicConfig(level=logging.INFO)\n main()\n\n1\n\nInitialize an asynchronous queue, this will be used to pass the data between the two workers\n\n2\n\nCreate a runner function to handle the main sequence of events\n\n3\n\nStart the collector\n\n4\n\nWait for data to appear on the queue, and then pass the data to log_packet"
"text": "Packets\n\nStatus Packet\nThe StatusPacket class takes a bytearray returned from the Status GATT characteristic and an optional timestamp and creates a data structure with fields corresponding to the different status flags:\nfrom atmotube import StatusPacket\n\nexample_status = bytearray(b'Ad')\npacket = StatusPacket(example_status)\n\nprint(packet)\n# StatusPacket(pm_sensor_status=True, error_flag=False, bonding_flag=False, charging=False, charging_timer=False, pre_heating=True, battery_level=100%)\n\n\nSPS30 Packet\nThe SPS30Packet takes a bytearray returned from the SPS30 GATT characteristic with an optional timestamp and creates a data structure with fields corresponding to the PM measurements in ug/m^3\nfrom atmotube import SPS30Packet\n\nexample_sps30 = bytearray(b'd\\x00\\x00\\xb9\\x00\\x00J\\x01\\x00o\\x00\\x00')\npacket = SPS30Packet(example_sps30)\n\nprint(packet)\n# SPS30Packet(pm1=1.0µg/m³, pm2_5=1.85µg/m³, pm10=3.3µg/m³, pm4=1.11µg/m³)\n\n\nBME280 Packet\nThe BME280Packet takes a bytearray returned from the BME280 GATT characteristic with an optional timestamp and creates a datastructure with fields corresponding the temperature, pressure, and humidity.\nfrom atmotube import BME280Packet\n\nexample_bme280 = bytearray(b'\\x0e\\x17\\x8ao\\x01\\x00\\x1a\\t')\npacket = BME280Packet(example_bme280)\n\nprint(packet)\n# BME280Packet(humidity=14%, temperature=23.3°C, pressure=940.9mbar)\n\n\nSGPC3 Packet\nThe SGPC3Packet takes a bytearray returned from the SGPC3 GATT characteristic with an optional timestamp and creates a datastructure with a single field for the VOC.\nfrom atmotube import SGPC3Packet\n\nexample_sgpc3 = bytearray(b'\\x02\\x00\\x00\\x00')\npacket = SGPC3Packet(example_sgpc3)\n\nprint(packet)\n# SGPC3Packet(tvoc=0.002ppb)"
0 commit comments