66from collections import namedtuple
77from urllib import parse
88
9+ import nest_asyncio
910import pandas as pd
1011import requests
1112from aiohttp import ClientSession
1213
14+ nest_asyncio .apply ()
1315node = namedtuple ('clickhouse' , ['host' , 'port' , 'user' , 'password' , 'database' ])
1416available_queries_select = ('select' , 'show' , 'desc' )
1517available_queries_insert = ('insert' , 'optimize' , 'create' )
18+
1619SEMAPHORE = 10
1720
1821
19- class ClickhouseTools (object ):
22+ class ClickHouseCreateTableTools (object ):
23+ @classmethod
24+ def _create_table_sql (cls , db : str , table : str , dtypes_dict : dict ,
25+ order_by_key_cols : (list , tuple ),
26+ primary_key_cols = None , sample_expr = None ,
27+ engine_type : str = 'ReplacingMergeTree' , partitions_expr = None ,
28+ settings = "SETTINGS index_granularity = 8192" , other = '' ):
29+ """
30+
31+ :param db:
32+ :param table:
33+ :param dtypes_dict:
34+ :param order_by_key_cols:
35+ :param primary_key_cols:
36+ :param sample_expr:
37+ :param engine_type:
38+ :param partitions_expr:
39+ :param settings:
40+ :param other:
41+ :return:
42+ """
43+ """CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
44+ (
45+ name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
46+ name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
47+ ...
48+ ) ENGINE = ReplacingMergeTree([ver])
49+ [PARTITION BY expr]
50+ [ORDER BY expr]
51+ [PRIMARY KEY expr]
52+ [SAMPLE BY expr]
53+ [SETTINGS name=value, ...]"""
54+
55+ cols_def = ',' .join ([f"{ name } { d_type } " for name , d_type in dtypes_dict .items ()])
56+
57+ maid_body = f"CREATE TABLE IF NOT EXISTS { db } .{ table } ( { cols_def } ) ENGINE = { engine_type } "
58+
59+ ORDER_BY_CLAUSE = f"ORDER BY ( { ',' .join (order_by_key_cols )} )"
60+
61+ if partitions_expr is None :
62+ PARTITION_by_CLAUSE = ''
63+ else :
64+ PARTITION_by_CLAUSE = f"PARTITION BY { partitions_expr } "
65+
66+ if primary_key_cols is None :
67+ PRIMARY_BY_CLAUSE = ''
68+ else :
69+ primary_key_expr = ',' .join (primary_key_cols )
70+ PRIMARY_BY_CLAUSE = f"PARTITION BY ({ primary_key_expr } )"
71+
72+ if sample_expr is None :
73+ SAMPLE_CLAUSE = ''
74+ else :
75+ SAMPLE_CLAUSE = sample_expr
76+
77+ base = f"{ maid_body } { PARTITION_by_CLAUSE } { ORDER_BY_CLAUSE } { PRIMARY_BY_CLAUSE } { SAMPLE_CLAUSE } { other } { settings } "
78+
79+ return base
80+
2081 @staticmethod
21- def detect_end_with_limit (string , pattern = r'[\s]+limit[\s]+[0-9]+$' ):
82+ def _check_end_with_limit (string , pattern = r'[\s]+limit[\s]+[0-9]+$' ):
2283 m = re .findall (pattern , string )
2384 if m is None or m == []:
2485 return False
2586 else :
2687 return True
2788
28- @staticmethod
29- def translate_dtypes_from_df (df : pd .DataFrame ,
30- translate_dtypes : dict = {'object' : 'String' ,
31- 'datetime64[ns]' : 'Datetime' }):
32- if hasattr (df , 'dtypes' ):
33- dtypes_series = df .dtypes .replace (translate_dtypes )
34- return dtypes_series .map (lambda x : str (x ).capitalize ()).to_dict ()
35- elif hasattr (df , '_columns_' ) and 'type' in df ._columns_ and 'name' in df ._columns_ :
36- dtypes_series = df .set_index ('name' )['type' ].replace (translate_dtypes )
37- return dtypes_series .map (lambda x : str (x )).to_dict ()
89+ @classmethod
90+ def _create_table_from_sql (cls , db : str , table : str , sql : str , key_cols : list ,
91+ extra_format_dict : (dict , None ) = None ,
92+ primary_key_cols = None , sample_expr = None , other = '' ,
93+ engine_type : str = 'ReplacingMergeTree' ,
94+ partitions_expr = None , query_func = None ):
95+ """
96+
97+ :param obj:
98+ :param db:
99+ :param table:
100+ :param sql:
101+ :param key_cols:
102+ :param engine_type:
103+ :param extra_format_dict:
104+ :return:
105+ """
106+
107+ if isinstance (sql , str ):
108+ pass
38109 else :
39- raise ValueError (f'unknown df:{ type (df )} ' )
110+ raise ValueError ('sql must be string' )
111+
112+ limit_status = cls ._check_end_with_limit (sql , pattern = r'[\s]+limit[\s]+[0-9]+$' )
113+ if limit_status :
114+ describe_sql = f' describe({ sql } ) '
115+ else :
116+ describe_sql = f'describe ( { sql } limit 1)'
117+
118+ if query_func is None :
119+ raise ValueError ('query function should be set!' )
120+
121+ dtypes_df = query_func (describe_sql )
122+
123+ dtypes_dict = dict (dtypes_df [['name' , 'type' ]].drop_duplicates ().values )
124+ if extra_format_dict is None :
125+ pass
126+ else :
127+ dtypes_dict .update (extra_format_dict )
128+ sql = cls ._create_table_sql (db , table , dtypes_dict , key_cols , engine_type = engine_type ,
129+ primary_key_cols = primary_key_cols , sample_expr = sample_expr ,
130+ partitions_expr = partitions_expr , other = other )
131+
132+ return sql
40133
41134 @staticmethod
42- def translate_dtype1_as_dtype2 (df : pd .DataFrame , src2target = {'category' : 'str' }):
135+ def translate_dtypes1_as_dtypes2 (df : pd .DataFrame , src2target = {'category' : 'str' }):
43136 dtypes_series = df .dtypes
44137 for src , dest in src2target .items ():
45138 if src in dtypes_series :
@@ -50,12 +143,26 @@ def translate_dtype1_as_dtype2(df: pd.DataFrame, src2target={'category': 'str'})
50143 pass
51144 return df
52145
@staticmethod
def translate_dtypes_from_df(df: pd.DataFrame, translate_dtypes: dict = None):
    """
    Map the column dtypes of ``df`` to type-name strings.

    :param df: either an object exposing ``dtypes`` (e.g. a DataFrame), or a
        describe-style frame carrying ``name`` and ``type`` in ``_columns_``
    :param translate_dtypes: dtype name -> replacement name; defaults to
        ``{'object': 'String', 'datetime64[ns]': 'Datetime'}``
    :return: dict of column name -> type-name string
    :raises ValueError: if ``df`` matches neither supported shape
    """
    # Build the default per call instead of sharing one mutable dict in the signature.
    if translate_dtypes is None:
        translate_dtypes = {'object': 'String', 'datetime64[ns]': 'Datetime'}

    if hasattr(df, 'dtypes'):
        # Compare on dtype *names* so the string keys of the mapping match reliably.
        dtype_names = df.dtypes.astype(str).replace(translate_dtypes)
        # NOTE: capitalize() lower-cases everything after the first character,
        # so replacement values with inner capitals are altered as well.
        return dtype_names.map(lambda x: str(x).capitalize()).to_dict()

    if hasattr(df, '_columns_') and 'type' in df._columns_ and 'name' in df._columns_:
        declared_types = df.set_index('name')['type'].replace(translate_dtypes)
        return declared_types.map(str).to_dict()

    raise ValueError(f'unknown df:{type(df)}')
157+
53158 @classmethod
54- def _create_table_from_df (cls , obj : object , db : str , table : str , df : pd .DataFrame , key_cols : (list , tuple ),
55- engine_type : str = 'ReplacingMergeTree' , extra_format_dict = None , partitions_expr = None ):
56- query_func = obj .query
159+ def _create_table_from_df (cls , db : str , table : str , df : pd .DataFrame , key_cols : (list , tuple ),
160+ engine_type : str = 'ReplacingMergeTree' , extra_format_dict = None , partitions_expr = None ,
161+ src2target = {'category' : 'str' },
162+ query_func = None
163+ ):
57164
58- df = cls .translate_dtype1_as_dtype2 (df , src2target = {'category' : 'str' })
165+ df = cls .translate_dtypes1_as_dtypes2 (df , src2target = {'category' : 'str' })
59166 cols = df .columns
60167 dtypes_dict = cls .translate_dtypes_from_df (df )
61168 if extra_format_dict is None :
@@ -71,65 +178,31 @@ def _create_table_from_df(cls, obj: object, db: str, table: str, df: pd.DataFram
71178 return exist_status
72179
73180 @classmethod
74- def _create_table_sql (cls , db : str , table : str , dtypes_dict : dict , key_cols : (list , tuple ),
75- engine_type : str = 'ReplacingMergeTree' , partitions_expr = None ):
76- # dtypes_dict.update(extra_format_dict)
77- cols_def = ',' .join ([f"{ name } { d_type } " for name , d_type in dtypes_dict .items ()])
78- order_by_cols = ',' .join (key_cols )
181+ def _check_table_exists (cls , obj , db , table ):
182+ ## todo check the table exists
183+ pass
79184
80- maid_body = f"CREATE TABLE IF NOT EXISTS { db } .{ table } ( { cols_def } ) ENGINE = { engine_type } "
81- settings = "SETTINGS index_granularity = 8192"
82- conds = f"ORDER BY ( { order_by_cols } )"
83- if partitions_expr is None :
84- partitions = ''
85- else :
86- partitions = f"PARTITION BY { partitions_expr } "
87- base = f"{ maid_body } { conds } { partitions } { settings } "
88- return base
89185
90- @classmethod
91- def _create_table_from_sql (cls , obj : object , db : str , table : str , sql : str , key_cols : list ,
92- engine_type : str = 'ReplacingMergeTree' ,
93- extra_format_dict : (dict , None ) = None ,
94- partitions_expr = None ) -> bool :
95- """
186+ class ClickHouseTools (ClickHouseCreateTableTools ):
96187
97- :param obj:
98- :param db:
99- :param table:
100- :param sql:
101- :param key_cols:
102- :param engine_type:
103- :param extra_format_dict:
104- :return:
105- """
188+ @classmethod
189+ def _create_table_from_df (cls , obj : object , db : str , table : str , df : pd .DataFrame , key_cols : (list , tuple ),
190+ engine_type : str = 'ReplacingMergeTree' , extra_format_dict = None , partitions_expr = None ):
191+ query_func = obj .query
106192
193+ df = cls .translate_dtypes1_as_dtypes2 (df , src2target = {'category' : 'str' })
194+ cols = df .columns
195+ dtypes_dict = cls .translate_dtypes_from_df (df )
107196 if extra_format_dict is None :
108- extra_format_dict = {}
109-
110- query_func = obj .query
111- if sql .endswith (';' ):
112- sql = sql [:- 1 ]
113- end_with_limit_status = cls .detect_end_with_limit (sql , pattern = r'[\s]+limit[\s]+[0-9]+$' )
114- if end_with_limit_status :
115- describe_sql = f' describe({ sql } ) '
197+ pass
116198 else :
117- describe_sql = f'describe ( { sql } limit 1)'
118-
199+ dtypes_dict .update (extra_format_dict )
200+ dtypes_dict = {k : v for k , v in dtypes_dict .items () if k in cols }
201+ base = cls ._create_table_from_sql (db , table , dtypes_dict , key_cols , engine_type = engine_type ,
202+ extra_format_dict = extra_format_dict , partitions_expr = partitions_expr )
119203 exist_status = cls ._check_table_exists (obj , db , table )
120- if exist_status :
121- print ('table:{table} already exists!' )
122- else :
123- print ('will create {table} at {db}' )
124- dtypes_df = query_func (describe_sql )
125- dtypes_dict = dict (dtypes_df [['name' , 'type' ]].drop_duplicates ().values )
126- if extra_format_dict is None :
127- pass
128- else :
129- dtypes_dict .update (extra_format_dict )
130- sql = cls ._create_table_sql (db , table , dtypes_dict , key_cols , engine_type = engine_type ,
131- partitions_expr = partitions_expr )
132- query_func (sql )
204+
205+ query_func (base )
133206 return exist_status
134207
135208 @classmethod
@@ -167,7 +240,7 @@ def _load_into_pd(ret_value, convert_to: str = 'dataframe'):
167240
168241 for i in meta :
169242 if i ['type' ] in ['DateTime' , 'Nullable(DateTime)' ]:
170- df [i ['name' ]] = pd .to_datetime (df [i ['name' ]])
243+ df [i ['name' ]] = pd .to_datetime (df [i ['name' ]], errors = 'ignore' )
171244 ret_value = df
172245 return ret_value
173246
@@ -197,7 +270,7 @@ def _merge_settings(cls, settings, updated_settings=None):
197270 return {k : v * 1 if isinstance (v , bool ) else v for k , v in updated_settings .items ()}
198271
199272
200- class ClickhouseBaseNode (ClickhouseTools ):
273+ class ClickhouseBaseNode (ClickHouseTools ):
201274 accepted_formats = ['DataFrame' , 'TabSeparated' , 'TabSeparatedRaw' , 'TabSeparatedWithNames' ,
202275 'TabSeparatedWithNamesAndTypes' , 'CSV' , 'CSVWithNames' , 'Values' , 'Vertical' , 'JSON' ,
203276 'JSONCompact' , 'JSONEachRow' , 'TSKV' , 'Pretty' , 'PrettyCompact' ,
@@ -267,21 +340,28 @@ async def _compression_switched_request(self, query_with_format: str, convert_to
267340 if isinstance (query_with_format , str ):
268341 sql2 = self ._transfer_sql_format (query_with_format , convert_to = convert_to ,
269342 transfer_sql_format = transfer_sql_format )
270-
271343 result = await self ._post (url , sql2 , session )
272-
273344 elif isinstance (query_with_format , (tuple , list )):
274345 result = []
275346 for sql in query_with_format :
276347 s = self ._transfer_sql_format (sql , convert_to = convert_to ,
277348 transfer_sql_format = transfer_sql_format )
278-
279349 res = await self ._post (url , s , session )
280350 result .append (res )
281-
282351 else :
283- raise ValueError ('query_with_format must be str or list or tuple' )
352+ raise ValueError ('query_with_format must be str , list or tuple' )
353+
354+ return result
284355
356+ def _load_into_pd_ext (self , sql , res , convert_to , to_df ):
357+ if isinstance (sql , str ):
358+ if to_df :
359+ result = self ._load_into_pd (res , convert_to )
360+ elif isinstance (sql , (list , tuple )):
361+ if to_df :
362+ result = [self ._load_into_pd (s , convert_to ) for s in res ]
363+ else :
364+ raise ValueError ('sql must be str or list or tuple' )
285365 return result
286366
287367 def __execute__ (self , sql : (str , list , tuple ), convert_to : str = 'dataframe' , transfer_sql_format : bool = True ,
@@ -306,14 +386,7 @@ def __execute__(self, sql: (str, list, tuple), convert_to: str = 'dataframe', tr
306386
307387 res = loop .run_until_complete (resp_list )
308388
309- if isinstance (sql , str ):
310- if to_df :
311- result = self ._load_into_pd (res , convert_to )
312- elif isinstance (sql , (list , tuple )):
313- if to_df :
314- result = [self ._load_into_pd (s , convert_to ) for s in res ]
315- else :
316- raise ValueError ('sql must be str or list or tuple' )
389+ result = self ._load_into_pd_ext (sql , res , convert_to , to_df )
317390
318391 return result
319392
@@ -336,7 +409,7 @@ def execute(self, *sql, convert_to: str = 'dataframe', loop=None, ):
336409 to_df = True
337410 transfer_sql_format = True
338411 else :
339- raise ValueError ('sql list must be same type query!' )
412+ raise ValueError ('the list of query must be same type query!' )
340413
341414 if len (sql ) != 1 :
342415
@@ -376,5 +449,5 @@ def databases(self):
376449 ** {'host' : '47.104.186.157' , 'port' : 8123 , 'user' : 'default' , 'password' : 'Imsn0wfree' ,
377450 'database' : 'EDGAR_LOG' })
378451 df1 = node .tables
379- print ( df1 )
452+ dff = node . query ( "select * from system.parts" )
380453 pass
0 commit comments