Skip to content

Commit ad5cba0

Browse files
committed
debezium/dbz#1858 Add support for Spanner Omni
1 parent 4a36cb1 commit ad5cba0

7 files changed

Lines changed: 137 additions & 8 deletions

File tree

src/main/java/io/debezium/connector/spanner/SpannerConnectorConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,4 +320,20 @@ public String syncTopicMaxMessageSize() {
320320
public Duration getAwaitTaskAnswerTimeout() {
321321
return getConfig().getDuration(TASK_AWAIT_ANSWER_TIMEOUT, ChronoUnit.MILLIS);
322322
}
323+
324+
public String spannerOmniEndpoint() {
325+
return getConfig().getString(SPANNER_OMNI_ENDPOINT_PROPERTY_NAME);
326+
}
327+
328+
public boolean usePlainText() {
329+
return getConfig().getBoolean(SPANNER_OMNI_USE_PLAINTEXT_PROPERTY_NAME);
330+
}
331+
332+
public String clientKeyPath() {
333+
return getConfig().getString(SPANNER_OMNI_CLIENT_KEY_PATH);
334+
}
335+
336+
public String clientCertPath() {
337+
return getConfig().getString(SPANNER_OMNI_CLIENT_CERT_PATH);
338+
}
323339
}

src/main/java/io/debezium/connector/spanner/config/BaseSpannerConnectorConfig.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ public abstract class BaseSpannerConnectorConfig extends CommonConnectorConfig {
4747
protected static final String END_TIME_PROPERTY_NAME = "gcp.spanner.end.time";
4848
protected static final String GCP_SPANNER_CREDENTIALS_PATH_PROPERTY_NAME = "gcp.spanner.credentials.path";
4949
protected static final String GCP_SPANNER_CREDENTIALS_JSON_PROPERTY_NAME = "gcp.spanner.credentials.json";
50+
51+
protected static final String SPANNER_OMNI_ENDPOINT_PROPERTY_NAME = "spanner.omni.endpoint";
52+
protected static final String SPANNER_OMNI_USE_PLAINTEXT_PROPERTY_NAME = "spanner.omni.use.plaintext";
53+
protected static final String SPANNER_OMNI_CLIENT_KEY_PATH_PROPERTY_NAME = "spanner.omni.client.key.path";
54+
protected static final String SPANNER_OMNI_CLIENT_CERT_PATH_PROPERTY_NAME = "spanner.omni.client.cert.path";
55+
5056
private static final String STREAM_EVENT_QUEUE_CAPACITY_PROPERTY_NAME = "gcp.spanner.stream.event.queue.capacity";
5157

5258
private static final String TASK_STATE_CHANGE_EVENT_QUEUE_CAPACITY_PROPERTY_NAME = "connector.spanner.task.state.change.event.queue.capacity";
@@ -584,6 +590,38 @@ public abstract class BaseSpannerConnectorConfig extends CommonConnectorConfig {
584590
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
585591
.withDefault(SpannerSourceInfoStructMaker.class.getName());
586592

593+
public static final Field SPANNER_OMNI_ENDPOINT = Field.create(SPANNER_OMNI_ENDPOINT_PROPERTY_NAME)
594+
.withDisplayName("SpannerOmniEndpoint")
595+
.withType(Type.STRING)
596+
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 7))
597+
.withWidth(Width.MEDIUM)
598+
.withImportance(Importance.LOW)
599+
.withDescription("Spanner Omni endpoint");
600+
601+
public static final Field SPANNER_OMNI_USE_PLAINTEXT = Field.create(SPANNER_OMNI_USE_PLAINTEXT_PROPERTY_NAME)
602+
.withDisplayName("SpannerOmniUsePlaintext")
603+
.withType(Type.BOOLEAN)
604+
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 8))
605+
.withWidth(Width.SHORT)
606+
.withImportance(Importance.LOW)
607+
.withDescription("Whether to use plaintext for Spanner Omni connection");
608+
609+
public static final Field SPANNER_OMNI_CLIENT_KEY_PATH = Field.create(SPANNER_OMNI_CLIENT_KEY_PATH_PROPERTY_NAME)
610+
.withDisplayName("SpannerOmniClientKeyPath")
611+
.withType(Type.STRING)
612+
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 9))
613+
.withWidth(Width.MEDIUM)
614+
.withImportance(Importance.LOW)
615+
.withDescription("Path to the client key file for Spanner Omni connection");
616+
617+
public static final Field SPANNER_OMNI_CLIENT_CERT_PATH = Field.create(SPANNER_OMNI_CLIENT_CERT_PATH_PROPERTY_NAME)
618+
.withDisplayName("SpannerOmniClientCertPath")
619+
.withType(Type.STRING)
620+
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 10))
621+
.withWidth(Width.MEDIUM)
622+
.withImportance(Importance.LOW)
623+
.withDescription("Path to the client certificate file for Spanner Omni connection");
624+
587625
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
588626
.name("Spanner")
589627
.type(PROJECT_ID)
@@ -597,6 +635,10 @@ public abstract class BaseSpannerConnectorConfig extends CommonConnectorConfig {
597635
SPANNER_CREDENTIALS_JSON,
598636
SPANNER_HOST,
599637
SPANNER_EMULATOR_HOST,
638+
SPANNER_OMNI_ENDPOINT,
639+
SPANNER_OMNI_USE_PLAINTEXT,
640+
SPANNER_OMNI_CLIENT_KEY_PATH,
641+
SPANNER_OMNI_CLIENT_CERT_PATH,
600642
STREAM_EVENT_QUEUE_CAPACITY,
601643
TASK_STATE_CHANGE_EVENT_QUEUE_CAPACITY,
602644
VALUE_CAPTURE_MODE,

src/main/java/io/debezium/connector/spanner/config/validation/ChangeStreamValidator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
import static io.debezium.connector.spanner.config.BaseSpannerConnectorConfig.SPANNER_CREDENTIALS_PATH;
1515
import static io.debezium.connector.spanner.config.BaseSpannerConnectorConfig.SPANNER_EMULATOR_HOST;
1616
import static io.debezium.connector.spanner.config.BaseSpannerConnectorConfig.SPANNER_HOST;
17+
import static io.debezium.connector.spanner.config.BaseSpannerConnectorConfig.SPANNER_OMNI_CLIENT_CERT_PATH;
18+
import static io.debezium.connector.spanner.config.BaseSpannerConnectorConfig.SPANNER_OMNI_CLIENT_KEY_PATH;
19+
import static io.debezium.connector.spanner.config.BaseSpannerConnectorConfig.SPANNER_OMNI_ENDPOINT;
20+
import static io.debezium.connector.spanner.config.BaseSpannerConnectorConfig.SPANNER_OMNI_USE_PLAINTEXT;
1721
import static org.slf4j.LoggerFactory.getLogger;
1822

1923
import org.slf4j.Logger;
@@ -71,7 +75,11 @@ public ChangeStreamValidator validate() {
7175
context.getString(SPANNER_CREDENTIALS_PATH),
7276
context.getString(SPANNER_HOST),
7377
context.getString(SPANNER_EMULATOR_HOST),
74-
context.getString(DATABASE_ROLE));
78+
context.getString(DATABASE_ROLE),
79+
context.getString(SPANNER_OMNI_ENDPOINT),
80+
Boolean.parseBoolean(context.getString(SPANNER_OMNI_USE_PLAINTEXT)),
81+
context.getString(SPANNER_OMNI_CLIENT_KEY_PATH),
82+
context.getString(SPANNER_OMNI_CLIENT_CERT_PATH));
7583

7684
this.result = isStreamExist(databaseClientFactory.getDatabaseClient(), changeStreamName);
7785

src/main/java/io/debezium/connector/spanner/db/DatabaseClientFactory.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class DatabaseClientFactory {
3838
private final String instanceId;
3939
private final String databaseId;
4040

41+
private final String SPANNER_OMNI_DEFAULT_ID = "default";
4142
private final SpannerOptions options;
4243
private volatile Spanner spanner;
4344

@@ -46,8 +47,23 @@ public class DatabaseClientFactory {
4647
public DatabaseClientFactory(String projectId, String instanceId, String databaseId,
4748
String credentialsJson,
4849
String credentialsPath, String host, String emulatorHost, String databaseRole) {
49-
this.projectId = projectId;
50-
this.instanceId = instanceId;
50+
51+
this(projectId, instanceId, databaseId, credentialsJson, credentialsPath, host, emulatorHost, databaseRole, null, false, null, null);
52+
}
53+
54+
public DatabaseClientFactory(String projectId, String instanceId, String databaseId,
55+
String credentialsJson,
56+
String credentialsPath, String host, String emulatorHost, String databaseRole, String spannerOmniEndpoint, boolean usePlainText,
57+
String clientKeyPath, String clientCertPath) {
58+
59+
if (Strings.isNullOrEmpty(spannerOmniEndpoint)) {
60+
this.projectId = projectId;
61+
this.instanceId = instanceId;
62+
}
63+
else {
64+
this.projectId = SPANNER_OMNI_DEFAULT_ID;
65+
this.instanceId = SPANNER_OMNI_DEFAULT_ID;
66+
}
5167
this.databaseId = databaseId;
5268

5369
SpannerOptions.Builder builder = SpannerOptions.newBuilder();
@@ -58,7 +74,19 @@ public DatabaseClientFactory(String projectId, String instanceId, String databas
5874
if (!Strings.isNullOrEmpty(host)) {
5975
builder.setHost(host);
6076
}
61-
if (!Strings.isNullOrEmpty(emulatorHost)) {
77+
if (!Strings.isNullOrEmpty(spannerOmniEndpoint)) {
78+
builder.setExperimentalHost(spannerOmniEndpoint);
79+
builder.setCredentials(NoCredentials.getInstance());
80+
builder.setBuiltInMetricsEnabled(false);
81+
if (usePlainText) {
82+
builder.setCredentials(NoCredentials.getInstance());
83+
builder.usePlainText();
84+
}
85+
else if (!Strings.isNullOrEmpty(clientCertPath) && !Strings.isNullOrEmpty(clientKeyPath)) {
86+
builder.useClientCert(clientCertPath, clientKeyPath);
87+
}
88+
}
89+
else if (!Strings.isNullOrEmpty(emulatorHost)) {
6290
builder.setEmulatorHost(emulatorHost);
6391
builder.setCredentials(NoCredentials.getInstance());
6492
}
@@ -68,7 +96,7 @@ public DatabaseClientFactory(String projectId, String instanceId, String databas
6896
}
6997
}
7098

71-
if (!Strings.isNullOrEmpty(databaseRole)) {
99+
if (!Strings.isNullOrEmpty(databaseRole) && Strings.isNullOrEmpty(spannerOmniEndpoint)) {
72100
builder.setDatabaseRole(databaseRole);
73101
}
74102
String userAgentString = USER_AGENT_PREFIX + Module.version();
@@ -81,7 +109,8 @@ public DatabaseClientFactory(String projectId, String instanceId, String databas
81109
public DatabaseClientFactory(SpannerConnectorConfig config) {
82110
this(config.projectId(), config.instanceId(), config.databaseId(),
83111
config.gcpSpannerCredentialsJson(), config.gcpSpannerCredentialsPath(),
84-
config.spannerHost(), config.spannerEmulatorHost(), config.databaseRole());
112+
config.spannerHost(), config.spannerEmulatorHost(), config.databaseRole(), config.spannerOmniEndpoint(), config.usePlainText(), config.clientKeyPath(),
113+
config.clientCertPath());
85114
}
86115

87116
@VisibleForTesting

src/test/java/io/debezium/connector/spanner/AbstractSpannerConnectorIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class AbstractSpannerConnectorIT extends AbstractAsyncEngineConnectorTest
4646
.with("heartbeat.interval.ms", "300000")
4747
.with("gcp.spanner.low-watermark.enabled", false)
4848
.with("tasks.max", 3) // see DBZ-8428
49+
.with("spanner.omni.endpoint", Database.getSpannerOmniEndpoint())
50+
.with("spanner.omni.use.plaintext", true)
4951
.build();
5052

5153
protected static final Configuration basePgConfig = Configuration.copy(baseConfig)

src/test/java/io/debezium/connector/spanner/util/Connection.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package io.debezium.connector.spanner.util;
77

8+
import static io.debezium.connector.spanner.util.Database.IsSpannerOmniEndpoint;
9+
import static io.debezium.connector.spanner.util.Database.getSpannerOmniEndpoint;
810
import static org.awaitility.Awaitility.await;
911

1012
import java.time.Duration;
@@ -142,6 +144,9 @@ public void createChangeStreamNewRow(String changeStreamName, String... tables)
142144
}
143145

144146
private String createInstance() {
147+
if (IsSpannerOmniEndpoint()) {
148+
return "default";
149+
}
145150
for (Instance value : this.spanner.getInstanceAdminClient().listInstances().iterateAll()) {
146151
if (value.getId().getInstance().equals("test-instance")) {
147152
return "test-instance";
@@ -269,7 +274,9 @@ public void dropDatabase(String databaseId) {
269274
}
270275

271276
public void createDatabase(String databaseId, Dialect dialect) throws InterruptedException {
272-
createInstance();
277+
if (!IsSpannerOmniEndpoint()) {
278+
createInstance();
279+
}
273280
DatabaseAdminClient dbAdminClient = this.spanner.getDatabaseAdminClient();
274281
OperationFuture<com.google.cloud.spanner.Database, CreateDatabaseMetadata> operationFuture = dbAdminClient
275282
.createDatabase(
@@ -311,7 +318,14 @@ private void init() {
311318

312319
builder.setCredentials(NoCredentials.getInstance());
313320
builder.setProjectId(projectId);
314-
builder.setEmulatorHost(emulatorHost);
321+
if (IsSpannerOmniEndpoint()) {
322+
builder.setExperimentalHost(getSpannerOmniEndpoint());
323+
builder.usePlainText()
324+
.setCredentials(NoCredentials.getInstance());
325+
}
326+
else {
327+
builder.setEmulatorHost(emulatorHost);
328+
}
315329

316330
SpannerOptions options = builder.build();
317331
try {

src/test/java/io/debezium/connector/spanner/util/Database.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@
88
import java.util.UUID;
99

1010
import com.google.cloud.spanner.Dialect;
11+
import com.google.common.base.Strings;
1112

1213
public class Database {
1314

1415
private static final String projectId = "test-project";
1516
private static final String instanceId = "test-instance";
17+
private static final String spannerOmniDefaultId = "default";
18+
private static final String SPANNER_OMNI_ENDPOINT_PROPERTY_NAME = "spanner.omni.endpoint";
19+
1620
private final String databaseId;
1721

1822
private Connection connection;
@@ -24,6 +28,14 @@ private Database(String databaseId, Dialect dialect) {
2428
this.dialect = dialect;
2529
}
2630

31+
public static String getSpannerOmniEndpoint() {
32+
return System.getProperty(SPANNER_OMNI_ENDPOINT_PROPERTY_NAME);
33+
}
34+
35+
public static boolean IsSpannerOmniEndpoint() {
36+
return !Strings.isNullOrEmpty(getSpannerOmniEndpoint());
37+
}
38+
2739
public static final Database TEST_DATABASE = Database.builder()
2840
.generateDatabaseId()
2941
.build();
@@ -34,10 +46,16 @@ private Database(String databaseId, Dialect dialect) {
3446
.build();
3547

3648
public String getProjectId() {
49+
if (IsSpannerOmniEndpoint()) {
50+
return spannerOmniDefaultId;
51+
}
3752
return projectId;
3853
}
3954

4055
public String getInstanceId() {
56+
if (IsSpannerOmniEndpoint()) {
57+
return spannerOmniDefaultId;
58+
}
4159
return instanceId;
4260
}
4361

0 commit comments

Comments
 (0)