diff --git a/app/ChirpHeliumJoinRpc.py b/app/ChirpHeliumJoinRpc.py index b1ad409..125b6ff 100644 --- a/app/ChirpHeliumJoinRpc.py +++ b/app/ChirpHeliumJoinRpc.py @@ -1,6 +1,4 @@ import os -import psycopg2 -import psycopg2.extras import redis.asyncio as redis import grpc from google.protobuf.json_format import MessageToJson, MessageToDict @@ -23,34 +21,19 @@ class ChirpstackJoins: def __init__( self, route_id: str, - postgres_host: str, - postgres_user: str, - postgres_pass: str, - postgres_name: str, - postgres_port: str, - postgres_ssl_mode: str, + pool, chirpstack_host: str, chirpstack_token: str, ): self.route_id = route_id - self.pg_host = postgres_host - self.pg_user = postgres_user - self.pg_pass = postgres_pass - self.pg_name = postgres_name - self.pg_port = postgres_port - self.pg_ssl_mode = postgres_ssl_mode - conn_str = f"postgresql://{self.pg_user}:{self.pg_pass}@{self.pg_host}:{self.pg_port}/{self.pg_name}" - if self.pg_ssl_mode[0] != "require": - self.postgres = conn_str - else: - self.postgres = "%s?sslmode=%s" % (conn_str, self.pg_ssl_mode) + self.pool = pool self.cs_grpc = chirpstack_host self.auth_token = [("authorization", f"Bearer {chirpstack_token}")] - def db_transaction(self, query: str): - with psycopg2.connect(self.postgres) as con: - with con.cursor() as cur: - cur.execute(query) + async def db_transaction(self, query: str): + async with self.pool.acquire() as con: + async with con.transaction(): + await con.execute(query) ########################################################################### # follow internal redis stream gRPC for actionable changes @@ -160,4 +143,4 @@ async def add_session_key(self, dev_eui): devices["fCntUp"], devices["nFCntDown"], ) - self.db_transaction(query) + await self.db_transaction(query) diff --git a/app/ChirpHeliumKeysRpc.py b/app/ChirpHeliumKeysRpc.py index b84ca5c..a54e13f 100644 --- a/app/ChirpHeliumKeysRpc.py +++ b/app/ChirpHeliumKeysRpc.py @@ -1,7 +1,3 @@ -from functools import wraps -# import asyncpg -import psycopg2 -import psycopg2.extras import grpc from google.protobuf.json_format import MessageToDict from chirpstack_api import api @@ -9,96 +5,38 @@ from ChirpHeliumCrypto import get_route_skfs, update_device_skfs from protos.helium import iot_config -from DatabasePool import Database - - -# def my_logger(orig_func): -# logging.basicConfig( -# filename='chirpstack-hpr.log', -# filemode='a', -# format='%(asctime)s %(levelname)s:%(name)s:%(message)s', -# level=logging.INFO, -# datefmt='%Y-%m-%d %H:%M:%S', -# ) -# logging.getLogger("asyncio").setLevel(logging.INFO) -# -# @wraps(orig_func) -# def wrapper(*args, **kwargs): -# logging.info( -# f'Passed args: {args}, kwargs: {kwargs}') -# return orig_func(*args, **kwargs) -# return wrapper class ChirpDeviceKeys: def __init__( self, route_id: str, - postgres_host: str, - postgres_user: str, - postgres_pass: str, - postgres_name: str, - postgres_port: str, - postgres_ssl_mode: str, + pool, chirpstack_host: str, chirpstack_token: str, ): self.route_id = route_id - self.pg_host = postgres_host - self.pg_user = postgres_user - self.pg_pass = postgres_pass - self.pg_name = postgres_name - self.pg_port = postgres_port - self.pg_ssl_mode = postgres_ssl_mode - conn_str = f"postgresql://{self.pg_user}:{self.pg_pass}@{self.pg_host}:{self.pg_port}/{self.pg_name}" - if self.pg_ssl_mode[0] != "require": - self.postgres = conn_str - else: - self.postgres = "%s?sslmode=%s" % (conn_str, self.pg_ssl_mode) + self.pool = pool self.cs_gprc = chirpstack_host self.auth_token = [("authorization", f"Bearer {chirpstack_token}")] - def db_fetch(self, query: str): - with psycopg2.connect(self.postgres) as con: - with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute(query) - return cur.fetchall() - - async def async_db_fetch(self, query: str): - db = Database() - await db.connect() - assert db.pool - - async with db.pool.acquire() as conn: + async def db_fetch(self, query: str): + async with self.pool.acquire() as conn: async with conn.transaction(): - cur = await conn.fetch(query) - await db.close() - return cur - - def db_transaction(self, query: str): - with psycopg2.connect(self.postgres) as con: - with con.cursor() as cur: - cur.execute(query) + return await conn.fetch(query) - async def async_db_transaction(self, query: str): - db = Database() - await db.connect() - assert db.pool - async with db.pool.acquire() as conn: + async def db_transaction(self, query: str): + async with self.pool.acquire() as conn: async with conn.transaction(): await conn.execute(query) - def fetch_all_devices(self) -> list[str]: - with psycopg2.connect(self.postgres) as con: - with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute( - """ - SELECT encode(dev_eui, 'hex') AS dev_eui - FROM device - WHERE is_disabled=false; - """ - ) - return [dev['dev_eui'] for dev in cur.fetchall()] + async def fetch_all_devices(self) -> list[str]: + query = """ + SELECT encode(dev_eui, 'hex') AS dev_eui + FROM device + WHERE is_disabled=false; + """ + return [dev['dev_eui'] for dev in await self.db_fetch(query)] def chunker(self, seq, size): return (seq[pos:pos + size] for pos in range(0, len(seq), size)) @@ -106,30 +44,27 @@ def chunker(self, seq, size): ########################################################################### # Chirpstack gRPC API calls ########################################################################### - # @my_logger - def get_device(self, dev_eui: str) -> dict[str]: - with grpc.insecure_channel(self.cs_gprc) as channel: + async def get_device(self, dev_eui: str) -> dict[str]: + async with grpc.aio.insecure_channel(self.cs_gprc) as channel: client = api.DeviceServiceStub(channel) req = api.GetDeviceRequest() req.dev_eui = dev_eui - resp = client.Get(req, metadata=self.auth_token) + resp = await client.Get(req, metadata=self.auth_token) data = MessageToDict(resp)["device"] return data - # @my_logger - def get_device_activation(self, dev_eui: str) -> dict[str]: - with grpc.insecure_channel(self.cs_gprc) as channel: + async def get_device_activation(self, dev_eui: str) -> dict[str]: + async with grpc.aio.insecure_channel(self.cs_gprc) as channel: client = api.DeviceServiceStub(channel) req = api.GetDeviceActivationRequest() req.dev_eui = dev_eui - resp = client.GetActivation(req, metadata=self.auth_token) + resp = await client.GetActivation(req, metadata=self.auth_token) data = MessageToDict(resp) if bool(data): return data["deviceActivation"] return data - # @my_logger - def get_merged_keys(self, dev_eui: str) -> dict[str]: + async def get_merged_keys(self, dev_eui: str) -> str: devices = { "devAddr": "", "appSKey": "", @@ -137,8 +72,8 @@ def get_merged_keys(self, dev_eui: str) -> dict[str]: "name": "", } - devices.update(self.get_device(dev_eui)) - devices.update(self.get_device_activation(dev_eui)) + devices.update(await self.get_device(dev_eui)) + devices.update(await self.get_device_activation(dev_eui)) max_copies = 0 if devices.get("variables") and "max_copies" in devices.get("variables"): @@ -172,11 +107,9 @@ def get_merged_keys(self, dev_eui: str) -> dict[str]: devices["fCntUp"], devices["nFCntDown"], ) - self.db_transaction(query) - # await self.async_db_transaction(query) + await self.db_transaction(query) return f"Updated: {dev_eui}" - # @my_logger async def helium_skfs_update(self): """ TODO: @@ -188,15 +121,10 @@ async def helium_skfs_update(self): WHERE is_disabled=false AND dev_addr != ''; """ - # all_helium_devices = self.db_fetch(helium_devices) - all_helium_devices = await self.async_db_fetch(helium_devices) - # logging.info(f"All Helium Devices: {all_helium_devices}") + all_helium_devices = await self.db_fetch(helium_devices) skfs_list = await get_route_skfs() - # logging.info(f"All Helium Devices: {all_helium_devices}") - # logging.info(f"SKFS List: {skfs_list}") - # Convert the lists to sets for efficient set operations # compare dev_addr & session_key for match else remove all_helium_sessions_set = { @@ -222,9 +150,6 @@ async def helium_skfs_update(self): for d in skfs_list } - # logging.info(f"All Helium Devices Set: {all_helium_devices_set}") - # logging.info(f"SKFS List Set: {skfs_list_set}") - # Devices to add to skfs_list devices_to_add = all_helium_devices_set - skfs_list_set logging.info(f"Devices_to_add: {devices_to_add}") @@ -241,7 +166,6 @@ async def helium_skfs_update(self): action=iot_config.ActionV1(1) ) for dev_addr, nws_key in devices_to_remove ] - # logging.info(f'RM-SKFS: {rm_skfs}') if devices_to_add: add_skfs = [ @@ -252,7 +176,6 @@ async def helium_skfs_update(self): max_copies=max_copies ) for dev_addr, nws_key, max_copies in devices_to_add ] - # logging.info(f'ADD-SKFS: {add_skfs}') skfs_action = rm_skfs + add_skfs diff --git a/app/ChirpHeliumRequestsRpc.py b/app/ChirpHeliumRequestsRpc.py index da1dd71..ab05c69 100644 --- a/app/ChirpHeliumRequestsRpc.py +++ b/app/ChirpHeliumRequestsRpc.py @@ -1,7 +1,5 @@ import os from functools import wraps -import psycopg2 -import psycopg2.extras import redis.asyncio as redis import grpc from google.protobuf.json_format import MessageToJson, MessageToDict @@ -41,50 +39,34 @@ class ChirpstackStreams: def __init__( self, route_id: str, - postgres_host: str, - postgres_user: str, - postgres_pass: str, - postgres_name: str, - postgres_port: str, - postgres_ssl_mode: str, + pool, chirpstack_host: str, chirpstack_token: str, ): self.route_id = route_id - self.pg_host = postgres_host - self.pg_user = postgres_user - self.pg_pass = postgres_pass - self.pg_name = postgres_name - self.pg_port = postgres_port - self.pg_ssl_mode = postgres_ssl_mode - conn_str = f'postgresql://{self.pg_user}:{self.pg_pass}@{self.pg_host}:{self.pg_port}/{self.pg_name}' - if self.pg_ssl_mode[0] != 'require': - self.postgres = conn_str - else: - self.postgres = '%s?sslmode=%s' % (conn_str, self.pg_ssl_mode) + self.pool = pool self.cs_gprc = chirpstack_host self.auth_token = [('authorization', f'Bearer {chirpstack_token}')] ########################################################################### # functions to handle helium device db transactions ########################################################################### - def db_transaction(self, query): - with psycopg2.connect(self.postgres) as con: - with con.cursor() as cur: - cur.execute(query) - - def db_fetch(self, query): - with psycopg2.connect(self.postgres) as con: - with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute(query) - return cur.fetchall() - - def fetch_active_devices(self) -> list[str]: + async def db_transaction(self, query): + async with self.pool.acquire() as con: + async with con.transaction(): + await con.execute(query) + + async def db_fetch(self, query): + async with self.pool.acquire() as con: + async with con.transaction(): + return await con.fetch(query) + + async def fetch_active_devices(self) -> list[str]: query = "SELECT dev_eui FROM device WHERE is_disabled=false;" - result = [device['dev_eui'].hex() for device in self.db_fetch(query)] + result = [device['dev_eui'].hex() for device in await self.db_fetch(query)] return result - def create_tables(self): + async def create_tables(self): query = """ CREATE TABLE IF NOT EXISTS helium_devices ( dev_eui text primary key, -- devices['devEui'] @@ -101,7 +83,7 @@ def create_tables(self): ); """ print('Run create helium device table if not exists.') - self.db_transaction(query) + await self.db_transaction(query) ########################################################################### # Chirpstack gRPC API calls @@ -194,7 +176,7 @@ async def add_device_euis(self, data: dict): VALUES ('{0}', '{1}') ON CONFLICT (dev_eui) DO NOTHING; """.format(dev_eui, join_eui) - self.db_transaction(query) + await self.db_transaction(query) await sync_device_euis(0, join_eui, dev_eui, self.route_id) return @@ -214,7 +196,7 @@ async def remove_device_euis(self, data: dict): print(f'Remove Device: {device}') query = "SELECT * FROM helium_devices WHERE dev_eui='{}';".format(device) - data = self.db_fetch(query)[0] + data = (await self.db_fetch(query))[0] dev_eui = data['dev_eui'] # this should be a string join_eui = data['join_eui'] # this should be a string @@ -229,7 +211,7 @@ async def remove_device_euis(self, data: dict): delete_device = """ DELETE FROM helium_devices WHERE dev_eui='{}'; """.format(device) - self.db_transaction(delete_device) + await self.db_transaction(delete_device) return @my_logger @@ -253,14 +235,14 @@ async def update_device_euis(self, data: dict): query = """ UPDATE helium_devices SET is_disabled=true WHERE dev_eui='{}'; """.format(dev_eui) - self.db_transaction(query) + await self.db_transaction(query) elif is_disabled == 'false': action = 0 query = """ UPDATE helium_devices SET is_disabled=false WHERE dev_eui='{}'; """.format(dev_eui) - self.db_transaction(query) + await self.db_transaction(query) print('action', action, 'join', join_eui, 'dev', dev_eui) await sync_device_euis(action, join_eui, dev_eui, self.route_id) return diff --git a/app/ChirpHeliumTenant.py b/app/ChirpHeliumTenant.py index 3c91106..943c88c 100644 --- a/app/ChirpHeliumTenant.py +++ b/app/ChirpHeliumTenant.py @@ -1,7 +1,6 @@ import os -import psycopg2 -import psycopg2.extras -import redis +import asyncio +import redis.asyncio as redis from math import ceil from google.protobuf.json_format import MessageToDict from chirpstack_api import stream @@ -20,47 +19,31 @@ class ChirpstackTenant: def __init__( self, route_id: str, - postgres_host: str, - postgres_user: str, - postgres_pass: str, - postgres_name: str, - postgres_port: str, - postgres_ssl_mode: str, + pool, chirpstack_host: str, chirpstack_token: str, ): self.route_id = route_id - self.pg_host = postgres_host - self.pg_user = postgres_user - self.pg_pass = postgres_pass - self.pg_name = postgres_name - self.pg_port = postgres_port - self.pg_ssl_mode = postgres_ssl_mode - conn_str = f'postgresql://{self.pg_user}:{self.pg_pass}@{self.pg_host}:{self.pg_port}/{self.pg_name}' - if self.pg_ssl_mode[0] != 'require': - self.postgres = conn_str - else: - self.postgres = '%s?sslmode=%s' % (conn_str, self.pg_ssl_mode) + self.pool = pool self.cs_gprc = chirpstack_host self.auth_token = [('authorization', f'Bearer {chirpstack_token}')] - def db_transaction(self, query): - with psycopg2.connect(self.postgres) as con: - with con.cursor() as cur: - cur.execute(query) + async def db_transaction(self, query): + async with self.pool.acquire() as con: + async with con.transaction(): + await con.execute(query) - def db_fetch(self, query): - with psycopg2.connect(self.postgres) as con: - with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute(query) - return cur.fetchall() + async def db_fetch(self, query): + async with self.pool.acquire() as con: + async with con.transaction(): + return await con.fetch(query) - def stream_meta(self): + async def stream_meta(self): stream_key = 'stream:meta' last_id = '0' - try: - while True: - resp = rdb.xread({stream_key: last_id}, count=1, block=0) + while True: + try: + resp = await rdb.xread({stream_key: last_id}, count=1, block=0) for message in resp[0][1]: last_id = message[0] @@ -70,14 +53,12 @@ def stream_meta(self): pl = stream.meta_pb2.UplinkMeta() pl.ParseFromString(b) data = MessageToDict(pl) - self.meta_up(data) + await self.meta_up(data) - except Exception as exc: - print(f'Error: {exc}') - # log exception error here when adding logger - pass + except Exception as exc: + logging.info(f'stream_meta: {exc}') - def meta_up(self, data: dict): + async def meta_up(self, data: dict): dev_eui = data['devEui'] dupes = len(data['rxInfo']) # dc = ceil(data['phyPayloadByteCount'] / 24) @@ -95,7 +76,7 @@ def meta_up(self, data: dict): query = """ UPDATE helium_devices SET dc_used = (dc_used + {}) WHERE dev_eui='{}'; """.format(total_dc, dev_eui) - self.db_transaction(query) + await self.db_transaction(query) if os.getenv('PUBLISH_USAGE_EVENTS') == 'True': # First we get the tenant id for the device... @@ -104,11 +85,16 @@ def meta_up(self, data: dict): JOIN device ON application.id = device.application_id WHERE device.dev_eui = decode('%s', 'hex'); """ % dev_eui - result = self.db_fetch(query) + result = await self.db_fetch(query) tenant_id = None application_id = None for row in result: tenant_id = row['tenant_id'] application_id = row['id'] - publish_usage_event(dev_eui, tenant_id, application_id, total_dc) + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, + publish_usage_event, + dev_eui, tenant_id, application_id, total_dc, + ) return diff --git a/app/DatabasePool.py b/app/DatabasePool.py index e0f39b6..f1972c8 100644 --- a/app/DatabasePool.py +++ b/app/DatabasePool.py @@ -1,20 +1,27 @@ -import os import asyncpg -import asyncio class Database: + """Holds a single asyncpg connection pool, shared across the app. + + The pool is created once at startup (``connect``) and closed once at + shutdown (``close``). Callers acquire/release individual connections + from ``pool`` per transaction. + """ + + def __init__(self, dsn: str): + self.dsn = dsn + self.pool: asyncpg.Pool | None = None + async def connect(self) -> None: - self.pool = await asyncpg.create_pool( - user=os.getenv('POSTGRES_USER', 'chirpstack'), - host=os.getenv('POSTGRES_HOST', 'chirpstack-postgres'), - port=os.getenv('POSTGRES_PORT', 5432), - database=os.getenv('POSTGRES_DB', 'chirpstack'), - password=os.getenv('POSTGRES_PASS', 'chirpstack'), - loop=asyncio.get_running_loop(), - ) + if self.pool is None: + self.pool = await asyncpg.create_pool( + dsn=self.dsn, + min_size=1, # do not hold idle connections open + max_size=5, # small ceiling; queries are short-lived + ) async def close(self) -> None: - if self.pool: + if self.pool is not None: await self.pool.close() - pass + self.pool = None diff --git a/app/app.py b/app/app.py index d670d36..9a5dcdd 100644 --- a/app/app.py +++ b/app/app.py @@ -2,7 +2,8 @@ import asyncio import logging import time -from concurrent.futures import ThreadPoolExecutor + +from DatabasePool import Database from ChirpHeliumRequestsRpc import ChirpstackStreams from ChirpHeliumKeysRpc import ChirpDeviceKeys from ChirpHeliumTenant import ChirpstackTenant @@ -12,107 +13,71 @@ logging.basicConfig(level=logging.INFO) -if __name__ == '__main__': +def build_dsn() -> str: + user = os.getenv('POSTGRES_USER') + password = os.getenv('POSTGRES_PASS') + host = os.getenv('POSTGRES_HOST') + port = os.getenv('POSTGRES_PORT', 5432) + name = os.getenv('POSTGRES_DB') + ssl_mode = os.getenv('POSTGRES_SSL_MODE', 'allow') + dsn = f'postgresql://{user}:{password}@{host}:{port}/{name}' + return f'{dsn}?sslmode={ssl_mode}' + + +async def run_periodically(coro_fn, interval: int, name: str): + """Run an async callable forever, every `interval` seconds.""" + while True: + start = time.time() + try: + print(f'{time.ctime()} Executing: {name}, interval: {interval}s.') + await coro_fn() + except Exception as err: + print(f'{name} Error: {err}') + elapsed = time.time() - start + await asyncio.sleep(max(0, interval - elapsed)) + + +async def main(): route_id = os.getenv('ROUTE_ID') - postgres_host = os.getenv('POSTGRES_HOST') - postgres_user = os.getenv('POSTGRES_USER') - postgres_pass = os.getenv('POSTGRES_PASS') - postgres_name = os.getenv('POSTGRES_DB') - postgres_port = os.getenv('POSTGRES_PORT', 5432) - postgres_ssl_mode = os.getenv('POSTGRES_SSL_MODE', 'allow') chirpstack_host = os.getenv('CHIRPSTACK_SERVER') chirpstack_token = os.getenv('CHIRPSTACK_APIKEY') - events = ChirpstackJoins( - route_id=route_id, - postgres_host=postgres_host, - postgres_user=postgres_user, - postgres_pass=postgres_pass, - postgres_name=postgres_name, - postgres_port=postgres_port, - postgres_ssl_mode=postgres_ssl_mode, - chirpstack_host=chirpstack_host, - chirpstack_token=chirpstack_token, - ) + db = Database(build_dsn()) + await db.connect() + events = ChirpstackJoins( + route_id, db.pool, chirpstack_host, chirpstack_token) client_streams = ChirpstackStreams( - route_id=route_id, - postgres_host=postgres_host, - postgres_user=postgres_user, - postgres_pass=postgres_pass, - postgres_name=postgres_name, - postgres_port=postgres_port, - postgres_ssl_mode=postgres_ssl_mode, - chirpstack_host=chirpstack_host, - chirpstack_token=chirpstack_token, - ) - + route_id, db.pool, chirpstack_host, chirpstack_token) client_keys = ChirpDeviceKeys( - route_id=route_id, - postgres_host=postgres_host, - postgres_user=postgres_user, - postgres_pass=postgres_pass, - postgres_name=postgres_name, - postgres_port=postgres_port, - postgres_ssl_mode=postgres_ssl_mode, - chirpstack_host=chirpstack_host, - chirpstack_token=chirpstack_token, - ) - + route_id, db.pool, chirpstack_host, chirpstack_token) tenant = ChirpstackTenant( - route_id=route_id, - postgres_host=postgres_host, - postgres_user=postgres_user, - postgres_pass=postgres_pass, - postgres_name=postgres_name, - postgres_port=postgres_port, - postgres_ssl_mode=postgres_ssl_mode, - chirpstack_host=chirpstack_host, - chirpstack_token=chirpstack_token, - ) - - def run_every(fn: str, interval: int): - name = str(fn) - while True: - try: - start = time.time() - print(f'{time.ctime()} Executing: {name}, sleeping: {interval} seconds.') - fn() - stop = time.time() - time.sleep(interval - (stop - start)) - except Exception as err: - print(f'{name} Error: {err}') - pass - - def async_run_every(fn: str, interval: int): - name = str(fn) - while True: - try: - start = time.time() - print(f'{time.ctime()} Executing: {name}, sleeping: {interval} seconds.') - asyncio.run(fn()) - stop = time.time() - time.sleep(interval - (stop - start)) - except Exception as err: - print(f'{name} Error: {err}') - pass - - def async_wrapper(corro): - return asyncio.run(corro()) - - def update_device_status(): - updates = list(map(client_keys.get_merged_keys, client_keys.fetch_all_devices())) + route_id, db.pool, chirpstack_host, chirpstack_token) + + async def update_device_status(): + updates = [] + for dev_eui in await client_keys.fetch_all_devices(): + updates.append(await client_keys.get_merged_keys(dev_eui)) print('\n'.join(updates)) - return skfs_int = 60 * 5 # 5 minutes device_int = 60 * 5 # 5 minutes - client_streams.create_tables() + await client_streams.create_tables() - with ThreadPoolExecutor(max_workers=5) as executor: - executor.submit(async_wrapper, client_streams.api_stream_requests) - executor.submit(async_wrapper, events.device_stream_event) - executor.submit(tenant.stream_meta) - executor.submit(run_every, update_device_status, device_int) - executor.submit(async_run_every, client_keys.helium_skfs_update, skfs_int) + try: + await asyncio.gather( + client_streams.api_stream_requests(), + events.device_stream_event(), + tenant.stream_meta(), + run_periodically(update_device_status, device_int, + 'update_device_status'), + run_periodically(client_keys.helium_skfs_update, skfs_int, + 'helium_skfs_update'), + ) + finally: + await db.close() + + +if __name__ == '__main__': + asyncio.run(main())