Skip to content

Commit 93b0765

Browse files
pieternigrekun
authored and committed
sync: rename flag to --concurrency and extend to bundle sync
Match the name of `fs cp --concurrency` (#4132) for consistency, and expose the same flag on `databricks bundle sync` so both sync entry points are tunable.

Co-authored-by: Isaac
1 parent a8e04a6 commit 93b0765

7 files changed

Lines changed: 66 additions & 46 deletions

File tree

acceptance/bundle/help/bundle-sync/output.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Usage:
1515
databricks bundle sync [flags]
1616

1717
Flags:
18+
--concurrency int number of parallel requests to the workspace (default 20)
1819
--dry-run simulate sync execution without making actual changes
1920
--full perform full synchronization (default is incremental)
2021
-h, --help help for sync

acceptance/cmd/sync-without-args/output.txt

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@ Usage:
66
databricks sync [flags] SRC DST
77

88
Flags:
9-
--dry-run simulate sync execution without making actual changes
10-
--exclude strings patterns to exclude from sync (can be specified multiple times)
11-
--exclude-from string file containing patterns to exclude from sync (one pattern per line)
12-
--full perform full synchronization (default is incremental)
13-
-h, --help help for sync
14-
--include strings patterns to include in sync (can be specified multiple times)
15-
--include-from string file containing patterns to include to sync (one pattern per line)
16-
--interval duration file system polling interval (for --watch) (default 1s)
17-
--max-concurrent-requests int maximum number of concurrent requests to the workspace (default 20)
18-
--output type type of output format (default text)
19-
--watch watch local file system for changes
9+
--concurrency int number of parallel requests to the workspace (default 20)
10+
--dry-run simulate sync execution without making actual changes
11+
--exclude strings patterns to exclude from sync (can be specified multiple times)
12+
--exclude-from string file containing patterns to exclude from sync (one pattern per line)
13+
--full perform full synchronization (default is incremental)
14+
-h, --help help for sync
15+
--include strings patterns to include in sync (can be specified multiple times)
16+
--include-from string file containing patterns to include to sync (one pattern per line)
17+
--interval duration file system polling interval (for --watch) (default 1s)
18+
--output type type of output format (default text)
19+
--watch watch local file system for changes
2020

2121
Global Flags:
2222
--debug enable debug logging

cmd/bundle/sync.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bundle
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"time"
@@ -16,12 +17,22 @@ import (
1617
"github.com/spf13/cobra"
1718
)
1819

20+
var errInvalidConcurrency = errors.New("--concurrency must be at least 1")
21+
1922
type syncFlags struct {
20-
interval time.Duration
21-
full bool
22-
watch bool
23-
output flags.Output
24-
dryRun bool
23+
interval time.Duration
24+
full bool
25+
watch bool
26+
output flags.Output
27+
dryRun bool
28+
concurrency int
29+
}
30+
31+
func (f *syncFlags) validate() error {
32+
if f.concurrency < 1 {
33+
return errInvalidConcurrency
34+
}
35+
return nil
2536
}
2637

2738
func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOptions, error) {
@@ -48,6 +59,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle)
4859
opts.Full = f.full
4960
opts.PollInterval = f.interval
5061
opts.DryRun = f.dryRun
62+
opts.Concurrency = f.concurrency
5163
return opts, nil
5264
}
5365

@@ -74,8 +86,13 @@ Use 'databricks bundle deploy' for full resource deployment.`,
7486
cmd.Flags().BoolVar(&f.watch, "watch", false, "watch local file system for changes")
7587
cmd.Flags().Var(&f.output, "output", "type of the output format")
7688
cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes")
89+
cmd.Flags().IntVar(&f.concurrency, "concurrency", sync.MaxRequestsInFlight, "number of parallel requests to the workspace")
7790

7891
cmd.RunE = func(cmd *cobra.Command, args []string) error {
92+
if err := f.validate(); err != nil {
93+
return err
94+
}
95+
7996
b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{})
8097
if err != nil {
8198
return err

cmd/sync/sync.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,25 @@ import (
2424
"github.com/spf13/cobra"
2525
)
2626

27+
var errInvalidConcurrency = errors.New("--concurrency must be at least 1")
28+
2729
type syncFlags struct {
2830
// project files polling interval
29-
interval time.Duration
30-
full bool
31-
watch bool
32-
output flags.Output
33-
exclude []string
34-
include []string
35-
dryRun bool
36-
excludeFrom string
37-
includeFrom string
38-
maxConcurrentRequests int
31+
interval time.Duration
32+
full bool
33+
watch bool
34+
output flags.Output
35+
exclude []string
36+
include []string
37+
dryRun bool
38+
excludeFrom string
39+
includeFrom string
40+
concurrency int
3941
}
4042

4143
func (f *syncFlags) validate() error {
42-
if f.maxConcurrentRequests < 1 {
43-
return fmt.Errorf("--max-concurrent-requests must be >= 1, got %d", f.maxConcurrentRequests)
44+
if f.concurrency < 1 {
45+
return errInvalidConcurrency
4446
}
4547
return nil
4648
}
@@ -97,7 +99,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b *
9799
opts.Include = append(opts.Include, f.include...)
98100
opts.Include = append(opts.Include, includePatterns...)
99101
opts.DryRun = f.dryRun
100-
opts.MaxConcurrentRequests = f.maxConcurrentRequests
102+
opts.Concurrency = f.concurrency
101103
return opts, nil
102104
}
103105

@@ -170,9 +172,9 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
170172
SnapshotBasePath: filepath.Join(args[0], ".databricks"),
171173
WorkspaceClient: client,
172174

173-
OutputHandler: outputHandler,
174-
DryRun: f.dryRun,
175-
MaxConcurrentRequests: f.maxConcurrentRequests,
175+
OutputHandler: outputHandler,
176+
DryRun: f.dryRun,
177+
Concurrency: f.concurrency,
176178
}
177179
return &opts, nil
178180
}
@@ -197,7 +199,7 @@ func New() *cobra.Command {
197199
cmd.Flags().StringVar(&f.excludeFrom, "exclude-from", "", "file containing patterns to exclude from sync (one pattern per line)")
198200
cmd.Flags().StringVar(&f.includeFrom, "include-from", "", "file containing patterns to include to sync (one pattern per line)")
199201
cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes")
200-
cmd.Flags().IntVar(&f.maxConcurrentRequests, "max-concurrent-requests", sync.MaxRequestsInFlight, "maximum number of concurrent requests to the workspace")
202+
cmd.Flags().IntVar(&f.concurrency, "concurrency", sync.MaxRequestsInFlight, "number of parallel requests to the workspace")
201203

202204
// Wrapper for [root.MustWorkspaceClient] that disables loading authentication configuration from a bundle.
203205
mustWorkspaceClient := func(cmd *cobra.Command, args []string) error {

cmd/sync/sync_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ func TestSyncOptionsFromBundle(t *testing.T) {
3232
},
3333
}
3434

35-
f := syncFlags{maxConcurrentRequests: 5}
35+
f := syncFlags{concurrency: 5}
3636
opts, err := f.syncOptionsFromBundle(New(), []string{}, b)
3737
require.NoError(t, err)
3838
assert.Equal(t, tempDir, opts.LocalRoot.Native())
3939
assert.Equal(t, "/Users/jane@doe.com/path", opts.RemotePath)
4040
assert.Equal(t, filepath.Join(tempDir, ".databricks", "bundle", "default"), opts.SnapshotBasePath)
41-
assert.Equal(t, 5, opts.MaxConcurrentRequests)
41+
assert.Equal(t, 5, opts.Concurrency)
4242
assert.NotNil(t, opts.WorkspaceClient)
4343
}
4444

@@ -57,25 +57,25 @@ func TestSyncOptionsFromArgs(t *testing.T) {
5757
local := t.TempDir()
5858
remote := "/remote"
5959

60-
f := syncFlags{maxConcurrentRequests: 7}
60+
f := syncFlags{concurrency: 7}
6161
cmd := New()
6262
cmd.SetContext(cmdctx.SetWorkspaceClient(t.Context(), nil))
6363
opts, err := f.syncOptionsFromArgs(cmd, []string{local, remote})
6464
require.NoError(t, err)
6565
assert.Equal(t, local, opts.LocalRoot.Native())
6666
assert.Equal(t, remote, opts.RemotePath)
67-
assert.Equal(t, 7, opts.MaxConcurrentRequests)
67+
assert.Equal(t, 7, opts.Concurrency)
6868
}
6969

7070
func TestSyncFlagsValidate(t *testing.T) {
71-
f := syncFlags{maxConcurrentRequests: 1}
71+
f := syncFlags{concurrency: 1}
7272
require.NoError(t, f.validate())
7373

74-
f.maxConcurrentRequests = 0
75-
require.ErrorContains(t, f.validate(), "--max-concurrent-requests must be >= 1")
74+
f.concurrency = 0
75+
require.ErrorIs(t, f.validate(), errInvalidConcurrency)
7676

77-
f.maxConcurrentRequests = -3
78-
require.ErrorContains(t, f.validate(), "--max-concurrent-requests must be >= 1")
77+
f.concurrency = -3
78+
require.ErrorIs(t, f.validate(), errInvalidConcurrency)
7979
}
8080

8181
func TestExcludeFromFlag(t *testing.T) {

libs/sync/sync.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type SyncOptions struct {
4444

4545
DryRun bool
4646

47-
MaxConcurrentRequests int
47+
Concurrency int
4848
}
4949

5050
type Sync struct {
@@ -98,8 +98,8 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
9898
return nil, errors.New("failed to resolve host for snapshot")
9999
}
100100

101-
if opts.MaxConcurrentRequests <= 0 {
102-
opts.MaxConcurrentRequests = MaxRequestsInFlight
101+
if opts.Concurrency <= 0 {
102+
opts.Concurrency = MaxRequestsInFlight
103103
}
104104

105105
// For full sync, we start with an empty snapshot.

libs/sync/watchdog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func groupRunParallel(ctx context.Context, paths []string, limit int, fn func(co
110110

111111
func (s *Sync) applyDiff(ctx context.Context, d diff) error {
112112
var err error
113-
limit := s.MaxConcurrentRequests
113+
limit := s.Concurrency
114114

115115
// Delete files in parallel.
116116
err = groupRunParallel(ctx, d.delete, limit, s.applyDelete)

0 commit comments

Comments
 (0)