diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index 2a6044092f..e2f7150ac5 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -268,11 +268,12 @@ def to_msgpack_stream(time_int: false, packer: nil) end class CompressedMessagePackEventStream < MessagePackEventStream - def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip) + def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip, decompression_size_limit: Fluent::Plugin::Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records) @decompressed_data = nil @compressed_data = data @type = compress + @decompression_size_limit = decompression_size_limit end def empty? diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index fbabcbc5af..94ce53df2f 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -205,7 +205,7 @@ def resume def generate_chunk(metadata) # FileChunk generates real path with unique_id perm = @file_permission || system_config.file_permission - chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress) + chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress, decompression_size_limit: @decompression_size_limit) log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata return chunk diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index 983c7745ba..e4947f38c0 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -183,7 +183,7 @@ def resume end begin - chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress) + chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress, decompression_size_limit: @decompression_size_limit) chunk.restore_size(@chunk_format) if @calc_num_records rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e exist_broken_file = true @@ -216,7 +216,7 @@ def resume def generate_chunk(metadata) # FileChunk generates real path with unique_id perm = @file_permission || system_config.file_permission - chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress) + chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress, decompression_size_limit: @decompression_size_limit) log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata diff --git a/lib/fluent/plugin/buf_memory.rb b/lib/fluent/plugin/buf_memory.rb index c6c9554306..1237dd93d2 100644 --- a/lib/fluent/plugin/buf_memory.rb +++ b/lib/fluent/plugin/buf_memory.rb @@ -27,7 +27,7 @@ def resume end def generate_chunk(metadata) - Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress) + Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress, decompression_size_limit: @decompression_size_limit) end end end diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index daeaf4c2ea..9607d0e60e 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -66,6 +66,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than desc 'Compress buffered data.' config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text + desc 'The size limit of the decompressed element.' + config_param :decompression_size_limit, :size, default: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT + desc 'If true, chunks are thrown away when unrecoverable error happens' config_param :disable_chunk_backup, :bool, default: false diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index 30707e6811..c5b8f4a64d 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -48,7 +48,7 @@ class Chunk # TODO: CompressedPackedMessage of forward protocol? - def initialize(metadata, compress: :text) + def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) super() @unique_id = generate_unique_id @metadata = metadata @@ -64,6 +64,7 @@ def initialize(metadata, compress: :text) elsif compress == :zstd extend ZstdDecompressable end + @decompression_size_limit = decompression_size_limit end attr_reader :unique_id, :metadata, :state diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 24dbaaeb55..d8ab700ee6 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -39,8 +39,8 @@ class FileChunkError < StandardError; end attr_reader :path, :meta_path, :permission - def initialize(metadata, path, mode, perm: nil, compress: :text) - super(metadata, compress: compress) + def initialize(metadata, path, mode, perm: nil, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) + super(metadata, compress: compress, decompression_size_limit: decompression_size_limit) perm ||= Fluent::DEFAULT_FILE_PERMISSION @permission = perm.is_a?(String) ? perm.to_i(8) : perm @bytesize = @size = @adding_bytes = @adding_size = 0 diff --git a/lib/fluent/plugin/buffer/file_single_chunk.rb b/lib/fluent/plugin/buffer/file_single_chunk.rb index ba20d1f45a..f533ece5b3 100644 --- a/lib/fluent/plugin/buffer/file_single_chunk.rb +++ b/lib/fluent/plugin/buffer/file_single_chunk.rb @@ -35,8 +35,8 @@ class FileChunkError < StandardError; end attr_reader :path, :permission - def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text) - super(metadata, compress: compress) + def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) + super(metadata, compress: compress, decompression_size_limit: decompression_size_limit) @key = key perm ||= Fluent::DEFAULT_FILE_PERMISSION @permission = perm.is_a?(String) ? perm.to_i(8) : perm diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb index 476851c08c..47e0891ca0 100644 --- a/lib/fluent/plugin/buffer/memory_chunk.rb +++ b/lib/fluent/plugin/buffer/memory_chunk.rb @@ -20,7 +20,7 @@ module Fluent module Plugin class Buffer class MemoryChunk < Chunk - def initialize(metadata, compress: :text) + def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) super @chunk = ''.force_encoding(Encoding::ASCII_8BIT) @chunk_bytes = 0 diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb index 2ec2122931..c5b449d526 100644 --- a/lib/fluent/plugin/compressable.rb +++ b/lib/fluent/plugin/compressable.rb @@ -14,6 +14,7 @@ # limitations under the License. # +require 'fluent/plugin/extractor' require 'stringio' require 'zlib' require 'zstd-ruby' @@ -21,6 +22,8 @@ module Fluent module Plugin module Compressable + DEFAULT_DECOMPRESSION_SIZE_LIMIT = 256 * 1024 * 1024 + def compress(data, type: :gzip, **kwargs) output_io = kwargs[:output_io] io = output_io || StringIO.new @@ -60,79 +63,21 @@ def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip private - def string_decompress_gzip(compressed_data) - io = StringIO.new(compressed_data) - out = '' - loop do - reader = Zlib::GzipReader.new(io) - out << reader.read - unused = reader.unused - reader.finish - unless unused.nil? - adjust = unused.length - io.pos -= adjust - end - break if io.eof? - end - out - end - - def string_decompress_zstd(compressed_data) - io = StringIO.new(compressed_data) - reader = Zstd::StreamReader.new(io) - out = '' - loop do - # Zstd::StreamReader needs to specify the size of the buffer - out << reader.read(1024) - # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position - break if io.eof? - end - out - end - def string_decompress(compressed_data, type = :gzip) if type == :gzip - string_decompress_gzip(compressed_data) + Extractor.decompress_gzip(compressed_data, limit: @decompression_size_limit || DEFAULT_DECOMPRESSION_SIZE_LIMIT) elsif type == :zstd - string_decompress_zstd(compressed_data) + Extractor.decompress_zstd(compressed_data, limit: @decompression_size_limit || DEFAULT_DECOMPRESSION_SIZE_LIMIT) else raise ArgumentError, "Unknown compression type: #{type}" end end - def io_decompress_gzip(input, output) - loop do - reader = Zlib::GzipReader.new(input) - v = reader.read - output.write(v) - unused = reader.unused - reader.finish - unless unused.nil? - adjust = unused.length - input.pos -= adjust - end - break if input.eof? - end - output - end - - def io_decompress_zstd(input, output) - reader = Zstd::StreamReader.new(input) - loop do - # Zstd::StreamReader needs to specify the size of the buffer - v = reader.read(1024) - output.write(v) - # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position - break if input.eof? - end - output - end - def io_decompress(input, output, type = :gzip) if type == :gzip - io_decompress_gzip(input, output) + Extractor.io_decompress_gzip(input, output) elsif type == :zstd - io_decompress_zstd(input, output) + Extractor.io_decompress_zstd(input, output) else raise ArgumentError, "Unknown compression type: #{type}" end diff --git a/lib/fluent/plugin/extractor.rb b/lib/fluent/plugin/extractor.rb new file mode 100644 index 0000000000..57867c5dab --- /dev/null +++ b/lib/fluent/plugin/extractor.rb @@ -0,0 +1,121 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'stringio' +require 'zlib' +require 'zstd-ruby' +require 'fluent/error' + +module Fluent + module Plugin + module Extractor + class SizeLimitError < UnrecoverableError; end + + BYTES_TO_READ = 64 * 1024 + INFLATE_BYTES_TO_READ = 1024 + + def self.decompress_gzip(compressed_data, limit:) + io = StringIO.new(compressed_data) + out = '' + loop do + reader = Zlib::GzipReader.new(io) + while (chunk = reader.read(BYTES_TO_READ)) + out << chunk + if out.bytesize > limit + raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" + end + end + + unused = reader.unused + reader.finish + unless unused.nil? + adjust = unused.length + io.pos -= adjust + end + break if io.eof? + end + out + end + + def self.decompress_zstd(compressed_data, limit:) + io = StringIO.new(compressed_data) + reader = Zstd::StreamReader.new(io) + out = '' + loop do + # Zstd::StreamReader needs to specify the size of the buffer + out << reader.read(BYTES_TO_READ) + if out.bytesize > limit + raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" + end + + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + break if io.eof? + end + out + end + + def self.decompress_deflate(compressed_data, limit:) + io = StringIO.new(compressed_data) + out = '' + begin + zstream = Zlib::Inflate.new + while (chunk = io.read(INFLATE_BYTES_TO_READ)) + out << zstream.inflate(chunk) + if out.bytesize > limit + raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" + end + end + out << zstream.finish + if out.bytesize > limit + raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" + end + ensure + zstream&.close + end + out + end + + def self.io_decompress_gzip(input, output) + loop do + reader = Zlib::GzipReader.new(input) + while (chunk = reader.read(BYTES_TO_READ)) + output.write(chunk) + end + unused = reader.unused + reader.finish + unless unused.nil? + adjust = unused.length + input.pos -= adjust + end + break if input.eof? + end + output + end + + def self.io_decompress_zstd(input, output) + reader = Zstd::StreamReader.new(input) + loop do + # Zstd::StreamReader needs to specify the size of the buffer + chunk = reader.read(BYTES_TO_READ) + output.write(chunk) + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + break if input.eof? + end + output + end + end + end +end diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 489c474261..78c573abb7 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -54,6 +54,8 @@ class ForwardInput < Input config_param :chunk_size_warn_limit, :size, default: nil desc 'Received chunk is dropped if it is larger than this value.' config_param :chunk_size_limit, :size, default: nil + desc 'The size limit of the decompressed element.' + config_param :decompression_size_limit, :size, default: 256*1024*1024 desc 'Skip an event if incoming event is invalid.' config_param :skip_invalid_event, :bool, default: true @@ -311,7 +313,7 @@ def on_message(msg, chunk_size, conn) size = option['size'] || 0 if option['compressed'] && option['compressed'] != 'text' - es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym) + es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym, decompression_size_limit: @decompression_size_limit) else es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i) end diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 8d7066c6a0..c5df87ea3b 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -14,6 +14,7 @@ # limitations under the License. # +require 'fluent/plugin/extractor' require 'fluent/plugin/input' require 'fluent/plugin/parser' require 'fluent/event' @@ -64,6 +65,8 @@ class HttpInput < Input config_param :bind, :string, default: '0.0.0.0' desc 'The size limit of the POSTed element. Default is 32MB.' config_param :body_size_limit, :size, default: 32*1024*1024 # TODO default + desc 'The size limit of the decompressed element.' + config_param :decompression_size_limit, :size, default: 256*1024*1024 # TODO default desc 'The timeout limit for keeping the connection alive.' config_param :keepalive_timeout, :time, default: 10 # TODO default config_param :backlog, :integer, default: nil @@ -259,7 +262,7 @@ def on_request(path_info, params) def on_server_connect(conn) handler = Handler.new(conn, @km, method(:on_request), - @body_size_limit, @format_name, log, + @body_size_limit, @decompression_size_limit, @format_name, log, @cors_allow_origins, @cors_allow_credentials, @add_query_params) @@ -343,12 +346,13 @@ def convert_time_field(record) class Handler attr_reader :content_type - def initialize(io, km, callback, body_size_limit, format_name, log, + def initialize(io, km, callback, body_size_limit, decompression_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) @io = io @km = km @callback = callback @body_size_limit = body_size_limit + @decompression_size_limit = decompression_size_limit @next_close = false @format_name = format_name @log = log @@ -518,9 +522,9 @@ def on_message_complete # For now, we only support 'gzip' and 'deflate'. begin if @content_encoding == 'gzip'.freeze - @body = Zlib::GzipReader.new(StringIO.new(@body)).read + @body = Extractor.decompress_gzip(@body, limit: @decompression_size_limit) elsif @content_encoding == 'deflate'.freeze - @body = Zlib::Inflate.inflate(@body) + @body = Extractor.decompress_deflate(@body, limit: @decompression_size_limit) end rescue @log.warn 'fails to decode payload', error: $!.to_s diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index 0990cb30e4..09cc860e11 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -87,6 +87,27 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) end end + sub_test_case 'configuration' do + setup do + Fluent::Test.setup + + @dir = File.expand_path('../../tmp/buffer_file_dir', __FILE__) + FileUtils.rm_rf @dir + FileUtils.mkdir_p @dir + end + + test 'passes decompression_size_limit to FileChunk' do + d = FluentPluginFileBufferTest::DummyOutputPlugin.new + p = Fluent::Plugin::FileBuffer.new + p.owner = d + p.configure(config_element('buffer', '', {'path' => File.join(@dir, 'buffer.*.file'), 'decompression_size_limit' => 4 * 1024 * 1024})) + + chunk = p.generate_chunk(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil)) + + assert_equal 4 * 1024 * 1024, chunk.instance_variable_get(:@decompression_size_limit) + end + end + sub_test_case 'non configured buffer plugin instance' do setup do Fluent::Test.setup diff --git a/test/plugin/test_buf_file_single.rb b/test/plugin/test_buf_file_single.rb index 86675e8039..56f9b643c9 100644 --- a/test/plugin/test_buf_file_single.rb +++ b/test/plugin/test_buf_file_single.rb @@ -129,6 +129,20 @@ def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::Dum p = @d.instance.buffer assert_equal File.join(@bufdir, 'fsb.*.buf'), p.path end + + test 'passes decompression_size_limit to FileChunk' do + @d = create_driver(%[ + + @type file_single + path #{@bufdir}/foo.*.bar + decompression_size_limit 4m + + ]) + p = @d.instance.buffer + chunk = p.generate_chunk(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil)) + + assert_equal 4 * 1024 * 1024, chunk.instance_variable_get(:@decompression_size_limit) + end end sub_test_case 'buffer configurations and workers' do diff --git a/test/plugin/test_buf_memory.rb b/test/plugin/test_buf_memory.rb index 4e1b2e211b..df300410b3 100644 --- a/test/plugin/test_buf_memory.rb +++ b/test/plugin/test_buf_memory.rb @@ -39,4 +39,13 @@ class MemoryBufferTest < Test::Unit::TestCase assert c2.is_a? Fluent::Plugin::Buffer::MemoryChunk assert_equal m2, c2.metadata end + + test 'passes decompression_size_limit to MemoryChunk' do + @p.configure(config_element('buffer', '', {'decompression_size_limit' => 4 * 1024 * 1024})) + + meta = Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil) + chunk = @p.generate_chunk(meta) + + assert_equal 4 * 1024 * 1024, chunk.instance_variable_get(:@decompression_size_limit) + end end diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 2cff91fb94..0b03ec7fdf 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -20,7 +20,7 @@ class DummyMemoryChunkError < StandardError; end class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk attr_reader :append_count, :rollbacked, :closed, :purged, :chunk attr_accessor :failing - def initialize(metadata, compress: :text) + def initialize(metadata, compress: :text, decompression_size_limit: Fluent::Plugin::Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) super @append_count = 0 @rollbacked = false @@ -79,7 +79,7 @@ def resume return staged, queued end def generate_chunk(metadata) - DummyMemoryChunk.new(metadata, compress: @compress) + DummyMemoryChunk.new(metadata, compress: @compress, decompression_size_limit: @decompression_size_limit) end end end @@ -1441,6 +1441,10 @@ def create_chunk_es(metadata, es) assert_equal :gzip, @p.compress end + test 'decompression_size_limit is 256MB' do + assert_equal 256*1024*1024, @p.decompression_size_limit + end + test 'create decompressable chunk' do chunk = @p.generate_chunk(create_metadata) assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::GzipDecompressable) @@ -1450,8 +1454,8 @@ def create_chunk_es(metadata, es) @p = create_buffer({'compress' => 'gzip', 'chunk_limit_size' => 70}) timestamp = event_time('2016-04-11 16:00:02 +0000') es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}], # overflow - [timestamp, {"message" => "aaa"}], - [timestamp, {"message" => "bbb"}]]) + [timestamp, {"message" => "a"}], + [timestamp, {"message" => "b"}]]) assert_equal [], @p.queue.map(&:metadata) assert_equal 70, @p.chunk_limit_size @@ -1469,6 +1473,21 @@ def create_chunk_es(metadata, es) assert_equal([2, [@dm0, @dm0], [1, 1], nil], [@p.queue.size, @p.queue.map(&:metadata), @p.queue.map(&:size), @p.stage[@dm0]]) end + + test '#read raises SizeLimitError when decompressed data exceeds decompression_size_limit' do + @p = create_buffer({'compress' => 'gzip', 'decompression_size_limit' => 100}) + timestamp = event_time('2016-04-11 16:00:02 +0000') + + es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "A" * 1024}]]) + + @p.write({@dm0 => es}) + + chunk = @p.stage[@dm0] + + assert_raise Fluent::Plugin::Extractor::SizeLimitError do + chunk.read + end + end end sub_test_case '#statistics' do diff --git a/test/plugin/test_extractor.rb b/test/plugin/test_extractor.rb new file mode 100644 index 0000000000..c256d77683 --- /dev/null +++ b/test/plugin/test_extractor.rb @@ -0,0 +1,129 @@ +require_relative '../helper' +require 'fluent/plugin/extractor' +require 'stringio' +require 'zlib' +require 'zstd-ruby' + +class ExtractorTest < Test::Unit::TestCase + setup do + @strict_limit = 1023 + @safe_limit = 1024 + @original_text = "A" * @safe_limit + end + + def create_gzip(data) + io = StringIO.new + gz = Zlib::GzipWriter.new(io) + gz.write(data) + gz.close + io.string + end + + def create_multi_gzip(data1, data2) + create_gzip(data1) + create_gzip(data2) + end + + def create_zstd(data) + Zstd.compress(data) + end + + def create_multi_zstd(data1, data2) + create_zstd(data1) + create_zstd(data2) + end + + def create_deflate(data) + Zlib::Deflate.deflate(data) + end + + sub_test_case 'decompress_gzip' do + test 'successfully decompresses data within the limit' do + compressed = create_gzip(@original_text) + result = Fluent::Plugin::Extractor.decompress_gzip(compressed, limit: @safe_limit) + assert_equal @original_text, result + end + + test 'successfully decompresses multi-member gzip data' do + compressed = create_multi_gzip("Hello ", "World!") + result = Fluent::Plugin::Extractor.decompress_gzip(compressed, limit: @safe_limit) + assert_equal "Hello World!", result + end + + test 'raises an error when decompressed data exceeds the limit' do + compressed = create_gzip(@original_text) + err = assert_raise(Fluent::Plugin::Extractor::SizeLimitError) do + Fluent::Plugin::Extractor.decompress_gzip(compressed, limit: @strict_limit) + end + assert_equal "Decompressed data exceeds limit of #{@strict_limit} bytes", err.message + end + end + + sub_test_case 'decompress_zstd' do + test 'successfully decompresses data within the limit' do + compressed = create_zstd(@original_text) + result = Fluent::Plugin::Extractor.decompress_zstd(compressed, limit: @safe_limit) + assert_equal @original_text, result + end + + test 'successfully decompresses multi-member zstd data' do + compressed = create_multi_zstd("Hello ", "World!") + result = Fluent::Plugin::Extractor.decompress_zstd(compressed, limit: @safe_limit) + assert_equal "Hello World!", result + end + + test 'raises an error when decompressed data exceeds the limit' do + compressed = create_zstd(@original_text) + err = assert_raise(Fluent::Plugin::Extractor::SizeLimitError) do + Fluent::Plugin::Extractor.decompress_zstd(compressed, limit: @strict_limit) + end + assert_equal "Decompressed data exceeds limit of #{@strict_limit} bytes", err.message + end + end + + sub_test_case 'decompress_deflate' do + test 'successfully decompresses data within the limit' do + compressed = create_deflate(@original_text) + result = Fluent::Plugin::Extractor.decompress_deflate(compressed, limit: @safe_limit) + assert_equal @original_text, result + end + + test 'raises an error when decompressed data exceeds the limit' do + compressed = create_deflate(@original_text) + err = assert_raise(Fluent::Plugin::Extractor::SizeLimitError) do + Fluent::Plugin::Extractor.decompress_deflate(compressed, limit: @strict_limit) + end + assert_equal "Decompressed data exceeds limit of #{@strict_limit} bytes", err.message + end + end + + sub_test_case 'io_decompress_gzip' do + test 'successfully decompresses data' do + input = StringIO.new(create_gzip(@original_text)) + output = StringIO.new + Fluent::Plugin::Extractor.io_decompress_gzip(input, output) + assert_equal @original_text, output.string + end + + test 'successfully decompresses multi-member gzip data' do + input = StringIO.new(create_multi_gzip("Hello ", "World!")) + output = StringIO.new + Fluent::Plugin::Extractor.io_decompress_gzip(input, output) + assert_equal "Hello World!", output.string + end + end + + sub_test_case 'io_decompress_zstd' do + test 'successfully decompresses data' do + input = StringIO.new(create_zstd(@original_text)) + output = StringIO.new + Fluent::Plugin::Extractor.io_decompress_zstd(input, output) + assert_equal @original_text, output.string + end + + test 'successfully decompresses multi-member zstd data' do + input = StringIO.new(create_multi_zstd("Hello ", "World!")) + output = StringIO.new + Fluent::Plugin::Extractor.io_decompress_zstd(input, output) + assert_equal "Hello World!", output.string + end + end +end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 48546d4e32..273b1ae3f8 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -612,6 +612,33 @@ def create_driver(conf=base_config) end end end + + test 'raises SizeLimitError when decompressed size exceeds limit' do + @d = d = create_driver(%[ + port #{@port} + bind "127.0.0.1" + decompression_size_limit 5k + ]) + + time_i = event_time("2011-01-02 13:14:15 UTC").to_i + events = (1..20).map { ["tag1", time_i, {"a"=>"A" * 1000}] } + + # create compressed entries + entries = '' + events.each do |_tag, _time, record| + v = [_time, record].to_msgpack + entries << compress(v) + end + chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack + + d.run do + assert_raise Fluent::Plugin::Extractor::SizeLimitError do + Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| + d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK) + end + end + end + end end sub_test_case 'warning' do diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 413ff904f7..6ce2fb231a 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -31,6 +31,7 @@ def config port #{@port} bind "127.0.0.1" body_size_limit 10m + decompression_size_limit 128m keepalive_timeout 5 respond_with_empty_img true use_204_response false @@ -46,6 +47,7 @@ def test_configure assert_equal @port, d.instance.port assert_equal '127.0.0.1', d.instance.bind assert_equal 10*1024*1024, d.instance.body_size_limit + assert_equal 128*1024*1024, d.instance.decompression_size_limit assert_equal 5, d.instance.keepalive_timeout assert_equal false, d.instance.add_http_headers assert_equal false, d.instance.add_query_params @@ -1005,6 +1007,34 @@ def test_content_encoding_gzip assert_equal_event_time time, d.events[1][1] end + def test_content_encoding_gzip_size_error + d = create_driver(%[ + port #{@port} + bind "127.0.0.1" + decompression_size_limit 512k + ]) + + time = event_time("2011-01-02 13:14:15 UTC") + events = [ + ["tag1", time, {"msg"=>"a" * 1024 * 1024}], + ] + res_codes = [] + + d.run do + events.each do |tag, time, record| + header = {'Content-Type'=>'application/json', 'Content-Encoding'=>'gzip'} + res = post("/#{tag}?time=#{time}", compress_gzip(record.to_json), header) + res_codes << res.code + end + end + + assert_equal ["400"], res_codes + assert_true d.events.empty? + + log = d.logs.first + assert_true log.include?("fails to decode payload") && log.include?("Decompressed data exceeds limit of #{512*1024} bytes") + end + def test_content_encoding_deflate d = create_driver @@ -1028,6 +1058,34 @@ def test_content_encoding_deflate assert_equal_event_time time, d.events[1][1] end + def test_content_encoding_deflate_size_error + d = create_driver(%[ + port #{@port} + bind "127.0.0.1" + decompression_size_limit 512k + ]) + + time = event_time("2011-01-02 13:14:15 UTC") + events = [ + ["tag1", time, {"msg"=>"a" * 1024 * 1024}], + ] + res_codes = [] + + d.run do + events.each do |tag, time, record| + header = {'Content-Type'=>'application/msgpack', 'Content-Encoding'=>'deflate'} + res = post("/#{tag}?time=#{time}", Zlib.deflate(record.to_msgpack), header) + res_codes << res.code + end + end + + assert_equal ["400"], res_codes + assert_true d.events.empty? + + log = d.logs.first + assert_true log.include?("fails to decode payload") && log.include?("Decompressed data exceeds limit of #{512*1024} bytes") + end + def test_cors_disallowed d = create_driver(config + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins