Skip to content

Commit b876851

Browse files
authored
refactor: remove deprecated curator ServiceAnnouncer stuff (#19523)
1 parent e61fddc commit b876851

52 files changed

Lines changed: 157 additions & 1435 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/configuration/index.md

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,6 @@ Druid interacts with ZooKeeper through a set of standard path configurations. We
156156
|`druid.zk.paths.base`|Base ZooKeeper path.|`/druid`|
157157
|`druid.zk.paths.coordinatorPath`|Used by the Coordinator for leader election.|`${druid.zk.paths.base}/coordinator`|
158158

159-
The following path is used for service discovery. It is **not** affected by `druid.zk.paths.base` and **must** be specified separately.
160-
161-
|Property|Description|Default|
162-
|--------|-----------|-------|
163-
|`druid.discovery.curator.path`|Services announce themselves under this ZooKeeper path.|`/druid/discovery`|
164-
165159
### TLS
166160

167161
#### General configuration

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
3636
import org.apache.druid.segment.indexing.DataSchema;
3737
import org.apache.druid.segment.realtime.ChatHandlerProvider;
38-
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
3938
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
4039
import org.apache.druid.server.security.Action;
4140
import org.apache.druid.server.security.Resource;
@@ -160,7 +159,7 @@ private static ObjectMapper createObjectMapper()
160159
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
161160
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
162161
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
163-
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
162+
binder.bind(ChatHandlerProvider.class).toInstance(new ChatHandlerProvider());
164163
binder.bind(RowIngestionMetersFactory.class).toInstance(new DropwizardRowIngestionMetersFactory());
165164
binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager());
166165
}

indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
435435
// ParallelIndexSupervisorTask because it doesn't support APIs for live ingestion reports.
436436
log.warn("Chat handler is already registered. Skipping chat handler registration.");
437437
} else {
438-
toolbox.getChatHandlerProvider().register(getId(), this, false);
438+
toolbox.getChatHandlerProvider().register(getId(), this);
439439
}
440440

441441
this.authorizerMapper = toolbox.getAuthorizerMapper();

indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
522522
Preconditions.checkNotNull(toolbox.getChatHandlerProvider(), "chatHandlerProvider").getClass().getName()
523523
);
524524
authorizerMapper = toolbox.getAuthorizerMapper();
525-
toolbox.getChatHandlerProvider().register(getId(), this, false);
525+
toolbox.getChatHandlerProvider().register(getId(), this);
526526

527527
// the lineage-based segment allocation protocol must be used as the legacy protocol has a critical bug
528528
// (see SinglePhaseParallelIndexTaskRunner.allocateNewSegment()). However, we tell subtasks to use

indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
242242
}
243243
this.authorizerMapper = toolbox.getAuthorizerMapper();
244244

245-
toolbox.getChatHandlerProvider().register(getId(), this, false);
245+
toolbox.getChatHandlerProvider().register(getId(), this);
246246

247247
rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
248248
parseExceptionHandler = new ParseExceptionHandler(

indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import com.google.common.base.Optional;
2424
import com.google.inject.Inject;
2525
import org.apache.druid.client.indexing.IndexingService;
26-
import org.apache.druid.curator.discovery.ServiceAnnouncer;
2726
import org.apache.druid.discovery.DruidLeaderSelector;
28-
import org.apache.druid.guice.annotations.Self;
2927
import org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
3028
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
3129
import org.apache.druid.indexing.common.task.TaskContextEnricher;
@@ -42,7 +40,6 @@
4240
import org.apache.druid.java.util.emitter.EmittingLogger;
4341
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
4442
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
45-
import org.apache.druid.server.DruidNode;
4643
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
4744

4845
import java.util.concurrent.atomic.AtomicReference;
@@ -67,10 +64,9 @@ public class DruidOverlord
6764
private final AtomicReference<Lifecycle> leaderLifecycleRef = new AtomicReference<>(null);
6865

6966
/**
70-
* Indicates that all services have been started and the node can now announce
71-
* itself with {@link ServiceAnnouncer#announce}. This must be set to false
72-
* as soon as {@link DruidLeaderSelector.Listener#stopBeingLeader()} is
73-
* called.
67+
* Indicates that all services have been started and the node is ready to serve
68+
* leader-only HTTP routes. This must be set to false as soon as
69+
* {@link DruidLeaderSelector.Listener#stopBeingLeader()} is called.
7470
*/
7571
private volatile boolean initialized;
7672

@@ -83,9 +79,7 @@ public DruidOverlord(
8379
final GlobalTaskLockbox taskLockbox,
8480
final TaskStorage taskStorage,
8581
final TaskActionClientFactory taskActionClientFactory,
86-
@Self final DruidNode selfNode,
8782
final TaskRunnerFactory runnerFactory,
88-
final ServiceAnnouncer serviceAnnouncer,
8983
final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
9084
final ServiceEmitter emitter,
9185
final SupervisorManager supervisorManager,
@@ -103,9 +97,6 @@ public DruidOverlord(
10397
this.segmentMetadataCache = segmentMetadataCache;
10498
this.coordinatorOverlordServiceConfig = coordinatorOverlordServiceConfig;
10599

106-
final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode :
107-
selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService());
108-
109100
this.leadershipListener = new DruidLeaderSelector.Listener()
110101
{
111102
@Override
@@ -173,15 +164,13 @@ public void start()
173164
compactionScheduler.becomeLeader();
174165
scheduledBatchTaskManager.start();
175166

176-
// Announce the node only after all the services have been initialized
167+
// Mark ready only after all the services have been initialized
177168
initialized = true;
178-
serviceAnnouncer.announce(node);
179169
}
180170

181171
@Override
182172
public void stop()
183173
{
184-
serviceAnnouncer.unannounce(node);
185174
scheduledBatchTaskManager.stop();
186175
compactionScheduler.stopBeingLeader();
187176
taskMaster.downgradeToHalfLeader();

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
423423
initializeSequences();
424424

425425
log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName());
426-
toolbox.getChatHandlerProvider().register(task.getId(), this, false);
426+
toolbox.getChatHandlerProvider().register(task.getId(), this);
427427

428428
runThread = Thread.currentThread();
429429

indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import org.apache.druid.segment.loading.SegmentLoaderConfig;
5454
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
5555
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
56-
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
56+
import org.apache.druid.segment.realtime.ChatHandlerProvider;
5757
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
5858
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
5959
import org.apache.druid.server.DruidNode;
@@ -150,7 +150,7 @@ public void setUp() throws IOException
150150
new NoopTestTaskReportFileWriter(),
151151
null,
152152
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
153-
new NoopChatHandlerProvider(),
153+
new ChatHandlerProvider(),
154154
new DropwizardRowIngestionMetersFactory(),
155155
new TestAppenderatorsManager(),
156156
new NoopOverlordClient(),

indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
4848
import org.apache.druid.segment.loading.LocalLoadSpec;
4949
import org.apache.druid.segment.realtime.ChatHandlerProvider;
50-
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
5150
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
5251
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
5352
import org.apache.druid.server.security.AuthConfig;
@@ -94,7 +93,7 @@ public TestUtils()
9493
.addValue(ExprMacroTable.class, LookupEnabledTestExprMacroTable.INSTANCE)
9594
.addValue(IndexIO.class, indexIO)
9695
.addValue(ObjectMapper.class, jsonMapper)
97-
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
96+
.addValue(ChatHandlerProvider.class, new ChatHandlerProvider())
9897
.addValue(AuthConfig.class, new AuthConfig())
9998
.addValue(AuthorizerMapper.class, null)
10099
.addValue(RowIngestionMetersFactory.class, rowIngestionMetersFactory)

indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
6161
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
6262
import org.apache.druid.segment.realtime.ChatHandlerProvider;
63-
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
6463
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
6564
import org.apache.druid.segment.transform.CompactionTransformSpec;
6665
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
@@ -174,7 +173,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa
174173
ImmutableList.of(
175174
binder -> {
176175
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
177-
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
176+
binder.bind(ChatHandlerProvider.class).toInstance(new ChatHandlerProvider());
178177
binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
179178
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
180179
binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(TestIndex.INDEX_IO, objectMapper));

0 commit comments

Comments
 (0)