Skip to content

Commit c504223

Browse files
authored
fix: correct CDC snapshot table identifier parsing and update relevant doco (#49)
1 parent 5c1889b commit c504223

2 files changed

Lines changed: 61 additions & 53 deletions

File tree

docs/source/dataflow_spec_ref_cdc.rst

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ The ``cdcSettings`` and ``cdcSnapshotSettings`` enable and pass configuration in
1818
- See :ref:`cdcSnapshotSettings` for more information.
1919

2020
cdcSettings
21-
~~~~~~~~~~~~~~~~~
21+
~~~~~~~~~~~~~~~~~
2222

2323
The ``cdcSettings`` object contains the following properties:
2424

@@ -36,7 +36,7 @@ The ``cdcSettings`` object contains the following properties:
3636
- The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.
3737
* - **scd_type**
3838
- ``string``
39-
- Whether to store records as SCD type 1 or SCD type 2. Set to ``1`` for SCD type 1 or 2 for SCD type ``2``.
39+
- Whether to store records as SCD type 1 or SCD type 2. Set to ``1`` for SCD type 1 or 2 for SCD type ``2``.
4040
* - **apply_as_deletes**
4141
- ``string``
4242
- (*optional*) Specifies when a CDC event should be treated as a DELETE rather than an upsert.
@@ -147,6 +147,27 @@ CDC Historical Snapshot Source Configuration
147147
.. note::
148148
If ``recursiveFileLookup`` is set to ``true``, ensure that the ``path`` parameter is compatible with recursive directory traversal. When using the ``{version}`` placeholder, place it in the directory portion of the path rather than the filename (e.g. ``/data/{version}/file.parquet``). When using regex named capture groups, the pattern spans the full relative path from the first dynamic segment, so ``recursiveFileLookup`` must be ``true`` if the version spans multiple directory levels.
149149

150+
The ``source`` object contains the following properties for ``table`` based sources:
151+
152+
.. list-table::
153+
:header-rows: 1
154+
155+
* - Parameter
156+
- Type
157+
- Description
158+
* - **table**
159+
- ``string``
160+
- The table name to load the source data from, as either a 2-part ``schema.table`` (resolving to *<pipeline_target>*.schema.table) or 3-part ``catalog.schema.table`` identifier.
161+
* - **versionColumn**
162+
- ``string``
163+
- The column name to use for versioning.
164+
* - **startingVersion**
165+
- ``string`` or ``integer``
166+
- (*optional*) The version to start processing from.
167+
* - **selectExp**
168+
- ``list``
169+
- (*optional*) A list of select expressions to apply to the source data.
170+
150171
.. _file-path-patterns:
151172

152173
File Path Patterns
@@ -235,23 +256,3 @@ File Path Patterns
235256

236257
See ``samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/historical_snapshot_files_datetime_recursive_and_partitioned_regex_main.json`` for a complete working example.
237258

238-
The ``source`` object contains the following properties for ``table`` based sources:
239-
240-
.. list-table::
241-
:header-rows: 1
242-
243-
* - Parameter
244-
- Type
245-
- Description
246-
* - **table**
247-
- ``string``
248-
- The table name to load the source data from.
249-
* - **versionColumn**
250-
- ``string``
251-
- The column name to use for versioning.
252-
* - **startingVersion**
253-
- ``string`` or ``integer``
254-
- (*optional*) The version to start processing from.
255-
* - **selectExp**
256-
- ``list``
257-
- (*optional*) A list of select expressions to apply to the source data.

src/dataflow/cdc_snapshot.py

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def is_file_source(self) -> bool:
176176

177177
class CDCSnapshotFlow:
178178
"""A class to create a CDC Snapshot flow."""
179-
179+
180180
def __init__(self, settings: CDCSnapshotSettings):
181181
self.settings = settings
182182
self.logger = pipeline_config.get_logger()
@@ -210,7 +210,7 @@ def sorted_versions(self) -> List[VersionInfo]:
210210
if self._sorted_versions is None and self._available_versions:
211211
self._sorted_versions = sorted(self._available_versions, key=lambda x: x.raw_value)
212212
return self._sorted_versions or []
213-
213+
214214
@property
215215
def version_values(self) -> List[Union[int, datetime]]:
216216
"""Get version values."""
@@ -239,7 +239,7 @@ def create(
239239
flow_name: Optional[str] = None # TODO: Add flow name
240240
) -> None:
241241
"""Create CDC from snapshot flow.
242-
242+
243243
Args:
244244
dataflow_config: DataFlow configuration
245245
target_table: Name of the target table
@@ -318,35 +318,35 @@ def _get_available_versions(self, latest_snapshot_version: Optional[Union[int, d
318318
return self._get_available_table_versions(latest_snapshot_version)
319319
else:
320320
raise ValueError(f"Unsupported source type: {self.sourceType}")
321-
321+
322322
def _list_files(self, path, recursive=True):
323323
"""List files in a directory, with optional recursive file lookup.
324-
324+
325325
Args:
326326
path: Directory path to list files from
327327
recursive: If True, list files recursively. If False, list only files in the immediate directory.
328-
328+
329329
Returns:
330330
List of file objects from dbutils.fs.ls()
331331
"""
332332
dbutils = pipeline_config.get_dbutils()
333333
all_files = []
334-
334+
335335
for f in dbutils.fs().ls(path):
336336
all_files.append(f)
337337

338338
if recursive and f.isDir():
339339
all_files.extend(self._list_files(f.path, recursive=True))
340-
340+
341341
return all_files
342342

343343
def _path_to_regex_pattern(self, path: str) -> str:
344344
"""Convert path to normalized regex pattern with named groups.
345-
345+
346346
Curly-brace syntax is converted to named capture groups:
347347
- {version} -> (?P<version_main>.+) (single capture group for version)
348348
- {fragment} -> (?P<fragment>.*?) (single capture group for fragment)
349-
349+
350350
If path already contains regex named groups (?P<version_ or (?P<fragment>,
351351
it is returned as-is (already a regex pattern).
352352
"""
@@ -403,9 +403,9 @@ def _get_available_file_versions(self, latest_snapshot_version: Optional[Union[i
403403
self.logger.debug(f"CDC Snapshot: Using recursive file lookup: {recursive_file_lookup}")
404404
files_list = self._list_files(parent_dir, recursive=recursive_file_lookup)
405405
files_with_path_info = [FilePathInfo(full_path=f.path, filename_with_version_path='/'.join(f.path.split('/')[dynamic_idx:])) for f in files_list]
406-
406+
407407
self.logger.debug(f"CDC Snapshot: Found {len(files_with_path_info)} files")
408-
408+
409409
# Extract version from filename and filter by latest_snapshot_version if provided
410410
available_versions = []
411411
for file in files_with_path_info:
@@ -432,14 +432,14 @@ def _get_available_file_versions(self, latest_snapshot_version: Optional[Union[i
432432
except ValueError as e:
433433
self.logger.warning(f"CDC Snapshot: Skipping file '{file.filename_with_version_path}' - {e}")
434434
continue
435-
435+
436436
return available_versions
437437

438438
def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[int, datetime]]) -> List[VersionInfo]:
439439
"""Get list of available versions from table."""
440440
spark = pipeline_config.get_spark()
441441
table_name = self.source.table
442-
442+
443443
self.logger.info(f"CDC Snapshot: Getting versions from table: {table_name}")
444444
try:
445445
df = spark.table(table_name)
@@ -449,7 +449,7 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[
449449

450450
# Get the version column
451451
version_column = self.source.versionColumn
452-
452+
453453
# Check if the version column is a valid data type
454454
valid_data_types = ["timestamp", "date", "integer","long"]
455455
version_column_type = df.schema[version_column].dataType.typeName()
@@ -460,24 +460,24 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[
460460

461461
# Get the version values and filter by latest_snapshot_version if provided
462462
if latest_snapshot_version is not None:
463-
latest_version_info = VersionInfo(
463+
latest_version_info = VersionInfo(
464464
raw_value=latest_snapshot_version,
465465
version_type=self.source.versionType)
466466
version_df = df.select(version_column).where(f"{version_column} > {latest_version_info.sql_formatted_value}").distinct()
467467
else:
468468
version_df = df.select(version_column).distinct()
469-
469+
470470
available_versions = []
471471
for row in version_df.collect():
472472
version = row[version_column]
473-
473+
474474
if version is None:
475475
continue
476-
476+
477477
if self.source.startingVersion is not None and version < self.source.startingVersion:
478478
self.logger.debug(f"CDC Snapshot: Skipping version {version} because it is less than the starting version {self.source.startingVersion}")
479479
continue
480-
480+
481481
if latest_snapshot_version is not None and version <= latest_snapshot_version:
482482
self.logger.debug(f"CDC Snapshot: Skipping version {version} because it is less than or equal to the latest snapshot version {latest_snapshot_version}")
483483
continue
@@ -487,16 +487,16 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[
487487
version_type=self.source.versionType,
488488
datetime_format=None
489489
)
490-
490+
491491
available_versions.append(version_info)
492-
492+
493493
self.logger.debug(f"CDC Snapshot: Found {len(available_versions)} available versions")
494-
494+
495495
return available_versions
496496

497497
def _extract_version_from_filename(self, filename: str, file_pattern: str) -> Optional[VersionInfo]:
498498
"""Extract version from filename using pattern (regex or curly-brace; normalized to regex).
499-
499+
500500
Version is taken from all named groups starting with version_, concatenated in name order.
501501
Curly-brace {version} is converted to (?P<version_main>.+); {fragment} to (?P<fragment>.*?).
502502
"""
@@ -521,7 +521,7 @@ def _extract_version_from_filename(self, filename: str, file_pattern: str) -> Op
521521
raw_value = datetime.strptime(version_str, self.source.datetimeFormat)
522522
else:
523523
raw_value = int(version_str)
524-
524+
525525
return VersionInfo(
526526
raw_value=raw_value,
527527
version_type=self.source.versionType,
@@ -617,23 +617,30 @@ def _read_snapshot_dataframe(self, version_info: VersionInfo, dataflow_config: D
617617

618618
elif self.sourceType == CDCSnapshotSourceTypes.TABLE:
619619
table_parts = self.source.table.split(".")
620-
if len(table_parts) < 2:
621-
raise ValueError(f"Invalid table name format: {self.source.table}. Expected format: database.schema.table")
622-
620+
if not 1 < len(table_parts) < 4:
621+
raise ValueError(f"Invalid table name format: {self.source.table}. Accepted formats are: schema.table & database.schema.table")
622+
623+
if len(table_parts) == 3:
624+
database = f"{table_parts[0]}.{table_parts[1]}"
625+
else:
626+
# set to the specified target catalog by default
627+
_pipeline_details = pipeline_config.get_pipeline_details()
628+
database = f"{_pipeline_details.pipeline_catalog}.{table_parts[0]}"
629+
623630
table = table_parts[-1]
624-
database = f"{table_parts[0]}.{table_parts[1]}"
631+
625632
select_exp = self.source.selectExp
626633
where_clause = [
627634
f"{self.source.versionColumn} = {version_info.sql_formatted_value}"]
628-
635+
629636
self.logger.info(f"CDC Snapshot: Reading table: {database}.{table} with where clause: {where_clause}")
630637
df = SourceDelta(
631638
database=database,
632639
table=table,
633640
whereClause=where_clause,
634641
selectExp=select_exp
635642
).read_source(read_config)
636-
643+
637644
if self.source.deduplicateMode == DeduplicateMode.KEYS_ONLY:
638645
df = self._deduplicate_by_keys(df)
639646
self.logger.debug("CDC Snapshot: Applied deduplication by keys")

0 commit comments

Comments
 (0)