1010import requests
1111from aiohttp import ClientSession
1212
13- nest_asyncio .apply ()
13+ nest_asyncio .apply () # allow run at jupyter and asyncio env
1414
1515node_parameters = ['host' , 'port' , 'user' , 'password' , 'database' ]
1616node = namedtuple ('clickhouse' , node_parameters )
1717available_queries_select = ('select' , 'show' , 'desc' )
1818available_queries_insert = ('insert' , 'optimize' , 'create' )
1919
20- SEMAPHORE = 10
20+ SEMAPHORE = 10 # control async number for whole query list
2121
2222from ClickSQL .conf .parse_rfc_1738_args import _parse_rfc1738_args
2323
@@ -107,14 +107,14 @@ def __init__(self, **db_settings):
107107 self .http_settings = self ._merge_settings (None , updated_settings = self ._default_settings )
108108 self .http_settings .update ({'user' : self ._para .user , 'password' : self ._para .password })
109109
110- self ._session = ClientSession ()
110+ # self._session = ClientSession()
111111 self .max_async_query_once = 5
112112 self .is_closed = False
113113
114114 self ._test_connection_ ()
115115
116116 @staticmethod
117- def _check_db_settings (db_settings : dict , available_db_type = [node .__name__ ]):
117+ def _check_db_settings (db_settings : dict , available_db_type = [node .__name__ ]): # node.__name__ : clickhouse
118118 """
119119 it is to check db setting whether is correct!
120120 :param db_settings:
@@ -176,7 +176,7 @@ async def _post(self, url, sql, session):
176176 result = await resp .read ()
177177
178178 status = resp .status
179- reason = resp .reason
179+ # reason = resp.reason
180180 if status != 200 :
181181 raise ValueError (result )
182182 return result
@@ -270,7 +270,6 @@ def execute(self, *sql, convert_to: str = 'dataframe', loop=None, ):
270270 :param loop:
271271 :return:
272272 """
273- ## TODO warning: Unclosed client session
274273
275274 insert_process = list (map (lambda x : x .lower ().startswith (available_queries_insert ), sql ))
276275 select_process = list (map (lambda x : x .lower ().startswith (available_queries_select ), sql ))
@@ -292,19 +291,24 @@ def execute(self, *sql, convert_to: str = 'dataframe', loop=None, ):
292291 to_df = to_df )
293292 return result
294293
295- def query (self , sql : str ):
294+ def query (self , * sql : str ):
296295 """
297296 require to upgrade
298297 :param sql:
299298 :return:
300299 """
301- result = self .execute (sql , convert_to = 'dataframe' , loop = None , )
300+ result = self .execute (* sql , convert_to = 'dataframe' , loop = None , )
302301 return result
303302
304303
305304class ClickHouseTableNode (ClickHouseBaseNode ):
306- def __init__ (self , conn_str : str ):
307- db_settings = _parse_rfc1738_args (conn_str )
305+ def __init__ (self , conn_str : (str , dict )):
306+ if isinstance (conn_str , str ):
307+ db_settings = _parse_rfc1738_args (conn_str )
308+ elif isinstance (conn_str , dict ):
309+ db_settings = conn_str
310+ else :
311+ raise ParameterTypeError (f'conn_str must be str or dict but get: { type (conn_str )} ' )
308312 super (ClickHouseTableNode , self ).__init__ (** db_settings )
309313
310314 @property
0 commit comments