Skip to content

Commit 5728226

Browse files
authored
golang: add DrainConnectionUponCompletion() to StreamInfo API (envoyproxy#43037)
Commit Message: This change exposes the `setShouldDrainConnectionUponCompletion` functionality to the Go HTTP filter, allowing Go plugins to mark connections for draining after the current request completes. For HTTP/1.x, this will add a "Connection: close" header and close the connection after the response is sent. For HTTP/2 and HTTP/3, this will send a GOAWAY frame and initiate a graceful drain sequence. This mirrors the functionality added to the Lua filter in 1.37 (drainConnectionUponCompletion). Additional Description: Risk Level: Testing: Docs Changes: Release Notes: Platform Specific Features: [Optional Runtime guard:] [Optional Fixes #Issue] [Optional Fixes commit #PR or SHA] [Optional Deprecated:] [Optional [API Considerations](https://github.com/envoyproxy/envoy/blob/main/api/review_checklist.md):] Signed-off-by: William Dauchy <william.dauchy@datadoghq.com>
1 parent f1dcefb commit 5728226

11 files changed

Lines changed: 66 additions & 0 deletions

File tree

contrib/golang/common/go/api/api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ CAPIStatus envoyGoFilterHttpGetStringProperty(void* r, void* key_data, int key_l
123123
uint64_t* value_data, int* value_len, int* rc);
124124
CAPIStatus envoyGoFilterHttpGetStringSecret(void* r, void* key_data, int key_len,
125125
uint64_t* value_data, int* value_len);
126+
CAPIStatus envoyGoFilterHttpSetDrainConnectionUponCompletion(void* r);
126127

127128
/* These APIs have nothing to do with request */
128129
void envoyGoFilterLog(uint32_t level, void* message_data, int message_len);

contrib/golang/common/go/api/capi.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type HttpCAPI interface {
6161

6262
HttpFinalize(r unsafe.Pointer, reason int)
6363
HttpGetStringSecret(c unsafe.Pointer, key string) (string, bool)
64+
HttpSetDrainConnectionUponCompletion(r unsafe.Pointer)
6465

6566
/* These APIs are related to config, use the pointer of config. */
6667
HttpDefineMetric(c unsafe.Pointer, metricType MetricType, name string) uint32

contrib/golang/common/go/api/filter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ type StreamInfo interface {
160160
VirtualClusterName() (string, bool)
161161
// WorkerID returns the ID of the Envoy worker thread
162162
WorkerID() uint32
163+
// DrainConnectionUponCompletion marks the connection to be drained after the current request completes.
164+
// For HTTP/1.x, this will add a "Connection: close" header to the response.
165+
// For HTTP/2 and HTTP/3, this will send a GOAWAY frame after the response is sent.
166+
DrainConnectionUponCompletion()
163167
// Some fields in stream info can be fetched via GetProperty
164168
// For example, startTime() is equal to GetProperty("request.time")
165169
}

contrib/golang/filters/http/source/cgo.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,12 @@ CAPIStatus envoyGoFilterHttpGetStringSecret(void* r, void* key_data, int key_len
370370
});
371371
}
372372

373+
CAPIStatus envoyGoFilterHttpSetDrainConnectionUponCompletion(void* r) {
374+
return envoyGoFilterHandlerWrapper(r, [](std::shared_ptr<Filter>& filter) -> CAPIStatus {
375+
return filter->setDrainConnectionUponCompletion();
376+
});
377+
}
378+
373379
CAPIStatus envoyGoFilterHttpDefineMetric(void* c, uint32_t metric_type, void* name_data,
374380
int name_len, uint32_t* metric_id) {
375381
return envoyGoConfigHandlerWrapper(

contrib/golang/filters/http/source/go/pkg/http/capi_impl.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,12 @@ func (c *httpCApiImpl) HttpGetStringSecret(r unsafe.Pointer, key string) (string
490490
return strings.Clone(unsafe.String((*byte)(unsafe.Pointer(uintptr(valueData))), int(valueLen))), true
491491
}
492492

493+
func (c *httpCApiImpl) HttpSetDrainConnectionUponCompletion(r unsafe.Pointer) {
494+
req := (*httpRequest)(r)
495+
res := C.envoyGoFilterHttpSetDrainConnectionUponCompletion(unsafe.Pointer(req.req))
496+
handleCApiStatus(res)
497+
}
498+
493499
func (c *httpCApiImpl) HttpLog(level api.LogType, message string) {
494500
C.envoyGoFilterLog(C.uint32_t(level), unsafe.Pointer(unsafe.StringData(message)), C.int(len(message)))
495501
}

contrib/golang/filters/http/source/go/pkg/http/filter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,10 @@ func (s *streamInfo) WorkerID() uint32 {
391391
return uint32(s.request.req.worker_id)
392392
}
393393

394+
func (s *streamInfo) DrainConnectionUponCompletion() {
395+
cAPI.HttpSetDrainConnectionUponCompletion(unsafe.Pointer(s.request))
396+
}
397+
394398
type filterState struct {
395399
request *httpRequest
396400
}

contrib/golang/filters/http/source/golang_filter.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1546,6 +1546,16 @@ CAPIStatus Filter::getSecret(const absl::string_view name, uint64_t* value_data,
15461546
}
15471547
}
15481548

1549+
CAPIStatus Filter::setDrainConnectionUponCompletion() {
1550+
Thread::LockGuard lock(mutex_);
1551+
if (has_destroyed_) {
1552+
ENVOY_LOG(debug, "golang filter has been destroyed");
1553+
return CAPIStatus::CAPIFilterIsDestroy;
1554+
}
1555+
streamInfo().setShouldDrainConnectionUponCompletion(true);
1556+
return CAPIStatus::CAPIOK;
1557+
}
1558+
15491559
/* ConfigId */
15501560

15511561
uint64_t Filter::getMergedConfigId() {

contrib/golang/filters/http/source/golang_filter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ class Filter : public Http::StreamFilter,
313313
CAPIStatus getStringProperty(absl::string_view path, uint64_t* value_data, int* value_len,
314314
GoInt32* rc);
315315
CAPIStatus getSecret(absl::string_view key, uint64_t* value_data, int* value_len);
316+
CAPIStatus setDrainConnectionUponCompletion();
316317

317318
bool isProcessingInGo() {
318319
return decoding_state_.isProcessingInGo() || encoding_state_.isProcessingInGo();

contrib/golang/filters/http/test/golang_integration_test.cc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,4 +1940,28 @@ TEST_P(GolangIntegrationTest, SetUpstreamOverrideHost_InvalidHost_Strict_NotFoun
19401940
false, "", true);
19411941
}
19421942

1943+
// Test DrainConnectionUponCompletion triggers connection draining for HTTP/1.1.
1944+
TEST_P(GolangIntegrationTest, DrainConnectionUponCompletion) {
1945+
initializeBasicFilter(BASIC);
1946+
registerTestServerPorts({"http"});
1947+
1948+
codec_client_ = makeHttpConnection(lookupPort("http"));
1949+
1950+
// Make request with drainConnection query parameter.
1951+
Http::TestRequestHeaderMapImpl request_headers{
1952+
{":method", "GET"}, {":path", "/test?drainConnection=1"}, {":authority", "test"}};
1953+
auto response = sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0);
1954+
1955+
EXPECT_TRUE(response->complete());
1956+
EXPECT_EQ("200", response->headers().getStatusValue());
1957+
1958+
// For HTTP/1.1, we should see Connection: close header.
1959+
EXPECT_EQ("close", response->headers().getConnectionValue());
1960+
1961+
// Connection should be closed after request completes.
1962+
ASSERT_TRUE(codec_client_->waitForDisconnect());
1963+
1964+
cleanupUpstreamAndDownstream();
1965+
}
1966+
19431967
} // namespace Envoy

contrib/golang/filters/http/test/test_data/basic/filter.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type filter struct {
4141

4242
upstreamOverrideHost string // set upstream override host
4343
upstreamOverrideHostStrict bool // set strict mode for upstream override host
44+
drainConnection bool // drain connection upon completion
4445
}
4546

4647
func parseQuery(path string) url.Values {
@@ -90,6 +91,7 @@ func (f *filter) initRequest(header api.RequestHeaderMap) {
9091
f.refreshRoute = f.query_params.Get("refreshRoute") != ""
9192
f.upstreamOverrideHost = f.query_params.Get("upstreamOverrideHost")
9293
f.upstreamOverrideHostStrict = f.query_params.Get("upstreamOverrideHostStrict") != ""
94+
f.drainConnection = f.query_params.Get("drainConnection") != ""
9395
}
9496

9597
func (f *filter) fail(callbacks api.FilterProcessCallbacks, msg string, a ...any) api.StatusType {
@@ -285,6 +287,9 @@ func (f *filter) decodeHeaders(header api.RequestHeaderMap, endStream bool) api.
285287
f.callbacks.RefreshRouteCache()
286288
header.SetPath("/api/") // path used by the upstream
287289
}
290+
if f.drainConnection {
291+
f.callbacks.StreamInfo().DrainConnectionUponCompletion()
292+
}
288293
return api.Continue
289294
}
290295

0 commit comments

Comments
 (0)