|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 |
|
| 15 | +import secrets |
| 16 | +import string |
| 17 | +import time |
15 | 18 | from datetime import datetime |
16 | 19 | from typing import Dict, List, Optional, Tuple, Union |
17 | 20 |
|
18 | 21 | from loguru import logger |
19 | 22 | from sqlalchemy import inspect, text |
20 | | -from sqlalchemy.engine import Connection |
| 23 | +from sqlalchemy.engine import Connection, Engine |
21 | 24 |
|
22 | 25 | from dcs_core.core.datasource.base import DataSource |
23 | 26 |
|
@@ -1047,3 +1050,87 @@ def query_timestamp_date_not_in_future_metric( |
1047 | 1050 | except Exception as e: |
1048 | 1051 | logger.error(f"Error occurred: {e}") |
1049 | 1052 | return 0, 0 |
| 1053 | + |
| 1054 | + def generate_view_name(self, view_name: str | None = None) -> str: |
| 1055 | + if view_name is not None: |
| 1056 | + return view_name |
| 1057 | + random_string = "".join( |
| 1058 | + secrets.choice(string.ascii_letters + string.digits) for _ in range(8) |
| 1059 | + ) |
| 1060 | + timestamp = int(time.time()) |
| 1061 | + return f"dcs_view_{timestamp}_{random_string.lower()}" |
| 1062 | + |
| 1063 | + def create_view( |
| 1064 | + self, |
| 1065 | + query: str | None = None, |
| 1066 | + schema: str | None = None, |
| 1067 | + view_name: str | None = None, |
| 1068 | + ) -> str | None: |
| 1069 | + view_name = self.generate_view_name(view_name=view_name) |
| 1070 | + schema_prefix = f"{schema}." if schema else "" |
| 1071 | + view_name_full = f"{schema_prefix}{view_name}" |
| 1072 | + |
| 1073 | + if query is None: |
| 1074 | + sql = f"CREATE VIEW {view_name_full} AS SELECT 1 AS dummy WHERE 1 = 0" |
| 1075 | + else: |
| 1076 | + sql = f"CREATE VIEW {view_name_full} AS {query}" |
| 1077 | + |
| 1078 | + try: |
| 1079 | + if isinstance(self.connection, (Connection, Engine)): |
| 1080 | + if isinstance(self.connection, Engine): |
| 1081 | + with self.connection.connect() as conn: |
| 1082 | + conn.execute(text(sql)) |
| 1083 | + conn.commit() |
| 1084 | + else: |
| 1085 | + self.connection.execute(text(sql)) |
| 1086 | + try: |
| 1087 | + self.connection.commit() |
| 1088 | + except Exception: |
| 1089 | + pass |
| 1090 | + else: |
| 1091 | + plain_sql = str(sql) |
| 1092 | + if hasattr(self.connection, "cursor"): |
| 1093 | + cur = self.connection.cursor() |
| 1094 | + cur.execute(plain_sql) |
| 1095 | + try: |
| 1096 | + self.connection.commit() |
| 1097 | + except Exception: |
| 1098 | + pass |
| 1099 | + else: |
| 1100 | + self.connection.execute(plain_sql) |
| 1101 | + |
| 1102 | + return view_name_full |
| 1103 | + except Exception as e: |
| 1104 | + logger.error(f"Error creating view {view_name_full}: {e}") |
| 1105 | + return None |
| 1106 | + |
| 1107 | + def drop_view(self, view_name: str, schema: str | None) -> bool: |
| 1108 | + schema_prefix = f"{schema}." if schema else "" |
| 1109 | + full_view_name = f"{schema_prefix}{view_name}" |
| 1110 | + drop_query = f"DROP VIEW {full_view_name}" |
| 1111 | + try: |
| 1112 | + if isinstance(self.connection, (Connection, Engine)): |
| 1113 | + if isinstance(self.connection, Engine): |
| 1114 | + with self.connection.connect() as conn: |
| 1115 | + conn.execute(text(drop_query)) |
| 1116 | + conn.commit() |
| 1117 | + else: |
| 1118 | + self.connection.execute(text(drop_query)) |
| 1119 | + try: |
| 1120 | + self.connection.commit() |
| 1121 | + except Exception: |
| 1122 | + pass |
| 1123 | + else: |
| 1124 | + if hasattr(self.connection, "cursor"): |
| 1125 | + cur = self.connection.cursor() |
| 1126 | + cur.execute(drop_query) |
| 1127 | + try: |
| 1128 | + self.connection.commit() |
| 1129 | + except Exception: |
| 1130 | + pass |
| 1131 | + else: |
| 1132 | + self.connection.execute(str(drop_query)) |
| 1133 | + return True |
| 1134 | + except Exception as e: |
| 1135 | + logger.error(f"Error dropping view {full_view_name}: {e}") |
| 1136 | + return False |
0 commit comments