66package org .opensearch .dataprepper .plugins .mongo .export ;
77
88import com .mongodb .MongoClientException ;
9- import com .mongodb .client .FindIterable ;
109import com .mongodb .client .MongoClient ;
1110import com .mongodb .client .MongoCollection ;
12- import com .mongodb .client .MongoCursor ;
1311import com .mongodb .client .MongoDatabase ;
1412import com .mongodb .client .model .Filters ;
13+ import com .mongodb .client .model .Projections ;
14+ import com .mongodb .client .model .Sorts ;
1515import org .bson .Document ;
16+ import org .bson .conversions .Bson ;
1617import org .opensearch .dataprepper .model .source .coordinator .PartitionIdentifier ;
1718import org .opensearch .dataprepper .model .source .coordinator .enhanced .EnhancedSourceCoordinator ;
1819import org .opensearch .dataprepper .plugins .mongo .client .BsonHelper ;
@@ -37,6 +38,9 @@ public class MongoDBExportPartitionSupplier implements Function<ExportPartition,
3738 private static final Logger LOG = LoggerFactory .getLogger (MongoDBExportPartitionSupplier .class );
3839 private static final String MONGODB_PARTITION_KEY_FORMAT = "%s|%s|%s|%s|%s" ; // partition format: <db.collection>|<gte>|<lt>|<gteClassName>|<lteClassName>
3940 private static final String COLLECTION_SPLITTER = "\\ ." ;
41+ private static final Bson ID_PROJECTION = Projections .include ("_id" );
42+ private static final Bson ID_ASC = Sorts .ascending ("_id" );
43+ private static final Bson ID_DESC = Sorts .descending ("_id" );
4044
4145 private final MongoDBSourceConfig sourceConfig ;
4246 private final EnhancedSourceCoordinator enhancedSourceCoordinator ;
@@ -50,95 +54,143 @@ public MongoDBExportPartitionSupplier(final MongoDBSourceConfig sourceConfig,
5054 this .documentDBAggregateMetrics = documentDBAggregateMetrics ;
5155 }
5256
57+ /**
58+ * Detects whether the collection has a uniform _id type by checking the first and last documents.
59+ * If uniform, we can use a simple Filters.gt() instead of the complex $or query across all BSON types.
60+ */
61+ boolean isUniformIdType (final MongoCollection <Document > col ) {
62+ final Document first = col .find ().projection (ID_PROJECTION ).sort (ID_ASC ).limit (1 ).first ();
63+ final Document last = col .find ().projection (ID_PROJECTION ).sort (ID_DESC ).limit (1 ).first ();
64+ if (first == null || last == null ) {
65+ return true ;
66+ }
67+ final String firstType = first .get ("_id" ).getClass ().getName ();
68+ final String lastType = last .get ("_id" ).getClass ().getName ();
69+ if (BsonHelper .isClassNumber (firstType ) && BsonHelper .isClassNumber (lastType )) {
70+ return true ;
71+ }
72+ return firstType .equals (lastType );
73+ }
74+
75+ private Bson buildNextStartFilter (final Object lastLteValue , final String lteClassName , final boolean uniformType ) {
76+ if (uniformType ) {
77+ return Filters .gt ("_id" , lastLteValue );
78+ }
79+ final String lteValueString = BsonHelper .getPartitionStringFromMongoDBId (lastLteValue , lteClassName );
80+ return buildGtQuery (lteValueString , lteClassName , MAX_KEY );
81+ }
82+
83+ private void addPartition (final List <PartitionIdentifier > partitions , final String collectionDbName ,
84+ final Object gteValue , final String gteClassName ,
85+ final Object lteValue , final String lteClassName ) {
86+ final String gteValueString = BsonHelper .getPartitionStringFromMongoDBId (gteValue , gteClassName );
87+ final String lteValueString = BsonHelper .getPartitionStringFromMongoDBId (lteValue , lteClassName );
88+ LOG .debug ("Partition of {} : { gte: {} class: {}, lte: {} class {} }" ,
89+ collectionDbName , gteValueString , gteClassName , lteValueString , lteClassName );
90+ partitions .add (PartitionIdentifier .builder ()
91+ .withPartitionKey (String .format (MONGODB_PARTITION_KEY_FORMAT ,
92+ collectionDbName , gteValueString , lteValueString , gteClassName , lteClassName ))
93+ .build ());
94+ }
95+
5396 private PartitionIdentifierBatch buildPartitions (final ExportPartition exportPartition ) {
5497 documentDBAggregateMetrics .getExportApiInvocations ().increment ();
5598 final List <PartitionIdentifier > collectionPartitions = new ArrayList <>();
5699 final String collectionDbName = exportPartition .getCollection ();
57- List <String > collection = List .of (collectionDbName .split (COLLECTION_SPLITTER ));
100+ final List <String > collection = List .of (collectionDbName .split (COLLECTION_SPLITTER ));
58101 if (collection .size () < 2 ) {
59102 documentDBAggregateMetrics .getExport4xxErrors ().increment ();
60103 throw new IllegalArgumentException ("Invalid Collection Name. Must be in db.collection format" );
61104 }
62- final Optional <ExportProgressState > exportProgressStateOptional = exportPartition
63- .getProgressState ();
64- final Object lastEndDocId = exportProgressStateOptional .map (
65- ExportProgressState ::getLastEndDocId ).orElse (null );
105+
106+ final Optional <ExportProgressState > exportProgressStateOptional = exportPartition .getProgressState ();
107+ final Object lastEndDocId = exportProgressStateOptional .map (ExportProgressState ::getLastEndDocId ).orElse (null );
66108 boolean isLastBatch = false ;
67109 Object endDocId = lastEndDocId ;
110+
68111 try (MongoClient mongoClient = MongoDBConnection .getMongoClient (sourceConfig )) {
69112 final MongoDatabase db = mongoClient .getDatabase (collection .get (0 ));
70- final MongoCollection <Document > col = db .getCollection (collectionDbName .substring (collection .get (0 ).length ()+1 ));
113+ final MongoCollection <Document > col = db .getCollection (
114+ collectionDbName .substring (collection .get (0 ).length () + 1 ));
71115 final int partitionSize = exportPartition .getPartitionSize ();
72- FindIterable <Document > startIterable ;
116+
117+ final boolean uniformType = isUniformIdType (col );
118+ LOG .info ("Collection {} has {} _id type. Using {} partition query strategy." ,
119+ collectionDbName , uniformType ? "uniform" : "mixed" , uniformType ? "simple $gt" : "$or-based" );
120+
121+ Bson startFilter ;
73122 if (lastEndDocId != null ) {
74- startIterable = col .find (Filters .gt ("_id" , lastEndDocId ))
75- .projection (new Document ("_id" , 1 ))
76- .sort (new Document ("_id" , 1 ))
77- .limit (1 );
123+ startFilter = Filters .gt ("_id" , lastEndDocId );
78124 } else {
79- startIterable = col .find ()
80- .projection (new Document ("_id" , 1 ))
81- .sort (new Document ("_id" , 1 ))
82- .limit (1 );
125+ startFilter = new Document ();
83126 }
127+
84128 while (!Thread .currentThread ().isInterrupted ()) {
85- try (final MongoCursor <Document > startCursor = startIterable .iterator ()) {
86- if (!startCursor .hasNext ()) {
87- LOG .info ("No records to process or has reached end of the export partition." );
88- isLastBatch = true ;
89- break ;
90- }
91- final Document startDoc = startCursor .next ();
92- final Object gteValue = startDoc .get ("_id" );
93- final String gteClassName = gteValue .getClass ().getName ();
94-
95- // Get end doc
96- Document endDoc = startIterable .skip (partitionSize - 1 ).limit (1 ).first ();
97- if (endDoc == null ) {
98- // this means we have reached the end of the doc
99- endDoc = col .find ()
100- .projection (new Document ("_id" , 1 ))
101- .sort (new Document ("_id" , -1 ))
102- .limit (1 )
103- .first ();
104- isLastBatch = true ;
105- }
129+ final Document startDoc = col .find (startFilter )
130+ .projection (ID_PROJECTION )
131+ .sort (ID_ASC )
132+ .limit (1 )
133+ .first ();
134+
135+ if (startDoc == null ) {
136+ LOG .info ("No records to process or has reached end of the export partition." );
137+ isLastBatch = true ;
138+ break ;
139+ }
106140
107- final Object lteValue = endDoc .get ("_id" );
108- final String lteClassName = lteValue .getClass ().getName ();
109- endDocId = lteValue ;
110- final String gteValueString = BsonHelper .getPartitionStringFromMongoDBId (gteValue , gteClassName );
111- final String lteValueString = BsonHelper .getPartitionStringFromMongoDBId (lteValue , lteClassName );
112- LOG .debug ("Partition of {} : { gte: {} class: {}, lte: {} class {} }" , collectionDbName , gteValueString , gteClassName , lteValueString , lteClassName );
113- collectionPartitions .add (
114- PartitionIdentifier
115- .builder ()
116- .withPartitionKey (String .format (MONGODB_PARTITION_KEY_FORMAT , collectionDbName , gteValueString , lteValueString , gteClassName , lteClassName ))
117- .build ());
118- documentDBAggregateMetrics .getExportPartitionQueryCount ().increment ();
119-
120- if (isLastBatch ) {
141+ final Object gteValue = startDoc .get ("_id" );
142+ final String gteClassName = gteValue .getClass ().getName ();
143+
144+ final Document endDoc = col .find (Filters .gte ("_id" , gteValue ))
145+ .projection (ID_PROJECTION )
146+ .sort (ID_ASC )
147+ .skip (partitionSize - 1 )
148+ .limit (1 )
149+ .first ();
150+
151+ final Object lteValue ;
152+ final String lteClassName ;
153+
154+ if (endDoc == null ) {
155+ final Document lastDoc = col .find ()
156+ .projection (ID_PROJECTION )
157+ .sort (ID_DESC )
158+ .limit (1 )
159+ .first ();
160+ if (lastDoc == null ) {
161+ isLastBatch = true ;
121162 break ;
122163 }
164+ lteValue = lastDoc .get ("_id" );
165+ lteClassName = lteValue .getClass ().getName ();
166+ isLastBatch = true ;
167+ } else {
168+ lteValue = endDoc .get ("_id" );
169+ lteClassName = lteValue .getClass ().getName ();
170+ }
123171
124- // extend the ownership of the partition
125- enhancedSourceCoordinator .saveProgressStateForPartition (exportPartition , null );
172+ endDocId = lteValue ;
173+ addPartition (collectionPartitions , collectionDbName , gteValue , gteClassName , lteValue , lteClassName );
174+ documentDBAggregateMetrics .getExportPartitionQueryCount ().increment ();
126175
127- startIterable = col .find (buildGtQuery (lteValueString , lteClassName , MAX_KEY ))
128- .projection (new Document ("_id" , 1 ))
129- .sort (new Document ("_id" , 1 ))
130- .limit (1 );
176+ if (isLastBatch ) {
177+ break ;
131178 }
179+
180+ // extend the ownership of the partition
181+ enhancedSourceCoordinator .saveProgressStateForPartition (exportPartition , null );
182+
183+ startFilter = buildNextStartFilter (lteValue , lteClassName , uniformType );
132184 }
133185 } catch (final IllegalArgumentException | MongoClientException e ) {
134186 // IllegalArgumentException is thrown when database or collection name is not valid
135187 // MongoClientException is thrown for exceptions indicating a failure condition with the MongoClient
136188 documentDBAggregateMetrics .getExport4xxErrors ().increment ();
137- LOG .error ("Client side exception while build partitions." , e );
189+ LOG .error ("Client side exception while building partitions." , e );
138190 throw new RuntimeException (e );
139191 } catch (final Exception e ) {
140192 documentDBAggregateMetrics .getExport5xxErrors ().increment ();
141- LOG .error ("Server side exception while build partitions." , e );
193+ LOG .error ("Server side exception while building partitions." , e );
142194 throw new RuntimeException (e );
143195 }
144196
0 commit comments