Environment details
- OS type and version: Ubuntu jammy
- Java version: 21
- Connector version(s): https://github.com/googleapis/java-pubsub-group-kafka-connector/releases/tag/v1.3.1
Steps to reproduce
- Run the connector with an invalid configuration, for example an invalid project, topic, or credentials.
Expected: The configuration is validated, and I cannot post the connector config unless it is valid.
Actual: Any configuration is accepted, but the connector then fails at runtime.
Example
{
  "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
  "topics": "Any topic",
  "cps.project": "foo",
  "max.retries": "1",
  "metadata.publish": "true",
  "gcp.credentials.json": "path to the secrets",
  "name": "MyConnector",
  "value.converter.schemas.enable": "false",
  "errors.tolerance": "all",
  "cps.topic": "any topic"
}
Expected: The Sink Connector fails to start
Actual: The Sink Connector starts but prints PERMISSION_DENIED errors in the logs
Stack trace
13:58:20.030 [task-thread-my-sink-connector-0] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - WorkerSinkTask{id=my-sink--0} Commit of offsets threw an unexpected exception for sequence number 13: null
java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:352) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:139) ~[connect-api-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:445) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:415) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:226) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:596) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:555) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:111) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:67) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
at com.google.pubsub.kafka.sink.CloudPubSubSinkTask.flush(CloudPubSubSinkTask.java:350) ~[pubsub-group-kafka-connector-1.3.1.jar:1.3.1]
... 13 more
Caused by: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
Additional context
I think that the issue is here
try {
  publisher = builder.build();
} catch (Exception e) {
  throw new RuntimeException(e);
}
File: java-pubsub-group-kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java (lines 425 to 429 at commit 2aa7a18)
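To illustrate the kind of up-front check I have in mind, here is a minimal sketch in plain Java. `SinkConfigPreflight` and its `validate` method are hypothetical names, not part of the connector, and the topic ID pattern is my reading of the Pub/Sub resource naming rules, so treat it as an assumption. It only catches malformed values such as the `"any topic"` in the example above. Detecting a missing topic or insufficient permissions would still need a call to Pub/Sub, which is not shown here. Checks like these could presumably be wired into Kafka Connect's `Connector.validate` hook so that a bad config is rejected when it is posted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the connector: collects human-readable
// errors for obviously malformed sink settings before any task starts.
class SinkConfigPreflight {

    // Assumption based on Pub/Sub's documented naming rules: a topic ID starts
    // with a letter, is 3 to 255 characters long, and uses only letters,
    // digits, and the characters . _ ~ % + -
    private static final Pattern TOPIC_ID =
            Pattern.compile("[A-Za-z][A-Za-z0-9._~%+-]{2,254}");

    static List<String> validate(Map<String, String> config) {
        List<String> errors = new ArrayList<>();

        String project = config.get("cps.project");
        if (project == null || project.isBlank()) {
            errors.add("cps.project must be set");
        }

        String topic = config.get("cps.topic");
        if (topic == null || topic.isBlank()) {
            errors.add("cps.topic must be set");
        } else if (!TOPIC_ID.matcher(topic).matches() || topic.startsWith("goog")) {
            // Topic IDs may not start with "goog" (same assumption as above).
            errors.add("cps.topic is not a valid Pub/Sub topic ID: " + topic);
        }
        return errors;
    }

    public static void main(String[] args) {
        // Values taken from the example config in this issue.
        Map<String, String> config =
                Map.of("cps.project", "foo", "cps.topic", "any topic");
        validate(config).forEach(System.err::println);
    }
}
```

Returning a list of messages rather than throwing on the first problem means every invalid field can be reported back to the user at once.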