Skip to content

Commit bf2963c

Browse files
feat: add _partitions_observed tracking via observe() for in_progress/pending distinction
- Add _partitions_observed: set[str] to track partitions where observe() has been called - Update state property to emit 4 fields: in_progress, completed, expected, is_partition_discovery_complete - num_partitions_in_progress = len(observed) - completed (worker started but not finished) - Dropped num_partitions_not_started (derivable from expected - in_progress - completed) - _cleanup_if_done adds partition to observed set to prevent negative in_progress - Updated all test assertions for new semantics
1 parent cfb4543 commit bf2963c

2 files changed

Lines changed: 23 additions & 8 deletions

File tree

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ def __init__(
190190
# (_set_initial_state does not read them back). On resume, they reset to 0.
191191
self._num_partitions_completed: int = 0
192192
self._is_partition_discovery_complete: bool = False
193+
# Tracks partition keys for which observe() has been called (worker produced at least one record).
194+
# Only len() is used in state emission; the set itself is never serialized.
195+
self._partitions_observed: set[str] = set()
193196

194197
self._set_initial_state(stream_state)
195198

@@ -223,9 +226,9 @@ def state(self) -> MutableMapping[str, Any]:
223226
state["lookback_window"] = self._lookback_window
224227
if self._parent_state is not None:
225228
state["parent_state"] = self._parent_state
229+
num_observed = len(self._partitions_observed)
226230
state["partitioned_stream_status"] = {
227-
"num_partitions_in_progress": self._generated_partitions_count
228-
- self._num_partitions_completed,
231+
"num_partitions_in_progress": num_observed - self._num_partitions_completed,
229232
"num_partitions_completed": self._num_partitions_completed,
230233
"num_partitions_expected": self._generated_partitions_count,
231234
"is_partition_discovery_complete": self._is_partition_discovery_complete,
@@ -552,11 +555,11 @@ def observe(self, record: Record) -> None:
552555
return
553556

554557
self._synced_some_data = True
558+
partition_key = self._to_partition_key(record.associated_slice.partition)
559+
self._partitions_observed.add(partition_key)
555560
self._update_global_cursor(record_cursor)
556561
if not self._use_global_cursor:
557-
self._cursor_per_partition[
558-
self._to_partition_key(record.associated_slice.partition)
559-
].observe(record)
562+
self._cursor_per_partition[partition_key].observe(record)
560563

561564
def _update_global_cursor(self, value: Any) -> None:
562565
if (
@@ -581,6 +584,8 @@ def _cleanup_if_done(self, partition_key: str) -> None:
581584

582585
seq = self._partition_key_to_index.pop(partition_key)
583586
self._processing_partitions_indexes.remove(seq)
587+
# Ensure completed partitions are counted as observed (handles partitions with no records)
588+
self._partitions_observed.add(partition_key)
584589
self._num_partitions_completed += 1
585590

586591
logger.debug(f"Partition {partition_key} fully processed and cleaned up.")

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -405,9 +405,11 @@ def run_mocked_test(
405405
assert "num_partitions_expected" in partitioned_status
406406
assert "is_partition_discovery_complete" in partitioned_status
407407
assert partitioned_status["num_partitions_in_progress"] >= 0
408+
assert partitioned_status["num_partitions_completed"] >= 0
408409
assert (
409410
partitioned_status["num_partitions_expected"]
410-
>= partitioned_status["num_partitions_completed"]
411+
>= partitioned_status["num_partitions_in_progress"]
412+
+ partitioned_status["num_partitions_completed"]
411413
)
412414
_strip_partitioned_stream_status(final_state_dict)
413415
assert final_state_dict == expected_state
@@ -3695,6 +3697,12 @@ def test_given_no_partitions_processed_when_close_partition_then_no_state_update
36953697
assert partitioned_status["num_partitions_completed"] == 0
36963698
assert partitioned_status["num_partitions_expected"] == 0
36973699
assert partitioned_status["is_partition_discovery_complete"] is True
3700+
# Invariant: in_progress + completed <= expected
3701+
assert (
3702+
partitioned_status["num_partitions_in_progress"]
3703+
+ partitioned_status["num_partitions_completed"]
3704+
<= partitioned_status["num_partitions_expected"]
3705+
)
36983706
assert state == {
36993707
"use_global_cursor": False,
37003708
"lookback_window": 0,
@@ -3785,7 +3793,8 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update():
37853793
state = cursor.state
37863794
partitioned_status = state.pop("partitioned_stream_status", None)
37873795
assert partitioned_status is not None
3788-
assert partitioned_status["num_partitions_in_progress"] == 1
3796+
# observe() not called in this test, so in_progress comes only from _cleanup_if_done adding to observed
3797+
assert partitioned_status["num_partitions_in_progress"] == 0
37893798
assert partitioned_status["num_partitions_completed"] == 1
37903799
assert partitioned_status["num_partitions_expected"] == 2
37913800
assert partitioned_status["is_partition_discovery_complete"] is True
@@ -3887,7 +3896,8 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update
38873896
state = cursor.state
38883897
partitioned_status = state.pop("partitioned_stream_status", None)
38893898
assert partitioned_status is not None
3890-
assert partitioned_status["num_partitions_in_progress"] == 1
3899+
# observe() not called in this test, so in_progress comes only from _cleanup_if_done adding to observed
3900+
assert partitioned_status["num_partitions_in_progress"] == 0
38913901
assert partitioned_status["num_partitions_completed"] == 1
38923902
assert partitioned_status["num_partitions_expected"] == 2
38933903
assert partitioned_status["is_partition_discovery_complete"] is True

0 commit comments

Comments
 (0)