Skip to content

Commit b9ac43f

Browse files
fix: add expected STATE messages to integration test for direct-return approach
Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
1 parent 26150b3 commit b9ac43f

1 file changed

Lines changed: 27 additions & 1 deletion

File tree

unit_tests/sources/test_source_read.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
from unittest.mock import Mock
77

88
import freezegun
9+
from airbyte_protocol_dataclasses.models import AirbyteStateType
910

1011
from airbyte_cdk.models import (
1112
AirbyteMessage,
1213
AirbyteRecordMessage,
14+
AirbyteStateMessage,
1315
AirbyteStream,
1416
AirbyteStreamStatus,
1517
AirbyteStreamStatusTraceMessage,
@@ -22,11 +24,15 @@
2224
TraceType,
2325
)
2426
from airbyte_cdk.models import Type as MessageType
27+
from airbyte_cdk.models.airbyte_protocol import (
28+
AirbyteStateBlob,
29+
AirbyteStreamState,
30+
)
2531
from airbyte_cdk.sources import AbstractSource
2632
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
2733
from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter
2834
from airbyte_cdk.sources.message import InMemoryMessageRepository
29-
from airbyte_cdk.sources.streams import Stream
35+
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY, Stream
3036
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
3137
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
3238
from airbyte_cdk.sources.streams.core import StreamData
@@ -212,6 +218,16 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_no_e
212218
emitted_at=1577836800000,
213219
),
214220
),
221+
AirbyteMessage(
222+
type=MessageType.STATE,
223+
state=AirbyteStateMessage(
224+
type=AirbyteStateType.STREAM,
225+
stream=AirbyteStreamState(
226+
stream_descriptor=StreamDescriptor(name="stream0"),
227+
stream_state=AirbyteStateBlob(**{NO_CURSOR_STATE_KEY: True}),
228+
),
229+
),
230+
),
215231
AirbyteMessage(
216232
type=MessageType.TRACE,
217233
trace=AirbyteTraceMessage(
@@ -283,6 +299,16 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_no_e
283299
emitted_at=1577836800000,
284300
),
285301
),
302+
AirbyteMessage(
303+
type=MessageType.STATE,
304+
state=AirbyteStateMessage(
305+
type=AirbyteStateType.STREAM,
306+
stream=AirbyteStreamState(
307+
stream_descriptor=StreamDescriptor(name="stream1"),
308+
stream_state=AirbyteStateBlob(**{NO_CURSOR_STATE_KEY: True}),
309+
),
310+
),
311+
),
286312
AirbyteMessage(
287313
type=MessageType.TRACE,
288314
trace=AirbyteTraceMessage(

0 commit comments

Comments
 (0)