diff --git a/config/config.go b/config/config.go index fbe2f5d..486275d 100644 --- a/config/config.go +++ b/config/config.go @@ -578,6 +578,13 @@ func validate(cfg *Config) error { // status updates for any txid in the system if it runs without bearer-token // auth, so we fail-closed here at config load rather than silently exposing // the unauthenticated receiver. See issue #76 / finding F-018. + // + // This same check now also gates the OUTBOUND /watch token forwarding: + // merkleservice.Client.Register/RegisterBatch propagate cfg.CallbackToken + // to merkle-service so it can attach `Authorization: Bearer ` on + // callbacks. Without a configured token there's nothing to forward AND the + // inbound receiver would 401 anyway — the same fail-closed posture covers + // both ends, so a duplicate "outbound token required" check is unnecessary. if cfg.MerkleService.URL != "" && cfg.CallbackToken == "" { return fmt.Errorf("callback_token is required when merkle_service.url is set " + "(unauthenticated /api/v1/merkle-service/callback would accept forged callbacks; see issue #76)") diff --git a/merkleservice/client.go b/merkleservice/client.go index 769e0bb..378661c 100644 --- a/merkleservice/client.go +++ b/merkleservice/client.go @@ -44,18 +44,29 @@ func (c *Client) SetLogger(logger *zap.Logger) { c.logger = logger } -// watchRequest is the payload sent to POST /watch +// watchRequest is the payload sent to POST /watch. +// CallbackToken (when non-empty) tells merkle-service which bearer token to +// attach as `Authorization: Bearer ` on outbound callback delivery to +// arcade. arcade's /api/v1/merkle-service/callback receiver requires this +// header (PR #112 / F-018), so a missing token means callbacks 401. Empty +// values are omitted from the JSON to preserve back-compat with merkle-service +// builds that don't yet know the field. type watchRequest struct { - TxID string `json:"txid"` - CallbackURL string `json:"callbackUrl"` + TxID string `json:"txid"` + CallbackURL string `json:"callbackUrl"` + CallbackToken string `json:"callbackToken,omitempty"` } // Register registers a transaction with the Merkle Service for watching. // The Merkle Service will send callbacks to callbackURL when the transaction is seen or mined. -func (c *Client) Register(ctx context.Context, txid, callbackURL string) error { +// callbackToken is forwarded so merkle-service can authenticate itself back to +// arcade on callback delivery; empty string disables forwarding (and the JSON +// field is omitted entirely thanks to omitempty). +func (c *Client) Register(ctx context.Context, txid, callbackURL, callbackToken string) error { body, err := json.Marshal(watchRequest{ - TxID: txid, - CallbackURL: callbackURL, + TxID: txid, + CallbackURL: callbackURL, + CallbackToken: callbackToken, }) if err != nil { return fmt.Errorf("failed to marshal watch request: %w", err) @@ -98,9 +109,12 @@ func (c *Client) Register(ctx context.Context, txid, callbackURL string) error { } // Registration represents a single txid+callbackURL pair for batch registration. +// CallbackToken is the bearer token merkle-service should use when calling +// back to arcade for this registration; empty omits the field on the wire. type Registration struct { - TxID string - CallbackURL string + TxID string + CallbackURL string + CallbackToken string } // RegisterBatch registers multiple transactions concurrently with bounded parallelism. @@ -118,7 +132,7 @@ func (c *Client) RegisterBatch(ctx context.Context, registrations []Registration for _, reg := range registrations { g.Go(func() error { - return c.Register(gctx, reg.TxID, reg.CallbackURL) + return c.Register(gctx, reg.TxID, reg.CallbackURL, reg.CallbackToken) }) } diff --git a/merkleservice/client_test.go b/merkleservice/client_test.go index 59f1683..c1ed80c 100644 --- a/merkleservice/client_test.go +++ b/merkleservice/client_test.go @@ -26,7 +26,7 @@ func TestRegister(t *testing.T) { defer server.Close() client := NewClient(server.URL, "mytoken", 0) - err := client.Register(context.Background(), "abc123", "http://callback/url") + err := client.Register(context.Background(), "abc123", "http://callback/url", "") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -44,6 +44,62 @@ func TestRegister(t *testing.T) { } } +// TestRegister_ForwardsCallbackToken pins the F-018 fix on the outbound +// /watch path: when arcade is configured with a callback token, that token +// must round-trip through the watch payload so merkle-service can stamp +// Authorization on callbacks. The inbound receiver requires it; without +// forwarding the loop 401s. +func TestRegister_ForwardsCallbackToken(t *testing.T) { + var rawBody []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rawBody, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := NewClient(server.URL, "auth-token", 0) + if err := client.Register(context.Background(), "abc123", "http://callback/url", "my-token"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Field present and exact match on the wire. + if !strings.Contains(string(rawBody), `"callbackToken":"my-token"`) { + t.Errorf("expected body to contain callbackToken=my-token, got %s", string(rawBody)) + } + + // And it round-trips cleanly through json.Unmarshal too. + var parsed map[string]string + if err := json.Unmarshal(rawBody, &parsed); err != nil { + t.Fatalf("body is not valid JSON: %v", err) + } + if parsed["callbackToken"] != "my-token" { + t.Errorf("expected callbackToken=my-token, got %q", parsed["callbackToken"]) + } +} + +// TestRegister_OmitsEmptyCallbackToken pins the back-compat half: pre-fix +// callers (and arcade builds without a configured token) must produce a wire +// payload with NO callbackToken key, so merkle-service builds that don't yet +// know the field aren't impacted. The omitempty tag is what enforces this and +// the test exists specifically to fail loudly if someone removes it. +func TestRegister_OmitsEmptyCallbackToken(t *testing.T) { + var rawBody []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rawBody, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := NewClient(server.URL, "", 0) + if err := client.Register(context.Background(), "abc123", "http://callback/url", ""); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if strings.Contains(string(rawBody), "callbackToken") { + t.Errorf("expected body to omit callbackToken when empty, got %s", string(rawBody)) + } +} + func TestRegister_Error(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusInternalServerError) @@ -51,7 +107,7 @@ func TestRegister_Error(t *testing.T) { defer server.Close() client := NewClient(server.URL, "", 0) - err := client.Register(context.Background(), "abc123", "http://callback") + err := client.Register(context.Background(), "abc123", "http://callback", "") if err == nil { t.Error("expected error for 500 response") } diff --git a/services/propagation/propagator.go b/services/propagation/propagator.go index 675452b..8172071 100644 --- a/services/propagation/propagator.go +++ b/services/propagation/propagator.go @@ -214,7 +214,7 @@ func (p *Propagator) handleMessage(ctx context.Context, msg *kafka.Message) erro if p.merkleClient != nil && p.cfg.CallbackURL != "" { mStart := time.Now() - if err := p.merkleClient.Register(ctx, propMsg.TXID, p.cfg.CallbackURL); err != nil { + if err := p.merkleClient.Register(ctx, propMsg.TXID, p.cfg.CallbackURL, p.cfg.CallbackToken); err != nil { metrics.PropagationMerkleRegisterDuration.Observe(time.Since(mStart).Seconds()) metrics.PropagationMerkleRegisterFailures.WithLabelValues("register_error").Inc() // Surface the failure so the consumer's retry+DLQ machinery diff --git a/services/propagation/propagator_test.go b/services/propagation/propagator_test.go index 5ef364b..253e5f7 100644 --- a/services/propagation/propagator_test.go +++ b/services/propagation/propagator_test.go @@ -305,6 +305,46 @@ func handleAndFlush(t *testing.T, p *Propagator, payload []byte) error { return p.flushBatch(context.Background()) } +// TestHandleMessage_ForwardsCallbackToken pins the propagator → merkle-service +// half of the F-018 callback-auth loop: the token configured at the arcade +// side (cfg.CallbackToken) must reach merkle-service via the /watch payload, +// so merkle-service can attach it as Authorization on outbound delivery. If +// this test fails, callbacks will 401 even if the inbound receiver and +// merkle-service forwarder are both correct. +func TestHandleMessage_ForwardsCallbackToken(t *testing.T) { + var gotToken string + merkleSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req struct { + TxID string `json:"txid"` + CallbackToken string `json:"callbackToken"` + } + _ = json.NewDecoder(r.Body).Decode(&req) + gotToken = req.CallbackToken + w.WriteHeader(http.StatusOK) + })) + defer merkleSrv.Close() + + teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK) + defer teranodeSrv.Close() + + cfg := &config.Config{ + CallbackURL: "http://localhost:8080/callback", + CallbackToken: "secret-arcade-token", + } + cfg.Propagation.MerkleConcurrency = 10 + mc := merkleservice.NewClient(merkleSrv.URL, "", 5*time.Second) + tc := teranode.NewClient([]string{teranodeSrv.URL}, "", teranode.HealthConfig{FailureThreshold: 1 << 20}) + p := New(cfg, zap.NewNop(), nil, nil, newMockStore(), nil, tc, mc) + + if err := handleAndFlush(t, p, makePropMsg("abc123")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if gotToken != "secret-arcade-token" { + t.Errorf("expected merkle-service to receive callbackToken=secret-arcade-token, got %q", gotToken) + } +} + // Test 1: Registration happens before broadcast on success (single message) func TestHandleMessage_RegistrationBeforeBroadcast(t *testing.T) { log := &eventLog{}