|
18 | 18 |
|
19 | 19 | (def null-output-stream-writer |
20 | 20 | (java.io.OutputStreamWriter. |
21 | | - (proxy [java.io.OutputStream] [] |
22 | | - (write |
23 | | - ([^bytes b]) |
24 | | - ([^bytes b, off, len]))))) |
| 21 | + (proxy [java.io.OutputStream] [] |
| 22 | + (write |
| 23 | + ([^bytes b]) |
| 24 | + ([^bytes b, off, len]))))) |
25 | 25 |
|
26 | 26 | (defmacro discarding-stdout |
27 | 27 | "Evaluates body in a context in which writes to *out* are discarded." |
|
118 | 118 | (let [ch (async/chan buf-or-n)] |
119 | 119 | (async/thread |
120 | 120 | (discarding-stdout |
121 | | - (loop [] |
122 | | - (when-let [arg (async/<!! ch)] |
123 | | - (f arg) |
124 | | - (recur))))) |
| 121 | + (loop [] |
| 122 | + (when-let [arg (async/<!! ch)] |
| 123 | + (f arg) |
| 124 | + (recur))))) |
125 | 125 | ch)) |
126 | 126 |
|
127 | 127 | (def input-buffer-size |
|
177 | 177 |
|
178 | 178 | (defn pending-received-request [method context params] |
179 | 179 | (let [cancelled? (atom false) |
180 | | - ;; coerce result/error to promise |
181 | | - result-promise (p/promise |
182 | | - (receive-request method |
183 | | - (assoc context ::req-cancelled? cancelled?) |
184 | | - params))] |
| 180 | + ;; Run handler asynchronously to prevent blocking the request processing thread. |
| 181 | + ;; Using p/future instead of p/promise ensures handlers run on a thread pool, |
| 182 | + ;; allowing the main loop to continue processing other messages and responses. |
| 183 | + ;; Wrap in discarding-stdout to prevent handler logging from corrupting JSON-RPC output. |
| 184 | + result-promise (p/future |
| 185 | + (discarding-stdout |
| 186 | + (receive-request method |
| 187 | + (assoc context ::req-cancelled? cancelled?) |
| 188 | + params)))] |
185 | 189 | (map->PendingReceivedRequest |
186 | | - {:result-promise result-promise |
187 | | - :cancelled? cancelled?}))) |
| 190 | + {:result-promise result-promise |
| 191 | + :cancelled? cancelled?}))) |
188 | 192 |
|
189 | 193 | ;; TODO: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize |
190 | 194 | ;; * receive-request should return error until initialize request is received |
|
256 | 260 | ;; presumed to be both very rare and indicative of a problem that can |
257 | 261 | ;; be solved only in the client or the language server. |
258 | 262 | client-initiated-in-ch (thread-loop |
259 | | - input-buffer-size |
260 | | - (fn [[message-type message]] |
261 | | - (if (identical? :request message-type) |
262 | | - (protocols.endpoint/receive-request this context message) |
263 | | - (protocols.endpoint/receive-notification this context message)))) |
| 263 | + input-buffer-size |
| 264 | + (fn [[message-type message]] |
| 265 | + (if (identical? :request message-type) |
| 266 | + (protocols.endpoint/receive-request this context message) |
| 267 | + (protocols.endpoint/receive-notification this context message)))) |
264 | 268 | reject-pending-sent-requests (fn [exception] |
265 | 269 | (doseq [pending-request (vals @pending-sent-requests*)] |
266 | 270 | (p/reject! (:p pending-request) |
|
277 | 281 | (when-not (async/offer! client-initiated-in-ch [message-type message]) |
278 | 282 | ;; Buffers full. Fail any waiting pending requests and... |
279 | 283 | (reject-pending-sent-requests |
280 | | - (ex-info "Buffer of client messages exhausted." {})) |
| 284 | + (ex-info "Buffer of client messages exhausted." {})) |
281 | 285 | ;; ... try again, but park this time. |
282 | 286 | (async/>! client-initiated-in-ch [message-type message]))) |
283 | 287 | (recur)) |
|
380 | 384 | :result-promise |
381 | 385 | ;; convert result/error to response |
382 | 386 | (p/then |
383 | | - (fn [result] |
384 | | - (if (identical? ::method-not-found result) |
385 | | - (do |
386 | | - (protocols.endpoint/log this :warn "received unexpected request" method) |
387 | | - (responses/error resp (errors/not-found method))) |
388 | | - (responses/infer resp result)))) |
| 387 | + (fn [result] |
| 388 | + (if (identical? ::method-not-found result) |
| 389 | + (do |
| 390 | + (protocols.endpoint/log this :warn "received unexpected request" method) |
| 391 | + (responses/error resp (errors/not-found method))) |
| 392 | + (responses/infer resp result)))) |
389 | 393 | ;; Handle |
390 | 394 | ;; 1. Exceptions thrown within p/future created by receive-request. |
391 | 395 | ;; 2. Cancelled requests. |
|
433 | 437 | log-ch (or log-ch (async/chan (async/sliding-buffer 20))) |
434 | 438 | trace-ch (or trace-ch (async/chan (async/sliding-buffer 20)))] |
435 | 439 | (map->ChanServer |
436 | | - {:output-ch output-ch |
437 | | - :input-ch input-ch |
438 | | - :log-ch log-ch |
439 | | - :trace-ch trace-ch |
440 | | - :tracer* (atom tracer) |
441 | | - :clock clock |
442 | | - :on-close on-close |
443 | | - :response-executor response-executor |
444 | | - :request-id* (atom 0) |
445 | | - :pending-sent-requests* (atom {}) |
446 | | - :pending-received-requests* (atom {}) |
447 | | - :join (promise)}))) |
| 440 | + {:output-ch output-ch |
| 441 | + :input-ch input-ch |
| 442 | + :log-ch log-ch |
| 443 | + :trace-ch trace-ch |
| 444 | + :tracer* (atom tracer) |
| 445 | + :clock clock |
| 446 | + :on-close on-close |
| 447 | + :response-executor response-executor |
| 448 | + :request-id* (atom 0) |
| 449 | + :pending-sent-requests* (atom {}) |
| 450 | + :pending-received-requests* (atom {}) |
| 451 | + :join (promise)}))) |
0 commit comments