From a6653c48aa23708a8701e738a9db7d86cb533cd7 Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 9 May 2026 22:58:49 -0300 Subject: [PATCH 01/10] Fix memory leaks in database connection handling Use contextlib.closing to ensure psycopg2 connections are closed immediately after use instead of relying on GC. Add try/finally to async_db_fetch and async_db_transaction so asyncpg pools are always closed even when exceptions occur. Add explicit con.commit() for write operations to preserve transactional semantics. --- app/ChirpHeliumJoinRpc.py | 4 +++- app/ChirpHeliumKeysRpc.py | 29 +++++++++++++++++------------ app/ChirpHeliumRequestsRpc.py | 6 ++++-- app/ChirpHeliumTenant.py | 6 ++++-- 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/app/ChirpHeliumJoinRpc.py b/app/ChirpHeliumJoinRpc.py index b1ad409..bf4d9be 100644 --- a/app/ChirpHeliumJoinRpc.py +++ b/app/ChirpHeliumJoinRpc.py @@ -1,4 +1,5 @@ import os +from contextlib import closing import psycopg2 import psycopg2.extras import redis.asyncio as redis @@ -48,9 +49,10 @@ def __init__( self.auth_token = [("authorization", f"Bearer {chirpstack_token}")] def db_transaction(self, query: str): - with psycopg2.connect(self.postgres) as con: + with closing(psycopg2.connect(self.postgres)) as con: with con.cursor() as cur: cur.execute(query) + con.commit() ########################################################################### # follow internal redis stream gRPC for actionable changes diff --git a/app/ChirpHeliumKeysRpc.py b/app/ChirpHeliumKeysRpc.py index b84ca5c..210fd1b 100644 --- a/app/ChirpHeliumKeysRpc.py +++ b/app/ChirpHeliumKeysRpc.py @@ -1,3 +1,4 @@ +from contextlib import closing from functools import wraps # import asyncpg import psycopg2 @@ -59,7 +60,7 @@ def __init__( self.auth_token = [("authorization", f"Bearer {chirpstack_token}")] def db_fetch(self, query: str): - with psycopg2.connect(self.postgres) as con: + with closing(psycopg2.connect(self.postgres)) as con: with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(query) return cur.fetchall() @@ -68,28 +69,32 @@ async def async_db_fetch(self, query: str): db = Database() await db.connect() assert db.pool - - async with db.pool.acquire() as conn: - async with conn.transaction(): - cur = await conn.fetch(query) - await db.close() - return cur + try: + async with db.pool.acquire() as conn: + async with conn.transaction(): + return await conn.fetch(query) + finally: + await db.close() def db_transaction(self, query: str): - with psycopg2.connect(self.postgres) as con: + with closing(psycopg2.connect(self.postgres)) as con: with con.cursor() as cur: cur.execute(query) + con.commit() async def async_db_transaction(self, query: str): db = Database() await db.connect() assert db.pool - async with db.pool.acquire() as conn: - async with conn.transaction(): - await conn.execute(query) + try: + async with db.pool.acquire() as conn: + async with conn.transaction(): + await conn.execute(query) + finally: + await db.close() def fetch_all_devices(self) -> list[str]: - with psycopg2.connect(self.postgres) as con: + with closing(psycopg2.connect(self.postgres)) as con: with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """ diff --git a/app/ChirpHeliumRequestsRpc.py b/app/ChirpHeliumRequestsRpc.py index da1dd71..b52c551 100644 --- a/app/ChirpHeliumRequestsRpc.py +++ b/app/ChirpHeliumRequestsRpc.py @@ -1,4 +1,5 @@ import os +from contextlib import closing from functools import wraps import psycopg2 import psycopg2.extras @@ -69,12 +70,13 @@ def __init__( # functions to handle helium device db transactions ########################################################################### def db_transaction(self, query): - with psycopg2.connect(self.postgres) as con: + with closing(psycopg2.connect(self.postgres)) as con: with con.cursor() as cur: cur.execute(query) + con.commit() def db_fetch(self, query): - with psycopg2.connect(self.postgres) as con: + with closing(psycopg2.connect(self.postgres)) as con: with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(query) return cur.fetchall() diff --git a/app/ChirpHeliumTenant.py b/app/ChirpHeliumTenant.py index 3c91106..df43525 100644 --- a/app/ChirpHeliumTenant.py +++ b/app/ChirpHeliumTenant.py @@ -1,4 +1,5 @@ import os +from contextlib import closing import psycopg2 import psycopg2.extras import redis @@ -45,12 +46,13 @@ def __init__( self.auth_token = [('authorization', f'Bearer {chirpstack_token}')] def db_transaction(self, query): - with psycopg2.connect(self.postgres) as con: + with closing(psycopg2.connect(self.postgres)) as con: with con.cursor() as cur: cur.execute(query) + con.commit() def db_fetch(self, query): - with psycopg2.connect(self.postgres) as con: + with closing(psycopg2.connect(self.postgres)) as con: with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(query) return cur.fetchall() From ec2b69ffcd9a66cf3a2c5871197c588ff1d3b9de Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 9 May 2026 23:26:32 -0300 Subject: [PATCH 02/10] Refactor database connection handling to reduce memory usage and improve efficiency --- app/ChirpHeliumKeysRpc.py | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/app/ChirpHeliumKeysRpc.py b/app/ChirpHeliumKeysRpc.py index 210fd1b..0b11d6f 100644 --- a/app/ChirpHeliumKeysRpc.py +++ b/app/ChirpHeliumKeysRpc.py @@ -58,6 +58,13 @@ def __init__( self.postgres = "%s?sslmode=%s" % (conn_str, self.pg_ssl_mode) self.cs_gprc = chirpstack_host self.auth_token = [("authorization", f"Bearer {chirpstack_token}")] + self._db: Database | None = None + + async def _get_pool(self): + if self._db is None: + self._db = Database() + await self._db.connect() + return self._db.pool def db_fetch(self, query: str): with closing(psycopg2.connect(self.postgres)) as con: @@ -66,15 +73,10 @@ def db_fetch(self, query: str): return cur.fetchall() async def async_db_fetch(self, query: str): - db = Database() - await db.connect() - assert db.pool - try: - async with db.pool.acquire() as conn: - async with conn.transaction(): - return await conn.fetch(query) - finally: - await db.close() + pool = await self._get_pool() + async with pool.acquire() as conn: + async with conn.transaction(): + return await conn.fetch(query) def db_transaction(self, query: str): with closing(psycopg2.connect(self.postgres)) as con: @@ -83,15 +85,10 @@ def db_transaction(self, query: str): con.commit() async def async_db_transaction(self, query: str): - db = Database() - await db.connect() - assert db.pool - try: - async with db.pool.acquire() as conn: - async with conn.transaction(): - await conn.execute(query) - finally: - await db.close() + pool = await self._get_pool() + async with pool.acquire() as conn: + async with conn.transaction(): + await conn.execute(query) def fetch_all_devices(self) -> list[str]: with closing(psycopg2.connect(self.postgres)) as con: From 143fdb614497b70f3071fd1496f1f96caa436b84 Mon Sep 17 00:00:00 2001 From: Felipe Date: Sun, 10 May 2026 00:01:56 -0300 Subject: [PATCH 03/10] Add close method to ChirpDeviceKeys for proper database connection cleanup --- app/ChirpHeliumKeysRpc.py | 5 +++++ app/app.py | 15 +++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/app/ChirpHeliumKeysRpc.py b/app/ChirpHeliumKeysRpc.py index 0b11d6f..785da00 100644 --- a/app/ChirpHeliumKeysRpc.py +++ b/app/ChirpHeliumKeysRpc.py @@ -66,6 +66,11 @@ async def _get_pool(self): await self._db.connect() return self._db.pool + async def close(self): + if self._db is not None: + await self._db.close() + self._db = None + def db_fetch(self, query: str): with closing(psycopg2.connect(self.postgres)) as con: with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: diff --git a/app/app.py b/app/app.py index d670d36..7a2c40e 100644 --- a/app/app.py +++ b/app/app.py @@ -110,9 +110,12 @@ def update_device_status(): client_streams.create_tables() - with ThreadPoolExecutor(max_workers=5) as executor: - executor.submit(async_wrapper, client_streams.api_stream_requests) - executor.submit(async_wrapper, events.device_stream_event) - executor.submit(tenant.stream_meta) - executor.submit(run_every, update_device_status, device_int) - executor.submit(async_run_every, client_keys.helium_skfs_update, skfs_int) + try: + with ThreadPoolExecutor(max_workers=5) as executor: + executor.submit(async_wrapper, client_streams.api_stream_requests) + executor.submit(async_wrapper, events.device_stream_event) + executor.submit(tenant.stream_meta) + executor.submit(run_every, update_device_status, device_int) + executor.submit(async_run_every, client_keys.helium_skfs_update, skfs_int) + finally: + asyncio.run(client_keys.close()) From 789b8baf1887d8fe74813c6ac2c199f3041fc3ed Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 16 May 2026 16:04:16 -0300 Subject: [PATCH 04/10] Make Database hold a single shared asyncpg pool created from a DSN Database now takes a DSN and creates the pool once (connect) / closes it once (close), instead of being instantiated per call. --- app/DatabasePool.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/app/DatabasePool.py b/app/DatabasePool.py index e0f39b6..55c57db 100644 --- a/app/DatabasePool.py +++ b/app/DatabasePool.py @@ -1,20 +1,23 @@ -import os import asyncpg -import asyncio class Database: + """Holds a single asyncpg connection pool, shared across the app. + + The pool is created once at startup (``connect``) and closed once at + shutdown (``close``). Callers acquire/release individual connections + from ``pool`` per transaction. + """ + + def __init__(self, dsn: str): + self.dsn = dsn + self.pool: asyncpg.Pool | None = None + async def connect(self) -> None: - self.pool = await asyncpg.create_pool( - user=os.getenv('POSTGRES_USER', 'chirpstack'), - host=os.getenv('POSTGRES_HOST', 'chirpstack-postgres'), - port=os.getenv('POSTGRES_PORT', 5432), - database=os.getenv('POSTGRES_DB', 'chirpstack'), - password=os.getenv('POSTGRES_PASS', 'chirpstack'), - loop=asyncio.get_running_loop(), - ) + if self.pool is None: + self.pool = await asyncpg.create_pool(dsn=self.dsn) async def close(self) -> None: - if self.pool: + if self.pool is not None: await self.pool.close() - pass + self.pool = None From b8ea7b4efd683347dcd99a895884d9015279db25 Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 16 May 2026 16:04:35 -0300 Subject: [PATCH 05/10] Convert ChirpstackStreams DB access to async via shared pool db_transaction/db_fetch/fetch_active_devices/create_tables are now async and acquire connections from the shared pool. __init__ takes a pool instead of the postgres_* params. --- app/ChirpHeliumRequestsRpc.py | 62 ++++++++++++----------------------- 1 file changed, 21 insertions(+), 41 deletions(-) diff --git a/app/ChirpHeliumRequestsRpc.py b/app/ChirpHeliumRequestsRpc.py index b52c551..ab05c69 100644 --- a/app/ChirpHeliumRequestsRpc.py +++ b/app/ChirpHeliumRequestsRpc.py @@ -1,8 +1,5 @@ import os -from contextlib import closing from functools import wraps -import psycopg2 -import psycopg2.extras import redis.asyncio as redis import grpc from google.protobuf.json_format import MessageToJson, MessageToDict @@ -42,51 +39,34 @@ class ChirpstackStreams: def __init__( self, route_id: str, - postgres_host: str, - postgres_user: str, - postgres_pass: str, - postgres_name: str, - postgres_port: str, - postgres_ssl_mode: str, + pool, chirpstack_host: str, chirpstack_token: str, ): self.route_id = route_id - self.pg_host = postgres_host - self.pg_user = postgres_user - self.pg_pass = postgres_pass - self.pg_name = postgres_name - self.pg_port = postgres_port - self.pg_ssl_mode = postgres_ssl_mode - conn_str = f'postgresql://{self.pg_user}:{self.pg_pass}@{self.pg_host}:{self.pg_port}/{self.pg_name}' - if self.pg_ssl_mode[0] != 'require': - self.postgres = conn_str - else: - self.postgres = '%s?sslmode=%s' % (conn_str, self.pg_ssl_mode) + self.pool = pool self.cs_gprc = chirpstack_host self.auth_token = [('authorization', f'Bearer {chirpstack_token}')] ########################################################################### # functions to handle helium device db transactions ########################################################################### - def db_transaction(self, query): - with closing(psycopg2.connect(self.postgres)) as con: - with con.cursor() as cur: - cur.execute(query) - con.commit() - - def db_fetch(self, query): - with closing(psycopg2.connect(self.postgres)) as con: - with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute(query) - return cur.fetchall() - - def fetch_active_devices(self) -> list[str]: + async def db_transaction(self, query): + async with self.pool.acquire() as con: + async with con.transaction(): + await con.execute(query) + + async def db_fetch(self, query): + async with self.pool.acquire() as con: + async with con.transaction(): + return await con.fetch(query) + + async def fetch_active_devices(self) -> list[str]: query = "SELECT dev_eui FROM device WHERE is_disabled=false;" - result = [device['dev_eui'].hex() for device in self.db_fetch(query)] + result = [device['dev_eui'].hex() for device in await self.db_fetch(query)] return result - def create_tables(self): + async def create_tables(self): query = """ CREATE TABLE IF NOT EXISTS helium_devices ( dev_eui text primary key, -- devices['devEui'] @@ -103,7 +83,7 @@ def create_tables(self): ); """ print('Run create helium device table if not exists.') - self.db_transaction(query) + await self.db_transaction(query) ########################################################################### # Chirpstack gRPC API calls @@ -196,7 +176,7 @@ async def add_device_euis(self, data: dict): VALUES ('{0}', '{1}') ON CONFLICT (dev_eui) DO NOTHING; """.format(dev_eui, join_eui) - self.db_transaction(query) + await self.db_transaction(query) await sync_device_euis(0, join_eui, dev_eui, self.route_id) return @@ -216,7 +196,7 @@ async def remove_device_euis(self, data: dict): print(f'Remove Device: {device}') query = "SELECT * FROM helium_devices WHERE dev_eui='{}';".format(device) - data = self.db_fetch(query)[0] + data = (await self.db_fetch(query))[0] dev_eui = data['dev_eui'] # this should be a string join_eui = data['join_eui'] # this should be a string @@ -231,7 +211,7 @@ async def remove_device_euis(self, data: dict): delete_device = """ DELETE FROM helium_devices WHERE dev_eui='{}'; """.format(device) - self.db_transaction(delete_device) + await self.db_transaction(delete_device) return @my_logger @@ -255,14 +235,14 @@ async def update_device_euis(self, data: dict): query = """ UPDATE helium_devices SET is_disabled=true WHERE dev_eui='{}'; """.format(dev_eui) - self.db_transaction(query) + await self.db_transaction(query) elif is_disabled == 'false': action = 0 query = """ UPDATE helium_devices SET is_disabled=false WHERE dev_eui='{}'; """.format(dev_eui) - self.db_transaction(query) + await self.db_transaction(query) print('action', action, 'join', join_eui, 'dev', dev_eui) await sync_device_euis(action, join_eui, dev_eui, self.route_id) return From 69c98431539131a97e3bc2ff8496f71fb79ae535 Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 16 May 2026 16:28:23 -0300 Subject: [PATCH 06/10] convert ChirpDeviceKeys to async DB and async gRPC via shared pool --- app/ChirpHeliumKeysRpc.py | 132 +++++++------------------------------- 1 file changed, 24 insertions(+), 108 deletions(-) diff --git a/app/ChirpHeliumKeysRpc.py b/app/ChirpHeliumKeysRpc.py index 785da00..a54e13f 100644 --- a/app/ChirpHeliumKeysRpc.py +++ b/app/ChirpHeliumKeysRpc.py @@ -1,8 +1,3 @@ -from contextlib import closing -from functools import wraps -# import asyncpg -import psycopg2 -import psycopg2.extras import grpc from google.protobuf.json_format import MessageToDict from chirpstack_api import api @@ -10,102 +5,38 @@ from ChirpHeliumCrypto import get_route_skfs, update_device_skfs from protos.helium import iot_config -from DatabasePool import Database - - -# def my_logger(orig_func): -# logging.basicConfig( -# filename='chirpstack-hpr.log', -# filemode='a', -# format='%(asctime)s %(levelname)s:%(name)s:%(message)s', -# level=logging.INFO, -# datefmt='%Y-%m-%d %H:%M:%S', -# ) -# logging.getLogger("asyncio").setLevel(logging.INFO) -# -# @wraps(orig_func) -# def wrapper(*args, **kwargs): -# logging.info( -# f'Passed args: {args}, kwargs: {kwargs}') -# return orig_func(*args, **kwargs) -# return wrapper class ChirpDeviceKeys: def __init__( self, route_id: str, - postgres_host: str, - postgres_user: str, - postgres_pass: str, - postgres_name: str, - postgres_port: str, - postgres_ssl_mode: str, + pool, chirpstack_host: str, chirpstack_token: str, ): self.route_id = route_id - self.pg_host = postgres_host - self.pg_user = postgres_user - self.pg_pass = postgres_pass - self.pg_name = postgres_name - self.pg_port = postgres_port - self.pg_ssl_mode = postgres_ssl_mode - conn_str = f"postgresql://{self.pg_user}:{self.pg_pass}@{self.pg_host}:{self.pg_port}/{self.pg_name}" - if self.pg_ssl_mode[0] != "require": - self.postgres = conn_str - else: - self.postgres = "%s?sslmode=%s" % (conn_str, self.pg_ssl_mode) + self.pool = pool self.cs_gprc = chirpstack_host self.auth_token = [("authorization", f"Bearer {chirpstack_token}")] - self._db: Database | None = None - - async def _get_pool(self): - if self._db is None: - self._db = Database() - await self._db.connect() - return self._db.pool - - async def close(self): - if self._db is not None: - await self._db.close() - self._db = None - - def db_fetch(self, query: str): - with closing(psycopg2.connect(self.postgres)) as con: - with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute(query) - return cur.fetchall() - async def async_db_fetch(self, query: str): - pool = await self._get_pool() - async with pool.acquire() as conn: + async def db_fetch(self, query: str): + async with self.pool.acquire() as conn: async with conn.transaction(): return await conn.fetch(query) - def db_transaction(self, query: str): - with closing(psycopg2.connect(self.postgres)) as con: - with con.cursor() as cur: - cur.execute(query) - con.commit() - - async def async_db_transaction(self, query: str): - pool = await self._get_pool() - async with pool.acquire() as conn: + async def db_transaction(self, query: str): + async with self.pool.acquire() as conn: async with conn.transaction(): await conn.execute(query) - def fetch_all_devices(self) -> list[str]: - with closing(psycopg2.connect(self.postgres)) as con: - with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute( - """ - SELECT encode(dev_eui, 'hex') AS dev_eui - FROM device - WHERE is_disabled=false; - """ - ) - return [dev['dev_eui'] for dev in cur.fetchall()] + async def fetch_all_devices(self) -> list[str]: + query = """ + SELECT encode(dev_eui, 'hex') AS dev_eui + FROM device + WHERE is_disabled=false; + """ + return [dev['dev_eui'] for dev in await self.db_fetch(query)] def chunker(self, seq, size): return (seq[pos:pos + size] for pos in range(0, len(seq), size)) @@ -113,30 +44,27 @@ def chunker(self, seq, size): ########################################################################### # Chirpstack gRPC API calls ########################################################################### - # @my_logger - def get_device(self, dev_eui: str) -> dict[str]: - with grpc.insecure_channel(self.cs_gprc) as channel: + async def get_device(self, dev_eui: str) -> dict[str]: + async with grpc.aio.insecure_channel(self.cs_gprc) as channel: client = api.DeviceServiceStub(channel) req = api.GetDeviceRequest() req.dev_eui = dev_eui - resp = client.Get(req, metadata=self.auth_token) + resp = await client.Get(req, metadata=self.auth_token) data = MessageToDict(resp)["device"] return data - # @my_logger - def get_device_activation(self, dev_eui: str) -> dict[str]: - with grpc.insecure_channel(self.cs_gprc) as channel: + async def get_device_activation(self, dev_eui: str) -> dict[str]: + async with grpc.aio.insecure_channel(self.cs_gprc) as channel: client = api.DeviceServiceStub(channel) req = api.GetDeviceActivationRequest() req.dev_eui = dev_eui - resp = client.GetActivation(req, metadata=self.auth_token) + resp = await client.GetActivation(req, metadata=self.auth_token) data = MessageToDict(resp) if bool(data): return data["deviceActivation"] return data - # @my_logger - def get_merged_keys(self, dev_eui: str) -> dict[str]: + async def get_merged_keys(self, dev_eui: str) -> str: devices = { "devAddr": "", "appSKey": "", @@ -144,8 +72,8 @@ def get_merged_keys(self, dev_eui: str) -> dict[str]: "name": "", } - devices.update(self.get_device(dev_eui)) - devices.update(self.get_device_activation(dev_eui)) + devices.update(await self.get_device(dev_eui)) + devices.update(await self.get_device_activation(dev_eui)) max_copies = 0 if devices.get("variables") and "max_copies" in devices.get("variables"): @@ -179,11 +107,9 @@ def get_merged_keys(self, dev_eui: str) -> dict[str]: devices["fCntUp"], devices["nFCntDown"], ) - self.db_transaction(query) - # await self.async_db_transaction(query) + await self.db_transaction(query) return f"Updated: {dev_eui}" - # @my_logger async def helium_skfs_update(self): """ TODO: @@ -195,15 +121,10 @@ async def helium_skfs_update(self): WHERE is_disabled=false AND dev_addr != ''; """ - # all_helium_devices = self.db_fetch(helium_devices) - all_helium_devices = await self.async_db_fetch(helium_devices) - # logging.info(f"All Helium Devices: {all_helium_devices}") + all_helium_devices = await self.db_fetch(helium_devices) skfs_list = await get_route_skfs() - # logging.info(f"All Helium Devices: {all_helium_devices}") - # logging.info(f"SKFS List: {skfs_list}") - # Convert the lists to sets for efficient set operations # compare dev_addr & session_key for match else remove all_helium_sessions_set = { @@ -229,9 +150,6 @@ async def helium_skfs_update(self): for d in skfs_list } - # logging.info(f"All Helium Devices Set: {all_helium_devices_set}") - # logging.info(f"SKFS List Set: {skfs_list_set}") - # Devices to add to skfs_list devices_to_add = all_helium_devices_set - skfs_list_set logging.info(f"Devices_to_add: {devices_to_add}") @@ -248,7 +166,6 @@ async def helium_skfs_update(self): action=iot_config.ActionV1(1) ) for dev_addr, nws_key in devices_to_remove ] - # logging.info(f'RM-SKFS: {rm_skfs}') if devices_to_add: add_skfs = [ @@ -259,7 +176,6 @@ async def helium_skfs_update(self): max_copies=max_copies ) for dev_addr, nws_key, max_copies in devices_to_add ] - # logging.info(f'ADD-SKFS: {add_skfs}') skfs_action = rm_skfs + add_skfs From 070cdc5c2cb0a3caf91e61c8f753b0ed4fa7adbd Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 16 May 2026 16:32:13 -0300 Subject: [PATCH 07/10] convert ChirpstackJoins DB access to async via shared pool --- app/ChirpHeliumJoinRpc.py | 33 +++++++-------------------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/app/ChirpHeliumJoinRpc.py b/app/ChirpHeliumJoinRpc.py index bf4d9be..125b6ff 100644 --- a/app/ChirpHeliumJoinRpc.py +++ b/app/ChirpHeliumJoinRpc.py @@ -1,7 +1,4 @@ import os -from contextlib import closing -import psycopg2 -import psycopg2.extras import redis.asyncio as redis import grpc from google.protobuf.json_format import MessageToJson, MessageToDict @@ -24,35 +21,19 @@ class ChirpstackJoins: def __init__( self, route_id: str, - postgres_host: str, - postgres_user: str, - postgres_pass: str, - postgres_name: str, - postgres_port: str, - postgres_ssl_mode: str, + pool, chirpstack_host: str, chirpstack_token: str, ): self.route_id = route_id - self.pg_host = postgres_host - self.pg_user = postgres_user - self.pg_pass = postgres_pass - self.pg_name = postgres_name - self.pg_port = postgres_port - self.pg_ssl_mode = postgres_ssl_mode - conn_str = f"postgresql://{self.pg_user}:{self.pg_pass}@{self.pg_host}:{self.pg_port}/{self.pg_name}" - if self.pg_ssl_mode[0] != "require": - self.postgres = conn_str - else: - self.postgres = "%s?sslmode=%s" % (conn_str, self.pg_ssl_mode) + self.pool = pool self.cs_grpc = chirpstack_host self.auth_token = [("authorization", f"Bearer {chirpstack_token}")] - def db_transaction(self, query: str): - with closing(psycopg2.connect(self.postgres)) as con: - with con.cursor() as cur: - cur.execute(query) - con.commit() + async def db_transaction(self, query: str): + async with self.pool.acquire() as con: + async with con.transaction(): + await con.execute(query) ########################################################################### # follow internal redis stream gRPC for actionable changes @@ -162,4 +143,4 @@ async def add_session_key(self, dev_eui): devices["fCntUp"], devices["nFCntDown"], ) - self.db_transaction(query) + await self.db_transaction(query) From add93c356933c5b085b8aea6d21812d35b9633f5 Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 16 May 2026 16:35:51 -0300 Subject: [PATCH 08/10] convert ChirpstackTenant to async DB and async redis db_transaction/db_fetch/stream_meta/meta_up are now async over the shared pool and redis.asyncio. Sync usage publishers run via run_in_executor. stream_meta's try/except now wraps the loop body so a transient error no longer kills the stream permanently. --- app/ChirpHeliumTenant.py | 72 ++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 44 deletions(-) diff --git a/app/ChirpHeliumTenant.py b/app/ChirpHeliumTenant.py index df43525..943c88c 100644 --- a/app/ChirpHeliumTenant.py +++ b/app/ChirpHeliumTenant.py @@ -1,8 +1,6 @@ import os -from contextlib import closing -import psycopg2 -import psycopg2.extras -import redis +import asyncio +import redis.asyncio as redis from math import ceil from google.protobuf.json_format import MessageToDict from chirpstack_api import stream @@ -21,48 +19,31 @@ class ChirpstackTenant: def __init__( self, route_id: str, - postgres_host: str, - postgres_user: str, - postgres_pass: str, - postgres_name: str, - postgres_port: str, - postgres_ssl_mode: str, + pool, chirpstack_host: str, chirpstack_token: str, ): self.route_id = route_id - self.pg_host = postgres_host - self.pg_user = postgres_user - self.pg_pass = postgres_pass - self.pg_name = postgres_name - self.pg_port = postgres_port - self.pg_ssl_mode = postgres_ssl_mode - conn_str = f'postgresql://{self.pg_user}:{self.pg_pass}@{self.pg_host}:{self.pg_port}/{self.pg_name}' - if self.pg_ssl_mode[0] != 'require': - self.postgres = conn_str - else: - self.postgres = '%s?sslmode=%s' % (conn_str, self.pg_ssl_mode) + self.pool = pool self.cs_gprc = chirpstack_host self.auth_token = [('authorization', f'Bearer {chirpstack_token}')] - def db_transaction(self, query): - with closing(psycopg2.connect(self.postgres)) as con: - with con.cursor() as cur: - cur.execute(query) - con.commit() + async def db_transaction(self, query): + async with self.pool.acquire() as con: + async with con.transaction(): + await con.execute(query) - def db_fetch(self, query): - with closing(psycopg2.connect(self.postgres)) as con: - with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute(query) - return cur.fetchall() + async def db_fetch(self, query): + async with self.pool.acquire() as con: + async with con.transaction(): + return await con.fetch(query) - def stream_meta(self): + async def stream_meta(self): stream_key = 'stream:meta' last_id = '0' - try: - while True: - resp = rdb.xread({stream_key: last_id}, count=1, block=0) + while True: + try: + resp = await rdb.xread({stream_key: last_id}, count=1, block=0) for message in resp[0][1]: last_id = message[0] @@ -72,14 +53,12 @@ def stream_meta(self): pl = stream.meta_pb2.UplinkMeta() pl.ParseFromString(b) data = MessageToDict(pl) - self.meta_up(data) + await self.meta_up(data) - except Exception as exc: - print(f'Error: {exc}') - # log exception error here when adding logger - pass + except Exception as exc: + logging.info(f'stream_meta: {exc}') - def meta_up(self, data: dict): + async def meta_up(self, data: dict): dev_eui = data['devEui'] dupes = len(data['rxInfo']) # dc = ceil(data['phyPayloadByteCount'] / 24) @@ -97,7 +76,7 @@ def meta_up(self, data: dict): query = """ UPDATE helium_devices SET dc_used = (dc_used + {}) WHERE dev_eui='{}'; """.format(total_dc, dev_eui) - self.db_transaction(query) + await self.db_transaction(query) if os.getenv('PUBLISH_USAGE_EVENTS') == 'True': # First we get the tenant id for the device... @@ -106,11 +85,16 @@ def meta_up(self, data: dict): JOIN device ON application.id = device.application_id WHERE device.dev_eui = decode('%s', 'hex'); """ % dev_eui - result = self.db_fetch(query) + result = await self.db_fetch(query) tenant_id = None application_id = None for row in result: tenant_id = row['tenant_id'] application_id = row['id'] - publish_usage_event(dev_eui, tenant_id, application_id, total_dc) + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, + publish_usage_event, + dev_eui, tenant_id, application_id, total_dc, + ) return From 3fb6e6faae3f406d7252c7f73171e25528a5a519 Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 16 May 2026 16:49:51 -0300 Subject: [PATCH 09/10] run all tasks on a single event loop with one shared pool Replaces the ThreadPoolExecutor (one event loop pe0r thread) with a single asyncio.run(main()) that creates one shared pool and runs all five tasks via asyncio.gather. Periodic tasks use an async run_periodically helper instead of asyncio.run per tick. build_dsn now applies POSTGRES_SSL_MODE correctly (the old check compared a single char and never matched). --- app/app.py | 146 ++++++++++++++++++++--------------------------------- 1 file changed, 54 insertions(+), 92 deletions(-) diff --git a/app/app.py b/app/app.py index 7a2c40e..9a5dcdd 100644 --- a/app/app.py +++ b/app/app.py @@ -2,7 +2,8 @@ import asyncio import logging import time -from concurrent.futures import ThreadPoolExecutor + +from DatabasePool import Database from ChirpHeliumRequestsRpc import ChirpstackStreams from ChirpHeliumKeysRpc import ChirpDeviceKeys from ChirpHeliumTenant import ChirpstackTenant @@ -12,110 +13,71 @@ logging.basicConfig(level=logging.INFO) -if __name__ == '__main__': +def build_dsn() -> str: + user = os.getenv('POSTGRES_USER') + password = os.getenv('POSTGRES_PASS') + host = os.getenv('POSTGRES_HOST') + port = os.getenv('POSTGRES_PORT', 5432) + name = os.getenv('POSTGRES_DB') + ssl_mode = os.getenv('POSTGRES_SSL_MODE', 'allow') + dsn = f'postgresql://{user}:{password}@{host}:{port}/{name}' + return f'{dsn}?sslmode={ssl_mode}' + + +async def run_periodically(coro_fn, interval: int, name: str): + """Run an async callable forever, every `interval` seconds.""" + while True: + start = time.time() + try: + print(f'{time.ctime()} Executing: {name}, interval: {interval}s.') + await coro_fn() + except Exception as err: + print(f'{name} Error: {err}') + elapsed = time.time() - start + await asyncio.sleep(max(0, interval - elapsed)) + + +async def main(): route_id = os.getenv('ROUTE_ID') - postgres_host = os.getenv('POSTGRES_HOST') - postgres_user = os.getenv('POSTGRES_USER') - postgres_pass = os.getenv('POSTGRES_PASS') - postgres_name = os.getenv('POSTGRES_DB') - postgres_port = os.getenv('POSTGRES_PORT', 5432) - postgres_ssl_mode = os.getenv('POSTGRES_SSL_MODE', 'allow') chirpstack_host = os.getenv('CHIRPSTACK_SERVER') chirpstack_token = os.getenv('CHIRPSTACK_APIKEY') - events = ChirpstackJoins( - route_id=route_id, - postgres_host=postgres_host, - postgres_user=postgres_user, - postgres_pass=postgres_pass, - postgres_name=postgres_name, - postgres_port=postgres_port, - postgres_ssl_mode=postgres_ssl_mode, - chirpstack_host=chirpstack_host, - chirpstack_token=chirpstack_token, - ) + db = Database(build_dsn()) + await db.connect() + events = ChirpstackJoins( + route_id, db.pool, chirpstack_host, chirpstack_token) client_streams = ChirpstackStreams( - route_id=route_id, - postgres_host=postgres_host, - postgres_user=postgres_user, - postgres_pass=postgres_pass, - postgres_name=postgres_name, - postgres_port=postgres_port, - postgres_ssl_mode=postgres_ssl_mode, - chirpstack_host=chirpstack_host, - chirpstack_token=chirpstack_token, - ) - + route_id, db.pool, chirpstack_host, chirpstack_token) client_keys = ChirpDeviceKeys( - route_id=route_id, - postgres_host=postgres_host, - postgres_user=postgres_user, - postgres_pass=postgres_pass, - postgres_name=postgres_name, - postgres_port=postgres_port, - postgres_ssl_mode=postgres_ssl_mode, - chirpstack_host=chirpstack_host, - chirpstack_token=chirpstack_token, - ) - + route_id, db.pool, chirpstack_host, chirpstack_token) tenant = ChirpstackTenant( - route_id=route_id, - postgres_host=postgres_host, - postgres_user=postgres_user, - postgres_pass=postgres_pass, - postgres_name=postgres_name, - postgres_port=postgres_port, - postgres_ssl_mode=postgres_ssl_mode, - chirpstack_host=chirpstack_host, - chirpstack_token=chirpstack_token, - ) - - def run_every(fn: str, interval: int): - name = str(fn) - while True: - try: - start = time.time() - print(f'{time.ctime()} Executing: {name}, sleeping: {interval} seconds.') - fn() - stop = time.time() - time.sleep(interval - (stop - start)) - except Exception as err: - print(f'{name} Error: {err}') - pass - - def async_run_every(fn: str, interval: int): - name = str(fn) - while True: - try: - start = time.time() - print(f'{time.ctime()} Executing: {name}, sleeping: {interval} seconds.') - asyncio.run(fn()) - stop = time.time() - time.sleep(interval - (stop - start)) - except Exception as err: - print(f'{name} Error: {err}') - pass - - def async_wrapper(corro): - return asyncio.run(corro()) - - def update_device_status(): - updates = list(map(client_keys.get_merged_keys, client_keys.fetch_all_devices())) + route_id, db.pool, chirpstack_host, chirpstack_token) + + async def update_device_status(): + updates = [] + for dev_eui in await client_keys.fetch_all_devices(): + updates.append(await client_keys.get_merged_keys(dev_eui)) print('\n'.join(updates)) - return skfs_int = 60 * 5 # 5 minutes device_int = 60 * 5 # 5 minutes - client_streams.create_tables() + await client_streams.create_tables() try: - with ThreadPoolExecutor(max_workers=5) as executor: - executor.submit(async_wrapper, client_streams.api_stream_requests) - executor.submit(async_wrapper, events.device_stream_event) - executor.submit(tenant.stream_meta) - executor.submit(run_every, update_device_status, device_int) - executor.submit(async_run_every, client_keys.helium_skfs_update, skfs_int) + await asyncio.gather( + client_streams.api_stream_requests(), + events.device_stream_event(), + tenant.stream_meta(), + run_periodically(update_device_status, device_int, + 'update_device_status'), + run_periodically(client_keys.helium_skfs_update, skfs_int, + 'helium_skfs_update'), + ) finally: - asyncio.run(client_keys.close()) + await db.close() + + +if __name__ == '__main__': + asyncio.run(main()) From e2df80f3bed00e494b59e7b095b8c40d7c9f0ed7 Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 16 May 2026 18:44:37 -0300 Subject: [PATCH 10/10] bound the asyncpg pool size to avoid starving shared Postgres create_pool defaults to min_size=10/max_size=10, so the service held 10 Postgres connections open for its whole lifetime. On a database shared with ChirpStack that can push it past max_connections. Cap the pool at max_size=5 and min_size=1 so idle connections are released. --- app/DatabasePool.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/DatabasePool.py b/app/DatabasePool.py index 55c57db..f1972c8 100644 --- a/app/DatabasePool.py +++ b/app/DatabasePool.py @@ -15,7 +15,11 @@ def __init__(self, dsn: str): async def connect(self) -> None: if self.pool is None: - self.pool = await asyncpg.create_pool(dsn=self.dsn) + self.pool = await asyncpg.create_pool( + dsn=self.dsn, + min_size=1, # do not hold idle connections open + max_size=5, # small ceiling; queries are short-lived + ) async def close(self) -> None: if self.pool is not None: