@@ -229,6 +229,18 @@ pub async fn test_metastore_acquire_shards<
229229 )
230230 . await ;
231231
232+ // Publish tokens are the ULID of the indexing plan that minted them; ULIDs are time-ordered, so
233+ // lexicographic order is chronological order. A token containing '/' is the legacy pre-ULID
234+ // format: it loses to a ULID when recorded, but always wins when presented, so a rolling
235+ // upgrade can still hand a shard back to an old indexer.
236+ const OLDER_TOKEN : & str = "01000000000000000000000000" ;
237+ const TOKEN : & str = "02000000000000000000000000" ;
238+ const NEWER_TOKEN : & str = "03000000000000000000000000" ;
239+ const LEGACY_TOKEN : & str =
240+ "indexer/test-node/test-index:0/test-source/01000000000000000000000000" ;
241+
242+ // Shard 1 owned by `TOKEN`, shard 2 unowned, shard 3 owned by a legacy token, shard 4 owned by
243+ // `TOKEN`.
232244 let shards = vec ! [
233245 Shard {
234246 index_uid: Some ( test_index. index_uid. clone( ) ) ,
@@ -239,7 +251,7 @@ pub async fn test_metastore_acquire_shards<
239251 follower_id: Some ( "test-ingester-bar" . to_string( ) ) ,
240252 doc_mapping_uid: Some ( DocMappingUid :: default ( ) ) ,
241253 publish_position_inclusive: Some ( Position :: Beginning ) ,
242- publish_token: Some ( "test-publish-token-foo" . to_string( ) ) ,
254+ publish_token: Some ( TOKEN . to_string( ) ) ,
243255 update_timestamp: 1724158996 ,
244256 } ,
245257 Shard {
@@ -251,7 +263,7 @@ pub async fn test_metastore_acquire_shards<
251263 follower_id: Some ( "test-ingester-qux" . to_string( ) ) ,
252264 doc_mapping_uid: Some ( DocMappingUid :: default ( ) ) ,
253265 publish_position_inclusive: Some ( Position :: Beginning ) ,
254- publish_token: Some ( "test-publish-token-bar" . to_string ( ) ) ,
266+ publish_token: None ,
255267 update_timestamp: 1724158996 ,
256268 } ,
257269 Shard {
@@ -263,7 +275,7 @@ pub async fn test_metastore_acquire_shards<
263275 follower_id: Some ( "test-ingester-baz" . to_string( ) ) ,
264276 doc_mapping_uid: Some ( DocMappingUid :: default ( ) ) ,
265277 publish_position_inclusive: Some ( Position :: Beginning ) ,
266- publish_token: None ,
278+ publish_token: Some ( LEGACY_TOKEN . to_string ( ) ) ,
267279 update_timestamp: 1724158996 ,
268280 } ,
269281 Shard {
@@ -275,150 +287,113 @@ pub async fn test_metastore_acquire_shards<
275287 follower_id: Some ( "test-ingester-tux" . to_string( ) ) ,
276288 doc_mapping_uid: Some ( DocMappingUid :: default ( ) ) ,
277289 publish_position_inclusive: Some ( Position :: Beginning ) ,
278- publish_token: None ,
290+ publish_token: Some ( TOKEN . to_string ( ) ) ,
279291 update_timestamp: 1724158996 ,
280292 } ,
281293 ] ;
282294 metastore
283295 . insert_shards ( & test_index. index_uid , & test_index. source_id , shards)
284296 . await ;
285297
286- // Test acquire shards.
287- let acquire_shards_request = AcquireShardsRequest {
288- index_uid : Some ( test_index. index_uid . clone ( ) ) ,
289- source_id : test_index. source_id . clone ( ) ,
290- shard_ids : vec ! [
291- ShardId :: from( 1 ) ,
292- ShardId :: from( 2 ) ,
293- ShardId :: from( 3 ) ,
294- ShardId :: from( 666 ) ,
295- ] , // shard 666 does not exist
296- publish_token : "test-publish-token-foo" . to_string ( ) ,
297- } ;
298- let mut acquire_shards_response = metastore
299- . acquire_shards ( acquire_shards_request)
300- . await
301- . unwrap ( ) ;
302-
303- acquire_shards_response
304- . acquired_shards
305- . sort_unstable_by ( |left, right| left. shard_id ( ) . cmp ( right. shard_id ( ) ) ) ;
306-
307- let shard = & acquire_shards_response. acquired_shards [ 0 ] ;
308- assert_eq ! ( shard. index_uid( ) , & test_index. index_uid) ;
309- assert_eq ! ( shard. source_id, test_index. source_id) ;
310- assert_eq ! ( shard. shard_id( ) , ShardId :: from( 1 ) ) ;
311- assert_eq ! ( shard. shard_state( ) , ShardState :: Closed ) ;
312- assert_eq ! ( shard. leader_id, "test-ingester-foo" ) ;
313- assert_eq ! ( shard. follower_id( ) , "test-ingester-bar" ) ;
314- assert_eq ! ( shard. publish_position_inclusive( ) , Position :: Beginning ) ;
315- assert_eq ! ( shard. publish_token( ) , "test-publish-token-foo" ) ;
316-
317- let shard = & acquire_shards_response. acquired_shards [ 1 ] ;
318- assert_eq ! ( shard. index_uid( ) , & test_index. index_uid) ;
319- assert_eq ! ( shard. source_id, test_index. source_id) ;
320- assert_eq ! ( shard. shard_id( ) , ShardId :: from( 2 ) ) ;
321- assert_eq ! ( shard. shard_state( ) , ShardState :: Open ) ;
322- assert_eq ! ( shard. leader_id, "test-ingester-bar" ) ;
323- assert_eq ! ( shard. follower_id( ) , "test-ingester-qux" ) ;
324- assert_eq ! ( shard. publish_position_inclusive( ) , Position :: Beginning ) ;
325- assert_eq ! ( shard. publish_token( ) , "test-publish-token-foo" ) ;
326-
327- let shard = & acquire_shards_response. acquired_shards [ 2 ] ;
328- assert_eq ! ( shard. index_uid( ) , & test_index. index_uid) ;
329- assert_eq ! ( shard. source_id, test_index. source_id) ;
330- assert_eq ! ( shard. shard_id( ) , ShardId :: from( 3 ) ) ;
331- assert_eq ! ( shard. shard_state( ) , ShardState :: Open ) ;
332- assert_eq ! ( shard. leader_id, "test-ingester-qux" ) ;
333- assert_eq ! ( shard. follower_id( ) , "test-ingester-baz" ) ;
334- assert_eq ! ( shard. publish_position_inclusive( ) , Position :: Beginning ) ;
335- assert_eq ! ( shard. publish_token( ) , "test-publish-token-foo" ) ;
336-
337- // A token ranking below the recorded one is refused: the shard is left untouched and absent
338- // from the response.
298+ // A token ranking below the recorded one — and a non-existent shard — are refused: both are
299+ // omitted from the response and the recorded token is left untouched.
339300 let acquire_shards_response = metastore
340301 . acquire_shards ( AcquireShardsRequest {
341302 index_uid : Some ( test_index. index_uid . clone ( ) ) ,
342303 source_id : test_index. source_id . clone ( ) ,
343- shard_ids : vec ! [ ShardId :: from( 1 ) ] ,
344- publish_token : "test-publish-token-aaa" . to_string ( ) ,
304+ shard_ids : vec ! [ ShardId :: from( 1 ) , ShardId :: from ( 666 ) ] ,
305+ publish_token : OLDER_TOKEN . to_string ( ) ,
345306 } )
346307 . await
347308 . unwrap ( ) ;
348309 assert ! ( acquire_shards_response. acquired_shards. is_empty( ) ) ;
349310
350- // The same token re-acquires successfully (idempotent, e.g. after a local respawn).
311+ // The same token re-acquires successfully (idempotent, e.g. after a local respawn); the full
312+ // shard is returned.
351313 let acquire_shards_response = metastore
352314 . acquire_shards ( AcquireShardsRequest {
353315 index_uid : Some ( test_index. index_uid . clone ( ) ) ,
354316 source_id : test_index. source_id . clone ( ) ,
355317 shard_ids : vec ! [ ShardId :: from( 1 ) ] ,
356- publish_token : "test-publish-token-foo" . to_string ( ) ,
318+ publish_token : TOKEN . to_string ( ) ,
357319 } )
358320 . await
359321 . unwrap ( ) ;
360322 assert_eq ! ( acquire_shards_response. acquired_shards. len( ) , 1 ) ;
361- assert_eq ! (
362- acquire_shards_response. acquired_shards[ 0 ] . publish_token( ) ,
363- "test-publish-token-foo"
364- ) ;
323+ let shard = & acquire_shards_response. acquired_shards [ 0 ] ;
324+ assert_eq ! ( shard. index_uid( ) , & test_index. index_uid) ;
325+ assert_eq ! ( shard. source_id, test_index. source_id) ;
326+ assert_eq ! ( shard. shard_id( ) , ShardId :: from( 1 ) ) ;
327+ assert_eq ! ( shard. shard_state( ) , ShardState :: Closed ) ;
328+ assert_eq ! ( shard. leader_id, "test-ingester-foo" ) ;
329+ assert_eq ! ( shard. follower_id( ) , "test-ingester-bar" ) ;
330+ assert_eq ! ( shard. publish_position_inclusive( ) , Position :: Beginning ) ;
331+ assert_eq ! ( shard. publish_token( ) , TOKEN ) ;
365332
366- // A strictly greater token takes the shard over.
333+ // A strictly newer ULID takes the shard over.
367334 let acquire_shards_response = metastore
368335 . acquire_shards ( AcquireShardsRequest {
369336 index_uid : Some ( test_index. index_uid . clone ( ) ) ,
370337 source_id : test_index. source_id . clone ( ) ,
371338 shard_ids : vec ! [ ShardId :: from( 1 ) ] ,
372- publish_token : "test-publish-token-zzz" . to_string ( ) ,
339+ publish_token : NEWER_TOKEN . to_string ( ) ,
373340 } )
374341 . await
375342 . unwrap ( ) ;
376343 assert_eq ! ( acquire_shards_response. acquired_shards. len( ) , 1 ) ;
377344 assert_eq ! (
378345 acquire_shards_response. acquired_shards[ 0 ] . publish_token( ) ,
379- "test-publish-token-zzz"
346+ NEWER_TOKEN
380347 ) ;
381348
382- // Legacy (pre-ULID, '/'-containing) tokens rank below any ULID. Shard 4 starts unowned: a
383- // legacy token can claim it, but is then superseded by a ULID and can no longer take it back.
384- let legacy_token = "indexer/test-node/test-index:0/test-source/01000000000000000000000000" ;
385- let ulid_token = "02000000000000000000000000" ;
349+ // An unowned shard can be acquired by any ULID.
386350 let acquire_shards_response = metastore
387351 . acquire_shards ( AcquireShardsRequest {
388352 index_uid : Some ( test_index. index_uid . clone ( ) ) ,
389353 source_id : test_index. source_id . clone ( ) ,
390- shard_ids : vec ! [ ShardId :: from( 4 ) ] ,
391- publish_token : legacy_token . to_string ( ) ,
354+ shard_ids : vec ! [ ShardId :: from( 2 ) ] ,
355+ publish_token : TOKEN . to_string ( ) ,
392356 } )
393357 . await
394358 . unwrap ( ) ;
395359 assert_eq ! ( acquire_shards_response. acquired_shards. len( ) , 1 ) ;
360+ assert_eq ! (
361+ acquire_shards_response. acquired_shards[ 0 ] . publish_token( ) ,
362+ TOKEN
363+ ) ;
396364
365+ // A ULID supersedes a recorded legacy token.
397366 let acquire_shards_response = metastore
398367 . acquire_shards ( AcquireShardsRequest {
399368 index_uid : Some ( test_index. index_uid . clone ( ) ) ,
400369 source_id : test_index. source_id . clone ( ) ,
401- shard_ids : vec ! [ ShardId :: from( 4 ) ] ,
402- publish_token : ulid_token . to_string ( ) ,
370+ shard_ids : vec ! [ ShardId :: from( 3 ) ] ,
371+ publish_token : TOKEN . to_string ( ) ,
403372 } )
404373 . await
405374 . unwrap ( ) ;
406375 assert_eq ! ( acquire_shards_response. acquired_shards. len( ) , 1 ) ;
407376 assert_eq ! (
408377 acquire_shards_response. acquired_shards[ 0 ] . publish_token( ) ,
409- ulid_token
378+ TOKEN
410379 ) ;
411380
381+ // Shard 4 is owned by a ULID, yet a legacy token reclaims it: the rolling-upgrade hand-back,
382+ // where the control plane moves a shard from a new indexer back to an old one.
412383 let acquire_shards_response = metastore
413384 . acquire_shards ( AcquireShardsRequest {
414385 index_uid : Some ( test_index. index_uid . clone ( ) ) ,
415386 source_id : test_index. source_id . clone ( ) ,
416387 shard_ids : vec ! [ ShardId :: from( 4 ) ] ,
417- publish_token : legacy_token . to_string ( ) ,
388+ publish_token : LEGACY_TOKEN . to_string ( ) ,
418389 } )
419390 . await
420391 . unwrap ( ) ;
421- assert ! ( acquire_shards_response. acquired_shards. is_empty( ) ) ;
392+ assert_eq ! ( acquire_shards_response. acquired_shards. len( ) , 1 ) ;
393+ assert_eq ! (
394+ acquire_shards_response. acquired_shards[ 0 ] . publish_token( ) ,
395+ LEGACY_TOKEN
396+ ) ;
422397
423398 cleanup_index ( & mut metastore, test_index. index_uid ) . await ;
424399}
0 commit comments