Support sharded Parquet file querying and conversion#7610
Conversation
635d72e to
524e917
Compare
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
524e917 to
2b79cb9
Compare
| errGroup.SetLimit(p.concurrency) | ||
| for i := range shardBlockIDs { | ||
| errGroup.Go(func() error { | ||
| blk, err := p.newParquetBlock(egCtx, shardBlockIDs[i], shardIDs[i], bucketOpener, bucketOpener, p.chunksDecoder, p.rowRangesCache, noopQuota, noopQuota, noopQuota) |
There was a problem hiding this comment.
I haven't looked at how this parquet sharding works for quite some time now... The sharding is to shard at columns... So do we really need to open all shards here? Or based on sharding do we know if we can only open 1 file is enough?
There was a problem hiding this comment.
We can't tell which shard holds a matching series.. So we need to open all shard files. (the converter mark only stores the shard count, with no per-shard label metadata)
There was a problem hiding this comment.
We should at least try to get some hints? The sharding is based on sorting order. For example if we sort by metric name label, we should be able to get the min and max value for each shard and store them in the convert marker. This way based on metric name in the query we can tell which shard file we need to open.
This doesn't block this PR but I think it is something we should do to optimize the query path
There was a problem hiding this comment.
Agree, we can store MinName and MaxName to the ConverterMark and then utilize it when pruning shards since __name__ is always the primary sort key. I'll track it as a follow-up PR.
| # splits a block into more parquet shards for better read parallelization. | ||
| # Default is unlimited (single shard). | ||
| # CLI flag: -parquet-converter.num-row-groups | ||
| [num_row_groups: <int> | default = 2147483647] |
There was a problem hiding this comment.
We should have an integration test with sharding enabled?
There was a problem hiding this comment.
I added an e2e test.
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
| return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID) | ||
| } | ||
| blocks = append(blocks, block) | ||
| numShards := marker.Shards |
There was a problem hiding this comment.
I am new to this parquet thing. Let me ask a dumb question.
ReadConverterMark does N calls to s3. And there can be some transient errors there.
Can't we get the shards from the bucket index? The shards are there as well right? And it's cached in the metadata cache.
| # splits a block into more parquet shards for better read parallelization. | ||
| # Default is unlimited (single shard). | ||
| # CLI flag: -parquet-converter.num-row-groups | ||
| [num_row_groups: <int> | default = 2147483647] |
There was a problem hiding this comment.
I think I prefer if we do something like this
| Cortex value | Behavior |
|---|---|
< 0 |
Validate() returns error → process fails at startup |
0 |
Don't pass WithNumRowGroups → library uses its default (MaxInt32 = unlimited = single shard) |
> 0 |
Pass convert.WithNumRowGroups(value) |
wdyt?
| require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( | ||
| labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) | ||
| } | ||
|
|
||
| func TestParquetMultiShardQuery(t *testing.T) { | ||
| for name, tc := range map[string]struct { | ||
| viaStoreGateway bool | ||
| }{ | ||
| "querier parquet queryable": {viaStoreGateway: false}, | ||
| "store-gateway parquet bucket store": {viaStoreGateway: true}, | ||
| } { | ||
| t.Run(name, func(t *testing.T) { | ||
| s, err := e2e.NewScenario(networkName) | ||
| require.NoError(t, err) | ||
| defer s.Close() | ||
|
|
||
| consul := e2edb.NewConsulWithName("consul") | ||
| require.NoError(t, s.StartAndWaitReady(consul)) | ||
|
|
||
| const ( | ||
| // 2 metrics * seriesPerMetric unique series. Sized together with the | ||
| // converter flags below so the block is split into exactly 2 shards. | ||
| seriesPerMetric = 10 | ||
| totalSeries = seriesPerMetric * 2 // 20 | ||
| maxRowsPerRowGroup = 10 | ||
| numRowGroups = 1 | ||
| expectedShards = 2 // ceil(20 / (1 * 10)) | ||
| seriesSize = 10 | ||
| ) | ||
|
|
||
| baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) | ||
| flags := mergeFlags( | ||
| baseFlags, | ||
| map[string]string{ | ||
| "-target": "all,parquet-converter", | ||
| "-blocks-storage.tsdb.block-ranges-period": "1m,24h", | ||
| "-blocks-storage.tsdb.ship-interval": "1s", | ||
| "-blocks-storage.bucket-store.sync-interval": "1s", | ||
| "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", | ||
| "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", | ||
| "-blocks-storage.bucket-store.bucket-index.enabled": "true", | ||
| // compactor | ||
| "-compactor.cleanup-interval": "1s", | ||
| // Ingester. | ||
| "-ring.store": "consul", | ||
| "-consul.hostname": consul.NetworkHTTPEndpoint(), | ||
| // Distributor. | ||
| "-distributor.replication-factor": "1", | ||
| // alert manager | ||
| "-alertmanager.web.external-url": "http://localhost/alertmanager", | ||
| // Don't query ingesters: the queried time range is older than this, | ||
| // so all data is served exclusively from parquet blocks. | ||
| "-limits.query-ingesters-within": "2h", | ||
| // parquet-converter | ||
| "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), | ||
| "-parquet-converter.conversion-interval": "1s", | ||
| "-parquet-converter.enabled": "true", | ||
| "-parquet-converter.num-row-groups": strconv.Itoa(numRowGroups), | ||
| "-parquet-converter.max-rows-per-row-group": strconv.Itoa(maxRowsPerRowGroup), | ||
| }, | ||
| ) | ||
|
|
||
| if tc.viaStoreGateway { | ||
| // Route reads through the store-gateway's parquet bucket store. | ||
| flags = mergeFlags(flags, map[string]string{ | ||
| "-blocks-storage.bucket-store.bucket-store-type": "parquet", | ||
| // Enable sharding so the querier discovers the store-gateway via the | ||
| // ring and routes block queries to it. | ||
| "-store-gateway.sharding-enabled": "true", | ||
| "-store-gateway.sharding-ring.store": "consul", | ||
| "-store-gateway.sharding-ring.consul.hostname": consul.NetworkHTTPEndpoint(), | ||
| "-store-gateway.sharding-ring.replication-factor": "1", | ||
| // Disable the embedded parquet queryable so reads go to the store-gateway. | ||
| "-querier.enable-parquet-queryable": "false", | ||
| }) | ||
| } else { | ||
| // Query directly via the querier's embedded parquet queryable. | ||
| flags = mergeFlags(flags, map[string]string{ | ||
| "-store-gateway.sharding-enabled": "false", | ||
| "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways | ||
| "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, | ||
| "-querier.enable-parquet-queryable": "true", | ||
| }) | ||
| } | ||
|
|
||
| // make alert manager config dir | ||
| require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) | ||
|
|
||
| ctx := context.Background() | ||
| rnd := newFuzzRand(t) | ||
| dir := filepath.Join(s.SharedDir(), "data") | ||
| numSamples := 60 | ||
| scrapeInterval := time.Minute | ||
| now := time.Now() | ||
| // Keep the whole range older than -limits.query-ingesters-within (2h) | ||
| // so queries are served exclusively from parquet blocks. | ||
| start := now.Add(-time.Hour * 24) | ||
| end := now.Add(-time.Hour * 3) | ||
|
|
||
| // Generate unique series so the converter produces a deterministic series count. | ||
| lbls := make([]labels.Labels, 0, totalSeries) | ||
| for i := 0; i < seriesPerMetric; i++ { | ||
| lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_a", "job", "test", "instance", strconv.Itoa(i))) | ||
| lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_b", "job", "test", "instance", strconv.Itoa(i))) | ||
| } | ||
|
|
||
| id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), seriesSize) | ||
| require.NoError(t, err) | ||
| minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) | ||
| require.NoError(t, s.StartAndWaitReady(minio)) | ||
|
|
||
| storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) | ||
| require.NoError(t, err) | ||
| bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) | ||
|
|
||
| // Upload the block before starting cortex so the first compactor scan finds | ||
| // the complete block and includes it in the bucket index immediately. | ||
| err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) | ||
| require.NoError(t, err) | ||
|
|
||
| cortex := e2ecortex.NewSingleBinary("cortex", flags, "") | ||
| require.NoError(t, s.StartAndWaitReady(cortex)) | ||
|
|
||
| // Wait until the block is converted to parquet and the bucket index is updated. | ||
| cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} { | ||
| found := false | ||
| foundBucketIndex := false | ||
| err := bkt.Iter(context.Background(), "", func(name string) error { | ||
| if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { | ||
| found = true | ||
| } | ||
| if name == "bucket-index.json.gz" { | ||
| foundBucketIndex = true | ||
| } | ||
| return nil | ||
| }, objstore.WithRecursiveIter()) | ||
| require.NoError(t, err) | ||
| return found && foundBucketIndex | ||
| }) | ||
|
|
||
| // Verify the converter actually split the block into the expected number of shards. | ||
| marker, err := cortex_parquet.ReadConverterMark(ctx, id, bkt, log.Logger) | ||
| require.NoError(t, err) | ||
| require.Equal(t, expectedShards, marker.Shards, "block should be split into multiple parquet shards") | ||
|
|
||
| // Verify each shard's parquet files (labels + chunks) exist in object storage. | ||
| for shardID := 0; shardID < expectedShards; shardID++ { | ||
| labelsFile := fmt.Sprintf("%s/%d.labels.parquet", id.String(), shardID) | ||
| chunksFile := fmt.Sprintf("%s/%d.chunks.parquet", id.String(), shardID) | ||
|
|
||
| exists, err := bkt.Exists(ctx, labelsFile) | ||
| require.NoError(t, err) | ||
| require.True(t, exists, "labels parquet file should exist for shard %d", shardID) | ||
|
|
||
| exists, err = bkt.Exists(ctx, chunksFile) | ||
| require.NoError(t, err) | ||
| require.True(t, exists, "chunks parquet file should exist for shard %d", shardID) | ||
| } | ||
|
|
||
| // Verify the block is registered in the bucket index as a parquet block with the expected shard count. | ||
| cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} { | ||
| idx, err := bucketindex.ReadIndex(ctx, storage.GetBucket(), "user-1", nil, log.Logger) | ||
| if err != nil { | ||
| return false | ||
| } | ||
| for _, b := range idx.Blocks { | ||
| if b.ID == id && b.Parquet != nil && b.Parquet.Shards == expectedShards { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| }) | ||
|
|
||
| c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1") | ||
| require.NoError(t, err) | ||
|
|
||
| // Wait until all series are queryable across both shards. | ||
| cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} { | ||
| labelSets, err := c.Series([]string{`{job="test"}`}, start, end) | ||
| if err != nil { | ||
| return false | ||
| } | ||
| return len(labelSets) == totalSeries | ||
| }) | ||
|
|
||
| rangeRes, err := c.QueryRange(`test_series_a`, start, end, scrapeInterval) | ||
| require.NoError(t, err) | ||
| rangeMatrix, ok := rangeRes.(model.Matrix) | ||
| require.True(t, ok) | ||
| require.Len(t, rangeMatrix, seriesPerMetric) | ||
|
|
||
| if tc.viaStoreGateway { | ||
| require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_querier_storegateway_instances_hit_per_query"}, e2e.WithMetricCount, e2e.SkipMissingMetrics)) | ||
| } else { | ||
| require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( | ||
| labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) | ||
| } | ||
| }) | ||
| } | ||
| } |
There was a problem hiding this comment.
It might be interesting to also test the code with 2 store gateways. TestParquetMultiShardQuery runs at replication-factor=1 on both rings, so the multi-replica fan-out path (and the new per-block seenBlocks dedup of QueriedBlocks hints) is never exercised. A third subtest with RF=2 and ≥2 store-gateway replicas would be nice
This PR supports for querying sharded Parquet files within a bucket store and enables the conversion of sharded Parquet files.
Benchmark Results
Currently, the concurrency is hard-coded as 4.
Which issue(s) this PR fixes:
Fixes #7176 #7174
Checklist
CHANGELOG.mdupdated - the order of entries should be[CHANGE],[FEATURE],[ENHANCEMENT],[BUGFIX]docs/configuration/v1-guarantees.mdupdated if this PR introduces experimental flags