Skip to content

Commit 376e3ef

Browse files
committed
parquet: support sharded parquet file conversion
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 0732fe3 commit 376e3ef

4 files changed

Lines changed: 103 additions & 0 deletions

File tree

docs/configuration/config-file-reference.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,13 @@ parquet_converter:
186186
# CLI flag: -parquet-converter.max-rows-per-row-group
187187
[max_rows_per_row_group: <int> | default = 1000000]
188188

189+
# Maximum number of row groups per parquet shard. Each shard holds at most
190+
# num-row-groups * max-rows-per-row-group series, so lowering this value
191+
# splits a block into more parquet shards for better read parallelization.
192+
# Default is unlimited (single shard).
193+
# CLI flag: -parquet-converter.num-row-groups
194+
[num_row_groups: <int> | default = 2147483647]
195+
189196
# Enable disk-based write buffering to reduce memory consumption during
190197
# parquet file generation.
191198
# CLI flag: -parquet-converter.file-buffer-enabled

pkg/parquetconverter/converter.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"hash/fnv"
8+
"math"
89
"math/rand"
910
"net/http"
1011
"os"
@@ -57,6 +58,7 @@ type Config struct {
5758
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
5859
ConversionInterval time.Duration `yaml:"conversion_interval"`
5960
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
61+
NumRowGroups int `yaml:"num_row_groups"`
6062
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
6163

6264
DataDir string `yaml:"data_dir"`
@@ -107,6 +109,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
107109
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Local directory path for caching TSDB blocks during parquet conversion.")
108110
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Maximum concurrent goroutines for downloading block metadata from object storage.")
109111
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.")
112+
f.IntVar(&cfg.NumRowGroups, "parquet-converter.num-row-groups", math.MaxInt32, "Maximum number of row groups per parquet shard. Each shard holds at most num-row-groups * max-rows-per-row-group series, so lowering this value splits a block into more parquet shards for better read parallelization. Default is unlimited (single shard).")
110113
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.")
111114
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.")
112115
}
@@ -126,6 +129,13 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR
126129
}
127130

128131
func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter {
132+
// A non-positive number of row groups is invalid and would lead to a division by zero
133+
// while sharding the block.
134+
numRowGroups := cfg.NumRowGroups
135+
if numRowGroups <= 0 {
136+
numRowGroups = math.MaxInt32
137+
}
138+
129139
c := &Converter{
130140
cfg: cfg,
131141
reg: registerer,
@@ -141,6 +151,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
141151
baseConverterOptions: []convert.ConvertOption{
142152
convert.WithColDuration(time.Hour * 8),
143153
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
154+
convert.WithNumRowGroups(numRowGroups),
144155
},
145156
}
146157

pkg/parquetconverter/converter_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,85 @@ func prepareConfig() Config {
173173
return cfg
174174
}
175175

176+
func TestConverter_SplitsBlockIntoMultipleShards(t *testing.T) {
177+
cfg := prepareConfig()
178+
// Configure the converter so that each parquet shard holds at most
179+
// numRowGroups * maxRowsPerRowGroup = 1 * 2 = 2 series.
180+
cfg.NumRowGroups = 1
181+
cfg.MaxRowsPerRowGroup = 2
182+
183+
user := "user-1"
184+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
185+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
186+
dir := t.TempDir()
187+
188+
cfg.Ring.InstanceID = "parquet-converter-1"
189+
cfg.Ring.InstanceAddr = "1.2.3.4"
190+
cfg.Ring.KVStore.Mock = ringStore
191+
bucketClient, err := filesystem.NewBucket(t.TempDir())
192+
require.NoError(t, err)
193+
userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
194+
limits := &validation.Limits{}
195+
flagext.DefaultValues(limits)
196+
limits.ParquetConverterEnabled = true
197+
198+
c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil)
199+
200+
ctx := context.Background()
201+
202+
// Create 5 unique series so that the block is split into
203+
// ceil(5 / 2) = 3 parquet shards.
204+
const numSeries = 5
205+
const expectedShards = 3
206+
series := make([]labels.Labels, 0, numSeries)
207+
for i := range numSeries {
208+
series = append(series, labels.FromStrings("__name__", "test", "series", fmt.Sprintf("%d", i)))
209+
}
210+
211+
// Create and upload a 24h block. It must be larger than the first configured
212+
// block range (2h) so that the converter does not skip it as a raw TSDB block.
213+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
214+
blockID, err := e2e.CreateBlock(ctx, rnd, dir, series, 2, 0, 24*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
215+
require.NoError(t, err)
216+
blockDir := fmt.Sprintf("%s/%s", dir, blockID.String())
217+
b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
218+
require.NoError(t, err)
219+
err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
220+
require.NoError(t, err)
221+
222+
// Start the converter.
223+
err = services.StartAndAwaitRunning(context.Background(), c)
224+
require.NoError(t, err)
225+
defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck
226+
227+
// Wait until the block is converted and assert it was split into multiple shards.
228+
test.Poll(t, 3*time.Minute, expectedShards, func() any {
229+
m, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
230+
require.NoError(t, err)
231+
if m.Version != parquet.CurrentVersion {
232+
return -1
233+
}
234+
return m.Shards
235+
})
236+
237+
// Verify that one labels/chunks parquet file exists per shard.
238+
for shard := range expectedShards {
239+
for _, file := range []string{
240+
fmt.Sprintf("%s/%d.chunks.parquet", blockID.String(), shard),
241+
fmt.Sprintf("%s/%d.labels.parquet", blockID.String(), shard),
242+
} {
243+
ok, err := userBucket.Exists(ctx, file)
244+
require.NoError(t, err)
245+
require.True(t, ok, "expected shard file %s to exist", file)
246+
}
247+
}
248+
249+
// Verify there is no extra shard beyond the expected count.
250+
ok, err := userBucket.Exists(ctx, fmt.Sprintf("%s/%d.chunks.parquet", blockID.String(), expectedShards))
251+
require.NoError(t, err)
252+
require.False(t, ok, "expected no shard file at index %d", expectedShards)
253+
}
254+
176255
func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits, tenantLimits validation.TenantLimits) (*Converter, log.Logger, prometheus.Gatherer) {
177256
storageCfg := cortex_tsdb.BlocksStorageConfig{}
178257
blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}

schemas/cortex-config-schema.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9112,6 +9112,12 @@
91129112
"type": "number",
91139113
"x-cli-flag": "parquet-converter.meta-sync-concurrency"
91149114
},
9115+
"num_row_groups": {
9116+
"default": 2147483647,
9117+
"description": "Maximum number of row groups per parquet shard. Each shard holds at most num-row-groups * max-rows-per-row-group series, so lowering this value splits a block into more parquet shards for better read parallelization. Default is unlimited (single shard).",
9118+
"type": "number",
9119+
"x-cli-flag": "parquet-converter.num-row-groups"
9120+
},
91159121
"ring": {
91169122
"properties": {
91179123
"auto_forget_delay": {

0 commit comments

Comments
 (0)