Skip to content

Commit dab88f1

Browse files
committed
Experimental: Replace Yajl to JSON for handling JSON stream
Signed-off-by: Shizuo Fujita <fujita@clear-code.com>
1 parent 3e53f56 commit dab88f1

11 files changed

Lines changed: 45 additions & 43 deletions

File tree

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ source 'https://rubygems.org/'
33
gemspec
44

55
gem 'benchmark'
6+
gem 'json', git: 'https://github.com/byroot/json.git', branch: 'resumable-parser'
67

78
local_gemfile = File.join(File.dirname(__FILE__), "Gemfile.local")
89
if File.exist?(local_gemfile)

fluentd.gemspec

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ Gem::Specification.new do |gem|
2929

3030
gem.add_runtime_dependency("bundler")
3131
gem.add_runtime_dependency("msgpack", [">= 1.3.1", "< 2.0.0"])
32-
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
3332
gem.add_runtime_dependency("cool.io", [">= 1.4.5", "< 2.0.0"])
3433
gem.add_runtime_dependency("serverengine", [">= 2.3.2", "< 3.0.0"])
3534
gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.9.0"])

lib/fluent/compat/exec_util.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
require 'msgpack'
1818
require 'json'
19-
require 'yajl'
2019

2120
require 'fluent/engine'
2221
require 'fluent/plugin'
@@ -78,9 +77,15 @@ def each_line(line)
7877

7978
class JSONParser < Parser
8079
def call(io)
81-
y = Yajl::Parser.new
82-
y.on_parse_complete = @on_message
83-
y.parse(io)
80+
parser = JSON::Ext::ResumableParser.new({})
81+
buffer_size = 8192
82+
83+
while (chunk = io.read(buffer_size))
84+
parser << chunk
85+
while parser.parse
86+
@on_message.call(parser.value)
87+
end
88+
end
8489
end
8590
end
8691

lib/fluent/config/literal_parser.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
require 'stringio'
1818

1919
require 'json'
20-
require 'yajl'
2120
require 'socket'
2221
require 'ripper'
2322

lib/fluent/load.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
require 'stringio'
55
require 'fileutils'
66
require 'json'
7-
require 'yajl'
87
require 'uri'
98
require 'msgpack'
109
require 'strptime'

lib/fluent/plugin/in_forward.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
require 'fluent/plugin/input'
1919
require 'fluent/msgpack_factory'
20-
require 'yajl'
20+
require 'json'
2121
require 'digest'
2222
require 'securerandom'
2323

@@ -248,13 +248,15 @@ def read_messages(conn, &block)
248248
unless feeder
249249
first = data[0]
250250
if first == '{' || first == '[' # json
251-
parser = Yajl::Parser.new
252-
parser.on_parse_complete = ->(obj){
253-
block.call(obj, bytes, serializer)
254-
bytes = 0
255-
}
251+
parser = JSON::Ext::ResumableParser.new({})
256252
serializer = :to_json.to_proc
257-
feeder = ->(d){ parser << d }
253+
feeder = ->(d){
254+
parser << d
255+
while parser.parse
256+
block.call(parser.value, bytes, serializer)
257+
bytes = 0
258+
end
259+
}
258260
else # msgpack
259261
parser = Fluent::MessagePackFactory.msgpack_unpacker
260262
serializer = :to_msgpack.to_proc

lib/fluent/plugin/in_unix.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
require 'fluent/msgpack_factory'
2020

2121
require 'cool.io'
22-
require 'yajl'
22+
require 'json'
2323
require 'fileutils'
2424
require 'socket'
2525

@@ -158,8 +158,7 @@ def on_read(data)
158158
first = data[0]
159159
if first == '{'.freeze || first == '['.freeze
160160
m = method(:on_read_json)
161-
@parser = Yajl::Parser.new
162-
@parser.on_parse_complete = @on_message
161+
@parser = JSON::Ext::ResumableParser.new({})
163162
else
164163
m = method(:on_read_msgpack)
165164
@parser = Fluent::MessagePackFactory.msgpack_unpacker
@@ -173,6 +172,9 @@ def on_read(data)
173172

174173
def on_read_json(data)
175174
@parser << data
175+
while @parser.parse
176+
@on_message.call(@parser.value)
177+
end
176178
rescue => e
177179
@log.error "unexpected error in json payload", error: e.to_s
178180
@log.error_backtrace

lib/fluent/plugin/parser_json.rb

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
require 'fluent/time'
1919
require 'fluent/oj_options'
2020

21-
require 'yajl'
2221
require 'json'
2322

2423
module Fluent
@@ -28,12 +27,10 @@ class JSONParser < Parser
2827

2928
config_set_default :time_key, 'time'
3029
desc 'Set JSON parser'
30+
# NOTE: Contains yajl for backward compatibility
3131
config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj
3232

33-
# The Yajl library defines a default buffer size of 8KiB when parsing
34-
# from IO streams, so maintain this for backwards-compatibility.
35-
# https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse
36-
desc 'Set the buffer size that Yajl will use when parsing streaming input'
33+
desc 'Set the buffer size that JSON parser will use when parsing streaming input'
3734
config_param :stream_buffer_size, :integer, default: 8192
3835

3936
config_set_default :time_type, :float
@@ -54,8 +51,8 @@ def configure_json_parser(name)
5451

5552
log&.info "Oj is not installed, and failing back to JSON for json parser"
5653
configure_json_parser(:json)
57-
when :json then [JSON.method(:parse), JSON::ParserError]
58-
when :yajl then [Yajl.method(:load), Yajl::ParseError]
54+
when :yajl, :json # NOTE: Fallback yajl to json for backward compatibility
55+
[JSON.method(:parse), JSON::ParserError]
5956
else
6057
raise "BUG: unknown json parser specified: #{name}"
6158
end
@@ -94,11 +91,15 @@ def parser_type
9491
end
9592

9693
def parse_io(io, &block)
97-
y = Yajl::Parser.new
98-
y.on_parse_complete = ->(record){
99-
block.call(parse_time(record), record)
100-
}
101-
y.parse(io, @stream_buffer_size)
94+
parser = JSON::Ext::ResumableParser.new({})
95+
while (chunk = io.read(@stream_buffer_size))
96+
parser << chunk
97+
98+
while parser.parse
99+
record = parser.value
100+
block.call(parse_time(record), record)
101+
end
102+
end
102103
end
103104
end
104105
end

test/plugin/test_parser_json.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def setup
1111
sub_test_case "configure_json_parser" do
1212
data("oj", [:oj, [Oj.method(:load), Oj::ParseError]])
1313
data("json", [:json, [JSON.method(:parse), JSON::ParserError]])
14-
data("yajl", [:yajl, [Yajl.method(:load), Yajl::ParseError]])
14+
data("yajl", [:yajl, [JSON.method(:parse), JSON::ParserError]])
1515
def test_return_each_loader((input, expected_return))
1616
result = @parser.instance.configure_json_parser(input)
1717
assert_equal expected_return, result

test/test_event_time.rb

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
require_relative 'helper'
22
require 'timecop'
33
require 'oj'
4-
require 'yajl'
4+
require 'json'
55

66
class EventTimeTest < Test::Unit::TestCase
77
setup do
@@ -70,12 +70,6 @@ class EventTimeTest < Test::Unit::TestCase
7070
assert_equal('["tag",100,{"key":"value"}]', Oj.dump(["tag", time, {"key" => "value"}], mode: :compat))
7171
end
7272

73-
test 'Yajl.dump' do
74-
time = Fluent::EventTime.new(100)
75-
assert_equal('{"time":100}', Yajl.dump({'time' => time}))
76-
assert_equal('["tag",100,{"key":"value"}]', Yajl.dump(["tag", time, {"key" => "value"}]))
77-
end
78-
7973
test '.from_time' do
8074
sec = 1000
8175
usec = 2

0 commit comments

Comments
 (0)