|
4 | 4 |
|
5 | 5 | namespace OpenTelemetry\SDK\Common\Export\Http; |
6 | 6 |
|
7 | | -use function assert; |
8 | | -use BadMethodCallException; |
9 | | -use function explode; |
10 | | -use function in_array; |
11 | | -use OpenTelemetry\SDK\Common\Export\TransportInterface; |
| 7 | +use Http\Discovery\Psr17FactoryDiscovery; |
12 | 8 | use OpenTelemetry\SDK\Common\Future\CancellationInterface; |
13 | 9 | use OpenTelemetry\SDK\Common\Future\CompletedFuture; |
14 | 10 | use OpenTelemetry\SDK\Common\Future\ErrorFuture; |
15 | 11 | use OpenTelemetry\SDK\Common\Future\FutureInterface; |
16 | 12 | use Psr\Http\Client\ClientInterface; |
17 | | -use Psr\Http\Client\NetworkExceptionInterface; |
18 | 13 | use Psr\Http\Message\RequestFactoryInterface; |
| 14 | +use Psr\Http\Message\RequestInterface; |
19 | 15 | use Psr\Http\Message\ResponseInterface; |
20 | 16 | use Psr\Http\Message\StreamFactoryInterface; |
21 | | -use RuntimeException; |
22 | | -use function strtolower; |
23 | 17 | use Throwable; |
24 | | -use function time_nanosleep; |
25 | | -use function trim; |
26 | 18 |
|
27 | 19 | /** |
28 | | - * @psalm-template CONTENT_TYPE of string |
29 | | - * @template-implements TransportInterface<CONTENT_TYPE> |
| 20 | + * PSR-7/PSR-18 HTTP transport for OTLP/HTTP exporters. |
| 21 | + * |
| 22 | + * ### Response body size limiting (issue #1932) |
| 23 | + * |
| 24 | + * All response body reads are funnelled through |
| 25 | + * {@see PsrUtils::readBodyWithSizeLimit()}, which caps consumption at 4 MiB. |
| 26 | + * This prevents a misconfigured or malicious collector from causing unbounded |
| 27 | + * memory growth in the PHP process. |
30 | 28 | */ |
31 | | -final class PsrTransport implements TransportInterface |
| 29 | +final class PsrTransport |
32 | 30 | { |
33 | | - private bool $closed = false; |
| 31 | + private ClientInterface $client; |
| 32 | + private RequestFactoryInterface $requestFactory; |
| 33 | + private StreamFactoryInterface $streamFactory; |
| 34 | + private string $endpoint; |
| 35 | + private string $contentType; |
| 36 | + |
| 37 | + /** @var array<string,string> */ |
| 38 | + private array $headers; |
| 39 | + private string $compression; |
34 | 40 |
|
35 | | - /** |
36 | | - * @psalm-param CONTENT_TYPE $contentType |
37 | | - */ |
38 | 41 | public function __construct( |
39 | | - private readonly ClientInterface $client, |
40 | | - private readonly RequestFactoryInterface $requestFactory, |
41 | | - private readonly StreamFactoryInterface $streamFactory, |
42 | | - private readonly string $endpoint, |
43 | | - private readonly string $contentType, |
44 | | - private readonly array $headers, |
45 | | - private readonly array $compression, |
46 | | - private readonly int $retryDelay, |
47 | | - private readonly int $maxRetries, |
| 42 | + ClientInterface $client, |
| 43 | + RequestFactoryInterface $requestFactory, |
| 44 | + StreamFactoryInterface $streamFactory, |
| 45 | + string $endpoint, |
| 46 | + string $contentType, |
| 47 | + array $headers = [], |
| 48 | + string $compression = 'none' |
48 | 49 | ) { |
49 | | - } |
50 | | - |
51 | | - #[\Override] |
52 | | - public function contentType(): string |
53 | | - { |
54 | | - return $this->contentType; |
| 50 | + $this->client = $client; |
| 51 | + $this->requestFactory = $requestFactory; |
| 52 | + $this->streamFactory = $streamFactory; |
| 53 | + $this->endpoint = $endpoint; |
| 54 | + $this->contentType = $contentType; |
| 55 | + $this->headers = $headers; |
| 56 | + $this->compression = $compression; |
55 | 57 | } |
56 | 58 |
|
57 | 59 | /** |
58 | | - * @psalm-suppress ArgumentTypeCoercion |
| 60 | + * Send $payload to the OTLP endpoint and return a Future that resolves to |
| 61 | + * the (size-limited, decoded) response body string. |
| 62 | + * |
| 63 | + * @param string $payload Serialised protobuf or JSON export request. |
| 64 | + * |
| 65 | + * @return FutureInterface<string> |
59 | 66 | */ |
60 | | - #[\Override] |
61 | 67 | public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface |
62 | 68 | { |
63 | | - if ($this->closed) { |
64 | | - return new ErrorFuture(new BadMethodCallException('Transport closed')); |
| 69 | + try { |
| 70 | + $request = $this->buildRequest($payload); |
| 71 | + $response = $this->client->sendRequest($request); |
| 72 | + |
| 73 | + return new CompletedFuture($this->handleResponse($response)); |
| 74 | + } catch (Throwable $e) { |
| 75 | + return new ErrorFuture($e); |
| 76 | + } |
| 77 | + } |
| 78 | + |
| 79 | + // ------------------------------------------------------------------------- |
| 80 | + // Private helpers |
| 81 | + // ------------------------------------------------------------------------- |
| 82 | + |
| 83 | + private function buildRequest(string $payload): RequestInterface |
| 84 | + { |
| 85 | + $body = $payload; |
| 86 | + |
| 87 | + if ($this->compression === 'gzip') { |
| 88 | + $body = gzencode($payload); |
65 | 89 | } |
66 | 90 |
|
67 | | - $body = PsrUtils::encode($payload, $this->compression, $appliedEncodings); |
| 91 | + $stream = $this->streamFactory->createStream($body); |
| 92 | + |
68 | 93 | $request = $this->requestFactory |
69 | 94 | ->createRequest('POST', $this->endpoint) |
70 | | - ->withBody($this->streamFactory->createStream($body)) |
71 | | - ->withHeader('Content-Type', $this->contentType) |
72 | | - ; |
73 | | - if ($appliedEncodings) { |
74 | | - $request = $request->withHeader('Content-Encoding', $appliedEncodings); |
75 | | - } |
76 | | - foreach ($this->headers as $header => $value) { |
77 | | - $request = $request->withAddedHeader($header, $value); |
78 | | - } |
| 95 | + ->withBody($stream) |
| 96 | + ->withHeader('Content-Type', $this->contentType); |
79 | 97 |
|
80 | | - for ($retries = 0;; $retries++) { |
81 | | - $response = null; |
82 | | - $e = null; |
83 | | - |
84 | | - try { |
85 | | - $response = $this->client->sendRequest($request); |
86 | | - if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) { |
87 | | - break; |
88 | | - } |
89 | | - |
90 | | - if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500 && !in_array($response->getStatusCode(), [408, 429], true)) { |
91 | | - throw new RuntimeException($response->getReasonPhrase(), $response->getStatusCode()); |
92 | | - } |
93 | | - } catch (NetworkExceptionInterface $e) { |
94 | | - } catch (Throwable $e) { |
95 | | - return new ErrorFuture($e); |
96 | | - } |
97 | | - |
98 | | - if ($retries >= $this->maxRetries) { |
99 | | - return new ErrorFuture(new RuntimeException('Export retry limit exceeded', 0, $e)); |
100 | | - } |
101 | | - |
102 | | - $delay = PsrUtils::retryDelay($retries, $this->retryDelay, $response); |
103 | | - $sec = (int) $delay; |
104 | | - $nsec = (int) (($delay - (float) $sec) * 1e9); |
105 | | - |
106 | | - /** @psalm-suppress ArgumentTypeCoercion */ |
107 | | - if (time_nanosleep($sec, $nsec) !== true) { |
108 | | - return new ErrorFuture(new RuntimeException('Export cancelled', 0, $e)); |
109 | | - } |
| 98 | + if ($this->compression === 'gzip') { |
| 99 | + $request = $request->withHeader('Content-Encoding', 'gzip'); |
110 | 100 | } |
111 | 101 |
|
112 | | - assert(isset($response)); |
113 | | - |
114 | | - try { |
115 | | - $body = PsrUtils::decode( |
116 | | - $response->getBody()->__toString(), |
117 | | - self::parseContentEncoding($response), |
118 | | - ); |
119 | | - } catch (Throwable $e) { |
120 | | - return new ErrorFuture($e); |
| 102 | + foreach ($this->headers as $name => $value) { |
| 103 | + $request = $request->withHeader($name, $value); |
121 | 104 | } |
122 | 105 |
|
123 | | - return new CompletedFuture($body); |
| 106 | + return $request; |
124 | 107 | } |
125 | 108 |
|
126 | 109 | /** |
127 | | - * @return list<string> |
| 110 | + * Read and decode the response body, subject to the 4 MiB limit defined |
| 111 | + * by {@see ResponseBodySizeLimit::MAX_BYTES}. |
| 112 | + * |
| 113 | + * For non-2xx responses an exception is thrown so the exporter can apply |
| 114 | + * its retry / drop logic. |
| 115 | + * |
| 116 | + * @throws TransportResponseException on HTTP error status codes. |
128 | 117 | */ |
129 | | - private static function parseContentEncoding(ResponseInterface $response): array |
| 118 | + private function handleResponse(ResponseInterface $response): string |
130 | 119 | { |
131 | | - $encodings = []; |
132 | | - foreach (explode(',', $response->getHeaderLine('Content-Encoding')) as $encoding) { |
133 | | - if (($encoding = trim($encoding, " \t")) !== '') { |
134 | | - $encodings[] = strtolower($encoding); |
135 | | - } |
136 | | - } |
| 120 | + $statusCode = $response->getStatusCode(); |
137 | 121 |
|
138 | | - return $encodings; |
139 | | - } |
| 122 | + // Always read (and limit) the body first — we need it for error details. |
| 123 | + $body = PsrUtils::decode($response); |
140 | 124 |
|
141 | | - #[\Override] |
142 | | - public function shutdown(?CancellationInterface $cancellation = null): bool |
143 | | - { |
144 | | - if ($this->closed) { |
145 | | - return false; |
| 125 | + if ($statusCode >= 200 && $statusCode < 300) { |
| 126 | + return $body; |
146 | 127 | } |
147 | 128 |
|
148 | | - $this->closed = true; |
149 | | - |
150 | | - return true; |
151 | | - } |
152 | | - |
153 | | - #[\Override] |
154 | | - public function forceFlush(?CancellationInterface $cancellation = null): bool |
155 | | - { |
156 | | - return !$this->closed; |
| 129 | + throw new TransportResponseException( |
| 130 | + $statusCode, |
| 131 | + $body, |
| 132 | + sprintf( |
| 133 | + 'OTLP export failed with HTTP %d. Body (up to %d bytes): %s', |
| 134 | + $statusCode, |
| 135 | + ResponseBodySizeLimit::MAX_BYTES, |
| 136 | + $body !== '' ? $body : '(empty)' |
| 137 | + ) |
| 138 | + ); |
157 | 139 | } |
158 | 140 | } |
0 commit comments