Skip to content

Commit be75e79

Browse files
morpheusroguealex-sandrk
authored and committed
feat(proxy-router/mobile): add OpenAI-compatible chat completion and embeddings request handling
- Introduced `ChatCompletionRequestExtra` and `EmbeddingsRequest` types so callers can build requests without importing internal packages.
- Implemented the `SendChatCompletion` method to forward chat completion requests, preserving the original JSON structure for compatibility with upstream providers.
- Added the `SendEmbeddings` method to handle embeddings requests, returning the full response JSON verbatim.
- Enhanced error handling for nil requests and callbacks, ensuring robust request processing.

This update enhances the SDK's capabilities for interacting with OpenAI-compatible gateways, improving flexibility for external callers.
1 parent 2cbbca4 commit be75e79

3 files changed

Lines changed: 340 additions & 0 deletions

File tree

proxy-router/mobile/redact.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package mobile
2+
3+
import "regexp"
4+
5+
// Provider-identifying address redaction at the SDK boundary.
6+
//
7+
// Errors that bubble up from the proxy-router routinely include the
8+
// upstream provider's endpoint URL or raw IPv4 + port (e.g.
9+
// `dial tcp 216.81.245.17:18788: connect: connection refused`). Surfacing
10+
// that to external consumers — local OpenAI-compatible gateways, chat UIs,
11+
// MCP servers, anyone embedding this SDK — leaks provider infrastructure
12+
// for no upside; the failure mode (`connection refused`, `i/o timeout`,
13+
// HTTP status) is what callers actually need.
14+
//
15+
// This file applies the redaction to errors that cross the public SDK
16+
// boundary outward. Internal proxy-router logging continues to see the
17+
// raw addresses so operators can still debug. The patterns mirror
18+
// nodeneo/lib/utils/error_redaction.dart and
19+
// nodeneo/go/internal/gateway/redact.go — keep all three in lockstep.
20+
21+
const (
	// providerPlaceholder stands in for a complete http(s) URL.
	providerPlaceholder = "<provider endpoint>"
	// shortPlaceholder stands in for a bare host:port pair or IPv4 address.
	shortPlaceholder = "<provider>"
)

var (
	// httpURLPattern matches an http/https URL: scheme, host (bracketed
	// literal or name/IPv4), optional port, optional path.
	httpURLPattern = regexp.MustCompile(
		`(?i)https?://(?:\[[^\]\s]+\]|[A-Za-z0-9._\-]+)(?::\d+)?(?:/[^\s"\)\],;]*)?`,
	)
	// hostPortPattern matches a dotted host followed by ":port". Group 1 and
	// group 3 capture the boundary on either side; group 2 is the address.
	hostPortPattern = regexp.MustCompile(
		`([^A-Za-z0-9./@\-]|^)((?:[A-Za-z0-9\-]+\.)+[A-Za-z0-9\-]+:\d{2,5})([^A-Za-z0-9]|$)`,
	)
	// bareIPPattern matches a dotted-quad IPv4 address with the same
	// boundary / address / boundary group layout as hostPortPattern.
	bareIPPattern = regexp.MustCompile(
		`([^\d.]|^)(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})([^\d.]|$)`,
	)
)

// redactProviderEndpointsString replaces provider URLs, host:port pairs and
// bare IPv4 addresses in s with neutral placeholders. An empty string is
// returned unchanged.
//
// Order matters: full URLs are stripped first so the host-level passes cannot
// eat fragments of an already-cleaned URL.
//
// NOTE(review): a bracketed IPv6 literal outside a URL (e.g. "[::1]:8080") is
// not covered by hostPortPattern or bareIPPattern.
func redactProviderEndpointsString(s string) string {
	if s == "" {
		return s
	}
	out := httpURLPattern.ReplaceAllString(s, providerPlaceholder)
	out = redactBoundedMatches(hostPortPattern, out)
	out = redactBoundedMatches(bareIPPattern, out)
	return out
}

// redactBoundedMatches replaces capture group 2 of every match of re in s with
// shortPlaceholder while re-emitting the boundary groups 1 and 3.
//
// RE2 has no lookbehind, so the boundary characters are part of each match.
// A match therefore consumes its trailing delimiter, and an address that
// follows after exactly one delimiter ("1.2.3.4,5.6.7.8") has no leading
// boundary left in the same pass. Repeating the replacement until the string
// stops changing picks those up. The loop terminates because every replaced
// address contains at least one '.', which the placeholder does not.
func redactBoundedMatches(re *regexp.Regexp, s string) string {
	const template = "${1}" + shortPlaceholder + "${3}"
	for {
		next := re.ReplaceAllString(s, template)
		if next == s {
			return s
		}
		s = next
	}
}
65+
66+
// redactedError carries a scrubbed, human-readable message alongside the
// error it was derived from. Because the source error stays reachable through
// Unwrap, sentinel checks such as `errors.Is(err, ErrProvider)` and
// `errors.As` keep matching; only the text returned by Error differs.
type redactedError struct {
	// original is the untouched error exposed via Unwrap.
	original error
	// message is the text returned by Error.
	message string
}

// Error returns the stored redacted message.
func (r *redactedError) Error() string {
	return r.message
}

// Unwrap returns the wrapped original error so the chain stays inspectable.
func (r *redactedError) Unwrap() error {
	return r.original
}
78+
79+
// redactError returns nil for nil input; otherwise returns an error whose
80+
// message has provider-identifying addresses scrubbed. The original error
81+
// chain is preserved via Unwrap so `errors.Is` / `errors.As` still match.
82+
func redactError(err error) error {
83+
if err == nil {
84+
return nil
85+
}
86+
cleaned := redactProviderEndpointsString(err.Error())
87+
if cleaned == err.Error() {
88+
return err
89+
}
90+
return &redactedError{original: err, message: cleaned}
91+
}
92+
93+
// Compile-time interface assertion so future error-wrapper helpers in this
94+
// package don't accidentally regress the Unwrap contract.
95+
var _ interface{ Unwrap() error } = (*redactedError)(nil)

proxy-router/mobile/redact_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package mobile
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"strings"
7+
"testing"
8+
)
9+
10+
// TestRedactProviderEndpointsString_KeepInLockstep documents the exact
11+
// shape of the redaction output. The same patterns are mirrored on the
12+
// gateway side (nodeneo/go/internal/gateway/redact.go) and the Flutter UI
13+
// (nodeneo/lib/utils/error_redaction.dart). Updating one without updating
14+
// the others creates inconsistent error rendering.
15+
func TestRedactProviderEndpointsString_KeepInLockstep(t *testing.T) {
16+
cases := []struct {
17+
name string
18+
in string
19+
want string
20+
}{
21+
{
22+
name: "full URL with IPv4 + port + path",
23+
in: `Post "http://216.81.245.17:18788/embeddings": dial tcp 216.81.245.17:18788: connect: connection refused`,
24+
want: `Post "<provider endpoint>": dial tcp <provider>: connect: connection refused`,
25+
},
26+
{
27+
name: "https with FQDN host",
28+
in: "could not connect to https://provider.mor.org:3333/v1 again",
29+
want: "could not connect to <provider endpoint> again",
30+
},
31+
{
32+
name: "bare host:port",
33+
in: "dial tcp provider.example.com:36318: i/o timeout",
34+
want: "dial tcp <provider>: i/o timeout",
35+
},
36+
{
37+
name: "bare IPv4",
38+
in: "no route to host 74.48.78.46 — try again",
39+
want: "no route to host <provider> — try again",
40+
},
41+
{
42+
name: "non-matching text untouched",
43+
in: "insufficient MOR balance — your wallet does not have enough MOR",
44+
want: "insufficient MOR balance — your wallet does not have enough MOR",
45+
},
46+
{
47+
name: "version numbers and timestamps NOT mistaken for IPs",
48+
in: "v1.2.3 released at 12:34:56",
49+
want: "v1.2.3 released at 12:34:56",
50+
},
51+
{
52+
name: "empty input",
53+
in: "",
54+
want: "",
55+
},
56+
}
57+
58+
for _, tc := range cases {
59+
t.Run(tc.name, func(t *testing.T) {
60+
got := redactProviderEndpointsString(tc.in)
61+
if got != tc.want {
62+
t.Errorf("\n in: %q\n got: %q\n want: %q", tc.in, got, tc.want)
63+
}
64+
})
65+
}
66+
}
67+
68+
// TestRedactError_PreservesErrorChain ensures the wrapped error still
69+
// matches errors.Is/As against the original sentinel. Callers (notably
70+
// the proxy-router itself) use sentinels like ErrProvider; if redaction
71+
// broke those checks we'd silently change behaviour for retry / rate-limit
72+
// logic that depends on identity comparisons.
73+
func TestRedactError_PreservesErrorChain(t *testing.T) {
74+
sentinel := errors.New("provider request failed")
75+
wrapped := fmt.Errorf("%w: dial tcp 216.81.245.17:18788: connect: connection refused", sentinel)
76+
77+
red := redactError(wrapped)
78+
if red == nil {
79+
t.Fatal("redactError returned nil for non-nil input")
80+
}
81+
82+
msg := red.Error()
83+
for _, leaked := range []string{"216.81.245.17", "18788"} {
84+
if strings.Contains(msg, leaked) {
85+
t.Errorf("redacted message still leaks %q: %s", leaked, msg)
86+
}
87+
}
88+
if !strings.Contains(msg, "<provider>") && !strings.Contains(msg, "<provider endpoint>") {
89+
t.Errorf("redacted message missing placeholder: %s", msg)
90+
}
91+
92+
if !errors.Is(red, sentinel) {
93+
t.Errorf("redacted error must still match original sentinel via errors.Is")
94+
}
95+
}
96+
97+
// TestRedactError_NilInput is a fast guard against the dumb mistake of
98+
// dereferencing a nil error during wrapping.
99+
func TestRedactError_NilInput(t *testing.T) {
100+
if got := redactError(nil); got != nil {
101+
t.Errorf("redactError(nil) = %v, want nil", got)
102+
}
103+
}
104+
105+
// TestRedactError_NoChangeIsPassthrough verifies that messages without any
106+
// provider-identifying tokens are returned unchanged (same pointer, no
107+
// wrapper allocated). This keeps the cost of the SDK-boundary redaction
108+
// effectively zero on the common case where errors never carried provider
109+
// info to begin with (e.g. "insufficient MOR balance").
110+
func TestRedactError_NoChangeIsPassthrough(t *testing.T) {
111+
original := errors.New("insufficient MOR balance — your wallet does not have enough MOR")
112+
got := redactError(original)
113+
if got != original {
114+
t.Errorf("clean error should be returned as-is, got wrapper %T", got)
115+
}
116+
}

proxy-router/mobile/sdk_chat.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,132 @@ func extractReasoningContent(chunk gcs.Chunk) string {
174174
}
175175
return ""
176176
}
177+
178+
// ChatCompletionRequestExtra is the OpenAI chat completion request type used by
179+
// the embedded gateway. It is exported as an alias so external callers (the
180+
// NodeNeo gateway) can build requests without importing internal packages.
181+
// All fields from openai.ChatCompletionRequest are honoured (Tools, ToolChoice,
182+
// ParallelToolCalls, ResponseFormat, StreamOptions, Seed, LogitBias, …) plus
183+
// any unknown JSON keys captured in Extra.
184+
type ChatCompletionRequestExtra = gcs.OpenAICompletionRequestExtra
185+
186+
// RawChunkCallback is invoked once per upstream chat completion chunk with
// the chunk's JSON, for OpenAI-compatible gateways that must relay provider
// fields the typed structs may not capture (delta.tool_calls,
// delta.reasoning_content, stream usage events, finish_reason, custom
// Morpheus extensions, etc.).
//
// chunkJSON holds the JSON of either a streaming delta
// (ChatCompletionStreamResponseExtra) or a non-streaming response
// (ChatCompletionResponseExtra).
//
// isLast is true in two situations as driven by SendChatCompletion:
//   - on the terminal control chunk, where chunkJSON is nil and the caller
//     should write the SSE [DONE] sentinel or close the response;
//   - on a non-streaming data chunk, where chunkJSON is non-nil.
//
// A non-nil return value is handed back to the sender as the callback error.
type RawChunkCallback func(chunkJSON json.RawMessage, isLast bool) error
198+
199+
// SendChatCompletion forwards an OpenAI-compatible chat completion request to
200+
// the upstream Morpheus provider and surfaces each response chunk verbatim via
201+
// cb. This is the entry point for OpenAI-compatible local gateways (e.g. the
202+
// NodeNeo AI Gateway used by Cursor / Zed / Claude Desktop / LangChain).
203+
//
204+
// Compared to SendPromptWithMessagesAndParams, this method:
205+
// - Accepts the full request object (including Tools, ToolChoice,
206+
// ResponseFormat, StreamOptions, ParallelToolCalls, Seed, LogitBias and
207+
// any vendor-specific fields preserved in Extra).
208+
// - Relays each chunk's original JSON unchanged so tool_calls,
209+
// reasoning_content, finish_reason, and usage flow through untouched.
210+
//
211+
// req.Model is overwritten with sessionID so the proxy-router can route, which
212+
// matches what the existing SendPrompt* methods do; the upstream provider
213+
// rewrites it again to its own model name before responding.
214+
func (s *SDK) SendChatCompletion(ctx context.Context, sessionID string, req *ChatCompletionRequestExtra, cb RawChunkCallback) error {
215+
if req == nil {
216+
return fmt.Errorf("nil chat completion request")
217+
}
218+
if cb == nil {
219+
return fmt.Errorf("nil chunk callback")
220+
}
221+
222+
id := common.HexToHash(sessionID)
223+
if err := s.ensureProviderForSession(ctx, id); err != nil {
224+
return redactError(err)
225+
}
226+
227+
req.Model = sessionID
228+
229+
internalCB := func(ctx context.Context, chunk gcs.Chunk, errResp *gcs.AiEngineErrorResponse) error {
230+
if errResp != nil {
231+
// errResp.ProviderModelError can include the upstream's
232+
// raw URL/IP — redact before bubbling to consumers.
233+
return fmt.Errorf("provider error: %s", redactProviderEndpointsString(fmt.Sprintf("%v", errResp.ProviderModelError)))
234+
}
235+
if chunk.Type() == gcs.ChunkTypeControl {
236+
return cb(nil, true)
237+
}
238+
239+
data := chunk.Data()
240+
if data == nil {
241+
return nil
242+
}
243+
raw, err := json.Marshal(data)
244+
if err != nil {
245+
return fmt.Errorf("marshal chunk: %w", err)
246+
}
247+
248+
return cb(raw, !chunk.IsStreaming())
249+
}
250+
251+
_, err := s.proxySender.SendPromptV2(ctx, id, req, internalCB)
252+
return redactError(err)
253+
}
254+
255+
// EmbeddingsRequest is the OpenAI embeddings request type used by the embedded
256+
// gateway. Exported as an alias so external callers do not need to import
257+
// internal proxy-router packages. All fields from openai.EmbeddingRequest are
258+
// honoured, plus any unknown JSON keys captured in Extra.
259+
type EmbeddingsRequest = gcs.EmbeddingsRequest
260+
261+
// SendEmbeddings forwards an OpenAI-compatible embeddings request to the
262+
// upstream Morpheus provider and returns the response JSON verbatim. Embeddings
263+
// are non-streaming: the upstream emits a single response chunk followed by a
264+
// control chunk.
265+
//
266+
// The returned RawMessage is the provider's full EmbeddingsResponse (id,
267+
// object, data:[…vectors…], model, usage, plus any vendor-specific extras).
268+
// The caller is expected to relay it to its HTTP client unchanged.
269+
func (s *SDK) SendEmbeddings(ctx context.Context, sessionID string, req *EmbeddingsRequest) (json.RawMessage, error) {
270+
if req == nil {
271+
return nil, fmt.Errorf("nil embeddings request")
272+
}
273+
274+
id := common.HexToHash(sessionID)
275+
if err := s.ensureProviderForSession(ctx, id); err != nil {
276+
return nil, redactError(err)
277+
}
278+
279+
var responseJSON json.RawMessage
280+
281+
internalCB := func(ctx context.Context, chunk gcs.Chunk, errResp *gcs.AiEngineErrorResponse) error {
282+
if errResp != nil {
283+
return fmt.Errorf("provider error: %s", redactProviderEndpointsString(fmt.Sprintf("%v", errResp.ProviderModelError)))
284+
}
285+
if chunk.Type() == gcs.ChunkTypeControl {
286+
return nil
287+
}
288+
data := chunk.Data()
289+
if data == nil {
290+
return nil
291+
}
292+
b, err := json.Marshal(data)
293+
if err != nil {
294+
return fmt.Errorf("marshal embeddings chunk: %w", err)
295+
}
296+
responseJSON = b
297+
return nil
298+
}
299+
300+
_, err := s.proxySender.SendEmbeddings(ctx, id, req, internalCB)
301+
if err != nil {
302+
return nil, redactError(err)
303+
}
304+
return responseJSON, nil
305+
}

0 commit comments

Comments
 (0)