Skip to content

Commit 702a172

Browse files
committed
Add parquet shard concurrency configuration for store-gateway
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 74185ef commit 702a172

8 files changed

Lines changed: 62 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489
1717
* [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518
1818
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
19+
* [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-shard-concurrency` flag to configure the maximum number of concurrent goroutines per query applied at each level of parquet processing in store-gateway: shard querying, row group filtering, and column materialization. #7613
1920
* [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478
2021
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
2122
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401

docs/blocks-storage/querier.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2098,6 +2098,12 @@ blocks_storage:
20982098
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
20992099
[parquet_shard_cache_ttl: <duration> | default = 24h]
21002100

2101+
# Maximum number of concurrent goroutines per query applied at each level of
2102+
# parquet processing in store-gateway: shard querying, row group filtering,
2103+
# and column materialization.
2104+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-concurrency
2105+
[parquet_shard_concurrency: <int> | default = 4]
2106+
21012107
tsdb:
21022108
# Local directory to store TSDBs in the ingesters.
21032109
# CLI flag: -blocks-storage.tsdb.dir

docs/blocks-storage/store-gateway.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2156,6 +2156,12 @@ blocks_storage:
21562156
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
21572157
[parquet_shard_cache_ttl: <duration> | default = 24h]
21582158

2159+
# Maximum number of concurrent goroutines per query applied at each level of
2160+
# parquet processing in store-gateway: shard querying, row group filtering,
2161+
# and column materialization.
2162+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-concurrency
2163+
[parquet_shard_concurrency: <int> | default = 4]
2164+
21592165
tsdb:
21602166
# Local directory to store TSDBs in the ingesters.
21612167
# CLI flag: -blocks-storage.tsdb.dir

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2735,6 +2735,12 @@ bucket_store:
27352735
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
27362736
[parquet_shard_cache_ttl: <duration> | default = 24h]
27372737

2738+
# Maximum number of concurrent goroutines per query applied at each level of
2739+
# parquet processing in store-gateway: shard querying, row group filtering,
2740+
# and column materialization.
2741+
# CLI flag: -blocks-storage.bucket-store.parquet-shard-concurrency
2742+
[parquet_shard_concurrency: <int> | default = 4]
2743+
27382744
tsdb:
27392745
# Local directory to store TSDBs in the ingesters.
27402746
# CLI flag: -blocks-storage.tsdb.dir

pkg/storage/tsdb/config.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,16 @@ const (
4949

5050
// Validation errors
5151
var (
52-
errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
53-
errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency")
54-
errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval")
55-
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
56-
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
57-
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
58-
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
59-
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
60-
errUnSupportedWALCompressionType = errors.New("unsupported WAL compression type, valid types are (zstd, snappy and '')")
52+
errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
53+
errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency")
54+
errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval")
55+
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
56+
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
57+
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
58+
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
59+
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
60+
errUnSupportedWALCompressionType = errors.New("unsupported WAL compression type, valid types are (zstd, snappy and '')")
61+
errInvalidParquetShardConcurrency = errors.New("invalid parquet shard concurrency, the value must be greater than 0")
6162

6263
ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled")
6364
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
@@ -336,6 +337,10 @@ type BucketStoreConfig struct {
336337
TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
337338
// Parquet shard cache config
338339
ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
340+
341+
// ParquetShardConcurrency controls the maximum number of concurrent goroutines
342+
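// per query applied at each level of parquet processing in store-gateway: shard querying, row group filtering, and column materialization.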
343+
ParquetShardConcurrency int `yaml:"parquet_shard_concurrency"`
339344
}
340345

341346
type TokenBucketBytesLimiterConfig struct {
@@ -398,6 +403,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
398403
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token")
399404
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
400405
f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
406+
f.IntVar(&cfg.ParquetShardConcurrency, "blocks-storage.bucket-store.parquet-shard-concurrency", 4, "Maximum number of concurrent goroutines per query applied at each level of parquet processing in store-gateway: shard querying, row group filtering, and column materialization.")
401407
cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f)
402408
}
403409

@@ -435,6 +441,9 @@ func (cfg *BucketStoreConfig) Validate() error {
435441
if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 {
436442
return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio
437443
}
444+
if cfg.ParquetShardConcurrency <= 0 {
445+
return errInvalidParquetShardConcurrency
446+
}
438447
return nil
439448
}
440449

pkg/storage/tsdb/config_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,24 @@ func TestConfig_Validate(t *testing.T) {
145145
},
146146
expectedErr: errUnSupportedWALCompressionType,
147147
},
148+
"should fail on parquet shard concurrency set to 0": {
149+
setup: func(cfg *BlocksStorageConfig) {
150+
cfg.BucketStore.ParquetShardConcurrency = 0
151+
},
152+
expectedErr: errInvalidParquetShardConcurrency,
153+
},
154+
"should fail on negative parquet shard concurrency": {
155+
setup: func(cfg *BlocksStorageConfig) {
156+
cfg.BucketStore.ParquetShardConcurrency = -1
157+
},
158+
expectedErr: errInvalidParquetShardConcurrency,
159+
},
160+
"should pass on valid parquet shard concurrency": {
161+
setup: func(cfg *BlocksStorageConfig) {
162+
cfg.BucketStore.ParquetShardConcurrency = 4
163+
},
164+
expectedErr: nil,
165+
},
148166
}
149167

150168
for testName, testData := range tests {

pkg/storegateway/parquet_bucket_stores.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger
273273
logger: userLogger,
274274
bucket: userBucket,
275275
limits: u.limits,
276-
concurrency: 4, // TODO: make this configurable
276+
concurrency: u.cfg.BucketStore.ParquetShardConcurrency,
277277
chunksDecoder: u.chunksDecoder,
278278
matcherCache: u.matcherCache,
279279
parquetShardCache: u.parquetShardCache,

schemas/cortex-config-schema.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2952,6 +2952,12 @@
29522952
"x-cli-flag": "blocks-storage.bucket-store.parquet-shard-cache-ttl",
29532953
"x-format": "duration"
29542954
},
2955+
"parquet_shard_concurrency": {
2956+
"default": 4,
2957+
"description": "Maximum number of concurrent goroutines per query applied at each level of parquet processing in store-gateway: shard querying, row group filtering, and column materialization.",
2958+
"type": "number",
2959+
"x-cli-flag": "blocks-storage.bucket-store.parquet-shard-concurrency"
2960+
},
29552961
"series_batch_size": {
29562962
"default": 10000,
29572963
"description": "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.",

0 commit comments

Comments
 (0)