|
40 | 40 | from google.genai import types |
41 | 41 | from vcr.record_mode import RecordMode |
42 | 42 |
|
| 43 | +try: |
| 44 | + # These modules are only supported in python >= 3.10 |
| 45 | + from aiohttp.client_exceptions import ClientConnectionError |
| 46 | + from vcr.stubs import aiohttp_stubs |
| 47 | +except ImportError: |
| 48 | + ClientConnectionError = None |
| 49 | + aiohttp_stubs = None |
| 50 | + |
43 | 51 | from opentelemetry.instrumentation._semconv import ( |
44 | 52 | OTEL_SEMCONV_STABILITY_OPT_IN, |
45 | 53 | _OpenTelemetrySemanticConventionStability, |
@@ -135,6 +143,9 @@ def _redact_headers(headers): |
135 | 143 |
|
136 | 144 |
|
137 | 145 | def _before_record_request(request): |
| 146 | + # aiohttp reports the request method in lower case while it is recorded in the cassette in upper case. |
| 147 | + if request.method: |
| 148 | + request.method = request.method.upper() |
138 | 149 | if request.headers: |
139 | 150 | _redact_headers(request.headers) |
140 | 151 | uri = request.uri |
@@ -316,6 +327,48 @@ def setup_vcr(vcr): |
316 | 327 | return vcr |
317 | 328 |
|
318 | 329 |
|
| 330 | +@pytest.fixture(name="patch_vcr_aiohttp_stream", scope="module", autouse=True) |
| 331 | +def fixture_patch_vcr_aiohttp_stream(): |
| 332 | + # Allows the async tests to not be stuck in infinite loop when streaming |
| 333 | + # a VCR cassette with aiohttp stubs. |
| 334 | + # https://github.com/kevin1024/vcrpy/issues/927 |
| 335 | + if ClientConnectionError is None or aiohttp_stubs is None: |
| 336 | + return |
| 337 | + |
| 338 | + class _ReplayMockStream(aiohttp_stubs.MockStream): |
| 339 | + # Keep vcrpy's stream behavior, but ignore aiohttp's |
| 340 | + # close-time ClientConnectionError("Connection closed") during |
| 341 | + # cassette replay, where the full response is already buffered |
| 342 | + # and this condition should be treated as normal EOF. |
| 343 | + def set_exception(self, exc): |
| 344 | + if isinstance(exc, ClientConnectionError) and exc.args == ( |
| 345 | + "Connection closed", |
| 346 | + ): |
| 347 | + return |
| 348 | + super().set_exception(exc) |
| 349 | + |
| 350 | + class _ReplayMockClientResponse(aiohttp_stubs.MockClientResponse): |
| 351 | + def __init__(self, *args, **kwargs): |
| 352 | + super().__init__(*args, **kwargs) |
| 353 | + self._mock_content_stream = None |
| 354 | + |
| 355 | + @property |
| 356 | + def content(self): |
| 357 | + # vcrpy's aiohttp MockClientResponse.content creates a fresh stream object |
| 358 | + # on every property access. google-genai async streaming repeatedly reads |
| 359 | + # response.content.readline() and expects the same stream instance until EOF is |
| 360 | + # reached. |
| 361 | + if self._mock_content_stream is None: |
| 362 | + body = self._body or b"" |
| 363 | + stream = _ReplayMockStream() |
| 364 | + stream.feed_data(body) |
| 365 | + stream.feed_eof() |
| 366 | + self._mock_content_stream = stream |
| 367 | + return self._mock_content_stream |
| 368 | + |
| 369 | + aiohttp_stubs.MockClientResponse = _ReplayMockClientResponse |
| 370 | + |
| 371 | + |
319 | 372 | @pytest.fixture(name="instrumentor") |
320 | 373 | def fixture_instrumentor(): |
321 | 374 | return GoogleGenAiSdkInstrumentor() |
|
0 commit comments