Skip to content

Commit a8abb87

Browse files
committed
Add timeout support
1 parent 44dabca commit a8abb87

5 files changed

Lines changed: 192 additions & 15 deletions

File tree

cmd/execution/results.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,18 @@ package execution
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/duneanalytics/cli/cmdutil"
78
"github.com/duneanalytics/cli/output"
9+
"github.com/duneanalytics/duneapi-client-go/dune"
810
"github.com/duneanalytics/duneapi-client-go/models"
911
"github.com/spf13/cobra"
1012
)
1113

14+
// PollInterval controls the polling interval when waiting for execution results.
15+
var PollInterval = 2 * time.Second
16+
1217
func newResultsCmd() *cobra.Command {
1318
cmd := &cobra.Command{
1419
Use: "results <execution-id>",
@@ -19,6 +24,8 @@ func newResultsCmd() *cobra.Command {
1924

2025
cmd.Flags().Int("limit", 0, "maximum number of rows to return (0 = all)")
2126
cmd.Flags().Int("offset", 0, "number of rows to skip")
27+
cmd.Flags().Bool("no-wait", false, "fetch current state without waiting for completion")
28+
cmd.Flags().Int("timeout", 300, "maximum seconds to wait for completion")
2229
output.AddFormatFlag(cmd, "text")
2330

2431
return cmd
@@ -29,6 +36,7 @@ func runResults(cmd *cobra.Command, args []string) error {
2936

3037
limit, _ := cmd.Flags().GetInt("limit")
3138
offset, _ := cmd.Flags().GetInt("offset")
39+
noWait, _ := cmd.Flags().GetBool("no-wait")
3240

3341
if limit < 0 {
3442
return fmt.Errorf("limit must be non-negative, got %d", limit)
@@ -46,11 +54,58 @@ func runResults(cmd *cobra.Command, args []string) error {
4654
}
4755

4856
client := cmdutil.ClientFromCmd(cmd)
49-
resp, err := client.QueryResultsV2(executionID, opts)
50-
if err != nil {
51-
return err
57+
58+
if noWait {
59+
resp, err := client.QueryResultsV2(executionID, opts)
60+
if err != nil {
61+
return err
62+
}
63+
return handleResultsResponse(cmd, executionID, resp)
64+
}
65+
66+
timeout, _ := cmd.Flags().GetInt("timeout")
67+
intervalSec := int(PollInterval.Seconds())
68+
maxRetries := timeout
69+
if intervalSec > 0 {
70+
maxRetries = timeout / intervalSec
71+
}
72+
if maxRetries < 1 {
73+
maxRetries = 1
74+
}
75+
76+
return waitForResults(cmd, client, executionID, opts, PollInterval, maxRetries)
77+
}
78+
79+
func waitForResults(
80+
cmd *cobra.Command,
81+
client dune.DuneClient,
82+
executionID string,
83+
opts models.ResultOptions,
84+
interval time.Duration,
85+
maxRetries int,
86+
) error {
87+
for i := 0; i < maxRetries; i++ {
88+
resp, err := client.QueryResultsV2(executionID, opts)
89+
if err != nil {
90+
return err
91+
}
92+
93+
switch resp.State {
94+
case "QUERY_STATE_PENDING", "QUERY_STATE_EXECUTING":
95+
// still running, wait and retry
96+
default:
97+
return handleResultsResponse(cmd, executionID, resp)
98+
}
99+
100+
if i < maxRetries-1 {
101+
time.Sleep(interval)
102+
}
52103
}
53104

105+
return fmt.Errorf("timed out waiting for execution %s to complete", executionID)
106+
}
107+
108+
func handleResultsResponse(cmd *cobra.Command, executionID string, resp *models.ResultsResponse) error {
54109
switch resp.State {
55110
case "QUERY_STATE_COMPLETED":
56111
return output.DisplayResults(cmd, resp)

cmd/execution/results_test.go

Lines changed: 116 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/duneanalytics/cli/cmd/execution"
910
"github.com/duneanalytics/duneapi-client-go/models"
1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
@@ -67,7 +68,7 @@ func TestResultsJSONOutput(t *testing.T) {
6768
assert.Equal(t, "QUERY_STATE_COMPLETED", got.State)
6869
}
6970

70-
func TestResultsPending(t *testing.T) {
71+
func TestResultsPendingNoWait(t *testing.T) {
7172
mock := &mockClient{
7273
queryResultsV2Fn: func(_ string, _ models.ResultOptions) (*models.ResultsResponse, error) {
7374
return &models.ResultsResponse{
@@ -77,15 +78,15 @@ func TestResultsPending(t *testing.T) {
7778
}
7879

7980
root, buf := newTestRoot(mock)
80-
root.SetArgs([]string{"execution", "results", "01ABC"})
81+
root.SetArgs([]string{"execution", "results", "--no-wait", "01ABC"})
8182
require.NoError(t, root.Execute())
8283

8384
out := buf.String()
8485
assert.Contains(t, out, "Execution ID: 01ABC")
8586
assert.Contains(t, out, "State: QUERY_STATE_PENDING")
8687
}
8788

88-
func TestResultsExecuting(t *testing.T) {
89+
func TestResultsExecutingNoWait(t *testing.T) {
8990
mock := &mockClient{
9091
queryResultsV2Fn: func(_ string, _ models.ResultOptions) (*models.ResultsResponse, error) {
9192
return &models.ResultsResponse{
@@ -95,7 +96,7 @@ func TestResultsExecuting(t *testing.T) {
9596
}
9697

9798
root, buf := newTestRoot(mock)
98-
root.SetArgs([]string{"execution", "results", "01ABC"})
99+
root.SetArgs([]string{"execution", "results", "--no-wait", "01ABC"})
99100
require.NoError(t, root.Execute())
100101

101102
out := buf.String()
@@ -179,3 +180,114 @@ func TestResultsMissingArgument(t *testing.T) {
179180
require.Error(t, err)
180181
assert.Contains(t, err.Error(), "accepts 1 arg(s)")
181182
}
183+
184+
func TestResultsWaitPollsUntilComplete(t *testing.T) {
185+
execution.PollInterval = 0
186+
t.Cleanup(func() {
187+
execution.PollInterval = 2 * time.Second
188+
})
189+
190+
callCount := 0
191+
mock := &mockClient{
192+
queryResultsV2Fn: func(_ string, _ models.ResultOptions) (*models.ResultsResponse, error) {
193+
callCount++
194+
if callCount < 3 {
195+
return &models.ResultsResponse{
196+
State: "QUERY_STATE_EXECUTING",
197+
}, nil
198+
}
199+
return testResultsResponse, nil
200+
},
201+
}
202+
203+
root, buf := newTestRoot(mock)
204+
root.SetArgs([]string{"execution", "results", "--timeout", "10", "01ABC"})
205+
require.NoError(t, root.Execute())
206+
207+
assert.Equal(t, 3, callCount)
208+
out := buf.String()
209+
assert.Contains(t, out, "block_number")
210+
assert.Contains(t, out, "2 rows")
211+
}
212+
213+
func TestResultsWaitPollsUntilFailed(t *testing.T) {
214+
execution.PollInterval = 0
215+
t.Cleanup(func() {
216+
execution.PollInterval = 2 * time.Second
217+
})
218+
219+
callCount := 0
220+
mock := &mockClient{
221+
queryResultsV2Fn: func(_ string, _ models.ResultOptions) (*models.ResultsResponse, error) {
222+
callCount++
223+
if callCount < 2 {
224+
return &models.ResultsResponse{
225+
State: "QUERY_STATE_PENDING",
226+
}, nil
227+
}
228+
return &models.ResultsResponse{
229+
State: "QUERY_STATE_FAILED",
230+
Error: &models.ExecutionError{
231+
Message: "out of memory",
232+
},
233+
}, nil
234+
},
235+
}
236+
237+
root, _ := newTestRoot(mock)
238+
root.SetArgs([]string{"execution", "results", "--timeout", "10", "01ABC"})
239+
err := root.Execute()
240+
require.Error(t, err)
241+
assert.Contains(t, err.Error(), "out of memory")
242+
assert.Equal(t, 2, callCount)
243+
}
244+
245+
func TestResultsWaitTimeout(t *testing.T) {
246+
execution.PollInterval = 0
247+
t.Cleanup(func() {
248+
execution.PollInterval = 2 * time.Second
249+
})
250+
251+
callCount := 0
252+
mock := &mockClient{
253+
queryResultsV2Fn: func(_ string, _ models.ResultOptions) (*models.ResultsResponse, error) {
254+
callCount++
255+
return &models.ResultsResponse{
256+
State: "QUERY_STATE_EXECUTING",
257+
}, nil
258+
},
259+
}
260+
261+
root, _ := newTestRoot(mock)
262+
root.SetArgs([]string{"execution", "results", "--timeout", "1", "01ABC"})
263+
err := root.Execute()
264+
require.Error(t, err)
265+
assert.Contains(t, err.Error(), "timed out waiting for execution")
266+
267+
}
268+
269+
func TestResultsWaitAPIError(t *testing.T) {
270+
execution.PollInterval = 0
271+
t.Cleanup(func() {
272+
execution.PollInterval = 2 * time.Second
273+
})
274+
275+
callCount := 0
276+
mock := &mockClient{
277+
queryResultsV2Fn: func(_ string, _ models.ResultOptions) (*models.ResultsResponse, error) {
278+
callCount++
279+
if callCount < 2 {
280+
return &models.ResultsResponse{
281+
State: "QUERY_STATE_PENDING",
282+
}, nil
283+
}
284+
return nil, errors.New("api: rate limited")
285+
},
286+
}
287+
288+
root, _ := newTestRoot(mock)
289+
root.SetArgs([]string{"execution", "results", "--timeout", "10", "01ABC"})
290+
err := root.Execute()
291+
require.Error(t, err)
292+
assert.Contains(t, err.Error(), "api: rate limited")
293+
}

cmd/query/helpers.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ func parsePerformance(cmd *cobra.Command) (string, error) {
2828
return performance, nil
2929
}
3030

31-
func waitAndDisplay(cmd *cobra.Command, exec dune.Execution) error {
32-
resp, err := exec.WaitGetResults(5*time.Second, 60)
31+
func waitAndDisplay(cmd *cobra.Command, exec dune.Execution, timeout int) error {
32+
maxRetries := timeout / 2
33+
if maxRetries < 1 {
34+
maxRetries = 1
35+
}
36+
resp, err := exec.WaitGetResults(2*time.Second, maxRetries)
3337
if err != nil {
3438
return err
3539
}

cmd/query/run.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func newRunCmd() *cobra.Command {
2222
cmd.Flags().String("performance", "medium", `performance tier: "medium" or "large"`)
2323
cmd.Flags().Int("limit", 0, "maximum number of rows to display (0 = all)")
2424
cmd.Flags().Bool("no-wait", false, "submit execution and exit without waiting for results")
25+
cmd.Flags().Int("timeout", 300, "maximum seconds to wait for completion")
2526
output.AddFormatFlag(cmd, "text")
2627

2728
return cmd
@@ -56,7 +57,9 @@ func runRun(cmd *cobra.Command, args []string) error {
5657
if noWait {
5758
return runNoWait(cmd, req)
5859
}
59-
return runWait(cmd, req)
60+
61+
timeout, _ := cmd.Flags().GetInt("timeout")
62+
return runWait(cmd, req, timeout)
6063
}
6164

6265
func runNoWait(cmd *cobra.Command, req models.ExecuteRequest) error {
@@ -70,15 +73,15 @@ func runNoWait(cmd *cobra.Command, req models.ExecuteRequest) error {
7073
return displayExecuteResponse(cmd, resp)
7174
}
7275

73-
func runWait(cmd *cobra.Command, req models.ExecuteRequest) error {
76+
func runWait(cmd *cobra.Command, req models.ExecuteRequest, timeout int) error {
7477
client := cmdutil.ClientFromCmd(cmd)
7578

7679
exec, err := client.RunQuery(req)
7780
if err != nil {
7881
return err
7982
}
8083

81-
return waitAndDisplay(cmd, exec)
84+
return waitAndDisplay(cmd, exec, timeout)
8285
}
8386

8487
func parseParams(raw []string) (map[string]any, error) {

cmd/query/run_sql.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func newRunSQLCmd() *cobra.Command {
2121
cmd.Flags().String("performance", "medium", `performance tier: "medium" or "large"`)
2222
cmd.Flags().Int("limit", 0, "maximum number of rows to display (0 = all)")
2323
cmd.Flags().Bool("no-wait", false, "submit execution and exit without waiting for results")
24+
cmd.Flags().Int("timeout", 300, "maximum seconds to wait for completion")
2425
output.AddFormatFlag(cmd, "text")
2526

2627
return cmd
@@ -52,7 +53,9 @@ func runRunSQL(cmd *cobra.Command, _ []string) error {
5253
if noWait {
5354
return runSQLNoWait(cmd, req)
5455
}
55-
return runSQLWait(cmd, req)
56+
57+
timeout, _ := cmd.Flags().GetInt("timeout")
58+
return runSQLWait(cmd, req, timeout)
5659
}
5760

5861
func runSQLNoWait(cmd *cobra.Command, req models.ExecuteSQLRequest) error {
@@ -66,13 +69,13 @@ func runSQLNoWait(cmd *cobra.Command, req models.ExecuteSQLRequest) error {
6669
return displayExecuteResponse(cmd, resp)
6770
}
6871

69-
func runSQLWait(cmd *cobra.Command, req models.ExecuteSQLRequest) error {
72+
func runSQLWait(cmd *cobra.Command, req models.ExecuteSQLRequest, timeout int) error {
7073
client := cmdutil.ClientFromCmd(cmd)
7174

7275
exec, err := client.RunSQL(req)
7376
if err != nil {
7477
return err
7578
}
7679

77-
return waitAndDisplay(cmd, exec)
80+
return waitAndDisplay(cmd, exec, timeout)
7881
}

0 commit comments

Comments
 (0)