4242import com .google .cloud .dataflow .sdk .util .state .ValueState ;
4343import com .google .cloud .dataflow .sdk .util .state .WatermarkStateInternal ;
4444import com .google .common .base .Supplier ;
45+ import com .google .common .collect .ImmutableList ;
4546import com .google .common .util .concurrent .Futures ;
4647import com .google .common .util .concurrent .SettableFuture ;
4748import com .google .protobuf .ByteString ;
@@ -70,6 +71,7 @@ public class WindmillStateInternalsTest {
7071 private static final StateTag <CombiningValueState <Integer , Integer >> COMBINING_ADDR =
7172 StateTags .combiningValueFromInputInternal (
7273 "combining" , VarIntCoder .of (), new Sum .SumIntegerFn ());
74+ private static final ByteString COMBINING_KEY = key (NAMESPACE , "combining" );
7375 private final Coder <int []> accumCoder =
7476 new Sum .SumIntegerFn ().getAccumulatorCoder (null , VarIntCoder .of ());
7577
@@ -81,11 +83,11 @@ public class WindmillStateInternalsTest {
8183 @ Mock
8284 private Supplier <StateSampler .ScopedState > readStateSupplier ;
8385
84- private ByteString key (StateNamespace namespace , String addrId ) {
86+ private static ByteString key (StateNamespace namespace , String addrId ) {
8587 return key ("" , namespace , addrId );
8688 }
8789
88- private ByteString key (String prefix , StateNamespace namespace , String addrId ) {
90+ private static ByteString key (String prefix , StateNamespace namespace , String addrId ) {
8991 return ByteString .copyFromUtf8 (prefix + namespace .stringKey () + "+u" + addrId );
9092 }
9193
@@ -286,7 +288,7 @@ public void testCombiningAddBeforeRead() throws Exception {
286288 CombiningValueState <Integer , Integer > value = underTest .state (NAMESPACE , COMBINING_ADDR );
287289
288290 SettableFuture <Iterable <int []>> future = SettableFuture .create ();
289- when (mockReader .listFuture (key ( NAMESPACE , "combining" ) , STATE_FAMILY , accumCoder ))
291+ when (mockReader .listFuture (COMBINING_KEY , STATE_FAMILY , accumCoder ))
290292 .thenReturn (future );
291293
292294 StateContents <Integer > result = value .get ();
@@ -326,10 +328,10 @@ public void testCombiningIsEmpty() throws Exception {
326328 CombiningValueState <Integer , Integer > value = underTest .state (NAMESPACE , COMBINING_ADDR );
327329
328330 SettableFuture <Iterable <int []>> future = SettableFuture .create ();
329- when (mockReader .listFuture (key ( NAMESPACE , "combining" ) , STATE_FAMILY , accumCoder ))
331+ when (mockReader .listFuture (COMBINING_KEY , STATE_FAMILY , accumCoder ))
330332 .thenReturn (future );
331333 StateContents <Boolean > result = value .isEmpty ();
332- Mockito .verify (mockReader ).listFuture (key ( NAMESPACE , "combining" ) , STATE_FAMILY , accumCoder );
334+ Mockito .verify (mockReader ).listFuture (COMBINING_KEY , STATE_FAMILY , accumCoder );
333335
334336 waitAndSet (future , Arrays .asList (new int [] {29 }), 200 );
335337 assertThat (result .read (), Matchers .is (false ));
@@ -342,7 +344,7 @@ public void testCombiningIsEmptyAfterClear() throws Exception {
342344 value .clear ();
343345 StateContents <Boolean > result = value .isEmpty ();
344346 Mockito .verify (mockReader , never ())
345- .listFuture (key ( NAMESPACE , "combining" ) , STATE_FAMILY , accumCoder );
347+ .listFuture (COMBINING_KEY , STATE_FAMILY , accumCoder );
346348 assertThat (result .read (), Matchers .is (true ));
347349
348350 value .add (87 );
@@ -351,6 +353,8 @@ public void testCombiningIsEmptyAfterClear() throws Exception {
351353
352354 @ Test
353355 public void testCombiningAddPersist () throws Exception {
356+ disableCompactOnWrite ();
357+
354358 CombiningValueState <Integer , Integer > value = underTest .state (NAMESPACE , COMBINING_ADDR );
355359
356360 value .add (5 );
@@ -363,7 +367,7 @@ public void testCombiningAddPersist() throws Exception {
363367 assertEquals (1 , commitBuilder .getListUpdatesCount ());
364368
365369 TagList listUpdates = commitBuilder .getListUpdates (0 );
366- assertEquals (key ( NAMESPACE , "combining" ) , listUpdates .getTag ());
370+ assertEquals (COMBINING_KEY , listUpdates .getTag ());
367371 assertEquals (1 , listUpdates .getValuesCount ());
368372 assertEquals (
369373 11 ,
@@ -375,8 +379,45 @@ public void testCombiningAddPersist() throws Exception {
375379 Mockito .verifyNoMoreInteractions (mockReader );
376380 }
377381
382+ @ Test
383+ public void testCombiningAddPersistWithCompact () throws Exception {
384+ forceCompactOnWrite ();
385+
386+ Mockito .stub (
387+ mockReader .listFuture (
388+ org .mockito .Matchers .<ByteString >any (),
389+ org .mockito .Matchers .<String >any (),
390+ org .mockito .Matchers .<Coder <int []>>any ()))
391+ .toReturn (
392+ Futures .<Iterable <int []>>immediateFuture (
393+ ImmutableList .of (new int [] {40 }, new int [] {60 })));
394+
395+ CombiningValueState <Integer , Integer > value = underTest .state (NAMESPACE , COMBINING_ADDR );
396+
397+ value .add (5 );
398+ value .add (6 );
399+
400+ Windmill .WorkItemCommitRequest .Builder commitBuilder =
401+ Windmill .WorkItemCommitRequest .newBuilder ();
402+ underTest .persist (commitBuilder );
403+
404+ assertEquals (2 , commitBuilder .getListUpdatesCount ());
405+ assertEquals (0 , commitBuilder .getListUpdates (0 ).getValuesCount ());
406+
407+ TagList listUpdates = commitBuilder .getListUpdates (1 );
408+ assertEquals (COMBINING_KEY , listUpdates .getTag ());
409+ assertEquals (1 , listUpdates .getValuesCount ());
410+ assertEquals (
411+ 111 ,
412+ CoderUtils .decodeFromByteArray (
413+ accumCoder , listUpdates .getValues (0 ).getData ().substring (1 ).toByteArray ())[
414+ 0 ]);
415+ }
416+
378417 @ Test
379418 public void testCombiningClearPersist () throws Exception {
419+ disableCompactOnWrite ();
420+
380421 CombiningValueState <Integer , Integer > value = underTest .state (NAMESPACE , COMBINING_ADDR );
381422
382423 value .clear ();
@@ -390,20 +431,20 @@ public void testCombiningClearPersist() throws Exception {
390431 assertEquals (2 , commitBuilder .getListUpdatesCount ());
391432
392433 TagList listClear = commitBuilder .getListUpdates (0 );
393- assertEquals (key ( NAMESPACE , "combining" ) , listClear .getTag ());
434+ assertEquals (COMBINING_KEY , listClear .getTag ());
394435 assertEquals (Long .MAX_VALUE , listClear .getEndTimestamp ());
395436 assertEquals (0 , listClear .getValuesCount ());
396437
397438 TagList listUpdates = commitBuilder .getListUpdates (1 );
398- assertEquals (key ( NAMESPACE , "combining" ) , listUpdates .getTag ());
439+ assertEquals (COMBINING_KEY , listUpdates .getTag ());
399440 assertEquals (1 , listUpdates .getValuesCount ());
400441 assertEquals (
401442 11 ,
402443 CoderUtils .decodeFromByteArray (
403444 accumCoder , listUpdates .getValues (0 ).getData ().substring (1 ).toByteArray ())[0 ]);
404445
405446 // Blind adds should not need to read the future.
406- Mockito .verify (mockReader ).listFuture (key ( NAMESPACE , "combining" ) , STATE_FAMILY , accumCoder );
447+ Mockito .verify (mockReader ).listFuture (COMBINING_KEY , STATE_FAMILY , accumCoder );
407448 Mockito .verify (mockReader ).startBatchAndBlock ();
408449 Mockito .verifyNoMoreInteractions (mockReader );
409450 }
@@ -866,4 +907,22 @@ public void testValueNoStateFamilies() throws Exception {
866907
867908 assertEquals ("World" , value .get ().read ());
868909 }
910+
911+ private void disableCompactOnWrite () {
912+ WindmillStateInternals .COMPACT_NOW .set (
913+ new Supplier <Boolean >() {
914+ public Boolean get () {
915+ return false ;
916+ }
917+ });
918+ }
919+
920+ private void forceCompactOnWrite () {
921+ WindmillStateInternals .COMPACT_NOW .set (
922+ new Supplier <Boolean >() {
923+ public Boolean get () {
924+ return true ;
925+ }
926+ });
927+ }
869928}
0 commit comments