Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion lib/fluent/plugin/out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require 'net/http'
require 'uri'
require 'openssl'
require 'securerandom'
require 'fluent/tls'
require 'fluent/plugin/output'
require 'fluent/plugin_helper/socket'
Expand Down Expand Up @@ -58,6 +59,9 @@ class RetryableResponse < StandardError; end
desc 'Compress HTTP request body'
config_param :compress, :enum, list: [:text, :gzip], default: :text

desc 'Allowed hosts list for dynamic endpoints'
config_param :allowed_hosts, :array, default: []

desc 'The connection open timeout in seconds'
config_param :open_timeout, :integer, default: nil
desc 'The read timeout in seconds'
Expand Down Expand Up @@ -106,6 +110,11 @@ class RetryableResponse < StandardError; end
config_param :aws_role_arn, :string, default: nil
end

# To prevent URI::InvalidURIError, we replace Fluentd placeholders with a dummy string.
# We use the ".invalid" TLD (RFC 2606) to ensure it is RFC-compliant for URI parsing,
# while guaranteeing it will never conflict with a real-world hostname.
REPLACED_ENDPOINT_PLACEHOLDER = "#{SecureRandom.uuid}.invalid".freeze

def connection_cache_id_thread_key
"#{plugin_id}_connection_cache_id"
end
Expand Down Expand Up @@ -146,6 +155,15 @@ def configure(conf)
@retryable_response_codes = [503]
end

begin
# Replace all Fluentd placeholder syntaxes (${...} or %{...})
endpoint = @endpoint.gsub(%r([$%]{[^}]+}), REPLACED_ENDPOINT_PLACEHOLDER)
# If @endpoint has placeholder as host name, then, @endpoint_host == REPLACED_ENDPOINT_PLACEHOLDER
@endpoint_host = URI.parse(endpoint).host
rescue URI::InvalidURIError => e
raise Fluent::ConfigError, "Invalid endpoint URI: #{@endpoint} (#{e.message})"
end

@http_opt = setup_http_option
@proxy_uri = URI.parse(@proxy) if @proxy
@formatter = formatter_create
Expand Down Expand Up @@ -278,7 +296,19 @@ def setup_http_option

def parse_endpoint(chunk)
endpoint = extract_placeholders(@endpoint, chunk)
URI.parse(endpoint)
uri = URI.parse(endpoint)

if @endpoint_host != uri.host
if @allowed_hosts.empty?
raise Fluent::UnrecoverableError, "allowed_hosts is strictly required when using placeholders in the endpoint host"
end

unless @allowed_hosts.include?(uri.host)
raise Fluent::UnrecoverableError, "Not allowed host: #{uri.host}"
end
end

uri
end

def set_headers(req, uri, chunk)
Expand Down
63 changes: 63 additions & 0 deletions test/plugin/test_out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ def test_configure_content_type_json_array(content_type)
assert_equal content_type, d.instance.content_type
end

def test_configure_allowed_hosts
d = create_driver(config + 'allowed_hosts ["example.com"]')

assert_equal ["example.com"], d.instance.allowed_hosts
end

data('PUT' => 'put', 'POST' => 'post')
def test_write_with_method(method)
d = create_driver(config + "http_method #{method}")
Expand Down Expand Up @@ -600,4 +606,61 @@ def test_connection_recreation
assert_not_empty result.headers
end
end

sub_test_case 'dynamic endpoint host validation in parse_endpoint' do
setup do
@chunk = Object.new
end

test 'allows request when host does not change (e.g., placeholder only in path)' do
plugin = create_driver(%(
endpoint http://api.example.com/logs/${tag}
)).instance

stub(plugin).extract_placeholders { 'http://api.example.com/logs/my.custom.tag' }

uri = plugin.send(:parse_endpoint, @chunk)
assert_equal 'api.example.com', uri.host
assert_equal '/logs/my.custom.tag', uri.path
end

test 'allows request when host changes and new host is in allowed_hosts' do
plugin = create_driver(%(
endpoint http://${tag}:8080/api
allowed_hosts ["known-host.example.com", "another-host.example.com"]
)).instance

stub(plugin).extract_placeholders { 'http://known-host.example.com:8080/api' }

uri = plugin.send(:parse_endpoint, @chunk)
assert_equal 'known-host.example.com', uri.host
end

test 'raises UnrecoverableError when host changes and allowed_hosts is empty' do
plugin = create_driver(%(
endpoint http://${tag}:8080/api
)).instance

stub(plugin).extract_placeholders { 'http://unknown-host.example.com:8080/api' }

err = assert_raise(Fluent::UnrecoverableError) do
plugin.send(:parse_endpoint, @chunk)
end
assert_match(/allowed_hosts is strictly required when using placeholders in the endpoint host/, err.message)
end

test 'raises UnrecoverableError when host changes and it is not in allowed_hosts' do
plugin = create_driver(%(
endpoint http://${tag}:8080/api
allowed_hosts ["known-host.example.com"]
)).instance

stub(plugin).extract_placeholders { 'http://unknown-host.example.com:8080/api' }

err = assert_raise(Fluent::UnrecoverableError) do
plugin.send(:parse_endpoint, @chunk)
end
assert_match(/Not allowed host: unknown-host\.example\.com/, err.message)
end
end
end
Loading