@@ -222,18 +222,6 @@ def process(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext)
222222 out .finish ()
223223
224224
225- @dataclass
226- class ZeroColumnExchangeState (ExchangeState ):
227- """State for an exchange stream with zero-column batches."""
228-
229- call_count : int = 0
230-
231- def exchange (self , input : AnnotatedBatch , out : OutputCollector , ctx : CallContext ) -> None :
232- """Accept a zero-column batch and emit a zero-column batch."""
233- self .call_count += 1
234- out .emit (pa .record_batch ([], schema = pa .schema ([])))
235-
236-
237225# ---------------------------------------------------------------------------
238226# Test fixtures: Protocol + Implementation
239227# ---------------------------------------------------------------------------
@@ -349,10 +337,6 @@ def transform_with_header(self, factor: float) -> Stream[ExchangeState, StreamHe
349337 """Exchange stream with a stream header."""
350338 ...
351339
352- def zero_column_exchange (self ) -> Stream [ExchangeState ]:
353- """Exchange stream with zero-column input and output."""
354- ...
355-
356340 def fail_stream_init_with_header (self ) -> Stream [ProducerState , StreamHeader ]:
357341 """Stream that raises during init (with header declared)."""
358342 ...
@@ -466,11 +450,6 @@ def transform_with_header(self, factor: float) -> Stream[TransformWithHeaderStat
466450 output_schema = schema , state = TransformWithHeaderState (factor = factor ), input_schema = schema , header = header
467451 )
468452
469- def zero_column_exchange (self ) -> Stream [ZeroColumnExchangeState ]:
470- """Exchange stream with zero-column input and output."""
471- empty = pa .schema ([])
472- return Stream (output_schema = empty , state = ZeroColumnExchangeState (), input_schema = empty )
473-
474453 def fail_stream_init_with_header (self ) -> Stream [GenerateWithHeaderState , StreamHeader ]:
475454 """Stream that raises during init (with header declared)."""
476455 raise ValueError ("init boom with header" )
@@ -742,17 +721,6 @@ def test_bidi_context_manager(self, make_conn: ConnFactory) -> None:
742721 output = session .exchange (AnnotatedBatch (batch = pa .RecordBatch .from_pydict ({"value" : [5.0 ]})))
743722 assert output .batch .column ("value" ).to_pylist () == [10.0 ]
744723
745- def test_zero_column_exchange_100_batches (self , make_conn : ConnFactory ) -> None :
746- """Exchange stream with zero-column batches works over 100 iterations."""
747- empty_schema = pa .schema ([])
748- empty_input = AnnotatedBatch (batch = pa .record_batch ([], schema = empty_schema ))
749- with make_conn () as proxy , proxy .zero_column_exchange () as session :
750- for _ in range (100 ):
751- output = session .exchange (empty_input )
752- assert output .batch .schema == empty_schema
753- assert output .batch .num_rows == 0
754- assert output .batch .num_columns == 0
755-
756724
757725# ---------------------------------------------------------------------------
758726# Tests: RpcConnection (context manager + proxy)
0 commit comments