Skip to content

Commit f6fe798

Browse files
authored
fix(flows): FlowAppendView support for 'once' flag by forcing batch read (#22)
* Fix FlowAppendView support for 'once' flag by forcing batch read
* Add new sample dataflow for append_view_once_flow
* Improve flow name in append_view_once dataflow
1 parent bd92531 commit f6fe798

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
+{
+    "dataFlowId": "append_view_once_flow",
+    "dataFlowGroup": "feature_samples_general",
+    "dataFlowType": "flow",
+    "targetFormat": "delta",
+    "targetDetails": {
+        "table": "append_view_once_flow",
+        "tableProperties": {
+            "delta.enableChangeDataFeed": "true"
+        }
+    },
+    "flowGroups": [
+        {
+            "flowGroupId": "main",
+            "flows": {
+                "f_customer_append_view_once": {
+                    "flowType": "append_view",
+                    "flowDetails": {
+                        "targetTable": "append_view_once_flow",
+                        "sourceView": "v_append_view_once_flow",
+                        "once": true
+                    },
+                    "views": {
+                        "v_append_view_once_flow": {
+                            "mode": "batch",
+                            "sourceType": "delta",
+                            "sourceDetails": {
+                                "database": "{staging_schema}",
+                                "table": "customer"
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    ]
+}

src/dataflow/flows/append_view.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def columnPrefixExceptions(self) -> List[str]:

     @property
     def once(self) -> bool:
-        """Get the once flag."""
+        """Get the once flag. Note: Setting 'once' requires a batch read."""
         return self.flowDetails.get("once", False)

     def create_flow(
@@ -55,14 +55,17 @@ def get_column_prefix_exceptions(flow_config: FlowConfig) -> List[str]:
             return column_prefix_exceptions

         spark = self.spark
+        spark_reader = spark.readStream
+        if self.once:
+            spark_reader = spark.read
         exclude_columns = flow_config.exclude_columns
         column_prefix_exceptions = get_column_prefix_exceptions(flow_config)

         source_view_name = f'live.{self.sourceView}'

         @dp.append_flow(name=self.flowName, target=self.targetTable, once=self.once)
         def flow_transform():
-            df = spark.readStream.table(source_view_name)
+            df = spark_reader.table(source_view_name)
             if "column_prefix" in self.flowDetails:
                 prefix = f"{self.columnPrefix.lower()}_"
                 df = df.select([

0 commit comments

Comments
 (0)