Passing http request headers in the event metadata for http source#6671
Conversation
| return Collections.emptyMap(); | ||
| } | ||
|
|
||
| final boolean includeAll = metadataHeaders.size() == 1 && "*".equals(metadataHeaders.get(0)); |
There was a problem hiding this comment.
I'm not sure about supporting * without supporting regex in general. I think if we wanted this, we should add metadata_headers_regex instead. This would be consistent with other processors that support literals and regex.
There was a problem hiding this comment.
I will add the regex config in a follow up PR
| final Map<String, Object> headers = new HashMap<>(); | ||
| for (String headerName : headerNames) { | ||
| if (isSensitiveHeader(headerName)) { | ||
| LOG.warn("Skipping sensitive header '{}' from metadata_headers configuration", headerName); |
There was a problem hiding this comment.
I think we don't need any logging here. Maybe a TRACE if you want.
| return headers; | ||
| } | ||
|
|
||
| static boolean isSensitiveHeader(final String headerName) { |
There was a problem hiding this comment.
Put this in its own class and test it there.
| public void processRequestAttachesHeadersToEventMetadata() throws Exception { | ||
| final String tenantId = UUID.randomUUID().toString(); | ||
| final String region = UUID.randomUUID().toString(); | ||
| BlockingBuffer<Record<Log>> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline"); |
There was a problem hiding this comment.
Use a mocked Buffer instead. Then you don't need to use read. Verify that it is called and use an ArgumentCaptor.
| return HttpResponse.of(HttpStatus.OK); | ||
| } | ||
|
|
||
| private Map<String, Object> extractHeaders(final AggregatedHttpRequest aggregatedHttpRequest) { |
There was a problem hiding this comment.
I tend to think that this should go into it's own class. You can unit test this more easily. Then verify an interaction with the mock in the unit tests for LogHTTPService.
| return new Record<>(log); | ||
| .getThis(); | ||
| if (!headerAttributes.isEmpty()) { | ||
| builder.withEventMetadataAttributes(headerAttributes); |
There was a problem hiding this comment.
This approach will put all the headers as metadata in the root of the metadata. I wonder if it would make more sense to have these under headers in the metadata. Do we support nested metadata? If not, then I guess this is the approach we should take.
✅ License Header Check PassedAll newly added files have proper license headers. Great work! 🎉 |
79c50d1 to
788c85b
Compare
| private PluginModel codec; | ||
|
|
||
| @JsonProperty("metadata_headers") | ||
| private List<String> metadataHeaders; |
There was a problem hiding this comment.
Suggest using empty list as default. Otherwise it's null by default and will need null check wherever it's used.
| private List<String> metadataHeaders; | |
| private List<String> metadataHeaders = Collections.emptyList(); |
|
|
||
| HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception { | ||
| final HttpData content = aggregatedHttpRequest.content(); | ||
| final Map<String, Object> extractedHeaders = httpHeaderExtractor.extractHeaders(aggregatedHttpRequest); |
There was a problem hiding this comment.
All records in a batch share the exact same extractedHeaders map instance in their metadata. If a processor downstream somehow mutates the map, it will affect other records in the batch, might cause a surprise. We can wrap extractedHeaders in Collections.unmodifiableMap() to prevents accidental in-place mutation.
| assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath()); | ||
| assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getDefaultPort()); | ||
| assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getDefaultPath()); | ||
| assertNull(sourceConfig.getMetadataHeaders()); |
There was a problem hiding this comment.
I believe this is causing the failure:
HTTPSourceConfigTest > testDefault() FAILED
org.opentest4j.AssertionFailedError: expected: <null> but was: <[]>
This is now an empty collection by default which is good.
|
|
||
| return new Record<>(log); | ||
| .getThis(); | ||
| if (!headerAttributes.isEmpty()) { |
There was a problem hiding this comment.
How can we consolidate this block with line 138? We seem to have a split in how we create records that could result in discrepancies.
| import java.util.stream.Collectors; | ||
|
|
||
|
|
||
| /* |
| final Buffer<Record<Log>> buffer, | ||
| final PluginMetrics pluginMetrics, | ||
| final InputCodec codec) { | ||
| this(bufferWriteTimeoutInMillis, buffer, pluginMetrics, codec, new HttpHeaderExtractor(null)); |
There was a problem hiding this comment.
You should use an empty collection for consistency. new HttpHeaderExtractor() or new HttpHeaderExtractor(Collections.emptySet())
| private final List<String> metadataHeaders; | ||
|
|
||
| public HttpHeaderExtractor(final List<String> metadataHeaders) { | ||
| this.metadataHeaders = metadataHeaders; |
| } | ||
|
|
||
| public Map<String, Object> extractHeaders(final AggregatedHttpRequest aggregatedHttpRequest) { | ||
| if (metadataHeaders == null || metadataHeaders.isEmpty()) { |
There was a problem hiding this comment.
Remove metadataHeaders == null after other changes I suggested.
|
|
||
| private final List<String> metadataHeaders; | ||
|
|
||
| public HttpHeaderExtractor(final List<String> metadataHeaders) { |
There was a problem hiding this comment.
This can be Collection<String> You don't require ordering.
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
Description
Adds a new
metadata_headersconfiguration option to the HTTP source plugin that allows users to extract HTTP request headers and attach them as event metadata. This enables downstream processors and sinks to route, filter, or enrich events based on HTTP headers.How it works
Header key normalization
X-Tenant-Idis stored in metadata asx-tenant-id.A hardcoded blocklist of sensitive headers is always filtered out, regardless of configuration. This includes authorization, cookie, x-api-key, x-amz-security-token, x-amz-credential, and others.
Headers with multiple values (e.g., X-Forwarded-For appearing twice) are stored as a List. Single-value headers are stored as a plain String.
This feature is currently only supported for the in-memory buffer (BlockingBuffer). The byte buffer / Kafka buffer path does not propagate metadata headers yet.
Example pipeline
Issues Resolved
Resolves #6239
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.