Skip to content

Commit 6a399df

Browse files
authored
Update ChangeStreamDao to query different TVF for postgresSQL based on (#36667)
the change stream partition mode For MUTABLE_KEY_RANGE change stream, use read_proto_bytes_, else use read_json_
1 parent d0014ff commit 6a399df

6 files changed

Lines changed: 416 additions & 98 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import com.google.cloud.spanner.Options;
4747
import com.google.cloud.spanner.Options.RpcPriority;
4848
import com.google.cloud.spanner.PartitionOptions;
49+
import com.google.cloud.spanner.ReadOnlyTransaction;
50+
import com.google.cloud.spanner.ResultSet;
4951
import com.google.cloud.spanner.Spanner;
5052
import com.google.cloud.spanner.SpannerException;
5153
import com.google.cloud.spanner.SpannerOptions;
@@ -1998,6 +2000,11 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
19982000
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
19992001
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
20002002
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
2003+
final SpannerAccessor spannerAccessor =
2004+
SpannerAccessor.getOrCreate(changeStreamSpannerConfig);
2005+
final boolean isMutableChangeStream =
2006+
isMutableChangeStream(
2007+
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
20012008
final DaoFactory daoFactory =
20022009
new DaoFactory(
20032010
changeStreamSpannerConfig,
@@ -2007,7 +2014,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20072014
rpcPriority,
20082015
input.getPipeline().getOptions().getJobName(),
20092016
changeStreamDatabaseDialect,
2010-
metadataDatabaseDialect);
2017+
metadataDatabaseDialect,
2018+
isMutableChangeStream);
20112019
final ActionFactory actionFactory = new ActionFactory();
20122020

20132021
final Duration watermarkRefreshRate =
@@ -2689,4 +2697,58 @@ static String resolveSpannerProjectId(SpannerConfig config) {
26892697
? SpannerOptions.getDefaultProjectId()
26902698
: config.getProjectId().get();
26912699
}
2700+
2701+
@VisibleForTesting
2702+
static boolean isMutableChangeStream(
2703+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
2704+
String fetchedPartitionMode = fetchPartitionMode(databaseClient, dialect, changeStreamName);
2705+
if (fetchedPartitionMode.isEmpty()
2706+
|| fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
2707+
return false;
2708+
}
2709+
return true;
2710+
}
2711+
2712+
private static String fetchPartitionMode(
2713+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
2714+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
2715+
Statement statement;
2716+
if (dialect == Dialect.POSTGRESQL) {
2717+
statement =
2718+
Statement.newBuilder(
2719+
"select option_value\n"
2720+
+ "from information_schema.change_stream_options\n"
2721+
+ "where change_stream_name = $1 and option_name = 'partition_mode'")
2722+
.bind("p1")
2723+
.to(changeStreamName)
2724+
.build();
2725+
} else {
2726+
statement =
2727+
Statement.newBuilder(
2728+
"select option_value\n"
2729+
+ "from information_schema.change_stream_options\n"
2730+
+ "where change_stream_name = @changeStreamName and option_name = 'partition_mode'")
2731+
.bind("changeStreamName")
2732+
.to(changeStreamName)
2733+
.build();
2734+
}
2735+
ResultSet resultSet = tx.executeQuery(statement);
2736+
while (resultSet.next()) {
2737+
String value = resultSet.getString(0);
2738+
if (value != null) {
2739+
return value;
2740+
}
2741+
}
2742+
return "";
2743+
} catch (RuntimeException e) {
2744+
// Log the failure (with stack trace) but rethrow so the caller still observes
2745+
// the error.
2746+
LOG.warn(
2747+
"Failed to fetch partition_mode for change stream '{}', dialect={} - will propagate exception",
2748+
changeStreamName,
2749+
dialect,
2750+
e);
2751+
throw e;
2752+
}
2753+
}
26922754
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
* as a {@link ResultSet}, which can be consumed until the stream is finished.
3232
*/
3333
public class ChangeStreamDao {
34-
3534
private final String changeStreamName;
3635
private final DatabaseClient databaseClient;
3736
private final RpcPriority rpcPriority;
3837
private final String jobName;
3938
private final Dialect dialect;
39+
private final boolean isMutableChangeStream;
4040

4141
/**
4242
* Constructs a change stream dao. All the queries performed by this class will be for the given
@@ -53,12 +53,14 @@ public class ChangeStreamDao {
5353
DatabaseClient databaseClient,
5454
RpcPriority rpcPriority,
5555
String jobName,
56-
Dialect dialect) {
56+
Dialect dialect,
57+
boolean isMutableChangeStream) {
5758
this.changeStreamName = changeStreamName;
5859
this.databaseClient = databaseClient;
5960
this.rpcPriority = rpcPriority;
6061
this.jobName = jobName;
6162
this.dialect = dialect;
63+
this.isMutableChangeStream = isMutableChangeStream;
6264
}
6365

6466
/**
@@ -91,8 +93,18 @@ public ChangeStreamResultSet changeStreamQuery(
9193
String query = "";
9294
Statement statement;
9395
if (this.isPostgres()) {
94-
query =
95-
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
96+
// Ensure we have determined whether change stream uses mutable key range
97+
if (this.isMutableChangeStream) {
98+
query =
99+
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
100+
+ changeStreamName
101+
+ "\"($1, $2, $3, $4, null)";
102+
} else {
103+
query =
104+
"SELECT * FROM \"spanner\".\"read_json_"
105+
+ changeStreamName
106+
+ "\"($1, $2, $3, $4, null)";
107+
}
96108
statement =
97109
Statement.newBuilder(query)
98110
.bind("p1")

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class DaoFactory implements Serializable {
4949
private final String jobName;
5050
private final Dialect spannerChangeStreamDatabaseDialect;
5151
private final Dialect metadataDatabaseDialect;
52+
private final boolean isMutableChangeStream;
5253

5354
/**
5455
* Constructs a {@link DaoFactory} with the configuration to be used for the underlying instances.
@@ -68,7 +69,8 @@ public DaoFactory(
6869
RpcPriority rpcPriority,
6970
String jobName,
7071
Dialect spannerChangeStreamDatabaseDialect,
71-
Dialect metadataDatabaseDialect) {
72+
Dialect metadataDatabaseDialect,
73+
boolean isMutableChangeStream) {
7274
if (metadataSpannerConfig.getInstanceId() == null) {
7375
throw new IllegalArgumentException("Metadata instance can not be null");
7476
}
@@ -83,6 +85,7 @@ public DaoFactory(
8385
this.jobName = jobName;
8486
this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect;
8587
this.metadataDatabaseDialect = metadataDatabaseDialect;
88+
this.isMutableChangeStream = isMutableChangeStream;
8689
}
8790

8891
/**
@@ -143,7 +146,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() {
143146
spannerAccessor.getDatabaseClient(),
144147
rpcPriority,
145148
jobName,
146-
this.spannerChangeStreamDatabaseDialect);
149+
this.spannerChangeStreamDatabaseDialect,
150+
this.isMutableChangeStream);
147151
}
148152
return changeStreamDaoInstance;
149153
}

0 commit comments

Comments
 (0)