Skip to content

Commit dcc04af

Browse files
authored
Tenant impersonation push stream (#7475)
* add checking logic if not distirbutor side request Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * Add SignWriteRequestsKeys Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * change addr Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * fix flakiness Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * fix test message Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * Add SecretStringSliceCSV type Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> --------- Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent d7e386b commit dcc04af

17 files changed

Lines changed: 612 additions & 9 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392
2020
* [ENHANCEMENT] Upgrade gRPC from v1.71.2 to v1.79.3 to address CVE-2026-33186. #7460
2121
* [ENHANCEMENT] Query Frontend: Add `query_too_expensive` reason to QFE and `reason` field to query stats. #7479
22+
* [ENHANCEMENT] Distributor: Add HMAC-SHA256 stream authentication for `PushStream` via `-distributor.sign-write-requests-keys`. #7475
2223
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
2324
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
2425
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389

docs/configuration/config-file-reference.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3213,6 +3213,17 @@ ha_tracker:
32133213
# CLI flag: -distributor.sign-write-requests
32143214
[sign_write_requests: <boolean> | default = false]
32153215
3216+
# EXPERIMENTAL: Comma-separated list of HMAC-SHA256 keys authenticating
3217+
# PushStream connections between distributors and ingesters. The first key is
3218+
# used by the distributor to sign; all keys are accepted by the ingester. It
3219+
# only takes effect when the -distributor.sign-write-requests is true. The key
3220+
# change procedure for zero downtime is: (1) redeploy ingesters first with
3221+
# 'newkey,oldkey' — ingester accepts both keys; (2) redeploy distributors with
3222+
# 'newkey,oldkey' — distributor signs with newkey; (3) once stable, redeploy
3223+
# both with 'newkey' to drop the old key.
3224+
# CLI flag: -distributor.sign-write-requests-keys
3225+
[sign_write_requests_keys: <string> | default = ""]
3226+
32163227
# EXPERIMENTAL: If enabled, distributor would use stream connection to send
32173228
# requests to ingesters.
32183229
# CLI flag: -distributor.use-stream-push

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ Currently experimental features are:
114114
- `-store-gateway.query-protection.rejection`
115115
- Distributor/Ingester: Stream push connection
116116
- Enable stream push connection between distributor and ingester by setting `-distributor.use-stream-push=true` on Distributor.
117+
- Enable stream push authentication on Distributor/Ingester. (`-distributor.sign-write-requests-keys`)
117118
- Add `__type__` and `__unit__` labels to OTLP and remote write v2 requests (`-distributor.enable-type-and-unit-labels`)
118119
- Handle StartTimestampMs (ST) for remote write v2 samples and histograms, using CreatedTimestamp (CT) as a fallback when ST is not set (`-distributor.enable-start-timestamp`)
119120
- Ingester: Series Queried Metric

integration/ingester_stream_push_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,101 @@ func TestIngesterStreamPushConnection(t *testing.T) {
116116
assertServiceMetricsPrefixes(t, Ingester, ingester3)
117117
}
118118

119+
func TestIngesterStreamPushConnectionWithMatchingSigningKey(t *testing.T) {
120+
const signingKey = "shared-secret-for-integration-test"
121+
122+
s, err := e2e.NewScenario(networkName)
123+
require.NoError(t, err)
124+
defer s.Close()
125+
126+
flags := BlocksStorageFlags()
127+
flags["-distributor.use-stream-push"] = "true"
128+
flags["-distributor.replication-factor"] = "1"
129+
flags["-distributor.sign-write-requests"] = "true"
130+
flags["-distributor.sign-write-requests-keys"] = signingKey
131+
flags["-ingester.heartbeat-period"] = "1s"
132+
flags["-distributor.ring.heartbeat-period"] = "1s"
133+
134+
consul := e2edb.NewConsul()
135+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
136+
require.NoError(t, s.StartAndWaitReady(consul, minio))
137+
138+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
139+
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
140+
require.NoError(t, s.StartAndWaitReady(distributor, ingester1))
141+
142+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"},
143+
e2e.WithLabelMatchers(
144+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
145+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")),
146+
e2e.WaitMissingMetrics))
147+
148+
now := time.Now()
149+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
150+
require.NoError(t, err)
151+
152+
// Push a few series; all should succeed because the signing key matches.
153+
for i := 0; i < 5; i++ {
154+
series, _ := generateSeries(fmt.Sprintf("test_signing_ok_%d", i), now)
155+
res, err := client.Push(series)
156+
require.NoError(t, err)
157+
require.Equal(t, 200, res.StatusCode,
158+
"push must succeed when distributor and ingester share the same signing key")
159+
}
160+
}
161+
162+
func TestIngesterStreamPushConnectionWithMismatchedSigningKey(t *testing.T) {
163+
const distributorKey = "distributor-key"
164+
const ingesterKey = "ingester-key-different"
165+
166+
s, err := e2e.NewScenario(networkName)
167+
require.NoError(t, err)
168+
defer s.Close()
169+
170+
// Distributor signs with distributorKey.
171+
distributorFlags := BlocksStorageFlags()
172+
distributorFlags["-distributor.use-stream-push"] = "true"
173+
distributorFlags["-distributor.replication-factor"] = "1"
174+
distributorFlags["-distributor.sign-write-requests"] = "true"
175+
distributorFlags["-distributor.sign-write-requests-keys"] = distributorKey
176+
distributorFlags["-ingester.heartbeat-period"] = "1s"
177+
178+
// Ingester verifies with ingesterKey (intentionally different).
179+
ingesterFlags := BlocksStorageFlags()
180+
ingesterFlags["-distributor.use-stream-push"] = "true"
181+
ingesterFlags["-distributor.replication-factor"] = "1"
182+
ingesterFlags["-distributor.sign-write-requests"] = "true"
183+
ingesterFlags["-distributor.sign-write-requests-keys"] = ingesterKey
184+
ingesterFlags["-ingester.heartbeat-period"] = "1s"
185+
186+
consul := e2edb.NewConsul()
187+
minio := e2edb.NewMinio(9000, distributorFlags["-blocks-storage.s3.bucket-name"])
188+
require.NoError(t, s.StartAndWaitReady(consul, minio))
189+
190+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), distributorFlags, "")
191+
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), ingesterFlags, "")
192+
require.NoError(t, s.StartAndWaitReady(distributor, ingester1))
193+
194+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"},
195+
e2e.WithLabelMatchers(
196+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
197+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")),
198+
e2e.WaitMissingMetrics))
199+
200+
now := time.Now()
201+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
202+
require.NoError(t, err)
203+
204+
for i := 0; i < 3; i++ {
205+
series, _ := generateSeries(fmt.Sprintf("test_signing_mismatch_%d", i), now)
206+
res, err := client.Push(series)
207+
if err == nil {
208+
require.NotEqual(t, 200, res.StatusCode,
209+
"push must fail when distributor and ingester use different signing keys")
210+
}
211+
}
212+
}
213+
119214
func TestIngesterStreamPushConnectionWithError(t *testing.T) {
120215

121216
s, err := e2e.NewScenario(networkName)

pkg/cortex/cortex.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,13 @@ func (t *Cortex) setupRequestSigning() {
423423
if t.Cfg.Distributor.SignWriteRequestsEnabled {
424424
util_log.WarnExperimentalUse("Distributor SignWriteRequestsEnabled")
425425
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcclient.UnarySigningServerInterceptor)
426+
427+
// When signing keys are configured, authenticate PushStream connections.
428+
// All keys in the list are accepted by the server; the first key is used by
429+
// the client to sign. Multiple keys enable zero-downtime key rotation.
430+
if keys := t.Cfg.Distributor.SignWriteRequestsKeys.Value(); len(keys) > 0 {
431+
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcclient.NewStreamSigningServerInterceptor(keys...))
432+
}
426433
}
427434
}
428435

pkg/cortex/modules.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,11 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
245245
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
246246
t.Cfg.Distributor.NameValidationScheme = t.Cfg.NameValidationScheme
247247
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled
248+
// The client signs with the first key in the list; additional keys are only used on the
249+
// server side (ingester) for accepting signatures during key rotation.
250+
if keys := t.Cfg.Distributor.SignWriteRequestsKeys.Value(); len(keys) > 0 {
251+
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsKey = keys[0]
252+
}
248253

249254
// Check whether the distributor can join the distributors ring, which is
250255
// whenever it's not running as an internal dependency (ie. querier or

pkg/distributor/distributor.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
3838
"github.com/cortexproject/cortex/pkg/util"
3939
"github.com/cortexproject/cortex/pkg/util/extract"
40+
"github.com/cortexproject/cortex/pkg/util/flagext"
4041
"github.com/cortexproject/cortex/pkg/util/labelset"
4142
"github.com/cortexproject/cortex/pkg/util/limiter"
4243
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -153,13 +154,14 @@ type Config struct {
153154
RemoteTimeout time.Duration `yaml:"remote_timeout"`
154155
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`
155156

156-
ShardingStrategy string `yaml:"sharding_strategy"`
157-
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
158-
ExtendWrites bool `yaml:"extend_writes"`
159-
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`
160-
UseStreamPush bool `yaml:"use_stream_push"`
161-
RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"`
162-
AcceptUnknownRemoteWriteContentType bool `yaml:"accept_unknown_remote_write_content_type"`
157+
ShardingStrategy string `yaml:"sharding_strategy"`
158+
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
159+
ExtendWrites bool `yaml:"extend_writes"`
160+
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`
161+
SignWriteRequestsKeys flagext.SecretStringSliceCSV `yaml:"sign_write_requests_keys"`
162+
UseStreamPush bool `yaml:"use_stream_push"`
163+
RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"`
164+
AcceptUnknownRemoteWriteContentType bool `yaml:"accept_unknown_remote_write_content_type"`
163165

164166
// Distributors ring
165167
DistributorRing RingConfig `yaml:"ring"`
@@ -217,6 +219,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
217219
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
218220
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
219221
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
222+
f.Var(&cfg.SignWriteRequestsKeys, "distributor.sign-write-requests-keys", "EXPERIMENTAL: Comma-separated list of HMAC-SHA256 keys authenticating PushStream connections between distributors and ingesters. The first key is used by the distributor to sign; all keys are accepted by the ingester. It only takes effect when the -distributor.sign-write-requests is true. The key change procedure for zero downtime is: (1) redeploy ingesters first with 'newkey,oldkey' — ingester accepts both keys; (2) redeploy distributors with 'newkey,oldkey' — distributor signs with newkey; (3) once stable, redeploy both with 'newkey' to drop the old key.")
220223
f.BoolVar(&cfg.UseStreamPush, "distributor.use-stream-push", false, "EXPERIMENTAL: If enabled, distributor would use stream connection to send requests to ingesters.")
221224
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
222225
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")

pkg/ingester/client/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"net/http"
9+
"strings"
910
"sync"
1011

1112
"github.com/go-kit/log"
@@ -205,8 +206,10 @@ func (c *closableHealthAndIngesterClient) Run(streamPushChan chan *streamWriteJo
205206

206207
var workerErr error
207208
var wg sync.WaitGroup
209+
// Sanitize addr: colons (from host:port) are not allowed in tenant IDs.
210+
sanitizedAddr := strings.ReplaceAll(c.addr, ":", "-")
208211
for i := range INGESTER_CLIENT_STREAM_WORKER_COUNT {
209-
workerName := fmt.Sprintf("ingester-%s-stream-push-worker-%d", c.addr, i)
212+
workerName := fmt.Sprintf("ingester-%s-stream-push-worker-%d", sanitizedAddr, i)
210213
wg.Go(func() {
211214
workerCtx := user.InjectOrgID(streamCtx, workerName)
212215
err := c.worker(workerCtx)

pkg/ingester/ingester.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/http"
1111
"os"
1212
"path/filepath"
13+
"regexp"
1314
"runtime"
1415
"runtime/pprof"
1516
"slices"
@@ -110,6 +111,8 @@ var (
110111
errLabelsOutOfOrder = errors.New("labels out of order")
111112

112113
tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk]
114+
115+
distributorWorkerOrgIDRe = regexp.MustCompile(`^ingester-.+-stream-push-worker-\d+$`)
113116
)
114117

115118
// Config for an Ingester.
@@ -1724,6 +1727,16 @@ func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error {
17241727
if err != nil {
17251728
return err
17261729
}
1730+
1731+
if contextOrgID, extractErr := users.TenantID(ctx); extractErr == nil {
1732+
if !isDistributorWorkerOrgID(contextOrgID) && contextOrgID != req.TenantID {
1733+
req.Free()
1734+
return status.Errorf(codes.PermissionDenied,
1735+
"tenant ID mismatch: stream authenticated as %q but request specifies %q",
1736+
contextOrgID, req.TenantID)
1737+
}
1738+
}
1739+
17271740
pushCtx := user.InjectOrgID(ctx, req.TenantID)
17281741
resp, err := i.Push(pushCtx, req.Request)
17291742
if resp == nil {
@@ -1747,6 +1760,21 @@ func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error {
17471760
}
17481761
}
17491762

1763+
// isDistributorWorkerOrgID reports whether orgID matches the synthetic worker-name pattern
1764+
// that the distributor injects as X-Scope-OrgID when opening a long-lived PushStream:
1765+
//
1766+
// "ingester-<addr>-stream-push-worker-<N>"
1767+
//
1768+
// When this pattern is detected, PushStream bypasses the orgID == req.TenantID check and
1769+
// instead trusts req.TenantID from the payload.
1770+
//
1771+
// Note: trusting this pattern alone is not sufficient — an attacker who knows the
1772+
// pattern can spoof it and write to any tenant. The stream-level gRPC interceptor
1773+
// enabled via -distributor.sign-write-requests-keys provides cryptographic proof.
1774+
func isDistributorWorkerOrgID(orgID string) bool {
1775+
return distributorWorkerOrgIDRe.MatchString(orgID)
1776+
}
1777+
17501778
func (u *userTSDB) acquireReadLock() error {
17511779
u.stateMtx.RLock()
17521780
defer u.stateMtx.RUnlock()

0 commit comments

Comments
 (0)