import asyncio
import json
from collections import namedtuple
from urllib import parse

import nest_asyncio
import pandas as pd
import requests
from aiohttp import ClientSession

from ClickSQL.conf.parse_rfc_1738_args import parse_rfc1738_args
12+
13+ """
14+ this will hold base function of clickhouse and it will apply a path of access clickhouse through clickhouse api service
15+ this scripts will use none of clickhouse client and only depend on requests to make transactions with
16+ clickhouse-server
17+
18+ """
1219
nest_asyncio.apply()  # allow run at jupyter and asyncio env

# parameter names required to describe a ClickHouse node connection
node_parameters = ['host', 'port', 'user', 'password', 'database']
node = namedtuple('clickhouse', node_parameters)
# query prefixes that return a result set vs. those that only write
available_queries_select = ('select', 'show', 'desc')
available_queries_insert = ('insert', 'optimize', 'create')
PRINT_TEST_RESULT = True  # toggle printing of the connection-test banner
SEMAPHORE = 10  # control async number for whole query list
2128
class ParameterKeyError(Exception):
    """Raised when a required connection-parameter key is missing."""


class ParameterTypeError(Exception):
    """Raised when a parameter value has an unexpected type."""


class DatabaseTypeError(Exception):
    """Raised when the requested database type is not supported."""
3240
3341
42+ # TODO change to queue mode change remove aiohttp depends
3443class ClickHouseTools (object ):
3544 @staticmethod
3645 def _transfer_sql_format (sql , convert_to , transfer_sql_format = True ):
46+ """
47+ provide a method which will translate a standard sql into clickhouse sql with might use format as suffix
48+ :param sql:
49+ :param convert_to:
50+ :param transfer_sql_format:
51+ :return:
52+ """
3753 if transfer_sql_format :
3854 clickhouse_format = 'JSON' if convert_to is None else 'JSONCompact' if convert_to .lower () == 'dataframe' else convert_to
3955 query_with_format = (sql .rstrip ('; \n \t ' ) + ' format ' + clickhouse_format ).replace ('\n ' , ' ' ).strip (' ' )
@@ -43,78 +59,100 @@ def _transfer_sql_format(sql, convert_to, transfer_sql_format=True):
4359
4460 @staticmethod
4561 def _load_into_pd (ret_value , convert_to : str = 'dataframe' , errors = 'ignore' ):
62+ """
63+ will provide a approach to load data from clickhouse into pd.DataFrame format which may be easy to use
64+
65+ :param ret_value:
66+ :param convert_to:
67+ :param errors:
68+ :return:
69+ """
70+
4671 if convert_to .lower () == 'dataframe' :
72+
4773 result_dict = json .loads (ret_value , strict = False )
4874 meta = result_dict ['meta' ]
4975 name = map (lambda x : x ['name' ], meta )
5076 data = result_dict ['data' ]
5177 df = pd .DataFrame .from_records (data , columns = list (name ))
5278
5379 for i in meta :
54- if i ['type' ] in ['DateTime' , 'Nullable(DateTime)' ]:
80+ if i ['type' ] in ['DateTime' , 'Nullable(DateTime)' ]: # translate format
81+ # process datetime format
5582 df [i ['name' ]] = pd .to_datetime (df [i ['name' ]], errors = errors )
56- ret_value = df
57- return ret_value
83+ return df
84+ else :
85+ return ret_value
5886
5987 @classmethod
60- def _merge_settings (cls , settings , updated_settings = None ):
88+ def _merge_settings (cls , settings : (None , dict ), updated_settings : (None , dict ) = None ,
89+ extra_settings : (None , dict ) = None ):
6190 """
6291
6392 :param settings:
6493 :param updated_settings:
6594 :return:
6695 """
96+
6797 if updated_settings is None :
6898 updated_settings = {'enable_http_compression' : 1 , 'send_progress_in_http_headers' : 0 ,
6999 'log_queries' : 1 , 'connect_timeout' : 10 , 'receive_timeout' : 300 ,
70100 'send_timeout' : 300 , 'output_format_json_quote_64bit_integers' : 0 ,
71101 'wait_end_of_query' : 0 }
102+ elif not isinstance (updated_settings , dict ):
103+ raise ParameterTypeError (f'updated_settings must be dict type, but get { type (updated_settings )} ' )
104+ else :
105+ pass
72106
73- if settings is not None :
107+ if settings is not None and isinstance ( settings , dict ) :
74108 invalid_setting_keys = list (set (settings .keys ()) - set (updated_settings .keys ()))
75109 if len (invalid_setting_keys ) > 0 :
76110 raise ValueError ('setting "{0}" is invalid, valid settings are: {1}' .format (
77111 invalid_setting_keys [0 ], ', ' .join (updated_settings .keys ())))
78- else :
79- pass
112+
80113 updated_settings .update (settings )
81114
115+ if extra_settings is not None and isinstance (extra_settings , dict ):
116+ updated_settings .update (extra_settings )
117+
82118 return {k : v * 1 if isinstance (v , bool ) else v for k , v in updated_settings .items ()}
83119
84120
class ClickHouseBaseNode(ClickHouseTools):
    # output formats the ClickHouse HTTP interface accepts; 'DataFrame' is a
    # local convenience alias resolved by _transfer_sql_format
    accepted_formats = ('DataFrame', 'TabSeparated', 'TabSeparatedRaw', 'TabSeparatedWithNames',
                        'TabSeparatedWithNamesAndTypes', 'CSV', 'CSVWithNames', 'Values', 'Vertical', 'JSON',
                        'JSONCompact', 'JSONEachRow', 'TSKV', 'Pretty', 'PrettyCompact',
                        'PrettyCompactMonoBlock', 'PrettyNoEscapes', 'PrettySpace', 'XML')

    # HTTP-level settings sent with every request unless overridden
    _default_settings = {'enable_http_compression': 1, 'send_progress_in_http_headers': 0,
                         'log_queries': 1, 'connect_timeout': 10, 'receive_timeout': 300,
                         'send_timeout': 300, 'output_format_json_quote_64bit_integers': 0,
                         'wait_end_of_query': 0}

    # restrict instances to these attributes (saves memory, catches typos)
    __slots__ = ('_db', '_base_url', '_para', 'http_settings', 'max_async_query_once', 'is_closed')
133+
    def __init__(self, **db_settings):
        """
        Build a node from keyword settings and probe the server once.

        :param db_settings: must contain 'name' plus host, port, user,
                            password, database (validated by
                            _check_db_settings)
        """
        self._check_db_settings(db_settings, available_db_type=[node.__name__])

        self._db = db_settings['database']
        # ClickHouse HTTP endpoint; settings are appended as query parameters
        self._base_url = "http://{host}:{port}/?".format(host=db_settings['host'], port=int(db_settings['port']))
        self._para = node(db_settings['host'], db_settings['port'], db_settings['user'],
                          db_settings['password'], db_settings['database'])  # store connection information

        # credentials ride along as extra (unvalidated) HTTP settings
        self.http_settings = self._merge_settings(None, updated_settings=self._default_settings,
                                                  extra_settings={'user': self._para.user,
                                                                  'password': self._para.password})
        # self._session = ClientSession()  # the reason of unclose session client
        self.max_async_query_once = 5
        self.is_closed = False

        # issues a real GET against the server — network I/O at construction
        self._test_connection_(self._base_url)
115153
116154 @staticmethod
117- def _check_db_settings (db_settings : dict , available_db_type = [ node .__name__ ] ): # node.__name__ : clickhouse
155+ def _check_db_settings (db_settings : dict , available_db_type = ( node .__name__ ,) ): # node.__name__ : clickhouse
118156 """
119157 it is to check db setting whether is correct!
120158 :param db_settings:
@@ -123,27 +161,32 @@ def _check_db_settings(db_settings: dict, available_db_type=[node.__name__]): #
123161
124162 if isinstance (db_settings , dict ):
125163 if db_settings ['name' ].lower () not in available_db_type :
126- raise DatabaseTypeError ('database symbol is not accept, now only accept: {",".join(available_db_type)}' )
127- missing_keys = []
128- for key in node_parameters :
129- if key not in db_settings . keys () :
130- missing_keys .append (key )
131- else :
132- pass
164+ raise DatabaseTypeError (
165+ f'database symbol is not accept, now only accept: { "," . join ( available_db_type ) } ' )
166+ missing_keys = [ key for key in node_parameters if key not in db_settings . keys ()]
167+ # :
168+ # missing_keys.append(key)
169+ # else:
170+ # pass
133171 if len (missing_keys ) == 0 :
134172 pass
135173 else :
136174 raise ParameterKeyError (f"the following keys are not at settings: { ',' .join (missing_keys )} " )
137175 else :
138176 raise ParameterTypeError (f'db_setting must be dict type! but get { type (db_settings )} ' )
139177
140- def _test_connection_ (self ):
178+ @staticmethod
179+ def _test_connection_ (_base_url ):
141180 """
142181 is to test connection by normal way!
182+
183+ alter function type into staticmethod
143184 :return:
144185 """
145- ret_value = requests .get (self ._base_url )
146- print ('connection test: ' , ret_value .text .strip ())
186+ ret_value = requests .get (_base_url )
187+ if PRINT_TEST_RESULT :
188+ print ('connection test: ' , ret_value .text .strip ())
189+ del ret_value
147190
148191 @property
149192 def _connect_url (self ):
@@ -158,9 +201,9 @@ def _connect_url(self):
158201 )
159202 return url_str
160203
161- async def _post (self , url , sql , session ):
204+ async def _post (self , url : str , sql : str , session ):
162205 """
163- the aysnc way to send post request to the server
206+ the async way to send post request to the server
164207 :param url:
165208 :param sql:
166209 :param session:
@@ -213,27 +256,28 @@ async def _compression_switched_request(self, query_with_format: str, convert_to
213256
214257 return result
215258
216- def _load_into_pd_ext (self , sql , res , convert_to , to_df ):
259+ @classmethod
260+ def _load_into_pd_ext (cls , sql : (str , list , tuple ), ret_value , convert_to : str , to_df : bool ):
217261 """
218262 a way to parse into dataframe
219263 :param sql:
220- :param res :
264+ :param ret_value :
221265 :param convert_to:
222266 :param to_df:
223267 :return:
224268 """
225269 if isinstance (sql , str ):
226- if to_df :
227- result = self ._load_into_pd (res , convert_to )
270+ if to_df or ret_value != b'' :
271+ result = cls ._load_into_pd (ret_value , convert_to )
228272 else :
229- result = res
273+ result = ret_value
230274 elif isinstance (sql , (list , tuple )):
231275 if to_df :
232- result = [self ._load_into_pd (s , convert_to ) for s in res ]
276+ result = [cls ._load_into_pd (s , convert_to ) if ret_value != b'' else None for s in ret_value ]
233277 else :
234- result = res
278+ result = ret_value
235279 else :
236- raise ValueError ('sql must be str or list or tuple' )
280+ raise ValueError (f 'sql must be str or list or tuple,but get { type ( sql ) } ' )
237281 return result
238282
239283 def __execute__ (self , sql : (str , list , tuple ), convert_to : str = 'dataframe' , transfer_sql_format : bool = True ,
@@ -252,26 +296,26 @@ def __execute__(self, sql: (str, list, tuple), convert_to: str = 'dataframe', tr
252296 loop = asyncio .get_event_loop ()
253297
254298 sem = asyncio .Semaphore (SEMAPHORE ) # limit async num
255-
256299 resp_list = self ._compression_switched_request (sql , convert_to = convert_to ,
257300 transfer_sql_format = transfer_sql_format , sem = sem )
258301
259302 res = loop .run_until_complete (resp_list )
260-
261303 result = self ._load_into_pd_ext (sql , res , convert_to , to_df )
262304
263305 return result
264306
265307 def execute (self , * sql , convert_to : str = 'dataframe' , loop = None , ):
266308 """
267-
309+ execute sql or multi sql
268310 :param sql:
269311 :param convert_to:
270312 :param loop:
271313 :return:
272314 """
273-
315+ # TODO change to smart mode, can receive any kind sql combination and handle them
316+ # detect whether all query are insert process
274317 insert_process = list (map (lambda x : x .lower ().startswith (available_queries_insert ), sql ))
318+ # detect whether all query are select process
275319 select_process = list (map (lambda x : x .lower ().startswith (available_queries_select ), sql ))
276320 if all (insert_process ) is True :
277321 to_df = False
@@ -280,10 +324,12 @@ def execute(self, *sql, convert_to: str = 'dataframe', loop=None, ):
280324 to_df = True
281325 transfer_sql_format = True
282326 else :
283- raise ValueError ('the list of query must be same type query!' )
327+ # TODO change to smart mode, can receive any kind sql combination and handle them
328+ raise ValueError (
329+ 'the list of queries must be same type query! currently cannot handle various kind SQL type'
330+ 'combination' )
284331
285332 if len (sql ) != 1 :
286-
287333 result = self .__execute__ (sql , convert_to = convert_to , transfer_sql_format = transfer_sql_format , loop = loop ,
288334 to_df = to_df )
289335 else :
@@ -293,7 +339,7 @@ def execute(self, *sql, convert_to: str = 'dataframe', loop=None, ):
293339
294340 def query (self , * sql : str ):
295341 """
296- require to upgrade
342+ ## TODO require to upgrade
297343 :param sql:
298344 :return:
299345 """
@@ -304,7 +350,7 @@ def query(self, *sql: str):
304350class ClickHouseTableNode (ClickHouseBaseNode ):
305351 def __init__ (self , conn_str : (str , dict )):
306352 if isinstance (conn_str , str ):
307- db_settings = _parse_rfc1738_args (conn_str )
353+ db_settings = parse_rfc1738_args (conn_str )
308354 elif isinstance (conn_str , dict ):
309355 db_settings = conn_str
310356 else :
0 commit comments