|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "flag" |
| 6 | + "log/slog" |
| 7 | + "os" |
| 8 | + "sync" |
| 9 | + |
| 10 | + "cloud.google.com/go/datastore" |
| 11 | + "cloud.google.com/go/pubsub/v2" |
| 12 | + "cloud.google.com/go/storage" |
| 13 | + db "github.com/google/osv.dev/go/internal/database/datastore" |
| 14 | + "github.com/google/osv.dev/go/internal/models" |
| 15 | + "github.com/google/osv.dev/go/logger" |
| 16 | +) |
| 17 | + |
| 18 | +type ImporterConfig struct { |
| 19 | + SourceRepoStore models.SourceRepositoryStore |
| 20 | + PubSubClient *pubsub.Client |
| 21 | + GCSClient *storage.Client |
| 22 | + |
| 23 | + StrictValidation bool |
| 24 | +} |
| 25 | + |
| 26 | +func main() { |
| 27 | + logger.InitGlobalLogger() |
| 28 | + |
| 29 | + strictValidation := flag.Bool("strict-validation", false, "Fail to import entries that do not pass validation.") |
| 30 | + delete := flag.Bool("delete", false, "Bypass importing and propagate record deletions from source to Datastore") |
| 31 | + deleteThresholdPct := flag.Float64("delete-threshold-pct", 10.0, "More than this percent of records for a given source being deleted triggers an error") |
| 32 | + |
| 33 | + flag.Parse() |
| 34 | + |
| 35 | + project := os.Getenv("GOOGLE_CLOUD_PROJECT") |
| 36 | + if project == "" { |
| 37 | + logger.Fatal("GOOGLE_CLOUD_PROJECT environment variable is not set") |
| 38 | + } |
| 39 | + |
| 40 | + config := ImporterConfig{ |
| 41 | + StrictValidation: *strictValidation, |
| 42 | + } |
| 43 | + |
| 44 | + datastoreClient, err := datastore.NewClient(context.Background(), project) |
| 45 | + if err != nil { |
| 46 | + logger.Fatal("Failed to create datastore client", slog.Any("error", err)) |
| 47 | + } |
| 48 | + config.SourceRepoStore = db.NewSourceRepositoryStore(datastoreClient) |
| 49 | + |
| 50 | + config.PubSubClient, err = pubsub.NewClient(context.Background(), project) |
| 51 | + if err != nil { |
| 52 | + logger.Fatal("Failed to create pubsub client", slog.Any("error", err)) |
| 53 | + } |
| 54 | + |
| 55 | + config.GCSClient, err = storage.NewClient(context.Background()) |
| 56 | + if err != nil { |
| 57 | + logger.Fatal("Failed to create GCS client", slog.Any("error", err)) |
| 58 | + } |
| 59 | + |
| 60 | + if *delete { |
| 61 | + _ = deleteThresholdPct |
| 62 | + logger.Fatal("delete not implemented yet") |
| 63 | + } |
| 64 | + |
| 65 | + if err := RunImporter(context.Background(), config); err != nil { |
| 66 | + logger.Fatal("Importer failed", slog.Any("error", err)) |
| 67 | + } |
| 68 | +} |
| 69 | + |
| 70 | +func RunImporter(ctx context.Context, config ImporterConfig) error { |
| 71 | + logger.Info("Importer started") |
| 72 | + var wg sync.WaitGroup |
| 73 | + for sourceRepo, err := range config.SourceRepoStore.All(ctx) { |
| 74 | + if err != nil { |
| 75 | + return err |
| 76 | + } |
| 77 | + wg.Go(func() { |
| 78 | + switch sourceRepo.Type { |
| 79 | + case models.SourceRepositoryTypeGit: |
| 80 | + if err := handleImportGit(ctx, config, sourceRepo); err != nil { |
| 81 | + logger.Error("Failed to import git source repository", slog.Any("error", err), slog.String("source_repository", sourceRepo.Name)) |
| 82 | + } |
| 83 | + case models.SourceRepositoryTypeBucket: |
| 84 | + if err := handleImportBucket(ctx, config, sourceRepo); err != nil { |
| 85 | + logger.Error("Failed to import bucket source repository", slog.Any("error", err), slog.String("source_repository", sourceRepo.Name)) |
| 86 | + } |
| 87 | + case models.SourceRepositoryTypeREST: |
| 88 | + if err := handleImportREST(ctx, config, sourceRepo); err != nil { |
| 89 | + logger.Error("Failed to import REST source repository", slog.Any("error", err), slog.String("source_repository", sourceRepo.Name)) |
| 90 | + } |
| 91 | + default: |
| 92 | + logger.Error("Unsupported source repository type", slog.String("source_repository", sourceRepo.Name), slog.Any("type", sourceRepo.Type)) |
| 93 | + } |
| 94 | + }) |
| 95 | + } |
| 96 | + wg.Wait() |
| 97 | + return nil |
| 98 | +} |
0 commit comments