@@ -53,6 +53,8 @@ async with publisher:
5353 publisher.send(FooEvent(foo = " bar" ))
5454```
5555
56+ Protocol implementations will be responsible for creating publishers.
57+
5658## Event Receivers
5759
5860An ` AsyncEventReceiver ` is used to receive events from a service.
@@ -131,6 +133,8 @@ async for event in reciever:
131133 handle_event(event)
132134```
133135
136+ Protocol implementations will be responsible for creating receivers.
137+
134138### Errors
135139
136140Event streams may define modeled errors that may be sent over the stream. These
@@ -169,38 +173,30 @@ are handled by the following classes:
169173* ` OutputEventStream ` is returned when the operation only has an output stream.
170174
171175``` python
172- class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protocol):
173-
174- input_stream: AsyncEventPublisher[I]
175-
176- _output_stream: AsyncEventReceiver[O] | None = None
177- _response: R | None = None
178-
179- @ property
180- def output_stream (self ) -> AsyncEventReceiver[O] | None :
181- return self ._output_stream
182-
183- @output_stream.setter
184- def output_stream (self , value : AsyncEventReceiver[O]) -> None :
185- self ._output_stream = value
186-
187- @ property
188- def response (self ) -> R | None :
189- return self ._response
190-
191- @response.setter
192- def response (self , value : R) -> None :
193- self ._response = value
194-
195- async def await_output (self ) -> tuple[R, AsyncEventReceiver[O]]:
176+ class DuplexEventStream[
177+ IE : SerializeableShape,
178+ OE : DeserializeableShape,
179+ O: DeserializeableShape,
180+ ]:
181+
182+ input_stream: EventPublisher[IE ]
183+ output_stream: EventReceiver[OE ] | None = None
184+ output: O | None = None
185+
186+ def __init__ (
187+ self ,
188+ * ,
189+ input_stream : EventPublisher[IE ],
190+ output_future : Future[tuple[O, EventReceiver[OE ]]],
191+ ) -> None :
192+ self .input_stream = input_stream
193+ self ._output_future = output_future
194+
195+ async def await_output (self ) -> tuple[O, EventReceiver[OE ]]:
196196 ...
197197
198198 async def close (self ) -> None :
199- if self .output_stream is None :
200- _, self .output_stream = await self .await_output()
201-
202- await self .input_stream.close()
203- await self .output_stream.close()
199+ ...
204200
205201 async def __aenter__ (self ) -> Self:
206202 return self
@@ -209,21 +205,21 @@ class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protoco
209205 await self .close()
210206
211207
212- class InputEventStream[I: SerializableShape, R](Protocol) :
208+ class InputEventStream[IE : SerializeableShape, O] :
213209
214- input_stream: AsyncEventPublisher[I]
210+ input_stream: EventPublisher[IE ]
211+ output: O | None = None
215212
216- _response: R | None = None
213+ def __init__ (
214+ self ,
215+ * ,
216+ input_stream : EventPublisher[IE ],
217+ output_future : Future[O],
218+ ) -> None :
219+ self .input_stream = input_stream
220+ self ._output_future = output_future
217221
218- @ property
219- def response (self ) -> R | None :
220- return self ._response
221-
222- @response.setter
223- def response (self , value : R) -> None :
224- self ._response = value
225-
226- async def await_output (self ) -> R:
222+ async def await_output (self ) -> O:
227223 ...
228224
229225 async def close (self ) -> None :
@@ -236,11 +232,14 @@ class InputEventStream[I: SerializableShape, R](Protocol):
236232 await self .close()
237233
238234
239- class OutputEventStream[O: DeserializableShape, R](Protocol) :
235+ class OutputEventStream[OE : DeserializeableShape, O: DeserializeableShape] :
240236
241- output_stream: AsyncEventReceiver[O]
242-
243- response: R
237+ output_stream: EventReceiver[OE ]
238+ output: O
239+
240+ def __init__ (self , output_stream : EventReceiver[OE ], output : O) -> None :
241+ self .output_stream = output_stream
242+ self .output = output
244243
245244 async def close (self ) -> None :
246245 await self .output_stream.close()
@@ -258,7 +257,7 @@ the underlying publisher and/or receiver.
258257
259258Both ` InputEventStream ` and ` DuplexEventStream ` have an ` await_output ` method
260259that waits for the initial request to be received, returning that and the output
261- stream. Their ` response ` and ` output_stream ` properties will not be set until
260+ stream. Their ` output ` and ` output_stream ` properties will not be set until
262261then. This is important because clients MUST be able to start sending events to
263262the service immediately, without waiting for the initial response. This is
264263critical because there are existing services that require one or more events to
@@ -278,8 +277,8 @@ with await client.input_operation() as stream:
278277 stream.input_stream.send(FooEvent(foo = " bar" ))
279278```
280279
281- The ` OutputEventStream ` 's initial ` response ` and ` output_stream ` will never be
282- ` None ` , however. Instead, the ` ClientProtocol ` MUST set values for these when
280+ The ` OutputEventStream ` 's ` output ` and ` output_stream ` will never be ` None ` ,
281+ however. Instead, the ` ClientProtocol ` MUST set values for these when
283282constructing the object. This differs from the other stream types because the
284283lack of an input stream means that the service has nothing to wait on from the
285284client before sending responses.
@@ -290,6 +289,9 @@ with await client.output_operation() as stream:
290289 handle_event(event)
291290```
292291
292+ All three output types are centrally located and will be constructed by filling
293+ in the relevant publishers and receivers from the protocol implementation.
294+
293295## Event Structure
294296
295297Event messages are structurally similar to HTTP messages. They consist of a map
0 commit comments