11import { HttpStatus , Injectable } from "@nestjs/common" ;
22import { BinaryUtils } from "@multiversx/sdk-nestjs-common" ;
3- import { ElasticQuery , QueryOperator , QueryType , QueryConditionOptions , ElasticSortOrder , ElasticSortProperty , TermsQuery , RangeGreaterThanOrEqual , MatchQuery } from "@multiversx/sdk-nestjs-elastic" ;
3+ import { ElasticQuery , QueryOperator , QueryType , QueryConditionOptions , ElasticSortOrder , ElasticSortProperty , RangeGreaterThanOrEqual , MatchQuery } from "@multiversx/sdk-nestjs-elastic" ;
44import { IndexerInterface } from "../indexer.interface" ;
55import { ApiConfigService } from "src/common/api-config/api.config.service" ;
66import { CollectionFilter } from "src/endpoints/collections/entities/collection.filter" ;
@@ -279,9 +279,7 @@ export class ElasticIndexerService implements IndexerInterface {
279279
280280 const elasticOperations = await this . elasticService . getList ( 'operations' , 'txHash' , elasticQuery ) ;
281281
282- for ( const operation of elasticOperations ) {
283- this . processTransaction ( operation ) ;
284- }
282+ this . bulkProcessTransactions ( elasticOperations ) ;
285283
286284 return elasticOperations ;
287285 }
@@ -338,15 +336,15 @@ export class ElasticIndexerService implements IndexerInterface {
338336 . withMustMatchCondition ( 'type' , 'unsigned' )
339337 . withPagination ( { from : 0 , size : transactionHashes . length + 1 } )
340338 . withSort ( [ { name : 'timestamp' , order : ElasticSortOrder . ascending } ] )
341- . withTerms ( new TermsQuery ( 'originalTxHash' , transactionHashes ) ) ;
339+ . withMustMultiShouldCondition ( transactionHashes , hash => QueryType . Match ( 'originalTxHash' , hash ) ) ;
342340
343341 return await this . elasticService . getList ( 'operations' , 'scHash' , elasticQuery ) ;
344342 }
345343
346344 async getAccountsForAddresses ( addresses : string [ ] ) : Promise < any [ ] > {
347345 const elasticQuery : ElasticQuery = ElasticQuery . create ( )
348346 . withPagination ( { from : 0 , size : addresses . length + 1 } )
349- . withTerms ( new TermsQuery ( 'address' , addresses ) ) ;
347+ . withMustMultiShouldCondition ( addresses , address => QueryType . Match ( 'address' , address ) ) ;
350348
351349 return await this . elasticService . getList ( 'accounts' , 'address' , elasticQuery ) ;
352350 }
@@ -383,9 +381,7 @@ export class ElasticIndexerService implements IndexerInterface {
383381
384382 const results = await this . elasticService . getList ( 'operations' , 'hash' , elasticQuery ) ;
385383
386- for ( const result of results ) {
387- this . processTransaction ( result ) ;
388- }
384+ this . bulkProcessTransactions ( results ) ;
389385
390386 return results ;
391387 }
@@ -548,9 +544,7 @@ export class ElasticIndexerService implements IndexerInterface {
548544
549545 const transactions = await this . elasticService . getList ( 'operations' , 'txHash' , elasticQuery ) ;
550546
551- for ( const transaction of transactions ) {
552- this . processTransaction ( transaction ) ;
553- }
547+ this . bulkProcessTransactions ( transactions ) ;
554548
555549 return transactions ;
556550 }
@@ -561,6 +555,19 @@ export class ElasticIndexerService implements IndexerInterface {
561555 }
562556 }
563557
558+ private bulkProcessTransactions ( transactions : any [ ] ) {
559+ if ( ! transactions || transactions . length === 0 ) {
560+ return ;
561+ }
562+
563+ for ( let i = 0 ; i < transactions . length ; i ++ ) {
564+ const transaction = transactions [ i ] ;
565+ if ( transaction && ! transaction . function ) {
566+ transaction . function = transaction . operation ;
567+ }
568+ }
569+ }
570+
564571 private buildTokenFilter ( query : ElasticQuery , filter : TokenFilter ) : ElasticQuery {
565572 if ( filter . includeMetaESDT === true ) {
566573 query = query . withMustMultiShouldCondition ( [ TokenType . FungibleESDT , TokenType . MetaESDT ] , type => QueryType . Match ( 'type' , type ) ) ;
@@ -616,9 +623,7 @@ export class ElasticIndexerService implements IndexerInterface {
616623
617624 const results = await this . elasticService . getList ( 'operations' , 'hash' , elasticQuerySc ) ;
618625
619- for ( const result of results ) {
620- this . processTransaction ( result ) ;
621- }
626+ this . bulkProcessTransactions ( results ) ;
622627
623628 return results ;
624629 }
@@ -632,9 +637,11 @@ export class ElasticIndexerService implements IndexerInterface {
632637 return [ ] ;
633638 }
634639
640+ const maxSize = Math . min ( hashes . length * 10 , 1000 ) ;
641+
635642 const elasticQuery = ElasticQuery . create ( )
636643 . withMustMatchCondition ( 'type' , 'unsigned' )
637- . withPagination ( { from : 0 , size : 10000 } )
644+ . withPagination ( { from : 0 , size : maxSize } )
638645 . withSort ( [ { name : 'timestamp' , order : ElasticSortOrder . ascending } ] )
639646 . withMustMultiShouldCondition ( hashes , hash => QueryType . Match ( 'originalTxHash' , hash ) ) ;
640647
@@ -646,19 +653,19 @@ export class ElasticIndexerService implements IndexerInterface {
646653 return [ ] ;
647654 }
648655
649- const queries = identifiers . map ( ( identifier ) => QueryType . Match ( 'identifier' , identifier , QueryOperator . AND ) ) ;
650-
651656 let elasticQuery = ElasticQuery . create ( ) ;
652657
653658 if ( pagination ) {
654659 elasticQuery = elasticQuery . withPagination ( pagination ) ;
655660 }
656661
657662 elasticQuery = elasticQuery
658- . withSort ( [ { name : "balanceNum" , order : ElasticSortOrder . descending } ] )
663+ . withSort ( [
664+ { name : "balanceNum" , order : ElasticSortOrder . descending } ,
665+ { name : 'timestamp' , order : ElasticSortOrder . descending } ,
666+ ] )
659667 . withCondition ( QueryConditionOptions . mustNot , [ QueryType . Match ( 'address' , 'pending' ) ] )
660- . withCondition ( QueryConditionOptions . should , queries )
661- . withSort ( [ { name : 'timestamp' , order : ElasticSortOrder . descending } ] ) ;
668+ . withMustMultiShouldCondition ( identifiers , identifier => QueryType . Match ( 'identifier' , identifier , QueryOperator . AND ) ) ;
662669
663670 return await this . elasticService . getList ( 'accountsesdt' , 'identifier' , elasticQuery ) ;
664671 }
@@ -668,19 +675,19 @@ export class ElasticIndexerService implements IndexerInterface {
668675 return [ ] ;
669676 }
670677
671- const queries = identifiers . map ( ( identifier ) => QueryType . Match ( 'collection' , identifier , QueryOperator . AND ) ) ;
672-
673678 let elasticQuery = ElasticQuery . create ( ) ;
674679
675680 if ( pagination ) {
676681 elasticQuery = elasticQuery . withPagination ( pagination ) ;
677682 }
678683
679684 elasticQuery = elasticQuery
680- . withSort ( [ { name : "balanceNum" , order : ElasticSortOrder . descending } ] )
685+ . withSort ( [
686+ { name : "balanceNum" , order : ElasticSortOrder . descending } ,
687+ { name : 'timestamp' , order : ElasticSortOrder . descending } ,
688+ ] )
681689 . withCondition ( QueryConditionOptions . mustNot , [ QueryType . Match ( 'address' , 'pending' ) ] )
682- . withCondition ( QueryConditionOptions . should , queries )
683- . withSort ( [ { name : 'timestamp' , order : ElasticSortOrder . descending } ] ) ;
690+ . withMustMultiShouldCondition ( identifiers , identifier => QueryType . Match ( 'collection' , identifier , QueryOperator . AND ) ) ;
684691
685692 return await this . elasticService . getList ( 'accountsesdt' , 'identifier' , elasticQuery ) ;
686693 }
@@ -711,11 +718,22 @@ export class ElasticIndexerService implements IndexerInterface {
711718 ] ) ;
712719 }
713720
714- let elasticNfts = await this . elasticService . getList ( 'tokens' , 'identifier' , elasticQuery ) ;
715- if ( elasticNfts . length === 0 && identifier !== undefined ) {
716- elasticNfts = await this . elasticService . getList ( 'accountsesdt' , 'identifier' , ElasticQuery . create ( ) . withMustMatchCondition ( 'identifier' , identifier , QueryOperator . AND ) ) ;
721+ if ( identifier !== undefined ) {
722+ const [ tokensResult , accountsResult ] = await Promise . all ( [
723+ this . elasticService . getList ( 'tokens' , 'identifier' , elasticQuery ) . catch ( ( ) => [ ] ) ,
724+ this . elasticService . getList ( 'accountsesdt' , 'identifier' ,
725+ ElasticQuery . create ( )
726+ . withMustMatchCondition ( 'identifier' , identifier , QueryOperator . AND )
727+ . withPagination ( pagination )
728+ ) . catch ( ( ) => [ ] ) ,
729+ ] ) ;
730+
731+ const elasticNfts = tokensResult . length > 0 ? tokensResult : accountsResult ;
732+ return elasticNfts ;
733+ } else {
734+ const elasticNfts = await this . elasticService . getList ( 'tokens' , 'identifier' , elasticQuery ) ;
735+ return elasticNfts ;
717736 }
718- return elasticNfts ;
719737 }
720738
721739 async getTransactionBySenderAndNonce ( sender : string , nonce : number ) : Promise < any [ ] > {
@@ -819,52 +837,100 @@ export class ElasticIndexerService implements IndexerInterface {
819837 }
820838 }
821839
822- const elasticQuery = ElasticQuery . create ( )
823- . withMustExistCondition ( 'identifier' )
824- . withMustMatchCondition ( 'address' , address )
825- . withPagination ( { from : 0 , size : 0 } )
826- . withMustMatchCondition ( 'token' , filter . collection , QueryOperator . AND )
827- . withMustMultiShouldCondition ( filter . identifiers , identifier => QueryType . Match ( 'token' , identifier , QueryOperator . AND ) )
828- . withSearchWildcardCondition ( filter . search , [ 'token' , 'name' ] )
829- . withMustMultiShouldCondition ( filterTypes , type => QueryType . Match ( 'type' , type ) )
830- . withMustMultiShouldCondition ( filter . subType , subType => QueryType . Match ( 'type' , subType ) )
831- . withExtra ( {
832- aggs : {
833- collections : {
834- composite : {
835- size : 10000 ,
836- sources : [
837- {
838- collection : {
839- terms : {
840- field : 'token' ,
841- } ,
842- } ,
843- } ,
844- ] ,
840+ const data : { collection : string , count : number , balance : number } [ ] = [ ] ;
841+ let afterKey : any = null ;
842+ let remainingToSkip = pagination . from ;
843+ let remainingToCollect = pagination . size ;
844+
845+ while ( data . length < pagination . size ) {
846+ const batchSize = Math . min ( 1000 , remainingToSkip + remainingToCollect ) ;
847+
848+ const compositeAgg : any = {
849+ size : batchSize ,
850+ sources : [
851+ {
852+ collection : {
853+ terms : {
854+ field : 'token' ,
855+ order : 'asc' ,
856+ } ,
845857 } ,
846- aggs : {
847- balance : {
848- sum : {
849- field : 'balanceNum' ,
858+ } ,
859+ ] ,
860+ } ;
861+
862+ if ( afterKey ) {
863+ compositeAgg . after = afterKey ;
864+ }
865+
866+ const elasticQuery = ElasticQuery . create ( )
867+ . withMustExistCondition ( 'identifier' )
868+ . withMustMatchCondition ( 'address' , address )
869+ . withPagination ( { from : 0 , size : 0 } )
870+ . withMustMatchCondition ( 'token' , filter . collection , QueryOperator . AND )
871+ . withMustMultiShouldCondition ( filter . identifiers , identifier => QueryType . Match ( 'token' , identifier , QueryOperator . AND ) )
872+ . withSearchWildcardCondition ( filter . search , [ 'token' , 'name' ] )
873+ . withMustMultiShouldCondition ( filterTypes , type => QueryType . Match ( 'type' , type ) )
874+ . withMustMultiShouldCondition ( filter . subType , subType => QueryType . Match ( 'type' , subType ) )
875+ . withExtra ( {
876+ aggs : {
877+ collections : {
878+ composite : compositeAgg ,
879+ aggs : {
880+ balance : {
881+ sum : {
882+ field : 'balanceNum' ,
883+ } ,
850884 } ,
851885 } ,
852886 } ,
853887 } ,
854- } ,
855- } ) ;
888+ } ) ;
856889
857- const result = await this . elasticService . post ( `${ this . apiConfigService . getElasticUrl ( ) } /accountsesdt/_search` , elasticQuery . toJson ( ) ) ;
890+ const result = await this . elasticService . post ( `${ this . apiConfigService . getElasticUrl ( ) } /accountsesdt/_search` , elasticQuery . toJson ( ) ) ;
891+ const buckets = result ?. data ?. aggregations ?. collections ?. buckets || [ ] ;
892+
893+ if ( buckets . length === 0 ) {
894+ break ;
895+ }
858896
859- const buckets = result ?. data ?. aggregations ?. collections ?. buckets ;
897+ const batchData : { collection : string , count : number , balance : number } [ ] = buckets . map ( ( bucket : any ) => ( {
898+ collection : bucket . key . collection ,
899+ count : bucket . doc_count ,
900+ balance : bucket . balance . value ,
901+ } ) ) ;
860902
861- let data : { collection : string , count : number , balance : number } [ ] = buckets . map ( ( bucket : any ) => ( {
862- collection : bucket . key . collection ,
863- count : bucket . doc_count ,
864- balance : bucket . balance . value ,
865- } ) ) ;
903+ if ( remainingToSkip > 0 ) {
904+ const skipFromBatch = Math . min ( remainingToSkip , batchData . length ) ;
905+ remainingToSkip -= skipFromBatch ;
906+
907+ if ( remainingToSkip === 0 ) {
908+ const collectFromBatch = Math . min ( remainingToCollect , batchData . length - skipFromBatch ) ;
909+ data . push ( ...batchData . slice ( skipFromBatch , skipFromBatch + collectFromBatch ) ) ;
910+ remainingToCollect -= collectFromBatch ;
911+ }
912+ } else {
913+ const collectFromBatch = Math . min ( remainingToCollect , batchData . length ) ;
914+ data . push ( ...batchData . slice ( 0 , collectFromBatch ) ) ;
915+ remainingToCollect -= collectFromBatch ;
916+ }
917+
918+ if ( remainingToCollect === 0 ) {
919+ break ;
920+ }
921+
922+ const aggregations = result ?. data ?. aggregations ?. collections ;
923+ if ( aggregations ?. after_key ) {
924+ afterKey = aggregations . after_key ;
925+ } else {
926+ break ;
927+ }
928+
929+ if ( buckets . length < batchSize ) {
930+ break ;
931+ }
932+ }
866933
867- data = data . slice ( pagination . from , pagination . from + pagination . size ) ;
868934 return data ;
869935 }
870936
0 commit comments