@@ -10,35 +10,36 @@ import (
1010 v1 "github.com/celenium-io/celestial-module/pkg/api/v1"
1111 "github.com/celenium-io/celestial-module/pkg/storage"
1212 "github.com/celenium-io/celestial-module/pkg/storage/postgres"
13- "github.com/cosmos/cosmos-sdk/types/bech32"
1413 "github.com/dipdup-net/go-lib/config"
1514 "github.com/dipdup-net/indexer-sdk/pkg/modules"
1615 sdk "github.com/dipdup-net/indexer-sdk/pkg/storage"
1716 "github.com/pkg/errors"
17+ "github.com/rs/zerolog/log"
1818)
1919
20+ type AddressHandler func (ctx context.Context , address string ) (uint64 , error )
21+
2022type Module struct {
2123 modules.BaseModule
2224
23- celestialsApi celestials.API
24- address IdByHash
25- states storage.ICelestialState
26- celestials storage.ICelestial
27- tx sdk.Transactable
28- state storage.CelestialState
25+ celestialsApi celestials.API
26+ addressHandler AddressHandler
27+ states storage.ICelestialState
28+ celestials storage.ICelestial
29+ tx sdk.Transactable
30+ state storage.CelestialState
2931
3032 celestialsDatasource config.DataSource
3133 indexerName string
3234 network string
33- prefix string
3435 indexPeriod time.Duration
3536 databaseTimeout time.Duration
3637 limit int64
3738}
3839
3940func New (
4041 celestialsDatasource config.DataSource ,
41- address IdByHash ,
42+ addressHandler AddressHandler ,
4243 celestials storage.ICelestial ,
4344 state storage.ICelestialState ,
4445 tx sdk.Transactable ,
@@ -48,7 +49,6 @@ func New(
4849) * Module {
4950 module := Module {
5051 BaseModule : modules .New ("celestials" ),
51- address : address ,
5252 celestials : celestials ,
5353 states : state ,
5454 tx : tx ,
@@ -59,6 +59,7 @@ func New(
5959 databaseTimeout : time .Minute ,
6060 limit : 100 ,
6161 celestialsDatasource : celestialsDatasource ,
62+ addressHandler : addressHandler ,
6263 }
6364
6465 for i := range opts {
@@ -76,6 +77,9 @@ func (m *Module) Close() error {
7677}
7778
7879func (m * Module ) Start (ctx context.Context ) {
80+ if m .addressHandler == nil {
81+ panic ("nil address handler" )
82+ }
7983 if err := m .getState (ctx ); err != nil {
8084 m .Log .Err (err ).Msg ("state receiving" )
8185 return
@@ -146,6 +150,10 @@ func (m *Module) sync(ctx context.Context) error {
146150 if err != nil {
147151 return errors .Wrap (err , "get changes" )
148152 }
153+ log .Info ().
154+ Int ("changes_count" , len (changes .Changes )).
155+ Int64 ("head" , changes .Head ).
156+ Msg ("received changes" )
149157
150158 cids := make (map [string ]storage.Celestial )
151159 addressIds := make (map [uint64 ]struct {})
@@ -155,22 +163,9 @@ func (m *Module) sync(ctx context.Context) error {
155163 continue
156164 }
157165 m .state .ChangeId = changes .Changes [i ].ChangeID
158-
159- prefix , hash , err := bech32 .DecodeAndConvert (changes .Changes [i ].Address )
160- if err != nil {
161- return errors .Wrapf (err , "decoding address %s" , changes .Changes [i ].Address )
162- }
163- if m .prefix != "" && prefix != m .prefix {
164- return errors .Errorf ("invalid address prefix %s" , changes .Changes [i ].Address )
165- }
166-
167- addressId , err := m .address .IdByHash (ctx , hash )
166+ addressId , err := m .addressHandler (ctx , changes .Changes [i ].Address )
168167 if err != nil {
169- return errors .Wrap (err , "address by hash" )
170- }
171-
172- if len (addressId ) == 0 {
173- return errors .Errorf ("can't find address %s" , changes .Changes [i ].Address )
168+ return errors .Wrap (err , "address handler" )
174169 }
175170
176171 status , err := storage .ParseStatus (changes .Changes [i ].Status )
@@ -179,13 +174,13 @@ func (m *Module) sync(ctx context.Context) error {
179174 }
180175
181176 if status == storage .StatusPRIMARY {
182- addressIds [addressId [ 0 ] ] = struct {}{}
177+ addressIds [addressId ] = struct {}{}
183178 }
184179
185180 cids [changes .Changes [i ].CelestialID ] = storage.Celestial {
186181 Id : changes .Changes [i ].CelestialID ,
187182 ImageUrl : changes .Changes [i ].ImageURL ,
188- AddressId : addressId [ 0 ] ,
183+ AddressId : addressId ,
189184 ChangeId : changes .Changes [i ].ChangeID ,
190185 Status : status ,
191186 }
@@ -194,6 +189,11 @@ func (m *Module) sync(ctx context.Context) error {
194189 if err := m .save (ctx , cids , addressIds ); err != nil {
195190 return errors .Wrap (err , "save" )
196191 }
192+ log .Debug ().
193+ Int ("changes_count" , len (cids )).
194+ Int64 ("head" , m .state .ChangeId ).
195+ Msg ("saved changes" )
196+
197197 end = len (changes .Changes ) < int (m .limit )
198198 }
199199
0 commit comments