Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,13 @@ func validate(cfg *Config) error {
// status updates for any txid in the system if it runs without bearer-token
// auth, so we fail-closed here at config load rather than silently exposing
// the unauthenticated receiver. See issue #76 / finding F-018.
//
// This same check now also gates the OUTBOUND /watch token forwarding:
// merkleservice.Client.Register/RegisterBatch propagate cfg.CallbackToken
// to merkle-service so it can attach `Authorization: Bearer <token>` on
// callbacks. Without a configured token there's nothing to forward AND the
// inbound receiver would 401 anyway — the same fail-closed posture covers
// both ends, so a duplicate "outbound token required" check is unnecessary.
if cfg.MerkleService.URL != "" && cfg.CallbackToken == "" {
return fmt.Errorf("callback_token is required when merkle_service.url is set " +
"(unauthenticated /api/v1/merkle-service/callback would accept forged callbacks; see issue #76)")
Expand Down
32 changes: 23 additions & 9 deletions merkleservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,29 @@ func (c *Client) SetLogger(logger *zap.Logger) {
c.logger = logger
}

// watchRequest is the payload sent to POST /watch
// watchRequest is the payload sent to POST /watch.
// CallbackToken (when non-empty) tells merkle-service which bearer token to
// attach as `Authorization: Bearer <token>` on outbound callback delivery to
// arcade. arcade's /api/v1/merkle-service/callback receiver requires this
// header (PR #112 / F-018), so a missing token means callbacks 401. Empty
// values are omitted from the JSON to preserve back-compat with merkle-service
// builds that don't yet know the field.
type watchRequest struct {
TxID string `json:"txid"`
CallbackURL string `json:"callbackUrl"`
TxID string `json:"txid"`
CallbackURL string `json:"callbackUrl"`
CallbackToken string `json:"callbackToken,omitempty"`
}

// Register registers a transaction with the Merkle Service for watching.
// The Merkle Service will send callbacks to callbackURL when the transaction is seen or mined.
func (c *Client) Register(ctx context.Context, txid, callbackURL string) error {
// callbackToken is forwarded so merkle-service can authenticate itself back to
// arcade on callback delivery; empty string disables forwarding (and the JSON
// field is omitted entirely thanks to omitempty).
func (c *Client) Register(ctx context.Context, txid, callbackURL, callbackToken string) error {
body, err := json.Marshal(watchRequest{
TxID: txid,
CallbackURL: callbackURL,
TxID: txid,
CallbackURL: callbackURL,
CallbackToken: callbackToken,
})
if err != nil {
return fmt.Errorf("failed to marshal watch request: %w", err)
Expand Down Expand Up @@ -98,9 +109,12 @@ func (c *Client) Register(ctx context.Context, txid, callbackURL string) error {
}

// Registration represents a single txid+callbackURL pair for batch registration.
// CallbackToken is the bearer token merkle-service should use when calling
// back to arcade for this registration; empty omits the field on the wire.
type Registration struct {
TxID string
CallbackURL string
TxID string
CallbackURL string
CallbackToken string
}

// RegisterBatch registers multiple transactions concurrently with bounded parallelism.
Expand All @@ -118,7 +132,7 @@ func (c *Client) RegisterBatch(ctx context.Context, registrations []Registration

for _, reg := range registrations {
g.Go(func() error {
return c.Register(gctx, reg.TxID, reg.CallbackURL)
return c.Register(gctx, reg.TxID, reg.CallbackURL, reg.CallbackToken)
})
}

Expand Down
60 changes: 58 additions & 2 deletions merkleservice/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRegister(t *testing.T) {
defer server.Close()

client := NewClient(server.URL, "mytoken", 0)
err := client.Register(context.Background(), "abc123", "http://callback/url")
err := client.Register(context.Background(), "abc123", "http://callback/url", "")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -44,14 +44,70 @@ func TestRegister(t *testing.T) {
}
}

// TestRegister_ForwardsCallbackToken pins the F-018 fix on the outbound
// /watch path: when arcade is configured with a callback token, that token
// must round-trip through the watch payload so merkle-service can stamp
// Authorization on callbacks. The inbound receiver requires it; without
// forwarding the loop 401s.
func TestRegister_ForwardsCallbackToken(t *testing.T) {
var rawBody []byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rawBody, _ = io.ReadAll(r.Body)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

client := NewClient(server.URL, "auth-token", 0)
if err := client.Register(context.Background(), "abc123", "http://callback/url", "my-token"); err != nil {
t.Fatalf("unexpected error: %v", err)
}

// Field present and exact match on the wire.
if !strings.Contains(string(rawBody), `"callbackToken":"my-token"`) {
t.Errorf("expected body to contain callbackToken=my-token, got %s", string(rawBody))
}

// And it round-trips cleanly through json.Unmarshal too.
var parsed map[string]string
if err := json.Unmarshal(rawBody, &parsed); err != nil {
t.Fatalf("body is not valid JSON: %v", err)
}
if parsed["callbackToken"] != "my-token" {
t.Errorf("expected callbackToken=my-token, got %q", parsed["callbackToken"])
}
}

// TestRegister_OmitsEmptyCallbackToken pins the back-compat half: pre-fix
// callers (and arcade builds without a configured token) must produce a wire
// payload with NO callbackToken key, so merkle-service builds that don't yet
// know the field aren't impacted. The omitempty tag is what enforces this and
// the test exists specifically to fail loudly if someone removes it.
func TestRegister_OmitsEmptyCallbackToken(t *testing.T) {
var rawBody []byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rawBody, _ = io.ReadAll(r.Body)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

client := NewClient(server.URL, "", 0)
if err := client.Register(context.Background(), "abc123", "http://callback/url", ""); err != nil {
t.Fatalf("unexpected error: %v", err)
}

if strings.Contains(string(rawBody), "callbackToken") {
t.Errorf("expected body to omit callbackToken when empty, got %s", string(rawBody))
}
}

func TestRegister_Error(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()

client := NewClient(server.URL, "", 0)
err := client.Register(context.Background(), "abc123", "http://callback")
err := client.Register(context.Background(), "abc123", "http://callback", "")
if err == nil {
t.Error("expected error for 500 response")
}
Expand Down
2 changes: 1 addition & 1 deletion services/propagation/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (p *Propagator) handleMessage(ctx context.Context, msg *kafka.Message) erro

if p.merkleClient != nil && p.cfg.CallbackURL != "" {
mStart := time.Now()
if err := p.merkleClient.Register(ctx, propMsg.TXID, p.cfg.CallbackURL); err != nil {
if err := p.merkleClient.Register(ctx, propMsg.TXID, p.cfg.CallbackURL, p.cfg.CallbackToken); err != nil {
metrics.PropagationMerkleRegisterDuration.Observe(time.Since(mStart).Seconds())
metrics.PropagationMerkleRegisterFailures.WithLabelValues("register_error").Inc()
// Surface the failure so the consumer's retry+DLQ machinery
Expand Down
40 changes: 40 additions & 0 deletions services/propagation/propagator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,46 @@ func handleAndFlush(t *testing.T, p *Propagator, payload []byte) error {
return p.flushBatch(context.Background())
}

// TestHandleMessage_ForwardsCallbackToken pins the propagator → merkle-service
// half of the F-018 callback-auth loop: the token configured at the arcade
// side (cfg.CallbackToken) must reach merkle-service via the /watch payload,
// so merkle-service can attach it as Authorization on outbound delivery. If
// this test fails, callbacks will 401 even if the inbound receiver and
// merkle-service forwarder are both correct.
func TestHandleMessage_ForwardsCallbackToken(t *testing.T) {
var gotToken string
merkleSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var req struct {
TxID string `json:"txid"`
CallbackToken string `json:"callbackToken"`
}
_ = json.NewDecoder(r.Body).Decode(&req)
gotToken = req.CallbackToken
w.WriteHeader(http.StatusOK)
}))
defer merkleSrv.Close()

teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK)
defer teranodeSrv.Close()

cfg := &config.Config{
CallbackURL: "http://localhost:8080/callback",
CallbackToken: "secret-arcade-token",
}
cfg.Propagation.MerkleConcurrency = 10
mc := merkleservice.NewClient(merkleSrv.URL, "", 5*time.Second)
tc := teranode.NewClient([]string{teranodeSrv.URL}, "", teranode.HealthConfig{FailureThreshold: 1 << 20})
p := New(cfg, zap.NewNop(), nil, nil, newMockStore(), nil, tc, mc)

if err := handleAndFlush(t, p, makePropMsg("abc123")); err != nil {
t.Fatalf("unexpected error: %v", err)
}

if gotToken != "secret-arcade-token" {
t.Errorf("expected merkle-service to receive callbackToken=secret-arcade-token, got %q", gotToken)
}
}

// Test 1: Registration happens before broadcast on success (single message)
func TestHandleMessage_RegistrationBeforeBroadcast(t *testing.T) {
log := &eventLog{}
Expand Down
Loading