diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 78e1d4d920..18839d5f52 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -17,6 +17,7 @@ require 'net/http' require 'uri' require 'openssl' +require 'securerandom' require 'fluent/tls' require 'fluent/plugin/output' require 'fluent/plugin_helper/socket' @@ -58,6 +59,9 @@ class RetryableResponse < StandardError; end desc 'Compress HTTP request body' config_param :compress, :enum, list: [:text, :gzip], default: :text + desc 'Allowed hosts list for dynamic endpoints' + config_param :allowed_hosts, :array, default: [] + desc 'The connection open timeout in seconds' config_param :open_timeout, :integer, default: nil desc 'The read timeout in seconds' @@ -106,6 +110,11 @@ class RetryableResponse < StandardError; end config_param :aws_role_arn, :string, default: nil end + # To prevent URI::InvalidURIError, we replace Fluentd placeholders with a dummy string. + # We use the ".invalid" TLD (RFC 2606) to ensure it is RFC-compliant for URI parsing, + # while guaranteeing it will never conflict with a real-world hostname. + REPLACED_ENDPOINT_PLACEHOLDER = "#{SecureRandom.uuid}.invalid".freeze + def connection_cache_id_thread_key "#{plugin_id}_connection_cache_id" end @@ -146,6 +155,15 @@ def configure(conf) @retryable_response_codes = [503] end + begin + # Replace all Fluentd placeholder syntaxes (${...} or %{...}) + endpoint = @endpoint.gsub(%r([$%]{[^}]+}), REPLACED_ENDPOINT_PLACEHOLDER) + # If @endpoint has placeholder as host name, then, @endpoint_host == REPLACED_ENDPOINT_PLACEHOLDER + @endpoint_host = URI.parse(endpoint).host + rescue URI::InvalidURIError => e + raise Fluent::ConfigError, "Invalid endpoint URI: #{@endpoint} (#{e.message})" + end + @http_opt = setup_http_option @proxy_uri = URI.parse(@proxy) if @proxy @formatter = formatter_create @@ -278,7 +296,19 @@ def setup_http_option def parse_endpoint(chunk) endpoint = extract_placeholders(@endpoint, chunk) - URI.parse(endpoint) + uri = URI.parse(endpoint) + + if @endpoint_host != uri.host + if @allowed_hosts.empty? + raise Fluent::UnrecoverableError, "allowed_hosts is strictly required when using placeholders in the endpoint host" + end + + unless @allowed_hosts.include?(uri.host) + raise Fluent::UnrecoverableError, "Not allowed host: #{uri.host}" + end + end + + uri end def set_headers(req, uri, chunk) diff --git a/test/plugin/test_out_http.rb b/test/plugin/test_out_http.rb index b63b7b1324..10b63f4740 100644 --- a/test/plugin/test_out_http.rb +++ b/test/plugin/test_out_http.rb @@ -228,6 +228,12 @@ def test_configure_content_type_json_array(content_type) assert_equal content_type, d.instance.content_type end + def test_configure_allowed_hosts + d = create_driver(config + 'allowed_hosts ["example.com"]') + + assert_equal ["example.com"], d.instance.allowed_hosts + end + data('PUT' => 'put', 'POST' => 'post') def test_write_with_method(method) d = create_driver(config + "http_method #{method}") @@ -600,4 +606,61 @@ def test_connection_recreation assert_not_empty result.headers end end + + sub_test_case 'dynamic endpoint host validation in parse_endpoint' do + setup do + @chunk = Object.new + end + + test 'allows request when host does not change (e.g., placeholder only in path)' do + plugin = create_driver(%( + endpoint http://api.example.com/logs/${tag} + )).instance + + stub(plugin).extract_placeholders { 'http://api.example.com/logs/my.custom.tag' } + + uri = plugin.send(:parse_endpoint, @chunk) + assert_equal 'api.example.com', uri.host + assert_equal '/logs/my.custom.tag', uri.path + end + + test 'allows request when host changes and new host is in allowed_hosts' do + plugin = create_driver(%( + endpoint http://${tag}:8080/api + allowed_hosts ["known-host.example.com", "another-host.example.com"] + )).instance + + stub(plugin).extract_placeholders { 'http://known-host.example.com:8080/api' } + + uri = plugin.send(:parse_endpoint, @chunk) + assert_equal 'known-host.example.com', uri.host + end + + test 'raises UnrecoverableError when host changes and allowed_hosts is empty' do + plugin = create_driver(%( + endpoint http://${tag}:8080/api + )).instance + + stub(plugin).extract_placeholders { 'http://unknown-host.example.com:8080/api' } + + err = assert_raise(Fluent::UnrecoverableError) do + plugin.send(:parse_endpoint, @chunk) + end + assert_match(/allowed_hosts is strictly required when using placeholders in the endpoint host/, err.message) + end + + test 'raises UnrecoverableError when host changes and it is not in allowed_hosts' do + plugin = create_driver(%( + endpoint http://${tag}:8080/api + allowed_hosts ["known-host.example.com"] + )).instance + + stub(plugin).extract_placeholders { 'http://unknown-host.example.com:8080/api' } + + err = assert_raise(Fluent::UnrecoverableError) do + plugin.send(:parse_endpoint, @chunk) + end + assert_match(/Not allowed host: unknown-host\.example\.com/, err.message) + end + end end