Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
/**
* {@link PipeProcessor}
*
* <p>{@link PipeProcessor} is used to filter and transform the {@link Event} formed by the {@link
* PipeExtractor}.
* <p>{@link PipeProcessor} is used to filter and transform the {@link Event} supplied by the {@link
* PipeSource}.
*
* <p>The lifecycle of a {@link PipeProcessor} is as follows:
*
Expand All @@ -44,15 +44,14 @@
* to config the runtime behavior of the {@link PipeProcessor}.
* <li>While the collaboration task is in progress:
* <ul>
* <li>{@link PipeExtractor} captures the {@link Event}s and wraps them into three types of
* <li>{@link PipeSource} captures the {@link Event}s and wraps them into three types of
* {@link Event} instances.
* <li>{@link PipeProcessor} processes the {@link Event} and then passes them to the {@link
* PipeConnector}. The following 3 methods will be called: {@link
* PipeSink}. The following 3 methods will be called: {@link
* PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
* PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
* PipeProcessor#process(Event, EventCollector)}.
* <li>{@link PipeConnector} serializes the {@link Event}s into binaries and send them to
* sinks.
* <li>{@link PipeSink} serializes the {@link Event}s into binaries and sends them to sinks.
* </ul>
* <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
* PipeProcessor#close() } method will be called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
* <p>The lifecycle of a {@link PipeSource} is as follows:
*
* <ul>
* <li>When a collaboration task is created, the KV pairs of `WITH EXTRACTOR` clause in SQL are
* parsed and the validation method {@link PipeSource#validate(PipeParameterValidator)} will
* be called to validate the {@link PipeParameters}.
* <li>When a collaboration task is created, the KV pairs of `WITH SOURCE` (or the legacy `WITH
* EXTRACTOR`) clause in SQL are parsed and the validation method {@link
* PipeSource#validate(PipeParameterValidator)} will be called to validate the {@link
* PipeParameters}.
* <li>Before the collaboration task starts, the method {@link
* PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} will be called to
* configure the runtime behavior of the {@link PipeSource}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

package org.apache.iotdb.pipe.api.customizer.parameter;

import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.PipeSink;
import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSinkRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;

import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -38,9 +38,9 @@
import java.util.stream.Collectors;

/**
* Used in {@link PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} ,
* {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} and {@link
* PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
* Used in {@link PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)}, {@link
* PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} and {@link
* PipeSink#customize(PipeParameters, PipeSinkRuntimeConfiguration)}.
*
* <p>This class is used to parse the parameters in WITH SOURCE, WITH PROCESSOR and WITH SINK when
* creating a pipe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ public final class SubscriptionMessages {

// --- SubscriptionPollRequest ---
public static final String UNEXPECTED_REQUEST_TYPE =
"unexpected request type: {}, payload will be null";
"Unexpected request type: {}, payload will be null";

// --- SubscriptionPollResponse ---
public static final String UNEXPECTED_RESPONSE_TYPE =
"unexpected response type: {}, payload will be null";
"Unexpected response type: {}, payload will be null";

// --- IdentifierUtils ---
public static final String NULL_IDENTIFIER_NOT_SUPPORTED = "null identifier is not supported";
Expand All @@ -41,55 +41,56 @@ public final class SubscriptionMessages {

// --- AbstractSubscriptionPushConsumer ---
public static final String PUSH_CONSUMER_CANCEL_AUTO_POLL =
"SubscriptionPushConsumer {} cancel auto poll worker";
"SubscriptionPushConsumer {} is canceling the auto-poll worker";
public static final String PUSH_CONSUMER_SUBMIT_AUTO_POLL =
"SubscriptionPushConsumer {} submit auto poll worker";
"SubscriptionPushConsumer {} is submitting the auto-poll worker";
public static final String CONSUMER_LISTENER_FAILURE =
"Consumer listener result failure when consuming message: {}";
public static final String AUTO_POLL_UNEXPECTED = "something unexpected happened when auto poll messages...";
"Consumer listener returned failure when consuming message: {}";
public static final String AUTO_POLL_UNEXPECTED =
"Unexpected exception while auto-polling messages.";

// --- SubscriptionExecutorServiceManager ---
public static final String EXECUTOR_LAUNCHING = "Launching {} with core pool size {}...";
public static final String EXECUTOR_SHUTTING_DOWN = "Shutting down {}...";
public static final String EXECUTOR_NOT_LAUNCHED_SUBMIT =
"{} has not been launched, ignore submit task";
"{} has not been launched, ignoring submitted task";
public static final String EXECUTOR_NOT_LAUNCHED_INVOKE =
"{} has not been launched, ignore invoke all tasks";
"{} has not been launched, ignoring invokeAll tasks";
public static final String EXECUTOR_NOT_LAUNCHED_ZERO =
"{} has not been launched, return zero";
"{} has not been launched, returning zero";
public static final String EXECUTOR_NOT_LAUNCHED_SCHEDULE =
"{} has not been launched, ignore scheduleWithFixedDelay for task";
"{} has not been launched, ignoring scheduleWithFixedDelay task";

// --- AbstractSubscriptionProviders ---
public static final String PROVIDER_CLOSE_FAILED =
"Failed to close subscription provider {} because of {}";
public static final String ADD_NEW_PROVIDER = "add new subscription provider {}";
public static final String CLOSE_STALE_PROVIDER = "close and remove stale subscription provider {}";
"Failed to close subscription provider {} because {}";
public static final String ADD_NEW_PROVIDER = "Adding new subscription provider {}";
public static final String CLOSE_STALE_PROVIDER = "Closing and removing stale subscription provider {}";
public static final String OPEN_PROVIDERS_FAILED =
"Failed to open providers for consumer {} because of {}";
"Failed to open providers for consumer {} because {}";
public static final String FETCH_ENDPOINTS_FAILED =
"Failed to fetch all endpoints for consumer {} because of {}";
"Failed to fetch all endpoints for consumer {} because {}";

// --- AbstractSubscriptionPullConsumer ---
public static final String PULL_CONSUMER_CANCEL_AUTO_COMMIT =
"SubscriptionPullConsumer {} cancel auto commit worker";
"SubscriptionPullConsumer {} is canceling the auto-commit worker";
public static final String PULL_CONSUMER_SUBMIT_AUTO_COMMIT =
"SubscriptionPullConsumer {} submit auto commit worker";
"SubscriptionPullConsumer {} is submitting the auto-commit worker";
public static final String AUTO_COMMIT_UNEXPECTED =
"something unexpected happened when auto commit messages...";
"Unexpected exception while auto-committing messages.";
public static final String COMMIT_DURING_CLOSE_UNEXPECTED =
"something unexpected happened when commit messages during close";
"Unexpected exception while committing messages during close.";

// --- AbstractSubscriptionConsumer ---
public static final String UNEXPECTED_RESPONSE_TYPE_WARN = "unexpected response type: {}";
public static final String UNEXPECTED_RESPONSE_TYPE_WARN = "Unexpected response type: {}";
public static final String CONSUMER_CANCEL_HEARTBEAT_WORKER =
"SubscriptionConsumer {} cancel heartbeat worker";
"SubscriptionConsumer {} is canceling the heartbeat worker";
public static final String CONSUMER_SUBMIT_HEARTBEAT_WORKER =
"SubscriptionConsumer {} submit heartbeat worker";
"SubscriptionConsumer {} is submitting the heartbeat worker";
public static final String CONSUMER_CANCEL_ENDPOINTS_SYNCER =
"SubscriptionConsumer {} cancel endpoints syncer";
"SubscriptionConsumer {} is canceling the endpoints syncer";
public static final String CONSUMER_SUBMIT_ENDPOINTS_SYNCER =
"SubscriptionConsumer {} submit endpoints syncer";
"SubscriptionConsumer {} is submitting the endpoints syncer";

private SubscriptionMessages() {}
}
Loading
Loading