Skip to content

Commit 4f0454a

Browse files
VInicius Pereira de Oliveira
authored and committed
chore: snowpark splits objects into multiple files
1 parent 3646eb4 commit 4f0454a

4 files changed

Lines changed: 381 additions & 184 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ celerybeat.pid
143143
# Environments
144144
.env
145145
.envrc
146+
env.example_2
146147
.venv
147148
env/
148149
venv/
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
"""
2+
Artifact Reader Facade Classes
3+
Provides a clean interface for reading different types of Snowflake artifacts.
4+
"""
5+
6+
from abc import ABC, abstractmethod
7+
from typing import List, Dict, Any
8+
from snowflake.snowpark import Session
9+
from migration_accelerator_package.constants import ArtifactType
10+
11+
12+
class ArtifactReader(ABC):
    """Common base for the per-artifact-type Snowflake readers.

    Keeps the session and the database/schema pair supplied at construction,
    and offers helpers that turn result rows into plain dictionaries with
    lowercase keys.
    """

    def __init__(self, session: Session, database: str, schema: str):
        """Store the session and the database/schema this reader works on."""
        self.session, self.database, self.schema = session, database, schema

    @abstractmethod
    def read(self) -> List[Dict[str, Any]]:
        """Return the artifacts of this reader's type, one dict per artifact."""

    def _normalize_keys(self, row_dict: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new dict with every key of ``row_dict`` lowercased."""
        lowered: Dict[str, Any] = {}
        for key, value in row_dict.items():
            lowered[key.lower()] = value
        return lowered

    def _normalize_rows(self, rows: List) -> List[Dict[str, Any]]:
        """Convert rows (objects exposing ``as_dict()``) to lowercase-keyed dicts."""
        normalized: List[Dict[str, Any]] = []
        for row in rows:
            normalized.append(self._normalize_keys(dict(row.as_dict())))
        return normalized
33+
34+
35+
class TablesReader(ArtifactReader):
    """Reader for Snowflake tables: table listing, column metadata and row data."""

    def read(self) -> List[Dict[str, Any]]:
        """Get all base tables in the schema.

        Returns:
            One dict per table with lowercase keys, ordered by table name.
        """
        # information_schema is qualified with the configured database so the
        # result does not depend on whatever database the session is using.
        # NOTE(review): the schema filter is a plain string comparison; confirm
        # callers pass the schema name in the case Snowflake stores it, and
        # that the value is trusted (it is interpolated into the SQL text).
        query = f"""
            SELECT
                table_catalog as database_name,
                table_schema as schema_name,
                table_name,
                table_type,
                row_count,
                bytes,
                created,
                last_altered,
                comment
            FROM {self.database}.information_schema.tables
            WHERE table_schema = '{self.schema}'
                AND table_type = 'BASE TABLE'
            ORDER BY table_name
        """
        result = self.session.sql(query).collect()
        # Use the shared helper, like every other reader method does.
        return self._normalize_rows(result)

    def read_columns(self, table_name: str) -> List[Dict[str, Any]]:
        """Get columns for a specific table.

        Args:
            table_name: Name of the table whose columns are listed. It is
                interpolated into the SQL text, so it must be a trusted value.

        Returns:
            One dict per column with lowercase keys, in ordinal position order.
        """
        query = f"""
            SELECT
                column_name,
                data_type,
                character_maximum_length,
                numeric_precision,
                numeric_scale,
                is_nullable,
                column_default,
                comment
            FROM {self.database}.information_schema.columns
            WHERE table_schema = '{self.schema}'
                AND table_name = '{table_name}'
            ORDER BY ordinal_position
        """
        result = self.session.sql(query).collect()
        return self._normalize_rows(result)

    def read_table_data(self, table_name: str, limit: int = None) -> List[Dict[str, Any]]:
        """Get data from a specific table.

        Args:
            table_name: Table to select from, resolved as
                ``<database>.<schema>.<table_name>``.
            limit: Maximum number of rows to fetch; ``None`` (the default)
                fetches every row. ``0`` is honoured and yields no rows.

        Returns:
            One dict per row. Keys are left exactly as the row reports them
            (they are not lowercased here).
        """
        query = f"SELECT * FROM {self.database}.{self.schema}.{table_name}"
        # Test against None rather than truthiness: limit=0 must produce
        # "LIMIT 0" instead of silently reading the whole table.
        if limit is not None:
            query += f" LIMIT {limit}"
        result = self.session.sql(query).collect()
        return [dict(row.as_dict()) for row in result]
86+
87+
88+
class ViewsReader(ArtifactReader):
    """Reader for Snowflake views."""

    def read(self) -> List[Dict[str, Any]]:
        """Get all views in the schema.

        Returns:
            One dict per view with lowercase keys, ordered by view name.
        """
        # Qualify information_schema with the configured database; unqualified,
        # the query would run against the session's current database and
        # ignore the database this reader was constructed with.
        # NOTE(review): the schema value is interpolated into the SQL text and
        # compared as a plain string; confirm it is trusted and correctly cased.
        query = f"""
            SELECT
                table_catalog as database_name,
                table_schema as schema_name,
                table_name as view_name,
                view_definition,
                created,
                comment
            FROM {self.database}.information_schema.views
            WHERE table_schema = '{self.schema}'
            ORDER BY view_name
        """
        result = self.session.sql(query).collect()
        return self._normalize_rows(result)
107+
108+
109+
class ProceduresReader(ArtifactReader):
    """Reader for Snowflake stored procedures."""

    def read(self) -> List[Dict[str, Any]]:
        """Get all stored procedures in the schema.

        Returns:
            One dict per procedure with lowercase keys, ordered by procedure
            name.
        """
        # Qualify information_schema with the configured database; unqualified,
        # the query would run against the session's current database and
        # ignore the database this reader was constructed with.
        # NOTE(review): the schema value is interpolated into the SQL text and
        # compared as a plain string; confirm it is trusted and correctly cased.
        query = f"""
            SELECT
                procedure_catalog as database_name,
                procedure_schema as schema_name,
                procedure_name,
                procedure_definition,
                created,
                last_altered,
                comment
            FROM {self.database}.information_schema.procedures
            WHERE procedure_schema = '{self.schema}'
            ORDER BY procedure_name
        """
        result = self.session.sql(query).collect()
        return self._normalize_rows(result)
129+
130+
131+
class FunctionsReader(ArtifactReader):
    """Reader for Snowflake user-defined functions."""

    def read(self) -> List[Dict[str, Any]]:
        """Get all user-defined functions in the schema.

        Returns:
            One dict per function with lowercase keys, ordered by function
            name.
        """
        # Qualify information_schema with the configured database; unqualified,
        # the query would run against the session's current database and
        # ignore the database this reader was constructed with.
        # NOTE(review): the schema value is interpolated into the SQL text and
        # compared as a plain string; confirm it is trusted and correctly cased.
        query = f"""
            SELECT
                function_catalog as database_name,
                function_schema as schema_name,
                function_name,
                function_definition,
                created,
                last_altered,
                comment
            FROM {self.database}.information_schema.functions
            WHERE function_schema = '{self.schema}'
            ORDER BY function_name
        """
        result = self.session.sql(query).collect()
        return self._normalize_rows(result)
151+
152+
153+
class SequencesReader(ArtifactReader):
    """Reader for Snowflake sequences."""

    def read(self) -> List[Dict[str, Any]]:
        """Get all sequences in the schema.

        Tries ``SHOW SEQUENCES`` first; if that statement fails, prints a
        warning and falls back to a reduced column set from
        ``information_schema.sequences``.

        Returns:
            One dict per sequence with lowercase keys. The available keys
            differ between the primary and the fallback query.
        """
        query = f"SHOW SEQUENCES IN SCHEMA {self.database}.{self.schema}"
        try:
            # Only the statement execution is guarded; normalisation happens
            # once, after whichever query succeeded.
            result = self.session.sql(query).collect()
        except Exception as e:
            # Deliberate best-effort fallback: try information_schema with
            # basic columns only.
            print(f" ⚠ Warning: SHOW SEQUENCES failed, trying information_schema: {e}")
            # Qualified with the configured database so the fallback targets
            # the same database as the SHOW statement above.
            query = f"""
                SELECT
                    sequence_catalog as database_name,
                    sequence_schema as schema_name,
                    sequence_name
                FROM {self.database}.information_schema.sequences
                WHERE sequence_schema = '{self.schema}'
                ORDER BY sequence_name
            """
            result = self.session.sql(query).collect()
        return self._normalize_rows(result)
176+
177+
178+
class StagesReader(ArtifactReader):
    """Lists the stages of the configured schema."""

    def read(self) -> List[Dict[str, Any]]:
        """Run ``SHOW STAGES`` for the schema and return lowercase-keyed dicts."""
        statement = f"SHOW STAGES IN SCHEMA {self.database}.{self.schema}"
        stage_rows = self.session.sql(statement).collect()
        return self._normalize_rows(stage_rows)
186+
187+
188+
class FileFormatsReader(ArtifactReader):
    """Lists the file formats of the configured schema."""

    def read(self) -> List[Dict[str, Any]]:
        """Run ``SHOW FILE FORMATS`` for the schema and return lowercase-keyed dicts."""
        statement = f"SHOW FILE FORMATS IN SCHEMA {self.database}.{self.schema}"
        format_rows = self.session.sql(statement).collect()
        return self._normalize_rows(format_rows)
196+
197+
198+
class TasksReader(ArtifactReader):
    """Lists the tasks of the configured schema."""

    def read(self) -> List[Dict[str, Any]]:
        """Run ``SHOW TASKS`` for the schema and return lowercase-keyed dicts."""
        statement = f"SHOW TASKS IN SCHEMA {self.database}.{self.schema}"
        task_rows = self.session.sql(statement).collect()
        return self._normalize_rows(task_rows)
206+
207+
208+
class StreamsReader(ArtifactReader):
    """Lists the streams of the configured schema."""

    def read(self) -> List[Dict[str, Any]]:
        """Run ``SHOW STREAMS`` for the schema and return lowercase-keyed dicts."""
        statement = f"SHOW STREAMS IN SCHEMA {self.database}.{self.schema}"
        stream_rows = self.session.sql(statement).collect()
        return self._normalize_rows(stream_rows)
216+
217+
218+
class PipesReader(ArtifactReader):
    """Lists the pipes of the configured schema."""

    def read(self) -> List[Dict[str, Any]]:
        """Run ``SHOW PIPES`` for the schema and return lowercase-keyed dicts."""
        statement = f"SHOW PIPES IN SCHEMA {self.database}.{self.schema}"
        pipe_rows = self.session.sql(statement).collect()
        return self._normalize_rows(pipe_rows)
226+
227+
228+
class ArtifactReaderFactory:
    """Builds the reader that corresponds to an ``ArtifactType`` member."""

    # Registry consulted by create_reader: artifact type -> reader class.
    _readers = {
        ArtifactType.TABLES: TablesReader,
        ArtifactType.VIEWS: ViewsReader,
        ArtifactType.PROCEDURES: ProceduresReader,
        ArtifactType.FUNCTIONS: FunctionsReader,
        ArtifactType.SEQUENCES: SequencesReader,
        ArtifactType.STAGES: StagesReader,
        ArtifactType.FILE_FORMATS: FileFormatsReader,
        ArtifactType.TASKS: TasksReader,
        ArtifactType.STREAMS: StreamsReader,
        ArtifactType.PIPES: PipesReader,
    }

    @classmethod
    def create_reader(cls, artifact_type: ArtifactType, session: Session, database: str, schema: str) -> ArtifactReader:
        """Instantiate the reader registered for ``artifact_type``.

        Args:
            artifact_type: Key looked up in the reader registry.
            session: Passed through to the reader's constructor.
            database: Passed through to the reader's constructor.
            schema: Passed through to the reader's constructor.

        Returns:
            A new reader instance for the requested artifact type.

        Raises:
            ValueError: If no reader is registered for ``artifact_type``.
        """
        reader_class = cls._readers.get(artifact_type)
        if reader_class:
            return reader_class(session, database, schema)
        raise ValueError(f"No reader available for artifact type: {artifact_type}")
251+

src/migration_accelerator_package/constants.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,30 @@ class SnowflakeConfig(Enum):
99
class UnityCatalogConfig(Enum):
    """Fixed catalog, schema and raw-volume names.

    NOTE(review): presumably the Unity Catalog location that extracted
    Snowflake artifacts are written to -- confirm against the callers.
    """

    CATALOG = "qubika_partner_solutions"
    SCHEMA = "migration_accelerator"
    RAW_VOLUME = "snowflake_artifacts_raw"
13+
14+
class ArtifactType(Enum):
    """Kinds of Snowflake artifact; each value is the lowercase plural name."""

    TABLES = "tables"
    VIEWS = "views"
    PROCEDURES = "procedures"
    FUNCTIONS = "functions"
    SEQUENCES = "sequences"
    STAGES = "stages"
    FILE_FORMATS = "file_formats"
    TASKS = "tasks"
    STREAMS = "streams"
    PIPES = "pipes"
26+
27+
class ArtifactFileName(Enum):
    """JSON output file name for each kind of artifact."""

    TABLES = "tables.json"
    VIEWS = "views.json"
    PROCEDURES = "procedures.json"
    FUNCTIONS = "functions.json"
    SEQUENCES = "sequences.json"
    STAGES = "stages.json"
    FILE_FORMATS = "file_formats.json"
    TASKS = "tasks.json"
    STREAMS = "streams.json"
    # The one name that does not follow the "<member>.json" pattern:
    # pipes are saved as pipelines.json.
    PIPES = "pipelines.json"

0 commit comments

Comments
 (0)