Skip to content

Commit 2b79cb9

Browse files
committed
parquet bucket store: support sharded parquet file querying
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 99e8261 commit 2b79cb9

5 files changed

Lines changed: 258 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. These options now use `model.Duration` instead of `time.Duration` to support serialization, which gives them a minimum unit of 1ms (nanoseconds and microseconds are not supported). #7160
55
* [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
66
* [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481
7+
* [FEATURE] Parquet: Support sharded parquet file conversion and querying. #7610
78
* [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476
89
* [FEATURE] Ingester: Add experimental head-only queried series metric. `cortex_ingester_queried_head_series` tracks unique series queried from the head via HyperLogLog (HLL). Enabled via `-ingester.head-queried-series-metrics-enabled`. #7500
910
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Includes a `jsonEscape` template function for safely embedding expressions in JSON-encoded URL parameters (e.g., Grafana Explore panes). Supports Grafana Explore, Perses, and other UIs. #7302

pkg/storegateway/parquet_bucket_store.go

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/go-kit/log"
99
"github.com/gogo/protobuf/types"
10+
"github.com/oklog/ulid/v2"
1011
"github.com/pkg/errors"
1112
"github.com/prometheus-community/parquet-common/convert"
1213
"github.com/prometheus-community/parquet-common/schema"
@@ -25,14 +26,15 @@ import (
2526
"google.golang.org/grpc/codes"
2627
"google.golang.org/grpc/status"
2728

29+
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
2830
"github.com/cortexproject/cortex/pkg/util/parquetutil"
2931
"github.com/cortexproject/cortex/pkg/util/spanlogger"
3032
"github.com/cortexproject/cortex/pkg/util/validation"
3133
)
3234

3335
type parquetBucketStore struct {
3436
logger log.Logger
35-
bucket objstore.Bucket
37+
bucket objstore.InstrumentedBucket
3638
limits *validation.Overrides
3739
concurrency int
3840

@@ -62,19 +64,51 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher
6264
}
6365

6466
blockIDs := strings.Split(blockMatchers[0].Value, "|")
65-
blocks := make([]*parquetBlock, 0, len(blockIDs))
6667
bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket)
6768
noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx))
69+
70+
// Read converter marks and expand to per-shard (blockID, shardID) lists.
71+
var shardBlockIDs []string
72+
var shardIDs []int
6873
for _, blockID := range blockIDs {
69-
// TODO: support shard ID > 0 later.
70-
block, err := p.newParquetBlock(ctx, blockID, 0, bucketOpener, bucketOpener, p.chunksDecoder, p.rowRangesCache, noopQuota, noopQuota, noopQuota)
74+
uid, err := ulid.Parse(blockID)
75+
if err != nil {
76+
return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID)
77+
}
78+
marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger)
7179
if err != nil {
72-
return nil, err
80+
return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID)
7381
}
74-
blocks = append(blocks, block)
82+
numShards := marker.Shards
83+
if numShards <= 0 {
84+
// Backward compatibility: a converter mark with a missing or non-positive shard count is treated as a single shard (shard 0).
85+
numShards = 1
86+
}
87+
for shardID := 0; shardID < numShards; shardID++ {
88+
shardBlockIDs = append(shardBlockIDs, blockID)
89+
shardIDs = append(shardIDs, shardID)
90+
}
91+
}
92+
93+
// Open all shards in parallel.
94+
parquetBlocks := make([]*parquetBlock, len(shardBlockIDs))
95+
errGroup, egCtx := errgroup.WithContext(ctx)
96+
errGroup.SetLimit(p.concurrency)
97+
for i := range shardBlockIDs {
98+
errGroup.Go(func() error {
99+
blk, err := p.newParquetBlock(egCtx, shardBlockIDs[i], shardIDs[i], bucketOpener, bucketOpener, p.chunksDecoder, p.rowRangesCache, noopQuota, noopQuota, noopQuota)
100+
if err != nil {
101+
return err
102+
}
103+
parquetBlocks[i] = blk
104+
return nil
105+
})
106+
}
107+
if err := errGroup.Wait(); err != nil {
108+
return nil, err
75109
}
76110

77-
return blocks, nil
111+
return parquetBlocks, nil
78112
}
79113

80114
// Series implements the store interface for a single parquet bucket store
@@ -112,10 +146,14 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep
112146
errGroup, ctx := errgroup.WithContext(srv.Context())
113147
errGroup.SetLimit(p.concurrency)
114148

149+
seenBlocks := make(map[string]struct{}, len(shards))
115150
for i, shard := range shards {
116-
resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{
117-
Id: shard.name,
118-
})
151+
if _, seen := seenBlocks[shard.name]; !seen {
152+
seenBlocks[shard.name] = struct{}{}
153+
resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{
154+
Id: shard.name,
155+
})
156+
}
119157
errGroup.Go(func() error {
120158
ss, err := shard.Query(ctx, req.MinTime, req.MaxTime, req.SkipChunks, matchers)
121159
seriesSet[i] = ss
@@ -197,10 +235,14 @@ func (p *parquetBucketStore) LabelNames(ctx context.Context, req *storepb.LabelN
197235
errGroup, ctx := errgroup.WithContext(ctx)
198236
errGroup.SetLimit(p.concurrency)
199237

238+
seenBlocks := make(map[string]struct{}, len(shards))
200239
for i, s := range shards {
201-
resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{
202-
Id: s.name,
203-
})
240+
if _, seen := seenBlocks[s.name]; !seen {
241+
seenBlocks[s.name] = struct{}{}
242+
resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{
243+
Id: s.name,
244+
})
245+
}
204246
errGroup.Go(func() error {
205247
r, err := s.LabelNames(ctx, req.Limit, matchers)
206248
resNameSets[i] = r
@@ -254,10 +296,14 @@ func (p *parquetBucketStore) LabelValues(ctx context.Context, req *storepb.Label
254296
errGroup, ctx := errgroup.WithContext(ctx)
255297
errGroup.SetLimit(p.concurrency)
256298

299+
seenBlocks := make(map[string]struct{}, len(shards))
257300
for i, s := range shards {
258-
resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{
259-
Id: s.name,
260-
})
301+
if _, seen := seenBlocks[s.name]; !seen {
302+
seenBlocks[s.name] = struct{}{}
303+
resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{
304+
Id: s.name,
305+
})
306+
}
261307
errGroup.Go(func() error {
262308
r, err := s.LabelValues(ctx, req.Label, req.Limit, matchers)
263309
resNameValues[i] = r

pkg/storegateway/parquet_bucket_store_bench_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,95 @@ func BenchmarkParquetBucketStore_SeriesBatch(b *testing.B) {
120120
}
121121
}
122122

123+
func BenchmarkParquetBucketStore_MultiShard(b *testing.B) {
124+
const (
125+
totalSeries = 10000
126+
numSamples = 100
127+
userID = "user-1"
128+
)
129+
130+
// shardCases defines configurations to convert a block into different numbers of parquet shards.
131+
// totalShards = ceil(NumSeries / (NumRowGroups × MaxRowsPerRowGroup))
132+
shardCases := []struct {
133+
numShards int
134+
numRowGroups int
135+
maxRowsPerRowGroup int
136+
}{
137+
{numShards: 1, numRowGroups: math.MaxInt32, maxRowsPerRowGroup: 1_000_000}, // default: single shard (no sharding path)
138+
{numShards: 2, numRowGroups: 1, maxRowsPerRowGroup: totalSeries / 2},
139+
{numShards: 4, numRowGroups: 1, maxRowsPerRowGroup: totalSeries / 4},
140+
{numShards: 8, numRowGroups: 1, maxRowsPerRowGroup: totalSeries / 8},
141+
}
142+
143+
for _, tc := range shardCases {
144+
b.Run(fmt.Sprintf("shards=%d", tc.numShards), func(b *testing.B) {
145+
ctx := context.Background()
146+
tmpDir := b.TempDir()
147+
storageDir := filepath.Join(tmpDir, "storage")
148+
dataDir := filepath.Join(tmpDir, "data")
149+
150+
storageCfg := cortex_tsdb.BlocksStorageConfig{
151+
UsersScanner: users.UsersScannerConfig{
152+
Strategy: users.UserScanStrategyList,
153+
UpdateInterval: time.Second,
154+
},
155+
Bucket: bucket.Config{
156+
Backend: "filesystem",
157+
Filesystem: filesystem.Config{
158+
Directory: storageDir,
159+
},
160+
},
161+
BucketStore: cortex_tsdb.BucketStoreConfig{
162+
SyncDir: filepath.Join(tmpDir, "sync"),
163+
BucketStoreType: "parquet",
164+
BlockDiscoveryStrategy: string(cortex_tsdb.RecursiveDiscovery),
165+
},
166+
}
167+
bucketClient, err := bucket.NewClient(ctx, storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry())
168+
require.NoError(b, err)
169+
170+
blockID := prepareParquetBlockWithShards(
171+
b, ctx, storageCfg, bucketClient, dataDir, userID,
172+
totalSeries, numSamples, tc.numRowGroups, tc.maxRowsPerRowGroup,
173+
)
174+
175+
stores, err := NewBucketStores(storageCfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), prometheus.NewPedanticRegistry())
176+
require.NoError(b, err)
177+
178+
listener, err := net.Listen("tcp", "localhost:0")
179+
require.NoError(b, err)
180+
gRPCServer := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor))
181+
storepb.RegisterStoreServer(gRPCServer, stores)
182+
go func() {
183+
if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped {
184+
b.Error(err)
185+
}
186+
}()
187+
defer gRPCServer.Stop()
188+
189+
conn, err := grpc.NewClient(listener.Addr().String(),
190+
grpc.WithTransportCredentials(insecure.NewCredentials()),
191+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
192+
)
193+
require.NoError(b, err)
194+
defer conn.Close()
195+
196+
gRPCClient := storepb.NewStoreClient(conn)
197+
198+
b.ResetTimer()
199+
b.ReportAllocs()
200+
for b.Loop() {
201+
benchmarkBatchingForParquetBucketStore(b, gRPCClient, userID, 1000, totalSeries, blockID)
202+
}
203+
})
204+
}
205+
}
206+
123207
func prepareParquetBlock(b *testing.B, ctx context.Context, storageCfg cortex_tsdb.BlocksStorageConfig, bkt objstore.InstrumentedBucket, dataDir, userID string, numSeries, numSamples int) string {
208+
return prepareParquetBlockWithShards(b, ctx, storageCfg, bkt, dataDir, userID, numSeries, numSamples, math.MaxInt32, 1_000_000)
209+
}
210+
211+
func prepareParquetBlockWithShards(b *testing.B, ctx context.Context, storageCfg cortex_tsdb.BlocksStorageConfig, bkt objstore.InstrumentedBucket, dataDir, userID string, numSeries, numSamples, numRowGroups, maxRowsPerRowGroup int) string {
124212
logger := log.NewNopLogger()
125213
reg := prometheus.NewRegistry()
126214

@@ -170,6 +258,8 @@ func prepareParquetBlock(b *testing.B, ctx context.Context, storageCfg cortex_ts
170258
flagext.DefaultValues(&convCfg)
171259
convCfg.ConversionInterval = time.Second // to convert quickly
172260
convCfg.DataDir = filepath.Join(dataDir, "converter-data")
261+
convCfg.NumRowGroups = numRowGroups
262+
convCfg.MaxRowsPerRowGroup = maxRowsPerRowGroup
173263

174264
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
175265
b.Cleanup(func() { assert.NoError(b, closer.Close()) })

pkg/storegateway/parquet_bucket_stores.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, s
307307
name,
308308
labelsFileOpener,
309309
chunksFileOpener,
310-
0, // we always only have 1 shard - shard 0
310+
shardID,
311311
parquet_storage.WithFileOptions(
312312
parquet.SkipMagicBytes(true),
313313
parquet.ReadBufferSize(100*1024),

0 commit comments

Comments
 (0)