Skip to content

Commit 5cfae0a

Browse files
Multifile cdc snapshot support (#14)
1 parent f93e6ee commit 5cfae0a

5 files changed

Lines changed: 96 additions & 14 deletions

File tree

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v0.4.1
1+
v0.5.0
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
{
2+
"dataFlowId": "feature_historical_files_snapshot_datetime_multifile",
3+
"dataFlowGroup": "feature_samples_snapshots",
4+
"dataFlowType": "standard",
5+
"targetFormat": "delta",
6+
"targetDetails": {
7+
"table": "feature_historical_snapshot_files_datetime_multifile",
8+
"tableProperties": {
9+
"delta.enableChangeDataFeed": "true"
10+
},
11+
"schemaPath": "target/customer_schema.json",
12+
"configFlags": ["disableOperationalMetadata"]
13+
},
14+
"cdcSnapshotSettings": {
15+
"keys": [
16+
"CUSTOMER_ID"
17+
],
18+
"scd_type": "2",
19+
"snapshotType": "historical",
20+
"sourceType": "file",
21+
"source": {
22+
"format": "csv",
23+
"path": "{sample_file_location}/snapshot_customer_multifile/customer_{version}_split_{fragment}.csv",
24+
"readerOptions": {
25+
"header": "true"
26+
},
27+
"versionType": "timestamp",
28+
"datetimeFormat": "%Y_%m_%d"
29+
},
30+
"track_history_except_column_list":[
31+
"LOAD_TIMESTAMP"
32+
]
33+
}
34+
}

samples/test_data_and_orchestrator/src/initialize.ipynb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
"volume_root_file_path = f\"/Volumes/{staging_schema}/{staging_volume}\".replace(\".\", \"/\")\n",
4141
"customer_file_path = f\"{volume_root_file_path}/customer\"\n",
4242
"customer_snapshot_file_path = f\"{volume_root_file_path}/snapshot_customer\"\n",
43+
"customer_snapshot_multifile_path = f\"{volume_root_file_path}/snapshot_customer_multifile\"\n",
4344
"customer_snapshot_partitioned_file_path = f\"{volume_root_file_path}/snapshot_customer_partitioned\"\n",
4445
"customer_snapshot_partitioned_parquet_file_path = f\"{volume_root_file_path}/snapshot_customer_partitioned_parquet\"\n",
4546
"template_samples_base_file_path = f\"{volume_root_file_path}/template_samples\"\n",

samples/test_data_and_orchestrator/src/run_1_staging_load.ipynb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,34 @@
277277
"dbutils.fs.put(\n",
278278
" f\"{template_samples_customer_file_path}/customer_2024_02_10.csv\",\n",
279279
" file_content,\n",
280+
" True)\n",
281+
"\n",
282+
"\n",
283+
"file_content = \"\"\"CUSTOMER_ID,FIRST_NAME,LAST_NAME,EMAIL,DELETE_FLAG,LOAD_TIMESTAMP\\n\n",
284+
"1,John,Doe,john.doe@example.com,,2024-01-01 10:00:00\\n\n",
285+
"\"\"\"\n",
286+
"\n",
287+
"dbutils.fs.put(\n",
288+
" f\"{customer_snapshot_multifile_path}/customer_2024_01_01_split_0001.csv\",\n",
289+
" file_content,\n",
290+
" True)\n",
291+
"\n",
292+
"file_content = \"\"\"CUSTOMER_ID,FIRST_NAME,LAST_NAME,EMAIL,DELETE_FLAG,LOAD_TIMESTAMP\\n\n",
293+
"2,Jane,Smith,jane.smith@example.com,,2024-01-01 10:00:00\\n\n",
294+
"\"\"\"\n",
295+
"\n",
296+
"dbutils.fs.put(\n",
297+
" f\"{customer_snapshot_multifile_path}/customer_2024_01_01_split_0002.csv\",\n",
298+
" file_content,\n",
299+
" True)\n",
300+
"\n",
301+
"file_content = \"\"\"CUSTOMER_ID,FIRST_NAME,LAST_NAME,EMAIL,DELETE_FLAG,LOAD_TIMESTAMP\\n\n",
302+
"1,John,Doe,john@example.com,,2024-12-12 10:00:00\\n\n",
303+
"\"\"\"\n",
304+
"\n",
305+
"dbutils.fs.put(\n",
306+
" f\"{customer_snapshot_multifile_path}/customer_2024_12_12_split_0001.csv\",\n",
307+
" file_content,\n",
280308
" True)\n"
281309
]
282310
},

src/dataflow/cdc_snapshot.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import bisect
22
from dataclasses import dataclass, field
33
from datetime import datetime
4+
import fnmatch
5+
import os
46
import re
57
from typing import Dict, List, Optional, Union
68

@@ -423,7 +425,7 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[
423425

424426
def _extract_version_from_filename(self, filename: str, file_pattern: str) -> Optional[VersionInfo]:
425427
"""Extract version from filename using pattern"""
426-
regex_pattern = re.escape(file_pattern).replace(r'\{version\}', r'(.+)')
428+
regex_pattern = re.escape(file_pattern).replace(r'\{version\}', r'(.+?)').replace(r'\{fragment\}', r'.*?')
427429
match = re.match(regex_pattern, filename)
428430
if not match or not match.group(1):
429431
self.logger.debug(f"CDC Snapshot: No version string match found for filename: {filename}")
@@ -481,18 +483,35 @@ def _read_snapshot_dataframe(self, version_info: VersionInfo, dataflow_config: D
481483

482484
if self.sourceType == CDCSnapshotSourceTypes.FILE:
483485
file_path = self.source.path.replace("{version}", version_info.formatted_value)
484-
self.logger.debug(f"CDC Snapshot: Reading file: {file_path}")
485-
486-
schema_path = self.source.schemaPath
487-
select_exp = self.source.selectExp
488-
489-
df = SourceBatchFiles(
490-
path=file_path,
491-
format=self.source.format,
492-
readerOptions=self.source.readerOptions,
493-
schemaPath=schema_path,
494-
selectExp=select_exp
495-
).read_source(read_config)
486+
487+
if '{fragment}' in file_path:
488+
search_pattern = file_path.replace('{fragment}', "*")
489+
directory = os.path.dirname(search_pattern)
490+
filename_pattern = os.path.basename(search_pattern)
491+
dbutils = pipeline_config.get_dbutils()
492+
files = [f.path for f in dbutils.fs.ls(directory) if fnmatch.fnmatch(f.name, filename_pattern)]
493+
else:
494+
files = [file_path]
495+
496+
df = None
497+
for file in files:
498+
self.logger.debug(f"CDC Snapshot: Reading file: {file_path}")
499+
500+
schema_path = self.source.schemaPath
501+
select_exp = self.source.selectExp
502+
503+
file_df = SourceBatchFiles(
504+
path=file,
505+
format=self.source.format,
506+
readerOptions=self.source.readerOptions,
507+
schemaPath=schema_path,
508+
selectExp=select_exp
509+
).read_source(read_config)
510+
511+
if df:
512+
df = df.union(file_df)
513+
else:
514+
df = file_df
496515

497516
# Apply filter if specified
498517
if self.source.filter:

0 commit comments

Comments
 (0)