Skip to content

Commit 6f32a1a

Browse files
committed
refactor: migrate from internal presto package to presto-go library
Replace the internal presto/ package with the external github.com/ethanyzhang/presto-go module. This consolidates the Presto client into a standalone, reusable library. Key changes: - Replace all pbench/presto imports with presto-go equivalents - Move pbench-specific types (ColumnStats, QuerySplitter, Unmarshaller, plan_node) to new prestoapi/ package - Replace live-server stage tests with prestotest.MockPrestoServer - Fix context leak in GetCtxWithTimeout (return cancel func) - Fix race in Run() where select could pick timeToExit over a simultaneously-ready resultChan, losing the last RecordQuery call - Add .idea/ and __pycache__/ to .gitignore - Gate table_summary integration test behind testing.Short()
1 parent 7a3bce5 commit 6f32a1a

62 files changed

Lines changed: 384 additions & 1463 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,9 @@ pbench_*
3535

3636
# genddl output dir
3737
cmd/genddl/out/
38+
39+
# IDE
40+
.idea/
41+
42+
# Python
43+
__pycache__/

CLAUDE.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# CLAUDE.md
2+
3+
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
4+
5+
## Build Commands
6+
7+
```bash
8+
# Build for current platform
9+
make
10+
11+
# Build for all platforms (darwin/linux, amd64/arm64)
12+
make all
13+
14+
# Build with InfluxDB support
15+
make TAGS=influx
16+
17+
# Install locally (creates symlink to /usr/local/bin/pbench)
18+
make install
19+
20+
# Generate cluster configurations from templates
21+
make clusters
22+
```
23+
24+
## Testing
25+
26+
```bash
27+
# Run all tests
28+
go test ./...
29+
30+
# Run a specific package's tests
31+
go test ./presto
32+
go test ./stage
33+
go test ./cmd/cmp
34+
35+
# Run a single test
36+
go test ./presto -run TestQuerySplitter
37+
```
38+
39+
## Architecture
40+
41+
PBench is a Presto/Trino benchmark runner built with Cobra CLI. It replaces Benchto with support for concurrent workloads, result capture, and query log collection.
42+
43+
### Package Structure
44+
45+
- **main.go** - Entry point, calls `cmd.Execute()`
46+
- **cmd/** - Cobra command definitions. Each subcommand has a wrapper file (e.g., `run.go`) and implementation package (e.g., `cmd/run/`)
47+
- **presto/** - Presto/Trino HTTP client implementation with query execution, session management, and result parsing
48+
- **stage/** - Core benchmark execution engine. A `Stage` defines queries, settings, and can chain to other stages via `next` field in JSON
49+
- **utils/** - Shared utilities including Presto flag handling and path helpers
50+
- **clusters/** - Cluster configuration templates and generated configs
51+
- **benchmarks/** - Benchmark definitions (TPC-DS, TPC-H, ClickBench, etc.) as JSON stage files and SQL queries
52+
53+
### Key Concepts
54+
55+
**Stages**: Benchmarks are defined as JSON files that specify queries (inline or via files), session parameters, catalog/schema, and execution settings. Stages form a DAG via `next` field, enabling sequential/parallel execution patterns.
56+
57+
**Stage Settings** (inherited by child stages unless overridden):
58+
- `catalog`, `schema`, `timezone` - Presto session settings
59+
- `cold_runs`, `warm_runs` - Number of runs per query
60+
- `save_output`, `save_json`, `save_column_metadata` - Output capture options
61+
- `abort_on_error` - Stop on first failure
62+
- `random_execution`, `randomly_execute_until` - Random query selection mode
63+
64+
**Run Recorders**: Results can be recorded to file (default), InfluxDB (requires `TAGS=influx` build), or MySQL.
65+
66+
### Build Tags
67+
68+
- `influx` - Enables InfluxDB run recorder support. Without this tag, `stage/no_influx.go` provides a stub that returns an error if InfluxDB config is provided.
69+
70+
## Commands
71+
72+
- `run` - Execute benchmarks from stage JSON files
73+
- `cmp` - Compare query results between two directories
74+
- `genconfig` - Generate cluster configs from templates
75+
- `genddl` - Generate DDL scripts
76+
- `loadjson` - Load query JSON files into databases
77+
- `replay` - Replay workloads from CSV
78+
- `forward` - Forward queries between Presto clusters
79+
- `save` - Save table schema/data information

benchmarks/java_trino.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"session_params": {
55
"query_max_execution_time": "8h",
66
"join_distribution_type": "AUTOMATIC",
7-
"join_reordering_strategy": "AUTOMATIC"
7+
"join_reordering_strategy": "AUTOMATIC",
8+
"optimizer_use_histograms": null
89
}
910
}

cmd/forward/main.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import (
77
"os/signal"
88
"path/filepath"
99
"pbench/log"
10-
"pbench/presto"
11-
"pbench/presto/query_json"
1210
"pbench/utils"
11+
12+
presto "github.com/ethanyzhang/presto-go"
13+
"github.com/ethanyzhang/presto-go/query_json"
1314
"regexp"
1415
"sync"
1516
"sync/atomic"
@@ -140,7 +141,7 @@ func Run(_ *cobra.Command, _ []string) {
140141
// Keep running until user interrupts or quits using Ctrl + C or Ctrl + D.
141142
// When the cluster is unavailable to return the running queries, wait and retry for at most 10 times before quitting.
142143
for attempt := 1; ctx.Err() == nil && attempt <= maxRetry; {
143-
states, _, err := sourceClient.GetQueryState(ctx, &presto.GetQueryStatsOptions{
144+
states, _, err := sourceClient.GetQueryState(ctx, &presto.GetQueryStateOptions{
144145
IncludeAllQueries: &trueValue,
145146
QueryTextSizeLimit: &queryTextSizeLimit,
146147
})
@@ -209,21 +210,20 @@ func checkAndCancelQuery(ctx context.Context, queryState *presto.QueryStateInfo)
209210

210211
func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, clients []*presto.Client) {
211212
defer runningTasks.Done()
212-
var (
213-
queryInfo *query_json.QueryInfo
214-
queryInfoErr error
215-
)
213+
var queryInfoErr error
214+
queryInfo := new(query_json.QueryInfo)
216215
for attempt := 1; attempt <= maxRetry; attempt++ {
217-
queryInfo, _, queryInfoErr = clients[0].GetQueryInfo(ctx, queryState.QueryId, false, nil)
216+
_, queryInfoErr = clients[0].GetQueryInfo(ctx, queryState.QueryId, queryInfo)
218217
if queryInfoErr != nil {
218+
queryInfo = new(query_json.QueryInfo)
219219
log.Error().Str("source_query_id", queryState.QueryId).Err(queryInfoErr).
220220
Msgf("failed to get query info for forwarding, attempt %d/%d", attempt, maxRetry)
221221
waitForNextPoll(ctx)
222222
} else {
223223
break
224224
}
225225
}
226-
if queryInfo == nil {
226+
if queryInfoErr != nil {
227227
log.Error().Str("source_query_id", queryState.QueryId).
228228
Msgf("cannot get query info for forwarding after %d retries, skipping", maxRetry)
229229
failedToForward.Add(1)

cmd/loadjson/main.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import (
55
"database/sql"
66
"encoding/json"
77
"fmt"
8+
"github.com/ethanyzhang/presto-go/query_json"
89
"os"
910
"os/signal"
1011
"path/filepath"
1112
"pbench/log"
12-
"pbench/presto/query_json"
1313
"pbench/stage"
1414
"pbench/utils"
1515
"reflect"
@@ -117,7 +117,9 @@ func Run(_ *cobra.Command, args []string) {
117117
pseudoStage.States.RunStartTime = runStartTime.GetTime()
118118
pseudoStage.States.RunFinishTime = runEndTime.GetTime()
119119
for _, r := range runRecorders {
120-
r.RecordRun(utils.GetCtxWithTimeout(time.Second*5), pseudoStage, queryResults)
120+
rCtx, rCancel := utils.GetCtxWithTimeout(time.Second * 5)
121+
r.RecordRun(rCtx, pseudoStage, queryResults)
122+
rCancel()
121123
}
122124

123125
log.Info().Int("file_loaded", len(queryResults)).Send()
@@ -173,7 +175,7 @@ func processFile(ctx context.Context, path string) {
173175
}
174176
if queryInfo.ErrorCode != nil {
175177
// Need to set this so the run recorders will mark this query as failed.
176-
queryResult.QueryError = fmt.Errorf(*queryInfo.ErrorCode.Name)
178+
queryResult.QueryError = fmt.Errorf("%s", *queryInfo.ErrorCode.Name)
177179
}
178180
// Unlike benchmarks run by pbench, we do not know when did the run start and finish when loading them from files.
179181
// We infer that the whole run starts at min(queryStartTime) and ends at max(queryEndTime).
@@ -226,7 +228,9 @@ func processFile(ctx context.Context, path string) {
226228
}
227229
}
228230
for _, r := range runRecorders {
229-
r.RecordQuery(utils.GetCtxWithTimeout(time.Second*5), pseudoStage, queryResult)
231+
rCtx, rCancel := utils.GetCtxWithTimeout(time.Second * 5)
232+
r.RecordQuery(rCtx, pseudoStage, queryResult)
233+
rCancel()
230234
}
231235
log.Info().Str("path", path).Str("query_id", queryInfo.QueryId).Msg("success")
232236
resultChan <- queryResult

cmd/loadjson/query_info_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"embed"
55
_ "embed"
66
"encoding/json"
7+
"github.com/ethanyzhang/presto-go/query_json"
78
"github.com/stretchr/testify/assert"
8-
"pbench/presto/query_json"
99
"testing"
1010
)
1111

cmd/queryplan/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"io"
2222
"os"
2323
"pbench/log"
24-
"pbench/presto/plan_node"
24+
"pbench/prestoapi/plan_node"
2525

2626
"github.com/spf13/cobra"
2727
)

cmd/replay/frame_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package replay
22

33
import (
44
"encoding/csv"
5+
presto "github.com/ethanyzhang/presto-go"
56
"github.com/stretchr/testify/assert"
67
"io"
7-
"pbench/presto"
88
"sort"
99
"strings"
1010
"testing"
@@ -28,7 +28,7 @@ func TestFrame(t *testing.T) {
2828
assert.Equal(t, "20240415_112042_61088_qa5fd", frame.QueryId)
2929
assert.Equal(t, "2024-04-15 11:20:42.755 UTC", frame.CreateTime.Format(CreateTimeFormat))
3030
assert.Equal(t, 99993, frame.WallTimeMillis)
31-
client, _ := presto.NewClient("http://127.0.0.1", false)
31+
client, _ := presto.NewClient("http://127.0.0.1")
3232
sessionParams := strings.Split(client.GenerateSessionParamsHeaderValue(frame.ParseSessionParams()), ",")
3333
sort.Strings(sessionParams)
3434
assert.Equal(t, []string{

cmd/replay/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import (
1010
"os/signal"
1111
"path/filepath"
1212
"pbench/log"
13-
"pbench/presto"
1413
"pbench/utils"
14+
15+
presto "github.com/ethanyzhang/presto-go"
1516
"sync"
1617
"syscall"
1718
"time"

cmd/replay/query_log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package replay
33
import (
44
"encoding/json"
55
"fmt"
6-
"pbench/presto/query_json"
6+
"github.com/ethanyzhang/presto-go/query_json"
77
"time"
88
)
99

0 commit comments

Comments
 (0)