Skip to content

Commit 79c62b5

Browse files
committed
Fix async streaming issue
1 parent d97bb9e commit 79c62b5

1 file changed

Lines changed: 33 additions & 29 deletions

File tree

  • instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai

instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,41 +1228,45 @@ async def instrumented_generate_content_stream(
12281228
)
12291229
)
12301230
if helper.experimental_sem_convs_enabled:
1231-
with telemetry_handler.inference(
1231+
invocation = telemetry_handler.start_inference(
12321232
provider=helper._genai_system,
12331233
request_model=model,
12341234
operation_name="generate_content",
1235-
) as invocation:
1236-
invocation.attributes.update(extra_attributes)
1237-
invocation.tool_definitions = (
1238-
await helper._maybe_get_tool_definitions_async(config)
1239-
)
1240-
invocation.input_messages = to_input_messages(
1241-
contents=transformers.t_contents(contents)
1235+
)
1236+
invocation.attributes.update(extra_attributes)
1237+
invocation.tool_definitions = (
1238+
await helper._maybe_get_tool_definitions_async(config)
1239+
)
1240+
invocation.input_messages = to_input_messages(
1241+
contents=transformers.t_contents(contents)
1242+
)
1243+
if system_content := _config_to_system_instruction(config):
1244+
invocation.system_instruction = to_system_instructions(
1245+
content=transformers.t_contents(system_content)[0]
12421246
)
1243-
if system_content := _config_to_system_instruction(config):
1244-
invocation.system_instruction = to_system_instructions(
1245-
content=transformers.t_contents(system_content)[0]
1246-
)
1247+
1248+
async def _response_async_generator_wrapper():
12471249
candidates = []
1250+
try:
1251+
async for resp in await wrapped_func(
1252+
self,
1253+
model=model,
1254+
contents=contents,
1255+
config=helper.wrapped_config(config),
1256+
**kwargs,
1257+
):
1258+
helper._update_response(resp)
1259+
if resp.candidates:
1260+
candidates += resp.candidates
1261+
yield resp
1262+
helper.apply_finish_attributes(invocation, candidates)
1263+
invocation.stop()
1264+
except Exception as exc:
1265+
helper.apply_finish_attributes(invocation, candidates)
1266+
invocation.fail(exc)
1267+
raise
12481268

1249-
async def _response_async_generator_wrapper(candidates):
1250-
try:
1251-
async for resp in await wrapped_func(
1252-
self,
1253-
model=model,
1254-
contents=contents,
1255-
config=helper.wrapped_config(config),
1256-
**kwargs,
1257-
):
1258-
helper._update_response(resp)
1259-
if resp.candidates:
1260-
candidates += resp.candidates
1261-
yield resp
1262-
finally:
1263-
helper.apply_finish_attributes(invocation, candidates)
1264-
1265-
return _response_async_generator_wrapper(candidates)
1269+
return _response_async_generator_wrapper()
12661270
else:
12671271
with helper.start_span_as_current_span(
12681272
model,

0 commit comments

Comments
 (0)