Skip to content

Commit d62dcbb

Browse files
Parallelise skill file fetches and reuse HTTPS connections (#5335)
## Summary - Hoist a single shared `http.Client` in `libs/aitools/installer/installer.go` so the transport pool reuses TCP+TLS connections across fetches. `MaxIdleConnsPerHost` is bumped from Go's default 2 → 16 so parallel fetches to `raw.githubusercontent.com` actually reuse connections instead of churning handshakes. - Fetch a skill's files concurrently in `installSkillToDir` via `errgroup.WithContext` with `SetLimit(8)`. First error cancels in-flight peers, preserving the prior bail-on-first-error semantics. - Skills themselves are still installed serially across the outer loop. Most of the cold-start win therefore comes from the shared client keeping idle connections warm between sequential skills, rather than from intra-skill parallelism alone. ## Why `databricks aitools install` was sequential: every file constructed a fresh `&http.Client{}` and threw it away, paying the full TCP+TLS handshake per file. For `--experimental` (26 skills × ~5 files each = ~120 HTTPS GETs to GitHub raw) that meant wall-clock was dominated by handshake round-trips. Benchmarked locally against `https://raw.githubusercontent.com/databricks/databricks-agent-skills/v0.2.0` with a freshly built CLI on each side: | Condition | Baseline | This change | Speedup | |---|---|---|---| | Cold (no DNS/CDN warm) | 42.7 s | 0.52 s | ~80× | | Warm | 0.55–0.79 s | 0.20–0.21 s | ~3–4× | The cold-start number is what a first-time `databricks aitools install --experimental` user actually pays. The `fetchFileFn` package var that tests stub remains unchanged, so no test mocks need updating. ## Test plan - [x] `go test ./libs/aitools/...` — all green, including `-race -count=2`. - [x] `gofmt -l libs/aitools/installer/installer.go` — clean. - [x] `go vet ./libs/aitools/...` — clean. - [x] End-to-end timing against live GitHub raw — see table above. This pull request and its description were written by Isaac. --------- Co-authored-by: simon <simon.faltum@databricks.com>
1 parent b85b937 commit d62dcbb

2 files changed

Lines changed: 155 additions & 19 deletions

File tree

libs/aitools/installer/installer.go

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"path/filepath"
1313
"slices"
1414
"strings"
15+
"sync"
1516
"time"
1617

1718
"github.com/databricks/cli/internal/build"
@@ -21,15 +22,36 @@ import (
2122
"github.com/databricks/cli/libs/env"
2223
"github.com/databricks/cli/libs/log"
2324
"golang.org/x/mod/semver"
25+
"golang.org/x/sync/errgroup"
2426
)
2527

2628
const (
2729
skillsRepoOwner = "databricks"
2830
skillsRepoName = "databricks-agent-skills"
2931
stableSkillsRepoPath = "skills"
3032
experimentalRepoPath = "experimental"
33+
34+
// fetchConcurrency caps the number of in-flight skill file fetches.
35+
// Each file is one HTTPS GET to raw.githubusercontent.com; sequential
36+
// fetches were latency-bound on TLS handshakes. 8 is enough to amortise
37+
// the round-trip across a typical skill's files without overwhelming the
38+
// upstream CDN.
39+
fetchConcurrency = 8
3140
)
3241

42+
// httpClient is shared across all skill file fetches so the underlying
43+
// transport reuses TCP+TLS connections. The default MaxIdleConnsPerHost
44+
// (2) is bumped to leave headroom above fetchConcurrency so a brief overlap
45+
// between a closing and a new connection doesn't force a fresh handshake.
46+
var httpClient = sync.OnceValue(func() *http.Client {
47+
t := http.DefaultTransport.(*http.Transport).Clone()
48+
t.MaxIdleConnsPerHost = fetchConcurrency * 2
49+
return &http.Client{
50+
Timeout: 30 * time.Second,
51+
Transport: t,
52+
}
53+
})
54+
3355
func manifestHasExperimental(m *Manifest) bool {
3456
for _, meta := range m.Skills {
3557
if meta.IsExperimental() {
@@ -121,8 +143,7 @@ func fetchSkillFile(ctx context.Context, ref, repoDir, skillName, filePath strin
121143
return nil, fmt.Errorf("failed to create request: %w", err)
122144
}
123145

124-
client := &http.Client{Timeout: 30 * time.Second}
125-
resp, err := client.Do(req)
146+
resp, err := httpClient().Do(req)
126147
if err != nil {
127148
return nil, fmt.Errorf("failed to fetch %s: %w", filePath, err)
128149
}
@@ -555,25 +576,29 @@ func installSkillToDir(ctx context.Context, ref, repoDir, skillName, destDir str
555576
return fmt.Errorf("failed to create directory: %w", err)
556577
}
557578

579+
// Fetch files concurrently. Each file is a separate HTTPS GET, so
580+
// wall-clock time is dominated by per-request TLS handshakes rather
581+
// than payload size.
582+
g, gctx := errgroup.WithContext(ctx)
583+
g.SetLimit(fetchConcurrency)
558584
for _, file := range files {
559-
content, err := fetchFileFn(ctx, ref, repoDir, skillName, file)
560-
if err != nil {
561-
return err
562-
}
563-
564-
destPath := filepath.Join(destDir, file)
565-
566-
if err := os.MkdirAll(filepath.Dir(destPath), 0o755); err != nil {
567-
return fmt.Errorf("failed to create directory: %w", err)
568-
}
569-
570-
log.Debugf(ctx, "Downloading %s/%s", skillName, file)
571-
if err := os.WriteFile(destPath, content, 0o644); err != nil {
572-
return fmt.Errorf("failed to write %s: %w", file, err)
573-
}
585+
g.Go(func() error {
586+
content, err := fetchFileFn(gctx, ref, repoDir, skillName, file)
587+
if err != nil {
588+
return err
589+
}
590+
destPath := filepath.Join(destDir, file)
591+
if err := os.MkdirAll(filepath.Dir(destPath), 0o755); err != nil {
592+
return fmt.Errorf("failed to create directory: %w", err)
593+
}
594+
log.Debugf(gctx, "Downloading %s/%s", skillName, file)
595+
if err := os.WriteFile(destPath, content, 0o644); err != nil {
596+
return fmt.Errorf("failed to write %s: %w", file, err)
597+
}
598+
return nil
599+
})
574600
}
575-
576-
return nil
601+
return g.Wait()
577602
}
578603

579604
// copyDir copies all files from src to dest, recreating the directory structure.

libs/aitools/installer/installer_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package installer
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"io/fs"
78
"log/slog"
89
"os"
910
"path/filepath"
11+
"sync"
1012
"testing"
13+
"time"
1114

1215
"github.com/databricks/cli/internal/build"
1316
"github.com/databricks/cli/libs/aitools/agents"
@@ -188,6 +191,114 @@ func TestBackupThirdPartySkillRegularFile(t *testing.T) {
188191
assert.ErrorIs(t, err, fs.ErrNotExist)
189192
}
190193

194+
func TestInstallSkillToDirFetchesFilesConcurrently(t *testing.T) {
195+
baseCtx, cancel := context.WithTimeout(t.Context(), 2*time.Second)
196+
defer cancel()
197+
ctx := cmdio.MockDiscard(baseCtx)
198+
199+
orig := fetchFileFn
200+
t.Cleanup(func() { fetchFileFn = orig })
201+
202+
started := make(chan string, 2)
203+
release := make(chan struct{})
204+
releaseOnce := sync.OnceFunc(func() { close(release) })
205+
t.Cleanup(releaseOnce)
206+
207+
fetchFileFn = func(ctx context.Context, _, _, _, filePath string) ([]byte, error) {
208+
started <- filePath
209+
select {
210+
case <-release:
211+
return []byte(filePath), nil
212+
case <-ctx.Done():
213+
return nil, ctx.Err()
214+
}
215+
}
216+
217+
destDir := filepath.Join(t.TempDir(), "databricks-test")
218+
errCh := make(chan error, 1)
219+
go func() {
220+
errCh <- installSkillToDir(ctx, testSkillsRef, stableSkillsRepoPath, "databricks-test", destDir, []string{"one.md", "two.md"})
221+
}()
222+
223+
fetched := make(map[string]bool, 2)
224+
for range 2 {
225+
select {
226+
case filePath := <-started:
227+
fetched[filePath] = true
228+
case <-ctx.Done():
229+
require.FailNow(t, "timed out waiting for concurrent fetches to start")
230+
}
231+
}
232+
assert.Equal(t, map[string]bool{"one.md": true, "two.md": true}, fetched)
233+
234+
releaseOnce()
235+
require.NoError(t, <-errCh)
236+
237+
one, err := os.ReadFile(filepath.Join(destDir, "one.md"))
238+
require.NoError(t, err)
239+
assert.Equal(t, "one.md", string(one))
240+
two, err := os.ReadFile(filepath.Join(destDir, "two.md"))
241+
require.NoError(t, err)
242+
assert.Equal(t, "two.md", string(two))
243+
}
244+
245+
func TestInstallSkillToDirCancelsInFlightFetchesOnError(t *testing.T) {
246+
baseCtx, cancel := context.WithCancel(t.Context())
247+
defer cancel()
248+
ctx := cmdio.MockDiscard(baseCtx)
249+
250+
orig := fetchFileFn
251+
t.Cleanup(func() { fetchFileFn = orig })
252+
253+
fetchErr := errors.New("fetch failed")
254+
blockedStarted := make(chan struct{})
255+
cancelled := make(chan struct{})
256+
257+
fetchFileFn = func(ctx context.Context, _, _, _, filePath string) ([]byte, error) {
258+
switch filePath {
259+
case "blocked.md":
260+
close(blockedStarted)
261+
<-ctx.Done()
262+
close(cancelled)
263+
return nil, ctx.Err()
264+
case "fail.md":
265+
select {
266+
case <-blockedStarted:
267+
return nil, fetchErr
268+
case <-ctx.Done():
269+
return nil, ctx.Err()
270+
}
271+
default:
272+
return []byte(filePath), nil
273+
}
274+
}
275+
276+
destDir := filepath.Join(t.TempDir(), "databricks-test")
277+
errCh := make(chan error, 1)
278+
go func() {
279+
errCh <- installSkillToDir(ctx, testSkillsRef, stableSkillsRepoPath, "databricks-test", destDir, []string{"blocked.md", "fail.md"})
280+
}()
281+
282+
var err error
283+
select {
284+
case err = <-errCh:
285+
case <-time.After(5 * time.Second):
286+
cancel()
287+
select {
288+
case <-errCh:
289+
case <-time.After(time.Second):
290+
}
291+
require.FailNow(t, "timed out waiting for errgroup cancellation")
292+
}
293+
require.ErrorIs(t, err, fetchErr)
294+
295+
select {
296+
case <-cancelled:
297+
default:
298+
require.Fail(t, "expected in-flight fetch to observe context cancellation")
299+
}
300+
}
301+
191302
// --- InstallSkillsForAgents tests ---
192303

193304
func TestInstallSkillsForAgentsWritesState(t *testing.T) {

0 commit comments

Comments
 (0)