4949import org .testng .annotations .BeforeMethod ;
5050import org .testng .annotations .DataProvider ;
5151import org .testng .annotations .Factory ;
52- import org .testng .annotations .Ignore ;
5352import org .testng .annotations .Test ;
5453import reactor .core .publisher .Flux ;
5554import reactor .core .publisher .Mono ;
@@ -107,17 +106,17 @@ public static Object[][] changeFeedQueryCompleteAfterAvailableNowDataProvider()
107106 @ DataProvider (name = "changeFeedQueryEndLSNDataProvider" )
108107 public static Object [][] changeFeedQueryEndLSNDataProvider () {
109108 return new Object [][]{
110- // container RU, continuous ingest items, partition count
109+ // container RU, continuous ingest items
111110 // number of docs from cf, documents to write
112111
113112 // endLSN is less than number of documents
114- { 400 , true , 1 , 3 , 6 },
115- { 400 , false , 1 , 3 , 6 },
113+ { 400 , true , 3 , 6 },
114+ { 400 , false , 3 , 6 },
116115 // endLSN is equal to number of documents
117- { 400 , false , 1 , 3 , 3 },
116+ { 400 , false , 3 , 3 },
118117 // both partitions have more than the endLSN
119- { 11000 , true , 5 , 6 , 30 },
120- { 11000 , false , 5 , 6 , 30 },
118+ { 11000 , true , 6 , 30 },
119+ { 11000 , false , 6 , 30 },
121120 };
122121 }
123122
@@ -947,12 +946,10 @@ public void split_only_notModified() throws Exception {
947946 assertThat (stateAfterLastDrainAttempt .getContinuation ().getCompositeContinuationTokens ()).hasSize (3 );
948947 }
949948
950- @ Ignore
951949 @ Test (groups = { "fast" }, dataProvider = "changeFeedQueryEndLSNDataProvider" , timeOut = 100 * TIMEOUT )
952950 public void changeFeedQueryCompleteAfterEndLSN (
953951 int throughput ,
954952 boolean shouldContinuouslyIngestItems ,
955- int partitionCount ,
956953 int expectedDocs ,
957954 int docsToWrite ) {
958955 String testContainerId = UUID .randomUUID ().toString ();
@@ -969,7 +966,7 @@ public void changeFeedQueryCompleteAfterEndLSN(
969966 List <FeedRange > feedRanges = testContainer .getFeedRanges ().block ();
970967 AtomicInteger currentPageCount = new AtomicInteger (0 );
971968
972- List <String > partitionKeys = insertDocumentsCore ( partitionCount , docsToWrite , testContainer );
969+ List <String > partitionKeys = insertDocumentsIntoTwoPartitions ( docsToWrite , testContainer );
973970 CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions =
974971 CosmosChangeFeedRequestOptions .createForProcessingFromBeginning (FeedRange .forFullRange ());
975972 ImplementationBridgeHelpers .CosmosChangeFeedRequestOptionsHelper .getCosmosChangeFeedRequestOptionsAccessor ()
@@ -1146,6 +1143,42 @@ List<String> insertDocumentsCore(
11461143 return partitionKeys ;
11471144 }
11481145
1146+ List <String > insertDocumentsIntoTwoPartitions (
1147+ int documentCount ,
1148+ CosmosAsyncContainer container ) {
1149+
1150+ List <ObjectNode > docs = new ArrayList <>();
1151+ List <String > partitionKeys = new ArrayList <>();
1152+
1153+ // these partition keys will land on two different partitions for hash v2
1154+ String partitionKey1 = "pk-1" ;
1155+ String partitionKey2 = "pk-8fbakldbas" ;
1156+ for (int j = 0 ; j < documentCount ; j ++) {
1157+ docs .add (getDocumentDefinition (partitionKey1 ));
1158+ docs .add (getDocumentDefinition (partitionKey2 ));
1159+ }
1160+ partitionKeys .add (partitionKey1 );
1161+ partitionKeys .add (partitionKey2 );
1162+
1163+ ArrayList <Mono <CosmosItemResponse <ObjectNode >>> result = new ArrayList <>();
1164+ for (ObjectNode jsonNodes : docs ) {
1165+ result .add (container .createItem (jsonNodes ));
1166+ }
1167+
1168+ List <ObjectNode > insertedDocs = Flux .merge (
1169+ Flux .fromIterable (result ),
1170+ 2 )
1171+ .map (CosmosItemResponse ::getItem ).collectList ().block ();
1172+
1173+ for (ObjectNode doc : insertedDocs ) {
1174+ partitionKeyToDocuments .put (
1175+ doc .get (PARTITION_KEY_FIELD_NAME ).textValue (),
1176+ doc );
1177+ }
1178+ logger .info ("FINISHED INSERT" );
1179+ return partitionKeys ;
1180+ }
1181+
11491182 void deleteDocuments (
11501183 int partitionCount ,
11511184 int documentCount ) {
0 commit comments