diff --git a/dcs_core/__version__.py b/dcs_core/__version__.py index 346a967a..a1848f24 100644 --- a/dcs_core/__version__.py +++ b/dcs_core/__version__.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.9.7" +__version__ = "0.9.8" diff --git a/dcs_core/core/datasource/sql_datasource.py b/dcs_core/core/datasource/sql_datasource.py index 8d5de91d..8b4dbf39 100644 --- a/dcs_core/core/datasource/sql_datasource.py +++ b/dcs_core/core/datasource/sql_datasource.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import secrets +import string +import time from datetime import datetime from typing import Dict, List, Optional, Tuple, Union from loguru import logger from sqlalchemy import inspect, text -from sqlalchemy.engine import Connection +from sqlalchemy.engine import Connection, Engine from dcs_core.core.datasource.base import DataSource @@ -1047,3 +1050,87 @@ def query_timestamp_date_not_in_future_metric( except Exception as e: logger.error(f"Error occurred: {e}") return 0, 0 + + def generate_view_name(self, view_name: str | None = None) -> str: + if view_name is not None: + return view_name + random_string = "".join( + secrets.choice(string.ascii_letters + string.digits) for _ in range(8) + ) + timestamp = int(time.time()) + return f"dcs_view_{timestamp}_{random_string.lower()}" + + def create_view( + self, + query: str | None = None, + schema: str | None = None, + view_name: str | None = None, + ) -> str | None: + view_name = self.generate_view_name(view_name=view_name) + schema_prefix = f"{schema}." if schema else "" + view_name_full = f"{schema_prefix}{view_name}" + + if query is None: + sql = f"CREATE VIEW {view_name_full} AS SELECT 1 AS dummy WHERE 1 = 0" + else: + sql = f"CREATE VIEW {view_name_full} AS {query}" + + try: + if isinstance(self.connection, (Connection, Engine)): + if isinstance(self.connection, Engine): + with self.connection.connect() as conn: + conn.execute(text(sql)) + conn.commit() + else: + self.connection.execute(text(sql)) + try: + self.connection.commit() + except Exception: + pass + else: + plain_sql = str(sql) + if hasattr(self.connection, "cursor"): + cur = self.connection.cursor() + cur.execute(plain_sql) + try: + self.connection.commit() + except Exception: + pass + else: + self.connection.execute(plain_sql) + + return view_name_full + except Exception as e: + logger.error(f"Error creating view {view_name_full}: {e}") + return None + + def drop_view(self, view_name: str, schema: str | None) -> bool: + schema_prefix = f"{schema}." if schema else "" + full_view_name = f"{schema_prefix}{view_name}" + drop_query = f"DROP VIEW {full_view_name}" + try: + if isinstance(self.connection, (Connection, Engine)): + if isinstance(self.connection, Engine): + with self.connection.connect() as conn: + conn.execute(text(drop_query)) + conn.commit() + else: + self.connection.execute(text(drop_query)) + try: + self.connection.commit() + except Exception: + pass + else: + if hasattr(self.connection, "cursor"): + cur = self.connection.cursor() + cur.execute(drop_query) + try: + self.connection.commit() + except Exception: + pass + else: + self.connection.execute(str(drop_query)) + return True + except Exception as e: + logger.error(f"Error dropping view {full_view_name}: {e}") + return False diff --git a/dcs_core/integrations/databases/bigquery.py b/dcs_core/integrations/databases/bigquery.py index a86e0b8c..211322ed 100644 --- a/dcs_core/integrations/databases/bigquery.py +++ b/dcs_core/integrations/databases/bigquery.py @@ -15,7 +15,7 @@ import base64 import json import os -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from loguru import logger from sqlalchemy import create_engine @@ -162,3 +162,44 @@ def query_get_table_columns( for r in rows } return column_info + + def create_view( + self, + query: Optional[str] = None, + dataset: Optional[str] = None, + view_name: Optional[str] = None, + ) -> str | None: + view_name = self.generate_view_name(view_name=view_name) + full_name = ( + f"`{self.project}`.`{dataset}`.`{view_name}`" + if dataset + else f"`{view_name}`" + ) + try: + if query is None: + create_view_query = ( + f"CREATE VIEW {full_name} AS SELECT 1 AS dummy_column WHERE FALSE" + ) + self.connection.execute(create_view_query) + return full_name + else: + create_view_query = f"CREATE VIEW {full_name} AS {query}" + self.connection.execute(create_view_query) + return full_name + except Exception as e: + logger.error(f"Error creating view: {e}") + return None + + def drop_view(self, view_name: str, dataset: Optional[str] = None) -> bool: + full_name = ( + f"`{self.project}`.`{dataset}`.`{view_name}`" + if dataset + else f"`{view_name}`" + ) + try: + drop_view_query = f"DROP VIEW {full_name}" + self.connection.execute(drop_view_query) + return True + except Exception as e: + logger.error(f"Error dropping view: {e}") + return False diff --git a/dcs_core/integrations/databases/oracle.py b/dcs_core/integrations/databases/oracle.py index 17341d21..6860b7b3 100644 --- a/dcs_core/integrations/databases/oracle.py +++ b/dcs_core/integrations/databases/oracle.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import secrets +import string +import time from datetime import datetime from typing import Any, Dict, List, Optional, Tuple, Union @@ -690,3 +693,12 @@ def query_get_all_space_count( return round((result[0] / result[1]) * 100) if result[1] > 0 else 0 return result[0] if result else 0 + + def generate_view_name(self, view_name: str | None = None) -> str: + if view_name is not None: + return view_name.upper() + random_string = "".join( + secrets.choice(string.ascii_letters + string.digits) for _ in range(8) + ) + timestamp = int(time.time()) + return f"dcs_view_{timestamp}_{random_string.lower()}".upper() diff --git a/pyproject.toml b/pyproject.toml index 17283fd4..dcc6d47e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dcs-core" -version = "0.9.7" +version = "0.9.8" description = "Open Source Data Quality Monitoring" license = "Apache-2.0" authors = [