|
1 | | -from collections.abc import Iterable |
2 | | - |
3 | 1 | from pynumaflow.shared.asynciter import NonBlockingIterator |
4 | 2 | from pynumaflow.sourcer import ReadRequest, Message, UserMetadata |
5 | 3 | from pynumaflow.sourcer import ( |
@@ -89,29 +87,7 @@ async def active_partitions_handler(self) -> PartitionsResponse: |
89 | 87 | async def total_partitions_handler(self) -> int | None: |
90 | 88 | return 10 |
91 | 89 |
|
92 | | - |
93 | | -class SyncSource(Sourcer): |
94 | | - def read_handler(self, datum: ReadRequest) -> Iterable[Message]: |
95 | | - payload = b"payload:test_mock_message" |
96 | | - keys = ["test_key"] |
97 | | - offset = mock_offset() |
98 | | - event_time = mock_event_time() |
99 | | - for i in range(10): |
100 | | - yield Message(payload=payload, keys=keys, offset=offset, event_time=event_time) |
101 | | - |
102 | | - def ack_handler(self, ack_request: AckRequest): |
103 | | - return |
104 | | - |
105 | | - def nack_handler(self, nack_request: NackRequest): |
106 | | - return |
107 | | - |
108 | | - def pending_handler(self) -> PendingResponse: |
109 | | - return PendingResponse(count=10) |
110 | | - |
111 | | - def active_partitions_handler(self) -> PartitionsResponse: |
112 | | - return PartitionsResponse(partitions=mock_partitions()) |
113 | | - |
114 | | - |
| 90 | + |
115 | 91 | def read_req_source_fn() -> ReadRequest: |
116 | 92 | request = source_pb2.ReadRequest.Request( |
117 | 93 | num_records=10, |
@@ -162,20 +138,3 @@ async def pending_handler(self) -> PendingResponse: |
162 | 138 |
|
163 | 139 | async def active_partitions_handler(self) -> PartitionsResponse: |
164 | 140 | raise RuntimeError("Got a runtime error from partition handler.") |
165 | | - |
166 | | - |
167 | | -class SyncSourceError(Sourcer): |
168 | | - def read_handler(self, datum: ReadRequest) -> Iterable[Message]: |
169 | | - raise RuntimeError("Got a runtime error from read handler.") |
170 | | - |
171 | | - def ack_handler(self, ack_request: AckRequest): |
172 | | - raise RuntimeError("Got a runtime error from ack handler.") |
173 | | - |
174 | | - def nack_handler(self, nack_request: NackRequest): |
175 | | - raise RuntimeError("Got a runtime error from nack handler.") |
176 | | - |
177 | | - def pending_handler(self) -> PendingResponse: |
178 | | - raise RuntimeError("Got a runtime error from pending handler.") |
179 | | - |
180 | | - def active_partitions_handler(self) -> PartitionsResponse: |
181 | | - raise RuntimeError("Got a runtime error from partition handler.") |
0 commit comments