@@ -68,7 +68,12 @@ def each(unpacker: nil, &block)
6868 helpers :server
6969
7070 LISTEN_PORT = 24224
71+ OPTION_ACK = 'ack' . freeze
72+ OPTION_CHUNK = 'chunk' . freeze
73+ OPTION_COMPRESSED = 'compressed' . freeze
7174 OPTION_FLUENT_SIGNAL = 'fluent_signal' . freeze
75+ OPTION_SIZE = 'size' . freeze
76+ OPTION_TEXT = 'text' . freeze
7277
7378 desc 'The port to listen to.'
7479 config_param :port , :integer , default : LISTEN_PORT
@@ -313,8 +318,8 @@ def read_messages(conn, &block)
313318 end
314319
315320 def response ( option )
316- if option && option [ 'chunk' ]
317- return { 'ack' => option [ 'chunk' ] }
321+ if option && option [ OPTION_CHUNK ]
322+ return { OPTION_ACK => option [ OPTION_CHUNK ] }
318323 end
319324 nil
320325 end
@@ -348,10 +353,11 @@ def on_message(msg, chunk_size, conn)
348353 when String
349354 # PackedForward
350355 option = msg [ 2 ] || { }
351- size = option [ 'size' ] || 0
356+ size = option [ OPTION_SIZE ] || 0
357+ compressed = option [ OPTION_COMPRESSED ]
352358
353- if option [ ' compressed' ] && option [ ' compressed' ] != 'text'
354- es = Fluent ::CompressedMessagePackEventStream . new ( entries , nil , size . to_i , compress : option [ ' compressed' ] . to_sym , decompression_size_limit : @decompression_size_limit )
359+ if compressed && compressed != OPTION_TEXT
360+ es = Fluent ::CompressedMessagePackEventStream . new ( entries , nil , size . to_i , compress : compressed . to_sym , decompression_size_limit : @decompression_size_limit )
355361 else
356362 es = Fluent ::MessagePackEventStream . new ( entries , nil , size . to_i )
357363 end
0 commit comments