-
Notifications
You must be signed in to change notification settings. Fork 940
Fix server hang after "Running scheduled diagnostics refresh" #3628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
360eba7
3dd8169
3091afe
e2074f9
ccd5658
60619b9
1dd960a
b876992
9e7b395
371c3ff
e9d9fb4
22abe61
72f2544
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,10 @@ const ( | |
| UpdateReasonIdleCleanDiskCache | ||
| ) | ||
|
|
||
| // watchRequestTimeout is the maximum time to wait for the client to respond to | ||
| // a WatchFiles or UnwatchFiles request while holding the watches mutex. | ||
| const watchRequestTimeout = time.Second | ||
|
andrewbranch marked this conversation as resolved.
|
||
|
|
||
| // SessionOptions are the immutable initialization options for a session. | ||
| // Snapshots may reference them as a pointer since they never change. | ||
| type SessionOptions struct { | ||
|
|
@@ -156,8 +160,7 @@ type Session struct { | |
|
|
||
| // watches tracks the current watch globs and how many individual WatchedFiles | ||
| // are using each glob. | ||
| watches map[fileSystemWatcherKey]*fileSystemWatcherValue | ||
| watchesMu sync.Mutex | ||
| watches *watchRegistry | ||
|
|
||
| // globalDiagPublishPending is set to true when a global diagnostics publish | ||
| // task should be enqueued. It is reset when the task runs, coalescing multiple | ||
|
|
@@ -222,7 +225,7 @@ func NewSession(init *SessionInit) *Session { | |
| initialUserPreferences: lsutil.NewDefaultUserPreferences(), | ||
| workspaceUserPreferences: lsutil.NewDefaultUserPreferences(), | ||
| pendingATAChanges: make(map[tspath.Path]*ATAStateChange), | ||
| watches: make(map[fileSystemWatcherKey]*fileSystemWatcherValue), | ||
| watches: newWatchRegistry(), | ||
| } | ||
|
|
||
| if init.Options.TypingsLocation != "" && init.NpmExecutor != nil { | ||
|
|
@@ -1133,29 +1136,36 @@ func (s *Session) WaitForBackgroundTasks() { | |
|
|
||
| func updateWatch[T any](ctx context.Context, session *Session, logger logging.Logger, oldWatcher, newWatcher *WatchedFiles[T]) []error { | ||
| var errors []error | ||
| session.watchesMu.Lock() | ||
| defer session.watchesMu.Unlock() | ||
| session.watches.mu.Lock() | ||
| defer session.watches.mu.Unlock() | ||
| if newWatcher != nil { | ||
| w := newWatcher.Watchers() | ||
| watchers := append(w.WorkspaceWatchers, w.OutsideWorkspaceWatchers...) | ||
| if len(watchers) > 0 { | ||
| // Dedupe watchers by key to prevent refcount inflation from | ||
| // duplicate patterns. | ||
| seen := make(map[fileSystemWatcherKey]bool, len(watchers)) | ||
| var newWatchers collections.OrderedMap[WatcherID, *lsproto.FileSystemWatcher] | ||
| for i, watcher := range watchers { | ||
| key := toFileSystemWatcherKey(watcher) | ||
| value := session.watches[key] | ||
| globId := WatcherID(fmt.Sprintf("%s.%d", w.WatcherID, i)) | ||
| if value == nil { | ||
| value = &fileSystemWatcherValue{id: globId} | ||
| session.watches[key] = value | ||
| if seen[key] { | ||
| continue | ||
| } | ||
| value.count++ | ||
| if value.count == 1 { | ||
| seen[key] = true | ||
| globId := WatcherID(fmt.Sprintf("%s.%d", w.WatcherID, i)) | ||
| if session.watches.Acquire(watcher, globId) { | ||
| newWatchers.Set(globId, watcher) | ||
| } | ||
| } | ||
| var watchErrors []error | ||
| for id, watcher := range newWatchers.Entries() { | ||
| if err := session.client.WatchFiles(ctx, id, []*lsproto.FileSystemWatcher{watcher}); err != nil { | ||
| errors = append(errors, err) | ||
| // Create a fresh timeout per client call so earlier calls | ||
| // don't consume the deadline for later ones. | ||
| callCtx, callCancel := context.WithTimeout(ctx, watchRequestTimeout) | ||
| err := session.client.WatchFiles(callCtx, id, []*lsproto.FileSystemWatcher{watcher}) | ||
| callCancel() | ||
| if err != nil { | ||
| watchErrors = append(watchErrors, err) | ||
| } else if logger != nil { | ||
|
Comment on lines +1139 to +1169
|
||
| if oldWatcher == nil { | ||
| logger.Log(fmt.Sprintf("Added new watch: %s", id)) | ||
|
|
@@ -1166,6 +1176,19 @@ func updateWatch[T any](ctx context.Context, session *Session, logger logging.Lo | |
| logger.Log("") | ||
| } | ||
| } | ||
| if len(watchErrors) > 0 { | ||
| // Roll back ALL newly-acquired watchers on any failure to keep | ||
| // refcounts clean. On retry, Acquire will see them as new again. | ||
| // Re-registering an already-registered watcher with the client | ||
| // is harmless (registerCapability with the same ID replaces it). | ||
| for _, watcher := range newWatchers.Entries() { | ||
| session.watches.Release(watcher) | ||
| } | ||
| session.watches.MarkPending(w.WatcherID) | ||
| errors = append(errors, watchErrors...) | ||
| } else { | ||
| session.watches.ClearPending(w.WatcherID) | ||
| } | ||
| if len(w.IgnoredPaths) > 0 { | ||
| logger.Logf("%d paths ineligible for watching", len(w.IgnoredPaths)) | ||
| if logger.IsVerbose() { | ||
|
|
@@ -1180,22 +1203,17 @@ func updateWatch[T any](ctx context.Context, session *Session, logger logging.Lo | |
| w := oldWatcher.Watchers() | ||
| watchers := append(w.WorkspaceWatchers, w.OutsideWorkspaceWatchers...) | ||
| if len(watchers) > 0 { | ||
| var removedWatchers []WatcherID | ||
| var removedIDs []WatcherID | ||
| for _, watcher := range watchers { | ||
| key := toFileSystemWatcherKey(watcher) | ||
| value := session.watches[key] | ||
| if value == nil { | ||
| continue | ||
| } | ||
| if value.count <= 1 { | ||
| delete(session.watches, key) | ||
| removedWatchers = append(removedWatchers, value.id) | ||
| } else { | ||
| value.count-- | ||
| if id, removed := session.watches.Release(watcher); removed { | ||
| removedIDs = append(removedIDs, id) | ||
| } | ||
| } | ||
| for _, id := range removedWatchers { | ||
| if err := session.client.UnwatchFiles(ctx, id); err != nil { | ||
| for _, id := range removedIDs { | ||
| callCtx, callCancel := context.WithTimeout(ctx, watchRequestTimeout) | ||
| err := session.client.UnwatchFiles(callCtx, id) | ||
| callCancel() | ||
| if err != nil { | ||
| errors = append(errors, err) | ||
| } else if logger != nil && newWatcher == nil { | ||
| logger.Log(fmt.Sprintf("Removed watch: %s", id)) | ||
|
|
@@ -1226,6 +1244,19 @@ func (s *Session) updateWatches(oldSnapshot *Snapshot, newSnapshot *Snapshot) er | |
| errors = append(errors, updateWatch(ctx, s, s.logger, oldEntry.rootFilesWatch, newEntry.rootFilesWatch)...) | ||
| }, | ||
| ) | ||
| // Retry config watchers whose IDs didn't change but whose previous registration failed. | ||
| for path, newEntry := range newSnapshot.ConfigFileRegistry.configs { | ||
| if oldEntry, ok := oldSnapshot.ConfigFileRegistry.configs[path]; ok { | ||
| if oldEntry.rootFilesWatch.ID() == newEntry.rootFilesWatch.ID() { | ||
| s.watches.mu.Lock() | ||
| isPending := s.watches.IsPending(newEntry.rootFilesWatch.ID()) | ||
| s.watches.mu.Unlock() | ||
| if isPending { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, nil, newEntry.rootFilesWatch)...) | ||
| } | ||
| } | ||
|
Comment on lines +1247 to +1257
|
||
| } | ||
| } | ||
|
|
||
| collections.DiffOrderedMaps( | ||
| oldSnapshot.ProjectCollection.ProjectsByPath(), | ||
|
|
@@ -1241,15 +1272,36 @@ func (s *Session) updateWatches(oldSnapshot *Snapshot, newSnapshot *Snapshot) er | |
| func(_ tspath.Path, oldProject, newProject *Project) { | ||
| if oldProject.programFilesWatch.ID() != newProject.programFilesWatch.ID() { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, oldProject.programFilesWatch, newProject.programFilesWatch)...) | ||
| } else { | ||
| s.watches.mu.Lock() | ||
| isPending := s.watches.IsPending(newProject.programFilesWatch.ID()) | ||
| s.watches.mu.Unlock() | ||
| if isPending { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, nil, newProject.programFilesWatch)...) | ||
| } | ||
| } | ||
| if oldProject.typingsWatch.ID() != newProject.typingsWatch.ID() { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, oldProject.typingsWatch, newProject.typingsWatch)...) | ||
| } else { | ||
| s.watches.mu.Lock() | ||
| isPending := s.watches.IsPending(newProject.typingsWatch.ID()) | ||
| s.watches.mu.Unlock() | ||
| if isPending { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, nil, newProject.typingsWatch)...) | ||
| } | ||
| } | ||
| }, | ||
| ) | ||
|
|
||
| if oldSnapshot.autoImportsWatch.ID() != newSnapshot.autoImportsWatch.ID() { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, oldSnapshot.autoImportsWatch, newSnapshot.autoImportsWatch)...) | ||
| } else { | ||
| s.watches.mu.Lock() | ||
| isPending := s.watches.IsPending(newSnapshot.autoImportsWatch.ID()) | ||
| s.watches.mu.Unlock() | ||
| if isPending { | ||
| errors = append(errors, updateWatch(ctx, s, s.logger, nil, newSnapshot.autoImportsWatch)...) | ||
| } | ||
| } | ||
|
|
||
| if len(errors) > 0 { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,78 @@ type fileSystemWatcherValue struct { | |
| id WatcherID | ||
| } | ||
|
|
||
| // watchRegistry tracks the current watch globs and how many individual | ||
| // WatchedFiles reference each glob. It provides ref-count helpers so callers | ||
| // don't manipulate the map directly. | ||
| // | ||
| // All methods require the caller to hold mu. Call mu.Lock/Unlock around | ||
| // groups of operations that must be atomic (e.g. Acquire + WatchFiles + | ||
| // rollback on failure). | ||
| type watchRegistry struct { | ||
| mu sync.Mutex | ||
| entries map[fileSystemWatcherKey]*fileSystemWatcherValue | ||
| pending map[WatcherID]struct{} | ||
| } | ||
|
|
||
| func newWatchRegistry() *watchRegistry { | ||
| return &watchRegistry{ | ||
| entries: make(map[fileSystemWatcherKey]*fileSystemWatcherValue), | ||
| pending: make(map[WatcherID]struct{}), | ||
| } | ||
| } | ||
|
|
||
| // Acquire increments the ref count for a watcher. If this is the first | ||
| // reference (count goes from 0 to 1), it returns true so the caller knows | ||
| // to register the watcher with the client. | ||
| // Must be called with mu held. | ||
| func (r *watchRegistry) Acquire(watcher *lsproto.FileSystemWatcher, id WatcherID) (isNew bool) { | ||
| key := toFileSystemWatcherKey(watcher) | ||
| value := r.entries[key] | ||
| if value == nil { | ||
| value = &fileSystemWatcherValue{id: id} | ||
| r.entries[key] = value | ||
| } | ||
| value.count++ | ||
| return value.count == 1 | ||
| } | ||
|
|
||
|
Comment on lines +32 to +66
|
||
| // Release decrements the ref count for a watcher. If no references remain, | ||
| // the entry is removed and the function returns the WatcherID and true so | ||
| // the caller knows to unregister the watcher from the client. | ||
| // Must be called with mu held. | ||
| func (r *watchRegistry) Release(watcher *lsproto.FileSystemWatcher) (id WatcherID, removed bool) { | ||
| key := toFileSystemWatcherKey(watcher) | ||
| value := r.entries[key] | ||
| if value == nil { | ||
| return "", false | ||
| } | ||
| if value.count <= 1 { | ||
| delete(r.entries, key) | ||
| return value.id, true | ||
| } | ||
| value.count-- | ||
| return "", false | ||
| } | ||
|
|
||
| // MarkPending records that a watcher's registration failed and needs retry. | ||
| // Must be called with mu held. | ||
| func (r *watchRegistry) MarkPending(id WatcherID) { | ||
| r.pending[id] = struct{}{} | ||
| } | ||
|
|
||
| // ClearPending removes a watcher from the pending set after successful registration. | ||
| // Must be called with mu held. | ||
| func (r *watchRegistry) ClearPending(id WatcherID) { | ||
| delete(r.pending, id) | ||
| } | ||
|
|
||
| // IsPending returns true if the watcher needs retry due to a previous failure. | ||
| // Must be called with mu held. | ||
| func (r *watchRegistry) IsPending(id WatcherID) bool { | ||
| _, ok := r.pending[id] | ||
| return ok | ||
| } | ||
|
|
||
| type PatternsAndIgnored struct { | ||
| directoriesOutsideWorkspace []string | ||
| patternsInsideWorkspace []string | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.