From 8cbbe602f4f6e45ee815b7adc8c9b282073e50ed Mon Sep 17 00:00:00 2001 From: Michael Olayemi Olawepo <154475559+sejubar@users.noreply.github.com> Date: Tue, 11 Nov 2025 17:52:33 -0500 Subject: [PATCH 01/11] Fqe 1675 - additional test for mysql api (#11797) Co-authored-by: andrew --- .../executor/sql_query/steps/insert_step.py | 3 + .../flows/test_mysql_api_extended.py | 756 ++++++++++++++++++ 2 files changed, 759 insertions(+) create mode 100644 tests/integration/flows/test_mysql_api_extended.py diff --git a/mindsdb/api/executor/sql_query/steps/insert_step.py b/mindsdb/api/executor/sql_query/steps/insert_step.py index 097b6e2e116..73657ffc42c 100644 --- a/mindsdb/api/executor/sql_query/steps/insert_step.py +++ b/mindsdb/api/executor/sql_query/steps/insert_step.py @@ -4,6 +4,7 @@ from mindsdb.api.executor.planner.steps import SaveToTable, InsertToTable, CreateTableStep from mindsdb.api.executor.sql_query.result_set import ResultSet, Column +from mindsdb.utilities.exception import EntityNotExistsError from mindsdb.api.executor.exceptions import NotSupportedYet, LogicError from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES @@ -114,6 +115,8 @@ def call(self, step): table_name = step.table dn = self.session.datahub.get(integration_name) + if dn is None: + raise EntityNotExistsError("Database not found", integration_name) dn.create_table(table_name=table_name, columns=step.columns, is_replace=step.is_replace, is_create=True) return ResultSet() diff --git a/tests/integration/flows/test_mysql_api_extended.py b/tests/integration/flows/test_mysql_api_extended.py new file mode 100644 index 00000000000..6839b437e29 --- /dev/null +++ b/tests/integration/flows/test_mysql_api_extended.py @@ -0,0 +1,756 @@ +import os +import pytest +import time +from .test_mysql_api import BaseStuff +import mysql.connector + + +@pytest.fixture(scope="module") +def setup_local_db(): + """Module-scoped fixture to create a writeable DB for table tests.""" + db_name = "test_db_local" + helper = BaseStuff() + helper.use_binary = False + + params = {"user": "postgres", "password": "postgres", "host": "postgres", "port": 5432, "database": "postgres"} + + print(f"\n--> [Fixture setup_local_db] Setting up local database: {db_name} on {params['host']}:{params['port']}") + try: + helper.query(f"DROP DATABASE IF EXISTS {db_name}") + create_datasource_sql_via_connector(helper, db_name, "postgres", params) + yield db_name + except (mysql.connector.Error, TimeoutError) as e: + pytest.skip( + f"\n\n--- FIXTURE SETUP FAILED ---\n" + f"Could not connect to the PostgreSQL container ('{params['host']}').\n" + f"Please ensure your Docker Compose environment is running correctly.\n" + f"Original Error: {e}\n" + ) + finally: + print(f"\n--> [Fixture setup_local_db] Tearing down database: {db_name}") + try: + helper.query(f"DROP DATABASE IF EXISTS {db_name};") + except mysql.connector.Error: + pass + + +def create_datasource_sql_via_connector(helper_instance, db_name, engine, parameters, poll_timeout=30, poll_interval=2): + """Helper to create a datasource via a CREATE DATABASE query.""" + params_list = [f'"{k}": "{v}"' if isinstance(v, str) else f'"{k}": {v}' for k, v in parameters.items()] + params_str = ", ".join(params_list) + query_str = f"CREATE DATABASE {db_name} WITH ENGINE = '{engine}', PARAMETERS = {{{params_str}}};" + print(f" [Helper create_datasource] Executing: CREATE DATABASE {db_name}...") + helper_instance.query(query_str) + start_time = time.time() + while True: + try: + helper_instance.validate_database_creation(db_name) + print(f" [Helper create_datasource] DATABASE {db_name} created and validated.") + break + except AssertionError as e: + elapsed_time = time.time() - start_time + if elapsed_time > poll_timeout: + print(f" [Helper create_datasource] ERROR: Timeout after {poll_timeout}s waiting for {db_name}.") + raise TimeoutError(f"Timed out waiting for database {db_name} to be created.") from e + time.sleep(poll_interval) + + +def wait_for_trigger_creation(query_fn, trigger_name, timeout=20, max_interval=5): + """ + Polls information_schema to see if a trigger is visible. + Does not raise an error, returns True (found) or False (not found). + """ + start = time.time() + interval = 1 + print(f"\n[DEBUG] Checking for trigger '{trigger_name}' in information_schema (timeout={timeout}s)...") + while time.time() - start < timeout: + try: + result = query_fn(f"SELECT 1 FROM information_schema.triggers WHERE trigger_name = '{trigger_name}';") + if result: + print(f"[DEBUG] Trigger '{trigger_name}' found in information_schema after {time.time() - start:.2f}s.") + return True + except Exception: + pass + + try: + result = query_fn("SHOW TRIGGERS;") + if result and trigger_name in [row.get("Trigger", row.get("TRIGGER")) for row in result]: + print(f"[DEBUG] Trigger '{trigger_name}' found in SHOW TRIGGERS after {time.time() - start:.2f}s.") + return True + except Exception: + pass + time.sleep(interval) + interval = min(interval * 1.5, max_interval) + + print( + f"[DEBUG] WARNING: Trigger '{trigger_name}' was not found in metadata after {timeout}s. Proceeding with functional test..." + ) + return False + + +def wait_for_trigger_to_fire( + query_fn, db_name, source_table_name, target_table_name, test_id, updated_message, timeout=120, max_interval=10 +): + """ + Polls for a trigger to fire by repeatedly sending the UPDATE command + and checking the target table. This is robust against trigger creation lag. + """ + start = time.time() + interval = 1 + + while time.time() - start < timeout: + print(f"[DEBUG] Firing trigger (elapsed={time.time() - start:.1f}s, interval={interval:.2f}s)...") + query_fn(f"UPDATE {db_name}.{source_table_name} SET message = '{updated_message}' WHERE id = {test_id};") + + time.sleep(interval) + + result = query_fn(f"SELECT id, message FROM {db_name}.{target_table_name} WHERE id = {test_id};") + if result: + elapsed = time.time() - start + print(f"[DEBUG] Trigger fired and verified after {elapsed:.2f}s → {result}") + return result + + print("[DEBUG] Trigger not fired yet. Retrying...") + interval = min(interval * 1.5, max_interval) + + raise TimeoutError(f"Trigger did not fire for id {test_id} within {timeout}s despite repeated attempts.") + + +def wait_for_kb_creation(query_fn, kb_name, timeout=90, poll_interval=1): + """Polls to check if a Knowledge Base has been created successfully.""" + start_time = time.time() + while True: + try: + result = query_fn(f"DESCRIBE KNOWLEDGE_BASE {kb_name};") + if result and result[0]["name"] == kb_name: + print(f" [Helper wait_for_kb] KB {kb_name} created and validated.") + return result + except Exception: + # KB might not be queryable at all yet + pass + + elapsed_time = time.time() - start_time + if elapsed_time > timeout: + print(f" [Helper wait_for_kb] ERROR: Timeout after {timeout}s waiting for {kb_name}.") + raise TimeoutError(f"Timed out waiting for Knowledge Base {kb_name} to be created.") + time.sleep(poll_interval) + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLTables(BaseStuff): + """Test suite for Table operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("setup_local_db") + def test_table_lifecycle(self, setup_local_db, use_binary): + db_name = setup_local_db + table_name = "test_lifecycle_table" + try: + create_table_query = f"CREATE TABLE {db_name}.{table_name} (id INT, value VARCHAR(255));" + self.query(create_table_query) + result = self.query(f"SHOW TABLES FROM {db_name};") + assert table_name in [list(row.values())[0] for row in result] + replace_query = f"CREATE OR REPLACE TABLE {db_name}.{table_name} (SELECT 2 as id, 'new_data' as value);" + self.query(replace_query) + result = self.query(f"SELECT * FROM {db_name}.{table_name};") + assert result and result[0]["id"] == 2 and result[0]["value"] == "new_data" + finally: + self.query(f"DROP TABLE IF EXISTS {db_name}.{table_name};") + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLTablesNegative(BaseStuff): + """Negative tests for Table operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("setup_local_db") + def test_create_duplicate_table(self, setup_local_db, use_binary): + db_name = setup_local_db + table_name = "test_duplicate_table" + create_query = f"CREATE TABLE {db_name}.{table_name} (id INT);" + try: + self.query(create_query) + with pytest.raises(Exception) as e: + self.query(create_query) + assert "already exists" in str(e.value).lower() + finally: + self.query(f"DROP TABLE IF EXISTS {db_name}.{table_name};") + + def test_create_table_in_missing_db_raises_error(self, use_binary): + create_query = "CREATE TABLE non_existent_db.non_existent_table (id INT);" + with pytest.raises(Exception) as e: + self.query(create_query) + assert "non_existent_db" or "Database not found" in str(e.value).lower() + + @pytest.mark.usefixtures("setup_local_db") + def test_drop_non_existent_table(self, setup_local_db, use_binary): + db_name = setup_local_db + table_name = "test_non_existent_table" + with pytest.raises(Exception) as e: + self.query(f"DROP TABLE {db_name}.{table_name};") + assert "does not exist" in str(e.value).lower() + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLViews(BaseStuff): + """Test suite for View operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + def test_view_lifecycle(self, use_binary): + db_name = "test_sql_view_db" + view_name = "test_sql_view" + try: + self.query(f"DROP VIEW IF EXISTS {view_name};") + self.query(f"DROP DATABASE IF EXISTS {db_name};") + + create_db_query = f""" + CREATE DATABASE {db_name} + WITH ENGINE = 'postgres', PARAMETERS = {{"user": "demo_user", "password": "demo_password", "host": "samples.mindsdb.com", "port": "5432", "database": "demo", "schema": "demo"}}; + """ + self.query(create_db_query) + + create_view_query = ( + f"CREATE VIEW {view_name} AS (SELECT * FROM {db_name}.home_rentals WHERE number_of_rooms = 2);" + ) + self.query(create_view_query) + result = self.query("SHOW VIEWS;") + assert view_name in [row.get("name", row.get("Name", row.get("NAME"))) for row in result] + result = self.query(f"SELECT * FROM {view_name};") + assert len(result) > 0 and all(row["number_of_rooms"] == 2 for row in result) + alter_view_query = ( + f"ALTER VIEW {view_name} AS (SELECT * FROM {db_name}.home_rentals WHERE number_of_rooms = 1);" + ) + self.query(alter_view_query) + result_after_alter = self.query(f"SELECT * FROM {view_name};") + assert len(result_after_alter) > 0 and all(row["number_of_rooms"] == 1 for row in result_after_alter) + finally: + self.query(f"DROP VIEW IF EXISTS {view_name};") + self.query(f"DROP DATABASE IF EXISTS {db_name};") + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLViewsNegative(BaseStuff): + """Negative tests for View operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + def test_create_duplicate_view(self, use_binary): + view_name = "test_duplicate_view" + create_query = f"CREATE VIEW {view_name} AS (SELECT 1);" + try: + self.query(f"DROP VIEW IF EXISTS {view_name};") + self.query(create_query) + with pytest.raises(Exception) as e: + self.query(create_query) + assert "already exists" in str(e.value).lower() + finally: + self.query(f"DROP VIEW IF EXISTS {view_name};") + + def test_create_view_on_non_existent_table(self, use_binary): + view_name = "test_bad_source_view" + create_query = f"CREATE VIEW {view_name} AS (SELECT * FROM non_existent_db.non_existent_table);" + with pytest.raises(Exception) as e: + self.query(create_query) + error_str = str(e.value).lower() + assert "not found in the database" in error_str or "table name should contain only one part" in error_str + + def test_drop_non_existent_view(self, use_binary): + view_name = "non_existent_view" + try: + self.query(f"DROP VIEW IF EXISTS {view_name};") + except Exception: + pass + + with pytest.raises(Exception) as e: + self.query(f"DROP VIEW {view_name};") + error_str = str(e.value).lower() + assert "view not found" in error_str or "unknown view" in error_str + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLKnowledgeBases(BaseStuff): + """Test suite for Knowledge Base operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.fixture + def basic_kb(self, request): + """ + Fixture to create a basic Knowledge Base for alteration tests. + Requires OPENAI_API_KEY. + """ + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set. Skipping KB tests.") + + kb_name = "test_alter_kb_local" + embedding_model = "text-embedding-3-small" + + create_kb_query = f""" + CREATE KNOWLEDGE_BASE {kb_name} + USING embedding_model = {{"provider": "openai", "model_name": "{embedding_model}", "api_key": "{openai_api_key}"}}; + """ + try: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + self.query(create_kb_query) + result = wait_for_kb_creation(self.query, kb_name) + assert result and result[0]["name"] == kb_name + yield kb_name + finally: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + + def test_knowledge_base_full_lifecycle(self, use_binary): + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set. Skipping Knowledge Base lifecycle test.") + + kb_name = "test_kb_sql" + content_to_insert = "MindsDB helps developers build AI-powered applications." + embedding_model = "text-embedding-3-small" + try: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + create_kb_query = f""" + CREATE KNOWLEDGE_BASE {kb_name} + USING embedding_model = {{"provider": "openai", "model_name": "{embedding_model}", "api_key": "{openai_api_key}"}}; + """ + self.query(create_kb_query) + result = wait_for_kb_creation(self.query, kb_name) + assert result and result[0]["name"] == kb_name and embedding_model in result[0]["embedding_model"] + self.query(f"INSERT INTO {kb_name} (content) VALUES ('{content_to_insert}');") + + # Give insertion a moment to process before querying + time.sleep(2) + + result = self.query(f"SELECT chunk_content FROM {kb_name} WHERE content = 'What is MindsDB?';") + assert result and "MindsDB" in result[0]["chunk_content"] + finally: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + + def test_create_kb_with_invalid_provider(self, use_binary): + kb_name = "test_invalid_provider" + create_query = ( + f'CREATE KNOWLEDGE_BASE {kb_name} USING embedding_model = {{"provider": "non_existent_provider"}};' + ) + with pytest.raises(Exception) as e: + self.query(create_query) + assert "wrong embedding provider" in str(e.value).lower() + + def test_create_kb_with_invalid_api_key(self, use_binary, request): + kb_name = "test_invalid_key_local" + create_query = f'CREATE KNOWLEDGE_BASE {kb_name} USING embedding_model = {{"provider": "openai", "api_key": "this_is_a_fake_key"}};' + try: + with pytest.raises(Exception) as e: + self.query(create_query) + assert ( + "problem with embedding model config" in str(e.value).lower() + or "invalid api key" in str(e.value).lower() + ) + finally: + # Ensure cleanup even if creation fails + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + + def test_insert_into_non_existent_kb(self, use_binary): + kb_name = "non_existent_kb" + with pytest.raises(Exception) as e: + self.query(f"INSERT INTO {kb_name} (content) VALUES ('some data');") + error_str = str(e.value).lower() + assert "can't create table" in error_str or "doesn't exist" in error_str or "unknown table" in error_str + + def test_query_non_existent_kb(self, use_binary): + kb_name = "non_existent_kb" + with pytest.raises(Exception) as e: + self.query(f"SELECT * FROM {kb_name} WHERE content = 'some query';") + error_str = str(e.value).lower() + assert "not found in database" in error_str or "doesn't exist" in error_str or "unknown table" in error_str + + def test_create_duplicate_kb(self, use_binary, request): + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set. Skipping duplicate KB test.") + + kb_name = "test_duplicate_kb" + embedding_model = "text-embedding-3-small" + create_query = f""" + CREATE KNOWLEDGE_BASE {kb_name} + USING embedding_model = {{"provider": "openai", "model_name": "{embedding_model}", "api_key": "{openai_api_key}"}}; + """ + try: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + self.query(create_query) + wait_for_kb_creation(self.query, kb_name) + with pytest.raises(Exception) as e: + self.query(create_query) + assert "already exists" in str(e.value).lower() + finally: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + + @pytest.mark.usefixtures("basic_kb") + def test_alter_kb_embedding_api_key(self, basic_kb, use_binary): + """Tests altering the api_key of the embedding_model.""" + kb_name = basic_kb + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY needed for this alter test.") + + new_api_key = openai_api_key + + alter_query = f""" + ALTER KNOWLEDGE_BASE {kb_name} + USING + embedding_model = {{ 'api_key': '{new_api_key}' }}; + """ + self.query(alter_query) + + time.sleep(1) + + result = self.query(f"SELECT embedding_model FROM information_schema.knowledge_bases WHERE name = '{kb_name}';") + assert result + embedding_model_json = result[0].get("embedding_model") + assert embedding_model_json is not None + + assert '"provider": "openai"' in embedding_model_json + assert '"model_name": "text-embedding-3-small"' in embedding_model_json + assert '"api_key": "' in embedding_model_json + + @pytest.mark.xfail( + reason="Bug: ALTER KNOWLEDGE_BASE does not unset reranking_model. See LINEAR-TICKET-NUMBER: FQE-1716" + ) + @pytest.mark.usefixtures("basic_kb") + def test_alter_kb_reranking_model(self, basic_kb, use_binary): + """Tests adding and then disabling the reranking_model.""" + kb_name = basic_kb + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY needed for this alter test.") + + alter_query_add = f""" + ALTER KNOWLEDGE_BASE {kb_name} + USING + reranking_model = {{ 'provider': 'openai', 'model_name': 'gpt-4o', 'api_key': '{openai_api_key}' }}; + """ + self.query(alter_query_add) + + result_add = self.query(f"DESCRIBE KNOWLEDGE_BASE {kb_name};") + assert result_add and '"provider": "openai"' in result_add[0]["RERANKING_MODEL"] + + alter_query_disable = f""" + ALTER KNOWLEDGE_BASE {kb_name} + USING + reranking_model = false; + """ + self.query(alter_query_disable) + + result_disable = self.query(f"DESCRIBE KNOWLEDGE_BASE {kb_name};") + assert result_disable + reranking_model_desc = result_disable[0].get("RERANKING_MODEL") + assert reranking_model_desc is None or reranking_model_desc == "{}" + + @pytest.mark.xfail(reason="Bug: information_schema.knowledge_bases.PARAMS is not updated on ALTER") + @pytest.mark.usefixtures("basic_kb") + def test_alter_kb_preprocessing(self, basic_kb, use_binary): + """Tests altering the preprocessing parameters by checking the PARAMS column.""" + kb_name = basic_kb + + alter_query = f""" + ALTER KNOWLEDGE_BASE {kb_name} + USING + preprocessing = {{ 'chunk_size': 300, 'chunk_overlap': 50 }}; + """ + self.query(alter_query) + + time.sleep(1) + + result = self.query(f"SELECT PARAMS FROM information_schema.knowledge_bases WHERE name = '{kb_name}';") + + assert result, "Query to information_schema returned no results." + + params_json = result[0].get("PARAMS") + assert params_json is not None, "PARAMS column is NULL." + + assert '"preprocessing":' in params_json, "The 'preprocessing' key was not added to the PARAMS column." + assert '"chunk_size": 300' in params_json, "chunk_size was not set correctly in PARAMS." + assert '"chunk_overlap": 50' in params_json, "chunk_overlap was not set correctly in PARAMS." + + +@pytest.fixture(scope="function") +def setup_trigger_db(request): + """Function-scoped fixture to ensure a clean DB for each trigger test.""" + + db_name = "trigger_test_db_local" + + source_table_name = "trigger_source_table" + target_table_name = "trigger_target_table" + + params = {"user": "postgres", "password": "postgres", "host": "postgres", "port": 5432, "database": "postgres"} + helper = BaseStuff() + helper.use_binary = False + + try: + print( + f"\n--> [Fixture setup_trigger_db] Setting up local database: {db_name} on {params['host']}:{params['port']}" + ) + helper.query(f"DROP DATABASE IF EXISTS {db_name}") + + create_datasource_sql_via_connector(helper, db_name, "postgres", params) + + helper.query(f"CREATE TABLE {db_name}.{source_table_name} (id INT, message VARCHAR(255));") + helper.query(f"CREATE TABLE {db_name}.{target_table_name} (id INT, message VARCHAR(255));") + helper.query(f"INSERT INTO {db_name}.{source_table_name} (id, message) VALUES (101, 'initial_update_message');") + helper.query(f"INSERT INTO {db_name}.{source_table_name} (id, message) VALUES (102, 'initial_delete_message');") + + yield db_name, source_table_name, target_table_name + + except (mysql.connector.Error, TimeoutError) as setup_err: + pytest.skip(f"Trigger fixture setup failed. Ensure Docker environment is running. Error: {setup_err}") + + finally: + print(f"\n--> [CLEANUP] Dropping tables and DATABASE: {db_name}") + try: + helper.query(f"DROP TABLE IF EXISTS {db_name}.{source_table_name};") + helper.query(f"DROP TABLE IF EXISTS {db_name}.{target_table_name};") + except mysql.connector.Error as e: + print(f"Warning: Error dropping tables during cleanup: {e}") + pass + + try: + helper.query(f"DROP DATABASE IF EXISTS {db_name};") + except mysql.connector.Error as e: + print(f"Warning: Error dropping database during cleanup: {e}") + pass + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLTriggers(BaseStuff): + """Test suite for Trigger operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("setup_trigger_db") + def test_trigger_lifecycle(self, setup_trigger_db, use_binary): + db_name, source_table_name, target_table_name = setup_trigger_db + trigger_name = "test_insert_trigger" + test_id = 201 # Use a new ID that will be INSERTED + inserted_message = "this message was inserted" + try: + # Ensure the target table is clean + self.query(f"DELETE FROM {db_name}.{target_table_name};") + + # Pre-drop trigger to ensure clean state + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + create_trigger_query = f""" + CREATE TRIGGER {trigger_name} + ON {db_name}.{source_table_name} + (INSERT INTO {db_name}.{target_table_name} (id, message) SELECT id, message FROM TABLE_DELTA); + """ + print("\n[DEBUG] Sending CREATE TRIGGER command...") + self.query(create_trigger_query) + + wait_for_trigger_creation(self.query, trigger_name, timeout=20) + + print("[DEBUG] Schema check complete. Proceeding to functional firing test...") + + # Activate Trigger with an INSERT, not an UPDATE + self.query( + f"INSERT INTO {db_name}.{source_table_name} (id, message) VALUES ({test_id}, '{inserted_message}');" + ) + + # Poll the target table for the new result + result = [] + max_wait_time = 60 + interval = 1 + max_interval = 8 + start_time = time.time() + while time.time() - start_time < max_wait_time: + result = self.query(f"SELECT id, message FROM {db_name}.{target_table_name} WHERE id = {test_id};") + if result: + break + time.sleep(interval) + interval = min(interval * 2, max_interval) + + # Verify + assert result, f"Trigger did not fire for id {test_id} within {max_wait_time}s." + assert result[0]["message"] == inserted_message + + finally: + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLTriggersNegative(BaseStuff): + """Negative tests for Trigger operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("setup_trigger_db") + def test_create_duplicate_trigger(self, setup_trigger_db, use_binary): + db_name, source_table_name, _ = setup_trigger_db + trigger_name = "duplicate_trigger" + create_query = f"CREATE TRIGGER {trigger_name} ON {db_name}.{source_table_name} (SELECT 1);" + try: + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + self.query(create_query) + wait_for_trigger_creation(self.query, trigger_name, timeout=20) + + with pytest.raises(Exception) as e: + self.query(create_query) + assert "already exists" in str(e.value).lower() + finally: + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + def test_create_trigger_on_non_existent_table(self, use_binary, request): + trigger_name = "bad_trigger_local" + create_query = f"CREATE TRIGGER {trigger_name} ON non_existent_db.non_existent_table (SELECT 1);" + try: + with pytest.raises(Exception) as e: + self.query(create_query) + error_str = str(e.value).lower() + assert "no integration with name" in error_str or "unknown database" in error_str + finally: + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + def test_drop_non_existent_trigger(self, use_binary): + trigger_name = "non_existent_trigger" + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + with pytest.raises(Exception) as e: + self.query(f"DROP TRIGGER {trigger_name};") + error_str = str(e.value).lower() + assert "doesn't exist" in error_str or "unknown trigger" in error_str + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLQueryComposability(BaseStuff): + """Test suite for advanced query composability (CTEs, Subqueries, UNIONs).""" + + db_name = "test_composability_db" + + @pytest.fixture(scope="class") + def composability_db(self): + """Class-scoped fixture to create the postgres DB.""" + print(f"\n--> [Fixture composability_db] Setting up database: {self.db_name}") + db_details = { + "type": "postgres", + "connection_data": { + "host": "samples.mindsdb.com", + "port": "5432", + "user": "demo_user", + "password": "demo_password", + "database": "demo", + "schema": "demo", + }, + } + try: + self.create_database(self.db_name, db_details) + self.validate_database_creation(self.db_name) + yield self.db_name + finally: + print(f"\n--> [Fixture composability_db] Tearing down database: {self.db_name}") + self.query(f"DROP DATABASE IF EXISTS {self.db_name};") + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("composability_db") + def test_common_table_expression_with(self, use_binary): + """ + Tests a query using a WITH clause (CTE). + tests that you can define a temporary result set (cte) and then query that result set. + """ + query = f""" + WITH cte AS ( + SELECT * FROM {self.db_name}.home_rentals WHERE number_of_rooms = 2 + ) + SELECT * FROM cte LIMIT 5; + """ + result = self.query(query) + assert len(result) > 0 + assert all(row["number_of_rooms"] == 2 for row in result) + + @pytest.mark.usefixtures("composability_db") + def test_union_operator(self, use_binary): + """Tests a query using the UNION set operator. + It tests that you can combine the results from two separate queries into one list. + """ + query = f""" + (SELECT sqft, location, number_of_rooms FROM {self.db_name}.home_rentals WHERE number_of_rooms = 1 LIMIT 5) + UNION + (SELECT sqft, location, number_of_rooms FROM {self.db_name}.home_rentals WHERE number_of_rooms = 2 LIMIT 5); + """ + result = self.query(query) + assert len(result) > 0 + assert all(row["number_of_rooms"] in (1, 2) for row in result) + + @pytest.mark.usefixtures("composability_db") + def test_subquery_with_join_and_cte(self, use_binary): + """ + Tests a subquery rewrite for the unsupported 'WHERE IN (SELECT...)' syntax. + This tests CTE, UNION, and JOIN composability. + """ + query = f""" + WITH allowed_rooms AS ( + SELECT 1 as room_num + UNION + SELECT 3 as room_num + ) + SELECT t1.* + FROM {self.db_name}.home_rentals AS t1 + JOIN allowed_rooms AS t2 ON t1.number_of_rooms = t2.room_num + LIMIT 10; + """ + result = self.query(query) + assert len(result) > 0 + assert all(row["number_of_rooms"] in (1, 3) for row in result) + + @pytest.mark.usefixtures("composability_db") + def test_from_subquery(self, use_binary): + """Tests a subquery in the FROM clause. + It tests that you can run a query inside the FROM clause and use its results + as the source table for an outer query. + """ + query = f""" + SELECT * FROM ( + SELECT * FROM {self.db_name}.home_rentals WHERE number_of_rooms = 2 + ) as sub_table + LIMIT 5; + """ + result = self.query(query) + assert len(result) > 0 + assert all(row["number_of_rooms"] == 2 for row in result) From 1c7eb200ee037f7a63df11f6658d5cf0ead92164 Mon Sep 17 00:00:00 2001 From: "April I. Murphy" <36110273+aimurphy@users.noreply.github.com> Date: Wed, 12 Nov 2025 06:18:24 -0800 Subject: [PATCH 02/11] Update datastax.mdx (#11873) --- .../data-integrations/datastax.mdx | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/docs/integrations/data-integrations/datastax.mdx b/docs/integrations/data-integrations/datastax.mdx index 4d3370ce586..a0e99915c17 100644 --- a/docs/integrations/data-integrations/datastax.mdx +++ b/docs/integrations/data-integrations/datastax.mdx @@ -5,7 +5,7 @@ sidebarTitle: DataStax This is the implementation of the DataStax data handler for MindsDB. -[DataStax](https://www.datastax.com/) Astra DB is a cloud database-as-a-service based on Apache Cassandra. DataStax also offers DataStax Enterprise (DSE), an on-premises database built on Apache Cassandra, and Astra Streaming, a messaging and event streaming cloud service based on Apache Pulsar. +https://docs.datastax.com/en/astra-db-serverless/index.html[DataStax Astra DB] is a cloud database-as-a-service based on Apache Cassandra. DataStax also offers on-premises solutions, DataStax Enterprise (DSE) and Hyper-Converged Database (HCD), as well as Astra Streaming, a messaging and event streaming cloud service based on Apache Pulsar. ## Prerequisites @@ -13,7 +13,7 @@ Before proceeding, ensure the following prerequisites are met: 1. Install MindsDB locally via [Docker](/setup/self-hosted/docker) or [Docker Desktop](/setup/self-hosted/docker-desktop). 2. To connect DataStax to MindsDB, install the required dependencies following [this instruction](/setup/self-hosted/docker#install-dependencies). -3. Install or ensure access to DataStax. +3. Create an [Astra DB database](https://docs.datastax.com/en/astra-db-serverless/databases/create-database.html). ## Implementation @@ -21,9 +21,9 @@ DataStax Astra DB is API-compatible with Apache Cassandra and ScyllaDB. Therefor The required arguments to establish a connection are as follows: -* `user` is the user to authenticate. -* `password` is the password to authenticate the user. -* `secure_connect_bundle` is the path to the `secure_connect_bundle` zip file. +* `user`: The literal string `token` +* `password`: An [Astra application token](https://docs.datastax.com/en/astra-db-serverless/administration/manage-application-tokens.html) +* `secure_connect_bundle`: The path to your database's [Secure Connect Bundle](https://docs.datastax.com/en/astra-db-serverless/databases/secure-connect-bundle.html) zip file ## Usage @@ -34,21 +34,20 @@ CREATE DATABASE astra_connection WITH engine = "astra", parameters = { - "user": "user", - "password": "pass", + "user": "token", + "password": "application_token", "secure_connect_bundle": "/home/Downloads/file.zip" }; ``` - or, reference the bundle from Datastax s3 as: ```sql CREATE DATABASE astra_connection WITH ENGINE = "astra", PARAMETERS = { - "user": "user", - "password": "pass", + "user": "token", + "password": "application_token", "secure_connect_bundle": "https://datastax-cluster-config-prod.s3.us-east-2.amazonaws.com/32312-b9eb-4e09-a641-213eaesa12-1/secure-connect-demo.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AK..." } ``` From dba3869e979867d42a29f68c52f6effa2dd597d2 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa <49385643+MinuraPunchihewa@users.noreply.github.com> Date: Wed, 12 Nov 2025 20:02:43 +0530 Subject: [PATCH 03/11] Fixed MSSQL Query Execution Result Conversion (#11876) --- .../handlers/mssql_handler/mssql_handler.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/mindsdb/integrations/handlers/mssql_handler/mssql_handler.py b/mindsdb/integrations/handlers/mssql_handler/mssql_handler.py index 6230363d3f0..6cb4dacb49e 100644 --- a/mindsdb/integrations/handlers/mssql_handler/mssql_handler.py +++ b/mindsdb/integrations/handlers/mssql_handler/mssql_handler.py @@ -78,15 +78,11 @@ def _make_table_response( if not result: data_frame = pd.DataFrame(columns=columns) elif use_odbc: - # For pyodbc with large datasets, convert Row objects efficiently - # Using iterator with pd.DataFrame avoids intermediate list creation - try: - data_frame = pd.DataFrame(result, columns=columns) - except (ValueError, TypeError): - # Fallback: convert Row objects to tuples - data_frame = pd.DataFrame.from_records((tuple(row) for row in result), columns=columns) + # from_records() understands tuple-like records (including pyodbc.Row) + data_frame = pd.DataFrame.from_records(result, columns=columns) else: - data_frame = pd.DataFrame(result, columns=columns) + # pymssql with as_dict=True returns list of dicts + data_frame = pd.DataFrame(result) for column in description: column_name = column[0] From 01a6b37ad685796cd3d944bb4a7f54a45a82bcb0 Mon Sep 17 00:00:00 2001 From: Konstantin Sivakov Date: Wed, 12 Nov 2025 15:34:40 +0100 Subject: [PATCH 04/11] Update documentation for ENV Variables (#11667) Co-authored-by: martyna-mindsdb <109554435+martyna-mindsdb@users.noreply.github.com> --- docs/setup/environment-vars.mdx | 193 +++++++++++++++++++++++++++++--- 1 file changed, 175 insertions(+), 18 deletions(-) diff --git a/docs/setup/environment-vars.mdx b/docs/setup/environment-vars.mdx index 2891f045622..f1495a0fa44 100644 --- a/docs/setup/environment-vars.mdx +++ b/docs/setup/environment-vars.mdx @@ -3,8 +3,9 @@ title: Environment Variables sidebarTitle: Environment Variables --- -Most of the MindsDB functionality can be modified by extending the default configuration, but some of the configuration options +Most of the MindsDB functionality can be modified by extending the default configuration, but some of the configuration options can be added as environment variables on the server where MindsDB is deployed. +[Here is the list](/setup/full-list-environment-vars) of all the available environment variables. ## MindsDB Authentication @@ -21,6 +22,7 @@ MindsDB does not require authentication by default. If you want to enable authen export MINDSDB_USERNAME='mindsdb_user' export MINDSDB_PASSWORD='mindsdb_password' ``` + ## MindsDB Configuration File @@ -37,18 +39,19 @@ In order to start MindsDB with a [custom configuration file](/setup/custom-confi ```bash Shell export MINDSDB_CONFIG_PATH=/Users/username/path/config.json ``` + -## MindsDB Storage +## MindsDB Storage -By default, MindsDB stores the configuration files by determining appropriate platform-specific directories, e.g. a "user data dir": +By default, MindsDB stores the configuration files by determining appropriate platform-specific directories, e.g. a "user data dir": -* On Linux `~/.local/share/mindsdb/var` -* On MacOS `~/Library/Application Support/mindsdb/var` -* On Windows `C:\Documents and Settings\\Application Data\Local Settings\\mindsdb\var` +- On Linux `~/.local/share/mindsdb/var` +- On MacOS `~/Library/Application Support/mindsdb/var` +- On Windows `C:\Documents and Settings\\Application Data\Local Settings\\mindsdb\var` -In the `MINDSDB_STORAGE_DIR` location, MindsDB stores users' data, models and uploaded data files, the static assets for the frontend application and the -`sqlite.db` file. +In the `MINDSDB_STORAGE_DIR` location, MindsDB stores users' data, models and uploaded data files, the static assets for the frontend application and the +`sqlite.db` file. You can change the default storage location using `MINDSDB_STORAGE_DIR` variable. ### Example @@ -61,26 +64,31 @@ You can change the default storage location using `MINDSDB_STORAGE_DIR` variable ```bash Shell export MINDSDB_STORAGE_DIR='~/home/mindsdb/var' ``` + -## MindsDB Configuration Storage +## MindsDB Configuration Storage -MindsDB uses `sqlite` database by default to store the required configuration as models, projects, files metadata etc. -The full list of the above schemas can be found [here](https://github.com/mindsdb/mindsdb/blob/main/mindsdb/interfaces/storage/db.py#L69). You can change the +MindsDB uses `sqlite` database by default to store the required configuration as models, projects, files metadata etc. +The full list of the above schemas can be found [here](https://github.com/mindsdb/mindsdb/blob/main/mindsdb/interfaces/storage/db.py#L69). You can change the default storage option and use different database by adding the new connection string using `MINDSDB_DB_CON` variable. ### Example - ```bash Docker - docker run --name mindsdb_container -e MINDSDB_DB_CON='postgresql://user:secret@localhost' -e MINDSDB_APIS=http,mysql -p 47334:47334 -p 47335:47335 mindsdb/mindsdb - ``` - - ```bash Shell - export MINDSDB_DB_CON='postgresql://user:secret@localhost' - ``` + ```bash Docker docker run --name mindsdb_container -e + MINDSDB_DB_CON='postgresql://user:secret@localhost' -e MINDSDB_APIS=http,mysql + -p 47334:47334 -p 47335:47335 mindsdb/mindsdb ``` ```bash Shell export + MINDSDB_DB_CON='postgresql://user:secret@localhost' ``` +#### `MINDSDB_STORAGE_BACKUP_DISABLED` + +- **Type:** Boolean (`1`, `true`, `True`) +- **Description:** Disables permanent storage backup +- **Default:** `false` +- **Example:** `MINDSDB_STORAGE_BACKUP_DISABLED=1` + ## MindsDB APIs The `MINDSDB_APIS` environment variable lets users define which APIs to start. Learn more about the [available APIs here](/setup/mindsdb-apis). @@ -95,6 +103,7 @@ The `MINDSDB_APIS` environment variable lets users define which APIs to start. L ```bash Shell export MINDSDB_APIS='http,mysql' ``` + ## MindsDB Logs @@ -111,8 +120,23 @@ This environment variable defines the level of logging generated by MindsDB. You ```bash Shell export MINDSDB_LOG_LEVEL='DEBUG' ``` + +#### `MINDSDB_CONSOLE_LOG_LEVEL` + +- **Type:** String (`DEBUG`, `INFO`, `WARNING`, `ERROR`) +- **Description:** Sets console log level +- **Default:** `INFO` +- **Example:** `MINDSDB_CONSOLE_LOG_LEVEL=DEBUG` + +#### `MINDSDB_FILE_LOG_LEVEL` + +- **Type:** String (`DEBUG`, `INFO`, `WARNING`, `ERROR`) +- **Description:** Sets file log level and enables file logging +- **Default:** `INFO` (disabled by default) +- **Example:** `MINDSDB_FILE_LOG_LEVEL=DEBUG` + ## MindsDB Default Project By default, MindsDB creates a project named `mindsdb` where all the models and other objects are stored. You can change the default project name by setting the `MINDSDB_DEFAULT_PROJECT` environment variable. @@ -129,8 +153,30 @@ If this environment variable is set or modified after MindsDB has started, the d ```bash Shell export MINDSDB_DEFAULT_PROJECT='my_project' ``` + +#### `MINDSDB_DEFAULT_LLM_API_KEY` + +- **Type:** String +- **Description:** API key for default LLM (Large Language Model) +- **Default:** None +- **Example:** `MINDSDB_DEFAULT_LLM_API_KEY=sk-...` + +#### `MINDSDB_DEFAULT_EMBEDDING_MODEL_API_KEY` + +- **Type:** String +- **Description:** API key for default embedding model +- **Default:** None +- **Example:** `MINDSDB_DEFAULT_EMBEDDING_MODEL_API_KEY=sk-...` + +#### `MINDSDB_DEFAULT_RERANKING_MODEL_API_KEY` + +- **Type:** String +- **Description:** API key for default reranking model +- **Default:** None +- **Example:** `MINDSDB_DEFAULT_RERANKING_MODEL_API_KEY=sk-...` + ## MindsDB's PID File When running MindsDB via [Docker](/setup/self-hosted/docker) or [Docker Extension](/setup/self-hosted/docker-desktop), the PID file is not used by default. Users can opt for enabling the PID file by defining the `USE_PIDFILE` environment variable. @@ -147,6 +193,7 @@ If used, the PID file is stored in the temp directory (`$TMPDIR` on MacOS and Li ```bash Shell export USE_PIDFILE=1 ``` + ## MindsDB GUI Updates @@ -165,6 +212,7 @@ By default, the automatic GUI updates are enabled and the `MINDSDB_GUI_AUTOUPDAT ```bash Shell export MINDSDB_GUI_AUTOUPDATE=false ``` + ## MindsDB GUI Startup and Updates @@ -185,4 +233,113 @@ Note that the `MINDSDB_NO_STUDIO` is not recommended for the MindsDB instance ru ```bash Shell export MINDSDB_NO_STUDIO=true ``` + + +### ML Task Queue + +#### `MINDSDB_ML_QUEUE_TYPE` + +- **Type:** String (`local`, `redis`) +- **Description:** Type of ML task queue to use +- **Default:** `local` +- **Example:** `MINDSDB_ML_QUEUE_TYPE=redis` + +#### `MINDSDB_ML_QUEUE_HOST` + +- **Type:** String (hostname) +- **Description:** Redis host for ML task queue (only when `MINDSDB_ML_QUEUE_TYPE=redis`) +- **Default:** `localhost` +- **Example:** `MINDSDB_ML_QUEUE_HOST=redis.example.com` + +#### `MINDSDB_ML_QUEUE_PORT` + +- **Type:** Integer +- **Description:** Redis port for ML task queue +- **Default:** `6379` +- **Example:** `MINDSDB_ML_QUEUE_PORT=6380` + +#### `MINDSDB_ML_QUEUE_DB` + +- **Type:** Integer +- **Description:** Redis database number for ML task queue +- **Default:** `0` +- **Example:** `MINDSDB_ML_QUEUE_DB=1` + +#### `MINDSDB_ML_QUEUE_USERNAME` + +- **Type:** String +- **Description:** Redis username for ML task queue +- **Default:** None +- **Example:** `MINDSDB_ML_QUEUE_USERNAME=redis_user` + +#### `MINDSDB_ML_QUEUE_PASSWORD` + +- **Type:** String +- **Description:** Redis password for ML task queue +- **Default:** None +- **Example:** `MINDSDB_ML_QUEUE_PASSWORD=redis_pass` + +## Reranker Configuration + +#### `MINDSDB_RERANKER_N` + +- **Type:** Integer +- **Description:** Number of results to rerank +- **Default:** None +- **Example:** `MINDSDB_RERANKER_N=10` + +#### `MINDSDB_RERANKER_LOGPROBS` + +- **Type:** Boolean (`true`, `false`, `1`, `0`, `yes`, `no`) +- **Description:** Enable log probabilities in reranker +- **Default:** None +- **Example:** `MINDSDB_RERANKER_LOGPROBS=true` + +#### `MINDSDB_RERANKER_TOP_LOGPROBS` + +- **Type:** Integer +- **Description:** Number of top log probabilities to return +- **Default:** None +- **Example:** `MINDSDB_RERANKER_TOP_LOGPROBS=5` + +#### `MINDSDB_RERANKER_MAX_TOKENS` + +- **Type:** Integer +- **Description:** Maximum tokens for reranker +- **Default:** None +- **Example:** `MINDSDB_RERANKER_MAX_TOKENS=512` + +#### `MINDSDB_RERANKER_VALID_CLASS_TOKENS` + +- **Type:** String (comma-separated list) +- **Description:** Valid class tokens for reranker +- **Default:** None +- **Example:** `MINDSDB_RERANKER_VALID_CLASS_TOKENS=token1,token2,token3` + +## Features + +#### `MINDSDB_DATA_CATALOG_ENABLED` + +- **Type:** Boolean (`1`, `true`) +- **Description:** Enables the data catalog feature +- **Default:** `false` +- **Example:** `MINDSDB_DATA_CATALOG_ENABLED=1` + +## Runtime + +#### `MINDSDB_DOCKER_ENV` + +- **Type:** Any value (presence check) +- **Description:** Indicates MindsDB is running in Docker environment (changes default API host to `0.0.0.0`) +- **Default:** Not set +- **Example:** `MINDSDB_DOCKER_ENV=1` + +#### `MINDSDB_RUNTIME` + +- **Type:** String (`1`) +- **Description:** Indicates MindsDB runtime environment +- **Default:** Not set +- **Example:** `MINDSDB_RUNTIME=1` + +--- From a4bc712aa3819c95f15750d3a4e8334ca2888e7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Tob=C3=B3n=20Hern=C3=A1ndez?= Date: Wed, 12 Nov 2025 10:50:14 -0500 Subject: [PATCH 05/11] [BUG] - Hubspot Handler - CRUD operations (#11874) --- .../handlers/hubspot_handler/README.md | 63 +-- .../hubspot_handler/hubspot_tables.py | 505 ++++++++++++++---- 2 files changed, 430 insertions(+), 138 deletions(-) diff --git a/mindsdb/integrations/handlers/hubspot_handler/README.md b/mindsdb/integrations/handlers/hubspot_handler/README.md index 367ed859e76..520e2a29b77 100644 --- a/mindsdb/integrations/handlers/hubspot_handler/README.md +++ b/mindsdb/integrations/handlers/hubspot_handler/README.md @@ -7,6 +7,7 @@ HubSpot handler for MindsDB provides interfaces to connect to HubSpot via APIs a ## Table of Contents - [HubSpot Handler](#hubspot-handler) + - [Table of Contents](#table-of-contents) - [About HubSpot](#about-hubspot) - [Installation](#installation) - [Authentication](#authentication) @@ -96,6 +97,11 @@ The handler provides comprehensive data catalog capabilities: | `contacts` | Individual contact records | id, email, firstname, lastname | SELECT, INSERT, UPDATE, DELETE | | `deals` | Sales opportunity records | id, dealname, amount, stage | SELECT, INSERT, UPDATE, DELETE | +**Important Notes on Field Values:** +- **Industry codes**: HubSpot uses predefined industry values (e.g., `COMPUTER_SOFTWARE`, `BIOTECHNOLOGY`, `FINANCIAL_SERVICES`). See [HubSpot's industry list](https://knowledge.hubspot.com/properties/hubspots-default-company-properties#industry) for all valid options. +- **Deal stages**: Each HubSpot account has custom pipeline stages. Use the stage IDs from your account (e.g., `presentationscheduled`, `closedwon`, `closedlost`, or numeric IDs like `110382973`). +- **Email validation**: Contact email addresses must be valid email formats (e.g., `user@example.com`). + ## Example Usage ### Basic Connection @@ -126,13 +132,6 @@ PARAMETERS = { SHOW TABLES FROM hubspot_datasource; ``` -**Get Table Schema:** -```sql -DESCRIBE hubspot_datasource.companies; -DESCRIBE hubspot_datasource.contacts; -DESCRIBE hubspot_datasource.deals; -``` - **Get Detailed Column Information:** ```sql SELECT * FROM information_schema.columns @@ -154,36 +153,6 @@ SELECT * FROM hubspot_datasource.contacts LIMIT 10; SELECT * FROM hubspot_datasource.deals LIMIT 10; ``` -**Advanced Filtering and Analytics:** -```sql --- Companies by industry and location -SELECT name, industry, city, state -FROM hubspot_datasource.companies -WHERE industry IN ('Technology', 'Healthcare') - AND city = 'San Francisco' -ORDER BY name; - --- Contact engagement analysis -SELECT - company, - COUNT(*) as contact_count, - STRING_AGG(email, ', ') as emails -FROM hubspot_datasource.contacts -WHERE company IS NOT NULL -GROUP BY company -ORDER BY contact_count DESC; - --- Sales pipeline analysis -SELECT - dealstage, - COUNT(*) as deal_count, - SUM(CAST(amount AS DECIMAL)) as total_value, - AVG(CAST(amount AS DECIMAL)) as avg_deal_size -FROM hubspot_datasource.deals -WHERE amount IS NOT NULL -GROUP BY dealstage -ORDER BY total_value DESC; -``` ### Data Manipulation @@ -191,22 +160,22 @@ ORDER BY total_value DESC; ```sql -- Create new company INSERT INTO hubspot_datasource.companies (name, domain, industry, city, state) -VALUES ('Acme Corp', 'acme.com', 'Technology', 'New York', 'NY'); +VALUES ('Acme Corp', 'acme.com', 'COMPUTER_SOFTWARE', 'New York', 'NY'); -- Create new contact -INSERT INTO hubspot_datasource.contacts (email, firstname, lastname, company, phone) -VALUES ('john.doe@acme.com', 'John', 'Doe', 'Acme Corp', '+1-555-0123'); +INSERT INTO hubspot_datasource.contacts (email, firstname, phone) +VALUES ('john.doe@example.com', 'John', '+1234567890'); -- Create new deal -INSERT INTO hubspot_datasource.deals (dealname, amount, pipeline, dealstage) -VALUES ('Acme Software License', '50000', 'sales', 'qualified-to-buy'); +INSERT INTO hubspot_datasource.deals (dealname, amount, dealstage, pipeline) +VALUES ('New Deal', 5000, 'presentationscheduled', 'default'); ``` **Updating Records:** ```sql -- Update company information UPDATE hubspot_datasource.companies -SET industry = 'SaaS', city = 'Austin' +SET industry = 'COMPUTER_SOFTWARE', city = 'Austin' WHERE name = 'Acme Corp'; -- Update contact details @@ -216,18 +185,18 @@ WHERE email = 'john.doe@acme.com'; -- Move deal through pipeline UPDATE hubspot_datasource.deals -SET dealstage = 'proposal-made', amount = '75000' -WHERE dealname = 'Acme Software License'; +SET dealstage = '110382973', amount = '75000' +WHERE dealname = 'New Deal'; ``` **Deleting Records:** ```sql -- Archive old deals DELETE FROM hubspot_datasource.deals -WHERE dealstage = 'closed-lost' +WHERE dealstage = 'closedlost' AND createdate < '2023-01-01'; -- Remove test contacts DELETE FROM hubspot_datasource.contacts -WHERE email LIKE '%test%' OR email LIKE '%example%'; +WHERE email = 'email'; ``` diff --git a/mindsdb/integrations/handlers/hubspot_handler/hubspot_tables.py b/mindsdb/integrations/handlers/hubspot_handler/hubspot_tables.py index 70053205c00..508df26c496 100644 --- a/mindsdb/integrations/handlers/hubspot_handler/hubspot_tables.py +++ b/mindsdb/integrations/handlers/hubspot_handler/hubspot_tables.py @@ -1,10 +1,14 @@ -from typing import List, Dict, Text, Any +from typing import List, Dict, Text, Any, Optional import pandas as pd +from hubspot import HubSpot from hubspot.crm.objects import ( SimplePublicObjectId as HubSpotObjectId, SimplePublicObjectBatchInput as HubSpotObjectBatchInput, SimplePublicObjectInputForCreate as HubSpotObjectInputCreate, + BatchInputSimplePublicObjectBatchInputForCreate, + BatchInputSimplePublicObjectBatchInput, + BatchInputSimplePublicObjectId, ) from mindsdb_sql_parser import ast @@ -52,6 +56,9 @@ def select(self, query: ast.Select) -> pd.DataFrame: selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() companies_df = pd.json_normalize(self.get_companies(limit=result_limit)) + if companies_df.empty: + companies_df = pd.DataFrame(columns=self._get_default_company_columns()) + select_statement_executor = SELECTQueryExecutor( companies_df, selected_columns, where_conditions, order_by_conditions ) @@ -107,11 +114,23 @@ def update(self, query: ast.Update) -> None: update_statement_parser = UPDATEQueryParser(query) values_to_update, where_conditions = update_statement_parser.parse_query() - companies_df = pd.json_normalize(self.get_companies()) + companies_df = pd.json_normalize(self.get_companies(limit=1000)) + + if companies_df.empty: + raise ValueError( + "No companies retrieved from HubSpot to evaluate update conditions. Verify your connection and permissions." + ) + update_query_executor = UPDATEQueryExecutor(companies_df, where_conditions) + filtered_df = update_query_executor.execute_query() + + if filtered_df.empty: + raise ValueError( + f"No companies found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) - companies_df = update_query_executor.execute_query() - company_ids = companies_df["id"].tolist() + company_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Updating {len(company_ids)} compan(ies) matching WHERE conditions") self.update_companies(company_ids, values_to_update) def delete(self, query: ast.Delete) -> None: @@ -135,46 +154,110 @@ def delete(self, query: ast.Delete) -> None: delete_statement_parser = DELETEQueryParser(query) where_conditions = delete_statement_parser.parse_query() - companies_df = pd.json_normalize(self.get_companies()) + companies_df = pd.json_normalize(self.get_companies(limit=1000)) + + if companies_df.empty: + raise ValueError( + "No companies retrieved from HubSpot to evaluate delete conditions. Verify your connection and permissions." + ) + delete_query_executor = DELETEQueryExecutor(companies_df, where_conditions) + filtered_df = delete_query_executor.execute_query() - companies_df = delete_query_executor.execute_query() - company_ids = companies_df["id"].tolist() + if filtered_df.empty: + raise ValueError( + f"No companies found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) + + company_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Deleting {len(company_ids)} compan(ies) matching WHERE conditions") self.delete_companies(company_ids) def get_columns(self) -> List[Text]: - return pd.json_normalize(self.get_companies(limit=1)).columns.tolist() + return self._get_default_company_columns() + + @staticmethod + def _get_default_company_columns() -> List[str]: + return [ + "id", + "name", + "city", + "phone", + "state", + "domain", + "industry", + "createdate", + "lastmodifieddate", + ] - def get_companies(self, **kwargs) -> List[Dict]: + def get_companies(self, limit: int | None = None, **kwargs) -> List[Dict]: hubspot = self.handler.connect() - companies = hubspot.crm.companies.get_all(**kwargs) - companies_dict = [ - { - "id": company.id, - "name": company.properties.get("name", None), - "city": company.properties.get("city", None), - "phone": company.properties.get("phone", None), - "state": company.properties.get("state", None), - "domain": company.properties.get("company", None), - "industry": company.properties.get("industry", None), - "createdate": company.properties["createdate"], - "lastmodifieddate": company.properties["hs_lastmodifieddate"], - } - for company in companies + + requested_properties = kwargs.get("properties", []) + default_properties = [ + "name", + "domain", + "industry", + "city", + "state", + "phone", + "createdate", + "hs_lastmodifieddate", ] + + properties = list({*default_properties, *requested_properties}) + + api_kwargs = {**kwargs, "properties": properties} + if limit is not None: + api_kwargs["limit"] = limit + + companies = hubspot.crm.companies.get_all(**api_kwargs) + companies_dict = [] + + for company in companies: + try: + company_dict = { + "id": company.id, + "name": company.properties.get("name"), + "city": company.properties.get("city"), + "phone": company.properties.get("phone"), + "state": company.properties.get("state"), + "domain": company.properties.get("domain"), + "industry": company.properties.get("industry"), + "createdate": company.properties.get("createdate"), + "lastmodifieddate": company.properties.get("hs_lastmodifieddate"), + } + companies_dict.append(company_dict) + except Exception as e: + logger.warning(f"Error processing company {getattr(company, 'id', 'unknown')}: {str(e)}") + continue + + logger.info(f"Retrieved {len(companies_dict)} companies from HubSpot") return companies_dict def create_companies(self, companies_data: List[Dict[Text, Any]]) -> None: + if not companies_data: + raise ValueError("No company data provided for creation") + + logger.info(f"Attempting to create {len(companies_data)} compan(ies): {companies_data}") + hubspot = self.handler.connect() companies_to_create = [HubSpotObjectInputCreate(properties=company) for company in companies_data] + batch_input = BatchInputSimplePublicObjectBatchInputForCreate(inputs=companies_to_create) + try: created_companies = hubspot.crm.companies.batch_api.create( - inputs=companies_to_create, - ) - logger.info( - f"Companies created with ID's {[created_company.id for created_company in created_companies.results]}" + batch_input_simple_public_object_batch_input_for_create=batch_input ) + + if not created_companies or not hasattr(created_companies, "results") or not created_companies.results: + raise Exception("Company creation returned no results") + + created_ids = [created_company.id for created_company in created_companies.results] + logger.info(f"Successfully created {len(created_ids)} compan(ies) with IDs: {created_ids}") + except Exception as e: + logger.error(f"Companies creation failed: {str(e)}") raise Exception(f"Companies creation failed {e}") def update_companies(self, company_ids: List[Text], values_to_update: Dict[Text, Any]) -> None: @@ -182,9 +265,10 @@ def update_companies(self, company_ids: List[Text], values_to_update: Dict[Text, companies_to_update = [ HubSpotObjectBatchInput(id=company_id, properties=values_to_update) for company_id in company_ids ] + batch_input = BatchInputSimplePublicObjectBatchInput(inputs=companies_to_update) try: updated_companies = hubspot.crm.companies.batch_api.update( - inputs=companies_to_update, + batch_input_simple_public_object_batch_input=batch_input ) logger.info( f"Companies with ID {[updated_company.id for updated_company in updated_companies.results]} updated" @@ -195,10 +279,9 @@ def update_companies(self, company_ids: List[Text], values_to_update: Dict[Text, def delete_companies(self, company_ids: List[Text]) -> None: hubspot = self.handler.connect() companies_to_delete = [HubSpotObjectId(id=company_id) for company_id in company_ids] + batch_input = BatchInputSimplePublicObjectId(inputs=companies_to_delete) try: - hubspot.crm.companies.batch_api.archive( - inputs=companies_to_delete, - ) + hubspot.crm.companies.batch_api.archive(batch_input_simple_public_object_id=batch_input) logger.info("Companies deleted") except Exception as e: raise Exception(f"Companies deletion failed {e}") @@ -231,7 +314,10 @@ def select(self, query: ast.Select) -> pd.DataFrame: select_statement_parser = SELECTQueryParser(query, "contacts", self.get_columns()) selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() - contacts_df = pd.json_normalize(self.get_contacts(limit=result_limit)) + contacts_df = pd.json_normalize(self.get_contacts(limit=result_limit, where_conditions=where_conditions)) + if contacts_df.empty: + contacts_df = pd.DataFrame(columns=self._get_default_contact_columns()) + select_statement_executor = SELECTQueryExecutor( contacts_df, selected_columns, where_conditions, order_by_conditions ) @@ -259,7 +345,7 @@ def insert(self, query: ast.Insert) -> None: """ insert_statement_parser = INSERTQueryParser( query, - supported_columns=["email", "firstname", "firstname", "phone", "company", "website"], + supported_columns=["email", "firstname", "lastname", "phone", "company", "website"], mandatory_columns=["email"], all_mandatory=False, ) @@ -287,11 +373,23 @@ def update(self, query: ast.Update) -> None: update_statement_parser = UPDATEQueryParser(query) values_to_update, where_conditions = update_statement_parser.parse_query() - contacts_df = pd.json_normalize(self.get_contacts()) + contacts_df = pd.json_normalize(self.get_contacts(limit=1000, where_conditions=where_conditions)) + + if contacts_df.empty: + raise ValueError( + "No contacts retrieved from HubSpot to evaluate update conditions. Verify your connection and permissions." + ) + update_query_executor = UPDATEQueryExecutor(contacts_df, where_conditions) + filtered_df = update_query_executor.execute_query() + + if filtered_df.empty: + raise ValueError( + f"No contacts found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) - contacts_df = update_query_executor.execute_query() - contact_ids = contacts_df["id"].tolist() + contact_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Updating {len(contact_ids)} contact(s) matching WHERE conditions") self.update_contacts(contact_ids, values_to_update) def delete(self, query: ast.Delete) -> None: @@ -315,44 +413,206 @@ def delete(self, query: ast.Delete) -> None: delete_statement_parser = DELETEQueryParser(query) where_conditions = delete_statement_parser.parse_query() - contacts_df = pd.json_normalize(self.get_contacts()) + contacts_df = pd.json_normalize(self.get_contacts(limit=1000, where_conditions=where_conditions)) + + if contacts_df.empty: + raise ValueError( + "No contacts retrieved from HubSpot to evaluate delete conditions. Verify your connection and permissions." + ) + delete_query_executor = DELETEQueryExecutor(contacts_df, where_conditions) + filtered_df = delete_query_executor.execute_query() - contacts_df = delete_query_executor.execute_query() - contact_ids = contacts_df["id"].tolist() + if filtered_df.empty: + raise ValueError( + f"No contacts found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) + + contact_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Deleting {len(contact_ids)} contact(s) matching WHERE conditions") self.delete_contacts(contact_ids) def get_columns(self) -> List[Text]: - return pd.json_normalize(self.get_contacts(limit=1)).columns.tolist() + return self._get_default_contact_columns() + + @staticmethod + def _get_default_contact_columns() -> List[str]: + return [ + "id", + "email", + "firstname", + "lastname", + "phone", + "company", + "website", + "createdate", + "lastmodifieddate", + ] - def get_contacts(self, **kwargs) -> List[Dict]: + def get_contacts( + self, + limit: Optional[int] = None, + where_conditions: Optional[List] = None, + **kwargs, + ) -> List[Dict]: hubspot = self.handler.connect() - contacts = hubspot.crm.contacts.get_all(**kwargs) - contacts_dict = [ - { - "id": contact.id, - "email": contact.properties["email"], - "firstname": contact.properties.get("firstname", None), - "lastname": contact.properties.get("lastname", None), - "phone": contact.properties.get("phone", None), - "company": contact.properties.get("company", None), - "website": contact.properties.get("website", None), - "createdate": contact.properties["createdate"], - "lastmodifieddate": contact.properties["lastmodifieddate"], - } - for contact in contacts + requested_properties = kwargs.get("properties", []) + default_properties = [ + "email", + "firstname", + "lastname", + "phone", + "company", + "website", + "createdate", + "lastmodifieddate", ] + + properties = list({*default_properties, *requested_properties}) + + api_kwargs = {**kwargs, "properties": properties} + if limit is not None: + api_kwargs["limit"] = limit + else: + api_kwargs.pop("limit", None) + + # Try using HubSpot search API if we have simple equality filters + if where_conditions: + search_results = self._search_contacts_by_conditions(hubspot, where_conditions, properties, limit) + if search_results is not None: + logger.info(f"Retrieved {len(search_results)} contacts from HubSpot via search API") + return search_results + + contacts = hubspot.crm.contacts.get_all(**api_kwargs) + contacts_dict = [] + + try: + for contact in contacts: + contacts_dict.append(self._contact_to_dict(contact)) + if limit is not None and len(contacts_dict) >= limit: + break + except Exception as e: + logger.error(f"Failed to iterate HubSpot contacts: {str(e)}") + raise + + logger.info(f"Retrieved {len(contacts_dict)} contacts from HubSpot") return contacts_dict + def _search_contacts_by_conditions( + self, + hubspot: HubSpot, + where_conditions: List, + properties: List[str], + limit: Optional[int], + ) -> Optional[List[Dict[str, Any]]]: + filters = [] + + for condition in where_conditions: + if not isinstance(condition, (list, tuple)) or len(condition) < 3: + continue + + operator, column, value = condition[0], condition[1], condition[2] + if operator != "=" or column not in {"email", "id"}: + continue + + property_name = "hs_object_id" if column == "id" else column + filters.append( + { + "propertyName": property_name, + "operator": "EQ", + "value": str(value), + } + ) + + if not filters: + return None + + collected: List[Dict[str, Any]] = [] + remaining = limit if limit is not None else float("inf") + after = None + + while remaining > 0: + page_limit = min(int(remaining) if remaining != float("inf") else 100, 100) + search_request = { + "properties": properties, + "limit": page_limit, + "filterGroups": [{"filters": filters}], + } + + if after is not None: + search_request["after"] = after + + response = hubspot.crm.contacts.search_api.do_search(public_object_search_request=search_request) + + results = getattr(response, "results", []) + for contact in results: + collected.append(self._contact_to_dict(contact)) + if limit is not None and len(collected) >= limit: + return collected + + paging = getattr(response, "paging", None) + next_page = getattr(paging, "next", None) if paging else None + after = getattr(next_page, "after", None) if next_page else None + + if after is None or (limit is not None and len(collected) >= limit): + break + + if remaining != float("inf"): + remaining = limit - len(collected) + + return collected + + def _contact_to_dict(self, contact: Any) -> Dict[str, Any]: + try: + properties = getattr(contact, "properties", {}) or {} + return { + "id": contact.id, + "email": properties.get("email"), + "firstname": properties.get("firstname"), + "lastname": properties.get("lastname"), + "phone": properties.get("phone"), + "company": properties.get("company"), + "website": properties.get("website"), + "createdate": properties.get("createdate"), + "lastmodifieddate": properties.get("lastmodifieddate"), + } + except Exception as e: + logger.warning(f"Error processing contact {getattr(contact, 'id', 'unknown')}: {str(e)}") + return { + "id": getattr(contact, "id", None), + "email": None, + "firstname": None, + "lastname": None, + "phone": None, + "company": None, + "website": None, + "createdate": None, + "lastmodifieddate": None, + } + def create_contacts(self, contacts_data: List[Dict[Text, Any]]) -> None: + if not contacts_data: + raise ValueError("No contact data provided for creation") + + logger.info(f"Attempting to create {len(contacts_data)} contact(s): {contacts_data}") + hubspot = self.handler.connect() contacts_to_create = [HubSpotObjectInputCreate(properties=contact) for contact in contacts_data] + batch_input = BatchInputSimplePublicObjectBatchInputForCreate(inputs=contacts_to_create) + try: - created_contacts = hubspot.crm.contacts.batch_api.create(inputs=contacts_to_create) - logger.info( - f"Contacts created with ID {[created_contact.id for created_contact in created_contacts.results]}" + created_contacts = hubspot.crm.contacts.batch_api.create( + batch_input_simple_public_object_batch_input_for_create=batch_input ) + + if not created_contacts or not hasattr(created_contacts, "results") or not created_contacts.results: + raise Exception("Contact creation returned no results") + + created_ids = [created_contact.id for created_contact in created_contacts.results] + logger.info(f"Successfully created {len(created_ids)} contact(s) with IDs: {created_ids}") + except Exception as e: + logger.error(f"Contacts creation failed: {str(e)}") raise Exception(f"Contacts creation failed {e}") def update_contacts(self, contact_ids: List[Text], values_to_update: Dict[Text, Any]) -> None: @@ -360,9 +620,10 @@ def update_contacts(self, contact_ids: List[Text], values_to_update: Dict[Text, contacts_to_update = [ HubSpotObjectBatchInput(id=contact_id, properties=values_to_update) for contact_id in contact_ids ] + batch_input = BatchInputSimplePublicObjectBatchInput(inputs=contacts_to_update) try: updated_contacts = hubspot.crm.contacts.batch_api.update( - inputs=contacts_to_update, + batch_input_simple_public_object_batch_input=batch_input ) logger.info( f"Contacts with ID {[updated_contact.id for updated_contact in updated_contacts.results]} updated" @@ -373,10 +634,9 @@ def update_contacts(self, contact_ids: List[Text], values_to_update: Dict[Text, def delete_contacts(self, contact_ids: List[Text]) -> None: hubspot = self.handler.connect() contacts_to_delete = [HubSpotObjectId(id=contact_id) for contact_id in contact_ids] + batch_input = BatchInputSimplePublicObjectId(inputs=contacts_to_delete) try: - hubspot.crm.contacts.batch_api.archive( - inputs=contacts_to_delete, - ) + hubspot.crm.contacts.batch_api.archive(batch_input_simple_public_object_id=batch_input) logger.info("Contacts deleted") except Exception as e: raise Exception(f"Contacts deletion failed {e}") @@ -410,6 +670,9 @@ def select(self, query: ast.Select) -> pd.DataFrame: selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() deals_df = pd.json_normalize(self.get_deals(limit=result_limit)) + if deals_df.empty: + deals_df = pd.DataFrame(columns=self._get_default_deal_columns()) + select_statement_executor = SELECTQueryExecutor( deals_df, selected_columns, where_conditions, order_by_conditions ) @@ -465,11 +728,23 @@ def update(self, query: ast.Update) -> None: update_statement_parser = UPDATEQueryParser(query) values_to_update, where_conditions = update_statement_parser.parse_query() - deals_df = pd.json_normalize(self.get_deals()) + deals_df = pd.json_normalize(self.get_deals(limit=1000)) + + if deals_df.empty: + raise ValueError( + "No deals retrieved from HubSpot to evaluate update conditions. Verify your connection and permissions." + ) + update_query_executor = UPDATEQueryExecutor(deals_df, where_conditions) + filtered_df = update_query_executor.execute_query() + + if filtered_df.empty: + raise ValueError( + f"No deals found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) - deals_df = update_query_executor.execute_query() - deal_ids = deals_df["id"].tolist() + deal_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Updating {len(deal_ids)} deal(s) matching WHERE conditions") self.update_deals(deal_ids, values_to_update) def delete(self, query: ast.Delete) -> None: @@ -493,53 +768,102 @@ def delete(self, query: ast.Delete) -> None: delete_statement_parser = DELETEQueryParser(query) where_conditions = delete_statement_parser.parse_query() - deals_df = pd.json_normalize(self.get_deals()) + deals_df = pd.json_normalize(self.get_deals(limit=1000)) + + if deals_df.empty: + raise ValueError( + "No deals retrieved from HubSpot to evaluate delete conditions. Verify your connection and permissions." + ) + delete_query_executor = DELETEQueryExecutor(deals_df, where_conditions) + filtered_df = delete_query_executor.execute_query() + + if filtered_df.empty: + raise ValueError( + f"No deals found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) - deals_df = delete_query_executor.execute_query() - deal_ids = deals_df["id"].tolist() + deal_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Deleting {len(deal_ids)} deal(s) matching WHERE conditions") self.delete_deals(deal_ids) def get_columns(self) -> List[Text]: - return pd.json_normalize(self.get_deals(limit=1)).columns.tolist() + return self._get_default_deal_columns() + + @staticmethod + def _get_default_deal_columns() -> List[str]: + return [ + "id", + "dealname", + "amount", + "pipeline", + "closedate", + "dealstage", + "hubspot_owner_id", + "createdate", + "hs_lastmodifieddate", + ] def get_deals(self, **kwargs) -> List[Dict]: hubspot = self.handler.connect() deals = hubspot.crm.deals.get_all(**kwargs) - deals_dict = [ - { - "id": deal.id, - "dealname": deal.properties["dealname"], - "amount": deal.properties.get("amount", None), - "pipeline": deal.properties.get("pipeline", None), - "closedate": deal.properties.get("closedate", None), - "dealstage": deal.properties.get("dealstage", None), - "hubspot_owner_id": deal.properties.get("hubspot_owner_id", None), - "createdate": deal.properties["createdate"], - "hs_lastmodifieddate": deal.properties["hs_lastmodifieddate"], - } - for deal in deals - ] + deals_dict = [] + + for deal in deals: + try: + deal_dict = { + "id": deal.id, + "dealname": deal.properties.get("dealname", None), + "amount": deal.properties.get("amount", None), + "pipeline": deal.properties.get("pipeline", None), + "closedate": deal.properties.get("closedate", None), + "dealstage": deal.properties.get("dealstage", None), + "hubspot_owner_id": deal.properties.get("hubspot_owner_id", None), + "createdate": deal.properties.get("createdate", None), + "hs_lastmodifieddate": deal.properties.get("hs_lastmodifieddate", None), + } + deals_dict.append(deal_dict) + except Exception as e: + logger.error(f"Error processing deal {getattr(deal, 'id', 'unknown')}: {str(e)}") + raise ValueError( + f"Failed to process deal {getattr(deal, 'id', 'unknown')}. " + f"Please verify the HubSpot record and try again." + ) from e + + logger.info(f"Retrieved {len(deals_dict)} deals from HubSpot") return deals_dict def create_deals(self, deals_data: List[Dict[Text, Any]]) -> None: + if not deals_data: + raise ValueError("No deal data provided for creation") + + logger.info(f"Attempting to create {len(deals_data)} deal(s): {deals_data}") + hubspot = self.handler.connect() deals_to_create = [HubSpotObjectInputCreate(properties=deal) for deal in deals_data] + batch_input = BatchInputSimplePublicObjectBatchInputForCreate(inputs=deals_to_create) + try: created_deals = hubspot.crm.deals.batch_api.create( - inputs=deals_to_create, + batch_input_simple_public_object_batch_input_for_create=batch_input ) - logger.info(f"Deals created with ID's {[created_deal.id for created_deal in created_deals.results]}") + + if not created_deals or not hasattr(created_deals, "results") or not created_deals.results: + raise Exception("Deal creation returned no results") + + created_ids = [created_deal.id for created_deal in created_deals.results] + logger.info(f"Successfully created {len(created_ids)} deal(s) with IDs: {created_ids}") + except Exception as e: + logger.error(f"Deals creation failed: {str(e)}") raise Exception(f"Deals creation failed {e}") def update_deals(self, deal_ids: List[Text], values_to_update: Dict[Text, Any]) -> None: hubspot = self.handler.connect() deals_to_update = [HubSpotObjectBatchInput(id=deal_id, properties=values_to_update) for deal_id in deal_ids] + batch_input = BatchInputSimplePublicObjectBatchInput(inputs=deals_to_update) try: - updated_deals = hubspot.crm.deals.batch_api.update( - inputs=deals_to_update, - ) + updated_deals = hubspot.crm.deals.batch_api.update(batch_input_simple_public_object_batch_input=batch_input) logger.info(f"Deals with ID {[updated_deal.id for updated_deal in updated_deals.results]} updated") except Exception as e: raise Exception(f"Deals update failed {e}") @@ -547,10 +871,9 @@ def update_deals(self, deal_ids: List[Text], values_to_update: Dict[Text, Any]) def delete_deals(self, deal_ids: List[Text]) -> None: hubspot = self.handler.connect() deals_to_delete = [HubSpotObjectId(id=deal_id) for deal_id in deal_ids] + batch_input = BatchInputSimplePublicObjectId(inputs=deals_to_delete) try: - hubspot.crm.deals.batch_api.archive( - inputs=deals_to_delete, - ) + hubspot.crm.deals.batch_api.archive(batch_input_simple_public_object_id=batch_input) logger.info("Deals deleted") except Exception as e: raise Exception(f"Deals deletion failed {e}") From ee70c351eddfaab810d8fc1a348e44a7e979d612 Mon Sep 17 00:00:00 2001 From: Hamoon Date: Thu, 6 Nov 2025 23:56:40 +0330 Subject: [PATCH 06/11] [feat] Add ClickHouse data catalog --- .../clickhouse_handler/clickhouse_handler.py | 274 +++++++++++++++++- 1 file changed, 272 insertions(+), 2 deletions(-) diff --git a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py index feda48c1323..9d8ce2d171c 100644 --- a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py +++ b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py @@ -1,4 +1,5 @@ from urllib.parse import quote, urlencode +from typing import Optional, List import pandas as pd from sqlalchemy import create_engine @@ -8,7 +9,7 @@ from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender from mindsdb.utilities import log -from mindsdb.integrations.libs.base import DatabaseHandler +from mindsdb.integrations.libs.base import MetaDatabaseHandler from mindsdb.integrations.libs.response import ( HandlerStatusResponse as StatusResponse, HandlerResponse as Response, @@ -18,7 +19,7 @@ logger = log.getLogger(__name__) -class ClickHouseHandler(DatabaseHandler): +class ClickHouseHandler(MetaDatabaseHandler): """ This handler handles connection and execution of the ClickHouse statements. """ @@ -165,3 +166,272 @@ def get_columns(self, table_name) -> Response: q = f"DESCRIBE {table_name}" result = self.native_query(q) return result + + def meta_get_tables(self, table_names: Optional[List[str]] = None) -> Response: + """ + Retrieves metadata information about the tables in the ClickHouse database + to be stored in the data catalog. + + Args: + table_names (list): A list of table names for which to retrieve metadata information. + + Returns: + Response: A response object containing the metadata information. + """ + database = self.connection_data['database'] + + query = f""" + SELECT + name as table_name, + database as table_schema, + engine as table_type, + comment as table_description, + total_rows as row_count + FROM system.tables + WHERE database = '{database}' + """ + + if table_names is not None and len(table_names) > 0: + quoted_names = [f"'{t}'" for t in table_names] + query += f" AND name IN ({','.join(quoted_names)})" + + query += " ORDER BY name" + + result = self.native_query(query) + return result + + def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response: + """ + Retrieves column metadata for the specified tables (or all tables if no list is provided). + This includes column comments that you can set in ClickHouse using: + ALTER TABLE table_name MODIFY COLUMN column_name Type COMMENT 'description' + + Args: + table_names (list): A list of table names for which to retrieve column metadata. + + Returns: + Response: A response object containing the column metadata. + """ + database = self.connection_data['database'] + + query = f""" + SELECT + table as table_name, + name as column_name, + type as data_type, + comment as column_description, + default_expression as column_default, + CASE WHEN is_in_primary_key = 1 THEN 0 ELSE 1 END as is_nullable + FROM system.columns + WHERE database = '{database}' + """ + + if table_names is not None and len(table_names) > 0: + quoted_names = [f"'{t}'" for t in table_names] + query += f" AND table IN ({','.join(quoted_names)})" + + query += " ORDER BY table, position" + + result = self.native_query(query) + return result + + def meta_get_column_statistics(self, table_names: Optional[List[str]] = None) -> Response: + """ + Retrieves column statistics for the specified tables (or all tables if no list is provided). + Uses the base class implementation which calls meta_get_column_statistics_for_table for each table. + + Args: + table_names (list): A list of table names for which to retrieve column statistics. + + Returns: + Response: A response object containing the column statistics. + """ + # Use the base class implementation that calls meta_get_column_statistics_for_table + return super().meta_get_column_statistics(table_names) + + def meta_get_column_statistics_for_table( + self, table_name: str, column_names: Optional[List[str]] = None + ) -> Response: + """ + Retrieves column statistics for a specific table. + + Args: + table_name (str): The name of the table. + column_names (Optional[List[str]]): List of column names to retrieve statistics for. + If None, statistics for all columns will be returned. + + Returns: + Response: A response object containing the column statistics for the table. + """ + database = self.connection_data['database'] + + # Get the list of columns for this table + columns_query = f""" + SELECT name, type + FROM system.columns + WHERE database = '{database}' AND table = '{table_name}' + """ + + if column_names: + quoted_names = [f"'{c}'" for c in column_names] + columns_query += f" AND name IN ({','.join(quoted_names)})" + + try: + columns_result = self.native_query(columns_query) + + if columns_result.resp_type == RESPONSE_TYPE.ERROR or columns_result.data_frame.empty: + logger.warning(f"No columns found for table {table_name}") + return Response(RESPONSE_TYPE.TABLE, pd.DataFrame()) + + # Build statistics query - collect all stats in one query + select_parts = [] + for _, row in columns_result.data_frame.iterrows(): + col = row['name'] + # Use backticks to handle special characters in column names + select_parts.extend([ + f"countIf(`{col}` IS NULL) AS nulls_{col}", + f"uniq(`{col}`) AS distincts_{col}", + f"toString(min(`{col}`)) AS min_{col}", + f"toString(max(`{col}`)) AS max_{col}", + ]) + + if not select_parts: + return Response(RESPONSE_TYPE.TABLE, pd.DataFrame()) + + # Build the query to get stats for all columns at once + stats_query = f""" + SELECT + count(*) AS total_rows, + {', '.join(select_parts)} + FROM `{database}`.`{table_name}` + """ + + stats_result = self.native_query(stats_query) + + if stats_result.resp_type != RESPONSE_TYPE.TABLE or stats_result.data_frame.empty: + logger.warning(f"Could not retrieve stats for table {table_name}") + # Return placeholder stats + placeholder_data = [] + for _, row in columns_result.data_frame.iterrows(): + placeholder_data.append({ + 'table_name': table_name, + 'column_name': row['name'], + 'null_percentage': None, + 'distinct_values_count': None, + 'most_common_values': None, + 'most_common_frequencies': None, + 'minimum_value': None, + 'maximum_value': None, + }) + return Response(RESPONSE_TYPE.TABLE, pd.DataFrame(placeholder_data)) + + # Parse the stats result + stats_data = stats_result.data_frame.iloc[0] + total_rows = stats_data.get('total_rows', 0) + + # Build the final statistics DataFrame + all_stats = [] + for _, row in columns_result.data_frame.iterrows(): + col = row['name'] + nulls = stats_data.get(f'nulls_{col}', 0) + distincts = stats_data.get(f'distincts_{col}', None) + min_val = stats_data.get(f'min_{col}', None) + max_val = stats_data.get(f'max_{col}', None) + + # Calculate null percentage + null_pct = None + if total_rows is not None and total_rows > 0: + null_pct = round((nulls / total_rows) * 100, 2) + + all_stats.append({ + 'table_name': table_name, + 'column_name': col, + 'null_percentage': null_pct, + 'distinct_values_count': distincts, + 'most_common_values': None, + 'most_common_frequencies': None, + 'minimum_value': min_val, + 'maximum_value': max_val, + }) + + return Response(RESPONSE_TYPE.TABLE, pd.DataFrame(all_stats)) + + except Exception as e: + logger.error(f"Exception while fetching statistics for table {table_name}: {e}") + # Return empty stats on error + return Response( + RESPONSE_TYPE.ERROR, + error_message=f"Could not retrieve statistics for table {table_name}: {str(e)}" + ) + + def meta_get_primary_keys(self, table_names: Optional[List[str]] = None) -> Response: + """ + Retrieves primary key information for the specified tables (or all tables if no list is provided). + + Args: + table_names (list): A list of table names for which to retrieve primary key information. + + Returns: + Response: A response object containing the primary key information. + """ + database = self.connection_data['database'] + + query = f""" + SELECT + table as table_name, + name as column_name, + position as ordinal_position, + 'PRIMARY' as constraint_name + FROM system.columns + WHERE database = '{database}' + AND is_in_primary_key = 1 + """ + + if table_names is not None and len(table_names) > 0: + quoted_names = [f"'{t}'" for t in table_names] + query += f" AND table IN ({','.join(quoted_names)})" + + query += " ORDER BY table, position" + + result = self.native_query(query) + return result + + def meta_get_foreign_keys(self, table_names: Optional[List[str]] = None) -> Response: + """ + Retrieves foreign key information for the specified tables (or all tables if no list is provided). + Note: ClickHouse does not enforce foreign key constraints, but this method is provided for completeness. + + Args: + table_names (list): A list of table names for which to retrieve foreign key information. + + Returns: + Response: A response object containing an empty DataFrame (ClickHouse doesn't support foreign keys). + """ + # ClickHouse does not support foreign key constraints + # Return an empty DataFrame with the expected columns + df = pd.DataFrame(columns=[ + 'parent_table_name', + 'parent_column_name', + 'child_table_name', + 'child_column_name', + 'constraint_name' + ]) + return Response(RESPONSE_TYPE.TABLE, df) + + def meta_get_handler_info(self, **kwargs) -> str: + """ + Retrieves information about the ClickHouse handler design and implementation. + + Returns: + str: A string containing information about the ClickHouse handler's capabilities. + """ + return ( + "ClickHouse is a fast open-source column-oriented database management system.\n" + "Key features:\n" + "- Supports standard SQL syntax with some extensions\n" + "- Use backticks (`) to quote table and column names with special characters\n" + "- Does NOT support traditional foreign key constraints (they are not enforced)\n" + "- Optimized for analytical queries (OLAP) rather than transactional operations (OLTP)\n" + "- Supports various table engines (MergeTree, ReplacingMergeTree, SummingMergeTree, etc.)\n" + "- Native support for arrays, nested structures, and approximate algorithms\n" + ) From 0ba0bd636483d4f21c94b5f8476876f40b5b3ca5 Mon Sep 17 00:00:00 2001 From: Hamoon Date: Fri, 7 Nov 2025 00:19:02 +0330 Subject: [PATCH 07/11] [fix] Update is_nullable column retrieval in ClickHouse metadata query --- .../handlers/clickhouse_handler/clickhouse_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py index 9d8ce2d171c..cea00f92276 100644 --- a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py +++ b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py @@ -221,7 +221,7 @@ def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response: type as data_type, comment as column_description, default_expression as column_default, - CASE WHEN is_in_primary_key = 1 THEN 0 ELSE 1 END as is_nullable + is_nullable as is_nullable FROM system.columns WHERE database = '{database}' """ From 55603d2d95b6d083797ebb9dfc50319a1f846889 Mon Sep 17 00:00:00 2001 From: Hamoon Date: Fri, 7 Nov 2025 01:14:14 +0330 Subject: [PATCH 08/11] [feat] Implement nullable column check and optimize statistics retrieval in ClickHouse handler --- .../clickhouse_handler/clickhouse_handler.py | 102 +++++++++++++++--- 1 file changed, 90 insertions(+), 12 deletions(-) diff --git a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py index cea00f92276..08e18e333ac 100644 --- a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py +++ b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py @@ -32,7 +32,8 @@ def __init__(self, name, connection_data, **kwargs): self.connection_data = connection_data self.renderer = SqlalchemyRender(ClickHouseDialect) self.is_connected = False - self.protocol = connection_data.get("protocol", "native") + self.protocol = connection_data.get('protocol', 'native') + self._has_is_nullable_column = None # Cache for version check def __del__(self): if self.is_connected is True: @@ -167,6 +168,36 @@ def get_columns(self, table_name) -> Response: result = self.native_query(q) return result + def _check_has_is_nullable_column(self) -> bool: + """ + Checks if the is_nullable column exists in system.columns table. + This column was added in ClickHouse 23.x. + + Returns: + bool: True if is_nullable column exists, False otherwise. + """ + if self._has_is_nullable_column is not None: + return self._has_is_nullable_column + + try: + check_query = """ + SELECT name + FROM system.columns + WHERE database = 'system' + AND table = 'columns' + AND name = 'is_nullable' + """ + result = self.native_query(check_query) + self._has_is_nullable_column = ( + result.resp_type == RESPONSE_TYPE.TABLE + and not result.data_frame.empty + ) + except Exception as e: + logger.warning(f"Could not check for is_nullable column: {e}") + self._has_is_nullable_column = False + + return self._has_is_nullable_column + def meta_get_tables(self, table_names: Optional[List[str]] = None) -> Response: """ Retrieves metadata information about the tables in the ClickHouse database @@ -214,14 +245,23 @@ def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response: """ database = self.connection_data['database'] - query = f""" - SELECT + # Check if is_nullable column is available (ClickHouse 23.x+) + has_is_nullable = self._check_has_is_nullable_column() + + # Build the SELECT clause based on available columns + select_clause = """ table as table_name, name as column_name, type as data_type, comment as column_description, - default_expression as column_default, - is_nullable as is_nullable + default_expression as column_default""" + + if has_is_nullable: + select_clause += """, + is_nullable as is_nullable""" + + query = f""" + SELECT {select_clause} FROM system.columns WHERE database = '{database}' """ @@ -253,7 +293,9 @@ def meta_get_column_statistics_for_table( self, table_name: str, column_names: Optional[List[str]] = None ) -> Response: """ - Retrieves column statistics for a specific table. + Retrieves column statistics for a specific table using sampling for large tables. + Uses ClickHouse's SAMPLE clause to compute statistics on a sample of the data + for tables with more than 100,000 rows, significantly reducing query time. Args: table_name (str): The name of the table. @@ -283,6 +325,30 @@ def meta_get_column_statistics_for_table( logger.warning(f"No columns found for table {table_name}") return Response(RESPONSE_TYPE.TABLE, pd.DataFrame()) + # Check table size to determine if we should use sampling + size_query = f""" + SELECT total_rows + FROM system.tables + WHERE database = '{database}' AND name = '{table_name}' + """ + + size_result = self.native_query(size_query) + table_row_count = 0 + use_sampling = False + sample_rate = 0.1 # 10% sample by default + + if size_result.resp_type == RESPONSE_TYPE.TABLE and not size_result.data_frame.empty: + table_row_count = size_result.data_frame.iloc[0]['total_rows'] + # Use sampling for tables with more than 100K rows + if table_row_count > 100000: + use_sampling = True + # Adjust sample rate based on table size + if table_row_count > 10000000: # 10M+ rows + sample_rate = 0.01 # 1% sample + elif table_row_count > 1000000: # 1M+ rows + sample_rate = 0.05 # 5% sample + logger.info(f"Using {sample_rate * 100}% sampling for table {table_name} with {table_row_count} rows") + # Build statistics query - collect all stats in one query select_parts = [] for _, row in columns_result.data_frame.iterrows(): @@ -298,12 +364,13 @@ def meta_get_column_statistics_for_table( if not select_parts: return Response(RESPONSE_TYPE.TABLE, pd.DataFrame()) - # Build the query to get stats for all columns at once + # Build the query with optional SAMPLE clause + sample_clause = f" SAMPLE {sample_rate}" if use_sampling else "" stats_query = f""" SELECT count(*) AS total_rows, {', '.join(select_parts)} - FROM `{database}`.`{table_name}` + FROM `{database}`.`{table_name}`{sample_clause} """ stats_result = self.native_query(stats_query) @@ -327,7 +394,13 @@ def meta_get_column_statistics_for_table( # Parse the stats result stats_data = stats_result.data_frame.iloc[0] - total_rows = stats_data.get('total_rows', 0) + sampled_rows = stats_data.get('total_rows', 0) + + # If we used sampling, extrapolate the distinct count + # Note: null percentage, min, and max are still representative from the sample + extrapolation_factor = 1.0 + if use_sampling and sampled_rows > 0 and table_row_count > 0: + extrapolation_factor = table_row_count / sampled_rows # Build the final statistics DataFrame all_stats = [] @@ -338,10 +411,15 @@ def meta_get_column_statistics_for_table( min_val = stats_data.get(f'min_{col}', None) max_val = stats_data.get(f'max_{col}', None) - # Calculate null percentage + # Calculate null percentage (based on sample) null_pct = None - if total_rows is not None and total_rows > 0: - null_pct = round((nulls / total_rows) * 100, 2) + if sampled_rows is not None and sampled_rows > 0: + null_pct = round((nulls / sampled_rows) * 100, 2) + + # Extrapolate distinct count if we used sampling + # Use a conservative estimate: min(extrapolated, table_row_count) + if distincts is not None and use_sampling: + distincts = int(min(distincts * extrapolation_factor, table_row_count)) all_stats.append({ 'table_name': table_name, From 19276986ccbd917e2e3b78f91c84792cb39fcdbb Mon Sep 17 00:00:00 2001 From: Hamoon Date: Fri, 7 Nov 2025 16:00:20 +0330 Subject: [PATCH 09/11] [fix] Simplify column statistics retrieval by removing sampling logic --- .../clickhouse_handler/clickhouse_handler.py | 83 +++++-------------- 1 file changed, 22 insertions(+), 61 deletions(-) diff --git a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py index 08e18e333ac..f182892f2d1 100644 --- a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py +++ b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py @@ -293,62 +293,35 @@ def meta_get_column_statistics_for_table( self, table_name: str, column_names: Optional[List[str]] = None ) -> Response: """ - Retrieves column statistics for a specific table using sampling for large tables. - Uses ClickHouse's SAMPLE clause to compute statistics on a sample of the data - for tables with more than 100,000 rows, significantly reducing query time. + Retrieves column statistics for a specific table. Args: table_name (str): The name of the table. column_names (Optional[List[str]]): List of column names to retrieve statistics for. If None, statistics for all columns will be returned. - Returns: Response: A response object containing the column statistics for the table. """ database = self.connection_data['database'] - + # Get the list of columns for this table columns_query = f""" SELECT name, type FROM system.columns WHERE database = '{database}' AND table = '{table_name}' """ - + if column_names: quoted_names = [f"'{c}'" for c in column_names] columns_query += f" AND name IN ({','.join(quoted_names)})" - + try: columns_result = self.native_query(columns_query) - + if columns_result.resp_type == RESPONSE_TYPE.ERROR or columns_result.data_frame.empty: logger.warning(f"No columns found for table {table_name}") return Response(RESPONSE_TYPE.TABLE, pd.DataFrame()) - - # Check table size to determine if we should use sampling - size_query = f""" - SELECT total_rows - FROM system.tables - WHERE database = '{database}' AND name = '{table_name}' - """ - - size_result = self.native_query(size_query) - table_row_count = 0 - use_sampling = False - sample_rate = 0.1 # 10% sample by default - - if size_result.resp_type == RESPONSE_TYPE.TABLE and not size_result.data_frame.empty: - table_row_count = size_result.data_frame.iloc[0]['total_rows'] - # Use sampling for tables with more than 100K rows - if table_row_count > 100000: - use_sampling = True - # Adjust sample rate based on table size - if table_row_count > 10000000: # 10M+ rows - sample_rate = 0.01 # 1% sample - elif table_row_count > 1000000: # 1M+ rows - sample_rate = 0.05 # 5% sample - logger.info(f"Using {sample_rate * 100}% sampling for table {table_name} with {table_row_count} rows") - + # Build statistics query - collect all stats in one query select_parts = [] for _, row in columns_result.data_frame.iterrows(): @@ -360,21 +333,20 @@ def meta_get_column_statistics_for_table( f"toString(min(`{col}`)) AS min_{col}", f"toString(max(`{col}`)) AS max_{col}", ]) - + if not select_parts: return Response(RESPONSE_TYPE.TABLE, pd.DataFrame()) - - # Build the query with optional SAMPLE clause - sample_clause = f" SAMPLE {sample_rate}" if use_sampling else "" + + # Build the query to get stats for all columns at once stats_query = f""" SELECT count(*) AS total_rows, {', '.join(select_parts)} - FROM `{database}`.`{table_name}`{sample_clause} + FROM `{database}`.`{table_name}` """ - + stats_result = self.native_query(stats_query) - + if stats_result.resp_type != RESPONSE_TYPE.TABLE or stats_result.data_frame.empty: logger.warning(f"Could not retrieve stats for table {table_name}") # Return placeholder stats @@ -391,17 +363,11 @@ def meta_get_column_statistics_for_table( 'maximum_value': None, }) return Response(RESPONSE_TYPE.TABLE, pd.DataFrame(placeholder_data)) - + # Parse the stats result stats_data = stats_result.data_frame.iloc[0] - sampled_rows = stats_data.get('total_rows', 0) - - # If we used sampling, extrapolate the distinct count - # Note: null percentage, min, and max are still representative from the sample - extrapolation_factor = 1.0 - if use_sampling and sampled_rows > 0 and table_row_count > 0: - extrapolation_factor = table_row_count / sampled_rows - + total_rows = stats_data.get('total_rows', 0) + # Build the final statistics DataFrame all_stats = [] for _, row in columns_result.data_frame.iterrows(): @@ -410,17 +376,12 @@ def meta_get_column_statistics_for_table( distincts = stats_data.get(f'distincts_{col}', None) min_val = stats_data.get(f'min_{col}', None) max_val = stats_data.get(f'max_{col}', None) - - # Calculate null percentage (based on sample) + + # Calculate null percentage null_pct = None - if sampled_rows is not None and sampled_rows > 0: - null_pct = round((nulls / sampled_rows) * 100, 2) - - # Extrapolate distinct count if we used sampling - # Use a conservative estimate: min(extrapolated, table_row_count) - if distincts is not None and use_sampling: - distincts = int(min(distincts * extrapolation_factor, table_row_count)) - + if total_rows is not None and total_rows > 0: + null_pct = round((nulls / total_rows) * 100, 2) + all_stats.append({ 'table_name': table_name, 'column_name': col, @@ -431,9 +392,9 @@ def meta_get_column_statistics_for_table( 'minimum_value': min_val, 'maximum_value': max_val, }) - + return Response(RESPONSE_TYPE.TABLE, pd.DataFrame(all_stats)) - + except Exception as e: logger.error(f"Exception while fetching statistics for table {table_name}: {e}") # Return empty stats on error From 134b2e73898b116760ac07a6797dee331677a087 Mon Sep 17 00:00:00 2001 From: Hamoon Date: Sat, 8 Nov 2025 20:46:46 +0330 Subject: [PATCH 10/11] [fix] update Clickhouse meta handler info --- .../handlers/clickhouse_handler/clickhouse_handler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py index f182892f2d1..112bada74ef 100644 --- a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py +++ b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py @@ -472,5 +472,6 @@ def meta_get_handler_info(self, **kwargs) -> str: "- Does NOT support traditional foreign key constraints (they are not enforced)\n" "- Optimized for analytical queries (OLAP) rather than transactional operations (OLTP)\n" "- Supports various table engines (MergeTree, ReplacingMergeTree, SummingMergeTree, etc.)\n" + "- All ClickHouse functions are case-sensitive\n" "- Native support for arrays, nested structures, and approximate algorithms\n" ) From d4766c095408f449165d1d116b8def3e63733154 Mon Sep 17 00:00:00 2001 From: Hamoon Date: Wed, 12 Nov 2025 12:31:20 +0330 Subject: [PATCH 11/11] [fix] Format clickhouse_handler with ruff 0.11.11 --- .../clickhouse_handler/clickhouse_handler.py | 130 +++++++++--------- 1 file changed, 67 insertions(+), 63 deletions(-) diff --git a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py index 112bada74ef..28836020e73 100644 --- a/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py +++ b/mindsdb/integrations/handlers/clickhouse_handler/clickhouse_handler.py @@ -32,7 +32,7 @@ def __init__(self, name, connection_data, **kwargs): self.connection_data = connection_data self.renderer = SqlalchemyRender(ClickHouseDialect) self.is_connected = False - self.protocol = connection_data.get('protocol', 'native') + self.protocol = connection_data.get("protocol", "native") self._has_is_nullable_column = None # Cache for version check def __del__(self): @@ -172,13 +172,13 @@ def _check_has_is_nullable_column(self) -> bool: """ Checks if the is_nullable column exists in system.columns table. This column was added in ClickHouse 23.x. - + Returns: bool: True if is_nullable column exists, False otherwise. """ if self._has_is_nullable_column is not None: return self._has_is_nullable_column - + try: check_query = """ SELECT name @@ -188,14 +188,11 @@ def _check_has_is_nullable_column(self) -> bool: AND name = 'is_nullable' """ result = self.native_query(check_query) - self._has_is_nullable_column = ( - result.resp_type == RESPONSE_TYPE.TABLE - and not result.data_frame.empty - ) + self._has_is_nullable_column = result.resp_type == RESPONSE_TYPE.TABLE and not result.data_frame.empty except Exception as e: logger.warning(f"Could not check for is_nullable column: {e}") self._has_is_nullable_column = False - + return self._has_is_nullable_column def meta_get_tables(self, table_names: Optional[List[str]] = None) -> Response: @@ -209,8 +206,8 @@ def meta_get_tables(self, table_names: Optional[List[str]] = None) -> Response: Returns: Response: A response object containing the metadata information. """ - database = self.connection_data['database'] - + database = self.connection_data["database"] + query = f""" SELECT name as table_name, @@ -243,11 +240,11 @@ def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response: Returns: Response: A response object containing the column metadata. """ - database = self.connection_data['database'] - + database = self.connection_data["database"] + # Check if is_nullable column is available (ClickHouse 23.x+) has_is_nullable = self._check_has_is_nullable_column() - + # Build the SELECT clause based on available columns select_clause = """ table as table_name, @@ -255,11 +252,11 @@ def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response: type as data_type, comment as column_description, default_expression as column_default""" - + if has_is_nullable: select_clause += """, is_nullable as is_nullable""" - + query = f""" SELECT {select_clause} FROM system.columns @@ -294,15 +291,15 @@ def meta_get_column_statistics_for_table( ) -> Response: """ Retrieves column statistics for a specific table. - + Args: table_name (str): The name of the table. - column_names (Optional[List[str]]): List of column names to retrieve statistics for. + column_names (Optional[List[str]]): List of column names to retrieve statistics for. If None, statistics for all columns will be returned. Returns: Response: A response object containing the column statistics for the table. """ - database = self.connection_data['database'] + database = self.connection_data["database"] # Get the list of columns for this table columns_query = f""" @@ -325,14 +322,16 @@ def meta_get_column_statistics_for_table( # Build statistics query - collect all stats in one query select_parts = [] for _, row in columns_result.data_frame.iterrows(): - col = row['name'] + col = row["name"] # Use backticks to handle special characters in column names - select_parts.extend([ - f"countIf(`{col}` IS NULL) AS nulls_{col}", - f"uniq(`{col}`) AS distincts_{col}", - f"toString(min(`{col}`)) AS min_{col}", - f"toString(max(`{col}`)) AS max_{col}", - ]) + select_parts.extend( + [ + f"countIf(`{col}` IS NULL) AS nulls_{col}", + f"uniq(`{col}`) AS distincts_{col}", + f"toString(min(`{col}`)) AS min_{col}", + f"toString(max(`{col}`)) AS max_{col}", + ] + ) if not select_parts: return Response(RESPONSE_TYPE.TABLE, pd.DataFrame()) @@ -341,7 +340,7 @@ def meta_get_column_statistics_for_table( stats_query = f""" SELECT count(*) AS total_rows, - {', '.join(select_parts)} + {", ".join(select_parts)} FROM `{database}`.`{table_name}` """ @@ -352,46 +351,50 @@ def meta_get_column_statistics_for_table( # Return placeholder stats placeholder_data = [] for _, row in columns_result.data_frame.iterrows(): - placeholder_data.append({ - 'table_name': table_name, - 'column_name': row['name'], - 'null_percentage': None, - 'distinct_values_count': None, - 'most_common_values': None, - 'most_common_frequencies': None, - 'minimum_value': None, - 'maximum_value': None, - }) + placeholder_data.append( + { + "table_name": table_name, + "column_name": row["name"], + "null_percentage": None, + "distinct_values_count": None, + "most_common_values": None, + "most_common_frequencies": None, + "minimum_value": None, + "maximum_value": None, + } + ) return Response(RESPONSE_TYPE.TABLE, pd.DataFrame(placeholder_data)) # Parse the stats result stats_data = stats_result.data_frame.iloc[0] - total_rows = stats_data.get('total_rows', 0) + total_rows = stats_data.get("total_rows", 0) # Build the final statistics DataFrame all_stats = [] for _, row in columns_result.data_frame.iterrows(): - col = row['name'] - nulls = stats_data.get(f'nulls_{col}', 0) - distincts = stats_data.get(f'distincts_{col}', None) - min_val = stats_data.get(f'min_{col}', None) - max_val = stats_data.get(f'max_{col}', None) + col = row["name"] + nulls = stats_data.get(f"nulls_{col}", 0) + distincts = stats_data.get(f"distincts_{col}", None) + min_val = stats_data.get(f"min_{col}", None) + max_val = stats_data.get(f"max_{col}", None) # Calculate null percentage null_pct = None if total_rows is not None and total_rows > 0: null_pct = round((nulls / total_rows) * 100, 2) - all_stats.append({ - 'table_name': table_name, - 'column_name': col, - 'null_percentage': null_pct, - 'distinct_values_count': distincts, - 'most_common_values': None, - 'most_common_frequencies': None, - 'minimum_value': min_val, - 'maximum_value': max_val, - }) + all_stats.append( + { + "table_name": table_name, + "column_name": col, + "null_percentage": null_pct, + "distinct_values_count": distincts, + "most_common_values": None, + "most_common_frequencies": None, + "minimum_value": min_val, + "maximum_value": max_val, + } + ) return Response(RESPONSE_TYPE.TABLE, pd.DataFrame(all_stats)) @@ -399,8 +402,7 @@ def meta_get_column_statistics_for_table( logger.error(f"Exception while fetching statistics for table {table_name}: {e}") # Return empty stats on error return Response( - RESPONSE_TYPE.ERROR, - error_message=f"Could not retrieve statistics for table {table_name}: {str(e)}" + RESPONSE_TYPE.ERROR, error_message=f"Could not retrieve statistics for table {table_name}: {str(e)}" ) def meta_get_primary_keys(self, table_names: Optional[List[str]] = None) -> Response: @@ -413,8 +415,8 @@ def meta_get_primary_keys(self, table_names: Optional[List[str]] = None) -> Resp Returns: Response: A response object containing the primary key information. """ - database = self.connection_data['database'] - + database = self.connection_data["database"] + query = f""" SELECT table as table_name, @@ -448,13 +450,15 @@ def meta_get_foreign_keys(self, table_names: Optional[List[str]] = None) -> Resp """ # ClickHouse does not support foreign key constraints # Return an empty DataFrame with the expected columns - df = pd.DataFrame(columns=[ - 'parent_table_name', - 'parent_column_name', - 'child_table_name', - 'child_column_name', - 'constraint_name' - ]) + df = pd.DataFrame( + columns=[ + "parent_table_name", + "parent_column_name", + "child_table_name", + "child_column_name", + "constraint_name", + ] + ) return Response(RESPONSE_TYPE.TABLE, df) def meta_get_handler_info(self, **kwargs) -> str: