|
70 | 70 | ssl-ctx network/*ssl-context* |
71 | 71 | rm (fn [request] |
72 | 72 | (-> request |
| 73 | + (assoc :timeout-millis 30000) |
73 | 74 | (update :headers merge |
74 | 75 | (into {} (map (fn [[k v]] |
75 | 76 | [(name k) (replace-env-vars (str v))])) |
76 | 77 | config-headers)) |
77 | 78 | (update :headers merge |
78 | 79 | (when-let [access-token (get-in @db* [:mcp-auth server-name :access-token])] |
79 | 80 | {"Authorization" (str "Bearer " access-token)})))) |
80 | | - hc (phc/make-http-client url (cond-> {:request-middleware rm} |
| 81 | + hc (phc/make-http-client url (cond-> {:request-middleware rm |
| 82 | + :timeout-millis 10000} |
81 | 83 | ssl-ctx (assoc :ssl-context ssl-ctx)))] |
82 | 84 | (when (string/includes? url "/sse") |
83 | 85 | (logger/warn logger-tag (format "SSE transport is no longer supported for server '%s'. Using Streamable HTTP instead. Consider updating the URL." server-name))) |
84 | 86 | (logger/info logger-tag (format "Creating HTTP transport for server '%s' at %s" server-name url)) |
85 | 87 | {:transport (phct/make-streamable-http-transport hc) |
| 88 | + :http-client hc |
86 | 89 | :needs-reinit?* needs-reinit?*}) |
87 | 90 |
|
88 | 91 | ;; STDIO transport |
|
336 | 339 | (swap! db* assoc-in [:mcp-clients name :tools] tools) |
337 | 340 | (on-server-updated (->server name server-config :running @db*))))] |
338 | 341 | (loop [attempt 1] |
339 | | - (let [{:keys [transport needs-reinit?*]} (->transport name server-config workspaces db*) |
| 342 | + (let [{:keys [transport http-client needs-reinit?*]} (->transport name server-config workspaces db*) |
340 | 343 | result (try |
341 | 344 | (let [client (->client name transport init-timeout workspaces |
342 | 345 | {:on-tools-change on-tools-change}) |
343 | 346 | init-result (pmc/get-initialize-result client) |
344 | 347 | version (get-in init-result [:serverInfo :version])] |
345 | | - (swap! db* assoc-in [:mcp-clients name] {:client client |
346 | | - :status :starting |
347 | | - :needs-reinit?* needs-reinit?*}) |
| 348 | + (swap! db* assoc-in [:mcp-clients name] (cond-> {:client client |
| 349 | + :status :starting |
| 350 | + :needs-reinit?* needs-reinit?*} |
| 351 | + http-client (assoc :http-client http-client))) |
348 | 352 | (swap! db* assoc-in [:mcp-clients name :version] version) |
349 | 353 | (swap! db* assoc-in [:mcp-clients name :instructions] (:instructions init-result)) |
350 | 354 | (swap! db* assoc-in [:mcp-clients name :tools] (list-server-tools client)) |
|
422 | 426 | (def ^:private disconnect-timeout-ms 5000) |
423 | 427 |
|
424 | 428 | (defn stop-server! [name db* config {:keys [on-server-updated]}] |
425 | | - (when-let [{:keys [client]} (get-in @db* [:mcp-clients name])] |
| 429 | + (when-let [{:keys [client http-client]} (get-in @db* [:mcp-clients name])] |
426 | 430 | (let [server-config (get-in config [:mcpServers name])] |
427 | 431 | (swap! db* assoc-in [:mcp-clients name :status] :stopping) |
428 | 432 | (on-server-updated (->server name server-config :stopping @db*)) |
429 | 433 | (let [f (future (try (pmc/disconnect! client) (catch Exception _ nil)))] |
430 | 434 | (when-not (deref f disconnect-timeout-ms nil) |
431 | 435 | (logger/warn logger-tag (format "Timeout disconnecting MCP server %s, forcing transport stop" name)) |
432 | | - (try (pp/stop-client-transport! client false) (catch Exception _)))) |
| 436 | + (if http-client |
| 437 | + (try (pp/stop! http-client) (catch Exception _)) |
| 438 | + (try (pp/stop-client-transport! (pcs/?transport client) false) (catch Exception _))))) |
433 | 439 | (swap! db* assoc-in [:mcp-clients name :status] :stopped) |
434 | 440 | (on-server-updated (->server name server-config :stopped @db*)) |
435 | 441 | (swap! db* update :mcp-clients dissoc name) |
|
571 | 577 | gone server-side), then runs a fresh initialize-server! cycle." |
572 | 578 | [server-name old-client db* config metrics] |
573 | 579 | (logger/info logger-tag (format "Re-initializing MCP server '%s'" server-name)) |
574 | | - (try (pp/stop-client-transport! old-client false) (catch Exception _)) |
| 580 | + (try (pp/stop-client-transport! (pcs/?transport old-client) false) (catch Exception _)) |
575 | 581 | (swap! db* update :mcp-clients dissoc server-name) |
576 | 582 | (initialize-server! server-name db* config metrics (constantly nil))) |
577 | 583 |
|
|
703 | 709 |
|
704 | 710 | (defn shutdown! |
705 | 711 | "Shutdown MCP servers: interrupts in-flight init threads and disconnects |
706 | | - running clients in parallel with a total 5s timeout." |
| 712 | + running clients in parallel with a total 5s timeout. |
| 713 | + HTTP clients are force-stopped immediately (skipping the slow DELETE handshake), |
| 714 | + while stdio clients go through graceful disconnect with a timeout fallback." |
707 | 715 | [db*] |
708 | 716 | ;; 1. Interrupt any servers still initializing so they unblock promptly |
709 | 717 | (interrupt-init-threads!) |
710 | | - ;; 2. Disconnect running clients in parallel via daemon threads |
| 718 | + ;; 2. Disconnect running clients |
711 | 719 | (try |
712 | 720 | (let [clients (vals (:mcp-clients @db*)) |
713 | | - latch (java.util.concurrent.CountDownLatch. (count clients)) |
714 | | - threads (doall |
715 | | - (map (fn [{:keys [client]}] |
716 | | - (doto (Thread. |
717 | | - (fn [] |
718 | | - (try |
719 | | - (pmc/disconnect! client) |
720 | | - (catch Exception _) |
721 | | - (finally |
722 | | - (.countDown latch))))) |
723 | | - (.setDaemon true) |
724 | | - (.start))) |
725 | | - clients))] |
726 | | - (when-not (.await latch disconnect-timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS) |
727 | | - (logger/warn logger-tag "Some MCP servers did not disconnect within timeout, forcing stop") |
728 | | - (doseq [^Thread t threads] |
729 | | - (.interrupt t)))) |
| 721 | + ;; HTTP clients: force-stop immediately (the DELETE in disconnect! always |
| 722 | + ;; times out because the server is slow to respond, and we're shutting down anyway) |
| 723 | + http-clients (filter :http-client clients) |
| 724 | + ;; stdio clients: graceful disconnect with timeout |
| 725 | + stdio-clients (remove :http-client clients)] |
| 726 | + (doseq [{:keys [http-client]} http-clients] |
| 727 | + (try (pp/stop! http-client) (catch Exception _))) |
| 728 | + (when (seq stdio-clients) |
| 729 | + (let [latch (java.util.concurrent.CountDownLatch. (count stdio-clients)) |
| 730 | + threads (doall |
| 731 | + (map (fn [{:keys [client]}] |
| 732 | + (doto (Thread. |
| 733 | + (fn [] |
| 734 | + (try |
| 735 | + (pmc/disconnect! client) |
| 736 | + (catch Exception _) |
| 737 | + (finally |
| 738 | + (.countDown latch))))) |
| 739 | + (.setDaemon true) |
| 740 | + (.start))) |
| 741 | + stdio-clients))] |
| 742 | + (when-not (.await latch disconnect-timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS) |
| 743 | + (logger/warn logger-tag "Some MCP servers did not disconnect within timeout, forcing stop") |
| 744 | + (doseq [{:keys [client]} stdio-clients] |
| 745 | + (try (pp/stop-client-transport! (pcs/?transport client) false) (catch Exception _))))))) |
730 | 746 | (catch Exception _ nil)) |
731 | 747 | (swap! db* assoc :mcp-clients {})) |
0 commit comments