Skip to content

Commit 60d73ec

Browse files
committed
cilium: defer response logging until stream completion
The Cilium HTTP access log entry for the response side is currently emitted from encodeHeaders(), before later stages of the response stream complete. This means response-side header mutations performed by downstream filters such as ext_proc are not reflected in the access log entry that the Cilium agent consumes, even though they reach the wire correctly. Defer the response log entry until the response stream has actually completed: - encodeHeaders() now records the response header map and only emits the log entry immediately if end_stream is true on headers. - encodeData() and encodeTrailers() are implemented and call logResponse() once the stream ends. - onStreamComplete() acts as a final safety net. A response_logged_ guard ensures the response log entry is emitted exactly once per stream regardless of which code path triggers it. This is a prerequisite for surfacing ext_proc-injected response metadata (for example, AI usage headers such as token counts added by an external processor on the response path) through the Cilium access log bridge into Hubble. Tests: - New unit coverage in tests/accesslog_test.cc for deferred response logging on stream completion. - New integration coverage in tests/cilium_http_integration_test.cc validating that response-side headers mutated after encodeHeaders() are reflected in the access log entry consumed by Cilium. Signed-off-by: Nico Vibert <nvibert@cisco.com>
1 parent bdb8b53 commit 60d73ec

4 files changed

Lines changed: 124 additions & 10 deletions

File tree

cilium/l7policy.cc

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,9 +303,24 @@ void AccessFilter::onStreamComplete() {
303303
if (log_entry_ && !log_entry_->request_logged_) {
304304
config_->log(*log_entry_, ::cilium::EntryType::Request);
305305
}
306+
307+
if (!response_logged_ && response_headers_.has_value()) {
308+
logResponse();
309+
}
306310
}
307311

308-
Http::FilterHeadersStatus AccessFilter::encodeHeaders(Http::ResponseHeaderMap& headers, bool) {
312+
void AccessFilter::logResponse() {
313+
if (response_logged_ || log_entry_ == nullptr || !response_headers_.has_value()) {
314+
return;
315+
}
316+
317+
log_entry_->updateFromResponse(*response_headers_, config_->time_source_);
318+
config_->log(*log_entry_, ::cilium::EntryType::Response);
319+
response_logged_ = true;
320+
}
321+
322+
Http::FilterHeadersStatus AccessFilter::encodeHeaders(Http::ResponseHeaderMap& headers,
323+
bool end_stream) {
309324
const auto& stream_info = callbacks_->streamInfo();
310325

311326
// Skip enforcement or logging on shadows
@@ -374,11 +389,27 @@ Http::FilterHeadersStatus AccessFilter::encodeHeaders(Http::ResponseHeaderMap& h
374389
config_->log(*log_entry_, log_type);
375390
}
376391

377-
// Log the response
378-
log_entry_->updateFromResponse(headers, config_->time_source_);
379-
config_->log(*log_entry_, ::cilium::EntryType::Response);
392+
response_headers_ = headers;
393+
394+
if (end_stream) {
395+
logResponse();
396+
}
397+
380398
return Http::FilterHeadersStatus::Continue;
381399
}
382400

401+
Http::FilterDataStatus AccessFilter::encodeData(Buffer::Instance&, bool end_stream) {
402+
if (end_stream) {
403+
logResponse();
404+
}
405+
406+
return Http::FilterDataStatus::Continue;
407+
}
408+
409+
Http::FilterTrailersStatus AccessFilter::encodeTrailers(Http::ResponseTrailerMap&) {
410+
logResponse();
411+
return Http::FilterTrailersStatus::Continue;
412+
}
413+
383414
} // namespace Cilium
384415
} // namespace Envoy

cilium/l7policy.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,18 +94,15 @@ class AccessFilter : public Http::StreamFilter,
9494
}
9595
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
9696
bool end_stream) override;
97-
Http::FilterDataStatus encodeData(Buffer::Instance&, bool) override {
98-
return Http::FilterDataStatus::Continue;
99-
}
100-
Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap&) override {
101-
return Http::FilterTrailersStatus::Continue;
102-
}
97+
Http::FilterDataStatus encodeData(Buffer::Instance&, bool end_stream) override;
98+
Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap&) override;
10399
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks&) override {}
104100
Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap&) override {
105101
return Http::FilterMetadataStatus::Continue;
106102
}
107103

108104
private:
105+
void logResponse();
109106
void sendLocalError(absl::string_view details);
110107

111108
ConfigSharedPtr config_;
@@ -114,7 +111,9 @@ class AccessFilter : public Http::StreamFilter,
114111
AccessLog::Entry* log_entry_ = nullptr;
115112

116113
OptRef<Http::RequestHeaderMap> latched_headers_;
114+
OptRef<Http::ResponseHeaderMap> response_headers_;
117115
absl::optional<bool> latched_end_stream_;
116+
bool response_logged_ = false;
118117
};
119118

120119
} // namespace Cilium

tests/accesslog_test.cc

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,5 +86,47 @@ TEST_F(CiliumTest, AccessLog) {
8686
EXPECT_STREQ(log.entry_.http().headers(1).value().c_str(), "response");
8787
}
8888

89+
TEST_F(CiliumTest, AccessLogPreservesAIResponseHeaders) {
90+
Http::TestRequestHeaderMapImpl headers{{":method", "POST"},
91+
{":path", "/v1/chat/completions"},
92+
{":authority", "model-server"},
93+
{"x-forwarded-proto", "http"},
94+
{"x-request-id", "req-123"}};
95+
Network::MockConnection connection;
96+
auto source_address = std::make_shared<Network::Address::Ipv4Instance>("5.6.7.8", 45678);
97+
auto destination_address = std::make_shared<Network::Address::Ipv4Instance>("1.2.3.4", 80);
98+
connection.stream_info_.protocol_ = Http::Protocol::Http11;
99+
connection.stream_info_.start_time_ = time_system_.systemTime();
100+
connection.stream_info_.downstream_connection_info_provider_->setRemoteAddress(source_address);
101+
connection.stream_info_.downstream_connection_info_provider_->setLocalAddress(
102+
destination_address);
103+
104+
AccessLog::Entry log;
105+
log.initFromRequest("1.2.3.4", 42, true, 1, source_address, 173, destination_address,
106+
connection.stream_info_, headers);
107+
108+
Http::TestResponseHeaderMapImpl response_headers{{":status", "200"},
109+
{"x-ai-model", "llama3.1-8b-instruct"},
110+
{"x-ai-input-tokens", "11"},
111+
{"x-ai-output-tokens", "6"},
112+
{"x-ai-total-tokens", "17"}};
113+
114+
NiceMock<Event::SimulatedTimeSystem> time_source;
115+
log.updateFromResponse(response_headers, time_source);
116+
117+
EXPECT_EQ(log.entry_.http().status(), 200);
118+
EXPECT_EQ(log.entry_.http().headers_size(), 5);
119+
EXPECT_STREQ(log.entry_.http().headers(0).key().c_str(), "x-request-id");
120+
EXPECT_STREQ(log.entry_.http().headers(0).value().c_str(), "req-123");
121+
EXPECT_STREQ(log.entry_.http().headers(1).key().c_str(), "x-ai-model");
122+
EXPECT_STREQ(log.entry_.http().headers(1).value().c_str(), "llama3.1-8b-instruct");
123+
EXPECT_STREQ(log.entry_.http().headers(2).key().c_str(), "x-ai-input-tokens");
124+
EXPECT_STREQ(log.entry_.http().headers(2).value().c_str(), "11");
125+
EXPECT_STREQ(log.entry_.http().headers(3).key().c_str(), "x-ai-output-tokens");
126+
EXPECT_STREQ(log.entry_.http().headers(3).value().c_str(), "6");
127+
EXPECT_STREQ(log.entry_.http().headers(4).key().c_str(), "x-ai-total-tokens");
128+
EXPECT_STREQ(log.entry_.http().headers(4).value().c_str(), "17");
129+
}
130+
89131
} // namespace Cilium
90132
} // namespace Envoy

tests/cilium_http_integration_test.cc

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -903,6 +903,48 @@ TEST_P(CiliumIntegrationTest, AcceptedMethod) {
903903
}));
904904
}
905905

906+
TEST_P(CiliumIntegrationTest, AcceptedResponsePreservesArbitraryHeaders) {
907+
initialize();
908+
codec_client_ = makeHttpConnection(lookupPort("http"));
909+
910+
Http::TestResponseHeaderMapImpl response_headers{
911+
{":status", "200"}, {"x-ai-model", "llama3.1-8b-instruct"}, {"x-ai-total-tokens", "17"}};
912+
auto response = sendRequestAndWaitForResponse(
913+
{{":method", "PUT"}, {":path", "/public/opinions"}, {":authority", "host"}}, 0,
914+
response_headers, 0);
915+
916+
absl::optional<std::string> maybe_x_request_id;
917+
EXPECT_TRUE(expectAccessLogRequestTo([&maybe_x_request_id](const ::cilium::LogEntry& entry) {
918+
maybe_x_request_id = getHeader(entry.http().headers(), "x-request-id");
919+
return entry.http().status() == 0;
920+
}));
921+
ASSERT_TRUE(maybe_x_request_id.has_value());
922+
923+
absl::optional<std::string> maybe_x_request_id_resp;
924+
absl::optional<std::string> maybe_model;
925+
absl::optional<std::string> maybe_total_tokens;
926+
EXPECT_TRUE(expectAccessLogResponseTo([&maybe_x_request_id_resp, &maybe_model,
927+
&maybe_total_tokens](const ::cilium::LogEntry& entry) {
928+
maybe_x_request_id_resp = getHeader(entry.http().headers(), "x-request-id");
929+
maybe_model = getHeader(entry.http().headers(), "x-ai-model");
930+
maybe_total_tokens = getHeader(entry.http().headers(), "x-ai-total-tokens");
931+
return entry.http().status() == 200;
932+
}));
933+
934+
ASSERT_TRUE(maybe_x_request_id_resp.has_value());
935+
EXPECT_EQ(maybe_x_request_id.value(), maybe_x_request_id_resp.value());
936+
ASSERT_TRUE(maybe_model.has_value());
937+
EXPECT_EQ("llama3.1-8b-instruct", maybe_model.value());
938+
ASSERT_TRUE(maybe_total_tokens.has_value());
939+
EXPECT_EQ("17", maybe_total_tokens.value());
940+
941+
EXPECT_TRUE(response->complete());
942+
EXPECT_EQ("200", response->headers().getStatusValue());
943+
EXPECT_TRUE(upstream_request_->complete());
944+
EXPECT_EQ(0, upstream_request_->bodyLength());
945+
cleanupUpstreamAndDownstream();
946+
}
947+
906948
TEST_P(CiliumIntegrationTest, L3DeniedPath) {
907949
denied({{":method", "GET"}, {":path", "/only-2-allowed"}, {":authority", "host"}});
908950

0 commit comments

Comments
 (0)