Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,12 @@ def to_msgpack_stream(time_int: false, packer: nil)
end

class CompressedMessagePackEventStream < MessagePackEventStream
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip, decompression_size_limit: Fluent::Plugin::Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)
@decompressed_data = nil
@compressed_data = data
@type = compress
@decompression_size_limit = decompression_size_limit
end

def empty?
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def resume
def generate_chunk(metadata)
# FileChunk generates real path with unique_id
perm = @file_permission || system_config.file_permission
chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress)
chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress, decompression_size_limit: @decompression_size_limit)
log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata

return chunk
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/buf_file_single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def resume
end

begin
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress)
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress, decompression_size_limit: @decompression_size_limit)
chunk.restore_size(@chunk_format) if @calc_num_records
rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e
exist_broken_file = true
Expand Down Expand Up @@ -216,7 +216,7 @@ def resume
def generate_chunk(metadata)
# FileChunk generates real path with unique_id
perm = @file_permission || system_config.file_permission
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress)
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress, decompression_size_limit: @decompression_size_limit)

log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buf_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def resume
end

def generate_chunk(metadata)
Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress)
Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress, decompression_size_limit: @decompression_size_limit)
end
end
end
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text

desc 'The size limit of the decompressed element.'
config_param :decompression_size_limit, :size, default: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT

desc 'If true, chunks are thrown away when unrecoverable error happens'
config_param :disable_chunk_backup, :bool, default: false

Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Chunk

# TODO: CompressedPackedMessage of forward protocol?

def initialize(metadata, compress: :text)
def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
super()
@unique_id = generate_unique_id
@metadata = metadata
Expand All @@ -64,6 +64,7 @@ def initialize(metadata, compress: :text)
elsif compress == :zstd
extend ZstdDecompressable
end
@decompression_size_limit = decompression_size_limit
end

attr_reader :unique_id, :metadata, :state
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class FileChunkError < StandardError; end

attr_reader :path, :meta_path, :permission

def initialize(metadata, path, mode, perm: nil, compress: :text)
super(metadata, compress: compress)
def initialize(metadata, path, mode, perm: nil, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
super(metadata, compress: compress, decompression_size_limit: decompression_size_limit)
perm ||= Fluent::DEFAULT_FILE_PERMISSION
@permission = perm.is_a?(String) ? perm.to_i(8) : perm
@bytesize = @size = @adding_bytes = @adding_size = 0
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/buffer/file_single_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class FileChunkError < StandardError; end

attr_reader :path, :permission

def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text)
super(metadata, compress: compress)
def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
super(metadata, compress: compress, decompression_size_limit: decompression_size_limit)
@key = key
perm ||= Fluent::DEFAULT_FILE_PERMISSION
@permission = perm.is_a?(String) ? perm.to_i(8) : perm
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer/memory_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Fluent
module Plugin
class Buffer
class MemoryChunk < Chunk
def initialize(metadata, compress: :text)
def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
super
@chunk = ''.force_encoding(Encoding::ASCII_8BIT)
@chunk_bytes = 0
Expand Down
69 changes: 7 additions & 62 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
# limitations under the License.
#

require 'fluent/plugin/extractor'
require 'stringio'
require 'zlib'
require 'zstd-ruby'

module Fluent
module Plugin
module Compressable
DEFAULT_DECOMPRESSION_SIZE_LIMIT = 256 * 1024 * 1024

def compress(data, type: :gzip, **kwargs)
output_io = kwargs[:output_io]
io = output_io || StringIO.new
Expand Down Expand Up @@ -60,79 +63,21 @@ def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip

private

def string_decompress_gzip(compressed_data)
io = StringIO.new(compressed_data)
out = ''
loop do
reader = Zlib::GzipReader.new(io)
out << reader.read
unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
io.pos -= adjust
end
break if io.eof?
end
out
end

def string_decompress_zstd(compressed_data)
io = StringIO.new(compressed_data)
reader = Zstd::StreamReader.new(io)
out = ''
loop do
# Zstd::StreamReader needs to specify the size of the buffer
out << reader.read(1024)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
break if io.eof?
end
out
end

def string_decompress(compressed_data, type = :gzip)
if type == :gzip
string_decompress_gzip(compressed_data)
Extractor.decompress_gzip(compressed_data, limit: @decompression_size_limit || DEFAULT_DECOMPRESSION_SIZE_LIMIT)
elsif type == :zstd
string_decompress_zstd(compressed_data)
Extractor.decompress_zstd(compressed_data, limit: @decompression_size_limit || DEFAULT_DECOMPRESSION_SIZE_LIMIT)
else
raise ArgumentError, "Unknown compression type: #{type}"
end
end

def io_decompress_gzip(input, output)
loop do
reader = Zlib::GzipReader.new(input)
v = reader.read
output.write(v)
unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
input.pos -= adjust
end
break if input.eof?
end
output
end

def io_decompress_zstd(input, output)
reader = Zstd::StreamReader.new(input)
loop do
# Zstd::StreamReader needs to specify the size of the buffer
v = reader.read(1024)
output.write(v)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
break if input.eof?
end
output
end

def io_decompress(input, output, type = :gzip)
if type == :gzip
io_decompress_gzip(input, output)
Extractor.io_decompress_gzip(input, output)
elsif type == :zstd
io_decompress_zstd(input, output)
Extractor.io_decompress_zstd(input, output)
else
raise ArgumentError, "Unknown compression type: #{type}"
end
Expand Down
121 changes: 121 additions & 0 deletions lib/fluent/plugin/extractor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'stringio'
require 'zlib'
require 'zstd-ruby'
require 'fluent/error'

module Fluent
module Plugin
module Extractor
class SizeLimitError < UnrecoverableError; end

BYTES_TO_READ = 64 * 1024
INFLATE_BYTES_TO_READ = 1024

def self.decompress_gzip(compressed_data, limit:)
io = StringIO.new(compressed_data)
out = ''
loop do
reader = Zlib::GzipReader.new(io)
while (chunk = reader.read(BYTES_TO_READ))
out << chunk
if out.bytesize > limit
raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
end
end

unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
io.pos -= adjust
end
break if io.eof?
end
out
end

def self.decompress_zstd(compressed_data, limit:)
io = StringIO.new(compressed_data)
reader = Zstd::StreamReader.new(io)
out = ''
loop do
# Zstd::StreamReader needs to specify the size of the buffer
out << reader.read(BYTES_TO_READ)
if out.bytesize > limit
raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
end

# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
break if io.eof?
end
out
end

def self.decompress_deflate(compressed_data, limit:)
io = StringIO.new(compressed_data)
out = ''
begin
zstream = Zlib::Inflate.new
while (chunk = io.read(INFLATE_BYTES_TO_READ))
out << zstream.inflate(chunk)
if out.bytesize > limit
raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
end
end
out << zstream.finish
if out.bytesize > limit
raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
end
ensure
zstream&.close
end
out
end

def self.io_decompress_gzip(input, output)
loop do
reader = Zlib::GzipReader.new(input)
while (chunk = reader.read(BYTES_TO_READ))
output.write(chunk)
end
unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
input.pos -= adjust
end
break if input.eof?
end
output
end

def self.io_decompress_zstd(input, output)
reader = Zstd::StreamReader.new(input)
loop do
# Zstd::StreamReader needs to specify the size of the buffer
chunk = reader.read(BYTES_TO_READ)
output.write(chunk)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
break if input.eof?
end
output
end
end
end
end
4 changes: 3 additions & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class ForwardInput < Input
config_param :chunk_size_warn_limit, :size, default: nil
desc 'Received chunk is dropped if it is larger than this value.'
config_param :chunk_size_limit, :size, default: nil
desc 'The size limit of the decompressed element.'
config_param :decompression_size_limit, :size, default: 256*1024*1024
desc 'Skip an event if incoming event is invalid.'
config_param :skip_invalid_event, :bool, default: true

Expand Down Expand Up @@ -311,7 +313,7 @@ def on_message(msg, chunk_size, conn)
size = option['size'] || 0

if option['compressed'] && option['compressed'] != 'text'
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym)
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym, decompression_size_limit: @decompression_size_limit)
else
es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
end
Expand Down
Loading
Loading