Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dcs_core/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.9.7"
__version__ = "0.9.8"
89 changes: 88 additions & 1 deletion dcs_core/core/datasource/sql_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import secrets
import string
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Union

from loguru import logger
from sqlalchemy import inspect, text
from sqlalchemy.engine import Connection
from sqlalchemy.engine import Connection, Engine

from dcs_core.core.datasource.base import DataSource

Expand Down Expand Up @@ -1047,3 +1050,87 @@ def query_timestamp_date_not_in_future_metric(
except Exception as e:
logger.error(f"Error occurred: {e}")
return 0, 0

def generate_view_name(self, view_name: str | None = None) -> str:
if view_name is not None:
return view_name
random_string = "".join(
secrets.choice(string.ascii_letters + string.digits) for _ in range(8)
)
timestamp = int(time.time())
return f"dcs_view_{timestamp}_{random_string.lower()}"

def create_view(
    self,
    query: str | None = None,
    schema: str | None = None,
    view_name: str | None = None,
) -> str | None:
    """Create a view and return its (optionally schema-qualified) name.

    Args:
        query: SELECT statement backing the view. When ``None`` an empty
            placeholder view (``SELECT 1 AS dummy WHERE 1 = 0``) is created.
        schema: Optional schema prefix for the view name.
        view_name: Optional explicit name; resolved through
            ``generate_view_name``.

    Returns:
        The name the view was created under, or ``None`` if creation failed
        (the error is logged, not raised).
    """
    view_name = self.generate_view_name(view_name=view_name)
    view_name_full = f"{schema}.{view_name}" if schema else view_name

    # NOTE(review): schema, view name and query are interpolated verbatim
    # into the DDL; callers must not pass untrusted values.
    select_sql = "SELECT 1 AS dummy WHERE 1 = 0" if query is None else query
    sql = f"CREATE VIEW {view_name_full} AS {select_sql}"

    connection = self.connection
    try:
        if isinstance(connection, Engine):
            # An Engine hands out a short-lived connection we own and commit.
            with connection.connect() as conn:
                conn.execute(text(sql))
                conn.commit()
            return view_name_full

        if isinstance(connection, Connection):
            connection.execute(text(sql))
        elif hasattr(connection, "cursor"):
            # Raw DB-API style connection: release the cursor when done.
            cursor = connection.cursor()
            try:
                cursor.execute(sql)
            finally:
                cursor.close()
        else:
            connection.execute(sql)
            return view_name_full

        # Commit is best-effort: the connection may not support it.
        try:
            connection.commit()
        except Exception as commit_error:
            logger.debug(
                f"Commit skipped after creating view {view_name_full}: "
                f"{commit_error}"
            )
        return view_name_full
    except Exception as e:
        logger.error(f"Error creating view {view_name_full}: {e}")
        return None

def drop_view(self, view_name: str, schema: str | None = None) -> bool:
    """Drop a view.

    Args:
        view_name: Name of the view to drop.
        schema: Optional schema prefix for the view name.

    Returns:
        ``True`` if the ``DROP VIEW`` statement ran without error, ``False``
        otherwise (the error is logged, not raised).
    """
    full_view_name = f"{schema}.{view_name}" if schema else view_name
    # NOTE(review): identifiers are interpolated verbatim into the DDL;
    # callers must not pass untrusted values.
    drop_query = f"DROP VIEW {full_view_name}"

    connection = self.connection
    try:
        if isinstance(connection, Engine):
            # An Engine hands out a short-lived connection we own and commit.
            with connection.connect() as conn:
                conn.execute(text(drop_query))
                conn.commit()
            return True

        if isinstance(connection, Connection):
            connection.execute(text(drop_query))
        elif hasattr(connection, "cursor"):
            # Raw DB-API style connection: release the cursor when done.
            cursor = connection.cursor()
            try:
                cursor.execute(drop_query)
            finally:
                cursor.close()
        else:
            connection.execute(drop_query)
            return True

        # Commit is best-effort: the connection may not support it.
        try:
            connection.commit()
        except Exception as commit_error:
            logger.debug(
                f"Commit skipped after dropping view {full_view_name}: "
                f"{commit_error}"
            )
        return True
    except Exception as e:
        logger.error(f"Error dropping view {full_view_name}: {e}")
        return False
43 changes: 42 additions & 1 deletion dcs_core/integrations/databases/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import base64
import json
import os
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from loguru import logger
from sqlalchemy import create_engine
Expand Down Expand Up @@ -162,3 +162,44 @@ def query_get_table_columns(
for r in rows
}
return column_info

def create_view(
self,
query: Optional[str] = None,
dataset: Optional[str] = None,
view_name: Optional[str] = None,
) -> str | None:
view_name = self.generate_view_name(view_name=view_name)
full_name = (
f"`{self.project}`.`{dataset}`.`{view_name}`"
if dataset
else f"`{view_name}`"
)
try:
if query is None:
create_view_query = (
f"CREATE VIEW {full_name} AS SELECT 1 AS dummy_column WHERE FALSE"
)
self.connection.execute(create_view_query)
return full_name
else:
create_view_query = f"CREATE VIEW {full_name} AS {query}"
self.connection.execute(create_view_query)
return full_name
except Exception as e:
logger.error(f"Error creating view: {e}")
return None

def drop_view(self, view_name: str, dataset: Optional[str] = None) -> bool:
    """Drop a BigQuery view.

    Args:
        view_name: Name of the view to drop.
        dataset: Optional dataset; when given the name is qualified as
            ``project.dataset.view``.

    Returns:
        ``True`` if the ``DROP VIEW`` statement ran without error, ``False``
        otherwise (the error is logged, not raised).
    """
    # The project is stored on the instance as ``project_id``.
    if dataset:
        full_name = f"`{self.project_id}`.`{dataset}`.`{view_name}`"
    else:
        full_name = f"`{view_name}`"

    drop_view_query = f"DROP VIEW {full_name}"
    try:
        self.connection.execute(drop_view_query)
    except Exception as e:
        logger.error(f"Error dropping view: {e}")
        return False
    return True
Comment on lines +166 to +205

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Bug: self.project does not exist; use self.project_id (and optionally default dataset)

In both create_view and drop_view, full_name is built using self.project:

full_name = (
    f"`{self.project}`.`{dataset}`.`{view_name}`"
    if dataset
    else f"`{view_name}`"
)

But in __init__, the attribute is named self.project_id, not self.project:

self.project_id = self.data_connection.get("project")

At runtime this will raise AttributeError: 'BigQueryDataSource' object has no attribute 'project', causing all view operations to fail.

You likely want:

-        view_name = self.generate_view_name(view_name=view_name)
-        full_name = (
-            f"`{self.project}`.`{dataset}`.`{view_name}`"
-            if dataset
-            else f"`{view_name}`"
-        )
+        view_name = self.generate_view_name(view_name=view_name)
+        dataset = dataset or self.dataset_id
+        full_name = (
+            f"`{self.project_id}`.`{dataset}`.`{view_name}`"
+            if dataset
+            else f"`{view_name}`"
+        )

and similarly for drop_view:

-        full_name = (
-            f"`{self.project}`.`{dataset}`.`{view_name}`"
-            if dataset
-            else f"`{view_name}`"
-        )
+        dataset = dataset or self.dataset_id
+        full_name = (
+            f"`{self.project_id}`.`{dataset}`.`{view_name}`"
+            if dataset
+            else f"`{view_name}`"
+        )

This both fixes the missing‑attribute bug and makes the optional dataset argument fall back to the configured default, which is more ergonomic for callers.

🧰 Tools
🪛 Ruff (0.14.5)

189-189: Do not catch blind exception: Exception

(BLE001)


202-202: Consider moving this statement to an else block

(TRY300)


203-203: Do not catch blind exception: Exception

(BLE001)

12 changes: 12 additions & 0 deletions dcs_core/integrations/databases/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import secrets
import string
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -690,3 +693,12 @@ def query_get_all_space_count(
return round((result[0] / result[1]) * 100) if result[1] > 0 else 0

return result[0] if result else 0

def generate_view_name(self, view_name: str | None = None) -> str:
if view_name is not None:
return view_name.upper()
random_string = "".join(
secrets.choice(string.ascii_letters + string.digits) for _ in range(8)
)
timestamp = int(time.time())
return f"dcs_view_{timestamp}_{random_string.lower()}".upper()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dcs-core"
version = "0.9.7"
version = "0.9.8"
description = "Open Source Data Quality Monitoring"
license = "Apache-2.0"
authors = [
Expand Down
Loading