Core: Configuration errors should fail CloudPubSubSinkConnector #401

@yeikel

Description

Environment details

  1. OS type and version: Ubuntu jammy
  2. Java version: 21
  3. Connector version: 1.3.1 (https://github.com/googleapis/java-pubsub-group-kafka-connector/releases/tag/v1.3.1)

Steps to reproduce

  1. Run the connector with an invalid configuration, for example an invalid project, topic, or credentials.

Expected: The configuration is validated when it is submitted, and the connector config cannot be posted unless it is valid.

Actual: Any configuration is accepted, and the connector then fails at runtime.

Example

{
	"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
	"topics": "Any topic",
	"cps.project": "foo",
	"max.retries": "1",
	"metadata.publish": "true",
	"gcp.credentials.json": "path to the secrets",
	"name": "MyConnector",
	"value.converter.schemas.enable": "false",
	"errors.tolerance": "all",
	"cps.topic": "any topic"
}

Expected: The Sink Connector fails to start
Actual: The Sink Connector starts but prints PERMISSION_DENIED errors in the logs
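
To illustrate the kind of fail-fast check being requested, here is a minimal sketch in plain Java. It is not the connector's code: the class `SinkConfigPrecheck` and its `validate` method are hypothetical, and the two patterns are my assumption of the documented Google Cloud naming rules for project IDs and Pub/Sub topic IDs. A format check like this can only reject malformed values. Detecting a project or topic that does not exist, or credentials without publish permission, would still need a live call to Pub/Sub.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical pre-flight check; not part of the connector. */
final class SinkConfigPrecheck {
  // Assumed rule: 6-30 chars, lowercase letters, digits and hyphens,
  // starting with a letter and not ending with a hyphen.
  private static final Pattern PROJECT_ID =
      Pattern.compile("[a-z][a-z0-9-]{4,28}[a-z0-9]");

  // Assumed rule: 3-255 chars, starting with a letter, then letters,
  // digits, or any of . _ ~ + % -
  private static final Pattern TOPIC_ID =
      Pattern.compile("[A-Za-z][A-Za-z0-9._~+%-]{2,254}");

  /** Returns one message per malformed setting; an empty list means no format errors. */
  static List<String> validate(Map<String, String> config) {
    List<String> errors = new ArrayList<>();

    String project = config.getOrDefault("cps.project", "");
    if (!PROJECT_ID.matcher(project).matches()) {
      errors.add("cps.project is not a well-formed project ID: '" + project + "'");
    }

    String topic = config.getOrDefault("cps.topic", "");
    // Topic IDs starting with "goog" are assumed to be reserved.
    if (!TOPIC_ID.matcher(topic).matches() || topic.startsWith("goog")) {
      errors.add("cps.topic is not a well-formed topic ID: '" + topic + "'");
    }
    return errors;
  }

  public static void main(String[] args) {
    // Placeholder values taken from the example config in this issue.
    Map<String, String> config = Map.of("cps.project", "foo", "cps.topic", "any topic");
    validate(config).forEach(System.out::println);
  }
}
```

In Kafka Connect, checks of this kind would normally be reported from the connector's config validation step so that the REST API rejects the config before any task starts. The sketch above only shows the checks themselves.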

Stack trace

13:58:20.030 [task-thread-my-sink-connector-0] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - WorkerSinkTask{id=my-sink--0} Commit of offsets threw an unexpected exception for sequence number 13: null
java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:352) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:139) ~[connect-api-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:445) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:415) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:226) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:596) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:555) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:111) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:67) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:350) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
... 13 more
Caused by: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.

Additional context

I think the issue is here:

    try {
      publisher = builder.build();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
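
One possible reading of the stack trace is that the exception surfaces in `flush()`, at the `Future.get()` call, rather than in `builder.build()`, so the try/catch above may never see it. The sketch below demonstrates that pattern with the standard library only. `FakePublisher` is a hypothetical stand-in, not the Pub/Sub client, and it assumes that building the publisher makes no remote call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical demo of a failure that is deferred until a future is awaited. */
final class DeferredFailureDemo {

  /** Stand-in publisher: construction does no I/O, so it cannot fail on permissions. */
  static final class FakePublisher {
    CompletableFuture<String> publish(String message) {
      // The simulated rejection is stored in the future instead of being thrown here.
      return CompletableFuture.failedFuture(new SecurityException("PERMISSION_DENIED"));
    }
  }

  /** Returns the stage at which the failure becomes visible. */
  static String whereDoesItFail() {
    FakePublisher publisher;
    try {
      publisher = new FakePublisher(); // mirrors builder.build()
    } catch (RuntimeException e) {
      return "build";
    }

    CompletableFuture<String> pending;
    try {
      pending = publisher.publish("hello"); // returns immediately
    } catch (RuntimeException e) {
      return "publish";
    }

    try {
      pending.get(); // mirrors the wait inside flush()
      return "none";
    } catch (ExecutionException e) {
      return "flush";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(whereDoesItFail());
  }
}
```

If that reading is right, failing at startup would need an explicit check against Pub/Sub before the task begins consuming, in addition to the existing try/catch.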
