From 2cdbce70a7ddf584360a9a4ac8c12d4ce095961d Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Thu, 25 Jun 2026 16:03:18 +0900 Subject: [PATCH] Backport(v1.19) buffer, in_http: enforce size limits on decompressed payloads **Which issue(s) this PR fixes**: Fixes # **What this PR does / why we need it**: This PR introduces a strict upper bound for decompressed payloads. ### Changes 1. Introduced `Fluent::Plugin::Extractor` * Extracted the decompression logic (gzip, zstd, deflate) into a dedicated module that processes and decompresses data in manageable 64KB chunks, ensuring steady memory usage. 2. Added `decompression_size_limit` parameter (default: `256MiB`) * Specifies the upper limit of decompression in `buffer` (`buf_file`, `buf_file_single` and `buf_memory`) and `in_http` plugins. * If the limit is exceeded, it safely raises `Fluent::Plugin::Extractor::SizeLimitError` (or returns a `400 Bad Request` in `in_http`). ### Notice for 3rd-party Plugin Developers Plugins utilizing the `buffer` / `in_http` mechanisms or the newly introduced `Fluent::Plugin::Extractor` are automatically protected by this limit. However, if your custom plugin implements its own decompression logic independently, you must take care to implement similar boundary checks to guard against unexpected memory spikes. **Docs Changes**: **Release Note**: * buffer, in_http: enforce size limits on decompressed payloads Signed-off-by: Shizuo Fujita --- lib/fluent/event.rb | 3 +- lib/fluent/plugin/buf_file.rb | 2 +- lib/fluent/plugin/buf_file_single.rb | 4 +- lib/fluent/plugin/buf_memory.rb | 2 +- lib/fluent/plugin/buffer.rb | 3 + lib/fluent/plugin/buffer/chunk.rb | 3 +- lib/fluent/plugin/buffer/file_chunk.rb | 4 +- lib/fluent/plugin/buffer/file_single_chunk.rb | 4 +- lib/fluent/plugin/buffer/memory_chunk.rb | 2 +- lib/fluent/plugin/compressable.rb | 69 +--------- lib/fluent/plugin/extractor.rb | 121 ++++++++++++++++ lib/fluent/plugin/in_forward.rb | 4 +- lib/fluent/plugin/in_http.rb | 12 +- test/plugin/test_buf_file.rb | 21 +++ test/plugin/test_buf_file_single.rb | 14 ++ test/plugin/test_buf_memory.rb | 9 ++ test/plugin/test_buffer.rb | 27 +++- test/plugin/test_extractor.rb | 129 ++++++++++++++++++ test/plugin/test_in_forward.rb | 27 ++++ test/plugin/test_in_http.rb | 58 ++++++++ 20 files changed, 436 insertions(+), 82 deletions(-) create mode 100644 lib/fluent/plugin/extractor.rb create mode 100644 test/plugin/test_extractor.rb diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index 2a6044092f..e2f7150ac5 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -268,11 +268,12 @@ def to_msgpack_stream(time_int: false, packer: nil) end class CompressedMessagePackEventStream < MessagePackEventStream - def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip) + def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip, decompression_size_limit: Fluent::Plugin::Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records) @decompressed_data = nil @compressed_data = data @type = compress + @decompression_size_limit = decompression_size_limit end def empty? diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index fbabcbc5af..94ce53df2f 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -205,7 +205,7 @@ def resume def generate_chunk(metadata) # FileChunk generates real path with unique_id perm = @file_permission || system_config.file_permission - chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress) + chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress, decompression_size_limit: @decompression_size_limit) log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata return chunk diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index 983c7745ba..e4947f38c0 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -183,7 +183,7 @@ def resume end begin - chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress) + chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress, decompression_size_limit: @decompression_size_limit) chunk.restore_size(@chunk_format) if @calc_num_records rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e exist_broken_file = true @@ -216,7 +216,7 @@ def resume def generate_chunk(metadata) # FileChunk generates real path with unique_id perm = @file_permission || system_config.file_permission - chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress) + chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress, decompression_size_limit: @decompression_size_limit) log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata diff --git a/lib/fluent/plugin/buf_memory.rb b/lib/fluent/plugin/buf_memory.rb index c6c9554306..1237dd93d2 100644 --- a/lib/fluent/plugin/buf_memory.rb +++ b/lib/fluent/plugin/buf_memory.rb @@ -27,7 +27,7 @@ def resume end def generate_chunk(metadata) - Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress) + Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress, decompression_size_limit: @decompression_size_limit) end end end diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index daeaf4c2ea..9607d0e60e 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -66,6 +66,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than desc 'Compress buffered data.' config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text + desc 'The size limit of the decompressed element.' + config_param :decompression_size_limit, :size, default: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT + desc 'If true, chunks are thrown away when unrecoverable error happens' config_param :disable_chunk_backup, :bool, default: false diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index 30707e6811..c5b8f4a64d 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -48,7 +48,7 @@ class Chunk # TODO: CompressedPackedMessage of forward protocol? - def initialize(metadata, compress: :text) + def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) super() @unique_id = generate_unique_id @metadata = metadata @@ -64,6 +64,7 @@ def initialize(metadata, compress: :text) elsif compress == :zstd extend ZstdDecompressable end + @decompression_size_limit = decompression_size_limit end attr_reader :unique_id, :metadata, :state diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 24dbaaeb55..d8ab700ee6 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -39,8 +39,8 @@ class FileChunkError < StandardError; end attr_reader :path, :meta_path, :permission - def initialize(metadata, path, mode, perm: nil, compress: :text) - super(metadata, compress: compress) + def initialize(metadata, path, mode, perm: nil, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) + super(metadata, compress: compress, decompression_size_limit: decompression_size_limit) perm ||= Fluent::DEFAULT_FILE_PERMISSION @permission = perm.is_a?(String) ? perm.to_i(8) : perm @bytesize = @size = @adding_bytes = @adding_size = 0 diff --git a/lib/fluent/plugin/buffer/file_single_chunk.rb b/lib/fluent/plugin/buffer/file_single_chunk.rb index ba20d1f45a..f533ece5b3 100644 --- a/lib/fluent/plugin/buffer/file_single_chunk.rb +++ b/lib/fluent/plugin/buffer/file_single_chunk.rb @@ -35,8 +35,8 @@ class FileChunkError < StandardError; end attr_reader :path, :permission - def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text) - super(metadata, compress: compress) + def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) + super(metadata, compress: compress, decompression_size_limit: decompression_size_limit) @key = key perm ||= Fluent::DEFAULT_FILE_PERMISSION @permission = perm.is_a?(String) ? perm.to_i(8) : perm diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb index 476851c08c..47e0891ca0 100644 --- a/lib/fluent/plugin/buffer/memory_chunk.rb +++ b/lib/fluent/plugin/buffer/memory_chunk.rb @@ -20,7 +20,7 @@ module Fluent module Plugin class Buffer class MemoryChunk < Chunk - def initialize(metadata, compress: :text) + def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) super @chunk = ''.force_encoding(Encoding::ASCII_8BIT) @chunk_bytes = 0 diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb index 2ec2122931..c5b449d526 100644 --- a/lib/fluent/plugin/compressable.rb +++ b/lib/fluent/plugin/compressable.rb @@ -14,6 +14,7 @@ # limitations under the License. # +require 'fluent/plugin/extractor' require 'stringio' require 'zlib' require 'zstd-ruby' @@ -21,6 +22,8 @@ module Fluent module Plugin module Compressable + DEFAULT_DECOMPRESSION_SIZE_LIMIT = 256 * 1024 * 1024 + def compress(data, type: :gzip, **kwargs) output_io = kwargs[:output_io] io = output_io || StringIO.new @@ -60,79 +63,21 @@ def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip private - def string_decompress_gzip(compressed_data) - io = StringIO.new(compressed_data) - out = '' - loop do - reader = Zlib::GzipReader.new(io) - out << reader.read - unused = reader.unused - reader.finish - unless unused.nil? - adjust = unused.length - io.pos -= adjust - end - break if io.eof? - end - out - end - - def string_decompress_zstd(compressed_data) - io = StringIO.new(compressed_data) - reader = Zstd::StreamReader.new(io) - out = '' - loop do - # Zstd::StreamReader needs to specify the size of the buffer - out << reader.read(1024) - # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position - break if io.eof? - end - out - end - def string_decompress(compressed_data, type = :gzip) if type == :gzip - string_decompress_gzip(compressed_data) + Extractor.decompress_gzip(compressed_data, limit: @decompression_size_limit || DEFAULT_DECOMPRESSION_SIZE_LIMIT) elsif type == :zstd - string_decompress_zstd(compressed_data) + Extractor.decompress_zstd(compressed_data, limit: @decompression_size_limit || DEFAULT_DECOMPRESSION_SIZE_LIMIT) else raise ArgumentError, "Unknown compression type: #{type}" end end - def io_decompress_gzip(input, output) - loop do - reader = Zlib::GzipReader.new(input) - v = reader.read - output.write(v) - unused = reader.unused - reader.finish - unless unused.nil? - adjust = unused.length - input.pos -= adjust - end - break if input.eof? - end - output - end - - def io_decompress_zstd(input, output) - reader = Zstd::StreamReader.new(input) - loop do - # Zstd::StreamReader needs to specify the size of the buffer - v = reader.read(1024) - output.write(v) - # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position - break if input.eof? - end - output - end - def io_decompress(input, output, type = :gzip) if type == :gzip - io_decompress_gzip(input, output) + Extractor.io_decompress_gzip(input, output) elsif type == :zstd - io_decompress_zstd(input, output) + Extractor.io_decompress_zstd(input, output) else raise ArgumentError, "Unknown compression type: #{type}" end diff --git a/lib/fluent/plugin/extractor.rb b/lib/fluent/plugin/extractor.rb new file mode 100644 index 0000000000..57867c5dab --- /dev/null +++ b/lib/fluent/plugin/extractor.rb @@ -0,0 +1,121 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'stringio' +require 'zlib' +require 'zstd-ruby' +require 'fluent/error' + +module Fluent + module Plugin + module Extractor + class SizeLimitError < UnrecoverableError; end + + BYTES_TO_READ = 64 * 1024 + INFLATE_BYTES_TO_READ = 1024 + + def self.decompress_gzip(compressed_data, limit:) + io = StringIO.new(compressed_data) + out = '' + loop do + reader = Zlib::GzipReader.new(io) + while (chunk = reader.read(BYTES_TO_READ)) + out << chunk + if out.bytesize > limit + raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" + end + end + + unused = reader.unused + reader.finish + unless unused.nil? + adjust = unused.length + io.pos -= adjust + end + break if io.eof? + end + out + end + + def self.decompress_zstd(compressed_data, limit:) + io = StringIO.new(compressed_data) + reader = Zstd::StreamReader.new(io) + out = '' + loop do + # Zstd::StreamReader needs to specify the size of the buffer + out << reader.read(BYTES_TO_READ) + if out.bytesize > limit + raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" + end + + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + break if io.eof? + end + out + end + + def self.decompress_deflate(compressed_data, limit:) + io = StringIO.new(compressed_data) + out = '' + begin + zstream = Zlib::Inflate.new + while (chunk = io.read(INFLATE_BYTES_TO_READ)) + out << zstream.inflate(chunk) + if out.bytesize > limit + raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" + end + end + out << zstream.finish + if out.bytesize > limit + raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" + end + ensure + zstream&.close + end + out + end + + def self.io_decompress_gzip(input, output) + loop do + reader = Zlib::GzipReader.new(input) + while (chunk = reader.read(BYTES_TO_READ)) + output.write(chunk) + end + unused = reader.unused + reader.finish + unless unused.nil? + adjust = unused.length + input.pos -= adjust + end + break if input.eof? + end + output + end + + def self.io_decompress_zstd(input, output) + reader = Zstd::StreamReader.new(input) + loop do + # Zstd::StreamReader needs to specify the size of the buffer + chunk = reader.read(BYTES_TO_READ) + output.write(chunk) + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + break if input.eof? + end + output + end + end + end +end diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 489c474261..78c573abb7 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -54,6 +54,8 @@ class ForwardInput < Input config_param :chunk_size_warn_limit, :size, default: nil desc 'Received chunk is dropped if it is larger than this value.' config_param :chunk_size_limit, :size, default: nil + desc 'The size limit of the decompressed element.' + config_param :decompression_size_limit, :size, default: 256*1024*1024 desc 'Skip an event if incoming event is invalid.' config_param :skip_invalid_event, :bool, default: true @@ -311,7 +313,7 @@ def on_message(msg, chunk_size, conn) size = option['size'] || 0 if option['compressed'] && option['compressed'] != 'text' - es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym) + es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym, decompression_size_limit: @decompression_size_limit) else es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i) end diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 8d7066c6a0..c5df87ea3b 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -14,6 +14,7 @@ # limitations under the License. # +require 'fluent/plugin/extractor' require 'fluent/plugin/input' require 'fluent/plugin/parser' require 'fluent/event' @@ -64,6 +65,8 @@ class HttpInput < Input config_param :bind, :string, default: '0.0.0.0' desc 'The size limit of the POSTed element. Default is 32MB.' config_param :body_size_limit, :size, default: 32*1024*1024 # TODO default + desc 'The size limit of the decompressed element.' + config_param :decompression_size_limit, :size, default: 256*1024*1024 # TODO default desc 'The timeout limit for keeping the connection alive.' config_param :keepalive_timeout, :time, default: 10 # TODO default config_param :backlog, :integer, default: nil @@ -259,7 +262,7 @@ def on_request(path_info, params) def on_server_connect(conn) handler = Handler.new(conn, @km, method(:on_request), - @body_size_limit, @format_name, log, + @body_size_limit, @decompression_size_limit, @format_name, log, @cors_allow_origins, @cors_allow_credentials, @add_query_params) @@ -343,12 +346,13 @@ def convert_time_field(record) class Handler attr_reader :content_type - def initialize(io, km, callback, body_size_limit, format_name, log, + def initialize(io, km, callback, body_size_limit, decompression_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) @io = io @km = km @callback = callback @body_size_limit = body_size_limit + @decompression_size_limit = decompression_size_limit @next_close = false @format_name = format_name @log = log @@ -518,9 +522,9 @@ def on_message_complete # For now, we only support 'gzip' and 'deflate'. begin if @content_encoding == 'gzip'.freeze - @body = Zlib::GzipReader.new(StringIO.new(@body)).read + @body = Extractor.decompress_gzip(@body, limit: @decompression_size_limit) elsif @content_encoding == 'deflate'.freeze - @body = Zlib::Inflate.inflate(@body) + @body = Extractor.decompress_deflate(@body, limit: @decompression_size_limit) end rescue @log.warn 'fails to decode payload', error: $!.to_s diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index 0990cb30e4..09cc860e11 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -87,6 +87,27 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) end end + sub_test_case 'configuration' do + setup do + Fluent::Test.setup + + @dir = File.expand_path('../../tmp/buffer_file_dir', __FILE__) + FileUtils.rm_rf @dir + FileUtils.mkdir_p @dir + end + + test 'passes decompression_size_limit to FileChunk' do + d = FluentPluginFileBufferTest::DummyOutputPlugin.new + p = Fluent::Plugin::FileBuffer.new + p.owner = d + p.configure(config_element('buffer', '', {'path' => File.join(@dir, 'buffer.*.file'), 'decompression_size_limit' => 4 * 1024 * 1024})) + + chunk = p.generate_chunk(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil)) + + assert_equal 4 * 1024 * 1024, chunk.instance_variable_get(:@decompression_size_limit) + end + end + sub_test_case 'non configured buffer plugin instance' do setup do Fluent::Test.setup diff --git a/test/plugin/test_buf_file_single.rb b/test/plugin/test_buf_file_single.rb index 86675e8039..56f9b643c9 100644 --- a/test/plugin/test_buf_file_single.rb +++ b/test/plugin/test_buf_file_single.rb @@ -129,6 +129,20 @@ def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::Dum p = @d.instance.buffer assert_equal File.join(@bufdir, 'fsb.*.buf'), p.path end + + test 'passes decompression_size_limit to FileChunk' do + @d = create_driver(%[ + + @type file_single + path #{@bufdir}/foo.*.bar + decompression_size_limit 4m + + ]) + p = @d.instance.buffer + chunk = p.generate_chunk(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil)) + + assert_equal 4 * 1024 * 1024, chunk.instance_variable_get(:@decompression_size_limit) + end end sub_test_case 'buffer configurations and workers' do diff --git a/test/plugin/test_buf_memory.rb b/test/plugin/test_buf_memory.rb index 4e1b2e211b..df300410b3 100644 --- a/test/plugin/test_buf_memory.rb +++ b/test/plugin/test_buf_memory.rb @@ -39,4 +39,13 @@ class MemoryBufferTest < Test::Unit::TestCase assert c2.is_a? Fluent::Plugin::Buffer::MemoryChunk assert_equal m2, c2.metadata end + + test 'passes decompression_size_limit to MemoryChunk' do + @p.configure(config_element('buffer', '', {'decompression_size_limit' => 4 * 1024 * 1024})) + + meta = Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil) + chunk = @p.generate_chunk(meta) + + assert_equal 4 * 1024 * 1024, chunk.instance_variable_get(:@decompression_size_limit) + end end diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 2cff91fb94..0b03ec7fdf 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -20,7 +20,7 @@ class DummyMemoryChunkError < StandardError; end class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk attr_reader :append_count, :rollbacked, :closed, :purged, :chunk attr_accessor :failing - def initialize(metadata, compress: :text) + def initialize(metadata, compress: :text, decompression_size_limit: Fluent::Plugin::Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) super @append_count = 0 @rollbacked = false @@ -79,7 +79,7 @@ def resume return staged, queued end def generate_chunk(metadata) - DummyMemoryChunk.new(metadata, compress: @compress) + DummyMemoryChunk.new(metadata, compress: @compress, decompression_size_limit: @decompression_size_limit) end end end @@ -1441,6 +1441,10 @@ def create_chunk_es(metadata, es) assert_equal :gzip, @p.compress end + test 'decompression_size_limit is 256MB' do + assert_equal 256*1024*1024, @p.decompression_size_limit + end + test 'create decompressable chunk' do chunk = @p.generate_chunk(create_metadata) assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::GzipDecompressable) @@ -1450,8 +1454,8 @@ def create_chunk_es(metadata, es) @p = create_buffer({'compress' => 'gzip', 'chunk_limit_size' => 70}) timestamp = event_time('2016-04-11 16:00:02 +0000') es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}], # overflow - [timestamp, {"message" => "aaa"}], - [timestamp, {"message" => "bbb"}]]) + [timestamp, {"message" => "a"}], + [timestamp, {"message" => "b"}]]) assert_equal [], @p.queue.map(&:metadata) assert_equal 70, @p.chunk_limit_size @@ -1469,6 +1473,21 @@ def create_chunk_es(metadata, es) assert_equal([2, [@dm0, @dm0], [1, 1], nil], [@p.queue.size, @p.queue.map(&:metadata), @p.queue.map(&:size), @p.stage[@dm0]]) end + + test '#read raises SizeLimitError when decompressed data exceeds decompression_size_limit' do + @p = create_buffer({'compress' => 'gzip', 'decompression_size_limit' => 100}) + timestamp = event_time('2016-04-11 16:00:02 +0000') + + es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "A" * 1024}]]) + + @p.write({@dm0 => es}) + + chunk = @p.stage[@dm0] + + assert_raise Fluent::Plugin::Extractor::SizeLimitError do + chunk.read + end + end end sub_test_case '#statistics' do diff --git a/test/plugin/test_extractor.rb b/test/plugin/test_extractor.rb new file mode 100644 index 0000000000..c256d77683 --- /dev/null +++ b/test/plugin/test_extractor.rb @@ -0,0 +1,129 @@ +require_relative '../helper' +require 'fluent/plugin/extractor' +require 'stringio' +require 'zlib' +require 'zstd-ruby' + +class ExtractorTest < Test::Unit::TestCase + setup do + @strict_limit = 1023 + @safe_limit = 1024 + @original_text = "A" * @safe_limit + end + + def create_gzip(data) + io = StringIO.new + gz = Zlib::GzipWriter.new(io) + gz.write(data) + gz.close + io.string + end + + def create_multi_gzip(data1, data2) + create_gzip(data1) + create_gzip(data2) + end + + def create_zstd(data) + Zstd.compress(data) + end + + def create_multi_zstd(data1, data2) + create_zstd(data1) + create_zstd(data2) + end + + def create_deflate(data) + Zlib::Deflate.deflate(data) + end + + sub_test_case 'decompress_gzip' do + test 'successfully decompresses data within the limit' do + compressed = create_gzip(@original_text) + result = Fluent::Plugin::Extractor.decompress_gzip(compressed, limit: @safe_limit) + assert_equal @original_text, result + end + + test 'successfully decompresses multi-member gzip data' do + compressed = create_multi_gzip("Hello ", "World!") + result = Fluent::Plugin::Extractor.decompress_gzip(compressed, limit: @safe_limit) + assert_equal "Hello World!", result + end + + test 'raises an error when decompressed data exceeds the limit' do + compressed = create_gzip(@original_text) + err = assert_raise(Fluent::Plugin::Extractor::SizeLimitError) do + Fluent::Plugin::Extractor.decompress_gzip(compressed, limit: @strict_limit) + end + assert_equal "Decompressed data exceeds limit of #{@strict_limit} bytes", err.message + end + end + + sub_test_case 'decompress_zstd' do + test 'successfully decompresses data within the limit' do + compressed = create_zstd(@original_text) + result = Fluent::Plugin::Extractor.decompress_zstd(compressed, limit: @safe_limit) + assert_equal @original_text, result + end + + test 'successfully decompresses multi-member zstd data' do + compressed = create_multi_zstd("Hello ", "World!") + result = Fluent::Plugin::Extractor.decompress_zstd(compressed, limit: @safe_limit) + assert_equal "Hello World!", result + end + + test 'raises an error when decompressed data exceeds the limit' do + compressed = create_zstd(@original_text) + err = assert_raise(Fluent::Plugin::Extractor::SizeLimitError) do + Fluent::Plugin::Extractor.decompress_zstd(compressed, limit: @strict_limit) + end + assert_equal "Decompressed data exceeds limit of #{@strict_limit} bytes", err.message + end + end + + sub_test_case 'decompress_deflate' do + test 'successfully decompresses data within the limit' do + compressed = create_deflate(@original_text) + result = Fluent::Plugin::Extractor.decompress_deflate(compressed, limit: @safe_limit) + assert_equal @original_text, result + end + + test 'raises an error when decompressed data exceeds the limit' do + compressed = create_deflate(@original_text) + err = assert_raise(Fluent::Plugin::Extractor::SizeLimitError) do + Fluent::Plugin::Extractor.decompress_deflate(compressed, limit: @strict_limit) + end + assert_equal "Decompressed data exceeds limit of #{@strict_limit} bytes", err.message + end + end + + sub_test_case 'io_decompress_gzip' do + test 'successfully decompresses data' do + input = StringIO.new(create_gzip(@original_text)) + output = StringIO.new + Fluent::Plugin::Extractor.io_decompress_gzip(input, output) + assert_equal @original_text, output.string + end + + test 'successfully decompresses multi-member gzip data' do + input = StringIO.new(create_multi_gzip("Hello ", "World!")) + output = StringIO.new + Fluent::Plugin::Extractor.io_decompress_gzip(input, output) + assert_equal "Hello World!", output.string + end + end + + sub_test_case 'io_decompress_zstd' do + test 'successfully decompresses data' do + input = StringIO.new(create_zstd(@original_text)) + output = StringIO.new + Fluent::Plugin::Extractor.io_decompress_zstd(input, output) + assert_equal @original_text, output.string + end + + test 'successfully decompresses multi-member zstd data' do + input = StringIO.new(create_multi_zstd("Hello ", "World!")) + output = StringIO.new + Fluent::Plugin::Extractor.io_decompress_zstd(input, output) + assert_equal "Hello World!", output.string + end + end +end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 48546d4e32..273b1ae3f8 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -612,6 +612,33 @@ def create_driver(conf=base_config) end end end + + test 'raises SizeLimitError when decompressed size exceeds limit' do + @d = d = create_driver(%[ + port #{@port} + bind "127.0.0.1" + decompression_size_limit 5k + ]) + + time_i = event_time("2011-01-02 13:14:15 UTC").to_i + events = (1..20).map { ["tag1", time_i, {"a"=>"A" * 1000}] } + + # create compressed entries + entries = '' + events.each do |_tag, _time, record| + v = [_time, record].to_msgpack + entries << compress(v) + end + chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack + + d.run do + assert_raise Fluent::Plugin::Extractor::SizeLimitError do + Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| + d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK) + end + end + end + end end sub_test_case 'warning' do diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 413ff904f7..6ce2fb231a 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -31,6 +31,7 @@ def config port #{@port} bind "127.0.0.1" body_size_limit 10m + decompression_size_limit 128m keepalive_timeout 5 respond_with_empty_img true use_204_response false @@ -46,6 +47,7 @@ def test_configure assert_equal @port, d.instance.port assert_equal '127.0.0.1', d.instance.bind assert_equal 10*1024*1024, d.instance.body_size_limit + assert_equal 128*1024*1024, d.instance.decompression_size_limit assert_equal 5, d.instance.keepalive_timeout assert_equal false, d.instance.add_http_headers assert_equal false, d.instance.add_query_params @@ -1005,6 +1007,34 @@ def test_content_encoding_gzip assert_equal_event_time time, d.events[1][1] end + def test_content_encoding_gzip_size_error + d = create_driver(%[ + port #{@port} + bind "127.0.0.1" + decompression_size_limit 512k + ]) + + time = event_time("2011-01-02 13:14:15 UTC") + events = [ + ["tag1", time, {"msg"=>"a" * 1024 * 1024}], + ] + res_codes = [] + + d.run do + events.each do |tag, time, record| + header = {'Content-Type'=>'application/json', 'Content-Encoding'=>'gzip'} + res = post("/#{tag}?time=#{time}", compress_gzip(record.to_json), header) + res_codes << res.code + end + end + + assert_equal ["400"], res_codes + assert_true d.events.empty? + + log = d.logs.first + assert_true log.include?("fails to decode payload") && log.include?("Decompressed data exceeds limit of #{512*1024} bytes") + end + def test_content_encoding_deflate d = create_driver @@ -1028,6 +1058,34 @@ def test_content_encoding_deflate assert_equal_event_time time, d.events[1][1] end + def test_content_encoding_deflate_size_error + d = create_driver(%[ + port #{@port} + bind "127.0.0.1" + decompression_size_limit 512k + ]) + + time = event_time("2011-01-02 13:14:15 UTC") + events = [ + ["tag1", time, {"msg"=>"a" * 1024 * 1024}], + ] + res_codes = [] + + d.run do + events.each do |tag, time, record| + header = {'Content-Type'=>'application/msgpack', 'Content-Encoding'=>'deflate'} + res = post("/#{tag}?time=#{time}", Zlib.deflate(record.to_msgpack), header) + res_codes << res.code + end + end + + assert_equal ["400"], res_codes + assert_true d.events.empty? + + log = d.logs.first + assert_true log.include?("fails to decode payload") && log.include?("Decompressed data exceeds limit of #{512*1024} bytes") + end + def test_cors_disallowed d = create_driver(config + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins