Skip to content

Commit d9ab51d

Browse files
authored
[To dev/1.3] Pipe: Change the name of the Extractor plugin to Source &&Change the name of the Connector plugin to Sink (#16034) (#16045)
* update extractor plugin name * update connector plugin name * fix
1 parent a0b4978 commit d9ab51d

300 files changed

Lines changed: 2127 additions & 2213 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java renamed to integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151

5252
@RunWith(IoTDBTestRunner.class)
5353
@Category({MultiClusterIT2AutoCreateSchema.class})
54-
public class IoTDBPipeConnectorCompressionIT extends AbstractPipeDualAutoIT {
54+
public class IoTDBPipeSinkCompressionIT extends AbstractPipeDualAutoIT {
5555

5656
@Override
5757
@Before

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorParallelIT.java renamed to integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkParallelIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
@RunWith(IoTDBTestRunner.class)
4343
@Category({MultiClusterIT2AutoCreateSchema.class})
44-
public class IoTDBPipeConnectorParallelIT extends AbstractPipeDualAutoIT {
44+
public class IoTDBPipeSinkParallelIT extends AbstractPipeDualAutoIT {
4545
@Test
4646
public void testIoTConnectorParallel() throws Exception {
4747
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java renamed to integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353

5454
@RunWith(IoTDBTestRunner.class)
5555
@Category({MultiClusterIT2AutoCreateSchema.class})
56-
public class IoTDBPipeExtractorIT extends AbstractPipeDualAutoIT {
56+
public class IoTDBPipeSourceIT extends AbstractPipeDualAutoIT {
5757

5858
@Before
5959
public void setUp() {

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@
2424
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2525
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
2626
import org.apache.iotdb.commons.pipe.config.PipeConfig;
27-
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
28-
import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
29-
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
30-
import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
31-
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
32-
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
33-
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
34-
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
35-
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
36-
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
27+
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
28+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
29+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq;
30+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
31+
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
32+
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
33+
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
34+
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
35+
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealReq;
36+
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
3737
import org.apache.iotdb.isession.SessionConfig;
3838
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
3939
import org.apache.iotdb.pipe.api.exception.PipeException;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
2828
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2929
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
30-
import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
30+
import org.apache.iotdb.confignode.manager.pipe.source.ConfigRegionListeningFilter;
3131
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
3232
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
3333
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import org.apache.iotdb.commons.path.PartialPath;
5050
import org.apache.iotdb.commons.path.PathPatternTree;
5151
import org.apache.iotdb.commons.path.PathPatternUtil;
52-
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
52+
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
5353
import org.apache.iotdb.commons.schema.SchemaConstant;
5454
import org.apache.iotdb.commons.schema.ttl.TTLCache;
5555
import org.apache.iotdb.commons.service.metric.MetricService;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
2121

2222
import org.apache.iotdb.commons.pipe.agent.plugin.PipePluginAgent;
23-
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor;
24-
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeExtractorConstructor;
2523
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor;
24+
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
25+
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSourceConstructor;
2626
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMetaKeeper;
2727

2828
public class PipeConfigNodePluginAgent extends PipePluginAgent {
@@ -32,9 +32,9 @@ public PipeConfigNodePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
3232
}
3333

3434
@Override
35-
protected PipeExtractorConstructor createPipeExtractorConstructor(
35+
protected PipeSourceConstructor createPipeExtractorConstructor(
3636
PipePluginMetaKeeper pipePluginMetaKeeper) {
37-
return new PipeConfigRegionExtractorConstructor();
37+
return new PipeConfigRegionSourceConstructor();
3838
}
3939

4040
@Override
@@ -44,8 +44,8 @@ protected PipeProcessorConstructor createPipeProcessorConstructor(
4444
}
4545

4646
@Override
47-
protected PipeConnectorConstructor createPipeConnectorConstructor(
47+
protected PipeSinkConstructor createPipeConnectorConstructor(
4848
PipePluginMetaKeeper pipePluginMetaKeeper) {
49-
return new PipeConfigRegionConnectorConstructor();
49+
return new PipeConfigRegionSinkConstructor();
5050
}
5151
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java renamed to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,56 +20,50 @@
2020
package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
2121

2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
23-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector;
24-
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor;
25-
import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionAirGapConnector;
26-
import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionConnector;
23+
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
24+
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
25+
import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionAirGapSink;
26+
import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionSink;
2727
import org.apache.iotdb.pipe.api.PipeConnector;
2828

29-
class PipeConfigRegionConnectorConstructor extends PipeConnectorConstructor {
29+
class PipeConfigRegionSinkConstructor extends PipeSinkConstructor {
3030

3131
@Override
3232
protected void initConstructors() {
3333
pluginConstructors.put(
34-
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
35-
IoTDBConfigRegionConnector::new);
34+
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(), IoTDBConfigRegionSink::new);
3635
pluginConstructors.put(
3736
BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName(),
38-
IoTDBConfigRegionConnector::new);
37+
IoTDBConfigRegionSink::new);
3938
pluginConstructors.put(
4039
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
41-
IoTDBConfigRegionConnector::new);
40+
IoTDBConfigRegionSink::new);
4241
pluginConstructors.put(
4342
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
44-
IoTDBConfigRegionConnector::new);
43+
IoTDBConfigRegionSink::new);
4544
pluginConstructors.put(
4645
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
47-
IoTDBConfigRegionAirGapConnector::new);
46+
IoTDBConfigRegionAirGapSink::new);
4847
pluginConstructors.put(
49-
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new);
48+
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new);
5049

5150
pluginConstructors.put(
52-
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(), IoTDBConfigRegionConnector::new);
51+
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(), IoTDBConfigRegionSink::new);
5352
pluginConstructors.put(
54-
BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName(),
55-
IoTDBConfigRegionConnector::new);
53+
BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName(), IoTDBConfigRegionSink::new);
5654
pluginConstructors.put(
57-
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(),
58-
IoTDBConfigRegionConnector::new);
55+
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new);
5956
pluginConstructors.put(
60-
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(),
61-
IoTDBConfigRegionConnector::new);
57+
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new);
6258
pluginConstructors.put(
63-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(),
64-
IoTDBConfigRegionAirGapConnector::new);
59+
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBConfigRegionAirGapSink::new);
6560
pluginConstructors.put(
66-
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new);
61+
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new);
6762
}
6863

6964
@Override
7065
public PipeConnector reflectPluginByKey(String pluginKey) {
7166
// TODO: support constructing plugin by reflection
72-
return (PipeConnector)
73-
pluginConstructors.getOrDefault(pluginKey, DoNothingConnector::new).get();
67+
return (PipeConnector) pluginConstructors.getOrDefault(pluginKey, DoNothingSink::new).get();
7468
}
7569
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionExtractorConstructor.java renamed to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSourceConstructor.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,29 @@
2020
package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
2121

2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
23-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.donothing.DoNothingExtractor;
24-
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeExtractorConstructor;
25-
import org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;
23+
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource;
24+
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSourceConstructor;
25+
import org.apache.iotdb.confignode.manager.pipe.source.IoTDBConfigRegionSource;
2626
import org.apache.iotdb.pipe.api.PipeExtractor;
2727

28-
class PipeConfigRegionExtractorConstructor extends PipeExtractorConstructor {
28+
class PipeConfigRegionSourceConstructor extends PipeSourceConstructor {
2929

3030
@Override
3131
protected void initConstructors() {
3232
pluginConstructors.put(
33-
BuiltinPipePlugin.DO_NOTHING_EXTRACTOR.getPipePluginName(), DoNothingExtractor::new);
33+
BuiltinPipePlugin.DO_NOTHING_EXTRACTOR.getPipePluginName(), DoNothingSource::new);
3434
pluginConstructors.put(
35-
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName(), IoTDBConfigRegionExtractor::new);
35+
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName(), IoTDBConfigRegionSource::new);
3636

3737
pluginConstructors.put(
38-
BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(), DoNothingExtractor::new);
38+
BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(), DoNothingSource::new);
3939
pluginConstructors.put(
40-
BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(), IoTDBConfigRegionExtractor::new);
40+
BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(), IoTDBConfigRegionSource::new);
4141
}
4242

4343
@Override
4444
public PipeExtractor reflectPluginByKey(String pluginKey) {
4545
// TODO: support constructing plugin by reflection
46-
return (PipeExtractor)
47-
pluginConstructors.getOrDefault(pluginKey, DoNothingExtractor::new).get();
46+
return (PipeExtractor) pluginConstructors.getOrDefault(pluginKey, DoNothingSource::new).get();
4847
}
4948
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
package org.apache.iotdb.confignode.manager.pipe.agent.receiver;
2121

22-
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
2322
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
2423
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
24+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
2525
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
2626
import org.apache.iotdb.confignode.manager.pipe.receiver.protocol.IoTDBConfigNodeReceiver;
2727

@@ -37,7 +37,7 @@ public class IoTDBConfigNodeReceiverAgent extends IoTDBReceiverAgent {
3737
@Override
3838
protected void initConstructors() {
3939
RECEIVER_CONSTRUCTORS.put(
40-
IoTDBConnectorRequestVersion.VERSION_1.getVersion(), IoTDBConfigNodeReceiver::new);
40+
IoTDBSinkRequestVersion.VERSION_1.getVersion(), IoTDBConfigNodeReceiver::new);
4141
}
4242

4343
@Override

0 commit comments

Comments
 (0)