Skip to content

Commit 23b703f

Browse files
committed
Re-work: keep one publish token, thread through indexing pipeline
1 parent 0dd42c7 commit 23b703f

31 files changed

Lines changed: 471 additions & 256 deletions

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ impl IndexingScheduler {
420420
) {
421421
debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan");
422422
APPLY_PLAN_TOTAL.inc();
423-
// The indexing plan ID is a monotonically increased time based ID that's used as the
423+
// The indexing plan ID is a monotonically increasing time-based ID that's used as the
424424
// publish token for indexers, which ensures indexing plans and shard acquisition are always
425425
// informed by the most recent plan.
426426
let indexing_plan_id = Ulid::new().to_string();
@@ -446,12 +446,15 @@ impl IndexingScheduler {
446446
IngesterStatus::Retiring | IngesterStatus::Decommissioning
447447
)
448448
.then_some(APPLY_INDEXING_PLAN_TIMEOUT);
449+
449450
let notify_on_drop = notify_on_drop.clone();
450451
let indexing_plan_id = indexing_plan_id.clone();
451452
tokio::spawn(async move {
452453
let client = indexer.client.clone();
453-
let apply_plan_fut =
454-
client.apply_indexing_plan(ApplyIndexingPlanRequest { indexing_tasks, indexing_plan_id });
454+
let apply_plan_fut = client.apply_indexing_plan(ApplyIndexingPlanRequest {
455+
indexing_tasks,
456+
indexing_plan_id,
457+
});
455458
let apply_result = match apply_deadline {
456459
Some(timeout) => tokio::time::timeout(timeout, apply_plan_fut).await,
457460
None => Ok(apply_plan_fut.await),

quickwit/quickwit-indexing/src/actors/doc_processor.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ use tokio::runtime::Handle;
4141
use super::vrl_processing::*;
4242
use crate::actors::Indexer;
4343
use crate::metrics::{PROCESSED_BYTES, PROCESSED_DOCS_TOTAL};
44-
use crate::models::{
45-
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch,
46-
};
44+
use crate::models::{NewPublishLock, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch};
4745

4846
const PLAIN_TEXT: &str = "plain_text";
4947
pub(super) struct JsonDoc {
@@ -607,20 +605,6 @@ impl Handler<NewPublishLock> for DocProcessor {
607605
}
608606
}
609607

610-
#[async_trait]
611-
impl Handler<NewPublishToken> for DocProcessor {
612-
type Reply = ();
613-
614-
async fn handle(
615-
&mut self,
616-
message: NewPublishToken,
617-
ctx: &ActorContext<Self>,
618-
) -> Result<(), ActorExitStatus> {
619-
ctx.send_message(&self.indexer_mailbox, message).await?;
620-
Ok(())
621-
}
622-
}
623-
624608
#[cfg(test)]
625609
mod tests {
626610
use std::sync::Arc;

quickwit/quickwit-indexing/src/actors/index_serializer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ impl Handler<IndexedSplitBatchBuilder> for IndexSerializer {
8989
splits,
9090
checkpoint_delta_opt: batch_builder.checkpoint_delta_opt,
9191
publish_lock: batch_builder.publish_lock,
92-
publish_token_opt: batch_builder.publish_token_opt,
9392
merge_task_opt: None,
9493
batch_parent_span: batch_builder.batch_parent_span,
9594
};

quickwit/quickwit-indexing/src/actors/indexer.rs

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use quickwit_proto::indexing::{IndexingPipelineId, PipelineMetrics};
3838
use quickwit_proto::metastore::{
3939
LastDeleteOpstampRequest, MetastoreService, MetastoreServiceClient,
4040
};
41-
use quickwit_proto::types::{DocMappingUid, PublishToken};
41+
use quickwit_proto::types::DocMappingUid;
4242
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
4343
use serde::Serialize;
4444
use tantivy::schema::Schema;
@@ -55,7 +55,7 @@ use super::cooperative_indexing::{CooperativeIndexingCycle, CooperativeIndexingP
5555
use crate::metrics::SPLIT_BUILDERS;
5656
use crate::models::{
5757
CommitTrigger, EmptySplit, IndexedSplitBatchBuilder, IndexedSplitBuilder, NewPublishLock,
58-
NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock,
58+
ProcessedDoc, ProcessedDocBatch, PublishLock,
5959
};
6060

6161
// Random partition ID used to gather partitions exceeding the maximum number of partitions.
@@ -93,7 +93,6 @@ struct IndexerState {
9393
indexing_directory: TempDirectory,
9494
indexing_settings: IndexingSettings,
9595
publish_lock: PublishLock,
96-
publish_token_opt: Option<PublishToken>,
9796
schema: Schema,
9897
doc_mapping_uid: DocMappingUid,
9998
tokenizer_manager: TokenizerManager,
@@ -219,7 +218,6 @@ impl IndexerState {
219218
source_delta: SourceCheckpointDelta::default(),
220219
};
221220
let publish_lock = self.publish_lock.clone();
222-
let publish_token_opt = self.publish_token_opt.clone();
223221

224222
let split_builders_guard = GaugeGuard::new(&SPLIT_BUILDERS, 1.0);
225223

@@ -231,7 +229,6 @@ impl IndexerState {
231229
other_indexed_split_opt: None,
232230
checkpoint_delta,
233231
publish_lock,
234-
publish_token_opt,
235232
last_delete_opstamp,
236233
memory_usage: GaugeGuard::new(&IN_FLIGHT_INDEX_WRITER, 0.0),
237234
cooperative_indexing_period,
@@ -349,7 +346,6 @@ struct IndexingWorkbench {
349346

350347
checkpoint_delta: IndexCheckpointDelta,
351348
publish_lock: PublishLock,
352-
publish_token_opt: Option<PublishToken>,
353349
// On workbench creation, we fetch from the metastore the last delete task opstamp.
354350
// We use this value to set the `delete_opstamp` of the workbench splits.
355351
last_delete_opstamp: u64,
@@ -513,21 +509,6 @@ impl Handler<NewPublishLock> for Indexer {
513509
}
514510
}
515511

516-
#[async_trait]
517-
impl Handler<NewPublishToken> for Indexer {
518-
type Reply = ();
519-
520-
async fn handle(
521-
&mut self,
522-
message: NewPublishToken,
523-
_ctx: &ActorContext<Self>,
524-
) -> Result<(), ActorExitStatus> {
525-
let NewPublishToken(publish_token) = message;
526-
self.indexer_state.publish_token_opt = Some(publish_token);
527-
Ok(())
528-
}
529-
}
530-
531512
impl Indexer {
532513
pub fn new(
533514
pipeline_id: IndexingPipelineId,
@@ -563,7 +544,6 @@ impl Indexer {
563544
indexing_directory,
564545
indexing_settings,
565546
publish_lock: PublishLock::default(),
566-
publish_token_opt: None,
567547
schema,
568548
doc_mapping_uid: doc_mapper.doc_mapping_uid(),
569549
tokenizer_manager: tokenizer_manager.tantivy_manager().clone(),
@@ -630,7 +610,6 @@ impl Indexer {
630610
other_indexed_split_opt,
631611
checkpoint_delta,
632612
publish_lock,
633-
publish_token_opt,
634613
batch_parent_span,
635614
memory_usage,
636615
split_builders_guard,
@@ -656,7 +635,6 @@ impl Indexer {
656635
index_uid: self.indexer_state.pipeline_id.index_uid.clone(),
657636
checkpoint_delta,
658637
publish_lock,
659-
publish_token_opt,
660638
batch_parent_span,
661639
},
662640
)
@@ -680,7 +658,6 @@ impl Indexer {
680658
splits,
681659
checkpoint_delta_opt: Some(checkpoint_delta),
682660
publish_lock,
683-
publish_token_opt,
684661
commit_trigger,
685662
batch_parent_span,
686663
memory_usage,

quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::actors::uploader::UploaderType;
4646
use crate::actors::{Publisher, Uploader};
4747
use crate::merge_policy::MergePolicy;
4848
use crate::metrics::{ACTOR_NAME, BACKPRESSURE_MICROS, INDEXING_PIPELINES};
49-
use crate::models::IndexingStatistics;
49+
use crate::models::{IndexingStatistics, SharedPublishToken};
5050
use crate::source::{
5151
AssignShards, Assignment, SourceActor, SourceRuntime, quickwit_supported_sources,
5252
};
@@ -92,6 +92,7 @@ pub struct IndexingPipeline {
9292
// Id of the last indexing plan assigned to this pipeline. Kept here, like `shard_ids`, so it
9393
// can be re-sent to the source on respawn; the source adopts it as its publish token.
9494
indexing_plan_id: String,
95+
publish_token: SharedPublishToken,
9596
_indexing_pipelines_gauge_guard: GaugeGuard,
9697
}
9798

@@ -141,6 +142,7 @@ impl IndexingPipeline {
141142
},
142143
shard_ids: Default::default(),
143144
indexing_plan_id: String::new(),
145+
publish_token: SharedPublishToken::default(),
144146
_indexing_pipelines_gauge_guard: indexing_pipelines_gauge_guard,
145147
}
146148
}
@@ -310,6 +312,7 @@ impl IndexingPipeline {
310312
self.params.metastore.clone(),
311313
Some(self.params.merge_planner_mailbox.clone()),
312314
Some(source_mailbox.clone()),
315+
self.publish_token.clone(),
313316
);
314317
let (publisher_mailbox, publisher_handle) = ctx
315318
.spawn_actor()
@@ -394,6 +397,7 @@ impl IndexingPipeline {
394397
storage_resolver: self.params.source_storage_resolver.clone(),
395398
event_broker: self.params.event_broker.clone(),
396399
indexing_setting: self.params.indexing_settings.clone(),
400+
publish_token: self.publish_token.clone(),
397401
};
398402
let source = ctx
399403
.protect_future(quickwit_supported_sources().load_source(source_runtime))

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use quickwit_proto::metastore::{
5252
ListIndexesMetadataRequest, ListSplitsRequest, MetastoreResult, MetastoreService,
5353
MetastoreServiceClient,
5454
};
55-
use quickwit_proto::types::{IndexId, IndexUid, NodeId, PipelineUid, ShardId};
55+
use quickwit_proto::types::{IndexId, IndexUid, NodeId, PipelineUid, PublishToken, ShardId};
5656
use quickwit_storage::StorageResolver;
5757
use serde::{Deserialize, Serialize};
5858
use time::OffsetDateTime;
@@ -110,6 +110,7 @@ pub struct IndexingService {
110110
pub(crate) ingester_pool: IngesterPool,
111111
pub(crate) storage_resolver: StorageResolver,
112112
indexing_pipelines: HashMap<PipelineUid, BoxedPipelineHandle>,
113+
latest_indexing_plan_id: PublishToken,
113114
counters: IndexingServiceCounters,
114115
local_split_store: Arc<IndexingSplitCache>,
115116
pub(crate) max_concurrent_split_uploads: usize,
@@ -182,6 +183,7 @@ impl IndexingService {
182183
storage_resolver,
183184
local_split_store: Arc::new(local_split_store),
184185
indexing_pipelines: Default::default(),
186+
latest_indexing_plan_id: String::new(),
185187
counters: Default::default(),
186188
max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads,
187189
#[cfg(feature = "metrics")]
@@ -789,6 +791,17 @@ impl IndexingService {
789791
plan_request: ApplyIndexingPlanRequest,
790792
ctx: &ActorContext<Self>,
791793
) -> Result<(), IndexingError> {
794+
// Plan ids are ULIDs, whose string form sorts by creation time: a smaller id is an older plan.
795+
if plan_request.indexing_plan_id < self.latest_indexing_plan_id {
796+
info!(
797+
dropped_plan_id = %plan_request.indexing_plan_id,
798+
latest_plan_id = %self.latest_indexing_plan_id,
799+
"ignoring stale indexing plan"
800+
);
801+
return Ok(());
802+
}
803+
self.latest_indexing_plan_id = plan_request.indexing_plan_id.clone();
804+
792805
let pipeline_diff = self.compute_pipeline_diff(&plan_request.indexing_tasks);
793806

794807
if !pipeline_diff.pipelines_to_shutdown.is_empty() {
@@ -1684,6 +1697,110 @@ mod tests {
16841697
universe.assert_quit().await;
16851698
}
16861699

1700+
#[tokio::test]
1701+
async fn test_indexing_service_drops_superseded_plan() {
1702+
quickwit_common::setup_logging_for_tests();
1703+
let transport = ChitchatTransport::default();
1704+
let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)
1705+
.await
1706+
.unwrap();
1707+
let metastore = metastore_for_test();
1708+
1709+
let index_id = append_random_suffix("test-plan-gate");
1710+
let index_uri = format!("ram:///indexes/{index_id}");
1711+
let index_config = IndexConfig::for_test(&index_id, &index_uri);
1712+
let create_index_request =
1713+
CreateIndexRequest::try_from_index_config(&index_config).unwrap();
1714+
let index_uid: IndexUid = metastore
1715+
.create_index(create_index_request)
1716+
.await
1717+
.unwrap()
1718+
.index_uid()
1719+
.clone();
1720+
1721+
let source_config = SourceConfig {
1722+
source_id: "test-plan-gate--source".to_string(),
1723+
num_pipelines: NonZeroUsize::MIN,
1724+
enabled: true,
1725+
source_params: SourceParams::void(),
1726+
transform_config: None,
1727+
input_format: SourceInputFormat::Json,
1728+
};
1729+
let add_source_request =
1730+
AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap();
1731+
metastore.add_source(add_source_request).await.unwrap();
1732+
1733+
let universe = Universe::new();
1734+
let temp_dir = tempfile::tempdir().unwrap();
1735+
let (indexing_service, indexing_service_handle) = spawn_indexing_service_for_test(
1736+
temp_dir.path(),
1737+
&universe,
1738+
metastore.clone(),
1739+
cluster.clone(),
1740+
)
1741+
.await;
1742+
1743+
let params_fingerprint =
1744+
indexing_pipeline_params_fingerprint(&index_config, &source_config);
1745+
let task = |pipeline_uid: u128| IndexingTask {
1746+
index_uid: Some(index_uid.clone()),
1747+
source_id: source_config.source_id.clone(),
1748+
shard_ids: Vec::new(),
1749+
pipeline_uid: Some(PipelineUid::for_test(pipeline_uid)),
1750+
params_fingerprint,
1751+
};
1752+
1753+
indexing_service
1754+
.ask_for_res(ApplyIndexingPlanRequest {
1755+
indexing_tasks: vec![task(0), task(1)],
1756+
indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5F50".to_string(),
1757+
})
1758+
.await
1759+
.unwrap();
1760+
assert_eq!(
1761+
indexing_service_handle
1762+
.observe()
1763+
.await
1764+
.num_running_pipelines,
1765+
2
1766+
);
1767+
1768+
// A superseded (older id) plan that would drop a pipeline is ignored.
1769+
indexing_service
1770+
.ask_for_res(ApplyIndexingPlanRequest {
1771+
indexing_tasks: vec![task(0)],
1772+
indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5F40".to_string(),
1773+
})
1774+
.await
1775+
.unwrap();
1776+
assert_eq!(
1777+
indexing_service_handle
1778+
.observe()
1779+
.await
1780+
.num_running_pipelines,
1781+
2
1782+
);
1783+
1784+
// A newer plan applies, dropping the second pipeline.
1785+
indexing_service
1786+
.ask_for_res(ApplyIndexingPlanRequest {
1787+
indexing_tasks: vec![task(0)],
1788+
indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5F60".to_string(),
1789+
})
1790+
.await
1791+
.unwrap();
1792+
assert_eq!(
1793+
indexing_service_handle
1794+
.observe()
1795+
.await
1796+
.num_running_pipelines,
1797+
1
1798+
);
1799+
1800+
indexing_service_handle.quit().await;
1801+
universe.assert_quit().await;
1802+
}
1803+
16871804
#[tokio::test]
16881805
async fn test_indexing_service_shutdown_merge_pipeline_when_no_indexing_pipeline() {
16891806
quickwit_common::setup_logging_for_tests();

0 commit comments

Comments
 (0)