@@ -20,11 +20,12 @@ import (
2020type Module struct {
2121 modules.BaseModule
2222
23- celestials celestials.API
24- address IdByHash
25- states storage.ICelestialState
26- tx sdk.Transactable
27- state storage.CelestialState
23+ celestialsApi celestials.API
24+ address IdByHash
25+ states storage.ICelestialState
26+ celestials storage.ICelestial
27+ tx sdk.Transactable
28+ state storage.CelestialState
2829
2930 celestialsDatasource config.DataSource
3031 indexerName string
@@ -38,6 +39,7 @@ type Module struct {
3839func New (
3940 celestialsDatasource config.DataSource ,
4041 address IdByHash ,
42+ celestials storage.ICelestial ,
4143 state storage.ICelestialState ,
4244 tx sdk.Transactable ,
4345 indexerName string ,
@@ -47,9 +49,10 @@ func New(
4749 module := Module {
4850 BaseModule : modules .New ("celestials" ),
4951 address : address ,
52+ celestials : celestials ,
5053 states : state ,
5154 tx : tx ,
52- celestials : v1 .New (celestialsDatasource .URL ),
55+ celestialsApi : v1 .New (celestialsDatasource .URL ),
5356 indexerName : indexerName ,
5457 network : network ,
5558 indexPeriod : time .Minute ,
@@ -124,7 +127,7 @@ func (m *Module) getChanges(ctx context.Context) (celestials.Changes, error) {
124127 requestCtx , cancel := context .WithTimeout (ctx , time .Second * time .Duration (m .celestialsDatasource .Timeout ))
125128 defer cancel ()
126129
127- return m .celestials .Changes (
130+ return m .celestialsApi .Changes (
128131 requestCtx ,
129132 m .network ,
130133 celestials .WithFromChangeId (m .state .ChangeId ),
@@ -145,6 +148,7 @@ func (m *Module) sync(ctx context.Context) error {
145148 }
146149
147150 cids := make (map [string ]storage.Celestial )
151+ addressIds := make (map [uint64 ]struct {})
148152
149153 for i := range changes .Changes {
150154 if m .state .ChangeId >= changes .Changes [i ].ChangeID {
@@ -169,15 +173,25 @@ func (m *Module) sync(ctx context.Context) error {
169173 return errors .Errorf ("can't find address %s" , changes .Changes [i ].Address )
170174 }
171175
176+ status , err := storage .ParseStatus (changes .Changes [i ].Status )
177+ if err != nil {
178+ return err
179+ }
180+
181+ if status == storage .StatusPRIMARY {
182+ addressIds [addressId [0 ]] = struct {}{}
183+ }
184+
172185 cids [changes .Changes [i ].CelestialID ] = storage.Celestial {
173186 Id : changes .Changes [i ].CelestialID ,
174187 ImageUrl : changes .Changes [i ].ImageURL ,
175188 AddressId : addressId [0 ],
176189 ChangeId : changes .Changes [i ].ChangeID ,
190+ Status : status ,
177191 }
178192 }
179193
180- if err := m .save (ctx , cids ); err != nil {
194+ if err := m .save (ctx , cids , addressIds ); err != nil {
181195 return errors .Wrap (err , "save" )
182196 }
183197 end = len (changes .Changes ) < int (m .limit )
@@ -187,7 +201,7 @@ func (m *Module) sync(ctx context.Context) error {
187201 return nil
188202}
189203
190- func (m * Module ) save (ctx context.Context , cids map [string ]storage.Celestial ) error {
204+ func (m * Module ) save (ctx context.Context , cids map [string ]storage.Celestial , addressIds map [ uint64 ] struct {} ) error {
191205 requestCtx , cancel := context .WithTimeout (ctx , m .databaseTimeout )
192206 defer cancel ()
193207
@@ -197,6 +211,10 @@ func (m *Module) save(ctx context.Context, cids map[string]storage.Celestial) er
197211 }
198212 defer tx .Close (requestCtx )
199213
214+ if err := tx .UpdateStatusForAddress (ctx , maps .Keys (addressIds )); err != nil {
215+ return tx .HandleError (requestCtx , errors .Wrap (err , "update primary statuses" ))
216+ }
217+
200218 if err := tx .SaveCelestials (requestCtx , maps .Values (cids )); err != nil {
201219 return tx .HandleError (requestCtx , errors .Wrap (err , "save celestials" ))
202220 }
0 commit comments