Skip to content

Commit b7ae244

Browse files
ericdalloeca-agent
andcommitted
Support ask_user agent questions for remote REST/SSE clients
The ask_user tool's :enabled-fn checks :client-capabilities :code-assistant :chat-capabilities :ask-question, which JSON-RPC editors set during initialize but REST/SSE clients (e.g. eca-web) had no way to declare. The LLM therefore could not invoke ask_user and emitted questions as plain markdown, leaving web users unable to answer interactively. Wire the missing pieces: - Declare the askQuestion capability after the remote Jetty server binds. - BroadcastMessenger.ask-question mints a requestId, registers a promise in a per-record pending-questions registry, and broadcasts chat:ask-question over SSE; falls back to the inner JSON-RPC messenger when no SSE clients are connected so editor sessions are unaffected. - New POST /api/v1/answer route + handle-answer-question handler resolves the pending promise. Uses swap-vals! so concurrent answers for the same requestId atomically claim the entry; only the winner returns 204. 🤖 Generated with [eca](https://eca.dev) Co-Authored-By: eca-agent <git@eca.dev>
1 parent 334fa84 commit b7ae244

8 files changed

Lines changed: 162 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Unreleased
44

5+
- Support `ask_user` agent questions for remote REST/SSE clients (e.g. `eca-web`): the remote server now declares the `askQuestion` capability, broadcasts `chat:ask-question` over SSE with a generated `requestId`, and accepts answers via the new `POST /api/v1/answer` endpoint. Falls back to the JSON-RPC inner messenger when no SSE clients are connected, preserving editor behavior.
6+
57
## 0.132.0
68

79
- `variantsByModel` entries now support an optional `:api` filter (string or vector) to restrict variant matching by provider API type.

src/eca/remote/handlers.clj

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
[eca.config :as config]
99
[eca.features.chat :as f.chat]
1010
[eca.handlers :as handlers]
11+
[eca.remote.messenger :as remote.messenger]
1112
[eca.remote.sse :as sse]
1213
[eca.shared :as shared]
1314
[ring.core.protocols :as ring.protocols])
@@ -261,6 +262,28 @@
261262
(sse/broadcast! sse-connections* "trust:updated" {:trust trust})
262263
(json-response {:trust trust})))
263264

265+
(defn handle-answer-question
266+
"Resolves a pending question previously asked via the SSE `chat:ask-question`
267+
event. Body: {:requestId String :answer (String|nil) :cancelled Boolean}.
268+
Returns 204 on success, 400 if `requestId` is missing or blank, 404 if the
269+
requestId is unknown (e.g. already answered or never registered)."
270+
[{:keys [messenger]} request]
271+
(let [body (parse-body request)
272+
request-id (:requestId body)]
273+
(cond
274+
(not (and (string? request-id) (seq request-id)))
275+
(error-response 400 "invalid_request" "Missing required field: requestId")
276+
277+
(remote.messenger/answer-question! messenger
278+
request-id
279+
(:answer body)
280+
(:cancelled body))
281+
(no-content)
282+
283+
:else
284+
(error-response 404 "question_not_found"
285+
(str "No pending question for requestId " request-id)))))
286+
264287
(defn handle-mcp-start [{:keys [db*] :as components} _request server-name]
265288
(let [config (config/all @db*)]
266289
(handlers/mcp-start-server (assoc components :config config) {:name server-name})

src/eca/remote/messenger.clj

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
(defn- ->camel [data]
1212
(shared/map->camel-cased-map data))
1313

14-
(defrecord BroadcastMessenger [inner sse-connections*]
14+
(defrecord BroadcastMessenger [inner sse-connections* pending-questions*]
1515
messenger/IMessenger
1616

1717
(chat-content-received [_this data]
@@ -68,4 +68,39 @@
6868
(editor-diagnostics [_this uri]
6969
(messenger/editor-diagnostics inner uri))
7070
(ask-question [_this params]
71-
(messenger/ask-question inner params)))
71+
;; If there are no SSE clients, fall back to the inner messenger so
72+
;; JSON-RPC editor sessions keep working unchanged. Otherwise mint a
73+
;; requestId, register a promise, and broadcast the question to all
74+
;; connected SSE clients. The promise is resolved by `answer-question!`
75+
;; when a client posts to /api/v1/answer.
76+
(if (empty? @sse-connections*)
77+
(messenger/ask-question inner params)
78+
(let [request-id (str (random-uuid))
79+
p (promise)]
80+
(swap! pending-questions* assoc request-id p)
81+
(sse/broadcast! sse-connections* "chat:ask-question"
82+
(->camel (assoc params :requestId request-id)))
83+
p))))
84+
85+
(defn make-broadcast-messenger
86+
"Creates a BroadcastMessenger with a fresh pending-questions registry.
87+
Prefer this over `->BroadcastMessenger` so callers don't have to know
88+
about the internal registry atom."
89+
[inner sse-connections*]
90+
(->BroadcastMessenger inner sse-connections* (atom {})))
91+
92+
(defn answer-question!
93+
"Resolves a pending question (previously registered by `ask-question`) by
94+
request-id. Delivers `{:answer answer :cancelled (boolean cancelled)}` to
95+
the registered promise and removes the entry from the registry.
96+
Returns true if a pending question was found and delivered, nil otherwise
97+
(e.g. unknown or already-answered request-id).
98+
99+
Uses `swap-vals!` so that claiming the entry is a single atomic op:
100+
under concurrent answer-question! calls for the same request-id only the
101+
caller that wins the swap observes the entry in `old` and delivers."
102+
[{:keys [pending-questions*]} request-id answer cancelled]
103+
(let [[old _new] (swap-vals! pending-questions* dissoc request-id)]
104+
(when-let [p (get old request-id)]
105+
(deliver p {:answer answer :cancelled (boolean cancelled)})
106+
true)))

src/eca/remote/routes.clj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@
4343
(when (= :post method)
4444
[handlers/handle-set-trust components request {:sse-connections* sse-connections*}])
4545

46+
["api" "v1" "answer"]
47+
(when (= :post method)
48+
[handlers/handle-answer-question components request])
49+
4650
["api" "v1" "mcp"]
4751
(when (= :post method)
4852
[handlers/handle-mcp-add components request])

src/eca/remote/server.clj

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,14 @@
300300
host-with-port
301301
"&pass=" token
302302
"&protocol=" protocol)]
303+
;; Declare the askQuestion capability now that the remote
304+
;; server is up, so the `ask_user` tool is enabled for the
305+
;; LLM. BroadcastMessenger.ask-question routes the question
306+
;; over SSE when at least one client is connected, and falls
307+
;; back to the inner JSON-RPC messenger otherwise.
308+
(swap! (:db* components) assoc-in
309+
[:client-capabilities :code-assistant :chat-capabilities :ask-question]
310+
true)
303311
(when (and localhost-only? private? (not= host-base "127.0.0.1"))
304312
(logger/warn logger-tag
305313
(str "⚠️ Bound to 127.0.0.1:" actual-port " (localhost only) because another service "

src/eca/server.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@
273273
;; HTTP server can be started later (e.g. when local project config
274274
;; enables it after initialize). Broadcasting to an empty set is a no-op.
275275
sse-connections* (atom #{})
276-
messenger (remote.messenger/->BroadcastMessenger stdio-messenger sse-connections*)
276+
messenger (remote.messenger/make-broadcast-messenger stdio-messenger sse-connections*)
277277
start-remote-server!
278278
(fn [components]
279279
(when-let [rs (remote.server/start! components sse-connections*)]

test/eca/remote/handlers_test.clj

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
[cheshire.core :as json]
44
[clojure.test :refer [deftest is testing]]
55
[eca.config :as config]
6+
[eca.messenger :as messenger]
67
[eca.remote.handlers :as handlers]
8+
[eca.remote.messenger :as remote.messenger]
9+
[eca.remote.sse :as sse]
710
[eca.test-helper :as h]))
811

912
(h/reset-components-before-test)
@@ -88,3 +91,43 @@
8891
(is (= 200 (:status response)))
8992
(is (string? (:version body)))
9093
(is (= "1.0" (:protocolVersion body))))))
94+
95+
(deftest handle-answer-question-test
96+
(let [inner (h/messenger)
97+
sse-connections* (sse/create-connections)
98+
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
99+
os (java.io.ByteArrayOutputStream.)
100+
_client (sse/add-client! sse-connections* os)
101+
request-with-body (fn [body]
102+
{:body (java.io.ByteArrayInputStream.
103+
(.getBytes ^String (json/generate-string body) "UTF-8"))})]
104+
105+
(testing "returns 400 when requestId is missing"
106+
(let [response (handlers/handle-answer-question
107+
{:messenger broadcast-messenger}
108+
(request-with-body {:answer "x"}))
109+
body (json/parse-string (:body response) true)]
110+
(is (= 400 (:status response)))
111+
(is (= "invalid_request" (get-in body [:error :code])))))
112+
113+
(testing "returns 404 when requestId is unknown"
114+
(let [response (handlers/handle-answer-question
115+
{:messenger broadcast-messenger}
116+
(request-with-body {:requestId "nonexistent" :answer "x"}))
117+
body (json/parse-string (:body response) true)]
118+
(is (= 404 (:status response)))
119+
(is (= "question_not_found" (get-in body [:error :code])))))
120+
121+
(testing "returns 204 and resolves the pending promise on a successful answer"
122+
(let [p (messenger/ask-question broadcast-messenger {:chat-id "c1" :question "Q?"})]
123+
(Thread/sleep 100)
124+
(let [pending @(:pending-questions* broadcast-messenger)
125+
[request-id _] (first pending)
126+
response (handlers/handle-answer-question
127+
{:messenger broadcast-messenger}
128+
(request-with-body {:requestId request-id :answer "ok" :cancelled false}))]
129+
(is (= 204 (:status response)))
130+
(is (realized? p))
131+
(is (= {:answer "ok" :cancelled false} @p)))))
132+
133+
(sse/close-all! sse-connections*)))

test/eca/remote/messenger_test.clj

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
(deftest broadcast-messenger-delegates-and-broadcasts-test
1212
(let [inner (h/messenger)
1313
sse-connections* (sse/create-connections)
14-
broadcast-messenger (remote.messenger/->BroadcastMessenger inner sse-connections*)
14+
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
1515
os (java.io.ByteArrayOutputStream.)
1616
_client (sse/add-client! sse-connections* os)]
1717

@@ -60,3 +60,46 @@
6060
(is (not (.contains (.toString os3 "UTF-8") "rewrite")))))
6161

6262
(sse/close-all! sse-connections*)))
63+
64+
(deftest ask-question-broadcasts-and-resolves-via-answer-test
65+
(testing "ask-question registers a promise, broadcasts SSE, and answer-question! resolves it"
66+
(let [inner (h/messenger)
67+
sse-connections* (sse/create-connections)
68+
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
69+
os (java.io.ByteArrayOutputStream.)
70+
_client (sse/add-client! sse-connections* os)
71+
p (messenger/ask-question broadcast-messenger {:chat-id "c1" :question "Why?"})]
72+
(Thread/sleep 100)
73+
(is (not (realized? p)) "promise should not be realized before answer")
74+
(let [output (.toString os "UTF-8")]
75+
(is (.contains output "chat:ask-question") "SSE event name should be chat:ask-question")
76+
(is (.contains output "\"chatId\":\"c1\"") "payload should be camel-cased")
77+
(is (.contains output "\"requestId\"") "payload should include a generated requestId"))
78+
(let [pending @(:pending-questions* broadcast-messenger)
79+
[request-id _] (first pending)]
80+
(is (= 1 (count pending)) "exactly one pending question should be registered")
81+
(is (string? request-id))
82+
(is (= true (remote.messenger/answer-question! broadcast-messenger request-id "because" false)))
83+
(is (realized? p) "promise should be realized after answer-question!")
84+
(is (= {:answer "because" :cancelled false} @p))
85+
(is (empty? @(:pending-questions* broadcast-messenger))
86+
"registry should be cleared after delivery"))
87+
(sse/close-all! sse-connections*))))
88+
89+
(deftest ask-question-falls-back-to-inner-when-no-sse-clients-test
90+
(testing "ask-question delegates to inner messenger when no SSE clients are connected"
91+
(let [inner (h/messenger)
92+
sse-connections* (sse/create-connections)
93+
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)]
94+
(reset! (:ask-question-response* inner) {:answer "from-inner" :cancelled false})
95+
(let [result (messenger/ask-question broadcast-messenger {:chat-id "c1" :question "Why?"})]
96+
(is (= {:answer "from-inner" :cancelled false} @result))
97+
(is (empty? @(:pending-questions* broadcast-messenger))
98+
"no SSE-side registration should occur when delegating to inner")))))
99+
100+
(deftest answer-question-returns-nil-for-unknown-id-test
101+
(testing "answer-question! returns nil when the request-id is unknown"
102+
(let [inner (h/messenger)
103+
sse-connections* (sse/create-connections)
104+
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)]
105+
(is (nil? (remote.messenger/answer-question! broadcast-messenger "nonexistent" "x" false))))))

0 commit comments

Comments
 (0)