diff --git a/README.md b/README.md
index fa063ab9..fd63f9c4 100644
--- a/README.md
+++ b/README.md
@@ -198,6 +198,7 @@ configurations:
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
| cps.endpoint | String | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
+| cps.useEmulator | Boolean | false | When true, use the Pub/Sub emulator instead of the production service. The emulator endpoint will be determined by the PUBSUB_EMULATOR_HOST environment variable, or fallback to the cps.endpoint configuration. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
| maxBufferBytes | Long | 9,500,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
diff --git a/pom.xml b/pom.xml
index 0060acdf..e08a4e16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,10 @@
com.google.api
gax-grpc
+
+ io.grpc
+ grpc-api
+
com.google.auth
google-auth-library-credentials
diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java
index 70044257..63ad7c0c 100644
--- a/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java
+++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java
@@ -29,6 +29,8 @@ public class ConnectorUtils {
public static final String CPS_TOPIC_CONFIG = "cps.topic";
public static final String CPS_ENDPOINT = "cps.endpoint";
public static final String CPS_DEFAULT_ENDPOINT = "pubsub.googleapis.com:443";
+ public static final String CPS_USE_EMULATOR = "cps.useEmulator";
+ public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST";
public static final String CPS_MESSAGE_KEY_ATTRIBUTE = "key";
public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";
diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
index d57988a2..9b8fcc8a 100644
--- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
+++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
@@ -287,7 +287,13 @@ public ConfigDef config() {
Type.STRING,
ConnectorUtils.CPS_DEFAULT_ENDPOINT,
Importance.LOW,
- "The Pub/Sub endpoint to use.");
+ "The Pub/Sub endpoint to use.")
+ .define(
+ ConnectorUtils.CPS_USE_EMULATOR,
+ Type.BOOLEAN,
+ false,
+ Importance.LOW,
+ "When true, use the Pub/Sub emulator instead of the production service.");
}
@Override
diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
index ce5fe152..d9abd6de 100644
--- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
+++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
@@ -69,6 +69,7 @@ public class CloudPubSubSinkTask extends SinkTask {
private String cpsProject;
private String cpsTopic;
private String cpsEndpoint;
+ private boolean useEmulator;
private String messageBodyName;
private long maxBufferSize;
private long maxBufferBytes;
@@ -118,6 +119,7 @@ public void start(Map props) {
cpsProject = validatedProps.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
cpsTopic = validatedProps.get(ConnectorUtils.CPS_TOPIC_CONFIG).toString();
cpsEndpoint = validatedProps.get(ConnectorUtils.CPS_ENDPOINT).toString();
+ useEmulator = (Boolean) validatedProps.get(ConnectorUtils.CPS_USE_EMULATOR);
maxBufferSize = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG);
maxBufferBytes = (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG);
maxOutstandingRequestBytes =
@@ -399,7 +401,6 @@ private void createPublisher() {
com.google.cloud.pubsub.v1.Publisher.Builder builder =
com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
- .setCredentialsProvider(gcpCredentialsProvider)
.setBatchingSettings(batchingSettings.build())
.setRetrySettings(
RetrySettings.newBuilder()
@@ -413,8 +414,26 @@ private void createPublisher() {
.setInitialRpcTimeout(Duration.ofSeconds(10))
.setRpcTimeoutMultiplier(2)
.build())
- .setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()))
- .setEndpoint(cpsEndpoint);
+ .setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()));
+
+ // Configure endpoint, credentials and channel based on whether we're using emulator or
+ // production
+ if (useEmulator) {
+ // For emulator: use PUBSUB_EMULATOR_HOST env var, fallback to configured cps.endpoint, then
+ // default
+ String emulatorHost = System.getenv(ConnectorUtils.PUBSUB_EMULATOR_HOST);
+ String endpoint = emulatorHost != null ? emulatorHost : cpsEndpoint;
+ builder
+ .setCredentialsProvider(com.google.api.gax.core.NoCredentialsProvider.create())
+ .setChannelProvider(
+ com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.newBuilder()
+ .setEndpoint(endpoint)
+ .setChannelConfigurator(channel -> channel.usePlaintext())
+ .build());
+ } else {
+ // For production: use configured credentials and endpoint
+ builder.setCredentialsProvider(gcpCredentialsProvider).setEndpoint(cpsEndpoint);
+ }
if (orderingKeySource != OrderingKeySource.NONE) {
builder.setEnableMessageOrdering(true);
}
diff --git a/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
index d25918f3..e763703b 100644
--- a/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
+++ b/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
@@ -16,6 +16,7 @@
package com.google.pubsub.kafka.sink;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
@@ -38,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -612,6 +614,44 @@ public void testPublisherShutdownOnStop() throws Exception {
verify(publisher, times(1)).awaitTermination(maxShutdownTimeoutMs, TimeUnit.MILLISECONDS);
}
+ /** Tests that the emulator configuration is properly defined and parsed. */
+ @Test
+ public void testEmulatorConfiguration() {
+ CloudPubSubSinkConnector connector = new CloudPubSubSinkConnector();
+ ConfigDef configDef = connector.config();
+
+ assertTrue(
+ "Emulator configuration should be defined",
+ configDef.names().contains(ConnectorUtils.CPS_USE_EMULATOR));
+
+ Map emulatorProps = new HashMap<>();
+ emulatorProps.put(ConnectorUtils.CPS_TOPIC_CONFIG, CPS_TOPIC);
+ emulatorProps.put(ConnectorUtils.CPS_PROJECT_CONFIG, CPS_PROJECT);
+ emulatorProps.put(ConnectorUtils.CPS_USE_EMULATOR, "true");
+
+ Map parsedProps = configDef.parse(emulatorProps);
+ assertTrue(
+ "Emulator should be enabled", (Boolean) parsedProps.get(ConnectorUtils.CPS_USE_EMULATOR));
+ }
+
+ @Test
+ public void testCreatePublisherWithEmulatorEnabled() {
+ props.put(ConnectorUtils.CPS_USE_EMULATOR, "true");
+ props.put(ConnectorUtils.CPS_ENDPOINT, "localhost:8085");
+ CloudPubSubSinkTask task = new CloudPubSubSinkTask(publisher);
+ task.start(props);
+ assertEquals(CloudPubSubSinkTask.class, task.getClass());
+ }
+
+ @Test
+ public void testCreatePublisherWithEmulatorDisabled() {
+ props.put(ConnectorUtils.CPS_USE_EMULATOR, "false");
+ props.put(ConnectorUtils.CPS_ENDPOINT, "pubsub.googleapis.com:443");
+ CloudPubSubSinkTask task = new CloudPubSubSinkTask(publisher);
+ task.start(props);
+ assertEquals(CloudPubSubSinkTask.class, task.getClass());
+ }
+
/** Get some sample SinkRecords's to use in the tests. */
private List getSampleRecords() {
List records = new ArrayList<>();