Skip to content

Commit a8e04a6

Browse files
committed
sync: add --max-concurrent-requests flag
Expose the previously-hardcoded request concurrency cap as a flag on the `databricks sync` command. Defaults to the existing limit (20) so behavior is unchanged when the flag is omitted; values < 1 are rejected. Co-authored-by: Isaac
1 parent 975bf0c commit a8e04a6

5 files changed

Lines changed: 65 additions & 29 deletions

File tree

acceptance/cmd/sync-without-args/output.txt

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ Usage:
66
databricks sync [flags] SRC DST
77

88
Flags:
9-
--dry-run simulate sync execution without making actual changes
10-
--exclude strings patterns to exclude from sync (can be specified multiple times)
11-
--exclude-from string file containing patterns to exclude from sync (one pattern per line)
12-
--full perform full synchronization (default is incremental)
13-
-h, --help help for sync
14-
--include strings patterns to include in sync (can be specified multiple times)
15-
--include-from string file containing patterns to include to sync (one pattern per line)
16-
--interval duration file system polling interval (for --watch) (default 1s)
17-
--output type type of output format (default text)
18-
--watch watch local file system for changes
9+
--dry-run simulate sync execution without making actual changes
10+
--exclude strings patterns to exclude from sync (can be specified multiple times)
11+
--exclude-from string file containing patterns to exclude from sync (one pattern per line)
12+
--full perform full synchronization (default is incremental)
13+
-h, --help help for sync
14+
--include strings patterns to include in sync (can be specified multiple times)
15+
--include-from string file containing patterns to include to sync (one pattern per line)
16+
--interval duration file system polling interval (for --watch) (default 1s)
17+
--max-concurrent-requests int maximum number of concurrent requests to the workspace (default 20)
18+
--output type type of output format (default text)
19+
--watch watch local file system for changes
1920

2021
Global Flags:
2122
--debug enable debug logging

cmd/sync/sync.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,23 @@ import (
2626

2727
type syncFlags struct {
2828
// project files polling interval
29-
interval time.Duration
30-
full bool
31-
watch bool
32-
output flags.Output
33-
exclude []string
34-
include []string
35-
dryRun bool
36-
excludeFrom string
37-
includeFrom string
29+
interval time.Duration
30+
full bool
31+
watch bool
32+
output flags.Output
33+
exclude []string
34+
include []string
35+
dryRun bool
36+
excludeFrom string
37+
includeFrom string
38+
maxConcurrentRequests int
39+
}
40+
41+
func (f *syncFlags) validate() error {
42+
if f.maxConcurrentRequests < 1 {
43+
return fmt.Errorf("--max-concurrent-requests must be >= 1, got %d", f.maxConcurrentRequests)
44+
}
45+
return nil
3846
}
3947

4048
func readPatternsFile(filePath string) ([]string, error) {
@@ -89,6 +97,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b *
8997
opts.Include = append(opts.Include, f.include...)
9098
opts.Include = append(opts.Include, includePatterns...)
9199
opts.DryRun = f.dryRun
100+
opts.MaxConcurrentRequests = f.maxConcurrentRequests
92101
return opts, nil
93102
}
94103

@@ -161,8 +170,9 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
161170
SnapshotBasePath: filepath.Join(args[0], ".databricks"),
162171
WorkspaceClient: client,
163172

164-
OutputHandler: outputHandler,
165-
DryRun: f.dryRun,
173+
OutputHandler: outputHandler,
174+
DryRun: f.dryRun,
175+
MaxConcurrentRequests: f.maxConcurrentRequests,
166176
}
167177
return &opts, nil
168178
}
@@ -187,6 +197,7 @@ func New() *cobra.Command {
187197
cmd.Flags().StringVar(&f.excludeFrom, "exclude-from", "", "file containing patterns to exclude from sync (one pattern per line)")
188198
cmd.Flags().StringVar(&f.includeFrom, "include-from", "", "file containing patterns to include to sync (one pattern per line)")
189199
cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes")
200+
cmd.Flags().IntVar(&f.maxConcurrentRequests, "max-concurrent-requests", sync.MaxRequestsInFlight, "maximum number of concurrent requests to the workspace")
190201

191202
// Wrapper for [root.MustWorkspaceClient] that disables loading authentication configuration from a bundle.
192203
mustWorkspaceClient := func(cmd *cobra.Command, args []string) error {
@@ -196,6 +207,10 @@ func New() *cobra.Command {
196207

197208
cmd.PreRunE = mustWorkspaceClient
198209
cmd.RunE = func(cmd *cobra.Command, args []string) error {
210+
if err := f.validate(); err != nil {
211+
return err
212+
}
213+
199214
var opts *sync.SyncOptions
200215
var err error
201216

cmd/sync/sync_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,13 @@ func TestSyncOptionsFromBundle(t *testing.T) {
3232
},
3333
}
3434

35-
f := syncFlags{}
35+
f := syncFlags{maxConcurrentRequests: 5}
3636
opts, err := f.syncOptionsFromBundle(New(), []string{}, b)
3737
require.NoError(t, err)
3838
assert.Equal(t, tempDir, opts.LocalRoot.Native())
3939
assert.Equal(t, "/Users/jane@doe.com/path", opts.RemotePath)
4040
assert.Equal(t, filepath.Join(tempDir, ".databricks", "bundle", "default"), opts.SnapshotBasePath)
41+
assert.Equal(t, 5, opts.MaxConcurrentRequests)
4142
assert.NotNil(t, opts.WorkspaceClient)
4243
}
4344

@@ -56,13 +57,25 @@ func TestSyncOptionsFromArgs(t *testing.T) {
5657
local := t.TempDir()
5758
remote := "/remote"
5859

59-
f := syncFlags{}
60+
f := syncFlags{maxConcurrentRequests: 7}
6061
cmd := New()
6162
cmd.SetContext(cmdctx.SetWorkspaceClient(t.Context(), nil))
6263
opts, err := f.syncOptionsFromArgs(cmd, []string{local, remote})
6364
require.NoError(t, err)
6465
assert.Equal(t, local, opts.LocalRoot.Native())
6566
assert.Equal(t, remote, opts.RemotePath)
67+
assert.Equal(t, 7, opts.MaxConcurrentRequests)
68+
}
69+
70+
func TestSyncFlagsValidate(t *testing.T) {
71+
f := syncFlags{maxConcurrentRequests: 1}
72+
require.NoError(t, f.validate())
73+
74+
f.maxConcurrentRequests = 0
75+
require.ErrorContains(t, f.validate(), "--max-concurrent-requests must be >= 1")
76+
77+
f.maxConcurrentRequests = -3
78+
require.ErrorContains(t, f.validate(), "--max-concurrent-requests must be >= 1")
6679
}
6780

6881
func TestExcludeFromFlag(t *testing.T) {

libs/sync/sync.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ type SyncOptions struct {
4343
OutputHandler OutputHandler
4444

4545
DryRun bool
46+
47+
MaxConcurrentRequests int
4648
}
4749

4850
type Sync struct {
@@ -96,6 +98,10 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
9698
return nil, errors.New("failed to resolve host for snapshot")
9799
}
98100

101+
if opts.MaxConcurrentRequests <= 0 {
102+
opts.MaxConcurrentRequests = MaxRequestsInFlight
103+
}
104+
99105
// For full sync, we start with an empty snapshot.
100106
// For incremental sync, we try to load an existing snapshot to start from.
101107
var snapshot *Snapshot

libs/sync/watchdog.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context.
9696
})
9797
}
9898

99-
func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error {
99+
func groupRunParallel(ctx context.Context, paths []string, limit int, fn func(context.Context, string) error) error {
100100
group, ctx := errgroup.WithContext(ctx)
101-
group.SetLimit(MaxRequestsInFlight)
101+
group.SetLimit(limit)
102102

103103
for _, path := range paths {
104104
groupRunSingle(ctx, group, fn, path)
@@ -110,31 +110,32 @@ func groupRunParallel(ctx context.Context, paths []string, fn func(context.Conte
110110

111111
func (s *Sync) applyDiff(ctx context.Context, d diff) error {
112112
var err error
113+
limit := s.MaxConcurrentRequests
113114

114115
// Delete files in parallel.
115-
err = groupRunParallel(ctx, d.delete, s.applyDelete)
116+
err = groupRunParallel(ctx, d.delete, limit, s.applyDelete)
116117
if err != nil {
117118
return err
118119
}
119120

120121
// Delete directories ordered by depth from leaf to root.
121122
for _, group := range d.groupedRmdir() {
122-
err = groupRunParallel(ctx, group, s.applyRmdir)
123+
err = groupRunParallel(ctx, group, limit, s.applyRmdir)
123124
if err != nil {
124125
return err
125126
}
126127
}
127128

128129
// Create directories (leafs only because intermediates are created automatically).
129130
for _, group := range d.groupedMkdir() {
130-
err = groupRunParallel(ctx, group, s.applyMkdir)
131+
err = groupRunParallel(ctx, group, limit, s.applyMkdir)
131132
if err != nil {
132133
return err
133134
}
134135
}
135136

136137
// Put files in parallel.
137-
err = groupRunParallel(ctx, d.put, s.applyPut)
138+
err = groupRunParallel(ctx, d.put, limit, s.applyPut)
138139

139140
return err
140141
}

0 commit comments

Comments
 (0)