Skip to content

Support sharded Parquet file querying and conversion#7610

Open
SungJin1212 wants to merge 3 commits into
cortexproject:masterfrom
SungJin1212:parquet-shard
Open

Support sharded Parquet file querying and conversion#7610
SungJin1212 wants to merge 3 commits into
cortexproject:masterfrom
SungJin1212:parquet-shard

Conversation

@SungJin1212

@SungJin1212 SungJin1212 commented Jun 9, 2026

Copy link
Copy Markdown
Member

This PR supports for querying sharded Parquet files within a bucket store and enables the conversion of sharded Parquet files.

Benchmark Results

Currently, the concurrency is hard-coded as 4.

GOROOT=/usr/local/opt/go/libexec #gosetup
GOPATH=/Users/kakao_ent/go #gosetup
/usr/local/opt/go/libexec/bin/go test -c -tags=slicelabels -o /Users/kakao_ent/Library/Caches/JetBrains/GoLand2026.1/tmp/GoLand/___1BenchmarkParquetBucketStore_MultiShard_in_github_com_cortexproject_cortex_pkg_storegateway.test github.com/cortexproject/cortex/pkg/storegateway #gosetup
/Users/kakao_ent/Library/Caches/JetBrains/GoLand2026.1/tmp/GoLand/___1BenchmarkParquetBucketStore_MultiShard_in_github_com_cortexproject_cortex_pkg_storegateway.test -test.v -test.paniconexit0 -test.bench ^\QBenchmarkParquetBucketStore_MultiShard\E$ -test.run ^$ #gosetup
goos: darwin
goarch: amd64
pkg: github.com/cortexproject/cortex/pkg/storegateway
cpu: VirtualApple @ 2.50GHz
BenchmarkParquetBucketStore_MultiShard
BenchmarkParquetBucketStore_MultiShard/shards=1
BenchmarkParquetBucketStore_MultiShard/shards=1-14         	      72	  15630539 ns/op	36701543 B/op	  282624 allocs/op
BenchmarkParquetBucketStore_MultiShard/shards=2
BenchmarkParquetBucketStore_MultiShard/shards=2-14         	     100	  11494683 ns/op	38358405 B/op	  284007 allocs/op
BenchmarkParquetBucketStore_MultiShard/shards=4
BenchmarkParquetBucketStore_MultiShard/shards=4-14         	     100	  10774228 ns/op	38830028 B/op	  286728 allocs/op
BenchmarkParquetBucketStore_MultiShard/shards=8
BenchmarkParquetBucketStore_MultiShard/shards=8-14         	     100	  11819611 ns/op	38578193 B/op	  291999 allocs/op
PASS

Process finished with the exit code 0

Which issue(s) this PR fixes:
Fixes #7176 #7174

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]
  • docs/configuration/v1-guarantees.md updated if this PR introduces experimental flags

@dosubot dosubot Bot added component/store-gateway go Pull requests that update Go code storage/blocks Blocks storage engine type/feature labels Jun 9, 2026
@SungJin1212 SungJin1212 force-pushed the parquet-shard branch 2 times, most recently from 635d72e to 524e917 Compare June 9, 2026 11:03
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
errGroup.SetLimit(p.concurrency)
for i := range shardBlockIDs {
errGroup.Go(func() error {
blk, err := p.newParquetBlock(egCtx, shardBlockIDs[i], shardIDs[i], bucketOpener, bucketOpener, p.chunksDecoder, p.rowRangesCache, noopQuota, noopQuota, noopQuota)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked at how this parquet sharding works for quite some time now... The sharding is to shard at columns... So do we really need to open all shards here? Or based on sharding do we know if we can only open 1 file is enough?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't tell which shard holds a matching series.. So we need to open all shard files. (the converter mark only stores the shard count, with no per-shard label metadata)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should at least try to get some hints? The sharding is based on sorting order. For example if we sort by metric name label, we should be able to get the min and max value for each shard and store them in the convert marker. This way based on metric name in the query we can tell which shard file we need to open.

This doesn't block this PR but I think it is something we should do to optimize the query path

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, we can store MinName and MaxName to the ConverterMark and then utilize it when pruning shards since __name__ is always the primary sort key. I'll track it as a follow-up PR.

# splits a block into more parquet shards for better read parallelization.
# Default is unlimited (single shard).
# CLI flag: -parquet-converter.num-row-groups
[num_row_groups: <int> | default = 2147483647]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have an integration test with sharding enabled?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an e2e test.

Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>

@friedrichg friedrichg left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing this!

return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID)
}
blocks = append(blocks, block)
numShards := marker.Shards

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am new to this parquet thing. Let me ask a dumb question.
ReadConverterMark does N calls to s3. And there can be some transient errors there.

Can't we get the shards from the bucket index? The shards are there as well right? And it's cached in the metadata cache.

# splits a block into more parquet shards for better read parallelization.
# Default is unlimited (single shard).
# CLI flag: -parquet-converter.num-row-groups
[num_row_groups: <int> | default = 2147483647]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I prefer if we do something like this

Cortex value Behavior
< 0 Validate() returns error → process fails at startup
0 Don't pass WithNumRowGroups → library uses its default (MaxInt32 = unlimited = single shard)
> 0 Pass convert.WithNumRowGroups(value)

wdyt?

Comment on lines 401 to +600
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}

func TestParquetMultiShardQuery(t *testing.T) {
for name, tc := range map[string]struct {
viaStoreGateway bool
}{
"querier parquet queryable": {viaStoreGateway: false},
"store-gateway parquet bucket store": {viaStoreGateway: true},
} {
t.Run(name, func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

const (
// 2 metrics * seriesPerMetric unique series. Sized together with the
// converter flags below so the block is split into exactly 2 shards.
seriesPerMetric = 10
totalSeries = seriesPerMetric * 2 // 20
maxRowsPerRowGroup = 10
numRowGroups = 1
expectedShards = 2 // ceil(20 / (1 * 10))
seriesSize = 10
)

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-target": "all,parquet-converter",
"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
// compactor
"-compactor.cleanup-interval": "1s",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
// Don't query ingesters: the queried time range is older than this,
// so all data is served exclusively from parquet blocks.
"-limits.query-ingesters-within": "2h",
// parquet-converter
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-parquet-converter.conversion-interval": "1s",
"-parquet-converter.enabled": "true",
"-parquet-converter.num-row-groups": strconv.Itoa(numRowGroups),
"-parquet-converter.max-rows-per-row-group": strconv.Itoa(maxRowsPerRowGroup),
},
)

if tc.viaStoreGateway {
// Route reads through the store-gateway's parquet bucket store.
flags = mergeFlags(flags, map[string]string{
"-blocks-storage.bucket-store.bucket-store-type": "parquet",
// Enable sharding so the querier discovers the store-gateway via the
// ring and routes block queries to it.
"-store-gateway.sharding-enabled": "true",
"-store-gateway.sharding-ring.store": "consul",
"-store-gateway.sharding-ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-store-gateway.sharding-ring.replication-factor": "1",
// Disable the embedded parquet queryable so reads go to the store-gateway.
"-querier.enable-parquet-queryable": "false",
})
} else {
// Query directly via the querier's embedded parquet queryable.
flags = mergeFlags(flags, map[string]string{
"-store-gateway.sharding-enabled": "false",
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.enable-parquet-queryable": "true",
})
}

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

ctx := context.Background()
rnd := newFuzzRand(t)
dir := filepath.Join(s.SharedDir(), "data")
numSamples := 60
scrapeInterval := time.Minute
now := time.Now()
// Keep the whole range older than -limits.query-ingesters-within (2h)
// so queries are served exclusively from parquet blocks.
start := now.Add(-time.Hour * 24)
end := now.Add(-time.Hour * 3)

// Generate unique series so the converter produces a deterministic series count.
lbls := make([]labels.Labels, 0, totalSeries)
for i := 0; i < seriesPerMetric; i++ {
lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_a", "job", "test", "instance", strconv.Itoa(i)))
lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_b", "job", "test", "instance", strconv.Itoa(i)))
}

id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), seriesSize)
require.NoError(t, err)
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

// Upload the block before starting cortex so the first compactor scan finds
// the complete block and includes it in the bucket index immediately.
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until the block is converted to parquet and the bucket index is updated.
cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
found := false
foundBucketIndex := false
err := bkt.Iter(context.Background(), "", func(name string) error {
if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
found = true
}
if name == "bucket-index.json.gz" {
foundBucketIndex = true
}
return nil
}, objstore.WithRecursiveIter())
require.NoError(t, err)
return found && foundBucketIndex
})

// Verify the converter actually split the block into the expected number of shards.
marker, err := cortex_parquet.ReadConverterMark(ctx, id, bkt, log.Logger)
require.NoError(t, err)
require.Equal(t, expectedShards, marker.Shards, "block should be split into multiple parquet shards")

// Verify each shard's parquet files (labels + chunks) exist in object storage.
for shardID := 0; shardID < expectedShards; shardID++ {
labelsFile := fmt.Sprintf("%s/%d.labels.parquet", id.String(), shardID)
chunksFile := fmt.Sprintf("%s/%d.chunks.parquet", id.String(), shardID)

exists, err := bkt.Exists(ctx, labelsFile)
require.NoError(t, err)
require.True(t, exists, "labels parquet file should exist for shard %d", shardID)

exists, err = bkt.Exists(ctx, chunksFile)
require.NoError(t, err)
require.True(t, exists, "chunks parquet file should exist for shard %d", shardID)
}

// Verify the block is registered in the bucket index as a parquet block with the expected shard count.
cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} {
idx, err := bucketindex.ReadIndex(ctx, storage.GetBucket(), "user-1", nil, log.Logger)
if err != nil {
return false
}
for _, b := range idx.Blocks {
if b.ID == id && b.Parquet != nil && b.Parquet.Shards == expectedShards {
return true
}
}
return false
})

c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Wait until all series are queryable across both shards.
cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
labelSets, err := c.Series([]string{`{job="test"}`}, start, end)
if err != nil {
return false
}
return len(labelSets) == totalSeries
})

rangeRes, err := c.QueryRange(`test_series_a`, start, end, scrapeInterval)
require.NoError(t, err)
rangeMatrix, ok := rangeRes.(model.Matrix)
require.True(t, ok)
require.Len(t, rangeMatrix, seriesPerMetric)

if tc.viaStoreGateway {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_querier_storegateway_instances_hit_per_query"}, e2e.WithMetricCount, e2e.SkipMissingMetrics))
} else {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}
})
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be interesting to also test the code with 2 store gateways. TestParquetMultiShardQuery runs at replication-factor=1 on both rings, so the multi-replica fan-out path (and the new per-block seenBlocks dedup of QueriedBlocks hints) is never exercised. A third subtest with RF=2 and ≥2 store-gateway replicas would be nice

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

component/store-gateway go Pull requests that update Go code size/XL storage/blocks Blocks storage engine type/feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Parquet] Support sharded parquet file conversion

3 participants