From 8cbbe602f4f6e45ee815b7adc8c9b282073e50ed Mon Sep 17 00:00:00 2001 From: Michael Olayemi Olawepo <154475559+sejubar@users.noreply.github.com> Date: Tue, 11 Nov 2025 17:52:33 -0500 Subject: [PATCH 1/7] Fqe 1675 - additional test for mysql api (#11797) Co-authored-by: andrew --- .../executor/sql_query/steps/insert_step.py | 3 + .../flows/test_mysql_api_extended.py | 756 ++++++++++++++++++ 2 files changed, 759 insertions(+) create mode 100644 tests/integration/flows/test_mysql_api_extended.py diff --git a/mindsdb/api/executor/sql_query/steps/insert_step.py b/mindsdb/api/executor/sql_query/steps/insert_step.py index 097b6e2e116..73657ffc42c 100644 --- a/mindsdb/api/executor/sql_query/steps/insert_step.py +++ b/mindsdb/api/executor/sql_query/steps/insert_step.py @@ -4,6 +4,7 @@ from mindsdb.api.executor.planner.steps import SaveToTable, InsertToTable, CreateTableStep from mindsdb.api.executor.sql_query.result_set import ResultSet, Column +from mindsdb.utilities.exception import EntityNotExistsError from mindsdb.api.executor.exceptions import NotSupportedYet, LogicError from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES @@ -114,6 +115,8 @@ def call(self, step): table_name = step.table dn = self.session.datahub.get(integration_name) + if dn is None: + raise EntityNotExistsError("Database not found", integration_name) dn.create_table(table_name=table_name, columns=step.columns, is_replace=step.is_replace, is_create=True) return ResultSet() diff --git a/tests/integration/flows/test_mysql_api_extended.py b/tests/integration/flows/test_mysql_api_extended.py new file mode 100644 index 00000000000..6839b437e29 --- /dev/null +++ b/tests/integration/flows/test_mysql_api_extended.py @@ -0,0 +1,756 @@ +import os +import pytest +import time +from .test_mysql_api import BaseStuff +import mysql.connector + + +@pytest.fixture(scope="module") +def setup_local_db(): + """Module-scoped fixture to create a writeable DB for table tests.""" + db_name = 
"test_db_local" + helper = BaseStuff() + helper.use_binary = False + + params = {"user": "postgres", "password": "postgres", "host": "postgres", "port": 5432, "database": "postgres"} + + print(f"\n--> [Fixture setup_local_db] Setting up local database: {db_name} on {params['host']}:{params['port']}") + try: + helper.query(f"DROP DATABASE IF EXISTS {db_name}") + create_datasource_sql_via_connector(helper, db_name, "postgres", params) + yield db_name + except (mysql.connector.Error, TimeoutError) as e: + pytest.skip( + f"\n\n--- FIXTURE SETUP FAILED ---\n" + f"Could not connect to the PostgreSQL container ('{params['host']}').\n" + f"Please ensure your Docker Compose environment is running correctly.\n" + f"Original Error: {e}\n" + ) + finally: + print(f"\n--> [Fixture setup_local_db] Tearing down database: {db_name}") + try: + helper.query(f"DROP DATABASE IF EXISTS {db_name};") + except mysql.connector.Error: + pass + + +def create_datasource_sql_via_connector(helper_instance, db_name, engine, parameters, poll_timeout=30, poll_interval=2): + """Helper to create a datasource via a CREATE DATABASE query.""" + params_list = [f'"{k}": "{v}"' if isinstance(v, str) else f'"{k}": {v}' for k, v in parameters.items()] + params_str = ", ".join(params_list) + query_str = f"CREATE DATABASE {db_name} WITH ENGINE = '{engine}', PARAMETERS = {{{params_str}}};" + print(f" [Helper create_datasource] Executing: CREATE DATABASE {db_name}...") + helper_instance.query(query_str) + start_time = time.time() + while True: + try: + helper_instance.validate_database_creation(db_name) + print(f" [Helper create_datasource] DATABASE {db_name} created and validated.") + break + except AssertionError as e: + elapsed_time = time.time() - start_time + if elapsed_time > poll_timeout: + print(f" [Helper create_datasource] ERROR: Timeout after {poll_timeout}s waiting for {db_name}.") + raise TimeoutError(f"Timed out waiting for database {db_name} to be created.") from e + time.sleep(poll_interval) + + 
+def wait_for_trigger_creation(query_fn, trigger_name, timeout=20, max_interval=5): + """ + Polls information_schema to see if a trigger is visible. + Does not raise an error, returns True (found) or False (not found). + """ + start = time.time() + interval = 1 + print(f"\n[DEBUG] Checking for trigger '{trigger_name}' in information_schema (timeout={timeout}s)...") + while time.time() - start < timeout: + try: + result = query_fn(f"SELECT 1 FROM information_schema.triggers WHERE trigger_name = '{trigger_name}';") + if result: + print(f"[DEBUG] Trigger '{trigger_name}' found in information_schema after {time.time() - start:.2f}s.") + return True + except Exception: + pass + + try: + result = query_fn("SHOW TRIGGERS;") + if result and trigger_name in [row.get("Trigger", row.get("TRIGGER")) for row in result]: + print(f"[DEBUG] Trigger '{trigger_name}' found in SHOW TRIGGERS after {time.time() - start:.2f}s.") + return True + except Exception: + pass + time.sleep(interval) + interval = min(interval * 1.5, max_interval) + + print( + f"[DEBUG] WARNING: Trigger '{trigger_name}' was not found in metadata after {timeout}s. Proceeding with functional test..." + ) + return False + + +def wait_for_trigger_to_fire( + query_fn, db_name, source_table_name, target_table_name, test_id, updated_message, timeout=120, max_interval=10 +): + """ + Polls for a trigger to fire by repeatedly sending the UPDATE command + and checking the target table. This is robust against trigger creation lag. 
+ """ + start = time.time() + interval = 1 + + while time.time() - start < timeout: + print(f"[DEBUG] Firing trigger (elapsed={time.time() - start:.1f}s, interval={interval:.2f}s)...") + query_fn(f"UPDATE {db_name}.{source_table_name} SET message = '{updated_message}' WHERE id = {test_id};") + + time.sleep(interval) + + result = query_fn(f"SELECT id, message FROM {db_name}.{target_table_name} WHERE id = {test_id};") + if result: + elapsed = time.time() - start + print(f"[DEBUG] Trigger fired and verified after {elapsed:.2f}s → {result}") + return result + + print("[DEBUG] Trigger not fired yet. Retrying...") + interval = min(interval * 1.5, max_interval) + + raise TimeoutError(f"Trigger did not fire for id {test_id} within {timeout}s despite repeated attempts.") + + +def wait_for_kb_creation(query_fn, kb_name, timeout=90, poll_interval=1): + """Polls to check if a Knowledge Base has been created successfully.""" + start_time = time.time() + while True: + try: + result = query_fn(f"DESCRIBE KNOWLEDGE_BASE {kb_name};") + if result and result[0]["name"] == kb_name: + print(f" [Helper wait_for_kb] KB {kb_name} created and validated.") + return result + except Exception: + # KB might not be queryable at all yet + pass + + elapsed_time = time.time() - start_time + if elapsed_time > timeout: + print(f" [Helper wait_for_kb] ERROR: Timeout after {timeout}s waiting for {kb_name}.") + raise TimeoutError(f"Timed out waiting for Knowledge Base {kb_name} to be created.") + time.sleep(poll_interval) + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLTables(BaseStuff): + """Test suite for Table operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("setup_local_db") + def test_table_lifecycle(self, setup_local_db, use_binary): + db_name = setup_local_db + table_name = "test_lifecycle_table" + try: + create_table_query = f"CREATE TABLE {db_name}.{table_name} (id INT, 
value VARCHAR(255));" + self.query(create_table_query) + result = self.query(f"SHOW TABLES FROM {db_name};") + assert table_name in [list(row.values())[0] for row in result] + replace_query = f"CREATE OR REPLACE TABLE {db_name}.{table_name} (SELECT 2 as id, 'new_data' as value);" + self.query(replace_query) + result = self.query(f"SELECT * FROM {db_name}.{table_name};") + assert result and result[0]["id"] == 2 and result[0]["value"] == "new_data" + finally: + self.query(f"DROP TABLE IF EXISTS {db_name}.{table_name};") + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLTablesNegative(BaseStuff): + """Negative tests for Table operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("setup_local_db") + def test_create_duplicate_table(self, setup_local_db, use_binary): + db_name = setup_local_db + table_name = "test_duplicate_table" + create_query = f"CREATE TABLE {db_name}.{table_name} (id INT);" + try: + self.query(create_query) + with pytest.raises(Exception) as e: + self.query(create_query) + assert "already exists" in str(e.value).lower() + finally: + self.query(f"DROP TABLE IF EXISTS {db_name}.{table_name};") + + def test_create_table_in_missing_db_raises_error(self, use_binary): + create_query = "CREATE TABLE non_existent_db.non_existent_table (id INT);" + with pytest.raises(Exception) as e: + self.query(create_query) + assert "non_existent_db" in str(e.value).lower() or "database not found" in str(e.value).lower() + + @pytest.mark.usefixtures("setup_local_db") + def test_drop_non_existent_table(self, setup_local_db, use_binary): + db_name = setup_local_db + table_name = "test_non_existent_table" + with pytest.raises(Exception) as e: + self.query(f"DROP TABLE {db_name}.{table_name};") + assert "does not exist" in str(e.value).lower() + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLViews(BaseStuff): + """Test suite for View 
operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + def test_view_lifecycle(self, use_binary): + db_name = "test_sql_view_db" + view_name = "test_sql_view" + try: + self.query(f"DROP VIEW IF EXISTS {view_name};") + self.query(f"DROP DATABASE IF EXISTS {db_name};") + + create_db_query = f""" + CREATE DATABASE {db_name} + WITH ENGINE = 'postgres', PARAMETERS = {{"user": "demo_user", "password": "demo_password", "host": "samples.mindsdb.com", "port": "5432", "database": "demo", "schema": "demo"}}; + """ + self.query(create_db_query) + + create_view_query = ( + f"CREATE VIEW {view_name} AS (SELECT * FROM {db_name}.home_rentals WHERE number_of_rooms = 2);" + ) + self.query(create_view_query) + result = self.query("SHOW VIEWS;") + assert view_name in [row.get("name", row.get("Name", row.get("NAME"))) for row in result] + result = self.query(f"SELECT * FROM {view_name};") + assert len(result) > 0 and all(row["number_of_rooms"] == 2 for row in result) + alter_view_query = ( + f"ALTER VIEW {view_name} AS (SELECT * FROM {db_name}.home_rentals WHERE number_of_rooms = 1);" + ) + self.query(alter_view_query) + result_after_alter = self.query(f"SELECT * FROM {view_name};") + assert len(result_after_alter) > 0 and all(row["number_of_rooms"] == 1 for row in result_after_alter) + finally: + self.query(f"DROP VIEW IF EXISTS {view_name};") + self.query(f"DROP DATABASE IF EXISTS {db_name};") + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLViewsNegative(BaseStuff): + """Negative tests for View operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + def test_create_duplicate_view(self, use_binary): + view_name = "test_duplicate_view" + create_query = f"CREATE VIEW {view_name} AS (SELECT 1);" + try: + self.query(f"DROP VIEW IF EXISTS {view_name};") + self.query(create_query) + with pytest.raises(Exception) as e: + self.query(create_query) + 
assert "already exists" in str(e.value).lower() + finally: + self.query(f"DROP VIEW IF EXISTS {view_name};") + + def test_create_view_on_non_existent_table(self, use_binary): + view_name = "test_bad_source_view" + create_query = f"CREATE VIEW {view_name} AS (SELECT * FROM non_existent_db.non_existent_table);" + with pytest.raises(Exception) as e: + self.query(create_query) + error_str = str(e.value).lower() + assert "not found in the database" in error_str or "table name should contain only one part" in error_str + + def test_drop_non_existent_view(self, use_binary): + view_name = "non_existent_view" + try: + self.query(f"DROP VIEW IF EXISTS {view_name};") + except Exception: + pass + + with pytest.raises(Exception) as e: + self.query(f"DROP VIEW {view_name};") + error_str = str(e.value).lower() + assert "view not found" in error_str or "unknown view" in error_str + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLKnowledgeBases(BaseStuff): + """Test suite for Knowledge Base operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.fixture + def basic_kb(self, request): + """ + Fixture to create a basic Knowledge Base for alteration tests. + Requires OPENAI_API_KEY. + """ + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set. 
Skipping KB tests.") + + kb_name = "test_alter_kb_local" + embedding_model = "text-embedding-3-small" + + create_kb_query = f""" + CREATE KNOWLEDGE_BASE {kb_name} + USING embedding_model = {{"provider": "openai", "model_name": "{embedding_model}", "api_key": "{openai_api_key}"}}; + """ + try: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + self.query(create_kb_query) + result = wait_for_kb_creation(self.query, kb_name) + assert result and result[0]["name"] == kb_name + yield kb_name + finally: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + + def test_knowledge_base_full_lifecycle(self, use_binary): + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set. Skipping Knowledge Base lifecycle test.") + + kb_name = "test_kb_sql" + content_to_insert = "MindsDB helps developers build AI-powered applications." + embedding_model = "text-embedding-3-small" + try: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + create_kb_query = f""" + CREATE KNOWLEDGE_BASE {kb_name} + USING embedding_model = {{"provider": "openai", "model_name": "{embedding_model}", "api_key": "{openai_api_key}"}}; + """ + self.query(create_kb_query) + result = wait_for_kb_creation(self.query, kb_name) + assert result and result[0]["name"] == kb_name and embedding_model in result[0]["embedding_model"] + self.query(f"INSERT INTO {kb_name} (content) VALUES ('{content_to_insert}');") + + # Give insertion a moment to process before querying + time.sleep(2) + + result = self.query(f"SELECT chunk_content FROM {kb_name} WHERE content = 'What is MindsDB?';") + assert result and "MindsDB" in result[0]["chunk_content"] + finally: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + + def test_create_kb_with_invalid_provider(self, use_binary): + kb_name = "test_invalid_provider" + create_query = ( + f'CREATE KNOWLEDGE_BASE {kb_name} USING embedding_model = {{"provider": "non_existent_provider"}};' 
+ ) + with pytest.raises(Exception) as e: + self.query(create_query) + assert "wrong embedding provider" in str(e.value).lower() + + def test_create_kb_with_invalid_api_key(self, use_binary, request): + kb_name = "test_invalid_key_local" + create_query = f'CREATE KNOWLEDGE_BASE {kb_name} USING embedding_model = {{"provider": "openai", "api_key": "this_is_a_fake_key"}};' + try: + with pytest.raises(Exception) as e: + self.query(create_query) + assert ( + "problem with embedding model config" in str(e.value).lower() + or "invalid api key" in str(e.value).lower() + ) + finally: + # Ensure cleanup even if creation fails + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + + def test_insert_into_non_existent_kb(self, use_binary): + kb_name = "non_existent_kb" + with pytest.raises(Exception) as e: + self.query(f"INSERT INTO {kb_name} (content) VALUES ('some data');") + error_str = str(e.value).lower() + assert "can't create table" in error_str or "doesn't exist" in error_str or "unknown table" in error_str + + def test_query_non_existent_kb(self, use_binary): + kb_name = "non_existent_kb" + with pytest.raises(Exception) as e: + self.query(f"SELECT * FROM {kb_name} WHERE content = 'some query';") + error_str = str(e.value).lower() + assert "not found in database" in error_str or "doesn't exist" in error_str or "unknown table" in error_str + + def test_create_duplicate_kb(self, use_binary, request): + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set. 
Skipping duplicate KB test.") + + kb_name = "test_duplicate_kb" + embedding_model = "text-embedding-3-small" + create_query = f""" + CREATE KNOWLEDGE_BASE {kb_name} + USING embedding_model = {{"provider": "openai", "model_name": "{embedding_model}", "api_key": "{openai_api_key}"}}; + """ + try: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + self.query(create_query) + wait_for_kb_creation(self.query, kb_name) + with pytest.raises(Exception) as e: + self.query(create_query) + assert "already exists" in str(e.value).lower() + finally: + self.query(f"DROP KNOWLEDGE_BASE IF EXISTS {kb_name};") + + @pytest.mark.usefixtures("basic_kb") + def test_alter_kb_embedding_api_key(self, basic_kb, use_binary): + """Tests altering the api_key of the embedding_model.""" + kb_name = basic_kb + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY needed for this alter test.") + + new_api_key = openai_api_key + + alter_query = f""" + ALTER KNOWLEDGE_BASE {kb_name} + USING + embedding_model = {{ 'api_key': '{new_api_key}' }}; + """ + self.query(alter_query) + + time.sleep(1) + + result = self.query(f"SELECT embedding_model FROM information_schema.knowledge_bases WHERE name = '{kb_name}';") + assert result + embedding_model_json = result[0].get("embedding_model") + assert embedding_model_json is not None + + assert '"provider": "openai"' in embedding_model_json + assert '"model_name": "text-embedding-3-small"' in embedding_model_json + assert '"api_key": "' in embedding_model_json + + @pytest.mark.xfail( + reason="Bug: ALTER KNOWLEDGE_BASE does not unset reranking_model. 
See LINEAR-TICKET-NUMBER: FQE-1716" + ) + @pytest.mark.usefixtures("basic_kb") + def test_alter_kb_reranking_model(self, basic_kb, use_binary): + """Tests adding and then disabling the reranking_model.""" + kb_name = basic_kb + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY needed for this alter test.") + + alter_query_add = f""" + ALTER KNOWLEDGE_BASE {kb_name} + USING + reranking_model = {{ 'provider': 'openai', 'model_name': 'gpt-4o', 'api_key': '{openai_api_key}' }}; + """ + self.query(alter_query_add) + + result_add = self.query(f"DESCRIBE KNOWLEDGE_BASE {kb_name};") + assert result_add and '"provider": "openai"' in result_add[0]["RERANKING_MODEL"] + + alter_query_disable = f""" + ALTER KNOWLEDGE_BASE {kb_name} + USING + reranking_model = false; + """ + self.query(alter_query_disable) + + result_disable = self.query(f"DESCRIBE KNOWLEDGE_BASE {kb_name};") + assert result_disable + reranking_model_desc = result_disable[0].get("RERANKING_MODEL") + assert reranking_model_desc is None or reranking_model_desc == "{}" + + @pytest.mark.xfail(reason="Bug: information_schema.knowledge_bases.PARAMS is not updated on ALTER") + @pytest.mark.usefixtures("basic_kb") + def test_alter_kb_preprocessing(self, basic_kb, use_binary): + """Tests altering the preprocessing parameters by checking the PARAMS column.""" + kb_name = basic_kb + + alter_query = f""" + ALTER KNOWLEDGE_BASE {kb_name} + USING + preprocessing = {{ 'chunk_size': 300, 'chunk_overlap': 50 }}; + """ + self.query(alter_query) + + time.sleep(1) + + result = self.query(f"SELECT PARAMS FROM information_schema.knowledge_bases WHERE name = '{kb_name}';") + + assert result, "Query to information_schema returned no results." + + params_json = result[0].get("PARAMS") + assert params_json is not None, "PARAMS column is NULL." + + assert '"preprocessing":' in params_json, "The 'preprocessing' key was not added to the PARAMS column." 
+ assert '"chunk_size": 300' in params_json, "chunk_size was not set correctly in PARAMS." + assert '"chunk_overlap": 50' in params_json, "chunk_overlap was not set correctly in PARAMS." + + +@pytest.fixture(scope="function") +def setup_trigger_db(request): + """Function-scoped fixture to ensure a clean DB for each trigger test.""" + + db_name = "trigger_test_db_local" + + source_table_name = "trigger_source_table" + target_table_name = "trigger_target_table" + + params = {"user": "postgres", "password": "postgres", "host": "postgres", "port": 5432, "database": "postgres"} + helper = BaseStuff() + helper.use_binary = False + + try: + print( + f"\n--> [Fixture setup_trigger_db] Setting up local database: {db_name} on {params['host']}:{params['port']}" + ) + helper.query(f"DROP DATABASE IF EXISTS {db_name}") + + create_datasource_sql_via_connector(helper, db_name, "postgres", params) + + helper.query(f"CREATE TABLE {db_name}.{source_table_name} (id INT, message VARCHAR(255));") + helper.query(f"CREATE TABLE {db_name}.{target_table_name} (id INT, message VARCHAR(255));") + helper.query(f"INSERT INTO {db_name}.{source_table_name} (id, message) VALUES (101, 'initial_update_message');") + helper.query(f"INSERT INTO {db_name}.{source_table_name} (id, message) VALUES (102, 'initial_delete_message');") + + yield db_name, source_table_name, target_table_name + + except (mysql.connector.Error, TimeoutError) as setup_err: + pytest.skip(f"Trigger fixture setup failed. Ensure Docker environment is running. 
Error: {setup_err}") + + finally: + print(f"\n--> [CLEANUP] Dropping tables and DATABASE: {db_name}") + try: + helper.query(f"DROP TABLE IF EXISTS {db_name}.{source_table_name};") + helper.query(f"DROP TABLE IF EXISTS {db_name}.{target_table_name};") + except mysql.connector.Error as e: + print(f"Warning: Error dropping tables during cleanup: {e}") + pass + + try: + helper.query(f"DROP DATABASE IF EXISTS {db_name};") + except mysql.connector.Error as e: + print(f"Warning: Error dropping database during cleanup: {e}") + pass + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLTriggers(BaseStuff): + """Test suite for Trigger operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("setup_trigger_db") + def test_trigger_lifecycle(self, setup_trigger_db, use_binary): + db_name, source_table_name, target_table_name = setup_trigger_db + trigger_name = "test_insert_trigger" + test_id = 201 # Use a new ID that will be INSERTED + inserted_message = "this message was inserted" + try: + # Ensure the target table is clean + self.query(f"DELETE FROM {db_name}.{target_table_name};") + + # Pre-drop trigger to ensure clean state + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + create_trigger_query = f""" + CREATE TRIGGER {trigger_name} + ON {db_name}.{source_table_name} + (INSERT INTO {db_name}.{target_table_name} (id, message) SELECT id, message FROM TABLE_DELTA); + """ + print("\n[DEBUG] Sending CREATE TRIGGER command...") + self.query(create_trigger_query) + + wait_for_trigger_creation(self.query, trigger_name, timeout=20) + + print("[DEBUG] Schema check complete. 
Proceeding to functional firing test...") + + # Activate Trigger with an INSERT, not an UPDATE + self.query( + f"INSERT INTO {db_name}.{source_table_name} (id, message) VALUES ({test_id}, '{inserted_message}');" + ) + + # Poll the target table for the new result + result = [] + max_wait_time = 60 + interval = 1 + max_interval = 8 + start_time = time.time() + while time.time() - start_time < max_wait_time: + result = self.query(f"SELECT id, message FROM {db_name}.{target_table_name} WHERE id = {test_id};") + if result: + break + time.sleep(interval) + interval = min(interval * 2, max_interval) + + # Verify + assert result, f"Trigger did not fire for id {test_id} within {max_wait_time}s." + assert result[0]["message"] == inserted_message + + finally: + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLTriggersNegative(BaseStuff): + """Negative tests for Trigger operations.""" + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("setup_trigger_db") + def test_create_duplicate_trigger(self, setup_trigger_db, use_binary): + db_name, source_table_name, _ = setup_trigger_db + trigger_name = "duplicate_trigger" + create_query = f"CREATE TRIGGER {trigger_name} ON {db_name}.{source_table_name} (SELECT 1);" + try: + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + self.query(create_query) + wait_for_trigger_creation(self.query, trigger_name, timeout=20) + + with pytest.raises(Exception) as e: + self.query(create_query) + assert "already exists" in str(e.value).lower() + finally: + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + def test_create_trigger_on_non_existent_table(self, use_binary, request): + trigger_name = "bad_trigger_local" + create_query = f"CREATE TRIGGER {trigger_name} ON non_existent_db.non_existent_table (SELECT 
1);" + try: + with pytest.raises(Exception) as e: + self.query(create_query) + error_str = str(e.value).lower() + assert "no integration with name" in error_str or "unknown database" in error_str + finally: + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + def test_drop_non_existent_trigger(self, use_binary): + trigger_name = "non_existent_trigger" + try: + self.query(f"DROP TRIGGER {trigger_name};") + except Exception: + pass + + with pytest.raises(Exception) as e: + self.query(f"DROP TRIGGER {trigger_name};") + error_str = str(e.value).lower() + assert "doesn't exist" in error_str or "unknown trigger" in error_str + + +@pytest.mark.parametrize("use_binary", [False, True], indirect=True) +class TestMySQLQueryComposability(BaseStuff): + """Test suite for advanced query composability (CTEs, Subqueries, UNIONs).""" + + db_name = "test_composability_db" + + @pytest.fixture(scope="class") + def composability_db(self): + """Class-scoped fixture to create the postgres DB.""" + print(f"\n--> [Fixture composability_db] Setting up database: {self.db_name}") + db_details = { + "type": "postgres", + "connection_data": { + "host": "samples.mindsdb.com", + "port": "5432", + "user": "demo_user", + "password": "demo_password", + "database": "demo", + "schema": "demo", + }, + } + try: + self.create_database(self.db_name, db_details) + self.validate_database_creation(self.db_name) + yield self.db_name + finally: + print(f"\n--> [Fixture composability_db] Tearing down database: {self.db_name}") + self.query(f"DROP DATABASE IF EXISTS {self.db_name};") + + @pytest.fixture + def use_binary(self, request): + self.use_binary = request.param + + @pytest.mark.usefixtures("composability_db") + def test_common_table_expression_with(self, use_binary): + """ + Tests a query using a WITH clause (CTE). + tests that you can define a temporary result set (cte) and then query that result set. 
+ """ + query = f""" + WITH cte AS ( + SELECT * FROM {self.db_name}.home_rentals WHERE number_of_rooms = 2 + ) + SELECT * FROM cte LIMIT 5; + """ + result = self.query(query) + assert len(result) > 0 + assert all(row["number_of_rooms"] == 2 for row in result) + + @pytest.mark.usefixtures("composability_db") + def test_union_operator(self, use_binary): + """Tests a query using the UNION set operator. + It tests that you can combine the results from two separate queries into one list. + """ + query = f""" + (SELECT sqft, location, number_of_rooms FROM {self.db_name}.home_rentals WHERE number_of_rooms = 1 LIMIT 5) + UNION + (SELECT sqft, location, number_of_rooms FROM {self.db_name}.home_rentals WHERE number_of_rooms = 2 LIMIT 5); + """ + result = self.query(query) + assert len(result) > 0 + assert all(row["number_of_rooms"] in (1, 2) for row in result) + + @pytest.mark.usefixtures("composability_db") + def test_subquery_with_join_and_cte(self, use_binary): + """ + Tests a subquery rewrite for the unsupported 'WHERE IN (SELECT...)' syntax. + This tests CTE, UNION, and JOIN composability. + """ + query = f""" + WITH allowed_rooms AS ( + SELECT 1 as room_num + UNION + SELECT 3 as room_num + ) + SELECT t1.* + FROM {self.db_name}.home_rentals AS t1 + JOIN allowed_rooms AS t2 ON t1.number_of_rooms = t2.room_num + LIMIT 10; + """ + result = self.query(query) + assert len(result) > 0 + assert all(row["number_of_rooms"] in (1, 3) for row in result) + + @pytest.mark.usefixtures("composability_db") + def test_from_subquery(self, use_binary): + """Tests a subquery in the FROM clause. + It tests that you can run a query inside the FROM clause and use its results + as the source table for an outer query. 
+ """ + query = f""" + SELECT * FROM ( + SELECT * FROM {self.db_name}.home_rentals WHERE number_of_rooms = 2 + ) as sub_table + LIMIT 5; + """ + result = self.query(query) + assert len(result) > 0 + assert all(row["number_of_rooms"] == 2 for row in result) From 1c7eb200ee037f7a63df11f6658d5cf0ead92164 Mon Sep 17 00:00:00 2001 From: "April I. Murphy" <36110273+aimurphy@users.noreply.github.com> Date: Wed, 12 Nov 2025 06:18:24 -0800 Subject: [PATCH 2/7] Update datastax.mdx (#11873) --- .../data-integrations/datastax.mdx | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/docs/integrations/data-integrations/datastax.mdx b/docs/integrations/data-integrations/datastax.mdx index 4d3370ce586..a0e99915c17 100644 --- a/docs/integrations/data-integrations/datastax.mdx +++ b/docs/integrations/data-integrations/datastax.mdx @@ -5,7 +5,7 @@ sidebarTitle: DataStax This is the implementation of the DataStax data handler for MindsDB. -[DataStax](https://www.datastax.com/) Astra DB is a cloud database-as-a-service based on Apache Cassandra. DataStax also offers DataStax Enterprise (DSE), an on-premises database built on Apache Cassandra, and Astra Streaming, a messaging and event streaming cloud service based on Apache Pulsar. +[DataStax Astra DB](https://docs.datastax.com/en/astra-db-serverless/index.html) is a cloud database-as-a-service based on Apache Cassandra. DataStax also offers on-premises solutions, DataStax Enterprise (DSE) and Hyper-Converged Database (HCD), as well as Astra Streaming, a messaging and event streaming cloud service based on Apache Pulsar. ## Prerequisites @@ -13,7 +13,7 @@ Before proceeding, ensure the following prerequisites are met: 1. Install MindsDB locally via [Docker](/setup/self-hosted/docker) or [Docker Desktop](/setup/self-hosted/docker-desktop). 2. To connect DataStax to MindsDB, install the required dependencies following [this instruction](/setup/self-hosted/docker#install-dependencies). -3. 
Install or ensure access to DataStax. +3. Create an [Astra DB database](https://docs.datastax.com/en/astra-db-serverless/databases/create-database.html). ## Implementation @@ -21,9 +21,9 @@ DataStax Astra DB is API-compatible with Apache Cassandra and ScyllaDB. Therefor The required arguments to establish a connection are as follows: -* `user` is the user to authenticate. -* `password` is the password to authenticate the user. -* `secure_connect_bundle` is the path to the `secure_connect_bundle` zip file. +* `user`: The literal string `token` +* `password`: An [Astra application token](https://docs.datastax.com/en/astra-db-serverless/administration/manage-application-tokens.html) +* `secure_connect_bundle`: The path to your database's [Secure Connect Bundle](https://docs.datastax.com/en/astra-db-serverless/databases/secure-connect-bundle.html) zip file ## Usage @@ -34,21 +34,20 @@ CREATE DATABASE astra_connection WITH engine = "astra", parameters = { - "user": "user", - "password": "pass", + "user": "token", + "password": "application_token", "secure_connect_bundle": "/home/Downloads/file.zip" }; ``` - or, reference the bundle from Datastax s3 as: ```sql CREATE DATABASE astra_connection WITH ENGINE = "astra", PARAMETERS = { - "user": "user", - "password": "pass", + "user": "token", + "password": "application_token", "secure_connect_bundle": "https://datastax-cluster-config-prod.s3.us-east-2.amazonaws.com/32312-b9eb-4e09-a641-213eaesa12-1/secure-connect-demo.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AK..." 
} ``` From dba3869e979867d42a29f68c52f6effa2dd597d2 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa <49385643+MinuraPunchihewa@users.noreply.github.com> Date: Wed, 12 Nov 2025 20:02:43 +0530 Subject: [PATCH 3/7] Fixed MSSQL Query Execution Result Conversion (#11876) --- .../handlers/mssql_handler/mssql_handler.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/mindsdb/integrations/handlers/mssql_handler/mssql_handler.py b/mindsdb/integrations/handlers/mssql_handler/mssql_handler.py index 6230363d3f0..6cb4dacb49e 100644 --- a/mindsdb/integrations/handlers/mssql_handler/mssql_handler.py +++ b/mindsdb/integrations/handlers/mssql_handler/mssql_handler.py @@ -78,15 +78,11 @@ def _make_table_response( if not result: data_frame = pd.DataFrame(columns=columns) elif use_odbc: - # For pyodbc with large datasets, convert Row objects efficiently - # Using iterator with pd.DataFrame avoids intermediate list creation - try: - data_frame = pd.DataFrame(result, columns=columns) - except (ValueError, TypeError): - # Fallback: convert Row objects to tuples - data_frame = pd.DataFrame.from_records((tuple(row) for row in result), columns=columns) + # from_records() understands tuple-like records (including pyodbc.Row) + data_frame = pd.DataFrame.from_records(result, columns=columns) else: - data_frame = pd.DataFrame(result, columns=columns) + # pymssql with as_dict=True returns list of dicts + data_frame = pd.DataFrame(result) for column in description: column_name = column[0] From 01a6b37ad685796cd3d944bb4a7f54a45a82bcb0 Mon Sep 17 00:00:00 2001 From: Konstantin Sivakov Date: Wed, 12 Nov 2025 15:34:40 +0100 Subject: [PATCH 4/7] Update documentation for ENV Variables (#11667) Co-authored-by: martyna-mindsdb <109554435+martyna-mindsdb@users.noreply.github.com> --- docs/setup/environment-vars.mdx | 193 +++++++++++++++++++++++++++++--- 1 file changed, 175 insertions(+), 18 deletions(-) diff --git a/docs/setup/environment-vars.mdx 
b/docs/setup/environment-vars.mdx index 2891f045622..f1495a0fa44 100644 --- a/docs/setup/environment-vars.mdx +++ b/docs/setup/environment-vars.mdx @@ -3,8 +3,9 @@ title: Environment Variables sidebarTitle: Environment Variables --- -Most of the MindsDB functionality can be modified by extending the default configuration, but some of the configuration options +Most of the MindsDB functionality can be modified by extending the default configuration, but some of the configuration options can be added as environment variables on the server where MindsDB is deployed. +[Here is the list](/setup/full-list-environment-vars) of all the available environment variables. ## MindsDB Authentication @@ -21,6 +22,7 @@ MindsDB does not require authentication by default. If you want to enable authen export MINDSDB_USERNAME='mindsdb_user' export MINDSDB_PASSWORD='mindsdb_password' ``` + ## MindsDB Configuration File @@ -37,18 +39,19 @@ In order to start MindsDB with a [custom configuration file](/setup/custom-confi ```bash Shell export MINDSDB_CONFIG_PATH=/Users/username/path/config.json ``` + -## MindsDB Storage +## MindsDB Storage -By default, MindsDB stores the configuration files by determining appropriate platform-specific directories, e.g. a "user data dir": +By default, MindsDB stores the configuration files by determining appropriate platform-specific directories, e.g. a "user data dir": -* On Linux `~/.local/share/mindsdb/var` -* On MacOS `~/Library/Application Support/mindsdb/var` -* On Windows `C:\Documents and Settings\\Application Data\Local Settings\\mindsdb\var` +- On Linux `~/.local/share/mindsdb/var` +- On MacOS `~/Library/Application Support/mindsdb/var` +- On Windows `C:\Documents and Settings\\Application Data\Local Settings\\mindsdb\var` -In the `MINDSDB_STORAGE_DIR` location, MindsDB stores users' data, models and uploaded data files, the static assets for the frontend application and the -`sqlite.db` file. 
+In the `MINDSDB_STORAGE_DIR` location, MindsDB stores users' data, models and uploaded data files, the static assets for the frontend application and the +`sqlite.db` file. You can change the default storage location using `MINDSDB_STORAGE_DIR` variable. ### Example @@ -61,26 +64,31 @@ You can change the default storage location using `MINDSDB_STORAGE_DIR` variable ```bash Shell export MINDSDB_STORAGE_DIR='~/home/mindsdb/var' ``` + -## MindsDB Configuration Storage +## MindsDB Configuration Storage -MindsDB uses `sqlite` database by default to store the required configuration as models, projects, files metadata etc. -The full list of the above schemas can be found [here](https://github.com/mindsdb/mindsdb/blob/main/mindsdb/interfaces/storage/db.py#L69). You can change the +MindsDB uses `sqlite` database by default to store the required configuration as models, projects, files metadata etc. +The full list of the above schemas can be found [here](https://github.com/mindsdb/mindsdb/blob/main/mindsdb/interfaces/storage/db.py#L69). You can change the default storage option and use different database by adding the new connection string using `MINDSDB_DB_CON` variable. 
### Example - ```bash Docker - docker run --name mindsdb_container -e MINDSDB_DB_CON='postgresql://user:secret@localhost' -e MINDSDB_APIS=http,mysql -p 47334:47334 -p 47335:47335 mindsdb/mindsdb - ``` - - ```bash Shell - export MINDSDB_DB_CON='postgresql://user:secret@localhost' - ``` + ```bash Docker docker run --name mindsdb_container -e + MINDSDB_DB_CON='postgresql://user:secret@localhost' -e MINDSDB_APIS=http,mysql + -p 47334:47334 -p 47335:47335 mindsdb/mindsdb ``` ```bash Shell export + MINDSDB_DB_CON='postgresql://user:secret@localhost' ``` +#### `MINDSDB_STORAGE_BACKUP_DISABLED` + +- **Type:** Boolean (`1`, `true`, `True`) +- **Description:** Disables permanent storage backup +- **Default:** `false` +- **Example:** `MINDSDB_STORAGE_BACKUP_DISABLED=1` + ## MindsDB APIs The `MINDSDB_APIS` environment variable lets users define which APIs to start. Learn more about the [available APIs here](/setup/mindsdb-apis). @@ -95,6 +103,7 @@ The `MINDSDB_APIS` environment variable lets users define which APIs to start. L ```bash Shell export MINDSDB_APIS='http,mysql' ``` + ## MindsDB Logs @@ -111,8 +120,23 @@ This environment variable defines the level of logging generated by MindsDB. You ```bash Shell export MINDSDB_LOG_LEVEL='DEBUG' ``` + +#### `MINDSDB_CONSOLE_LOG_LEVEL` + +- **Type:** String (`DEBUG`, `INFO`, `WARNING`, `ERROR`) +- **Description:** Sets console log level +- **Default:** `INFO` +- **Example:** `MINDSDB_CONSOLE_LOG_LEVEL=DEBUG` + +#### `MINDSDB_FILE_LOG_LEVEL` + +- **Type:** String (`DEBUG`, `INFO`, `WARNING`, `ERROR`) +- **Description:** Sets file log level and enables file logging +- **Default:** `INFO` (disabled by default) +- **Example:** `MINDSDB_FILE_LOG_LEVEL=DEBUG` + ## MindsDB Default Project By default, MindsDB creates a project named `mindsdb` where all the models and other objects are stored. You can change the default project name by setting the `MINDSDB_DEFAULT_PROJECT` environment variable. 
@@ -129,8 +153,30 @@ If this environment variable is set or modified after MindsDB has started, the d ```bash Shell export MINDSDB_DEFAULT_PROJECT='my_project' ``` + +#### `MINDSDB_DEFAULT_LLM_API_KEY` + +- **Type:** String +- **Description:** API key for default LLM (Large Language Model) +- **Default:** None +- **Example:** `MINDSDB_DEFAULT_LLM_API_KEY=sk-...` + +#### `MINDSDB_DEFAULT_EMBEDDING_MODEL_API_KEY` + +- **Type:** String +- **Description:** API key for default embedding model +- **Default:** None +- **Example:** `MINDSDB_DEFAULT_EMBEDDING_MODEL_API_KEY=sk-...` + +#### `MINDSDB_DEFAULT_RERANKING_MODEL_API_KEY` + +- **Type:** String +- **Description:** API key for default reranking model +- **Default:** None +- **Example:** `MINDSDB_DEFAULT_RERANKING_MODEL_API_KEY=sk-...` + ## MindsDB's PID File When running MindsDB via [Docker](/setup/self-hosted/docker) or [Docker Extension](/setup/self-hosted/docker-desktop), the PID file is not used by default. Users can opt for enabling the PID file by defining the `USE_PIDFILE` environment variable. 
@@ -147,6 +193,7 @@ If used, the PID file is stored in the temp directory (`$TMPDIR` on MacOS and Li ```bash Shell export USE_PIDFILE=1 ``` + ## MindsDB GUI Updates @@ -165,6 +212,7 @@ By default, the automatic GUI updates are enabled and the `MINDSDB_GUI_AUTOUPDAT ```bash Shell export MINDSDB_GUI_AUTOUPDATE=false ``` + ## MindsDB GUI Startup and Updates @@ -185,4 +233,113 @@ Note that the `MINDSDB_NO_STUDIO` is not recommended for the MindsDB instance ru ```bash Shell export MINDSDB_NO_STUDIO=true ``` + + +### ML Task Queue + +#### `MINDSDB_ML_QUEUE_TYPE` + +- **Type:** String (`local`, `redis`) +- **Description:** Type of ML task queue to use +- **Default:** `local` +- **Example:** `MINDSDB_ML_QUEUE_TYPE=redis` + +#### `MINDSDB_ML_QUEUE_HOST` + +- **Type:** String (hostname) +- **Description:** Redis host for ML task queue (only when `MINDSDB_ML_QUEUE_TYPE=redis`) +- **Default:** `localhost` +- **Example:** `MINDSDB_ML_QUEUE_HOST=redis.example.com` + +#### `MINDSDB_ML_QUEUE_PORT` + +- **Type:** Integer +- **Description:** Redis port for ML task queue +- **Default:** `6379` +- **Example:** `MINDSDB_ML_QUEUE_PORT=6380` + +#### `MINDSDB_ML_QUEUE_DB` + +- **Type:** Integer +- **Description:** Redis database number for ML task queue +- **Default:** `0` +- **Example:** `MINDSDB_ML_QUEUE_DB=1` + +#### `MINDSDB_ML_QUEUE_USERNAME` + +- **Type:** String +- **Description:** Redis username for ML task queue +- **Default:** None +- **Example:** `MINDSDB_ML_QUEUE_USERNAME=redis_user` + +#### `MINDSDB_ML_QUEUE_PASSWORD` + +- **Type:** String +- **Description:** Redis password for ML task queue +- **Default:** None +- **Example:** `MINDSDB_ML_QUEUE_PASSWORD=redis_pass` + +## Reranker Configuration + +#### `MINDSDB_RERANKER_N` + +- **Type:** Integer +- **Description:** Number of results to rerank +- **Default:** None +- **Example:** `MINDSDB_RERANKER_N=10` + +#### `MINDSDB_RERANKER_LOGPROBS` + +- **Type:** Boolean (`true`, `false`, `1`, `0`, `yes`, `no`) +- **Description:** 
Enable log probabilities in reranker +- **Default:** None +- **Example:** `MINDSDB_RERANKER_LOGPROBS=true` + +#### `MINDSDB_RERANKER_TOP_LOGPROBS` + +- **Type:** Integer +- **Description:** Number of top log probabilities to return +- **Default:** None +- **Example:** `MINDSDB_RERANKER_TOP_LOGPROBS=5` + +#### `MINDSDB_RERANKER_MAX_TOKENS` + +- **Type:** Integer +- **Description:** Maximum tokens for reranker +- **Default:** None +- **Example:** `MINDSDB_RERANKER_MAX_TOKENS=512` + +#### `MINDSDB_RERANKER_VALID_CLASS_TOKENS` + +- **Type:** String (comma-separated list) +- **Description:** Valid class tokens for reranker +- **Default:** None +- **Example:** `MINDSDB_RERANKER_VALID_CLASS_TOKENS=token1,token2,token3` + +## Features + +#### `MINDSDB_DATA_CATALOG_ENABLED` + +- **Type:** Boolean (`1`, `true`) +- **Description:** Enables the data catalog feature +- **Default:** `false` +- **Example:** `MINDSDB_DATA_CATALOG_ENABLED=1` + +## Runtime + +#### `MINDSDB_DOCKER_ENV` + +- **Type:** Any value (presence check) +- **Description:** Indicates MindsDB is running in Docker environment (changes default API host to `0.0.0.0`) +- **Default:** Not set +- **Example:** `MINDSDB_DOCKER_ENV=1` + +#### `MINDSDB_RUNTIME` + +- **Type:** String (`1`) +- **Description:** Indicates MindsDB runtime environment +- **Default:** Not set +- **Example:** `MINDSDB_RUNTIME=1` + +--- From a4bc712aa3819c95f15750d3a4e8334ca2888e7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Tob=C3=B3n=20Hern=C3=A1ndez?= Date: Wed, 12 Nov 2025 10:50:14 -0500 Subject: [PATCH 5/7] [BUG] - Hubspot Handler - CRUD operations (#11874) --- .../handlers/hubspot_handler/README.md | 63 +-- .../hubspot_handler/hubspot_tables.py | 505 ++++++++++++++---- 2 files changed, 430 insertions(+), 138 deletions(-) diff --git a/mindsdb/integrations/handlers/hubspot_handler/README.md b/mindsdb/integrations/handlers/hubspot_handler/README.md index 367ed859e76..520e2a29b77 100644 --- 
a/mindsdb/integrations/handlers/hubspot_handler/README.md +++ b/mindsdb/integrations/handlers/hubspot_handler/README.md @@ -7,6 +7,7 @@ HubSpot handler for MindsDB provides interfaces to connect to HubSpot via APIs a ## Table of Contents - [HubSpot Handler](#hubspot-handler) + - [Table of Contents](#table-of-contents) - [About HubSpot](#about-hubspot) - [Installation](#installation) - [Authentication](#authentication) @@ -96,6 +97,11 @@ The handler provides comprehensive data catalog capabilities: | `contacts` | Individual contact records | id, email, firstname, lastname | SELECT, INSERT, UPDATE, DELETE | | `deals` | Sales opportunity records | id, dealname, amount, stage | SELECT, INSERT, UPDATE, DELETE | +**Important Notes on Field Values:** +- **Industry codes**: HubSpot uses predefined industry values (e.g., `COMPUTER_SOFTWARE`, `BIOTECHNOLOGY`, `FINANCIAL_SERVICES`). See [HubSpot's industry list](https://knowledge.hubspot.com/properties/hubspots-default-company-properties#industry) for all valid options. +- **Deal stages**: Each HubSpot account has custom pipeline stages. Use the stage IDs from your account (e.g., `presentationscheduled`, `closedwon`, `closedlost`, or numeric IDs like `110382973`). +- **Email validation**: Contact email addresses must be valid email formats (e.g., `user@example.com`). 
+ ## Example Usage ### Basic Connection @@ -126,13 +132,6 @@ PARAMETERS = { SHOW TABLES FROM hubspot_datasource; ``` -**Get Table Schema:** -```sql -DESCRIBE hubspot_datasource.companies; -DESCRIBE hubspot_datasource.contacts; -DESCRIBE hubspot_datasource.deals; -``` - **Get Detailed Column Information:** ```sql SELECT * FROM information_schema.columns @@ -154,36 +153,6 @@ SELECT * FROM hubspot_datasource.contacts LIMIT 10; SELECT * FROM hubspot_datasource.deals LIMIT 10; ``` -**Advanced Filtering and Analytics:** -```sql --- Companies by industry and location -SELECT name, industry, city, state -FROM hubspot_datasource.companies -WHERE industry IN ('Technology', 'Healthcare') - AND city = 'San Francisco' -ORDER BY name; - --- Contact engagement analysis -SELECT - company, - COUNT(*) as contact_count, - STRING_AGG(email, ', ') as emails -FROM hubspot_datasource.contacts -WHERE company IS NOT NULL -GROUP BY company -ORDER BY contact_count DESC; - --- Sales pipeline analysis -SELECT - dealstage, - COUNT(*) as deal_count, - SUM(CAST(amount AS DECIMAL)) as total_value, - AVG(CAST(amount AS DECIMAL)) as avg_deal_size -FROM hubspot_datasource.deals -WHERE amount IS NOT NULL -GROUP BY dealstage -ORDER BY total_value DESC; -``` ### Data Manipulation @@ -191,22 +160,22 @@ ORDER BY total_value DESC; ```sql -- Create new company INSERT INTO hubspot_datasource.companies (name, domain, industry, city, state) -VALUES ('Acme Corp', 'acme.com', 'Technology', 'New York', 'NY'); +VALUES ('Acme Corp', 'acme.com', 'COMPUTER_SOFTWARE', 'New York', 'NY'); -- Create new contact -INSERT INTO hubspot_datasource.contacts (email, firstname, lastname, company, phone) -VALUES ('john.doe@acme.com', 'John', 'Doe', 'Acme Corp', '+1-555-0123'); +INSERT INTO hubspot_datasource.contacts (email, firstname, phone) +VALUES ('john.doe@example.com', 'John', '+1234567890'); -- Create new deal -INSERT INTO hubspot_datasource.deals (dealname, amount, pipeline, dealstage) -VALUES ('Acme Software License', 
'50000', 'sales', 'qualified-to-buy'); +INSERT INTO hubspot_datasource.deals (dealname, amount, dealstage, pipeline) +VALUES ('New Deal', 5000, 'presentationscheduled', 'default'); ``` **Updating Records:** ```sql -- Update company information UPDATE hubspot_datasource.companies -SET industry = 'SaaS', city = 'Austin' +SET industry = 'COMPUTER_SOFTWARE', city = 'Austin' WHERE name = 'Acme Corp'; -- Update contact details @@ -216,18 +185,18 @@ WHERE email = 'john.doe@acme.com'; -- Move deal through pipeline UPDATE hubspot_datasource.deals -SET dealstage = 'proposal-made', amount = '75000' -WHERE dealname = 'Acme Software License'; +SET dealstage = '110382973', amount = '75000' +WHERE dealname = 'New Deal'; ``` **Deleting Records:** ```sql -- Archive old deals DELETE FROM hubspot_datasource.deals -WHERE dealstage = 'closed-lost' +WHERE dealstage = 'closedlost' AND createdate < '2023-01-01'; -- Remove test contacts DELETE FROM hubspot_datasource.contacts -WHERE email LIKE '%test%' OR email LIKE '%example%'; +WHERE email = 'email'; ``` diff --git a/mindsdb/integrations/handlers/hubspot_handler/hubspot_tables.py b/mindsdb/integrations/handlers/hubspot_handler/hubspot_tables.py index 70053205c00..508df26c496 100644 --- a/mindsdb/integrations/handlers/hubspot_handler/hubspot_tables.py +++ b/mindsdb/integrations/handlers/hubspot_handler/hubspot_tables.py @@ -1,10 +1,14 @@ -from typing import List, Dict, Text, Any +from typing import List, Dict, Text, Any, Optional import pandas as pd +from hubspot import HubSpot from hubspot.crm.objects import ( SimplePublicObjectId as HubSpotObjectId, SimplePublicObjectBatchInput as HubSpotObjectBatchInput, SimplePublicObjectInputForCreate as HubSpotObjectInputCreate, + BatchInputSimplePublicObjectBatchInputForCreate, + BatchInputSimplePublicObjectBatchInput, + BatchInputSimplePublicObjectId, ) from mindsdb_sql_parser import ast @@ -52,6 +56,9 @@ def select(self, query: ast.Select) -> pd.DataFrame: selected_columns, where_conditions, 
order_by_conditions, result_limit = select_statement_parser.parse_query() companies_df = pd.json_normalize(self.get_companies(limit=result_limit)) + if companies_df.empty: + companies_df = pd.DataFrame(columns=self._get_default_company_columns()) + select_statement_executor = SELECTQueryExecutor( companies_df, selected_columns, where_conditions, order_by_conditions ) @@ -107,11 +114,23 @@ def update(self, query: ast.Update) -> None: update_statement_parser = UPDATEQueryParser(query) values_to_update, where_conditions = update_statement_parser.parse_query() - companies_df = pd.json_normalize(self.get_companies()) + companies_df = pd.json_normalize(self.get_companies(limit=1000)) + + if companies_df.empty: + raise ValueError( + "No companies retrieved from HubSpot to evaluate update conditions. Verify your connection and permissions." + ) + update_query_executor = UPDATEQueryExecutor(companies_df, where_conditions) + filtered_df = update_query_executor.execute_query() + + if filtered_df.empty: + raise ValueError( + f"No companies found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) - companies_df = update_query_executor.execute_query() - company_ids = companies_df["id"].tolist() + company_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Updating {len(company_ids)} compan(ies) matching WHERE conditions") self.update_companies(company_ids, values_to_update) def delete(self, query: ast.Delete) -> None: @@ -135,46 +154,110 @@ def delete(self, query: ast.Delete) -> None: delete_statement_parser = DELETEQueryParser(query) where_conditions = delete_statement_parser.parse_query() - companies_df = pd.json_normalize(self.get_companies()) + companies_df = pd.json_normalize(self.get_companies(limit=1000)) + + if companies_df.empty: + raise ValueError( + "No companies retrieved from HubSpot to evaluate delete conditions. Verify your connection and permissions." 
+ ) + delete_query_executor = DELETEQueryExecutor(companies_df, where_conditions) + filtered_df = delete_query_executor.execute_query() - companies_df = delete_query_executor.execute_query() - company_ids = companies_df["id"].tolist() + if filtered_df.empty: + raise ValueError( + f"No companies found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) + + company_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Deleting {len(company_ids)} compan(ies) matching WHERE conditions") self.delete_companies(company_ids) def get_columns(self) -> List[Text]: - return pd.json_normalize(self.get_companies(limit=1)).columns.tolist() + return self._get_default_company_columns() + + @staticmethod + def _get_default_company_columns() -> List[str]: + return [ + "id", + "name", + "city", + "phone", + "state", + "domain", + "industry", + "createdate", + "lastmodifieddate", + ] - def get_companies(self, **kwargs) -> List[Dict]: + def get_companies(self, limit: int | None = None, **kwargs) -> List[Dict]: hubspot = self.handler.connect() - companies = hubspot.crm.companies.get_all(**kwargs) - companies_dict = [ - { - "id": company.id, - "name": company.properties.get("name", None), - "city": company.properties.get("city", None), - "phone": company.properties.get("phone", None), - "state": company.properties.get("state", None), - "domain": company.properties.get("company", None), - "industry": company.properties.get("industry", None), - "createdate": company.properties["createdate"], - "lastmodifieddate": company.properties["hs_lastmodifieddate"], - } - for company in companies + + requested_properties = kwargs.get("properties", []) + default_properties = [ + "name", + "domain", + "industry", + "city", + "state", + "phone", + "createdate", + "hs_lastmodifieddate", ] + + properties = list({*default_properties, *requested_properties}) + + api_kwargs = {**kwargs, "properties": properties} + if limit is not None: + api_kwargs["limit"] = 
limit + + companies = hubspot.crm.companies.get_all(**api_kwargs) + companies_dict = [] + + for company in companies: + try: + company_dict = { + "id": company.id, + "name": company.properties.get("name"), + "city": company.properties.get("city"), + "phone": company.properties.get("phone"), + "state": company.properties.get("state"), + "domain": company.properties.get("domain"), + "industry": company.properties.get("industry"), + "createdate": company.properties.get("createdate"), + "lastmodifieddate": company.properties.get("hs_lastmodifieddate"), + } + companies_dict.append(company_dict) + except Exception as e: + logger.warning(f"Error processing company {getattr(company, 'id', 'unknown')}: {str(e)}") + continue + + logger.info(f"Retrieved {len(companies_dict)} companies from HubSpot") return companies_dict def create_companies(self, companies_data: List[Dict[Text, Any]]) -> None: + if not companies_data: + raise ValueError("No company data provided for creation") + + logger.info(f"Attempting to create {len(companies_data)} compan(ies): {companies_data}") + hubspot = self.handler.connect() companies_to_create = [HubSpotObjectInputCreate(properties=company) for company in companies_data] + batch_input = BatchInputSimplePublicObjectBatchInputForCreate(inputs=companies_to_create) + try: created_companies = hubspot.crm.companies.batch_api.create( - inputs=companies_to_create, - ) - logger.info( - f"Companies created with ID's {[created_company.id for created_company in created_companies.results]}" + batch_input_simple_public_object_batch_input_for_create=batch_input ) + + if not created_companies or not hasattr(created_companies, "results") or not created_companies.results: + raise Exception("Company creation returned no results") + + created_ids = [created_company.id for created_company in created_companies.results] + logger.info(f"Successfully created {len(created_ids)} compan(ies) with IDs: {created_ids}") + except Exception as e: + logger.error(f"Companies 
creation failed: {str(e)}") raise Exception(f"Companies creation failed {e}") def update_companies(self, company_ids: List[Text], values_to_update: Dict[Text, Any]) -> None: @@ -182,9 +265,10 @@ def update_companies(self, company_ids: List[Text], values_to_update: Dict[Text, companies_to_update = [ HubSpotObjectBatchInput(id=company_id, properties=values_to_update) for company_id in company_ids ] + batch_input = BatchInputSimplePublicObjectBatchInput(inputs=companies_to_update) try: updated_companies = hubspot.crm.companies.batch_api.update( - inputs=companies_to_update, + batch_input_simple_public_object_batch_input=batch_input ) logger.info( f"Companies with ID {[updated_company.id for updated_company in updated_companies.results]} updated" @@ -195,10 +279,9 @@ def update_companies(self, company_ids: List[Text], values_to_update: Dict[Text, def delete_companies(self, company_ids: List[Text]) -> None: hubspot = self.handler.connect() companies_to_delete = [HubSpotObjectId(id=company_id) for company_id in company_ids] + batch_input = BatchInputSimplePublicObjectId(inputs=companies_to_delete) try: - hubspot.crm.companies.batch_api.archive( - inputs=companies_to_delete, - ) + hubspot.crm.companies.batch_api.archive(batch_input_simple_public_object_id=batch_input) logger.info("Companies deleted") except Exception as e: raise Exception(f"Companies deletion failed {e}") @@ -231,7 +314,10 @@ def select(self, query: ast.Select) -> pd.DataFrame: select_statement_parser = SELECTQueryParser(query, "contacts", self.get_columns()) selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() - contacts_df = pd.json_normalize(self.get_contacts(limit=result_limit)) + contacts_df = pd.json_normalize(self.get_contacts(limit=result_limit, where_conditions=where_conditions)) + if contacts_df.empty: + contacts_df = pd.DataFrame(columns=self._get_default_contact_columns()) + select_statement_executor = SELECTQueryExecutor( contacts_df, 
selected_columns, where_conditions, order_by_conditions ) @@ -259,7 +345,7 @@ def insert(self, query: ast.Insert) -> None: """ insert_statement_parser = INSERTQueryParser( query, - supported_columns=["email", "firstname", "firstname", "phone", "company", "website"], + supported_columns=["email", "firstname", "lastname", "phone", "company", "website"], mandatory_columns=["email"], all_mandatory=False, ) @@ -287,11 +373,23 @@ def update(self, query: ast.Update) -> None: update_statement_parser = UPDATEQueryParser(query) values_to_update, where_conditions = update_statement_parser.parse_query() - contacts_df = pd.json_normalize(self.get_contacts()) + contacts_df = pd.json_normalize(self.get_contacts(limit=1000, where_conditions=where_conditions)) + + if contacts_df.empty: + raise ValueError( + "No contacts retrieved from HubSpot to evaluate update conditions. Verify your connection and permissions." + ) + update_query_executor = UPDATEQueryExecutor(contacts_df, where_conditions) + filtered_df = update_query_executor.execute_query() + + if filtered_df.empty: + raise ValueError( + f"No contacts found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) - contacts_df = update_query_executor.execute_query() - contact_ids = contacts_df["id"].tolist() + contact_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Updating {len(contact_ids)} contact(s) matching WHERE conditions") self.update_contacts(contact_ids, values_to_update) def delete(self, query: ast.Delete) -> None: @@ -315,44 +413,206 @@ def delete(self, query: ast.Delete) -> None: delete_statement_parser = DELETEQueryParser(query) where_conditions = delete_statement_parser.parse_query() - contacts_df = pd.json_normalize(self.get_contacts()) + contacts_df = pd.json_normalize(self.get_contacts(limit=1000, where_conditions=where_conditions)) + + if contacts_df.empty: + raise ValueError( + "No contacts retrieved from HubSpot to evaluate delete conditions. 
Verify your connection and permissions." + ) + delete_query_executor = DELETEQueryExecutor(contacts_df, where_conditions) + filtered_df = delete_query_executor.execute_query() - contacts_df = delete_query_executor.execute_query() - contact_ids = contacts_df["id"].tolist() + if filtered_df.empty: + raise ValueError( + f"No contacts found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) + + contact_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Deleting {len(contact_ids)} contact(s) matching WHERE conditions") self.delete_contacts(contact_ids) def get_columns(self) -> List[Text]: - return pd.json_normalize(self.get_contacts(limit=1)).columns.tolist() + return self._get_default_contact_columns() + + @staticmethod + def _get_default_contact_columns() -> List[str]: + return [ + "id", + "email", + "firstname", + "lastname", + "phone", + "company", + "website", + "createdate", + "lastmodifieddate", + ] - def get_contacts(self, **kwargs) -> List[Dict]: + def get_contacts( + self, + limit: Optional[int] = None, + where_conditions: Optional[List] = None, + **kwargs, + ) -> List[Dict]: hubspot = self.handler.connect() - contacts = hubspot.crm.contacts.get_all(**kwargs) - contacts_dict = [ - { - "id": contact.id, - "email": contact.properties["email"], - "firstname": contact.properties.get("firstname", None), - "lastname": contact.properties.get("lastname", None), - "phone": contact.properties.get("phone", None), - "company": contact.properties.get("company", None), - "website": contact.properties.get("website", None), - "createdate": contact.properties["createdate"], - "lastmodifieddate": contact.properties["lastmodifieddate"], - } - for contact in contacts + requested_properties = kwargs.get("properties", []) + default_properties = [ + "email", + "firstname", + "lastname", + "phone", + "company", + "website", + "createdate", + "lastmodifieddate", ] + + properties = list({*default_properties, *requested_properties}) + 
+ api_kwargs = {**kwargs, "properties": properties} + if limit is not None: + api_kwargs["limit"] = limit + else: + api_kwargs.pop("limit", None) + + # Try using HubSpot search API if we have simple equality filters + if where_conditions: + search_results = self._search_contacts_by_conditions(hubspot, where_conditions, properties, limit) + if search_results is not None: + logger.info(f"Retrieved {len(search_results)} contacts from HubSpot via search API") + return search_results + + contacts = hubspot.crm.contacts.get_all(**api_kwargs) + contacts_dict = [] + + try: + for contact in contacts: + contacts_dict.append(self._contact_to_dict(contact)) + if limit is not None and len(contacts_dict) >= limit: + break + except Exception as e: + logger.error(f"Failed to iterate HubSpot contacts: {str(e)}") + raise + + logger.info(f"Retrieved {len(contacts_dict)} contacts from HubSpot") return contacts_dict + def _search_contacts_by_conditions( + self, + hubspot: HubSpot, + where_conditions: List, + properties: List[str], + limit: Optional[int], + ) -> Optional[List[Dict[str, Any]]]: + filters = [] + + for condition in where_conditions: + if not isinstance(condition, (list, tuple)) or len(condition) < 3: + continue + + operator, column, value = condition[0], condition[1], condition[2] + if operator != "=" or column not in {"email", "id"}: + continue + + property_name = "hs_object_id" if column == "id" else column + filters.append( + { + "propertyName": property_name, + "operator": "EQ", + "value": str(value), + } + ) + + if not filters: + return None + + collected: List[Dict[str, Any]] = [] + remaining = limit if limit is not None else float("inf") + after = None + + while remaining > 0: + page_limit = min(int(remaining) if remaining != float("inf") else 100, 100) + search_request = { + "properties": properties, + "limit": page_limit, + "filterGroups": [{"filters": filters}], + } + + if after is not None: + search_request["after"] = after + + response = 
hubspot.crm.contacts.search_api.do_search(public_object_search_request=search_request) + + results = getattr(response, "results", []) + for contact in results: + collected.append(self._contact_to_dict(contact)) + if limit is not None and len(collected) >= limit: + return collected + + paging = getattr(response, "paging", None) + next_page = getattr(paging, "next", None) if paging else None + after = getattr(next_page, "after", None) if next_page else None + + if after is None or (limit is not None and len(collected) >= limit): + break + + if remaining != float("inf"): + remaining = limit - len(collected) + + return collected + + def _contact_to_dict(self, contact: Any) -> Dict[str, Any]: + try: + properties = getattr(contact, "properties", {}) or {} + return { + "id": contact.id, + "email": properties.get("email"), + "firstname": properties.get("firstname"), + "lastname": properties.get("lastname"), + "phone": properties.get("phone"), + "company": properties.get("company"), + "website": properties.get("website"), + "createdate": properties.get("createdate"), + "lastmodifieddate": properties.get("lastmodifieddate"), + } + except Exception as e: + logger.warning(f"Error processing contact {getattr(contact, 'id', 'unknown')}: {str(e)}") + return { + "id": getattr(contact, "id", None), + "email": None, + "firstname": None, + "lastname": None, + "phone": None, + "company": None, + "website": None, + "createdate": None, + "lastmodifieddate": None, + } + def create_contacts(self, contacts_data: List[Dict[Text, Any]]) -> None: + if not contacts_data: + raise ValueError("No contact data provided for creation") + + logger.info(f"Attempting to create {len(contacts_data)} contact(s): {contacts_data}") + hubspot = self.handler.connect() contacts_to_create = [HubSpotObjectInputCreate(properties=contact) for contact in contacts_data] + batch_input = BatchInputSimplePublicObjectBatchInputForCreate(inputs=contacts_to_create) + try: - created_contacts = 
hubspot.crm.contacts.batch_api.create(inputs=contacts_to_create) - logger.info( - f"Contacts created with ID {[created_contact.id for created_contact in created_contacts.results]}" + created_contacts = hubspot.crm.contacts.batch_api.create( + batch_input_simple_public_object_batch_input_for_create=batch_input ) + + if not created_contacts or not hasattr(created_contacts, "results") or not created_contacts.results: + raise Exception("Contact creation returned no results") + + created_ids = [created_contact.id for created_contact in created_contacts.results] + logger.info(f"Successfully created {len(created_ids)} contact(s) with IDs: {created_ids}") + except Exception as e: + logger.error(f"Contacts creation failed: {str(e)}") raise Exception(f"Contacts creation failed {e}") def update_contacts(self, contact_ids: List[Text], values_to_update: Dict[Text, Any]) -> None: @@ -360,9 +620,10 @@ def update_contacts(self, contact_ids: List[Text], values_to_update: Dict[Text, contacts_to_update = [ HubSpotObjectBatchInput(id=contact_id, properties=values_to_update) for contact_id in contact_ids ] + batch_input = BatchInputSimplePublicObjectBatchInput(inputs=contacts_to_update) try: updated_contacts = hubspot.crm.contacts.batch_api.update( - inputs=contacts_to_update, + batch_input_simple_public_object_batch_input=batch_input ) logger.info( f"Contacts with ID {[updated_contact.id for updated_contact in updated_contacts.results]} updated" @@ -373,10 +634,9 @@ def update_contacts(self, contact_ids: List[Text], values_to_update: Dict[Text, def delete_contacts(self, contact_ids: List[Text]) -> None: hubspot = self.handler.connect() contacts_to_delete = [HubSpotObjectId(id=contact_id) for contact_id in contact_ids] + batch_input = BatchInputSimplePublicObjectId(inputs=contacts_to_delete) try: - hubspot.crm.contacts.batch_api.archive( - inputs=contacts_to_delete, - ) + hubspot.crm.contacts.batch_api.archive(batch_input_simple_public_object_id=batch_input) logger.info("Contacts 
deleted") except Exception as e: raise Exception(f"Contacts deletion failed {e}") @@ -410,6 +670,9 @@ def select(self, query: ast.Select) -> pd.DataFrame: selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() deals_df = pd.json_normalize(self.get_deals(limit=result_limit)) + if deals_df.empty: + deals_df = pd.DataFrame(columns=self._get_default_deal_columns()) + select_statement_executor = SELECTQueryExecutor( deals_df, selected_columns, where_conditions, order_by_conditions ) @@ -465,11 +728,23 @@ def update(self, query: ast.Update) -> None: update_statement_parser = UPDATEQueryParser(query) values_to_update, where_conditions = update_statement_parser.parse_query() - deals_df = pd.json_normalize(self.get_deals()) + deals_df = pd.json_normalize(self.get_deals(limit=1000)) + + if deals_df.empty: + raise ValueError( + "No deals retrieved from HubSpot to evaluate update conditions. Verify your connection and permissions." + ) + update_query_executor = UPDATEQueryExecutor(deals_df, where_conditions) + filtered_df = update_query_executor.execute_query() + + if filtered_df.empty: + raise ValueError( + f"No deals found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) - deals_df = update_query_executor.execute_query() - deal_ids = deals_df["id"].tolist() + deal_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Updating {len(deal_ids)} deal(s) matching WHERE conditions") self.update_deals(deal_ids, values_to_update) def delete(self, query: ast.Delete) -> None: @@ -493,53 +768,102 @@ def delete(self, query: ast.Delete) -> None: delete_statement_parser = DELETEQueryParser(query) where_conditions = delete_statement_parser.parse_query() - deals_df = pd.json_normalize(self.get_deals()) + deals_df = pd.json_normalize(self.get_deals(limit=1000)) + + if deals_df.empty: + raise ValueError( + "No deals retrieved from HubSpot to evaluate delete conditions. 
Verify your connection and permissions." + ) + delete_query_executor = DELETEQueryExecutor(deals_df, where_conditions) + filtered_df = delete_query_executor.execute_query() + + if filtered_df.empty: + raise ValueError( + f"No deals found matching WHERE conditions: {where_conditions}. Please verify the conditions are correct." + ) - deals_df = delete_query_executor.execute_query() - deal_ids = deals_df["id"].tolist() + deal_ids = filtered_df["id"].astype(str).tolist() + logger.info(f"Deleting {len(deal_ids)} deal(s) matching WHERE conditions") self.delete_deals(deal_ids) def get_columns(self) -> List[Text]: - return pd.json_normalize(self.get_deals(limit=1)).columns.tolist() + return self._get_default_deal_columns() + + @staticmethod + def _get_default_deal_columns() -> List[str]: + return [ + "id", + "dealname", + "amount", + "pipeline", + "closedate", + "dealstage", + "hubspot_owner_id", + "createdate", + "hs_lastmodifieddate", + ] def get_deals(self, **kwargs) -> List[Dict]: hubspot = self.handler.connect() deals = hubspot.crm.deals.get_all(**kwargs) - deals_dict = [ - { - "id": deal.id, - "dealname": deal.properties["dealname"], - "amount": deal.properties.get("amount", None), - "pipeline": deal.properties.get("pipeline", None), - "closedate": deal.properties.get("closedate", None), - "dealstage": deal.properties.get("dealstage", None), - "hubspot_owner_id": deal.properties.get("hubspot_owner_id", None), - "createdate": deal.properties["createdate"], - "hs_lastmodifieddate": deal.properties["hs_lastmodifieddate"], - } - for deal in deals - ] + deals_dict = [] + + for deal in deals: + try: + deal_dict = { + "id": deal.id, + "dealname": deal.properties.get("dealname", None), + "amount": deal.properties.get("amount", None), + "pipeline": deal.properties.get("pipeline", None), + "closedate": deal.properties.get("closedate", None), + "dealstage": deal.properties.get("dealstage", None), + "hubspot_owner_id": deal.properties.get("hubspot_owner_id", None), + 
"createdate": deal.properties.get("createdate", None), + "hs_lastmodifieddate": deal.properties.get("hs_lastmodifieddate", None), + } + deals_dict.append(deal_dict) + except Exception as e: + logger.error(f"Error processing deal {getattr(deal, 'id', 'unknown')}: {str(e)}") + raise ValueError( + f"Failed to process deal {getattr(deal, 'id', 'unknown')}. " + f"Please verify the HubSpot record and try again." + ) from e + + logger.info(f"Retrieved {len(deals_dict)} deals from HubSpot") return deals_dict def create_deals(self, deals_data: List[Dict[Text, Any]]) -> None: + if not deals_data: + raise ValueError("No deal data provided for creation") + + logger.info(f"Attempting to create {len(deals_data)} deal(s): {deals_data}") + hubspot = self.handler.connect() deals_to_create = [HubSpotObjectInputCreate(properties=deal) for deal in deals_data] + batch_input = BatchInputSimplePublicObjectBatchInputForCreate(inputs=deals_to_create) + try: created_deals = hubspot.crm.deals.batch_api.create( - inputs=deals_to_create, + batch_input_simple_public_object_batch_input_for_create=batch_input ) - logger.info(f"Deals created with ID's {[created_deal.id for created_deal in created_deals.results]}") + + if not created_deals or not hasattr(created_deals, "results") or not created_deals.results: + raise Exception("Deal creation returned no results") + + created_ids = [created_deal.id for created_deal in created_deals.results] + logger.info(f"Successfully created {len(created_ids)} deal(s) with IDs: {created_ids}") + except Exception as e: + logger.error(f"Deals creation failed: {str(e)}") raise Exception(f"Deals creation failed {e}") def update_deals(self, deal_ids: List[Text], values_to_update: Dict[Text, Any]) -> None: hubspot = self.handler.connect() deals_to_update = [HubSpotObjectBatchInput(id=deal_id, properties=values_to_update) for deal_id in deal_ids] + batch_input = BatchInputSimplePublicObjectBatchInput(inputs=deals_to_update) try: - updated_deals = 
hubspot.crm.deals.batch_api.update( - inputs=deals_to_update, - ) + updated_deals = hubspot.crm.deals.batch_api.update(batch_input_simple_public_object_batch_input=batch_input) logger.info(f"Deals with ID {[updated_deal.id for updated_deal in updated_deals.results]} updated") except Exception as e: raise Exception(f"Deals update failed {e}") @@ -547,10 +871,9 @@ def update_deals(self, deal_ids: List[Text], values_to_update: Dict[Text, Any]) def delete_deals(self, deal_ids: List[Text]) -> None: hubspot = self.handler.connect() deals_to_delete = [HubSpotObjectId(id=deal_id) for deal_id in deal_ids] + batch_input = BatchInputSimplePublicObjectId(inputs=deals_to_delete) try: - hubspot.crm.deals.batch_api.archive( - inputs=deals_to_delete, - ) + hubspot.crm.deals.batch_api.archive(batch_input_simple_public_object_id=batch_input) logger.info("Deals deleted") except Exception as e: raise Exception(f"Deals deletion failed {e}") From 369180493533ee516022b8abc49e3c5e2851c51b Mon Sep 17 00:00:00 2001 From: Parthiv Makwana <75653580+parthiv11@users.noreply.github.com> Date: Mon, 12 Jan 2026 06:39:36 +0000 Subject: [PATCH 6/7] Improve DuckDB handler with enhanced connection options and documentation --- .../handlers/duckdb_handler/README.md | 49 +++++++++++++------ .../duckdb_handler/connection_args.py | 22 ++++++--- .../handlers/duckdb_handler/duckdb_handler.py | 39 ++++++++------- 3 files changed, 71 insertions(+), 39 deletions(-) diff --git a/mindsdb/integrations/handlers/duckdb_handler/README.md b/mindsdb/integrations/handlers/duckdb_handler/README.md index 54c1040a42c..5fa9125b940 100644 --- a/mindsdb/integrations/handlers/duckdb_handler/README.md +++ b/mindsdb/integrations/handlers/duckdb_handler/README.md @@ -1,41 +1,62 @@ -# DuckDB Handler +# DuckDB Handler This is the implementation of the DuckDB handler for MindsDB. ## DuckDB DuckDB is an open-source analytical database system. DuckDB is designed for fast execution of analytical queries. 
-There are no external dependencies and the DBMS runs completly embedded within a host process, similar to SQLite. +There are no external dependencies, and the DBMS runs completely embedded within a host process, similar to SQLite. DuckDB provides a rich SQL dialect with support for complex queries with transactional guarantees (ACID). -## Implementation -This handler was implemented using the `duckdb` python client library. +## Implementation +This handler was implemented using the `duckdb` Python client library. ### DuckDB version -The DuckDB handler is currently using the `0.7.1.dev187` pre-relase version of the python client library. In case of issues, make sure your DuckDB database is compatible with this version. See the DuckDB handler [requirements.txt](requirements.txt) for details. - +The DuckDB handler is currently using the `1.1.3` release version of the Python client library. In case of issues, make sure your DuckDB or MotherDuck database is compatible with this version. See the DuckDB handler [requirements.txt](requirements.txt) for details. The required arguments to establish a connection are: -* `database`: the name of the DuckDB database file. May also be set to `:memory:`, which will create an in-memory database. +* `database`: the name of the DuckDB or MotherDuck database file. + - Set to `:memory:` to create an in-memory database. + - For MotherDuck, specify the database and motherduck_token. -The optional arguments are: +Additional optional arguments include: +* `motherduck_token`: a token to authenticate with MotherDuck. * `read_only`: a flag that specifies if the connection should be made in read-only mode. -This is required if multiple processes want to access the same database file at the same time. - + - This is required if multiple processes want to access the same database file simultaneously. 
## Usage -In order to make use of this handler and connect to a DuckDB database in MindsDB, the following syntax can be used: +To connect to a DuckDB or MotherDuck database in MindsDB, the following syntax can be used: +### DuckDB Example ```sql CREATE DATABASE duckdb_datasource WITH engine='duckdb', parameters={ - "database":"db.duckdb" + "database": "db.duckdb" }; ``` -Now, you can use this established connection to query your database as follows: +### MotherDuck Example +```sql +CREATE DATABASE md_datasource +WITH +engine='duckdb', +parameters={ + "database": "sample_data", + "motherduck_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." +}; +``` + +Once the connection is established, you can query the database: + ```sql SELECT * FROM duckdb_datasource.my_table; -``` \ No newline at end of file +``` + +For MotherDuck: +```sql +SELECT * FROM md_datasource.movies; +``` + +By leveraging these features, MindsDB provides powerful integrations with DuckDB and MotherDuck for scalable analytics. \ No newline at end of file diff --git a/mindsdb/integrations/handlers/duckdb_handler/connection_args.py b/mindsdb/integrations/handlers/duckdb_handler/connection_args.py index e5a372f9e88..a83e81ea30a 100644 --- a/mindsdb/integrations/handlers/duckdb_handler/connection_args.py +++ b/mindsdb/integrations/handlers/duckdb_handler/connection_args.py @@ -2,16 +2,26 @@ from mindsdb.integrations.libs.const import HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE - connection_args = OrderedDict( database={ - 'type': ARG_TYPE.STR, - 'description': 'The database file to read and write from. The special value :memory: (default) can be used to create an in-memory database.', + "type": ARG_TYPE.STR, + "description": ( + "The database file to read and write from. The special value :memory: (default) " + "can be used to create an in-memory database." 
+ ), + }, + motherduck_token={ + "type": ARG_TYPE.STR, + "description": "Motherduck access token if want to connect motherduck database.", }, read_only={ - 'type': ARG_TYPE.BOOL, - 'description': 'A flag that specifies if the connection should be made in read-only mode.', + "type": ARG_TYPE.BOOL, + "description": ("A flag that specifies if the connection should be made in read-only mode."), }, ) -connection_args_example = OrderedDict(database='db.duckdb', read_only=True) +connection_args_example = OrderedDict( + database="sample_data", + read_only=True, + motherduck_token="ey...enKoT.SsEcCa......", +) \ No newline at end of file diff --git a/mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py b/mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py index 7ae5423859c..ce725a27a63 100644 --- a/mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py +++ b/mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py @@ -19,14 +19,14 @@ class DuckDBHandler(DatabaseHandler): """This handler handles connection and execution of the DuckDB statements.""" - name = 'duckdb' + name = "duckdb" def __init__(self, name: str, **kwargs): super().__init__(name) self.parser = parse_sql - self.dialect = 'postgresql' - self.connection_data = kwargs.get('connection_data') - self.renderer = SqlalchemyRender('postgres') + self.dialect = "postgresql" + self.connection_data = kwargs.get("connection_data") + self.renderer = SqlalchemyRender("postgres") self.connection = None self.is_connected = False @@ -44,10 +44,17 @@ def connect(self) -> DuckDBPyConnection: if self.is_connected is True: return self.connection + motherduck_token = self.connection_data.get("motherduck_token") + if motherduck_token: + database = ( + f"md:{self.connection_data.get('database')}?motherduck_token={motherduck_token}&attach_mode=single" + ) + else: + database = self.connection_data.get("database") args = { - 'database': self.connection_data.get('database'), - 'read_only': 
self.connection_data.get('read_only'), + "database": database, + "read_only": self.connection_data.get("read_only"), } self.connection = duckdb.connect(**args) @@ -78,9 +85,7 @@ def check_connection(self) -> StatusResponse: self.connect() response.success = True except Exception as e: - logger.error( - f'Error connecting to DuckDB {self.connection_data["database"]}, {e}!' - ) + logger.error(f"Error connecting to DuckDB {self.connection_data['database']}, {e}!") response.error_message = str(e) finally: if response.success is True and need_to_close: @@ -111,17 +116,13 @@ def native_query(self, query: str) -> Response: if result: response = Response( RESPONSE_TYPE.TABLE, - data_frame=pd.DataFrame( - result, columns=[x[0] for x in cursor.description] - ), + data_frame=pd.DataFrame(result, columns=[x[0] for x in cursor.description]), ) else: connection.commit() response = Response(RESPONSE_TYPE.OK) except Exception as e: - logger.error( - f'Error running query: {query} on {self.connection_data["database"]}!' - ) + logger.error(f"Error running query: {query} on {self.connection_data['database']}!") response = Response(RESPONSE_TYPE.ERROR, error_message=str(e)) cursor.close() @@ -150,10 +151,10 @@ def get_tables(self) -> Response: Response: Names of the tables in the database. """ - q = 'SHOW TABLES;' + q = "SHOW TABLES;" result = self.native_query(q) df = result.data_frame - result.data_frame = df.rename(columns={df.columns[0]: 'table_name'}) + result.data_frame = df.rename(columns={df.columns[0]: "table_name"}) return result def get_columns(self, table_name: str) -> Response: @@ -166,5 +167,5 @@ def get_columns(self, table_name: str) -> Response: Response: Details of the table. 
""" - query = f'DESCRIBE {table_name};' - return self.native_query(query) + query = f"DESCRIBE {table_name};" + return self.native_query(query) \ No newline at end of file From ed741f8732ea6392201e180948a2c70aced66d5b Mon Sep 17 00:00:00 2001 From: Parthiv Makwana <75653580+parthiv11@users.noreply.github.com> Date: Fri, 13 Mar 2026 05:44:59 +0000 Subject: [PATCH 7/7] style: Consistently use double quotes for strings in duckdb_handler and connection_args --- mindsdb/integrations/handlers/duckdb_handler/connection_args.py | 2 +- mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mindsdb/integrations/handlers/duckdb_handler/connection_args.py b/mindsdb/integrations/handlers/duckdb_handler/connection_args.py index a83e81ea30a..4d9591e5eb6 100644 --- a/mindsdb/integrations/handlers/duckdb_handler/connection_args.py +++ b/mindsdb/integrations/handlers/duckdb_handler/connection_args.py @@ -24,4 +24,4 @@ database="sample_data", read_only=True, motherduck_token="ey...enKoT.SsEcCa......", -) \ No newline at end of file +) diff --git a/mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py b/mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py index ce725a27a63..bc407ef0575 100644 --- a/mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py +++ b/mindsdb/integrations/handlers/duckdb_handler/duckdb_handler.py @@ -168,4 +168,4 @@ def get_columns(self, table_name: str) -> Response: """ query = f"DESCRIBE {table_name};" - return self.native_query(query) \ No newline at end of file + return self.native_query(query)