Skip to content

Commit 5fff0fa

Browse files
feat(token): integrate indexer provider (#195)
* feat: indexer-backed token provider * mocks generated * fixes * lint fixes * feat(token): integrate indexer provider in token service * lint and test fixes * review fixes --------- Co-authored-by: Sebastian Lindner <33971232+salindne@users.noreply.github.com>
1 parent 54b8844 commit 5fff0fa

6 files changed

Lines changed: 146 additions & 19 deletions

File tree

config.e2e-local.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ canton:
6060
package_id: "6fac182df4943e7e2f70360b413b6e3ab10e65289ba0d971978b6d861a860d72"
6161
module: "Wayfinder.Bridge"
6262

63+
token_provider:
64+
mode: indexer
65+
indexer:
66+
url: "http://indexer:8082"
67+
instruments:
68+
PROMPT: "BridgeIssuer::1220192c25966fe2d53554dfe3a7e2c1c786268318f9870a26902935f3c912ac51ac"
69+
DEMO: "BridgeIssuer::1220192c25966fe2d53554dfe3a7e2c1c786268318f9870a26902935f3c912ac51ac"
70+
6371
# Token metadata
6472
token:
6573
supported_tokens:

pkg/app/api/server.go

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
ethrpcminer "github.com/chainsafe/canton-middleware/pkg/ethrpc/miner"
1818
ethrpc "github.com/chainsafe/canton-middleware/pkg/ethrpc/service"
1919
ethrpcstore "github.com/chainsafe/canton-middleware/pkg/ethrpc/store"
20+
indexerclient "github.com/chainsafe/canton-middleware/pkg/indexer/client"
2021
"github.com/chainsafe/canton-middleware/pkg/keys"
2122
"github.com/chainsafe/canton-middleware/pkg/log"
2223
"github.com/chainsafe/canton-middleware/pkg/pgutil"
@@ -31,6 +32,7 @@ import (
3132

3233
"github.com/go-chi/chi/v5"
3334
"github.com/go-chi/chi/v5/middleware"
35+
"github.com/uptrace/bun"
3436
"go.uber.org/zap"
3537
)
3638

@@ -106,6 +108,58 @@ func (s *Server) Run() error {
106108
// Keep this defer as a safety net.
107109
defer stopReconcile()
108110

111+
svcs, err := initServices(ctx, cfg, dbBun, cantonClient, cipher, logger)
112+
if err != nil {
113+
return err
114+
}
115+
116+
router := s.setupRouter(svcs.evmStore, cantonClient, svcs.tokenService, svcs.regSvc, svcs.transferSvc, logger)
117+
118+
err = apphttp.ServeAndWait(ctx, router, logger, cfg.Server)
119+
120+
// Stop background work before deferred DB/client closes kick in.
121+
stopReconcile()
122+
123+
return err
124+
}
125+
126+
// buildTokenProvider constructs the token data provider according to the
127+
// configured mode. canton is the default; indexer reads from the indexer's
128+
// pre-materialized HTTP API instead of issuing live gRPC ACS scans.
129+
func buildTokenProvider(cfg *config.APIServer, cantonToken cantontkn.Token) (token.Provider, error) {
130+
switch cfg.TokenProvider.Mode {
131+
case config.TokenProviderIndexer:
132+
ic := cfg.TokenProvider.Indexer
133+
if ic == nil {
134+
return nil, fmt.Errorf("token_provider.indexer config is required when mode is %q", config.TokenProviderIndexer)
135+
}
136+
c, err := indexerclient.New(ic.URL, nil)
137+
if err != nil {
138+
return nil, fmt.Errorf("create indexer client: %w", err)
139+
}
140+
return tokenprovider.NewIndexer(c, ic.Instruments), nil
141+
default: // TokenProviderCanton
142+
return tokenprovider.NewCanton(cantonToken), nil
143+
}
144+
}
145+
146+
type services struct {
147+
evmStore ethrpc.Store
148+
tokenService *token.Service
149+
regSvc userservice.Service
150+
transferSvc transfer.Service
151+
}
152+
153+
func initServices(
154+
ctx context.Context,
155+
cfg *config.APIServer,
156+
dbBun *bun.DB,
157+
cantonClient *canton.Client,
158+
cipher keys.KeyCipher,
159+
logger *zap.Logger,
160+
) (*services, error) {
161+
userStore := userstore.NewStore(dbBun)
162+
109163
topologyCache := userservice.NewTopologyCache(topologyCacheTTL)
110164
go topologyCache.Start(ctx)
111165

@@ -118,29 +172,27 @@ func (s *Server) Run() error {
118172
topologyCache,
119173
)
120174

121-
tokenDataProvider := tokenprovider.NewCanton(cantonClient.Token)
122-
tokenService := token.NewTokenService(cfg.Token, tokenDataProvider, userStore, cantonClient.Token)
175+
tokenDataProvider, err := buildTokenProvider(cfg, cantonClient.Token)
176+
if err != nil {
177+
return nil, fmt.Errorf("build token provider: %w", err)
178+
}
179+
123180
evmStore := ethrpcstore.NewStore(dbBun)
124181

125182
transferCache := transfer.NewPreparedTransferCache(transferCacheTTL, transferCacheMaxSize)
126183
go transferCache.Start(ctx)
127-
transferSvc := transfer.NewTransferService(cantonClient.Token, userStore, transferCache, tokenSymbols(cfg.Token))
128-
regSvcLog := userservice.NewLog(registrationService, logger)
129-
transferSvcLog := transfer.NewLog(transferSvc, logger)
130184

131185
if cfg.EthRPC.Enabled {
132186
m := ethrpcminer.New(evmStore, cfg.EthRPC.ChainID, cfg.EthRPC.GasLimit, cfg.EthRPC.MinerMaxTxsPerBlock, cfg.EthRPC.MinerInterval, logger)
133187
go m.Start(ctx)
134188
}
135189

136-
router := s.setupRouter(evmStore, cantonClient, tokenService, regSvcLog, transferSvcLog, logger)
137-
138-
err = apphttp.ServeAndWait(ctx, router, logger, cfg.Server)
139-
140-
// Stop background work before deferred DB/client closes kick in.
141-
stopReconcile()
142-
143-
return err
190+
return &services{
191+
evmStore: evmStore,
192+
tokenService: token.NewTokenService(cfg.Token, tokenDataProvider, userStore, cantonClient.Token),
193+
regSvc: userservice.NewLog(registrationService, logger),
194+
transferSvc: transfer.NewLog(transfer.NewTransferService(cantonClient.Token, userStore, transferCache, tokenSymbols(cfg.Token)), logger),
195+
}, nil
144196
}
145197

146198
func (s *Server) getMasterKey() ([]byte, error) {

pkg/config/config.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,53 @@ import (
2424
"gopkg.in/yaml.v3"
2525
)
2626

27+
// TokenProviderMode selects which backend the token service uses for balance
28+
// and total-supply queries.
29+
type TokenProviderMode string
30+
31+
const (
32+
// TokenProviderCanton uses live gRPC ACS scans against the Canton ledger.
33+
// This is the default and requires no additional infrastructure.
34+
TokenProviderCanton TokenProviderMode = "canton"
35+
36+
// TokenProviderIndexer reads from the indexer's pre-materialized PostgreSQL
37+
// tables via the indexer's HTTP admin API. Requires the indexer process to
38+
// be running and reachable at IndexerProviderConfig.URL.
39+
TokenProviderIndexer TokenProviderMode = "indexer"
40+
)
41+
42+
// IndexerProviderConfig holds the settings needed when token_provider.mode is "indexer".
43+
type IndexerProviderConfig struct {
44+
// URL is the base URL of the indexer's HTTP admin API (e.g. "http://indexer:8082").
45+
URL string `yaml:"url" validate:"required"`
46+
// Instruments maps each supported token symbol (InstrumentID) to its Canton
47+
// instrument admin party. The indexer keys tokens by {admin, id}, so this
48+
// mapping is required to translate from the Provider interface's symbol-only
49+
// calls to the indexer's composite key.
50+
//
51+
// Example:
52+
// instruments:
53+
// DEMO: "admin::abc123@domain"
54+
// PROMPT: "issuer::xyz@domain"
55+
Instruments map[string]string `yaml:"instruments" validate:"required,min=1"`
56+
}
57+
58+
// TokenProviderConfig selects and configures the token data provider.
59+
type TokenProviderConfig struct {
60+
// Mode selects the provider backend. Defaults to "canton".
61+
Mode TokenProviderMode `yaml:"mode" default:"canton" validate:"required,oneof=canton indexer"`
62+
// Indexer holds settings used when Mode is "indexer". Must be set when
63+
// Mode is "indexer"; ignored otherwise.
64+
Indexer *IndexerProviderConfig `yaml:"indexer"`
65+
}
66+
2767
// APIServer represents the ERC-20 API server configuration
2868
type APIServer struct {
2969
Server *http.ServerConfig `yaml:"server" validate:"required"`
3070
Database *pgdb.DatabaseConfig `yaml:"database" validate:"required"`
3171
Canton *canton.Config `yaml:"canton" validate:"required"`
3272
Token *token.Config `yaml:"token" validate:"required"`
73+
TokenProvider *TokenProviderConfig `yaml:"token_provider" default:"-"` // omit → defaults to canton mode
3374
EthRPC *ethrpc.Config `yaml:"eth_rpc" validate:"required"`
3475
JWKS *JWKS `yaml:"jwks" default:"-"` // nil by default (feature disabled)
3576
Logging *log.Config `yaml:"logging" validate:"required"`
@@ -76,6 +117,9 @@ func LoadAPIServer(configPath string) (*APIServer, error) {
76117
if err := loadConfigFromFile(configPath, &cfg); err != nil {
77118
return nil, err
78119
}
120+
if cfg.TokenProvider == nil {
121+
cfg.TokenProvider = &TokenProviderConfig{Mode: TokenProviderCanton}
122+
}
79123
if err := validateConfig(&cfg); err != nil {
80124
return nil, err
81125
}
@@ -171,6 +215,14 @@ func newStartupValidator() *validator.Validate {
171215
}
172216
return name
173217
})
218+
219+
v.RegisterStructValidation(func(sl validator.StructLevel) {
220+
cfg := sl.Current().Interface().(TokenProviderConfig)
221+
if cfg.Mode == TokenProviderIndexer && cfg.Indexer == nil {
222+
sl.ReportError(cfg.Indexer, "indexer", "Indexer", "required_if_indexer_mode", "")
223+
}
224+
}, TokenProviderConfig{})
225+
174226
return v
175227
}
176228

pkg/config/defaults/config.api-server.docker.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ canton:
5151
package_id: "6fac182df4943e7e2f70360b413b6e3ab10e65289ba0d971978b6d861a860d72"
5252
module: "Wayfinder.Bridge"
5353

54+
token_provider:
55+
mode: indexer
56+
indexer:
57+
url: "http://indexer:8082"
58+
instruments:
59+
PROMPT: "${CANTON_ISSUER_PARTY}"
60+
DEMO: "${CANTON_ISSUER_PARTY}"
61+
5462
# Token metadata
5563
token:
5664
supported_tokens:

pkg/indexer/client/http.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/json"
77
"fmt"
8+
"io"
89
"net/http"
910
"net/url"
1011
"strconv"
@@ -220,16 +221,21 @@ func (c *HTTP) getJSON(ctx context.Context, rawURL string, dest any) error {
220221
defer resp.Body.Close()
221222

222223
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
223-
// Best-effort decode of the indexer's JSON error envelope.
224-
// If the body is not JSON (e.g. an HTML gateway error page), errMsg
225-
// stays empty and the status code alone is returned to the caller.
226-
var errMsg string
224+
bodyBytes, err := io.ReadAll(resp.Body)
225+
if err != nil {
226+
return fmt.Errorf("indexer HTTP %d: failed to read error body: %w", resp.StatusCode, err)
227+
}
228+
227229
var body struct {
228230
Error string `json:"error"`
229231
}
230-
if err := json.NewDecoder(resp.Body).Decode(&body); err == nil {
232+
// Default to the full body as the error message.
233+
errMsg := strings.TrimSpace(string(bodyBytes))
234+
// If we can parse the JSON error envelope, use that instead.
235+
if err := json.Unmarshal(bodyBytes, &body); err == nil && body.Error != "" {
231236
errMsg = body.Error
232237
}
238+
233239
if resp.StatusCode == http.StatusNotFound {
234240
return apperrors.ResourceNotFoundError(nil, errMsg)
235241
}

scripts/setup/docker-bootstrap.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ if [ -f "$API_SERVER_CONFIG_FILE" ]; then
219219
echo ">>> Updating API server config file..."
220220
sed -i "s|domain_id: \".*\"|domain_id: \"$DOMAIN_ID\"|" "$API_SERVER_CONFIG_FILE"
221221
sed -i "s|issuer_party: \".*\"|issuer_party: \"$PARTY_ID\"|" "$API_SERVER_CONFIG_FILE"
222-
echo " API server config updated with issuer_party and domain_id"
222+
sed -i "s|\${CANTON_ISSUER_PARTY}|$PARTY_ID|g" "$API_SERVER_CONFIG_FILE"
223+
echo " API server config updated with issuer_party, domain_id, and instrument admins"
223224
fi
224225

225226
# =============================================================================

0 commit comments

Comments
 (0)