From 0fe715b06e073b615a6b93b7dca723dd9d105965 Mon Sep 17 00:00:00 2001 From: Shivaji Kharse Date: Mon, 16 Mar 2026 15:33:05 +0530 Subject: [PATCH 1/3] resolve rate-limit failures when downloading datasets --- .../ci-dgraph-integration2-tests.yml | 16 +++++ .github/workflows/ci-dgraph-ldbc-tests.yml | 15 ++++- .github/workflows/ci-dgraph-load-tests.yml | 15 ++++- dgraphtest/load.go | 30 ++++++--- t/t.go | 64 +++++++++++++------ 5 files changed, 111 insertions(+), 29 deletions(-) diff --git a/.github/workflows/ci-dgraph-integration2-tests.yml b/.github/workflows/ci-dgraph-integration2-tests.yml index 403a08465b4..7c8fadd24f4 100644 --- a/.github/workflows/ci-dgraph-integration2-tests.yml +++ b/.github/workflows/ci-dgraph-integration2-tests.yml @@ -29,6 +29,15 @@ jobs: with: fetch-depth: 0 + - name: Restore benchmark dataset cache + uses: actions/cache/restore@v4 + with: + path: dgraphtest/datafiles + key: dataset-dgraphtest-v1 + + - name: Ensure datafiles directory + run: mkdir -p dgraphtest/datafiles + - name: Set up Go uses: actions/setup-go@v6 with: @@ -55,3 +64,10 @@ jobs: go clean -testcache # sleep sleep 5 + + - name: Save benchmark dataset cache + if: success() + uses: actions/cache/save@v4 + with: + path: dgraphtest/datafiles + key: dataset-dgraphtest-v1 diff --git a/.github/workflows/ci-dgraph-ldbc-tests.yml b/.github/workflows/ci-dgraph-ldbc-tests.yml index 045ad3d42e9..4ba863dedb0 100644 --- a/.github/workflows/ci-dgraph-ldbc-tests.yml +++ b/.github/workflows/ci-dgraph-ldbc-tests.yml @@ -28,6 +28,12 @@ jobs: - name: Checkout Dgraph uses: actions/checkout@v5 + - name: Restore LDBC dataset cache + uses: actions/cache/restore@v4 + with: + path: ${{ github.workspace }}/test-data + key: dataset-ldbc-v1 + - name: Set up Go uses: actions/setup-go@v6 with: @@ -61,6 +67,13 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the ldbc tests - cd t; ./t --suite=ldbc + cd t; ./t --suite=ldbc --tmp=${{ github.workspace }}/test-data # clean up docker containers after test execution ./t -r + + - name: Save LDBC dataset cache + if: success() + uses: actions/cache/save@v4 + with: + path: ${{ github.workspace }}/test-data + key: dataset-ldbc-v1 diff --git a/.github/workflows/ci-dgraph-load-tests.yml b/.github/workflows/ci-dgraph-load-tests.yml index a1967b59d53..dd25be53a63 100644 --- a/.github/workflows/ci-dgraph-load-tests.yml +++ b/.github/workflows/ci-dgraph-load-tests.yml @@ -27,6 +27,12 @@ jobs: steps: - uses: actions/checkout@v5 + - name: Restore load test dataset cache + uses: actions/cache/restore@v4 + with: + path: ${{ github.workspace }}/test-data + key: dataset-load-v1 + - name: Set up Go uses: actions/setup-go@v6 with: @@ -60,8 +66,15 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the load tests - cd t; ./t --suite=load + cd t; ./t --suite=load --tmp=${{ github.workspace }}/test-data # clean up docker containers after test execution ./t -r # sleep sleep 5 + + - name: Save load test dataset cache + if: success() + uses: actions/cache/save@v4 + with: + path: ${{ github.workspace }}/test-data + key: dataset-load-v1 diff --git a/dgraphtest/load.go b/dgraphtest/load.go index 116c04a6b5c..d7bf35255e9 100644 --- a/dgraphtest/load.go +++ b/dgraphtest/load.go @@ -20,6 +20,7 @@ import ( "runtime" "strconv" "strings" + "time" "github.com/pkg/errors" @@ -41,10 +42,10 @@ func (c *LocalCluster) HostDgraphBinaryPath() string { } var datafiles = map[string]string{ - "1million.schema": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/data/1million.schema?raw=true", - "1million.rdf.gz": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/data/1million.rdf.gz?raw=true", - "21million.schema": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/data/21million.schema?raw=true", - "21million.rdf.gz": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/data/21million.rdf.gz?raw=true", + "1million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.schema", + "1million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.rdf.gz", + "21million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.schema", + "21million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.rdf.gz", } type DatasetType int @@ -604,11 +605,22 @@ func (d *Dataset) ensureFile(filename string) string { } func downloadFile(fname, url string) error { - cmd := exec.Command("wget", "-O", fname, url) - cmd.Dir = datasetFilesPath - - if _, err := cmd.CombinedOutput(); err != nil { - return fmt.Errorf("error downloading file %s: %w", fname, err) + const maxRetries = 3 + fpath := filepath.Join(datasetFilesPath, fname) + for attempt := 1; attempt <= maxRetries; attempt++ { + cmd := exec.Command("wget", "--tries=3", "--waitretry=5", "--retry-connrefused", "-O", fname, url) + cmd.Dir = datasetFilesPath + + if out, err := cmd.CombinedOutput(); err != nil { + log.Printf("attempt %d/%d failed to download %s: %v\n%s", attempt, maxRetries, fname, err, string(out)) + if attempt < maxRetries { + time.Sleep(time.Duration(attempt*5) * time.Second) + continue + } + _ = os.Remove(fpath) + return fmt.Errorf("error downloading file %s after %d attempts: %w", fname, maxRetries, err) + } + return nil } return nil } diff --git a/t/t.go b/t/t.go index 168b6d364c7..cad64a1e1eb 100644 --- a/t/t.go +++ b/t/t.go @@ -1159,7 +1159,27 @@ var rdfFileNames = [...]string{ "workAt_0.rdf"} var ldbcDataFiles = map[string]string{ - "ldbcTypes.schema": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/ldbc/sf0.3/ldbcTypes.schema?raw=true", + "ldbcTypes.schema": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/ldbc/sf0.3/ldbcTypes.schema", +} + +func wgetWithRetry(fname, url, dir string) error { + const maxRetries = 3 + fpath := filepath.Join(dir, fname) + for attempt := 1; attempt <= maxRetries; attempt++ { + cmd := exec.Command("wget", "--tries=3", "--waitretry=5", "--retry-connrefused", "-O", fname, url) + cmd.Dir = dir + if out, err := cmd.CombinedOutput(); err != nil { + fmt.Printf("attempt %d/%d failed to download %s: %v\n%s\n", attempt, maxRetries, fname, err, string(out)) + if attempt < maxRetries { + time.Sleep(time.Duration(attempt*5) * time.Second) + continue + } + _ = os.Remove(fpath) + return fmt.Errorf("failed to download %s after %d attempts: %w", fname, maxRetries, err) + } + return nil + } + return nil } func downloadDataFiles() { @@ -1168,12 +1188,13 @@ func downloadDataFiles() { return } for fname, link := range datafiles { - cmd := exec.Command("wget", "-O", fname, link) - cmd.Dir = *tmp - - if out, err := cmd.CombinedOutput(); err != nil { - fmt.Printf("Error %v\n", err) - panic(fmt.Sprintf("error downloading a file: %s", string(out))) + fpath := filepath.Join(*tmp, fname) + if fi, err := os.Stat(fpath); err == nil && fi.Size() > 0 { + fmt.Printf("Skipping %s (already exists)\n", fname) + continue + } + if err := wgetWithRetry(fname, link, *tmp); err != nil { + panic(fmt.Sprintf("error downloading %s: %v", fname, err)) } } } @@ -1189,20 +1210,26 @@ func downloadLDBCFiles(dir string) { } start := time.Now() + sem := make(chan struct{}, 5) var wg sync.WaitGroup for fname, link := range ldbcDataFiles { + fpath := filepath.Join(dir, fname) + if fi, err := os.Stat(fpath); err == nil && fi.Size() > 0 { + fmt.Printf("Skipping %s (already exists)\n", fname) + continue + } wg.Add(1) - go func(fname, link string, wg *sync.WaitGroup) { + go func(fname, link string) { defer wg.Done() - start := time.Now() - cmd := exec.Command("wget", "-O", fname, link) - cmd.Dir = dir - if out, err := cmd.CombinedOutput(); err != nil { - fmt.Printf("Error %v\n", err) - panic(fmt.Sprintf("error downloading a file: %s", string(out))) + sem <- struct{}{} + defer func() { <-sem }() + + dlStart := time.Now() + if err := wgetWithRetry(fname, link, dir); err != nil { + panic(fmt.Sprintf("error downloading %s: %v", fname, err)) } - fmt.Printf("Downloaded %s to %s in %s \n", fname, dir, time.Since(start)) - }(fname, link, &wg) + fmt.Printf("Downloaded %s to %s in %s \n", fname, dir, time.Since(dlStart)) + }(fname, link) } wg.Wait() fmt.Printf("Downloaded %d files in %s \n", len(ldbcDataFiles), time.Since(start)) @@ -1387,7 +1414,9 @@ func run() error { needsData := testSuiteContainsAny("load", "ldbc", "all") if needsData && *tmp == "" { *tmp = filepath.Join(os.TempDir(), "dgraph-test-data") - x.Check(testutil.MakeDirEmpty([]string{*tmp})) + } + if needsData { + x.Check(os.MkdirAll(*tmp, 0755)) } if testSuiteContainsAny("load", "all") { downloadDataFiles() @@ -1449,7 +1478,6 @@ func main() { procId = rand.Intn(1000) err := run() - _ = os.RemoveAll(*tmp) if err != nil { os.Exit(1) } From 9b44898fbda5c66c2f62c6722522852e1b531414 Mon Sep 17 00:00:00 2001 From: Shivaji Kharse Date: Fri, 20 Mar 2026 15:16:10 +0530 Subject: [PATCH 2/3] resolve review comments --- .../ci-dgraph-integration2-tests.yml | 4 +- .github/workflows/ci-dgraph-ldbc-tests.yml | 6 +- .github/workflows/ci-dgraph-load-tests.yml | 6 +- dgraphtest/load.go | 50 ++-- t/t.go | 135 +++++---- testutil/benchmark-data-version | 1 + testutil/download.go | 260 ++++++++++++++++++ 7 files changed, 362 insertions(+), 100 deletions(-) create mode 100644 testutil/benchmark-data-version create mode 100644 testutil/download.go diff --git a/.github/workflows/ci-dgraph-integration2-tests.yml b/.github/workflows/ci-dgraph-integration2-tests.yml index 7c8fadd24f4..86fe6cca541 100644 --- a/.github/workflows/ci-dgraph-integration2-tests.yml +++ b/.github/workflows/ci-dgraph-integration2-tests.yml @@ -33,7 +33,7 @@ jobs: uses: actions/cache/restore@v4 with: path: dgraphtest/datafiles - key: dataset-dgraphtest-v1 + key: dataset-dgraphtest-${{ hashFiles('testutil/benchmark-data-version') }} - name: Ensure datafiles directory run: mkdir -p dgraphtest/datafiles @@ -70,4 +70,4 @@ jobs: uses: actions/cache/save@v4 with: path: dgraphtest/datafiles - key: dataset-dgraphtest-v1 + key: dataset-dgraphtest-${{ hashFiles('testutil/benchmark-data-version') }} diff --git a/.github/workflows/ci-dgraph-ldbc-tests.yml b/.github/workflows/ci-dgraph-ldbc-tests.yml index 4ba863dedb0..bcb733d8dc1 100644 --- a/.github/workflows/ci-dgraph-ldbc-tests.yml +++ b/.github/workflows/ci-dgraph-ldbc-tests.yml @@ -32,7 +32,7 @@ jobs: uses: actions/cache/restore@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-ldbc-v1 + key: dataset-ldbc-${{ hashFiles('testutil/benchmark-data-version') }} - name: Set up Go uses: actions/setup-go@v6 @@ -67,7 +67,7 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the ldbc tests - cd t; ./t --suite=ldbc --tmp=${{ github.workspace }}/test-data + cd t; ./t --suite=ldbc --tmp=${{ github.workspace }}/test-data --keep-data # clean up docker containers after test execution ./t -r @@ -76,4 +76,4 @@ jobs: uses: actions/cache/save@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-ldbc-v1 + key: dataset-ldbc-${{ hashFiles('testutil/benchmark-data-version') }} diff --git a/.github/workflows/ci-dgraph-load-tests.yml b/.github/workflows/ci-dgraph-load-tests.yml index dd25be53a63..b6f8d17d4f3 100644 --- a/.github/workflows/ci-dgraph-load-tests.yml +++ b/.github/workflows/ci-dgraph-load-tests.yml @@ -31,7 +31,7 @@ jobs: uses: actions/cache/restore@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-load-v1 + key: dataset-load-${{ hashFiles('testutil/benchmark-data-version') }} - name: Set up Go uses: actions/setup-go@v6 @@ -66,7 +66,7 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the load tests - cd t; ./t --suite=load --tmp=${{ github.workspace }}/test-data + cd t; ./t --suite=load --tmp=${{ github.workspace }}/test-data --keep-data # clean up docker containers after test execution ./t -r # sleep @@ -77,4 +77,4 @@ jobs: uses: actions/cache/save@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-load-v1 + key: dataset-load-${{ hashFiles('testutil/benchmark-data-version') }} diff --git a/dgraphtest/load.go b/dgraphtest/load.go index d7bf35255e9..278b604a1db 100644 --- a/dgraphtest/load.go +++ b/dgraphtest/load.go @@ -20,13 +20,13 @@ import ( "runtime" "strconv" "strings" - "time" "github.com/pkg/errors" "github.com/dgraph-io/dgo/v250/protos/api" "github.com/dgraph-io/dgraph/v25/dgraphapi" "github.com/dgraph-io/dgraph/v25/enc" + "github.com/dgraph-io/dgraph/v25/testutil" "github.com/dgraph-io/dgraph/v25/x" ) @@ -41,11 +41,13 @@ func (c *LocalCluster) HostDgraphBinaryPath() string { return filepath.Join(c.tempBinDir, "dgraph_host") } -var datafiles = map[string]string{ - "1million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.schema", - "1million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.rdf.gz", - "21million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.schema", - "21million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.rdf.gz", +// datafilePaths maps filenames to their paths inside the dgraph-benchmarks repo. +// URLs are constructed at runtime by combining these with the configured ref. +var datafilePaths = map[string]string{ + "1million.schema": "data/1million.schema", + "1million.rdf.gz": "data/1million.rdf.gz", + "21million.schema": "data/21million.schema", + "21million.rdf.gz": "data/21million.rdf.gz", } type DatasetType int @@ -592,35 +594,21 @@ func (d *Dataset) GqlSchemaPath() string { func (d *Dataset) ensureFile(filename string) string { fullPath := filepath.Join(datasetFilesPath, filename) - if exists, _ := fileExists(fullPath); !exists { - url, ok := datafiles[filename] + if !testutil.FileExistsAndValid(fullPath) { + repoPath, ok := datafilePaths[filename] if !ok { - panic(fmt.Sprintf("dataset file %s not found in datafiles map", filename)) + panic(fmt.Sprintf("dataset file %s not found in datafilePaths map", filename)) } - if err := downloadFile(filename, url); err != nil { + ref := testutil.BenchmarkDataRef("") + var err error + if strings.HasSuffix(filename, ".rdf.gz") { + err = testutil.DownloadLFSFile(filename, ref, repoPath, datasetFilesPath) + } else { + err = testutil.DownloadFile(filename, testutil.BenchmarkRawURL(ref, repoPath), datasetFilesPath) + } + if err != nil { panic(fmt.Sprintf("failed to download %s: %v", filename, err)) } } return fullPath } - -func downloadFile(fname, url string) error { - const maxRetries = 3 - fpath := filepath.Join(datasetFilesPath, fname) - for attempt := 1; attempt <= maxRetries; attempt++ { - cmd := exec.Command("wget", "--tries=3", "--waitretry=5", "--retry-connrefused", "-O", fname, url) - cmd.Dir = datasetFilesPath - - if out, err := cmd.CombinedOutput(); err != nil { - log.Printf("attempt %d/%d failed to download %s: %v\n%s", attempt, maxRetries, fname, err, string(out)) - if attempt < maxRetries { - time.Sleep(time.Duration(attempt*5) * time.Second) - continue - } - _ = os.Remove(fpath) - return fmt.Errorf("error downloading file %s after %d attempts: %w", fname, maxRetries, err) - } - return nil - } - return nil -} diff --git a/t/t.go b/t/t.go index cad64a1e1eb..adc9ce6bef4 100644 --- a/t/t.go +++ b/t/t.go @@ -33,6 +33,7 @@ import ( "github.com/docker/docker/client" "github.com/golang/glog" "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" "golang.org/x/tools/go/packages" "github.com/dgraph-io/dgraph/v25/testutil" @@ -93,7 +94,13 @@ var ( "unit = true unit tests only (no Docker, no integration tag). "+ "integration = everything except ldbc, load, and systest-heavy (with Docker). "+ "systest = systest-baseline + systest-heavy.") - tmp = pflag.String("tmp", "", "Temporary directory used to download data.") + tmp = pflag.String("tmp", "", "Temporary directory used to download data.") + keepData = pflag.Bool("keep-data", false, + "If true, do not remove the data directory after tests complete. "+ + "Useful in CI where data is cached between runs.") + dataRef = pflag.String("data-ref", "", + "Git ref (branch, tag, or SHA) for dgraph-benchmarks data. "+ + "Overrides DGRAPH_TEST_DATA_REF env var and benchmark-data-version file.") downloadResources = pflag.BoolP("download", "d", true, "Flag to specify whether to download resources or not") race = pflag.Bool("race", false, "Set true to build with race") @@ -1121,18 +1128,17 @@ func isHeavyPackage(pkg string) bool { return false } -var datafiles = map[string]string{ - "1million-noindex.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million-noindex.schema", - "1million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.schema", - "1million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.rdf.gz", - "21million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.schema", - "21million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.rdf.gz", +// datafilePaths maps filenames to their paths inside the dgraph-benchmarks repo. +// URLs are constructed at runtime from BenchmarkDataRef(). +var datafilePaths = map[string]string{ + "1million-noindex.schema": "data/1million-noindex.schema", + "1million.schema": "data/1million.schema", + "1million.rdf.gz": "data/1million.rdf.gz", + "21million.schema": "data/21million.schema", + "21million.rdf.gz": "data/21million.rdf.gz", } -var baseUrl = "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/ldbc/sf0.3/ldbc_rdf_0.3/" -var suffix = "?raw=true" - -var rdfFileNames = [...]string{ +var ldbcRdfFileNames = [...]string{ "Deltas.rdf", "comment_0.rdf", "containerOf_0.rdf", @@ -1156,83 +1162,81 @@ var rdfFileNames = [...]string{ "studyAt_0.rdf", "tag_0.rdf", "tagclass_0.rdf", - "workAt_0.rdf"} - -var ldbcDataFiles = map[string]string{ - "ldbcTypes.schema": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/ldbc/sf0.3/ldbcTypes.schema", + "workAt_0.rdf", } -func wgetWithRetry(fname, url, dir string) error { - const maxRetries = 3 - fpath := filepath.Join(dir, fname) - for attempt := 1; attempt <= maxRetries; attempt++ { - cmd := exec.Command("wget", "--tries=3", "--waitretry=5", "--retry-connrefused", "-O", fname, url) - cmd.Dir = dir - if out, err := cmd.CombinedOutput(); err != nil { - fmt.Printf("attempt %d/%d failed to download %s: %v\n%s\n", attempt, maxRetries, fname, err, string(out)) - if attempt < maxRetries { - time.Sleep(time.Duration(attempt*5) * time.Second) - continue - } - _ = os.Remove(fpath) - return fmt.Errorf("failed to download %s after %d attempts: %w", fname, maxRetries, err) - } - return nil - } - return nil +// ldbcFilePaths maps filenames to their paths inside the dgraph-benchmarks repo. +var ldbcFilePaths = map[string]string{ + "ldbcTypes.schema": "ldbc/sf0.3/ldbcTypes.schema", } -func downloadDataFiles() { +func downloadDataFiles() error { if !*downloadResources { fmt.Print("Skipping downloading of resources\n") - return + return nil } - for fname, link := range datafiles { + ref := testutil.BenchmarkDataRef(*dataRef) + fmt.Printf("Using benchmark data ref: %s\n", ref) + for fname, repoPath := range datafilePaths { fpath := filepath.Join(*tmp, fname) - if fi, err := os.Stat(fpath); err == nil && fi.Size() > 0 { + if testutil.FileExistsAndValid(fpath) { fmt.Printf("Skipping %s (already exists)\n", fname) continue } - if err := wgetWithRetry(fname, link, *tmp); err != nil { - panic(fmt.Sprintf("error downloading %s: %v", fname, err)) + var err error + if strings.HasSuffix(fname, ".rdf.gz") { + err = testutil.DownloadLFSFile(fname, ref, repoPath, *tmp) + } else { + err = testutil.DownloadFile(fname, testutil.BenchmarkRawURL(ref, repoPath), *tmp) + } + if err != nil { + return fmt.Errorf("error downloading %s: %v", fname, err) } } + return nil } -func downloadLDBCFiles(dir string) { +func downloadLDBCFiles(dir string) error { if !*downloadResources { fmt.Print("Skipping downloading of resources\n") - return + return nil } - for _, name := range rdfFileNames { - ldbcDataFiles[name] = baseUrl + name + suffix + ref := testutil.BenchmarkDataRef(*dataRef) + fmt.Printf("Using benchmark data ref: %s\n", ref) + + // All LDBC files (schema + RDF) are LFS-tracked. + allFiles := make(map[string]string) + for fname, repoPath := range ldbcFilePaths { + allFiles[fname] = repoPath + } + for _, name := range ldbcRdfFileNames { + allFiles[name] = "ldbc/sf0.3/ldbc_rdf_0.3/" + name } start := time.Now() - sem := make(chan struct{}, 5) - var wg sync.WaitGroup - for fname, link := range ldbcDataFiles { + g, _ := errgroup.WithContext(context.Background()) + g.SetLimit(5) + for fname, repoPath := range allFiles { fpath := filepath.Join(dir, fname) - if fi, err := os.Stat(fpath); err == nil && fi.Size() > 0 { + if testutil.FileExistsAndValid(fpath) { fmt.Printf("Skipping %s (already exists)\n", fname) continue } - wg.Add(1) - go func(fname, link string) { - defer wg.Done() - sem <- struct{}{} - defer func() { <-sem }() - + g.Go(func() error { dlStart := time.Now() - if err := wgetWithRetry(fname, link, dir); err != nil { - panic(fmt.Sprintf("error downloading %s: %v", fname, err)) + if err := testutil.DownloadLFSFile(fname, ref, repoPath, dir); err != nil { + return fmt.Errorf("error downloading %s: %v", fname, err) } - fmt.Printf("Downloaded %s to %s in %s \n", fname, dir, time.Since(dlStart)) - }(fname, link) + fmt.Printf("Downloaded %s to %s in %s\n", fname, dir, time.Since(dlStart)) + return nil + }) } - wg.Wait() - fmt.Printf("Downloaded %d files in %s \n", len(ldbcDataFiles), time.Since(start)) + if err := g.Wait(); err != nil { + return err + } + fmt.Printf("Downloaded %d files in %s\n", len(allFiles), time.Since(start)) + return nil } func createTestCoverageFile(path string) error { @@ -1419,7 +1423,10 @@ func run() error { x.Check(os.MkdirAll(*tmp, 0755)) } if testSuiteContainsAny("load", "all") { - downloadDataFiles() + if err := downloadDataFiles(); err != nil { + fmt.Printf("Failed to download data files: %v\n", err) + return + } } if testSuiteContainsAny("ldbc", "all") { // LDBC files go into a subdirectory because the LDBC test bulk-loads @@ -1427,7 +1434,10 @@ func run() error { // with LDBC data causes schema mismatches. ldbcDir := filepath.Join(*tmp, "ldbc") x.Check(os.MkdirAll(ldbcDir, 0755)) - downloadLDBCFiles(ldbcDir) + if err := downloadLDBCFiles(ldbcDir); err != nil { + fmt.Printf("Failed to download LDBC files: %v\n", err) + return + } } for i, task := range valid { select { @@ -1478,6 +1488,9 @@ func main() { procId = rand.Intn(1000) err := run() + if !*keepData && *tmp != "" { + _ = os.RemoveAll(*tmp) + } if err != nil { os.Exit(1) } diff --git a/testutil/benchmark-data-version b/testutil/benchmark-data-version new file mode 100644 index 00000000000..ba2906d0666 --- /dev/null +++ b/testutil/benchmark-data-version @@ -0,0 +1 @@ +main diff --git a/testutil/download.go b/testutil/download.go new file mode 100644 index 00000000000..ee4ffac217d --- /dev/null +++ b/testutil/download.go @@ -0,0 +1,260 @@ +/* + * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package testutil + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "log" + "net/http" + "os" + "os/exec" + "path/filepath" + "runtime" + "strconv" + "strings" + "time" +) + +const ( + defaultMaxRetries = 3 + + benchmarkRepo = "dgraph-io/dgraph-benchmarks" + + // Environment variable to override the benchmark data ref at runtime. + envDataRef = "DGRAPH_TEST_DATA_REF" + + // benchmarkDataVersionFile is the filename (co-located with this source file + // in testutil/) that pins the git ref for benchmark data downloads. + // Changing its contents invalidates CI caches (workflows key on + // hashFiles('testutil/benchmark-data-version')). + // + // To override at runtime without editing the file: + // - Set the DGRAPH_TEST_DATA_REF environment variable, or + // - Pass --data-ref= to the t test runner. + benchmarkDataVersionFile = "benchmark-data-version" +) + +var ( + // lfsPointerPrefix is the first line of every Git LFS pointer file. + lfsPointerPrefix = []byte("version https://git-lfs.github.com") + + // gzipMagic is the two-byte header for gzip files. + gzipMagic = []byte{0x1f, 0x8b} +) + +// testutilDir returns the directory containing this source file (testutil/). +func testutilDir() string { + _, thisFile, _, _ := runtime.Caller(0) + return filepath.Dir(thisFile) +} + +// BenchmarkDataRef returns the git ref (branch, tag, or SHA) to use when +// downloading benchmark data from dgraph-benchmarks. Resolution order: +// 1. refOverride argument (non-empty string, e.g. from a --data-ref CLI flag) +// 2. DGRAPH_TEST_DATA_REF environment variable +// 3. Contents of testutil/benchmark-data-version file +// 4. Falls back to "main" +func BenchmarkDataRef(refOverride string) string { + if refOverride != "" { + return refOverride + } + if v := os.Getenv(envDataRef); v != "" { + return v + } + versionFile := filepath.Join(testutilDir(), benchmarkDataVersionFile) + if data, err := os.ReadFile(versionFile); err == nil { + if ref := strings.TrimSpace(string(data)); ref != "" { + return ref + } + } + return "main" +} + +// BenchmarkRawURL returns a raw.githubusercontent.com URL for a non-LFS file +// in the dgraph-benchmarks repo at the given ref. +func BenchmarkRawURL(ref, path string) string { + return fmt.Sprintf("https://raw.githubusercontent.com/%s/%s/%s", benchmarkRepo, ref, path) +} + +// BenchmarkLFSURL returns a media.githubusercontent.com URL for an LFS-tracked +// file in the dgraph-benchmarks repo at the given ref. +func BenchmarkLFSURL(ref, path string) string { + return fmt.Sprintf("https://media.githubusercontent.com/media/%s/%s/%s", benchmarkRepo, ref, path) +} + +// DownloadFile downloads a file from url into dir/fname using wget with retry +// logic (3 Go-level attempts, exponential backoff). wget itself uses --tries=1 +// so there is a single retry layer. On final failure, any partial file is +// removed to prevent corrupt data from persisting in caches. +// +// After a successful download the file is validated with ValidateFile; if +// validation fails the file is removed and an error is returned. +func DownloadFile(fname, url, dir string) error { + fpath := filepath.Join(dir, fname) + for attempt := 1; attempt <= defaultMaxRetries; attempt++ { + cmd := exec.Command("wget", "--tries=1", "--retry-connrefused", "-O", fname, url) + cmd.Dir = dir + if out, err := cmd.CombinedOutput(); err != nil { + log.Printf("attempt %d/%d failed to download %s: %v\n%s", + attempt, defaultMaxRetries, fname, err, string(out)) + if attempt < defaultMaxRetries { + time.Sleep(time.Duration(attempt*5) * time.Second) + continue + } + _ = os.Remove(fpath) + return fmt.Errorf("failed to download %s after %d attempts: %w", fname, defaultMaxRetries, err) + } + if err := ValidateFile(fpath); err != nil { + _ = os.Remove(fpath) + return fmt.Errorf("downloaded file %s is invalid: %w", fname, err) + } + return nil + } + return nil +} + +// lfsPointerInfo holds the expected SHA256 and size parsed from an LFS pointer. +type lfsPointerInfo struct { + SHA256 string + Size int64 +} + +// fetchLFSPointer fetches the raw LFS pointer file for the given repo path +// and ref, then parses out the SHA256 hash and file size. +func fetchLFSPointer(ref, repoPath string) (*lfsPointerInfo, error) { + pointerURL := BenchmarkRawURL(ref, repoPath) + resp, err := http.Get(pointerURL) + if err != nil { + return nil, fmt.Errorf("fetching LFS pointer: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("LFS pointer returned status %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading LFS pointer body: %w", err) + } + + info := &lfsPointerInfo{} + for _, line := range strings.Split(string(body), "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "oid sha256:") { + info.SHA256 = strings.TrimPrefix(line, "oid sha256:") + } + if strings.HasPrefix(line, "size ") { + info.Size, err = strconv.ParseInt(strings.TrimPrefix(line, "size "), 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing LFS pointer size: %w", err) + } + } + } + if info.SHA256 == "" || info.Size == 0 { + return nil, fmt.Errorf("LFS pointer missing oid or size fields") + } + return info, nil +} + +// verifyFileChecksum computes the SHA256 of the file at fpath and compares it +// against the expected hash. Also verifies the file size matches. +func verifyFileChecksum(fpath string, expected *lfsPointerInfo) error { + fi, err := os.Stat(fpath) + if err != nil { + return err + } + if fi.Size() != expected.Size { + return fmt.Errorf("size mismatch: local %d != expected %d", fi.Size(), expected.Size) + } + + f, err := os.Open(fpath) + if err != nil { + return err + } + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + return fmt.Errorf("computing SHA256: %w", err) + } + actual := hex.EncodeToString(h.Sum(nil)) + if actual != expected.SHA256 { + return fmt.Errorf("SHA256 mismatch: local %s != expected %s", actual, expected.SHA256) + } + return nil +} + +// DownloadLFSFile downloads an LFS-tracked file and verifies its integrity +// against the LFS pointer (SHA256 + size). It first fetches the tiny pointer +// from raw.githubusercontent.com to get the expected hash and size, then +// downloads the actual content from media.githubusercontent.com and verifies. +// +// This provides full integrity verification: truncation, corruption, and +// wrong-file detection are all caught. +func DownloadLFSFile(fname, ref, repoPath, dir string) error { + pointer, err := fetchLFSPointer(ref, repoPath) + if err != nil { + log.Printf("warning: could not fetch LFS pointer for %s, falling back to basic download: %v", fname, err) + return DownloadFile(fname, BenchmarkLFSURL(ref, repoPath), dir) + } + + url := BenchmarkLFSURL(ref, repoPath) + if err := DownloadFile(fname, url, dir); err != nil { + return err + } + + fpath := filepath.Join(dir, fname) + if err := verifyFileChecksum(fpath, pointer); err != nil { + _ = os.Remove(fpath) + return fmt.Errorf("integrity check failed for %s: %w", fname, err) + } + log.Printf("verified %s: SHA256=%s size=%d", fname, pointer.SHA256, pointer.Size) + return nil +} + +// FileExistsAndValid returns true if the file at fpath exists, is a regular +// file, has size > 0, and passes ValidateFile content checks. +func FileExistsAndValid(fpath string) bool { + fi, err := os.Stat(fpath) + if err != nil || fi.IsDir() || fi.Size() == 0 { + return false + } + return ValidateFile(fpath) == nil +} + +// ValidateFile performs content-level integrity checks on a downloaded file: +// - Rejects Git LFS pointer files (text stubs left when LFS content wasn't fetched). +// - Verifies .gz files start with the gzip magic bytes (0x1f 0x8b). +func ValidateFile(fpath string) error { + f, err := os.Open(fpath) + if err != nil { + return err + } + defer f.Close() + + header := make([]byte, 64) + n, err := f.Read(header) + if err != nil { + return fmt.Errorf("reading file header: %w", err) + } + header = header[:n] + + if bytes.HasPrefix(header, lfsPointerPrefix) { + return fmt.Errorf("file is a Git LFS pointer, not actual content") + } + + if strings.HasSuffix(fpath, ".gz") { + if n < 2 || !bytes.HasPrefix(header, gzipMagic) { + return fmt.Errorf("file does not have valid gzip header") + } + } + + return nil +} From ebcd77ca8948ba04aef8b64dd77e70a593773b97 Mon Sep 17 00:00:00 2001 From: Shivaji Kharse Date: Mon, 30 Mar 2026 11:13:29 +0530 Subject: [PATCH 3/3] resolved review comments --- .../ci-dgraph-integration2-tests.yml | 4 +- .github/workflows/ci-dgraph-ldbc-tests.yml | 4 +- .github/workflows/ci-dgraph-load-tests.yml | 4 +- benchdata/benchdata.go | 454 ++++++++++++++++++ .../benchmark-data-version | 0 dgraphtest/load.go | 32 +- t/t.go | 103 +--- testutil/download.go | 260 ---------- 8 files changed, 472 insertions(+), 389 deletions(-) create mode 100644 benchdata/benchdata.go rename {testutil => benchdata}/benchmark-data-version (100%) delete mode 100644 testutil/download.go diff --git a/.github/workflows/ci-dgraph-integration2-tests.yml b/.github/workflows/ci-dgraph-integration2-tests.yml index 86fe6cca541..3365387fe71 100644 --- a/.github/workflows/ci-dgraph-integration2-tests.yml +++ b/.github/workflows/ci-dgraph-integration2-tests.yml @@ -33,7 +33,7 @@ jobs: uses: actions/cache/restore@v4 with: path: dgraphtest/datafiles - key: dataset-dgraphtest-${{ hashFiles('testutil/benchmark-data-version') }} + key: dataset-dgraphtest-${{ hashFiles('benchdata/benchmark-data-version') }} - name: Ensure datafiles directory run: mkdir -p dgraphtest/datafiles @@ -70,4 +70,4 @@ jobs: uses: actions/cache/save@v4 with: path: dgraphtest/datafiles - key: dataset-dgraphtest-${{ hashFiles('testutil/benchmark-data-version') }} + key: dataset-dgraphtest-${{ hashFiles('benchdata/benchmark-data-version') }} diff --git a/.github/workflows/ci-dgraph-ldbc-tests.yml b/.github/workflows/ci-dgraph-ldbc-tests.yml index bcb733d8dc1..f31359ffd29 100644 --- a/.github/workflows/ci-dgraph-ldbc-tests.yml +++ b/.github/workflows/ci-dgraph-ldbc-tests.yml @@ -32,7 +32,7 @@ jobs: uses: actions/cache/restore@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-ldbc-${{ hashFiles('testutil/benchmark-data-version') }} + key: dataset-ldbc-${{ hashFiles('benchdata/benchmark-data-version') }} - name: Set up Go uses: actions/setup-go@v6 @@ -76,4 +76,4 @@ jobs: uses: actions/cache/save@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-ldbc-${{ hashFiles('testutil/benchmark-data-version') }} + key: dataset-ldbc-${{ hashFiles('benchdata/benchmark-data-version') }} diff --git a/.github/workflows/ci-dgraph-load-tests.yml b/.github/workflows/ci-dgraph-load-tests.yml index b6f8d17d4f3..73db29554d6 100644 --- a/.github/workflows/ci-dgraph-load-tests.yml +++ b/.github/workflows/ci-dgraph-load-tests.yml @@ -31,7 +31,7 @@ jobs: uses: actions/cache/restore@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-load-${{ hashFiles('testutil/benchmark-data-version') }} + key: dataset-load-${{ hashFiles('benchdata/benchmark-data-version') }} - name: Set up Go uses: actions/setup-go@v6 @@ -77,4 +77,4 @@ jobs: uses: actions/cache/save@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-load-${{ hashFiles('testutil/benchmark-data-version') }} + key: dataset-load-${{ hashFiles('benchdata/benchmark-data-version') }} diff --git a/benchdata/benchdata.go b/benchdata/benchdata.go new file mode 100644 index 00000000000..9c2ec42b5c3 --- /dev/null +++ b/benchdata/benchdata.go @@ -0,0 +1,454 @@ +/* + * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ +package benchdata + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "runtime" + "strings" + "time" + + "golang.org/x/sync/errgroup" +) + +const ( + benchmarkRepo = "dgraph-io/dgraph-benchmarks" + envDataRef = "DGRAPH_TEST_DATA_REF" + dataVersionFile = "benchmark-data-version" + maxRetries = 3 + concurrentLimit = 5 +) + +// TestDataFile is a type-safe identifier for benchmark data files. +// Using a named type prevents typos and lets callers reference files +// via the exported constants instead of raw strings. +type TestDataFile string + +// Load-test data files. +const ( + OneMillionNoIndexSchema TestDataFile = "1million-noindex.schema" + OneMillionSchema TestDataFile = "1million.schema" + OneMillionRDF TestDataFile = "1million.rdf.gz" + TwentyOneMillSchema TestDataFile = "21million.schema" + TwentyOneMillRDF TestDataFile = "21million.rdf.gz" +) + +// LDBC data files. +const ( + LDBCTypesSchema TestDataFile = "ldbcTypes.schema" + LDBCDeltas TestDataFile = "Deltas.rdf" + LDBCComment TestDataFile = "comment_0.rdf" + LDBCContainerOf TestDataFile = "containerOf_0.rdf" + LDBCForum TestDataFile = "forum_0.rdf" + LDBCHasCreator TestDataFile = "hasCreator_0.rdf" + LDBCHasInterest TestDataFile = "hasInterest_0.rdf" + LDBCHasMember TestDataFile = "hasMember_0.rdf" + LDBCHasModerator TestDataFile = "hasModerator_0.rdf" + LDBCHasTag TestDataFile = "hasTag_0.rdf" + LDBCHasType TestDataFile = "hasType_0.rdf" + LDBCIsLocatedIn TestDataFile = "isLocatedIn_0.rdf" + LDBCIsPartOf TestDataFile = "isPartOf_0.rdf" + LDBCIsSubclassOf TestDataFile = "isSubclassOf_0.rdf" + LDBCKnows TestDataFile = "knows_0.rdf" + LDBCLikes TestDataFile = "likes_0.rdf" + LDBCOrganisation TestDataFile = "organisation_0.rdf" + LDBCPerson TestDataFile = "person_0.rdf" + LDBCPlace TestDataFile = "place_0.rdf" + LDBCPost TestDataFile = "post_0.rdf" + LDBCReplyOf TestDataFile = "replyOf_0.rdf" + LDBCStudyAt TestDataFile = "studyAt_0.rdf" + LDBCTag TestDataFile = "tag_0.rdf" + LDBCTagclass TestDataFile = "tagclass_0.rdf" + LDBCWorkAt TestDataFile = "workAt_0.rdf" +) + +// Pre-built file groups for common test suites. +var ( + LoadTestFiles []TestDataFile + LDBCFiles []TestDataFile + AllFiles []TestDataFile +) + +// repoFilePath maps each TestDataFile to its path within the dgraph-benchmarks repo. +var repoFilePath = map[TestDataFile]string{ + OneMillionNoIndexSchema: "data/1million-noindex.schema", + OneMillionSchema: "data/1million.schema", + OneMillionRDF: "data/1million.rdf.gz", + TwentyOneMillSchema: "data/21million.schema", + TwentyOneMillRDF: "data/21million.rdf.gz", + LDBCTypesSchema: "ldbc/sf0.3/ldbcTypes.schema", + LDBCDeltas: "ldbc/sf0.3/ldbc_rdf_0.3/Deltas.rdf", + LDBCComment: "ldbc/sf0.3/ldbc_rdf_0.3/comment_0.rdf", + LDBCContainerOf: "ldbc/sf0.3/ldbc_rdf_0.3/containerOf_0.rdf", + LDBCForum: "ldbc/sf0.3/ldbc_rdf_0.3/forum_0.rdf", + LDBCHasCreator: "ldbc/sf0.3/ldbc_rdf_0.3/hasCreator_0.rdf", + LDBCHasInterest: "ldbc/sf0.3/ldbc_rdf_0.3/hasInterest_0.rdf", + LDBCHasMember: "ldbc/sf0.3/ldbc_rdf_0.3/hasMember_0.rdf", + LDBCHasModerator: "ldbc/sf0.3/ldbc_rdf_0.3/hasModerator_0.rdf", + LDBCHasTag: "ldbc/sf0.3/ldbc_rdf_0.3/hasTag_0.rdf", + LDBCHasType: "ldbc/sf0.3/ldbc_rdf_0.3/hasType_0.rdf", + LDBCIsLocatedIn: "ldbc/sf0.3/ldbc_rdf_0.3/isLocatedIn_0.rdf", + LDBCIsPartOf: "ldbc/sf0.3/ldbc_rdf_0.3/isPartOf_0.rdf", + LDBCIsSubclassOf: "ldbc/sf0.3/ldbc_rdf_0.3/isSubclassOf_0.rdf", + LDBCKnows: "ldbc/sf0.3/ldbc_rdf_0.3/knows_0.rdf", + LDBCLikes: "ldbc/sf0.3/ldbc_rdf_0.3/likes_0.rdf", + LDBCOrganisation: "ldbc/sf0.3/ldbc_rdf_0.3/organisation_0.rdf", + LDBCPerson: "ldbc/sf0.3/ldbc_rdf_0.3/person_0.rdf", + LDBCPlace: "ldbc/sf0.3/ldbc_rdf_0.3/place_0.rdf", + LDBCPost: "ldbc/sf0.3/ldbc_rdf_0.3/post_0.rdf", + LDBCReplyOf: "ldbc/sf0.3/ldbc_rdf_0.3/replyOf_0.rdf", + LDBCStudyAt: "ldbc/sf0.3/ldbc_rdf_0.3/studyAt_0.rdf", + LDBCTag: "ldbc/sf0.3/ldbc_rdf_0.3/tag_0.rdf", + LDBCTagclass: "ldbc/sf0.3/ldbc_rdf_0.3/tagclass_0.rdf", + LDBCWorkAt: "ldbc/sf0.3/ldbc_rdf_0.3/workAt_0.rdf", +} + +func init() { + LoadTestFiles = []TestDataFile{ + OneMillionNoIndexSchema, OneMillionSchema, OneMillionRDF, + TwentyOneMillSchema, TwentyOneMillRDF, + } + LDBCFiles = []TestDataFile{ + LDBCTypesSchema, + LDBCDeltas, LDBCComment, LDBCContainerOf, LDBCForum, + LDBCHasCreator, LDBCHasInterest, LDBCHasMember, LDBCHasModerator, + LDBCHasTag, LDBCHasType, LDBCIsLocatedIn, LDBCIsPartOf, + LDBCIsSubclassOf, LDBCKnows, LDBCLikes, LDBCOrganisation, + LDBCPerson, LDBCPlace, LDBCPost, LDBCReplyOf, + LDBCStudyAt, LDBCTag, LDBCTagclass, LDBCWorkAt, + } + AllFiles = make([]TestDataFile, 0, len(repoFilePath)) + for k := range repoFilePath { + AllFiles = append(AllFiles, k) + } +} + +// DataRef returns the git ref (branch, tag, or SHA) to use when downloading +// benchmark data. Resolution order: +// 1. refOverride argument (e.g. from a --data-ref CLI flag) +// 2. DGRAPH_TEST_DATA_REF environment variable +// 3. Contents of benchdata/benchmark-data-version file +// 4. Falls back to "main" +func DataRef(refOverride string) string { + if refOverride != "" { + return refOverride + } + if v := os.Getenv(envDataRef); v != "" { + return v + } + versionFile := filepath.Join(pkgDir(), dataVersionFile) + if data, err := os.ReadFile(versionFile); err == nil { + if ref := strings.TrimSpace(string(data)); ref != "" { + return ref + } + } + return "main" +} + +// EnsureFiles ensures the specified test data files exist in destDir, +// downloading any missing or invalid ones from dgraph-benchmarks at the given +// git ref. If no files are specified, all registered files are ensured. +// Returns the local paths of all requested files. +// +// LFS-tracked files are automatically detected via the GitHub Contents API +// and downloaded from the appropriate CDN URL. SHA256 checksums from the LFS +// pointer are verified after download. +func EnsureFiles(destDir, ref string, files ...TestDataFile) ([]string, error) { + if len(files) == 0 { + files = AllFiles + } + if err := os.MkdirAll(destDir, 0o755); err != nil { + return nil, fmt.Errorf("creating dest dir: %w", err) + } + + paths := make([]string, len(files)) + for i, f := range files { + paths[i] = filepath.Join(destDir, string(f)) + } + + g, _ := errgroup.WithContext(context.Background()) + g.SetLimit(concurrentLimit) + + for i, f := range files { + dest := paths[i] + if fileExistsAndValid(dest) { + log.Printf("Skipping %s (already exists)", f) + continue + } + g.Go(func() error { + start := time.Now() + if err := resolveAndDownload(f, ref, dest); err != nil { + return fmt.Errorf("downloading %s: %w", f, err) + } + log.Printf("Downloaded %s in %s", f, time.Since(start)) + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + return paths, nil +} + +type lfsPointerInfo struct { + OID string + Size int64 +} + +// detectLFS checks whether a file in the benchmark repo is LFS-tracked by +// fetching the raw git blob via the GitHub Contents API. For LFS-tracked files +// the API returns the pointer text (~130 bytes) regardless of the actual file +// size, so this works reliably for files of any size. +func detectLFS(repoPath, ref string) (*lfsPointerInfo, error) { + token := os.Getenv("GITHUB_TOKEN") + content, err := fetchRawBlob(repoPath, ref, token) + if err != nil { + return nil, err + } + return parseLFSPointer(content), nil +} + +func fetchRawBlob(path, ref, token string) ([]byte, error) { + url := fmt.Sprintf("https://api.github.com/repos/%s/contents/%s?ref=%s", + benchmarkRepo, path, ref) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/vnd.github.v3.raw") + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("GitHub Contents API raw %d: %s", resp.StatusCode, body) + } + + // LFS pointers are ~130 bytes. Cap the read to avoid buffering large + // non-LFS file content just for the LFS check. + const maxPointerSize = 1024 + buf := make([]byte, maxPointerSize) + n, err := io.ReadFull(resp.Body, buf) + if err == io.ErrUnexpectedEOF { + // File is smaller than 1KB -- that's fine, use what we got. + return buf[:n], nil + } + if err != nil { + return nil, err + } + // Read exactly 1KB. If there's more data, it's not an LFS pointer. + return buf[:n], nil +} + +func parseLFSPointer(content []byte) *lfsPointerInfo { + s := string(content) + if !strings.HasPrefix(s, "version https://git-lfs.github.com/spec/v1") { + return nil + } + info := &lfsPointerInfo{} + for _, line := range strings.Split(s, "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "oid sha256:") { + info.OID = strings.TrimPrefix(line, "oid sha256:") + } + if strings.HasPrefix(line, "size ") { + if _, err := fmt.Sscanf(line, "size %d", &info.Size); err != nil { + return nil + } + } + } + if info.OID == "" { + return nil + } + return info +} + +// resolveAndDownload determines the correct download URL for a file (auto- +// detecting LFS), downloads it with retry, and validates the result. +func resolveAndDownload(file TestDataFile, ref, destPath string) error { + path, ok := repoFilePath[file] + if !ok { + return fmt.Errorf("unknown test data file: %s", file) + } + + lfsInfo, err := detectLFS(path, ref) + if err != nil { + // GitHub API unavailable (rate limit, network). Try media URL first + // (handles LFS), fall back to raw URL (handles non-LFS). + log.Printf("warning: LFS detection failed for %s, trying both CDN URLs: %v", file, err) + mediaURL := fmt.Sprintf("https://media.githubusercontent.com/media/%s/%s/%s", + benchmarkRepo, ref, path) + rawURL := fmt.Sprintf("https://raw.githubusercontent.com/%s/%s/%s", + benchmarkRepo, ref, path) + if dlErr := downloadWithRetry(mediaURL, destPath); dlErr == nil { + if valErr := validateFile(destPath); valErr == nil { + return nil + } + } + if dlErr := downloadWithRetry(rawURL, destPath); dlErr != nil { + return dlErr + } + return validateFile(destPath) + } + + if lfsInfo != nil { + url := fmt.Sprintf("https://media.githubusercontent.com/media/%s/%s/%s", + benchmarkRepo, ref, path) + if err := downloadWithRetry(url, destPath); err != nil { + return err + } + if err := verifyChecksum(destPath, lfsInfo); err != nil { + _ = os.Remove(destPath) + return fmt.Errorf("integrity check failed: %w", err) + } + log.Printf("verified %s: SHA256=%s size=%d", file, lfsInfo.OID, lfsInfo.Size) + } else { + url := fmt.Sprintf("https://raw.githubusercontent.com/%s/%s/%s", + benchmarkRepo, ref, path) + if err := downloadWithRetry(url, destPath); err != nil { + return err + } + } + + return validateFile(destPath) +} + +// downloadWithRetry downloads a URL to destPath with Go-level retry +// (3 attempts, exponential backoff). On final failure the partial file +// is removed. +func downloadWithRetry(url, destPath string) error { + token := os.Getenv("GITHUB_TOKEN") + for attempt := 1; attempt <= maxRetries; attempt++ { + if err := downloadToFile(url, destPath, token); err != nil { + log.Printf("attempt %d/%d failed to download %s: %v", + attempt, maxRetries, filepath.Base(destPath), err) + _ = os.Remove(destPath) + if attempt < maxRetries { + time.Sleep(time.Duration(attempt*5) * time.Second) + continue + } + return fmt.Errorf("failed after %d attempts: %w", maxRetries, err) + } + return nil + } + return nil +} + +// downloadToFile streams a URL to a local file. +func downloadToFile(url, destPath, token string) error { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return fmt.Errorf("creating request: %w", err) + } + if token != "" && strings.Contains(url, "githubusercontent.com") { + req.Header.Set("Authorization", "Bearer "+token) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("HTTP %d from %s", resp.StatusCode, url) + } + + f, err := os.Create(destPath) + if err != nil { + return err + } + defer f.Close() + + if _, err := io.Copy(f, resp.Body); err != nil { + return err + } + return nil +} + +var ( + lfsPointerPrefix = []byte("version https://git-lfs.github.com") + gzipMagic = []byte{0x1f, 0x8b} +) + +func fileExistsAndValid(fpath string) bool { + fi, err := os.Stat(fpath) + if err != nil || fi.IsDir() || fi.Size() == 0 { + return false + } + return validateFile(fpath) == nil +} + +func validateFile(fpath string) error { + f, err := os.Open(fpath) + if err != nil { + return err + } + defer f.Close() + + header := make([]byte, 64) + n, err := f.Read(header) + if err != nil { + return fmt.Errorf("reading file header: %w", err) + } + header = header[:n] + + if bytes.HasPrefix(header, lfsPointerPrefix) { + return fmt.Errorf("file is a Git LFS pointer, not actual content") + } + if strings.HasSuffix(fpath, ".gz") && (n < 2 || !bytes.HasPrefix(header, gzipMagic)) { + return fmt.Errorf("file does not have valid gzip header") + } + return nil +} + +func verifyChecksum(fpath string, info *lfsPointerInfo) error { + fi, err := os.Stat(fpath) + if err != nil { + return err + } + if fi.Size() != info.Size { + return fmt.Errorf("size mismatch: local %d != expected %d", fi.Size(), info.Size) + } + + f, err := os.Open(fpath) + if err != nil { + return err + } + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + return fmt.Errorf("computing SHA256: %w", err) + } + actual := hex.EncodeToString(h.Sum(nil)) + if actual != info.OID { + return fmt.Errorf("SHA256 mismatch: local %s != expected %s", actual, info.OID) + } + return nil +} + +func pkgDir() string { + _, thisFile, _, _ := runtime.Caller(0) + return filepath.Dir(thisFile) +} diff --git a/testutil/benchmark-data-version b/benchdata/benchmark-data-version similarity index 100% rename from testutil/benchmark-data-version rename to benchdata/benchmark-data-version diff --git a/dgraphtest/load.go b/dgraphtest/load.go index 278b604a1db..35f252737ed 100644 --- a/dgraphtest/load.go +++ b/dgraphtest/load.go @@ -24,9 +24,9 @@ import ( "github.com/pkg/errors" "github.com/dgraph-io/dgo/v250/protos/api" + "github.com/dgraph-io/dgraph/v25/benchdata" "github.com/dgraph-io/dgraph/v25/dgraphapi" "github.com/dgraph-io/dgraph/v25/enc" - "github.com/dgraph-io/dgraph/v25/testutil" "github.com/dgraph-io/dgraph/v25/x" ) @@ -41,15 +41,6 @@ func (c *LocalCluster) HostDgraphBinaryPath() string { return filepath.Join(c.tempBinDir, "dgraph_host") } -// datafilePaths maps filenames to their paths inside the dgraph-benchmarks repo. -// URLs are constructed at runtime by combining these with the configured ref. -var datafilePaths = map[string]string{ - "1million.schema": "data/1million.schema", - "1million.rdf.gz": "data/1million.rdf.gz", - "21million.schema": "data/21million.schema", - "21million.rdf.gz": "data/21million.rdf.gz", -} - type DatasetType int type Dataset struct { name string @@ -593,22 +584,9 @@ func (d *Dataset) GqlSchemaPath() string { } func (d *Dataset) ensureFile(filename string) string { - fullPath := filepath.Join(datasetFilesPath, filename) - if !testutil.FileExistsAndValid(fullPath) { - repoPath, ok := datafilePaths[filename] - if !ok { - panic(fmt.Sprintf("dataset file %s not found in datafilePaths map", filename)) - } - ref := testutil.BenchmarkDataRef("") - var err error - if strings.HasSuffix(filename, ".rdf.gz") { - err = testutil.DownloadLFSFile(filename, ref, repoPath, datasetFilesPath) - } else { - err = testutil.DownloadFile(filename, testutil.BenchmarkRawURL(ref, repoPath), datasetFilesPath) - } - if err != nil { - panic(fmt.Sprintf("failed to download %s: %v", filename, err)) - } + paths, err := benchdata.EnsureFiles(datasetFilesPath, benchdata.DataRef(""), benchdata.TestDataFile(filename)) + if err != nil { + panic(fmt.Sprintf("failed to ensure %s: %v", filename, err)) } - return fullPath + return paths[0] } diff --git a/t/t.go b/t/t.go index adc9ce6bef4..831031c0df2 100644 --- a/t/t.go +++ b/t/t.go @@ -33,9 +33,9 @@ import ( "github.com/docker/docker/client" "github.com/golang/glog" "github.com/spf13/pflag" - "golang.org/x/sync/errgroup" "golang.org/x/tools/go/packages" + "github.com/dgraph-io/dgraph/v25/benchdata" "github.com/dgraph-io/dgraph/v25/testutil" "github.com/dgraph-io/dgraph/v25/x" "github.com/dgraph-io/ristretto/v2/z" @@ -1128,72 +1128,15 @@ func isHeavyPackage(pkg string) bool { return false } -// datafilePaths maps filenames to their paths inside the dgraph-benchmarks repo. -// URLs are constructed at runtime from BenchmarkDataRef(). -var datafilePaths = map[string]string{ - "1million-noindex.schema": "data/1million-noindex.schema", - "1million.schema": "data/1million.schema", - "1million.rdf.gz": "data/1million.rdf.gz", - "21million.schema": "data/21million.schema", - "21million.rdf.gz": "data/21million.rdf.gz", -} - -var ldbcRdfFileNames = [...]string{ - "Deltas.rdf", - "comment_0.rdf", - "containerOf_0.rdf", - "forum_0.rdf", - "hasCreator_0.rdf", - "hasInterest_0.rdf", - "hasMember_0.rdf", - "hasModerator_0.rdf", - "hasTag_0.rdf", - "hasType_0.rdf", - "isLocatedIn_0.rdf", - "isPartOf_0.rdf", - "isSubclassOf_0.rdf", - "knows_0.rdf", - "likes_0.rdf", - "organisation_0.rdf", - "person_0.rdf", - "place_0.rdf", - "post_0.rdf", - "replyOf_0.rdf", - "studyAt_0.rdf", - "tag_0.rdf", - "tagclass_0.rdf", - "workAt_0.rdf", -} - -// ldbcFilePaths maps filenames to their paths inside the dgraph-benchmarks repo. -var ldbcFilePaths = map[string]string{ - "ldbcTypes.schema": "ldbc/sf0.3/ldbcTypes.schema", -} - func downloadDataFiles() error { if !*downloadResources { fmt.Print("Skipping downloading of resources\n") return nil } - ref := testutil.BenchmarkDataRef(*dataRef) + ref := benchdata.DataRef(*dataRef) fmt.Printf("Using benchmark data ref: %s\n", ref) - for fname, repoPath := range datafilePaths { - fpath := filepath.Join(*tmp, fname) - if testutil.FileExistsAndValid(fpath) { - fmt.Printf("Skipping %s (already exists)\n", fname) - continue - } - var err error - if strings.HasSuffix(fname, ".rdf.gz") { - err = testutil.DownloadLFSFile(fname, ref, repoPath, *tmp) - } else { - err = testutil.DownloadFile(fname, testutil.BenchmarkRawURL(ref, repoPath), *tmp) - } - if err != nil { - return fmt.Errorf("error downloading %s: %v", fname, err) - } - } - return nil + _, err := benchdata.EnsureFiles(*tmp, ref, benchdata.LoadTestFiles...) + return err } func downloadLDBCFiles(dir string) error { @@ -1201,42 +1144,10 @@ func downloadLDBCFiles(dir string) error { fmt.Print("Skipping downloading of resources\n") return nil } - - ref := testutil.BenchmarkDataRef(*dataRef) + ref := benchdata.DataRef(*dataRef) fmt.Printf("Using benchmark data ref: %s\n", ref) - - // All LDBC files (schema + RDF) are LFS-tracked. - allFiles := make(map[string]string) - for fname, repoPath := range ldbcFilePaths { - allFiles[fname] = repoPath - } - for _, name := range ldbcRdfFileNames { - allFiles[name] = "ldbc/sf0.3/ldbc_rdf_0.3/" + name - } - - start := time.Now() - g, _ := errgroup.WithContext(context.Background()) - g.SetLimit(5) - for fname, repoPath := range allFiles { - fpath := filepath.Join(dir, fname) - if testutil.FileExistsAndValid(fpath) { - fmt.Printf("Skipping %s (already exists)\n", fname) - continue - } - g.Go(func() error { - dlStart := time.Now() - if err := testutil.DownloadLFSFile(fname, ref, repoPath, dir); err != nil { - return fmt.Errorf("error downloading %s: %v", fname, err) - } - fmt.Printf("Downloaded %s to %s in %s\n", fname, dir, time.Since(dlStart)) - return nil - }) - } - if err := g.Wait(); err != nil { - return err - } - fmt.Printf("Downloaded %d files in %s\n", len(allFiles), time.Since(start)) - return nil + _, err := benchdata.EnsureFiles(dir, ref, benchdata.LDBCFiles...) + return err } func createTestCoverageFile(path string) error { diff --git a/testutil/download.go b/testutil/download.go deleted file mode 100644 index ee4ffac217d..00000000000 --- a/testutil/download.go +++ /dev/null @@ -1,260 +0,0 @@ -/* - * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. - * SPDX-License-Identifier: Apache-2.0 - */ - -package testutil - -import ( - "bytes" - "crypto/sha256" - "encoding/hex" - "fmt" - "io" - "log" - "net/http" - "os" - "os/exec" - "path/filepath" - "runtime" - "strconv" - "strings" - "time" -) - -const ( - defaultMaxRetries = 3 - - benchmarkRepo = "dgraph-io/dgraph-benchmarks" - - // Environment variable to override the benchmark data ref at runtime. - envDataRef = "DGRAPH_TEST_DATA_REF" - - // benchmarkDataVersionFile is the filename (co-located with this source file - // in testutil/) that pins the git ref for benchmark data downloads. - // Changing its contents invalidates CI caches (workflows key on - // hashFiles('testutil/benchmark-data-version')). - // - // To override at runtime without editing the file: - // - Set the DGRAPH_TEST_DATA_REF environment variable, or - // - Pass --data-ref= to the t test runner. - benchmarkDataVersionFile = "benchmark-data-version" -) - -var ( - // lfsPointerPrefix is the first line of every Git LFS pointer file. - lfsPointerPrefix = []byte("version https://git-lfs.github.com") - - // gzipMagic is the two-byte header for gzip files. - gzipMagic = []byte{0x1f, 0x8b} -) - -// testutilDir returns the directory containing this source file (testutil/). -func testutilDir() string { - _, thisFile, _, _ := runtime.Caller(0) - return filepath.Dir(thisFile) -} - -// BenchmarkDataRef returns the git ref (branch, tag, or SHA) to use when -// downloading benchmark data from dgraph-benchmarks. Resolution order: -// 1. refOverride argument (non-empty string, e.g. from a --data-ref CLI flag) -// 2. DGRAPH_TEST_DATA_REF environment variable -// 3. Contents of testutil/benchmark-data-version file -// 4. Falls back to "main" -func BenchmarkDataRef(refOverride string) string { - if refOverride != "" { - return refOverride - } - if v := os.Getenv(envDataRef); v != "" { - return v - } - versionFile := filepath.Join(testutilDir(), benchmarkDataVersionFile) - if data, err := os.ReadFile(versionFile); err == nil { - if ref := strings.TrimSpace(string(data)); ref != "" { - return ref - } - } - return "main" -} - -// BenchmarkRawURL returns a raw.githubusercontent.com URL for a non-LFS file -// in the dgraph-benchmarks repo at the given ref. -func BenchmarkRawURL(ref, path string) string { - return fmt.Sprintf("https://raw.githubusercontent.com/%s/%s/%s", benchmarkRepo, ref, path) -} - -// BenchmarkLFSURL returns a media.githubusercontent.com URL for an LFS-tracked -// file in the dgraph-benchmarks repo at the given ref. -func BenchmarkLFSURL(ref, path string) string { - return fmt.Sprintf("https://media.githubusercontent.com/media/%s/%s/%s", benchmarkRepo, ref, path) -} - -// DownloadFile downloads a file from url into dir/fname using wget with retry -// logic (3 Go-level attempts, exponential backoff). wget itself uses --tries=1 -// so there is a single retry layer. On final failure, any partial file is -// removed to prevent corrupt data from persisting in caches. -// -// After a successful download the file is validated with ValidateFile; if -// validation fails the file is removed and an error is returned. -func DownloadFile(fname, url, dir string) error { - fpath := filepath.Join(dir, fname) - for attempt := 1; attempt <= defaultMaxRetries; attempt++ { - cmd := exec.Command("wget", "--tries=1", "--retry-connrefused", "-O", fname, url) - cmd.Dir = dir - if out, err := cmd.CombinedOutput(); err != nil { - log.Printf("attempt %d/%d failed to download %s: %v\n%s", - attempt, defaultMaxRetries, fname, err, string(out)) - if attempt < defaultMaxRetries { - time.Sleep(time.Duration(attempt*5) * time.Second) - continue - } - _ = os.Remove(fpath) - return fmt.Errorf("failed to download %s after %d attempts: %w", fname, defaultMaxRetries, err) - } - if err := ValidateFile(fpath); err != nil { - _ = os.Remove(fpath) - return fmt.Errorf("downloaded file %s is invalid: %w", fname, err) - } - return nil - } - return nil -} - -// lfsPointerInfo holds the expected SHA256 and size parsed from an LFS pointer. -type lfsPointerInfo struct { - SHA256 string - Size int64 -} - -// fetchLFSPointer fetches the raw LFS pointer file for the given repo path -// and ref, then parses out the SHA256 hash and file size. -func fetchLFSPointer(ref, repoPath string) (*lfsPointerInfo, error) { - pointerURL := BenchmarkRawURL(ref, repoPath) - resp, err := http.Get(pointerURL) - if err != nil { - return nil, fmt.Errorf("fetching LFS pointer: %w", err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("LFS pointer returned status %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading LFS pointer body: %w", err) - } - - info := &lfsPointerInfo{} - for _, line := range strings.Split(string(body), "\n") { - line = strings.TrimSpace(line) - if strings.HasPrefix(line, "oid sha256:") { - info.SHA256 = strings.TrimPrefix(line, "oid sha256:") - } - if strings.HasPrefix(line, "size ") { - info.Size, err = strconv.ParseInt(strings.TrimPrefix(line, "size "), 10, 64) - if err != nil { - return nil, fmt.Errorf("parsing LFS pointer size: %w", err) - } - } - } - if info.SHA256 == "" || info.Size == 0 { - return nil, fmt.Errorf("LFS pointer missing oid or size fields") - } - return info, nil -} - -// verifyFileChecksum computes the SHA256 of the file at fpath and compares it -// against the expected hash. Also verifies the file size matches. -func verifyFileChecksum(fpath string, expected *lfsPointerInfo) error { - fi, err := os.Stat(fpath) - if err != nil { - return err - } - if fi.Size() != expected.Size { - return fmt.Errorf("size mismatch: local %d != expected %d", fi.Size(), expected.Size) - } - - f, err := os.Open(fpath) - if err != nil { - return err - } - defer f.Close() - - h := sha256.New() - if _, err := io.Copy(h, f); err != nil { - return fmt.Errorf("computing SHA256: %w", err) - } - actual := hex.EncodeToString(h.Sum(nil)) - if actual != expected.SHA256 { - return fmt.Errorf("SHA256 mismatch: local %s != expected %s", actual, expected.SHA256) - } - return nil -} - -// DownloadLFSFile downloads an LFS-tracked file and verifies its integrity -// against the LFS pointer (SHA256 + size). It first fetches the tiny pointer -// from raw.githubusercontent.com to get the expected hash and size, then -// downloads the actual content from media.githubusercontent.com and verifies. -// -// This provides full integrity verification: truncation, corruption, and -// wrong-file detection are all caught. -func DownloadLFSFile(fname, ref, repoPath, dir string) error { - pointer, err := fetchLFSPointer(ref, repoPath) - if err != nil { - log.Printf("warning: could not fetch LFS pointer for %s, falling back to basic download: %v", fname, err) - return DownloadFile(fname, BenchmarkLFSURL(ref, repoPath), dir) - } - - url := BenchmarkLFSURL(ref, repoPath) - if err := DownloadFile(fname, url, dir); err != nil { - return err - } - - fpath := filepath.Join(dir, fname) - if err := verifyFileChecksum(fpath, pointer); err != nil { - _ = os.Remove(fpath) - return fmt.Errorf("integrity check failed for %s: %w", fname, err) - } - log.Printf("verified %s: SHA256=%s size=%d", fname, pointer.SHA256, pointer.Size) - return nil -} - -// FileExistsAndValid returns true if the file at fpath exists, is a regular -// file, has size > 0, and passes ValidateFile content checks. -func FileExistsAndValid(fpath string) bool { - fi, err := os.Stat(fpath) - if err != nil || fi.IsDir() || fi.Size() == 0 { - return false - } - return ValidateFile(fpath) == nil -} - -// ValidateFile performs content-level integrity checks on a downloaded file: -// - Rejects Git LFS pointer files (text stubs left when LFS content wasn't fetched). -// - Verifies .gz files start with the gzip magic bytes (0x1f 0x8b). -func ValidateFile(fpath string) error { - f, err := os.Open(fpath) - if err != nil { - return err - } - defer f.Close() - - header := make([]byte, 64) - n, err := f.Read(header) - if err != nil { - return fmt.Errorf("reading file header: %w", err) - } - header = header[:n] - - if bytes.HasPrefix(header, lfsPointerPrefix) { - return fmt.Errorf("file is a Git LFS pointer, not actual content") - } - - if strings.HasSuffix(fpath, ".gz") { - if n < 2 || !bytes.HasPrefix(header, gzipMagic) { - return fmt.Errorf("file does not have valid gzip header") - } - } - - return nil -}