Skip to content

Commit bb91ba6

Browse files
rederik76rederik76
andauthored
fix(cdc_snapshot): add Spark binaryFile fallback for dbutils.fs.ls() and fix parquet directory enumeration in historical CDC snapshot (#83)
* fix: add Spark binaryFile fallback for dbutils.fs.ls() and fix parquet directory enumeration in historical CDC snapshot - _list_files now tries dbutils.fs.ls() first; falls back to Spark binaryFile on any exception (e.g. Py4JSecurityException in Serverless with Restricted Access / SEG) - Fix bug where dbutils.fs().ls() was called with parentheses on fs - binaryFile fallback stops at .parquet directories and deduplicates part files so each snapshot version is counted once - dbutils path also guards against recursing into .parquet directories (trailing "/" stripped before the endswith check) * bump version * resolve merge and improve log message * bump version --------- Co-authored-by: rederik76 <rederik76@gmail.com>
1 parent 3e2a89d commit bb91ba6

2 files changed

Lines changed: 78 additions & 14 deletions

File tree

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v0.15.3
1+
v0.15.4

src/dataflow/cdc_snapshot.py

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ def _next_snapshot_and_version(self, latest_snapshot_version, dataflow_config: D
306306
return (df, version_info.raw_value)
307307

308308
except Exception as e:
309-
self.logger.error(f"CDC Snapshot: Error processing snapshots: {e}")
309+
self.logger.error(f"CDC Snapshot: Error processing snapshots: {e}", exc_info=True)
310310
raise
311311

312312

@@ -318,27 +318,91 @@ def _get_available_versions(self, latest_snapshot_version: Optional[Union[int, d
318318
return self._get_available_table_versions(latest_snapshot_version)
319319
else:
320320
raise ValueError(f"Unsupported source type: {self.sourceType}")
321+
322+
def _list_files(self, path: str, recursive: bool = True) -> List:
323+
"""List files in a directory, attempting dbutils.fs.ls() first.
321324
322-
def _list_files(self, path, recursive=True):
323-
"""List files in a directory, with optional recursive file lookup.
325+
Falls back to Spark binaryFile if dbutils is unavailable (e.g., blocked
326+
by Serverless Restricted Access / SEG Py4JSecurityException).
324327
325328
Args:
326-
path: Directory path to list files from
327-
recursive: If True, list files recursively. If False, list only files in the immediate directory.
329+
path: Directory path to list files from.
330+
recursive: If True, list files recursively.
328331
329332
Returns:
330-
List of file objects from dbutils.fs.ls()
333+
List of objects with a .path attribute per discovered file.
331334
"""
332-
dbutils = pipeline_config.get_dbutils()
333-
all_files = []
335+
try:
336+
dbutils = pipeline_config.get_dbutils()
337+
all_files = []
338+
for f in dbutils.fs.ls(path):
339+
all_files.append(f)
340+
# Recursively list files in subdirectories unless the path ends with .parquet
341+
if recursive and f.isDir() and not f.path.rstrip("/").endswith(".parquet"):
342+
all_files.extend(self._list_files(f.path, recursive=True))
343+
return all_files
344+
except Exception as e:
345+
self.logger.warning(
346+
f"CDC Snapshot: dbutils.fs.ls() failed at '{path}'. "
347+
f"Falling back to Spark binaryFile for file discovery."
348+
)
334349

335-
for f in dbutils.fs().ls(path):
336-
all_files.append(f)
350+
# Fallback: Spark binaryFile read is SEG-compatible and needs no dbutils.
351+
self.logger.debug("CDC Snapshot: Falling back to Spark binaryFile for file discovery")
352+
353+
class _FileInfo:
354+
__slots__ = ("path", "name", "size", "modificationTime")
337355

338-
if recursive and f.isDir():
339-
all_files.extend(self._list_files(f.path, recursive=True))
356+
# binaryFile returns full URIs (e.g. dbfs:/Volumes/...). Normalise to
357+
# match the caller's path scheme so split-by-segment indices stay aligned.
358+
_prefix = path.startswith("dbfs:")
359+
360+
def __init__(self, row) -> None:
361+
raw = row.path
362+
if self._prefix:
363+
self.path = raw if raw.startswith("dbfs:") else f"dbfs:{raw}"
364+
else:
365+
self.path = raw[5:] if raw.startswith("dbfs:") else raw
366+
self.name = self.path.rstrip("/").rsplit("/", 1)[-1]
367+
self.size = row.length
368+
self.modificationTime = row.modificationTime
369+
370+
spark = pipeline_config.get_spark()
371+
rows = (
372+
spark.read
373+
.format("binaryFile")
374+
.option("recursiveFileLookup", str(recursive).lower())
375+
.load(path)
376+
.select("path", "modificationTime", "length")
377+
.collect()
378+
)
379+
380+
# binaryFile only returns leaf files, not directories. For parquet "files" that
381+
# are actually directories (e.g. customer.parquet/part-00000.parquet), truncate
382+
# the path at the .parquet directory level and deduplicate so each snapshot is
383+
# counted once.
384+
def _truncate_at_parquet_dir(p: str) -> str:
385+
segments = p.split("/")
386+
for i, seg in enumerate(segments[:-1]): # skip the last segment
387+
if seg.endswith(".parquet"):
388+
return "/".join(segments[: i + 1])
389+
return p
390+
391+
seen_paths = set()
392+
files: List[_FileInfo] = []
393+
for row in rows:
394+
fi = _FileInfo(row)
395+
fi.path = _truncate_at_parquet_dir(fi.path)
396+
fi.name = fi.path.rstrip("/").rsplit("/", 1)[-1]
397+
if fi.path not in seen_paths:
398+
seen_paths.add(fi.path)
399+
files.append(fi)
400+
401+
self.logger.debug(
402+
f"CDC Snapshot: Spark binaryFile fallback listed {len(files)} file(s) under '{path}'"
403+
)
340404

341-
return all_files
405+
return files
342406

343407
def _path_to_regex_pattern(self, path: str) -> str:
344408
"""Convert path to normalized regex pattern with named groups.

0 commit comments

Comments
 (0)