Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from urllib.parse import quote
from urllib.parse import quote, urlencode

import pandas as pd
from sqlalchemy import create_engine
Expand All @@ -12,7 +12,7 @@
from mindsdb.integrations.libs.response import (
HandlerStatusResponse as StatusResponse,
HandlerResponse as Response,
RESPONSE_TYPE
RESPONSE_TYPE,
)

logger = log.getLogger(__name__)
Expand All @@ -23,15 +23,15 @@ class ClickHouseHandler(DatabaseHandler):
This handler handles connection and execution of the ClickHouse statements.
"""

name = 'clickhouse'
name = "clickhouse"

def __init__(self, name, connection_data, **kwargs):
super().__init__(name)
self.dialect = 'clickhouse'
self.dialect = "clickhouse"
self.connection_data = connection_data
self.renderer = SqlalchemyRender(ClickHouseDialect)
self.is_connected = False
self.protocol = connection_data.get('protocol', 'native')
self.protocol = connection_data.get("protocol", "native")

def __del__(self):
if self.is_connected is True:
Expand All @@ -50,23 +50,31 @@ def connect(self):
if self.is_connected:
return self.connection

protocol = "clickhouse+native" if self.protocol == 'native' else "clickhouse+http"
host = quote(self.connection_data['host'])
port = self.connection_data['port']
user = quote(self.connection_data['user'])
password = quote(self.connection_data['password'])
database = quote(self.connection_data['database'])
url = f'{protocol}://{user}:{password}@{host}:{port}/{database}'
protocol = "clickhouse+native" if self.protocol == "native" else "clickhouse+http"
host = quote(self.connection_data["host"])
port = self.connection_data["port"]
user = quote(self.connection_data["user"])
password = quote(self.connection_data["password"])
database = quote(self.connection_data["database"])
verify = self.connection_data.get("verify", True)
url = f"{protocol}://{user}:{password}@{host}:{port}/{database}"
# This is not redundunt. Check https://clickhouse-sqlalchemy.readthedocs.io/en/latest/connection.html#http
if self.protocol == 'https':
url = url + "?protocol=https"

params = {}
if self.protocol == "https":
params["protocol"] = "https"
if verify is False:
params["verify"] = "false"
if params:
url = f"{url}?{urlencode(params)}"

try:
engine = create_engine(url)
connection = engine.raw_connection()
self.is_connected = True
self.connection = connection
except SQLAlchemyError as e:
logger.error(f'Error connecting to ClickHouse {self.connection_data["database"]}, {e}!')
logger.error(f"Error connecting to ClickHouse {self.connection_data['database']}, {e}!")
self.is_connected = False
raise

Expand All @@ -86,12 +94,12 @@ def check_connection(self) -> StatusResponse:
connection = self.connect()
cur = connection.cursor()
try:
cur.execute('select 1;')
cur.execute("select 1;")
finally:
cur.close()
response.success = True
except SQLAlchemyError as e:
logger.error(f'Error connecting to ClickHouse {self.connection_data["database"]}, {e}!')
logger.error(f"Error connecting to ClickHouse {self.connection_data['database']}, {e}!")
response.error_message = str(e)
self.is_connected = False

Expand All @@ -117,22 +125,13 @@ def native_query(self, query: str) -> Response:
cur.execute(query)
result = cur.fetchall()
if result:
response = Response(
RESPONSE_TYPE.TABLE,
pd.DataFrame(
result,
columns=[x[0] for x in cur.description]
)
)
response = Response(RESPONSE_TYPE.TABLE, pd.DataFrame(result, columns=[x[0] for x in cur.description]))
else:
response = Response(RESPONSE_TYPE.OK)
connection.commit()
except SQLAlchemyError as e:
logger.error(f'Error running query: {query} on {self.connection_data["database"]}!')
response = Response(
RESPONSE_TYPE.ERROR,
error_message=str(e)
)
logger.error(f"Error running query: {query} on {self.connection_data['database']}!")
response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
connection.rollback()
finally:
cur.close()
Expand All @@ -155,7 +154,7 @@ def get_tables(self) -> Response:
df = result.data_frame

if df is not None:
result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
result.data_frame = df.rename(columns={df.columns[0]: "table_name"})

return result

Expand Down
65 changes: 33 additions & 32 deletions mindsdb/integrations/handlers/clickhouse_handler/connection_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,50 @@

connection_args = OrderedDict(
protocol={
'type': ARG_TYPE.STR,
'description': 'The protocol to query clickhouse. Supported: native, http, https. Default: native',
'required': False,
'label': 'Protocol'
"type": ARG_TYPE.STR,
"description": "The protocol to query clickhouse. Supported: native, http, https. Default: native",
"required": False,
"label": "Protocol",
},
user={
'type': ARG_TYPE.STR,
'description': 'The user name used to authenticate with the ClickHouse server.',
'required': True,
'label': 'User'
"type": ARG_TYPE.STR,
"description": "The user name used to authenticate with the ClickHouse server.",
"required": True,
"label": "User",
},
database={
'type': ARG_TYPE.STR,
'description': 'The database name to use when connecting with the ClickHouse server.',
'required': True,
'label': 'Database name'
"type": ARG_TYPE.STR,
"description": "The database name to use when connecting with the ClickHouse server.",
"required": True,
"label": "Database name",
},
host={
'type': ARG_TYPE.STR,
'description': 'The host name or IP address of the ClickHouse server. NOTE: use \'127.0.0.1\' instead of \'localhost\' to connect to local server.',
'required': True,
'label': 'Host'
"type": ARG_TYPE.STR,
"description": "The host name or IP address of the ClickHouse server. NOTE: use '127.0.0.1' instead of 'localhost' to connect to local server.",
"required": True,
"label": "Host",
},
port={
'type': ARG_TYPE.INT,
'description': 'The TCP/IP port of the ClickHouse server. Must be an integer.',
'required': True,
'label': 'Port'
"type": ARG_TYPE.INT,
"description": "The TCP/IP port of the ClickHouse server. Must be an integer.",
"required": True,
"label": "Port",
},
password={
'type': ARG_TYPE.PWD,
'description': 'The password to authenticate the user with the ClickHouse server.',
'required': True,
'label': 'Password',
'secret': True
}
"type": ARG_TYPE.PWD,
"description": "The password to authenticate the user with the ClickHouse server.",
"required": True,
"label": "Password",
"secret": True,
},
verify={
"type": ARG_TYPE.BOOL,
"description": "Controls certificate verification in https protocol. Possible choices: true/false. Default is true.",
"required": False,
"label": "SSL Verification",
},
)

connection_args_example = OrderedDict(
protocol='native',
host='127.0.0.1',
port=9000,
user='root',
password='password',
database='database'
protocol="native", host="127.0.0.1", port=9000, user="root", password="password", database="database", verify=True
)
Loading