Skip to content

Commit 5639ebd

Browse files
committed
fix(EVO-558): QA fixes — FNV hash, signature placement, A2A adapter hardening
- Fix FNV hash to guarantee positive int64 (mask high bit) - Prepend signature to first segment only instead of all parts - Harden A2A adapter response parsing and error handling - Update pipeline service for multi-platform bot delegation - Update tests to match corrected behavior
1 parent e7d2a97 commit 5639ebd

12 files changed

Lines changed: 269 additions & 96 deletions

File tree

.env.example

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
LISTEN_ADDR=:8080
22
REDIS_URL=redis://localhost:6379
33
BOT_RUNTIME_SECRET=change_me_in_production
4-
AI_PROCESSOR_URL=http://ai-processor:8000
5-
AI_PROCESSOR_API_KEY=your_api_key
4+
AI_PROCESSOR_BASE_URL=http://ai-processor:8000
65
AI_CALL_TIMEOUT_SECONDS=30

cmd/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func main() {
5151
debounce := debounceService.NewDebounceEngine(pipelineRepo)
5252

5353
// Step 6: AI adapter
54-
aiAdapter := aiService.NewAIAdapter(cfg.AIProcessorURL, cfg.AIProcessorAPIKey, cfg.AICallTimeoutSeconds)
54+
aiAdapter := aiService.NewAIAdapter(cfg.AIProcessorBaseURL, cfg.AICallTimeoutSeconds)
5555

5656
// Step 7: dispatch engine
5757
dispatch := dispatchService.NewDispatchEngine()

internal/config/config.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ type Config struct {
1010
ListenAddr string
1111
RedisURL string
1212
BotRuntimeSecret string
13-
AIProcessorURL string
14-
AIProcessorAPIKey string
13+
AIProcessorBaseURL string
1514
AICallTimeoutSeconds int
1615
}
1716

@@ -28,11 +27,7 @@ func Load() (*Config, error) {
2827
if err != nil {
2928
return nil, err
3029
}
31-
aiProcessorURL, err := mustGetEnv("AI_PROCESSOR_URL")
32-
if err != nil {
33-
return nil, err
34-
}
35-
aiProcessorAPIKey, err := mustGetEnv("AI_PROCESSOR_API_KEY")
30+
aiProcessorBaseURL, err := mustGetEnv("AI_PROCESSOR_BASE_URL")
3631
if err != nil {
3732
return nil, err
3833
}
@@ -45,8 +40,7 @@ func Load() (*Config, error) {
4540
ListenAddr: listenAddr,
4641
RedisURL: redisURL,
4742
BotRuntimeSecret: botRuntimeSecret,
48-
AIProcessorURL: aiProcessorURL,
49-
AIProcessorAPIKey: aiProcessorAPIKey,
43+
AIProcessorBaseURL: aiProcessorBaseURL,
5044
AICallTimeoutSeconds: aiCallTimeout,
5145
}, nil
5246
}

pkg/ai/model/a2a.go

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,61 @@
11
package model
22

3-
// A2ARequest is the envelope sent to AI Processor via HTTP POST.
4-
// ContactID and ConversationID are internal tracking fields (json:"-") used for
5-
// structured logging; they are never serialised into the HTTP payload.
3+
// A2ARequest carries the data needed to call AI Processor via JSON-RPC 2.0.
4+
// Fields are NOT serialised directly — the adapter builds the wire format.
65
type A2ARequest struct {
7-
Message string `json:"message"` // aggregated buffer content (FR-15)
8-
ContactID int64 `json:"-"`
9-
ConversationID int64 `json:"-"`
6+
AgentBotID string // used to construct URL path: /api/v1/a2a/{AgentBotID}
7+
ContactID int64 // used for userId in JSON-RPC params
8+
ConversationID int64 // used for contextId in JSON-RPC params
9+
ApiKey string // used for X-API-Key header (per-event auth)
10+
Message string // aggregated buffer content (FR-15)
1011
}
1112

12-
// A2AResponse is the raw JSON response from AI Processor.
13+
// jsonRPCRequest is the JSON-RPC 2.0 envelope sent to AI Processor.
14+
type JSONRPCRequest struct {
15+
JSONRPC string `json:"jsonrpc"`
16+
ID string `json:"id"`
17+
Method string `json:"method"`
18+
Params JSONRPCParams `json:"params"`
19+
}
20+
21+
type JSONRPCParams struct {
22+
ContextID string `json:"contextId"`
23+
UserID string `json:"userId"`
24+
Message JSONRPCMessage `json:"message"`
25+
Metadata map[string]any `json:"metadata"`
26+
}
27+
28+
type JSONRPCMessage struct {
29+
Role string `json:"role"`
30+
Parts []JSONRPCPart `json:"parts"`
31+
}
32+
33+
type JSONRPCPart struct {
34+
Type string `json:"type"`
35+
Text string `json:"text,omitempty"`
36+
}
37+
38+
// A2AResponse is the JSON-RPC 2.0 response from AI Processor.
1339
type A2AResponse struct {
14-
Content string `json:"content"`
40+
Result *A2AResult `json:"result"`
41+
}
42+
43+
type A2AResult struct {
44+
Artifacts []A2AArtifact `json:"artifacts"`
45+
Message *A2AMessage `json:"message"`
46+
}
47+
48+
type A2AArtifact struct {
49+
Parts []A2APart `json:"parts"`
50+
}
51+
52+
type A2AMessage struct {
53+
Parts []A2APart `json:"parts"`
54+
}
55+
56+
type A2APart struct {
57+
Type string `json:"type"`
58+
Text string `json:"text"`
1559
}
1660

1761
// NormalizedResponse is the internal format after parsing A2AResponse.

pkg/ai/service/ai_adapter.go

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"io"
1010
"log/slog"
1111
"net/http"
12+
"strings"
1213
"time"
1314

1415
brtErrors "github.com/EvolutionAPI/evo-bot-runtime/internal/errors"
@@ -18,24 +19,23 @@ import (
1819
// maxResponseBytes caps the AI Processor response body to prevent OOM on oversized payloads.
1920
const maxResponseBytes = 1 << 20 // 1 MiB
2021

21-
// AIAdapter calls the AI Processor via A2A protocol.
22+
// AIAdapter calls the AI Processor via A2A protocol (JSON-RPC 2.0).
2223
// Swap the backend by providing a different implementation at main.go wiring.
2324
type AIAdapter interface {
2425
Call(ctx context.Context, req *model.A2ARequest) (*model.NormalizedResponse, error)
2526
}
2627

2728
type aiAdapter struct {
28-
url string
29-
apiKey string
29+
baseURL string
3030
timeoutSecs int
3131
client *http.Client
3232
}
3333

3434
// NewAIAdapter constructs the adapter. Returns interface (GEAR R03).
35-
func NewAIAdapter(url, apiKey string, timeoutSecs int) AIAdapter {
35+
// baseURL is the AI Processor base URL without path (e.g. http://ai-processor:8000).
36+
func NewAIAdapter(baseURL string, timeoutSecs int) AIAdapter {
3637
return &aiAdapter{
37-
url: url,
38-
apiKey: apiKey,
38+
baseURL: strings.TrimRight(baseURL, "/"),
3939
timeoutSecs: timeoutSecs,
4040
client: &http.Client{},
4141
}
@@ -45,24 +45,41 @@ func (a *aiAdapter) Call(ctx context.Context, req *model.A2ARequest) (*model.Nor
4545
start := time.Now()
4646

4747
// Wrap with timeout — inner timeout, outer ctx for pipeline cancellation.
48-
// Error discrimination order matters:
49-
// 1. ctx.Err() first → pipeline cancellation (outer context, set by PipelineService.Cancel)
50-
// 2. timeoutCtx.Err() → AI timeout (inner context, set by WithTimeout)
51-
// 3. default → generic HTTP error
5248
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(a.timeoutSecs)*time.Second)
5349
defer cancel()
5450

55-
body, err := json.Marshal(req)
51+
// Build per-event URL: {baseURL}/api/v1/a2a/{agent_bot_id}
52+
url := fmt.Sprintf("%s/api/v1/a2a/%s", a.baseURL, req.AgentBotID)
53+
54+
// Build JSON-RPC 2.0 envelope
55+
rpcReq := model.JSONRPCRequest{
56+
JSONRPC: "2.0",
57+
ID: fmt.Sprintf("%d:%d", req.ContactID, req.ConversationID),
58+
Method: "message/send",
59+
Params: model.JSONRPCParams{
60+
ContextID: fmt.Sprintf("%d", req.ConversationID),
61+
UserID: fmt.Sprintf("%d", req.ContactID),
62+
Message: model.JSONRPCMessage{
63+
Role: "user",
64+
Parts: []model.JSONRPCPart{
65+
{Type: "text", Text: req.Message},
66+
},
67+
},
68+
Metadata: map[string]any{},
69+
},
70+
}
71+
72+
body, err := json.Marshal(rpcReq)
5673
if err != nil {
5774
return nil, fmt.Errorf("pipeline.ai.marshal: %w", err)
5875
}
5976

60-
httpReq, err := http.NewRequestWithContext(timeoutCtx, http.MethodPost, a.url, bytes.NewReader(body))
77+
httpReq, err := http.NewRequestWithContext(timeoutCtx, http.MethodPost, url, bytes.NewReader(body))
6178
if err != nil {
6279
return nil, fmt.Errorf("pipeline.ai.new_request: %w", err)
6380
}
6481
httpReq.Header.Set("Content-Type", "application/json")
65-
httpReq.Header.Set("Authorization", "Bearer "+a.apiKey)
82+
httpReq.Header.Set("X-API-Key", req.ApiKey)
6683

6784
resp, err := a.client.Do(httpReq)
6885
if err != nil {
@@ -90,11 +107,40 @@ func (a *aiAdapter) Call(ctx context.Context, req *model.A2ARequest) (*model.Nor
90107
return nil, fmt.Errorf("pipeline.ai.decode: %w", err)
91108
}
92109

110+
content := extractResponseText(&a2aResp)
111+
93112
slog.Info("pipeline.ai.http.completed",
94113
"contact_id", req.ContactID,
95114
"conversation_id", req.ConversationID,
96115
"duration_ms", time.Since(start).Milliseconds(),
97116
)
98117

99-
return &model.NormalizedResponse{Content: a2aResp.Content}, nil
118+
return &model.NormalizedResponse{Content: content}, nil
119+
}
120+
121+
// extractResponseText extracts the text content from the A2A JSON-RPC response.
122+
// Tries result.artifacts[0].parts[0].text first, then result.message.parts[0].text.
123+
func extractResponseText(resp *model.A2AResponse) string {
124+
if resp.Result == nil {
125+
return ""
126+
}
127+
// Try artifacts first (primary response format)
128+
if len(resp.Result.Artifacts) > 0 {
129+
for _, artifact := range resp.Result.Artifacts {
130+
for _, part := range artifact.Parts {
131+
if part.Text != "" {
132+
return part.Text
133+
}
134+
}
135+
}
136+
}
137+
// Fallback to message format
138+
if resp.Result.Message != nil {
139+
for _, part := range resp.Result.Message.Parts {
140+
if part.Text != "" {
141+
return part.Text
142+
}
143+
}
144+
}
145+
return ""
100146
}

0 commit comments

Comments
 (0)