Skip to content

Commit a3faa51

Browse files
wip: renaming functions based on events.
1 parent 17cd07c commit a3faa51

4 files changed

Lines changed: 27 additions & 25 deletions

File tree

instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,10 @@ def _set_output_messages(self):
144144

145145
self._self_invocation.output_messages = output_messages
146146

147-
def _stop_stream(self) -> None:
147+
def _on_stream_end(self) -> None:
148148
self._cleanup()
149149

150-
def _fail_stream(self, error: BaseException) -> None:
150+
def _on_stream_error(self, error: BaseException) -> None:
151151
self._cleanup(error)
152152

153153
def parse(self):

instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1115,7 +1115,7 @@ def test_chat_completion_streaming_instrumentation_finalize_errors_swallowed(
11151115
def stop_raises():
11161116
raise RuntimeError("instrumentation failure")
11171117

1118-
monkeypatch.setattr(response, "_stop_stream", stop_raises)
1118+
monkeypatch.setattr(response, "_on_stream_end", stop_raises)
11191119
response.close()
11201120

11211121
assert span_exporter.get_finished_spans() == ()

util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ class SyncStreamWrapper(
6767
Subclass this when wrapping a provider SDK stream that is consumed with
6868
normal iteration. The subclass should pass the SDK stream to
6969
``super().__init__(stream)`` and implement the three telemetry hooks:
70-
``_process_chunk`` for per-chunk state, ``_stop_stream`` for successful
71-
finalization, and ``_fail_stream`` for failure finalization.
70+
``_process_chunk`` for per-chunk state, ``_on_stream_end`` for successful
71+
finalization, and ``_on_stream_error`` for failure finalization.
7272
7373
Users should consume subclasses as normal streams, for example with
7474
``for chunk in wrapper`` or ``with wrapper``. The hook methods are called
@@ -137,13 +137,13 @@ def _finalize_success(self) -> None:
137137
if self._self_finalized:
138138
return
139139
self._self_finalized = True
140-
self._stop_stream()
140+
self._on_stream_end()
141141

142142
def _finalize_failure(self, error: BaseException) -> None:
143143
if self._self_finalized:
144144
return
145145
self._self_finalized = True
146-
self._fail_stream(error)
146+
self._on_stream_error(error)
147147

148148
def _safe_finalize_success(self) -> None:
149149
try:
@@ -168,11 +168,11 @@ def _process_chunk(self, chunk: ChunkT) -> None:
168168
"""Process one stream chunk for telemetry."""
169169

170170
@abstractmethod
171-
def _stop_stream(self) -> None:
171+
def _on_stream_end(self) -> None:
172172
"""Finalize the stream successfully."""
173173

174174
@abstractmethod
175-
def _fail_stream(self, error: BaseException) -> None:
175+
def _on_stream_error(self, error: BaseException) -> None:
176176
"""Finalize the stream with failure."""
177177

178178
@staticmethod
@@ -193,8 +193,8 @@ class AsyncStreamWrapper(
193193
Subclass this when wrapping a provider SDK stream that is consumed with
194194
async iteration. The subclass should pass the SDK stream to
195195
``super().__init__(stream)`` and implement the three telemetry hooks:
196-
``_process_chunk`` for per-chunk state, ``_stop_stream`` for successful
197-
finalization, and ``_fail_stream`` for failure finalization.
196+
``_process_chunk`` for per-chunk state, ``_on_stream_end`` for successful
197+
finalization, and ``_on_stream_error`` for failure finalization.
198198
199199
Users should consume subclasses as normal async streams, for example with
200200
``async for chunk in wrapper`` or ``async with wrapper``. The hook methods
@@ -266,13 +266,13 @@ def _finalize_success(self) -> None:
266266
if self._self_finalized:
267267
return
268268
self._self_finalized = True
269-
self._stop_stream()
269+
self._on_stream_end()
270270

271271
def _finalize_failure(self, error: BaseException) -> None:
272272
if self._self_finalized:
273273
return
274274
self._self_finalized = True
275-
self._fail_stream(error)
275+
self._on_stream_error(error)
276276

277277
def _safe_finalize_success(self) -> None:
278278
try:
@@ -297,11 +297,11 @@ def _process_chunk(self, chunk: ChunkT) -> None:
297297
"""Process one stream chunk for telemetry."""
298298

299299
@abstractmethod
300-
def _stop_stream(self) -> None:
300+
def _on_stream_end(self) -> None:
301301
"""Finalize the stream successfully."""
302302

303303
@abstractmethod
304-
def _fail_stream(self, error: BaseException) -> None:
304+
def _on_stream_error(self, error: BaseException) -> None:
305305
"""Finalize the stream with failure."""
306306

307307
@staticmethod

util/opentelemetry-util-genai/tests/test_stream.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
# pylint: disable=abstract-class-instantiated
16+
1517
import asyncio
1618
import inspect
1719

@@ -26,8 +28,8 @@
2628
def test_stream_wrapper_abstract_method_signatures_match():
2729
method_names = (
2830
"_process_chunk",
29-
"_stop_stream",
30-
"_fail_stream",
31+
"_on_stream_end",
32+
"_on_stream_error",
3133
"_handle_process_chunk_error",
3234
)
3335

@@ -86,10 +88,10 @@ def __init__(self, stream):
8688
def _process_chunk(self, chunk):
8789
self._self_processed.append(chunk)
8890

89-
def _stop_stream(self):
91+
def _on_stream_end(self):
9092
self._self_stop_count += 1
9193

92-
def _fail_stream(self, error):
94+
def _on_stream_error(self, error):
9395
self._self_failures.append(error)
9496

9597

@@ -99,13 +101,13 @@ def _process_chunk(self, chunk):
99101

100102

101103
class _FailingSyncStopStreamWrapper(_TestSyncStreamWrapper):
102-
def _stop_stream(self):
104+
def _on_stream_end(self):
103105
self._self_stop_count += 1
104106
raise ValueError("instrumentation failed")
105107

106108

107109
class _FailingSyncFailStreamWrapper(_TestSyncStreamWrapper):
108-
def _fail_stream(self, error):
110+
def _on_stream_error(self, error):
109111
self._self_failures.append(error)
110112
raise ValueError("instrumentation failed")
111113

@@ -325,10 +327,10 @@ def __init__(self, stream):
325327
def _process_chunk(self, chunk):
326328
self._self_processed.append(chunk)
327329

328-
def _stop_stream(self):
330+
def _on_stream_end(self):
329331
self._self_stop_count += 1
330332

331-
def _fail_stream(self, error):
333+
def _on_stream_error(self, error):
332334
self._self_failures.append(error)
333335

334336

@@ -338,13 +340,13 @@ def _process_chunk(self, chunk):
338340

339341

340342
class _FailingAsyncStopStreamWrapper(_TestAsyncStreamWrapper):
341-
def _stop_stream(self):
343+
def _on_stream_end(self):
342344
self._self_stop_count += 1
343345
raise ValueError("instrumentation failed")
344346

345347

346348
class _FailingAsyncFailStreamWrapper(_TestAsyncStreamWrapper):
347-
def _fail_stream(self, error):
349+
def _on_stream_error(self, error):
348350
self._self_failures.append(error)
349351
raise ValueError("instrumentation failed")
350352

0 commit comments

Comments
 (0)