Skip to content

Commit 32196a6

Browse files
authored
in_opentelemetry: handle invalid gzip gracefully (#42)
1 parent d4ed3ad commit 32196a6

3 files changed

Lines changed: 35 additions & 12 deletions

File tree

lib/fluent/plugin/in_opentelemetry.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def start
5858
super
5959

6060
if @http_config
61-
http_handler = Opentelemetry::HttpInputHandler.new
61+
http_handler = Opentelemetry::HttpInputHandler.new(@http_config, log)
6262
http_server_create_http_server(:in_opentelemetry_http_server, addr: @http_config.bind, port: @http_config.port, logger: log) do |serv|
6363
serv.post("/v1/logs") do |req|
6464
http_handler.logs(req) { |record| router.emit(tag_for(Opentelemetry::RECORD_TYPE_LOGS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_LOGS, "message" => record }) }

lib/fluent/plugin/opentelemetry/http_input_handler.rb

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ def close
3232
class Fluent::Plugin::Opentelemetry::HttpInputHandler
3333
using Fluent::PluginHelper::HttpServer::Extension
3434

35+
def initialize(http_config, logger)
36+
@http_config = http_config
37+
@logger = logger
38+
end
39+
3540
def logs(req, &block)
3641
common(req, Fluent::Plugin::Opentelemetry::Request::Logs, Fluent::Plugin::Opentelemetry::Response::Logs, &block)
3742
end
@@ -53,12 +58,20 @@ def common(req, request_class, response_class)
5358
return response_unsupported_media_type unless valid_content_type?(content_type)
5459
return response_bad_request(content_type) unless valid_content_encoding?(content_encoding)
5560

56-
body = Zlib::GzipReader.new(StringIO.new(body)).read if content_encoding == Fluent::Plugin::Opentelemetry::CONTENT_ENCODING_GZIP
61+
if content_encoding == Fluent::Plugin::Opentelemetry::CONTENT_ENCODING_GZIP
62+
begin
63+
body = Zlib::GzipReader.new(StringIO.new(body)).read
64+
rescue Zlib::Error => e
65+
@logger.warn { "Failed to decompress gzip payload: #{e.message}" }
66+
return response_bad_request(content_type)
67+
end
68+
end
5769

5870
begin
5971
record = request_class.new(body).record
60-
rescue Google::Protobuf::ParseError
72+
rescue Google::Protobuf::ParseError => e
6173
# The format in request body does not comply with the OpenTelemetry protocol.
74+
@logger.warn { "Failed to parse OpenTelemetry payload: #{e.message}" }
6275
return response_bad_request(content_type)
6376
end
6477

test/fluent/plugin/test_in_opentelemetry_http.rb

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,6 @@ def test_receive_compressed_json
113113
assert_equal(expected_events, d.events)
114114
end
115115

116-
def test_invalid_json
117-
d = create_driver
118-
res = d.run(expect_records: 0) do
119-
post_json("/v1/logs", TestData::JSON::INVALID)
120-
end
121-
122-
assert_equal(400, res.status)
123-
end
124-
125116
data("metrics" => {
126117
request_path: "/v1/metrics",
127118
request_data: TestData::ProtocolBuffers::METRICS,
@@ -162,6 +153,25 @@ def test_receive_compressed_protocol_buffers
162153
assert_equal(expected_events, d.events)
163154
end
164155

156+
def test_invalid_json
157+
d = create_driver
158+
res = d.run(expect_records: 0) do
159+
post_json("/v1/logs", TestData::JSON::INVALID)
160+
end
161+
162+
assert_equal(400, res.status)
163+
end
164+
165+
def test_invalid_gzip_payload
166+
d = create_driver
167+
res = d.run(expect_records: 0) do
168+
# Post plain JSON payload as gzip
169+
post_json("/v1/logs", TestData::JSON::LOGS, headers: { "Content-Encoding" => "gzip" })
170+
end
171+
172+
assert_equal(400, res.status)
173+
end
174+
165175
def test_invalid_protocol_buffers
166176
d = create_driver
167177
res = d.run(expect_records: 0) do

0 commit comments

Comments
 (0)