Skip to content

Commit f5f2b7c

Browse files
authored
Backport(v1.19) buffer, in_http: enforce size limits on decompressed payloads (#5393)
**Which issue(s) this PR fixes**: Fixes #

**What this PR does / why we need it**: This PR introduces a strict upper bound for decompressed payloads.

### Changes

1. Introduced `Fluent::Plugin::Extractor`
   * Extracted the decompression logic (gzip, zstd, deflate) into a dedicated module that processes and decompresses data in manageable 64KB chunks, ensuring steady memory usage.
2. Added `decompression_size_limit` parameter (default: `256MiB`)
   * Specifies the upper limit of decompression in `buffer` (`buf_file`, `buf_file_single` and `buf_memory`) and `in_http` plugins.
   * If the limit is exceeded, it safely raises `Fluent::Plugin::Extractor::SizeLimitError` (or returns a `400 Bad Request` in `in_http`).

### Notice for 3rd-party Plugin Developers

Plugins utilizing the `buffer` / `in_http` mechanisms or the newly introduced `Fluent::Plugin::Extractor` are automatically protected by this limit. However, if your custom plugin implements its own decompression logic independently, you must take care to implement similar boundary checks to guard against unexpected memory spikes.

**Docs Changes**:

**Release Note**:

* buffer, in_http: enforce size limits on decompressed payloads

Signed-off-by: Shizuo Fujita <fujita@clear-code.com>
1 parent 9909215 commit f5f2b7c

20 files changed

Lines changed: 436 additions & 82 deletions

lib/fluent/event.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,12 @@ def to_msgpack_stream(time_int: false, packer: nil)
268268
end
269269

270270
class CompressedMessagePackEventStream < MessagePackEventStream
271-
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
271+
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip, decompression_size_limit: Fluent::Plugin::Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
272272
super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)
273273
@decompressed_data = nil
274274
@compressed_data = data
275275
@type = compress
276+
@decompression_size_limit = decompression_size_limit
276277
end
277278

278279
def empty?

lib/fluent/plugin/buf_file.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def resume
205205
def generate_chunk(metadata)
206206
# FileChunk generates real path with unique_id
207207
perm = @file_permission || system_config.file_permission
208-
chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress)
208+
chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress, decompression_size_limit: @decompression_size_limit)
209209
log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata
210210

211211
return chunk

lib/fluent/plugin/buf_file_single.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def resume
183183
end
184184

185185
begin
186-
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress)
186+
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress, decompression_size_limit: @decompression_size_limit)
187187
chunk.restore_size(@chunk_format) if @calc_num_records
188188
rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e
189189
exist_broken_file = true
@@ -216,7 +216,7 @@ def resume
216216
def generate_chunk(metadata)
217217
# FileChunk generates real path with unique_id
218218
perm = @file_permission || system_config.file_permission
219-
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress)
219+
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress, decompression_size_limit: @decompression_size_limit)
220220

221221
log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata
222222

lib/fluent/plugin/buf_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def resume
2727
end
2828

2929
def generate_chunk(metadata)
30-
Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress)
30+
Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress, decompression_size_limit: @decompression_size_limit)
3131
end
3232
end
3333
end

lib/fluent/plugin/buffer.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
6666
desc 'Compress buffered data.'
6767
config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text
6868

69+
desc 'The size limit of the decompressed element.'
70+
config_param :decompression_size_limit, :size, default: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT
71+
6972
desc 'If true, chunks are thrown away when unrecoverable error happens'
7073
config_param :disable_chunk_backup, :bool, default: false
7174

lib/fluent/plugin/buffer/chunk.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class Chunk
4848

4949
# TODO: CompressedPackedMessage of forward protocol?
5050

51-
def initialize(metadata, compress: :text)
51+
def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
5252
super()
5353
@unique_id = generate_unique_id
5454
@metadata = metadata
@@ -64,6 +64,7 @@ def initialize(metadata, compress: :text)
6464
elsif compress == :zstd
6565
extend ZstdDecompressable
6666
end
67+
@decompression_size_limit = decompression_size_limit
6768
end
6869

6970
attr_reader :unique_id, :metadata, :state

lib/fluent/plugin/buffer/file_chunk.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ class FileChunkError < StandardError; end
3939

4040
attr_reader :path, :meta_path, :permission
4141

42-
def initialize(metadata, path, mode, perm: nil, compress: :text)
43-
super(metadata, compress: compress)
42+
def initialize(metadata, path, mode, perm: nil, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
43+
super(metadata, compress: compress, decompression_size_limit: decompression_size_limit)
4444
perm ||= Fluent::DEFAULT_FILE_PERMISSION
4545
@permission = perm.is_a?(String) ? perm.to_i(8) : perm
4646
@bytesize = @size = @adding_bytes = @adding_size = 0

lib/fluent/plugin/buffer/file_single_chunk.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ class FileChunkError < StandardError; end
3535

3636
attr_reader :path, :permission
3737

38-
def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text)
39-
super(metadata, compress: compress)
38+
def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
39+
super(metadata, compress: compress, decompression_size_limit: decompression_size_limit)
4040
@key = key
4141
perm ||= Fluent::DEFAULT_FILE_PERMISSION
4242
@permission = perm.is_a?(String) ? perm.to_i(8) : perm

lib/fluent/plugin/buffer/memory_chunk.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ module Fluent
2020
module Plugin
2121
class Buffer
2222
class MemoryChunk < Chunk
23-
def initialize(metadata, compress: :text)
23+
def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
2424
super
2525
@chunk = ''.force_encoding(Encoding::ASCII_8BIT)
2626
@chunk_bytes = 0

lib/fluent/plugin/compressable.rb

Lines changed: 7 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
# limitations under the License.
1515
#
1616

17+
require 'fluent/plugin/extractor'
1718
require 'stringio'
1819
require 'zlib'
1920
require 'zstd-ruby'
2021

2122
module Fluent
2223
module Plugin
2324
module Compressable
25+
DEFAULT_DECOMPRESSION_SIZE_LIMIT = 256 * 1024 * 1024
26+
2427
def compress(data, type: :gzip, **kwargs)
2528
output_io = kwargs[:output_io]
2629
io = output_io || StringIO.new
@@ -60,79 +63,21 @@ def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip
6063

6164
private
6265

63-
def string_decompress_gzip(compressed_data)
64-
io = StringIO.new(compressed_data)
65-
out = ''
66-
loop do
67-
reader = Zlib::GzipReader.new(io)
68-
out << reader.read
69-
unused = reader.unused
70-
reader.finish
71-
unless unused.nil?
72-
adjust = unused.length
73-
io.pos -= adjust
74-
end
75-
break if io.eof?
76-
end
77-
out
78-
end
79-
80-
def string_decompress_zstd(compressed_data)
81-
io = StringIO.new(compressed_data)
82-
reader = Zstd::StreamReader.new(io)
83-
out = ''
84-
loop do
85-
# Zstd::StreamReader needs to specify the size of the buffer
86-
out << reader.read(1024)
87-
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
88-
break if io.eof?
89-
end
90-
out
91-
end
92-
9366
def string_decompress(compressed_data, type = :gzip)
9467
if type == :gzip
95-
string_decompress_gzip(compressed_data)
68+
Extractor.decompress_gzip(compressed_data, limit: @decompression_size_limit || DEFAULT_DECOMPRESSION_SIZE_LIMIT)
9669
elsif type == :zstd
97-
string_decompress_zstd(compressed_data)
70+
Extractor.decompress_zstd(compressed_data, limit: @decompression_size_limit || DEFAULT_DECOMPRESSION_SIZE_LIMIT)
9871
else
9972
raise ArgumentError, "Unknown compression type: #{type}"
10073
end
10174
end
10275

103-
def io_decompress_gzip(input, output)
104-
loop do
105-
reader = Zlib::GzipReader.new(input)
106-
v = reader.read
107-
output.write(v)
108-
unused = reader.unused
109-
reader.finish
110-
unless unused.nil?
111-
adjust = unused.length
112-
input.pos -= adjust
113-
end
114-
break if input.eof?
115-
end
116-
output
117-
end
118-
119-
def io_decompress_zstd(input, output)
120-
reader = Zstd::StreamReader.new(input)
121-
loop do
122-
# Zstd::StreamReader needs to specify the size of the buffer
123-
v = reader.read(1024)
124-
output.write(v)
125-
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
126-
break if input.eof?
127-
end
128-
output
129-
end
130-
13176
def io_decompress(input, output, type = :gzip)
13277
if type == :gzip
133-
io_decompress_gzip(input, output)
78+
Extractor.io_decompress_gzip(input, output)
13479
elsif type == :zstd
135-
io_decompress_zstd(input, output)
80+
Extractor.io_decompress_zstd(input, output)
13681
else
13782
raise ArgumentError, "Unknown compression type: #{type}"
13883
end

0 commit comments

Comments
 (0)