Skip to content

Commit 98f3710

Browse files
refactor: rename num_partitions_started to num_partitions_in_progress (computed)
1 parent 41429a2 commit 98f3710

3 files changed

Lines changed: 11 additions & 15 deletions

File tree

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ def __init__(
188188
# Partitioned stream status tracking for progress estimation.
189189
# These counters are per-sync only and intentionally NOT restored from persisted state
190190
# (_set_initial_state does not read them back). On resume, they reset to 0.
191-
self._num_partitions_started: int = 0
192191
self._num_partitions_completed: int = 0
193192
self._is_partition_discovery_complete: bool = False
194193

@@ -225,7 +224,8 @@ def state(self) -> MutableMapping[str, Any]:
225224
if self._parent_state is not None:
226225
state["parent_state"] = self._parent_state
227226
state["partitioned_stream_status"] = {
228-
"num_partitions_started": self._num_partitions_started,
227+
"num_partitions_in_progress": self._generated_partitions_count
228+
- self._num_partitions_completed,
229229
"num_partitions_completed": self._num_partitions_completed,
230230
"num_partitions_expected": self._generated_partitions_count,
231231
"is_partition_discovery_complete": self._is_partition_discovery_complete,
@@ -367,7 +367,6 @@ def _generate_slices_from_partition(
367367
with self._lock:
368368
seq = self._generated_partitions_count
369369
self._generated_partitions_count += 1
370-
self._num_partitions_started += 1
371370
self._processing_partitions_indexes.append(seq)
372371
self._partition_key_to_index[partition_key] = seq
373372

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -400,17 +400,14 @@ def run_mocked_test(
400400
assert partitioned_status is not None, (
401401
"partitioned_stream_status must always be present in state"
402402
)
403-
assert "num_partitions_started" in partitioned_status
403+
assert "num_partitions_in_progress" in partitioned_status
404404
assert "num_partitions_completed" in partitioned_status
405405
assert "num_partitions_expected" in partitioned_status
406406
assert "is_partition_discovery_complete" in partitioned_status
407-
assert (
408-
partitioned_status["num_partitions_started"]
409-
>= partitioned_status["num_partitions_completed"]
410-
)
407+
assert partitioned_status["num_partitions_in_progress"] >= 0
411408
assert (
412409
partitioned_status["num_partitions_expected"]
413-
>= partitioned_status["num_partitions_started"]
410+
>= partitioned_status["num_partitions_completed"]
414411
)
415412
_strip_partitioned_stream_status(final_state_dict)
416413
assert final_state_dict == expected_state
@@ -3694,7 +3691,7 @@ def test_given_no_partitions_processed_when_close_partition_then_no_state_update
36943691
state = cursor.state
36953692
partitioned_status = state.pop("partitioned_stream_status", None)
36963693
assert partitioned_status is not None
3697-
assert partitioned_status["num_partitions_started"] == 0
3694+
assert partitioned_status["num_partitions_in_progress"] == 0
36983695
assert partitioned_status["num_partitions_completed"] == 0
36993696
assert partitioned_status["num_partitions_expected"] == 0
37003697
assert partitioned_status["is_partition_discovery_complete"] is True
@@ -3788,7 +3785,7 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update():
37883785
state = cursor.state
37893786
partitioned_status = state.pop("partitioned_stream_status", None)
37903787
assert partitioned_status is not None
3791-
assert partitioned_status["num_partitions_started"] == 2
3788+
assert partitioned_status["num_partitions_in_progress"] == 1
37923789
assert partitioned_status["num_partitions_completed"] == 1
37933790
assert partitioned_status["num_partitions_expected"] == 2
37943791
assert partitioned_status["is_partition_discovery_complete"] is True
@@ -3890,7 +3887,7 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update
38903887
state = cursor.state
38913888
partitioned_status = state.pop("partitioned_stream_status", None)
38923889
assert partitioned_status is not None
3893-
assert partitioned_status["num_partitions_started"] == 2
3890+
assert partitioned_status["num_partitions_in_progress"] == 1
38943891
assert partitioned_status["num_partitions_completed"] == 1
38953892
assert partitioned_status["num_partitions_expected"] == 2
38963893
assert partitioned_status["is_partition_discovery_complete"] is True

unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ def test_given_record_for_partition_when_read_then_update_state(caplog):
344344
},
345345
],
346346
"partitioned_stream_status": {
347-
"num_partitions_started": 2,
347+
"num_partitions_in_progress": 0,
348348
"num_partitions_completed": 2,
349349
"num_partitions_expected": 2,
350350
"is_partition_discovery_complete": True,
@@ -588,7 +588,7 @@ def test_perpartition_with_fallback(caplog):
588588
"state": {"cursor_field": "2022-02-19"},
589589
"lookback_window": 1,
590590
"partitioned_stream_status": {
591-
"num_partitions_started": 6,
591+
"num_partitions_in_progress": 0,
592592
"num_partitions_completed": 6,
593593
"num_partitions_expected": 6,
594594
"is_partition_discovery_complete": True,
@@ -776,7 +776,7 @@ def test_per_partition_cursor_within_limit(caplog):
776776
},
777777
],
778778
"partitioned_stream_status": {
779-
"num_partitions_started": 3,
779+
"num_partitions_in_progress": 0,
780780
"num_partitions_completed": 3,
781781
"num_partitions_expected": 3,
782782
"is_partition_discovery_complete": True,

0 commit comments

Comments
 (0)