diff --git a/go/cmd/gitter/gitter.go b/go/cmd/gitter/gitter.go index 93aa77bb826..02cfd8b4337 100644 --- a/go/cmd/gitter/gitter.go +++ b/go/cmd/gitter/gitter.go @@ -46,6 +46,7 @@ var ( persistancePath = path.Join(defaultGitterWorkDir, persistanceFileName) gitStorePath = path.Join(defaultGitterWorkDir, gitStoreFileName) fetchTimeout time.Duration + semaphore chan struct{} ) const shutdownTimeout = 10 * time.Second @@ -205,8 +206,11 @@ func main() { port := flag.Int("port", 8888, "Listen port") workDir := flag.String("work_dir", defaultGitterWorkDir, "Work directory") flag.DurationVar(&fetchTimeout, "fetch_timeout", time.Hour, "Fetch timeout duration") + concurrentLimit := flag.Int("concurrent_limit", 100, "Concurrent limit for unique requests") flag.Parse() + semaphore = make(chan struct{}, *concurrentLimit) + persistancePath = path.Join(*workDir, persistanceFileName) gitStorePath = path.Join(*workDir, gitStoreFileName) @@ -296,6 +300,10 @@ func gitHandler(w http.ResponseWriter, r *http.Request) { // the repo once, and always with force update. // This is a tradeoff for simplicity to avoid having to setup locks per repo. fileData, err, _ := g.Do(url, func() (any, error) { + semaphore <- struct{}{} + defer func() { <-semaphore }() + logger.DebugContext(ctx, "Concurrent processes", slog.Int("count", len(semaphore))) + return fetchBlob(ctx, url, forceUpdate) }) diff --git a/go/cmd/gitter/gitter_test.go b/go/cmd/gitter/gitter_test.go index 38cb7603209..88676bc5569 100644 --- a/go/cmd/gitter/gitter_test.go +++ b/go/cmd/gitter/gitter_test.go @@ -90,6 +90,7 @@ func TestGitHandler_Integration(t *testing.T) { // but for this simple script we modify package globals. gitStorePath = tmpDir fetchTimeout = time.Minute + semaphore = make(chan struct{}, 100) // Ensure lastFetch map is initialized if lastFetch == nil { loadMap()