Commit 5d57934

feat(cdc): File path regex support (#23)
1 parent 875a376 commit 5d57934

5 files changed, +261 -71 lines changed

docs/source/dataflow_spec_ref_cdc.rst

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ CDC Historical Snapshot Source Configuration
112112
- The format of the source data. E.g. supported formats are ``table``, ``parquet``, ``csv``, ``json``. All formats supported by spark see `PySpark Data Sources API <https://spark.apache.org/docs/3.5.3/sql-data-sources.html>`_.
113113
* - **path**
114114
- ``string``
115-
- The location to load the source data from. This can be a table name or a path to a a file or directory with multiple snapshots. A placeholder ``{version}`` can be used in this path which will be substituted with the version value in run time.
115+
- The location to load the source data from. This can be a table name or a path to a file or directory with multiple snapshots. Supports three path pattern styles for version extraction: the ``{version}`` placeholder (simple single-segment match), the ``{fragment}`` placeholder (for multi-file snapshots), and regex named capture groups (for complex partitioning). See :ref:`file-path-patterns` for details and examples.
116116
* - **versionType**
117117
- ``string``
118118
- The type of versioning to use. Can be either ``int`` or ``datetime``.
@@ -138,14 +138,102 @@ CDC Historical Snapshot Source Configuration
138138
- (*optional*) A list of select expressions to apply to the source data.
139139
* - **filter**
140140
- ``string``
141-
- (*optional*) A filter expression to apply to the source data. This filter is applied to the dataframe as a WHERE clause when the source is read. A placeholder ``{version}`` can be used in this filter expression which will be substituted with the version value in run time.
141+
- (*optional*) A filter expression to apply to the source data. This filter is applied to the dataframe as a WHERE clause when the source is read. The placeholder ``{version}`` can be used in this filter expression and will be substituted with the version value at run time (e.g. ``"year = '{version}'"``). Not applicable when using regex named capture groups in ``path``.
142142
* - **recursiveFileLookup**
143143
- ``boolean``
144144
- (*optional*) When set to ``true``, enables recursive directory traversal to find snapshot files. This should be used when snapshots are stored in a nested directory structure such as Hive-style partitioning (e.g., ``/data/{version}/file.parquet``). When set to ``false`` (default), only files in the immediate directory are searched. Default: ``false``.
145145

146146

147147
.. note::
148-
If ``recursiveFileLookup`` is set to ``true``, ensure that the ``path`` parameter is specified in a way that is compatible with recursive directory traversal. I.e. the ``{version}`` placeholder is used in the path and not the filename.
148+
If ``recursiveFileLookup`` is set to ``true``, ensure that the ``path`` parameter is compatible with recursive directory traversal. When using the ``{version}`` placeholder, place it in the directory portion of the path rather than the filename (e.g. ``/data/{version}/file.parquet``). When using regex named capture groups, the pattern spans the full relative path from the first dynamic segment, so ``recursiveFileLookup`` must be ``true`` if the version spans multiple directory levels.
149+
150+
.. _file-path-patterns:
151+
152+
File Path Patterns
153+
^^^^^^^^^^^^^^^^^^
154+
155+
The ``path`` field supports three styles for expressing where the version (and optional fragment) appears in the file path. All styles can be combined with a static base path prefix that is resolved at run time (e.g. ``{sample_file_location}``).
156+
157+
.. list-table::
158+
:header-rows: 1
159+
:widths: 20 35 45
160+
161+
* - Style
162+
- Syntax
163+
- When to Use
164+
* - ``{version}`` placeholder
165+
- ``{version}``
166+
- Version is contained in a single path segment or filename component. Simple and readable for flat or single-level partitioned layouts.
167+
* - ``{fragment}`` placeholder
168+
- ``{fragment}``
169+
- Snapshot data for a single version is split across multiple files. Use alongside ``{version}`` to group files sharing the same version together.
170+
* - Regex named capture groups
171+
- ``(?P<version_<name>>.+)``
172+
- Version is spread across multiple path segments or interleaved with other text. Supports complex partitioning schemes (e.g. Hive-style ``YEAR=.../MONTH=.../DAY=...``) where the version cannot be expressed as a single placeholder.
173+
174+
**``{version}`` — single-segment version**
175+
176+
The ``{version}`` placeholder matches one path segment or filename component. It is internally converted to a regex named capture group ``(?P<version_main>.+)``.
177+
178+
.. code-block:: json
179+
180+
{
181+
"path": "/mnt/data/snapshots/customer_{version}.csv",
182+
"versionType": "timestamp",
183+
"datetimeFormat": "%Y_%m_%d"
184+
}
185+
186+
Files matched: ``customer_2024_01_01.csv``, ``customer_2024_01_02.csv``, …
187+
188+
For directory-partitioned layouts, place ``{version}`` in the directory portion and set ``recursiveFileLookup`` to ``true``:
189+
190+
.. code-block:: json
191+
192+
{
193+
"path": "/mnt/data/snapshots/{version}/customer.csv",
194+
"versionType": "timestamp",
195+
"datetimeFormat": "YEAR=%Y/MONTH=%m/DAY=%d",
196+
"recursiveFileLookup": true
197+
}
198+
199+
Files matched: ``YEAR=2024/MONTH=01/DAY=01/customer.csv``, …
200+
201+
**``{fragment}`` — multi-file snapshots**
202+
203+
Use ``{fragment}`` alongside ``{version}`` when a single snapshot version is split across multiple files. All files sharing the same version are read and unioned together before CDC processing.
204+
205+
.. code-block:: json
206+
207+
{
208+
"path": "/mnt/data/snapshots/customer_{version}_split_{fragment}.csv",
209+
"versionType": "timestamp",
210+
"datetimeFormat": "%Y_%m_%d"
211+
}
212+
213+
Files matched and grouped by version: ``customer_2024_01_01_split_1.csv``, ``customer_2024_01_01_split_2.csv`` → both ingested as version ``2024-01-01``.
214+
215+
**Regex named capture groups — multi-segment versions**
216+
217+
For cases where the version is distributed across multiple directory levels or interleaved with fixed text, use Python regex named capture groups with the prefix ``version_``. All groups whose names start with ``version_`` are extracted and concatenated **in the order they appear in the pattern** (left to right) to form the final version string, which is then parsed according to ``datetimeFormat`` or treated as an integer.
218+
219+
Group naming convention: ``(?P<version_<name>>.+)``. The ``<name>`` suffix is arbitrary but must be unique within the pattern. The concatenation order is determined by the position of each group in the path expression, not the name.
220+
221+
.. code-block:: json
222+
223+
{
224+
"path": "/mnt/data/snapshots/(?P<version_year>.+)/(?P<version_month>.+)/data/customer_(?P<version_day>.+).csv",
225+
"versionType": "timestamp",
226+
"datetimeFormat": "%Y%m%d",
227+
"recursiveFileLookup": true
228+
}
229+
230+
For the file ``2024/01/data/customer_15.csv``, the groups are captured left-to-right: ``version_year=2024``, ``version_month=01``, ``version_day=15``. These are concatenated in pattern order to produce ``"20240115"``, which is then parsed with ``datetimeFormat: "%Y%m%d"``.
231+
232+
.. tip::
233+
234+
Arrange your ``(?P<version_...>)`` groups in the path from left to right in the same order that their values should be concatenated to match your ``datetimeFormat``. The group names themselves only need to be unique — their order in the pattern controls concatenation.
235+
236+
See ``samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/historical_snapshot_files_datetime_recursive_and_partitioned_regex_main.json`` for a complete working example.
149237

150238
The ``source`` object contains the following properties for ``table`` based sources:
151239

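As an illustration of the ``{version}`` and ``{fragment}`` styles documented in this file, the following minimal Python sketch expands the placeholders into named capture groups and groups file names by parsed version. Only the ``version_main`` group name comes from the documentation in this commit; the ``fragment`` group name and the helpers ``placeholders_to_regex`` and ``group_by_version`` are hypothetical and are not taken from the framework's implementation.

```python
import re
from collections import defaultdict
from datetime import datetime

# Hypothetical expansion table. The docs only state that {version} becomes
# (?P<version_main>.+); the group name used for {fragment} is an assumption.
PLACEHOLDER_GROUPS = {
    "{version}": r"(?P<version_main>.+)",
    "{fragment}": r"(?P<fragment>.+)",
}


def placeholders_to_regex(path_pattern: str) -> str:
    """Escape literal text and expand the placeholders into named groups."""
    # The capturing group in the split pattern keeps the placeholders in the result.
    parts = re.split(r"(\{version\}|\{fragment\})", path_pattern)
    return "".join(PLACEHOLDER_GROUPS.get(part, re.escape(part)) for part in parts)


def group_by_version(path_pattern: str, file_names: list, datetime_format: str) -> dict:
    """Group matching file names by their parsed version; skip non-matching names."""
    regex = re.compile(placeholders_to_regex(path_pattern))
    grouped = defaultdict(list)
    for name in file_names:
        match = regex.fullmatch(name)
        if match is None:
            continue
        version = datetime.strptime(match.group("version_main"), datetime_format)
        grouped[version].append(name)
    return dict(grouped)


files = [
    "customer_2024_01_01_split_1.csv",
    "customer_2024_01_01_split_2.csv",
    "customer_2024_01_02_split_1.csv",
    "README.txt",
]
grouped = group_by_version("customer_{version}_split_{fragment}.csv", files, "%Y_%m_%d")
for version, names in sorted(grouped.items()):
    print(version.date(), names)
```

In this sketch, both ``_split_`` files for ``2024_01_01`` land under the same version key, which mirrors the documented behaviour of reading all fragments of one version together. Because ``.+`` also matches ``/``, the same expansion would accept a multi-directory value such as ``YEAR=2024/MONTH=01/DAY=01`` for ``{version}``.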
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"dataFlowId": "feature_historical_files_snapshot_datetime_recursive_and_regex",
3+
"dataFlowGroup": "feature_samples_snapshots",
4+
"dataFlowType": "standard",
5+
"targetFormat": "delta",
6+
"targetDetails": {
7+
"table": "feature_historical_snapshot_files_datetime_recursive_and_regex",
8+
"tableProperties": {
9+
"delta.enableChangeDataFeed": "true"
10+
},
11+
"schemaPath": "target/customer_schema.json",
12+
"configFlags": ["disableOperationalMetadata"]
13+
},
14+
"cdcSnapshotSettings": {
15+
"keys": [
16+
"CUSTOMER_ID"
17+
],
18+
"scd_type": "2",
19+
"snapshotType": "historical",
20+
"sourceType": "file",
21+
"source": {
22+
"format": "csv",
23+
"path": "{sample_file_location}/snapshot_customer_regex/(?P<version_year>.+)/(?P<version_month>.+)/data/customer_(?P<version_day>.+).csv",
24+
"readerOptions": {
25+
"header": "true"
26+
},
27+
"versionType": "timestamp",
28+
"datetimeFormat": "%Y%m%d",
29+
"recursiveFileLookup": true
30+
},
31+
"track_history_except_column_list":[
32+
"LOAD_TIMESTAMP"
33+
]
34+
}
35+
}

samples/test_data_and_orchestrator/src/initialize.ipynb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
"customer_file_path = f\"{volume_root_file_path}/customer\"\n",
4242
"customer_snapshot_file_path = f\"{volume_root_file_path}/snapshot_customer\"\n",
4343
"customer_snapshot_multifile_path = f\"{volume_root_file_path}/snapshot_customer_multifile\"\n",
44+
"customer_snapshot_regex_file_path = f\"{volume_root_file_path}/snapshot_customer_regex\"\n",
4445
"customer_snapshot_partitioned_file_path = f\"{volume_root_file_path}/snapshot_customer_partitioned\"\n",
4546
"customer_snapshot_partitioned_parquet_file_path = f\"{volume_root_file_path}/snapshot_customer_partitioned_parquet\"\n",
4647
"template_samples_base_file_path = f\"{volume_root_file_path}/template_samples\"\n",

samples/test_data_and_orchestrator/src/run_1_staging_load.ipynb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@
193193
"source": [
194194
"# Delete all files and directories in snapshot directories\n",
195195
"dbutils.fs.rm(customer_snapshot_file_path, True)\n",
196+
"dbutils.fs.rm(customer_snapshot_regex_file_path, True)\n",
196197
"dbutils.fs.rm(customer_snapshot_partitioned_file_path, True)\n",
197198
"dbutils.fs.rm(customer_snapshot_partitioned_parquet_file_path, True)\n",
198199
"dbutils.fs.rm(template_samples_customer_file_path, True)\n",
@@ -219,6 +220,11 @@
219220
" True)\n",
220221
"\n",
221222
"dbutils.fs.put(\n",
223+
" f\"{customer_snapshot_regex_file_path}/2024/01/data/customer_01.csv\",\n",
224+
" file_content,\n",
225+
" True)\n",
226+
"\n",
227+
"dbutils.fs.put(\n",
222228
" f\"{customer_snapshot_partitioned_file_path}/YEAR=2024/MONTH=01/DAY=01/customer.csv\",\n",
223229
" file_content,\n",
224230
" True)\n",

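The test file created in this notebook (``2024/01/data/customer_01.csv``) can be checked against the sample ``path`` pattern with a short Python sketch. It assumes the pattern is applied to the path relative to the static base prefix, as the documentation note in this commit describes; ``extract_version`` is a hypothetical helper, not the framework's function.

```python
import re
from datetime import datetime

# Regex portion of the sample "path", without the {sample_file_location} prefix.
# Note: the "." before "csv" is unescaped in the sample, so it matches any character.
SAMPLE_PATTERN = (
    r"(?P<version_year>.+)/(?P<version_month>.+)/data/customer_(?P<version_day>.+).csv"
)


def extract_version(pattern: str, relative_path: str, datetime_format: str):
    """Concatenate all version_* groups in pattern order and parse the result."""
    match = re.fullmatch(pattern, relative_path)
    if match is None:
        return None
    # groupindex maps group name -> group number; group numbers follow the
    # left-to-right position of each group in the pattern, not the name.
    ordered = sorted(match.re.groupindex.items(), key=lambda item: item[1])
    version_text = "".join(
        match.group(name) for name, _ in ordered if name.startswith("version_")
    )
    return datetime.strptime(version_text, datetime_format)


version = extract_version(SAMPLE_PATTERN, "2024/01/data/customer_01.csv", "%Y%m%d")
print(version)  # 2024-01-01 00:00:00
```

Sorting by group number rather than by name matters here: alphabetically the groups would be ordered ``version_day``, ``version_month``, ``version_year``, which would not match ``%Y%m%d``.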