Skip to content

Commit e8920c7

Browse files
authored
Add logic to handle end timestamp of mutable change stream bounded query (#37459)
* Add logic to handle end timestamp of mutable change stream bounded query. * Resolve comments * Fix tests
1 parent d51ea3a commit e8920c7

6 files changed

Lines changed: 174 additions & 19 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
187187
PartitionStartRecordAction partitionStartRecordAction,
188188
PartitionEndRecordAction partitionEndRecordAction,
189189
PartitionEventRecordAction partitionEventRecordAction,
190-
ChangeStreamMetrics metrics) {
190+
ChangeStreamMetrics metrics,
191+
boolean isMutableChangeStream) {
191192
if (queryChangeStreamActionInstance == null) {
192193
queryChangeStreamActionInstance =
193194
new QueryChangeStreamAction(
@@ -201,7 +202,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
201202
partitionStartRecordAction,
202203
partitionEndRecordAction,
203204
partitionEventRecordAction,
204-
metrics);
205+
metrics,
206+
isMutableChangeStream);
205207
}
206208
return queryChangeStreamActionInstance;
207209
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
3535
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
3636
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
37+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
3738
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
3839
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
3940
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -89,6 +90,7 @@ public class QueryChangeStreamAction {
8990
private final PartitionEndRecordAction partitionEndRecordAction;
9091
private final PartitionEventRecordAction partitionEventRecordAction;
9192
private final ChangeStreamMetrics metrics;
93+
private final boolean isMutableChangeStream;
9294

9395
/**
9496
* Constructs an action class for performing a change stream query for a given partition.
@@ -106,6 +108,7 @@ public class QueryChangeStreamAction {
106108
* @param PartitionEndRecordAction action class to process {@link PartitionEndRecord}s
107109
* @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s
108110
* @param metrics metrics gathering class
111+
* @param isMutableChangeStream whether the change stream is mutable or not
109112
*/
110113
QueryChangeStreamAction(
111114
ChangeStreamDao changeStreamDao,
@@ -118,7 +121,8 @@ public class QueryChangeStreamAction {
118121
PartitionStartRecordAction partitionStartRecordAction,
119122
PartitionEndRecordAction partitionEndRecordAction,
120123
PartitionEventRecordAction partitionEventRecordAction,
121-
ChangeStreamMetrics metrics) {
124+
ChangeStreamMetrics metrics,
125+
boolean isMutableChangeStream) {
122126
this.changeStreamDao = changeStreamDao;
123127
this.partitionMetadataDao = partitionMetadataDao;
124128
this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -130,6 +134,7 @@ public class QueryChangeStreamAction {
130134
this.partitionEndRecordAction = partitionEndRecordAction;
131135
this.partitionEventRecordAction = partitionEventRecordAction;
132136
this.metrics = metrics;
137+
this.isMutableChangeStream = isMutableChangeStream;
133138
}
134139

135140
/**
@@ -195,13 +200,23 @@ public ProcessContinuation run(
195200
final Timestamp endTimestamp = partition.getEndTimestamp();
196201
final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
197202
final Timestamp changeStreamQueryEndTimestamp =
198-
isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp();
203+
isBoundedRestriction
204+
? getBoundedQueryEndTimestamp(endTimestamp)
205+
: getNextReadChangeStreamEndTimestamp();
199206

200207
// Once the changeStreamQuery completes we may need to resume reading from the partition if we
201208
// had an unbounded restriction for which we set an arbitrary query end timestamp and for which
202209
// we didn't encounter any indications that the partition is done (explicit end records or
203-
// exceptions about being out of timestamp range).
204-
boolean stopAfterQuerySucceeds = isBoundedRestriction;
210+
// exceptions about being out of timestamp range). We also special case the InitialPartition,
211+
// which always stops after the query succeeds.
212+
boolean stopAfterQuerySucceeds = false;
213+
if (InitialPartition.isInitialPartition(partition.getPartitionToken())) {
214+
stopAfterQuerySucceeds = true;
215+
} else {
216+
stopAfterQuerySucceeds =
217+
isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp);
218+
}
219+
205220
try (ChangeStreamResultSet resultSet =
206221
changeStreamDao.changeStreamQuery(
207222
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
@@ -379,4 +394,14 @@ private Timestamp getNextReadChangeStreamEndTimestamp() {
379394
final Timestamp current = Timestamp.now();
380395
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
381396
}
397+
398+
// For Mutable Change Stream bounded queries, update the query end timestamp to be within 2
399+
// minutes in the future.
400+
private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
401+
if (this.isMutableChangeStream) {
402+
Timestamp nextTimestamp = getNextReadChangeStreamEndTimestamp();
403+
return nextTimestamp.compareTo(endTimestamp) < 0 ? nextTimestamp : endTimestamp;
404+
}
405+
return endTimestamp;
406+
}
382407
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() {
151151
}
152152
return changeStreamDaoInstance;
153153
}
154+
155+
public boolean isMutableChangeStream() {
156+
return this.isMutableChangeStream;
157+
}
154158
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
7373
private final MapperFactory mapperFactory;
7474
private final ActionFactory actionFactory;
7575
private final ChangeStreamMetrics metrics;
76+
private final boolean isMutableChangeStream;
7677
/**
7778
* Needs to be set through the {@link
7879
* ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)} call.
@@ -104,6 +105,7 @@ public ReadChangeStreamPartitionDoFn(
104105
this.mapperFactory = mapperFactory;
105106
this.actionFactory = actionFactory;
106107
this.metrics = metrics;
108+
this.isMutableChangeStream = daoFactory.isMutableChangeStream();
107109
this.throughputEstimator = new NullThroughputEstimator<>();
108110
}
109111

@@ -215,7 +217,8 @@ public void setup() {
215217
partitionStartRecordAction,
216218
partitionEndRecordAction,
217219
partitionEventRecordAction,
218-
metrics);
220+
metrics,
221+
isMutableChangeStream);
219222
}
220223

221224
/**

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
2222
import static org.junit.Assert.assertEquals;
2323
import static org.junit.Assert.assertNotEquals;
24+
import static org.junit.Assert.assertTrue;
2425
import static org.mockito.ArgumentMatchers.any;
26+
import static org.mockito.ArgumentMatchers.anyLong;
2527
import static org.mockito.ArgumentMatchers.eq;
2628
import static org.mockito.Mockito.mock;
2729
import static org.mockito.Mockito.never;
@@ -116,7 +118,8 @@ public void setUp() throws Exception {
116118
partitionStartRecordAction,
117119
partitionEndRecordAction,
118120
partitionEventRecordAction,
119-
metrics);
121+
metrics,
122+
false);
120123
final Struct row = mock(Struct.class);
121124
partition =
122125
PartitionMetadata.newBuilder()
@@ -916,6 +919,121 @@ public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction()
916919
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
917920
}
918921

922+
@Test
923+
public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
924+
// Initialize action with isMutableChangeStream = true
925+
action =
926+
new QueryChangeStreamAction(
927+
changeStreamDao,
928+
partitionMetadataDao,
929+
changeStreamRecordMapper,
930+
partitionMetadataMapper,
931+
dataChangeRecordAction,
932+
heartbeatRecordAction,
933+
childPartitionsRecordAction,
934+
partitionStartRecordAction,
935+
partitionEndRecordAction,
936+
partitionEventRecordAction,
937+
metrics,
938+
true);
939+
940+
// Set endTimestamp to 60 minutes in the future
941+
Timestamp now = Timestamp.now();
942+
Timestamp endTimestamp =
943+
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos());
944+
945+
partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
946+
when(restriction.getTo()).thenReturn(endTimestamp);
947+
when(partitionMetadataMapper.from(any())).thenReturn(partition);
948+
949+
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
950+
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
951+
when(changeStreamDao.changeStreamQuery(
952+
eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
953+
timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
954+
.thenReturn(resultSet);
955+
when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
956+
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
957+
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
958+
959+
final ProcessContinuation result =
960+
action.run(
961+
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
962+
963+
// Verify query was capped at ~2 minutes
964+
long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
965+
assertTrue("Query should be capped at approx 2 minutes (120s)", Math.abs(diff - 120) < 10);
966+
967+
// Crucial: Should RESUME to process the rest later
968+
assertEquals(ProcessContinuation.resume(), result);
969+
}
970+
971+
@Test
972+
public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
973+
action =
974+
new QueryChangeStreamAction(
975+
changeStreamDao,
976+
partitionMetadataDao,
977+
changeStreamRecordMapper,
978+
partitionMetadataMapper,
979+
dataChangeRecordAction,
980+
heartbeatRecordAction,
981+
childPartitionsRecordAction,
982+
partitionStartRecordAction,
983+
partitionEndRecordAction,
984+
partitionEventRecordAction,
985+
metrics,
986+
true);
987+
988+
// Set endTimestamp to only 10 seconds in the future
989+
Timestamp now = Timestamp.now();
990+
Timestamp endTimestamp = Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10, now.getNanos());
991+
992+
partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
993+
when(restriction.getTo()).thenReturn(endTimestamp);
994+
when(partitionMetadataMapper.from(any())).thenReturn(partition);
995+
996+
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
997+
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
998+
when(changeStreamDao.changeStreamQuery(
999+
eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
1000+
timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
1001+
.thenReturn(resultSet);
1002+
when(resultSet.next()).thenReturn(false);
1003+
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
1004+
when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
1005+
1006+
final ProcessContinuation result =
1007+
action.run(
1008+
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
1009+
1010+
// Should use the exact endTimestamp since it is within the limit (10s < 2m)
1011+
assertEquals(endTimestamp, timestampCaptor.getValue());
1012+
1013+
// Should STOP because we reached the actual requested endTimestamp
1014+
assertEquals(ProcessContinuation.stop(), result);
1015+
}
1016+
1017+
@Test
1018+
public void testQueryChangeStreamUnboundedResumesCorrectly() {
1019+
// Unbounded restriction (streaming forever)
1020+
setupUnboundedPartition();
1021+
1022+
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
1023+
when(changeStreamDao.changeStreamQuery(any(), any(), any(), anyLong())).thenReturn(resultSet);
1024+
when(resultSet.next()).thenReturn(false);
1025+
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
1026+
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
1027+
1028+
final ProcessContinuation result =
1029+
action.run(
1030+
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
1031+
1032+
// Should return RESUME to continue reading the stream every 2 minutes
1033+
assertEquals(ProcessContinuation.resume(), result);
1034+
verify(metrics).incQueryCounter();
1035+
}
1036+
9191037
private static class BundleFinalizerStub implements BundleFinalizer {
9201038
@Override
9211039
public void afterBundleCommit(Instant callbackExpiry, Callback callback) {

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
2121
import static org.junit.Assert.assertEquals;
2222
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyBoolean;
24+
import static org.mockito.ArgumentMatchers.eq;
2325
import static org.mockito.Mockito.mock;
2426
import static org.mockito.Mockito.never;
2527
import static org.mockito.Mockito.verify;
@@ -139,17 +141,18 @@ public void setUp() {
139141
when(actionFactory.partitionEventRecordAction(partitionMetadataDao, metrics))
140142
.thenReturn(partitionEventRecordAction);
141143
when(actionFactory.queryChangeStreamAction(
142-
changeStreamDao,
143-
partitionMetadataDao,
144-
changeStreamRecordMapper,
145-
partitionMetadataMapper,
146-
dataChangeRecordAction,
147-
heartbeatRecordAction,
148-
childPartitionsRecordAction,
149-
partitionStartRecordAction,
150-
partitionEndRecordAction,
151-
partitionEventRecordAction,
152-
metrics))
144+
eq(changeStreamDao),
145+
eq(partitionMetadataDao),
146+
eq(changeStreamRecordMapper),
147+
eq(partitionMetadataMapper),
148+
eq(dataChangeRecordAction),
149+
eq(heartbeatRecordAction),
150+
eq(childPartitionsRecordAction),
151+
eq(partitionStartRecordAction),
152+
eq(partitionEndRecordAction),
153+
eq(partitionEventRecordAction),
154+
eq(metrics),
155+
anyBoolean()))
153156
.thenReturn(queryChangeStreamAction);
154157

155158
doFn.setup();

0 commit comments

Comments
 (0)