-
Notifications
You must be signed in to change notification settings - Fork 975
Fix server hang after "Running scheduled diagnostics refresh" #3628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
360eba7
3dd8169
3091afe
e2074f9
ccd5658
60619b9
1dd960a
b876992
9e7b395
371c3ff
e9d9fb4
22abe61
72f2544
ce17015
a2d063e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,10 @@ const ( | |
| UpdateReasonIdleCleanDiskCache | ||
| ) | ||
|
|
||
| // watchRequestTimeout is the maximum time to wait for the client to respond to | ||
| // a WatchFiles or UnwatchFiles request while holding the watches mutex. | ||
| const watchRequestTimeout = time.Second | ||
|
andrewbranch marked this conversation as resolved.
|
||
|
|
||
| // SessionOptions are the immutable initialization options for a session. | ||
| // Snapshots may reference them as a pointer since they never change. | ||
| type SessionOptions struct { | ||
|
|
@@ -156,8 +160,7 @@ type Session struct { | |
|
|
||
| // watches tracks the current watch globs and how many individual WatchedFiles | ||
| // are using each glob. | ||
| watches map[fileSystemWatcherKey]*fileSystemWatcherValue | ||
| watchesMu sync.Mutex | ||
| watches *watchRegistry | ||
|
|
||
| // globalDiagPublishPending is set to true when a global diagnostics publish | ||
| // task should be enqueued. It is reset when the task runs, coalescing multiple | ||
|
|
@@ -222,7 +225,7 @@ func NewSession(init *SessionInit) *Session { | |
| initialUserPreferences: lsutil.NewDefaultUserPreferences(), | ||
| workspaceUserPreferences: lsutil.NewDefaultUserPreferences(), | ||
| pendingATAChanges: make(map[tspath.Path]*ATAStateChange), | ||
| watches: make(map[fileSystemWatcherKey]*fileSystemWatcherValue), | ||
| watches: newWatchRegistry(), | ||
| } | ||
|
|
||
| if init.Options.TypingsLocation != "" && init.NpmExecutor != nil { | ||
|
|
@@ -1133,28 +1136,26 @@ func (s *Session) WaitForBackgroundTasks() { | |
|
|
||
| func updateWatch[T any](ctx context.Context, session *Session, logger logging.Logger, oldWatcher, newWatcher *WatchedFiles[T]) []error { | ||
| var errors []error | ||
| session.watchesMu.Lock() | ||
| defer session.watchesMu.Unlock() | ||
| session.watches.mu.Lock() | ||
| defer session.watches.mu.Unlock() | ||
| // Use a timeout for client requests to prevent holding the mutex indefinitely | ||
| // if the client is slow or unresponsive. | ||
| watchCtx, watchCancel := context.WithTimeout(ctx, watchRequestTimeout) | ||
| defer watchCancel() | ||
| if newWatcher != nil { | ||
| w := newWatcher.Watchers() | ||
| watchers := append(w.WorkspaceWatchers, w.OutsideWorkspaceWatchers...) | ||
| if len(watchers) > 0 { | ||
| var newWatchers collections.OrderedMap[WatcherID, *lsproto.FileSystemWatcher] | ||
| for i, watcher := range watchers { | ||
| key := toFileSystemWatcherKey(watcher) | ||
| value := session.watches[key] | ||
| globId := WatcherID(fmt.Sprintf("%s.%d", w.WatcherID, i)) | ||
| if value == nil { | ||
| value = &fileSystemWatcherValue{id: globId} | ||
| session.watches[key] = value | ||
| } | ||
| value.count++ | ||
| if value.count == 1 { | ||
| if session.watches.Acquire(watcher, globId) { | ||
| newWatchers.Set(globId, watcher) | ||
| } | ||
| } | ||
| for id, watcher := range newWatchers.Entries() { | ||
| if err := session.client.WatchFiles(ctx, id, []*lsproto.FileSystemWatcher{watcher}); err != nil { | ||
| if err := session.client.WatchFiles(watchCtx, id, []*lsproto.FileSystemWatcher{watcher}); err != nil { | ||
| session.watches.Release(watcher) | ||
| errors = append(errors, err) | ||
| } else if logger != nil { | ||
|
||
| if oldWatcher == nil { | ||
|
|
@@ -1166,6 +1167,11 @@ func updateWatch[T any](ctx context.Context, session *Session, logger logging.Lo | |
| logger.Log("") | ||
| } | ||
| } | ||
| if len(errors) > 0 { | ||
| session.watches.MarkPending(w.WatcherID) | ||
| } else { | ||
| session.watches.ClearPending(w.WatcherID) | ||
| } | ||
| if len(w.IgnoredPaths) > 0 { | ||
| logger.Logf("%d paths ineligible for watching", len(w.IgnoredPaths)) | ||
| if logger.IsVerbose() { | ||
|
|
@@ -1180,22 +1186,14 @@ func updateWatch[T any](ctx context.Context, session *Session, logger logging.Lo | |
| w := oldWatcher.Watchers() | ||
| watchers := append(w.WorkspaceWatchers, w.OutsideWorkspaceWatchers...) | ||
| if len(watchers) > 0 { | ||
| var removedWatchers []WatcherID | ||
| var removedIDs []WatcherID | ||
| for _, watcher := range watchers { | ||
| key := toFileSystemWatcherKey(watcher) | ||
| value := session.watches[key] | ||
| if value == nil { | ||
| continue | ||
| } | ||
| if value.count <= 1 { | ||
| delete(session.watches, key) | ||
| removedWatchers = append(removedWatchers, value.id) | ||
| } else { | ||
| value.count-- | ||
| if id, removed := session.watches.Release(watcher); removed { | ||
| removedIDs = append(removedIDs, id) | ||
| } | ||
| } | ||
| for _, id := range removedWatchers { | ||
| if err := session.client.UnwatchFiles(ctx, id); err != nil { | ||
| for _, id := range removedIDs { | ||
| if err := session.client.UnwatchFiles(watchCtx, id); err != nil { | ||
| errors = append(errors, err) | ||
| } else if logger != nil && newWatcher == nil { | ||
| logger.Log(fmt.Sprintf("Removed watch: %s", id)) | ||
|
|
@@ -1226,6 +1224,14 @@ func (s *Session) updateWatches(oldSnapshot *Snapshot, newSnapshot *Snapshot) er | |
| errors = append(errors, updateWatch(ctx, s, s.logger, oldEntry.rootFilesWatch, newEntry.rootFilesWatch)...) | ||
| }, | ||
| ) | ||
| // Retry config watchers whose IDs didn't change but whose previous registration failed. | ||
| for path, newEntry := range newSnapshot.ConfigFileRegistry.configs { | ||
| if oldEntry, ok := oldSnapshot.ConfigFileRegistry.configs[path]; ok { | ||
| if oldEntry.rootFilesWatch.ID() == newEntry.rootFilesWatch.ID() && s.watches.IsPending(newEntry.rootFilesWatch.ID()) { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, nil, newEntry.rootFilesWatch)...) | ||
| } | ||
|
Comment on lines
+1237
to
+1244
|
||
| } | ||
| } | ||
|
|
||
| collections.DiffOrderedMaps( | ||
| oldSnapshot.ProjectCollection.ProjectsByPath(), | ||
|
|
@@ -1241,15 +1247,21 @@ func (s *Session) updateWatches(oldSnapshot *Snapshot, newSnapshot *Snapshot) er | |
| func(_ tspath.Path, oldProject, newProject *Project) { | ||
| if oldProject.programFilesWatch.ID() != newProject.programFilesWatch.ID() { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, oldProject.programFilesWatch, newProject.programFilesWatch)...) | ||
| } else if s.watches.IsPending(newProject.programFilesWatch.ID()) { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, nil, newProject.programFilesWatch)...) | ||
| } | ||
| if oldProject.typingsWatch.ID() != newProject.typingsWatch.ID() { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, oldProject.typingsWatch, newProject.typingsWatch)...) | ||
| } else if s.watches.IsPending(newProject.typingsWatch.ID()) { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, nil, newProject.typingsWatch)...) | ||
| } | ||
| }, | ||
| ) | ||
|
|
||
| if oldSnapshot.autoImportsWatch.ID() != newSnapshot.autoImportsWatch.ID() { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, oldSnapshot.autoImportsWatch, newSnapshot.autoImportsWatch)...) | ||
| } else if s.watches.IsPending(newSnapshot.autoImportsWatch.ID()) { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, nil, newSnapshot.autoImportsWatch)...) | ||
| } | ||
|
|
||
| if len(errors) > 0 { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,76 @@ type fileSystemWatcherValue struct { | |
| id WatcherID | ||
| } | ||
|
|
||
| // watchRegistry tracks the current watch globs and how many individual | ||
| // WatchedFiles reference each glob. It guards concurrent access with a mutex | ||
| // and provides ref-count helpers so callers don't manipulate the map directly. | ||
| // It also tracks which parent watcher IDs have pending (failed) registrations | ||
| // so that updateWatches can retry them even when the watcher identity hasn't changed. | ||
| type watchRegistry struct { | ||
| mu sync.Mutex | ||
| entries map[fileSystemWatcherKey]*fileSystemWatcherValue | ||
| pending map[WatcherID]struct{} | ||
| } | ||
|
|
||
| func newWatchRegistry() *watchRegistry { | ||
| return &watchRegistry{ | ||
| entries: make(map[fileSystemWatcherKey]*fileSystemWatcherValue), | ||
| pending: make(map[WatcherID]struct{}), | ||
| } | ||
| } | ||
|
|
||
| // Acquire increments the ref count for a watcher. If this is the first | ||
| // reference (count goes from 0 to 1), it returns true so the caller knows | ||
| // to register the watcher with the client. | ||
| func (r *watchRegistry) Acquire(watcher *lsproto.FileSystemWatcher, id WatcherID) (isNew bool) { | ||
| key := toFileSystemWatcherKey(watcher) | ||
| value := r.entries[key] | ||
| if value == nil { | ||
| value = &fileSystemWatcherValue{id: id} | ||
| r.entries[key] = value | ||
| } | ||
| value.count++ | ||
| return value.count == 1 | ||
| } | ||
|
|
||
|
Comment on lines
+32
to
+65
|
||
| // Release decrements the ref count for a watcher. If no references remain, | ||
| // the entry is removed and the function returns the WatcherID and true so | ||
| // the caller knows to unregister the watcher from the client. | ||
| func (r *watchRegistry) Release(watcher *lsproto.FileSystemWatcher) (id WatcherID, removed bool) { | ||
| key := toFileSystemWatcherKey(watcher) | ||
| value := r.entries[key] | ||
| if value == nil { | ||
| return "", false | ||
| } | ||
| if value.count <= 1 { | ||
| delete(r.entries, key) | ||
| return value.id, true | ||
| } | ||
| value.count-- | ||
| return "", false | ||
| } | ||
|
|
||
| // MarkPending records that a watcher's registration failed and needs retry. | ||
| // Must be called with mu held. | ||
| func (r *watchRegistry) MarkPending(id WatcherID) { | ||
| r.pending[id] = struct{}{} | ||
| } | ||
|
|
||
| // ClearPending removes a watcher from the pending set after successful registration. | ||
| // Must be called with mu held. | ||
| func (r *watchRegistry) ClearPending(id WatcherID) { | ||
| delete(r.pending, id) | ||
| } | ||
|
|
||
| // IsPending returns true if the watcher needs retry due to a previous failure. | ||
| // Acquires mu internally — must NOT be called with mu already held. | ||
| func (r *watchRegistry) IsPending(id WatcherID) bool { | ||
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
| _, ok := r.pending[id] | ||
| return ok | ||
| } | ||
|
|
||
| type PatternsAndIgnored struct { | ||
| directoriesOutsideWorkspace []string | ||
| patternsInsideWorkspace []string | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| package project_test | ||
|
|
||
| import ( | ||
| "context" | ||
| "sync" | ||
| "sync/atomic" | ||
| "testing" | ||
| "testing/synctest" | ||
| "time" | ||
|
|
||
| "github.com/microsoft/typescript-go/internal/bundled" | ||
| "github.com/microsoft/typescript-go/internal/lsp/lsproto" | ||
| "github.com/microsoft/typescript-go/internal/project" | ||
| "github.com/microsoft/typescript-go/internal/testutil/projecttestutil" | ||
| "gotest.tools/v3/assert" | ||
| ) | ||
|
|
||
| func TestUpdateWatchTimeoutAndRollback(t *testing.T) { | ||
| t.Parallel() | ||
| if !bundled.Embedded { | ||
| t.Skip("bundled files are not embedded") | ||
| } | ||
|
|
||
| files := map[string]any{ | ||
| "/home/projects/TS/p1/tsconfig.json": `{ | ||
| "compilerOptions": { "noLib": true, "strict": true } | ||
| }`, | ||
| "/home/projects/TS/p1/src/index.ts": `export const x = 1;`, | ||
| } | ||
|
|
||
| t.Run("watch retries on next snapshot update after timeout with same watcher identity", func(t *testing.T) { | ||
| t.Parallel() | ||
| synctest.Test(t, func(t *testing.T) { | ||
| init, utils := projecttestutil.GetSessionInitOptions(files, nil, &projecttestutil.TypingsInstallerOptions{}) | ||
|
|
||
| // Track WatchFiles calls: record which watcher IDs were attempted | ||
| // and which succeeded. | ||
| var mu sync.Mutex | ||
| var attemptedIDs []project.WatcherID | ||
| var successfulIDs []project.WatcherID | ||
| var firstBatchDone atomic.Bool | ||
| utils.Client().WatchFilesFunc = func(ctx context.Context, id project.WatcherID, watchers []*lsproto.FileSystemWatcher) error { | ||
| mu.Lock() | ||
| attemptedIDs = append(attemptedIDs, id) | ||
| mu.Unlock() | ||
| if !firstBatchDone.Load() { | ||
| // Block until the context times out to simulate a slow client. | ||
| <-ctx.Done() | ||
| return ctx.Err() | ||
| } | ||
| // After the first batch, succeed immediately. | ||
| mu.Lock() | ||
| successfulIDs = append(successfulIDs, id) | ||
| mu.Unlock() | ||
| return nil | ||
| } | ||
|
|
||
| session := project.NewSession(init) | ||
| defer session.Close() | ||
|
|
||
| uri := lsproto.DocumentUri("file:///home/projects/TS/p1/src/index.ts") | ||
|
|
||
| // Step 1: Open the file. This creates the project and triggers | ||
| // updateWatches. All WatchFiles calls block and time out because | ||
| // the client is slow, so the registry is rolled back and the | ||
| // watchers are marked as pending. | ||
| session.DidOpenFile(context.Background(), uri, 1, files["/home/projects/TS/p1/src/index.ts"].(string), lsproto.LanguageKindTypeScript) | ||
|
|
||
| // Let the background goroutine block on WatchFiles, then advance | ||
| // fake time past the 1s watchRequestTimeout. | ||
| synctest.Wait() | ||
| time.Sleep(2 * time.Second) | ||
| synctest.Wait() | ||
|
|
||
| mu.Lock() | ||
| firstAttemptIDs := append([]project.WatcherID(nil), attemptedIDs...) | ||
| mu.Unlock() | ||
| assert.Assert(t, len(firstAttemptIDs) >= 1, "expected at least one WatchFiles call during initial open, got %d", len(firstAttemptIDs)) | ||
|
|
||
| // No watcher IDs should have succeeded. | ||
| mu.Lock() | ||
| assert.Equal(t, len(successfulIDs), 0, "expected no successful watches after timeout") | ||
| mu.Unlock() | ||
|
|
||
| // Step 2: Allow subsequent WatchFiles calls to succeed. | ||
| firstBatchDone.Store(true) | ||
|
|
||
| // Step 3: Make a single character change to the open file. This | ||
| // doesn't change any watcher identities — the program files remain | ||
| // the same, so the programFilesWatch ID is unchanged. The pending | ||
| // tracking ensures updateWatches retries the failed registrations. | ||
| session.DidChangeFile(context.Background(), uri, 2, []lsproto.TextDocumentContentChangePartialOrWholeDocument{ | ||
| { | ||
| Partial: &lsproto.TextDocumentContentChangePartial{ | ||
| Range: lsproto.Range{ | ||
| Start: lsproto.Position{Line: 0, Character: 18}, | ||
| End: lsproto.Position{Line: 0, Character: 19}, | ||
| }, | ||
| Text: "2", | ||
| }, | ||
| }, | ||
| }) | ||
|
|
||
| // Step 4: Flush the pending change by requesting the language service. | ||
| // This triggers getSnapshot → updateSnapshot → updateWatches. | ||
| _, err := session.GetLanguageService(context.Background(), uri) | ||
| assert.NilError(t, err) | ||
|
|
||
| // Let the background task run updateWatches. | ||
| synctest.Wait() | ||
| time.Sleep(2 * time.Second) | ||
| synctest.Wait() | ||
|
|
||
| // Verify: WatchFiles was called again with the same watcher IDs, | ||
| // and this time the calls succeeded. | ||
| mu.Lock() | ||
| retryIDs := attemptedIDs[len(firstAttemptIDs):] | ||
| assert.Assert(t, len(retryIDs) >= 1, | ||
| "expected WatchFiles to be retried after character change, got %d new calls (total %d, first batch %d)", | ||
| len(retryIDs), len(attemptedIDs), len(firstAttemptIDs)) | ||
| assert.Assert(t, len(successfulIDs) >= 1, | ||
| "expected at least one watcher to be registered after retry, got %d", len(successfulIDs)) | ||
|
andrewbranch marked this conversation as resolved.
Outdated
|
||
| mu.Unlock() | ||
| }) | ||
| }) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.