diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java index 59394a6bdf1b7..28d9fe5a7b4d3 100644 --- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java +++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java @@ -176,12 +176,9 @@ public void removeDataListener(final String key) { } private Type getEventChangedType(final WatchEvent event) { - if (1 == event.getKeyValue().getVersion()) { - return Type.ADDED; - } switch (event.getEventType()) { case PUT: - return Type.UPDATED; + return 1 == event.getKeyValue().getVersion() ? Type.ADDED : Type.UPDATED; case DELETE: return Type.DELETED; default: diff --git a/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java b/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java index daa1e6f87f1d0..4c0e77028c0e3 100644 --- a/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java +++ b/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java @@ -34,9 +34,13 @@ import io.etcd.jetcd.watch.WatchResponse; import io.grpc.stub.StreamObserver; import lombok.SneakyThrows; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.internal.configuration.plugins.Plugins; @@ -52,10 +56,13 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.isA; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; @@ -66,6 +73,7 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) class EtcdRepositoryTest { private final EtcdRepository repository = new EtcdRepository(); @@ -168,38 +176,33 @@ void assertPersistEphemeral() { } @Test - void assertWatchUpdate() { - doAnswer(invocationOnMock -> { - Watch.Listener listener = (Watch.Listener) invocationOnMock.getArguments()[2]; - listener.onNext(buildWatchResponse(WatchEvent.EventType.PUT)); - return mock(Watch.Watcher.class); - }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class)); - repository.watch("key1", event -> { - }); - verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class)); + void assertWatchWithPutAndVersionOne() throws ExecutionException, InterruptedException, TimeoutException { + DataChangedEvent actual = assertWatch(WatchEvent.EventType.PUT, 1L); + assertThat(actual.getType(), is(Type.ADDED)); } @Test - void assertWatchDelete() { - doAnswer(invocationOnMock -> { - Watch.Listener listener = (Watch.Listener) invocationOnMock.getArguments()[2]; - listener.onNext(buildWatchResponse(WatchEvent.EventType.DELETE)); - return mock(Watch.Watcher.class); - }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class)); - repository.watch("key1", event -> { - }); - verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class)); + void assertWatchWithPutAndVersionMoreThanOne() throws ExecutionException, InterruptedException, TimeoutException { + DataChangedEvent actual = assertWatch(WatchEvent.EventType.PUT, 2L); + assertThat(actual.getType(), is(Type.UPDATED)); + } + + @Test + void assertWatchWithDeleteAndVersionOne() throws ExecutionException, InterruptedException, TimeoutException { + DataChangedEvent actual = assertWatch(WatchEvent.EventType.DELETE, 1L); + assertThat(actual.getType(), is(Type.DELETED)); } @Test void assertWatchIgnored() { doAnswer(invocationOnMock -> { Watch.Listener listener = (Watch.Listener) invocationOnMock.getArguments()[2]; - listener.onNext(buildWatchResponse(WatchEvent.EventType.UNRECOGNIZED)); + listener.onNext(buildWatchResponse(WatchEvent.EventType.UNRECOGNIZED, 1L)); return mock(Watch.Watcher.class); }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class)); - repository.watch("key1", event -> { - }); + CompletableFuture actual = new CompletableFuture<>(); + repository.watch("key1", actual::complete); + assertFalse(actual.isDone()); verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class)); } @@ -215,8 +218,8 @@ void assertPersist() { verify(kv).put(any(ByteSequence.class), any(ByteSequence.class)); } - @Test - void assertClose() { + @AfterAll + void closeRepositoryAfterAllTests() { repository.close(); verify(client).close(); } @@ -269,13 +272,26 @@ void assertGetChildrenKeysWhenThrowExecutionException() throws ExecutionExceptio } } + private DataChangedEvent assertWatch(final WatchEvent.EventType eventType, final long version) throws ExecutionException, InterruptedException, TimeoutException { + doAnswer(invocationOnMock -> { + Watch.Listener listener = (Watch.Listener) invocationOnMock.getArguments()[2]; + listener.onNext(buildWatchResponse(eventType, version)); + return mock(Watch.Watcher.class); + }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class)); + CompletableFuture changedEventFuture = new CompletableFuture<>(); + repository.watch("key1", changedEventFuture::complete); + verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class)); + return changedEventFuture.get(5L, TimeUnit.SECONDS); + } + @SneakyThrows({NoSuchFieldException.class, SecurityException.class, IllegalAccessException.class}) - private WatchResponse buildWatchResponse(final WatchEvent.EventType eventType) { + private WatchResponse buildWatchResponse(final WatchEvent.EventType eventType, final long version) { WatchResponse result = new WatchResponse(mock(io.etcd.jetcd.api.WatchResponse.class), ByteSequence.EMPTY); List events = new LinkedList<>(); io.etcd.jetcd.api.KeyValue keyValue1 = io.etcd.jetcd.api.KeyValue.newBuilder() .setKey(ByteString.copyFromUtf8("key1")) - .setValue(ByteString.copyFromUtf8("value1")).build(); + .setValue(ByteString.copyFromUtf8("value1")) + .setVersion(version).build(); KeyValue keyValue = new KeyValue(keyValue1, ByteSequence.EMPTY); events.add(new WatchEvent(keyValue, mock(KeyValue.class), eventType)); Plugins.getMemberAccessor().set(WatchResponse.class.getDeclaredField("events"), result, events);