Skip to content

Commit ce6c1f2

Browse files
authored
Merge commit from fork
1 parent e67eb56 commit ce6c1f2

5 files changed

Lines changed: 149 additions & 6 deletions

File tree

README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,12 @@ Example:
6565

6666
This requires to receive data via HTTP/HTTPS.
6767

68-
| parameter | type | description | default |
69-
|-----------|---------|------------------------|-----------|
70-
| bind | string | The address to bind to | `0.0.0.0` |
71-
| port | integer | The port to listen to | `4318` |
68+
| parameter | type | description | default |
69+
|--------------------------|---------|---------------------------------------------------------------------------------------------------------------------------|-----------|
70+
| bind | string | The address to bind to | `0.0.0.0` |
71+
| port | integer | The port to listen to | `4318` |
72+
| body_size_limit | size | The size limit of the POSTed element. This value should be larger than the 'chunk_limit_size' in out_opentelemetry plugin | `32M` (32MiB) |
73+
| decompression_size_limit | size | The size limit of the decompressed element | `256M` (256MiB) |
7274

7375
#### `<grpc>` section
7476

lib/fluent/plugin/in_opentelemetry.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ class OpentelemetryInput < Input
2727
config_param :bind, :string, default: "0.0.0.0"
2828
desc "The port to listen to."
2929
config_param :port, :integer, default: 4318
30+
31+
desc "The size limit of the POSTed element. This value should be larger than the 'chunk_limit_size' in out_opentelemetry plugin."
32+
config_param :body_size_limit, :size, default: 32 * 1024 * 1024
33+
desc "The size limit of the decompressed element."
34+
config_param :decompression_size_limit, :size, default: 256 * 1024 * 1024
3035
end
3136

3237
config_section :grpc, required: false, multi: false, init: false, param_name: :grpc_config do

lib/fluent/plugin/opentelemetry/http_input_handler.rb

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,19 @@ def close
2525
@request.body&.close
2626
end
2727
end
28+
29+
unless method_defined?(:body_stream)
30+
def body_stream
31+
@request.body
32+
end
33+
end
2834
end
2935
end
3036
end
3137

3238
class Fluent::Plugin::Opentelemetry::HttpInputHandler
39+
class SizeLimitError < StandardError; end
40+
3341
using Fluent::PluginHelper::HttpServer::Extension
3442

3543
def initialize(http_config, logger)
@@ -54,13 +62,29 @@ def traces(req, &block)
5462
def common(req, request_class, response_class)
5563
content_type = req.headers["content-type"]
5664
content_encoding = req.headers["content-encoding"]&.first
57-
body = req.body
65+
66+
content_length = req.headers["content-length"]&.first&.to_i
67+
if content_length && content_length >= @http_config.body_size_limit
68+
@logger.warn { "Received too big content length: #{content_length}" }
69+
return response_payload_too_large
70+
end
71+
72+
begin
73+
body = read_body(req, limit: @http_config.body_size_limit)
74+
rescue SizeLimitError
75+
@logger.warn { "Received payload exceeding body_size_limit" }
76+
return response_payload_too_large
77+
end
78+
5879
return response_unsupported_media_type unless valid_content_type?(content_type)
5980
return response_bad_request(content_type) unless valid_content_encoding?(content_encoding)
6081

6182
if content_encoding == Fluent::Plugin::Opentelemetry::CONTENT_ENCODING_GZIP
6283
begin
63-
body = Zlib::GzipReader.new(StringIO.new(body)).read
84+
body = decompress(body, limit: @http_config.decompression_size_limit)
85+
rescue SizeLimitError
86+
@logger.warn { "Decompressed payload exceeding decompression_size_limit" }
87+
return response_payload_too_large
6488
rescue Zlib::Error => e
6589
@logger.warn { "Failed to decompress gzip payload: #{e.message}" }
6690
return response_bad_request(content_type)
@@ -83,6 +107,42 @@ def common(req, request_class, response_class)
83107
req.close
84108
end
85109

110+
def read_body(request, limit:)
111+
body = +""
112+
while (chunk = request.body_stream&.read)
113+
body << chunk
114+
if body.bytesize > limit
115+
raise SizeLimitError, "Too large payload"
116+
end
117+
end
118+
body
119+
end
120+
121+
BYTES_TO_READ = 64 * 1024
122+
123+
def decompress(compressed_data, limit:)
124+
io = StringIO.new(compressed_data)
125+
out = +""
126+
loop do
127+
reader = Zlib::GzipReader.new(io)
128+
while (chunk = reader.read(BYTES_TO_READ))
129+
out << chunk
130+
if out.bytesize > limit
131+
raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
132+
end
133+
end
134+
135+
unused = reader.unused
136+
reader.finish
137+
unless unused.nil?
138+
adjust = unused.length
139+
io.pos -= adjust
140+
end
141+
break if io.eof?
142+
end
143+
out
144+
end
145+
86146
def valid_content_type?(content_type)
87147
case content_type
88148
when Fluent::Plugin::Opentelemetry::CONTENT_TYPE_PROTOBUF, Fluent::Plugin::Opentelemetry::CONTENT_TYPE_JSON
@@ -106,6 +166,10 @@ def response_unsupported_media_type
106166
response(415, Fluent::Plugin::Opentelemetry::CONTENT_TYPE_PLAIN, "415 unsupported media type, supported: [application/json, application/x-protobuf]")
107167
end
108168

169+
def response_payload_too_large
170+
response(413, Fluent::Plugin::Opentelemetry::CONTENT_TYPE_PLAIN, "413 Payload Too Large")
171+
end
172+
109173
def response_bad_request(content_type)
110174
response(400, content_type, "") # TODO: fix body message
111175
end

test/fluent/plugin/test_in_opentelemetry.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,15 @@ def test_configure
3232
<http>
3333
bind 127.0.0.1
3434
port #{@port}
35+
body_size_limit 12345
36+
decompression_size_limit 123456
3537
</http>
3638
])
3739
assert_equal "opentelemetry.test", d.instance.tag
3840
assert_equal "127.0.0.1", d.instance.http_config.bind
3941
assert_equal @port, d.instance.http_config.port
42+
assert_equal 12345, d.instance.http_config.body_size_limit
43+
assert_equal 123456, d.instance.http_config.decompression_size_limit
4044

4145
if defined?(GRPC)
4246
d = create_driver(%[

test/fluent/plugin/test_in_opentelemetry_http.rb

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,74 @@ def test_receive_compressed_json
113113
assert_equal(expected_events, d.events)
114114
end
115115

116+
def test_body_size_limit
117+
d = create_driver(<<~"CONFIG")
118+
tag opentelemetry.test
119+
<http>
120+
bind 127.0.0.1
121+
port #{@port}
122+
body_size_limit #{TestData::JSON::LOGS.bytesize}
123+
</http>
124+
CONFIG
125+
126+
res = d.run(expect_records: 1) do
127+
post_json("/v1/logs", TestData::JSON::LOGS)
128+
end
129+
expected_events = [["opentelemetry.test", @event_time, { "type" => "opentelemetry_logs", "message" => TestData::JSON::LOGS }]]
130+
assert_equal(200, res.status)
131+
assert_equal(expected_events, d.events)
132+
end
133+
134+
def test_body_size_limit_too_large_payload
135+
d = create_driver(<<~"CONFIG")
136+
tag opentelemetry.test
137+
<http>
138+
bind 127.0.0.1
139+
port #{@port}
140+
body_size_limit #{TestData::JSON::LOGS.bytesize - 1}
141+
</http>
142+
CONFIG
143+
144+
res = d.run(expect_records: 0) do
145+
post_json("/v1/logs", TestData::JSON::LOGS)
146+
end
147+
assert_equal(413, res.status)
148+
end
149+
150+
def test_decompression_size_limit
151+
d = create_driver(<<~"CONFIG")
152+
tag opentelemetry.test
153+
<http>
154+
bind 127.0.0.1
155+
port #{@port}
156+
decompression_size_limit #{TestData::JSON::LOGS.bytesize}
157+
</http>
158+
CONFIG
159+
160+
res = d.run(expect_records: 1) do
161+
post_json("/v1/logs", compress(TestData::JSON::LOGS), headers: { "Content-Encoding" => "gzip" })
162+
end
163+
expected_events = [["opentelemetry.test", @event_time, { "type" => "opentelemetry_logs", "message" => TestData::JSON::LOGS }]]
164+
assert_equal(200, res.status)
165+
assert_equal(expected_events, d.events)
166+
end
167+
168+
def test_decompression_size_limit_too_large_payload
169+
d = create_driver(<<~"CONFIG")
170+
tag opentelemetry.test
171+
<http>
172+
bind 127.0.0.1
173+
port #{@port}
174+
decompression_size_limit #{TestData::JSON::LOGS.bytesize - 1}
175+
</http>
176+
CONFIG
177+
178+
res = d.run(expect_records: 0) do
179+
post_json("/v1/logs", compress(TestData::JSON::LOGS), headers: { "Content-Encoding" => "gzip" })
180+
end
181+
assert_equal(413, res.status)
182+
end
183+
116184
data("metrics" => {
117185
request_path: "/v1/metrics",
118186
request_data: TestData::ProtocolBuffers::METRICS,

0 commit comments

Comments
 (0)