|
4 | 4 |
|
5 | 5 | namespace OpenTelemetry\SDK\Common\Export\Http; |
6 | 6 |
|
7 | | -use Http\Discovery\Psr17FactoryDiscovery; |
| 7 | +use function assert; |
| 8 | +use BadMethodCallException; |
| 9 | +use function explode; |
| 10 | +use function in_array; |
| 11 | +use OpenTelemetry\SDK\Common\Export\TransportInterface; |
8 | 12 | use OpenTelemetry\SDK\Common\Future\CancellationInterface; |
9 | 13 | use OpenTelemetry\SDK\Common\Future\CompletedFuture; |
10 | 14 | use OpenTelemetry\SDK\Common\Future\ErrorFuture; |
11 | 15 | use OpenTelemetry\SDK\Common\Future\FutureInterface; |
12 | 16 | use Psr\Http\Client\ClientInterface; |
| 17 | +use Psr\Http\Client\NetworkExceptionInterface; |
13 | 18 | use Psr\Http\Message\RequestFactoryInterface; |
14 | | -use Psr\Http\Message\RequestInterface; |
15 | 19 | use Psr\Http\Message\ResponseInterface; |
16 | 20 | use Psr\Http\Message\StreamFactoryInterface; |
| 21 | +use RuntimeException; |
| 22 | +use function strtolower; |
17 | 23 | use Throwable; |
| 24 | +use function time_nanosleep; |
| 25 | +use function trim; |
18 | 26 |
|
19 | 27 | /** |
20 | | - * PSR-7/PSR-18 HTTP transport for OTLP/HTTP exporters. |
21 | | - * |
22 | | - * ### Response body size limiting (issue #1932) |
23 | | - * |
24 | | - * All response body reads are funnelled through |
25 | | - * {@see PsrUtils::readBodyWithSizeLimit()}, which caps consumption at 4 MiB. |
26 | | - * This prevents a misconfigured or malicious collector from causing unbounded |
27 | | - * memory growth in the PHP process. |
| 28 | + * @psalm-template CONTENT_TYPE of string |
| 29 | + * @template-implements TransportInterface<CONTENT_TYPE> |
28 | 30 | */ |
29 | | -final class PsrTransport |
| 31 | +final class PsrTransport implements TransportInterface |
30 | 32 | { |
31 | | - private ClientInterface $client; |
32 | | - private RequestFactoryInterface $requestFactory; |
33 | | - private StreamFactoryInterface $streamFactory; |
34 | | - private string $endpoint; |
35 | | - private string $contentType; |
36 | | - |
37 | | - /** @var array<string,string> */ |
38 | | - private array $headers; |
39 | | - private string $compression; |
| 33 | + private bool $closed = false; |
40 | 34 |
|
| 35 | + /** |
| 36 | + * @psalm-param CONTENT_TYPE $contentType |
| 37 | + */ |
41 | 38 | public function __construct( |
42 | | - ClientInterface $client, |
43 | | - RequestFactoryInterface $requestFactory, |
44 | | - StreamFactoryInterface $streamFactory, |
45 | | - string $endpoint, |
46 | | - string $contentType, |
47 | | - array $headers = [], |
48 | | - string $compression = 'none' |
| 39 | + private readonly ClientInterface $client, |
| 40 | + private readonly RequestFactoryInterface $requestFactory, |
| 41 | + private readonly StreamFactoryInterface $streamFactory, |
| 42 | + private readonly string $endpoint, |
| 43 | + private readonly string $contentType, |
| 44 | + private readonly array $headers, |
| 45 | + private readonly array $compression, |
| 46 | + private readonly int $retryDelay, |
| 47 | + private readonly int $maxRetries, |
49 | 48 | ) { |
50 | | - $this->client = $client; |
51 | | - $this->requestFactory = $requestFactory; |
52 | | - $this->streamFactory = $streamFactory; |
53 | | - $this->endpoint = $endpoint; |
54 | | - $this->contentType = $contentType; |
55 | | - $this->headers = $headers; |
56 | | - $this->compression = $compression; |
57 | 49 | } |
58 | 50 |
|
59 | | - /** |
60 | | - * Send $payload to the OTLP endpoint and return a Future that resolves to |
61 | | - * the (size-limited, decoded) response body string. |
62 | | - * |
63 | | - * @param string $payload Serialised protobuf or JSON export request. |
64 | | - * |
65 | | - * @return FutureInterface<string> |
66 | | - */ |
67 | | - public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface |
| 51 | + #[\Override] |
| 52 | + public function contentType(): string |
68 | 53 | { |
69 | | - try { |
70 | | - $request = $this->buildRequest($payload); |
71 | | - $response = $this->client->sendRequest($request); |
72 | | - |
73 | | - return new CompletedFuture($this->handleResponse($response)); |
74 | | - } catch (Throwable $e) { |
75 | | - return new ErrorFuture($e); |
76 | | - } |
| 54 | + return $this->contentType; |
77 | 55 | } |
78 | 56 |
|
79 | | - // ------------------------------------------------------------------------- |
80 | | - // Private helpers |
81 | | - // ------------------------------------------------------------------------- |
82 | | - |
83 | | - private function buildRequest(string $payload): RequestInterface |
| 57 | + /** |
| 58 | + * @psalm-suppress ArgumentTypeCoercion |
| 59 | + */ |
| 60 | + #[\Override] |
| 61 | + public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface |
84 | 62 | { |
85 | | - $body = $payload; |
86 | | - |
87 | | - if ($this->compression === 'gzip') { |
88 | | - $body = gzencode($payload); |
| 63 | + if ($this->closed) { |
| 64 | + return new ErrorFuture(new BadMethodCallException('Transport closed')); |
89 | 65 | } |
90 | 66 |
|
91 | | - $stream = $this->streamFactory->createStream($body); |
92 | | - |
| 67 | + $body = PsrUtils::encode($payload, $this->compression, $appliedEncodings); |
93 | 68 | $request = $this->requestFactory |
94 | 69 | ->createRequest('POST', $this->endpoint) |
95 | | - ->withBody($stream) |
96 | | - ->withHeader('Content-Type', $this->contentType); |
| 70 | + ->withBody($this->streamFactory->createStream($body)) |
| 71 | + ->withHeader('Content-Type', $this->contentType) |
| 72 | + ; |
| 73 | + if ($appliedEncodings) { |
| 74 | + $request = $request->withHeader('Content-Encoding', $appliedEncodings); |
| 75 | + } |
| 76 | + foreach ($this->headers as $header => $value) { |
| 77 | + $request = $request->withAddedHeader($header, $value); |
| 78 | + } |
97 | 79 |
|
98 | | - if ($this->compression === 'gzip') { |
99 | | - $request = $request->withHeader('Content-Encoding', 'gzip'); |
| 80 | + for ($retries = 0;; $retries++) { |
| 81 | + $response = null; |
| 82 | + $e = null; |
| 83 | + |
| 84 | + try { |
| 85 | + $response = $this->client->sendRequest($request); |
| 86 | + if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) { |
| 87 | + break; |
| 88 | + } |
| 89 | + |
| 90 | + if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500 && !in_array($response->getStatusCode(), [408, 429], true)) { |
| 91 | + throw new RuntimeException($response->getReasonPhrase(), $response->getStatusCode()); |
| 92 | + } |
| 93 | + } catch (NetworkExceptionInterface $e) { |
| 94 | + } catch (Throwable $e) { |
| 95 | + return new ErrorFuture($e); |
| 96 | + } |
| 97 | + |
| 98 | + if ($retries >= $this->maxRetries) { |
| 99 | + return new ErrorFuture(new RuntimeException('Export retry limit exceeded', 0, $e)); |
| 100 | + } |
| 101 | + |
| 102 | + $delay = PsrUtils::retryDelay($retries, $this->retryDelay, $response); |
| 103 | + $sec = (int) $delay; |
| 104 | + $nsec = (int) (($delay - (float) $sec) * 1e9); |
| 105 | + |
| 106 | + /** @psalm-suppress ArgumentTypeCoercion */ |
| 107 | + if (time_nanosleep($sec, $nsec) !== true) { |
| 108 | + return new ErrorFuture(new RuntimeException('Export cancelled', 0, $e)); |
| 109 | + } |
100 | 110 | } |
101 | 111 |
|
102 | | - foreach ($this->headers as $name => $value) { |
103 | | - $request = $request->withHeader($name, $value); |
| 112 | + assert(isset($response)); |
| 113 | + |
| 114 | + try { |
| 115 | + $body = PsrUtils::decode( |
| 116 | + $response, |
| 117 | + self::parseContentEncoding($response), |
| 118 | + ); |
| 119 | + } catch (Throwable $e) { |
| 120 | + return new ErrorFuture($e); |
104 | 121 | } |
105 | 122 |
|
106 | | - return $request; |
| 123 | + return new CompletedFuture($body); |
107 | 124 | } |
108 | 125 |
|
109 | 126 | /** |
110 | | - * Read and decode the response body, subject to the 4 MiB limit defined |
111 | | - * by {@see ResponseBodySizeLimit::MAX_BYTES}. |
112 | | - * |
113 | | - * For non-2xx responses an exception is thrown so the exporter can apply |
114 | | - * its retry / drop logic. |
115 | | - * |
116 | | - * @throws TransportResponseException on HTTP error status codes. |
| 127 | + * @return list<string> |
117 | 128 | */ |
118 | | - private function handleResponse(ResponseInterface $response): string |
| 129 | + private static function parseContentEncoding(ResponseInterface $response): array |
119 | 130 | { |
120 | | - $statusCode = $response->getStatusCode(); |
| 131 | + $encodings = []; |
| 132 | + foreach (explode(',', $response->getHeaderLine('Content-Encoding')) as $encoding) { |
| 133 | + if (($encoding = trim($encoding, " \t")) !== '') { |
| 134 | + $encodings[] = strtolower($encoding); |
| 135 | + } |
| 136 | + } |
121 | 137 |
|
122 | | - // Always read (and limit) the body first — we need it for error details. |
123 | | - $body = PsrUtils::decode($response); |
| 138 | + return $encodings; |
| 139 | + } |
124 | 140 |
|
125 | | - if ($statusCode >= 200 && $statusCode < 300) { |
126 | | - return $body; |
| 141 | + #[\Override] |
| 142 | + public function shutdown(?CancellationInterface $cancellation = null): bool |
| 143 | + { |
| 144 | + if ($this->closed) { |
| 145 | + return false; |
127 | 146 | } |
128 | 147 |
|
129 | | - throw new TransportResponseException( |
130 | | - $statusCode, |
131 | | - $body, |
132 | | - sprintf( |
133 | | - 'OTLP export failed with HTTP %d. Body (up to %d bytes): %s', |
134 | | - $statusCode, |
135 | | - ResponseBodySizeLimit::MAX_BYTES, |
136 | | - $body !== '' ? $body : '(empty)' |
137 | | - ) |
138 | | - ); |
| 148 | + $this->closed = true; |
| 149 | + |
| 150 | + return true; |
| 151 | + } |
| 152 | + |
| 153 | + #[\Override] |
| 154 | + public function forceFlush(?CancellationInterface $cancellation = null): bool |
| 155 | + { |
| 156 | + return !$this->closed; |
139 | 157 | } |
140 | 158 | } |
0 commit comments