Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,9 @@ public void removeDataListener(final String key) {
}

private Type getEventChangedType(final WatchEvent event) {
if (1 == event.getKeyValue().getVersion()) {
return Type.ADDED;
}
switch (event.getEventType()) {
case PUT:
return Type.UPDATED;
return 1 == event.getKeyValue().getVersion() ? Type.ADDED : Type.UPDATED;
case DELETE:
return Type.DELETED;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.internal.configuration.plugins.Plugins;
Expand All @@ -52,10 +56,13 @@
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -66,6 +73,7 @@

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EtcdRepositoryTest {

private final EtcdRepository repository = new EtcdRepository();
Expand Down Expand Up @@ -168,38 +176,33 @@ void assertPersistEphemeral() {
}

@Test
void assertWatchUpdate() {
doAnswer(invocationOnMock -> {
Watch.Listener listener = (Watch.Listener) invocationOnMock.getArguments()[2];
listener.onNext(buildWatchResponse(WatchEvent.EventType.PUT));
return mock(Watch.Watcher.class);
}).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class));
repository.watch("key1", event -> {
});
verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class));
void assertWatchWithPutAndVersionOne() throws ExecutionException, InterruptedException, TimeoutException {
DataChangedEvent actual = assertWatch(WatchEvent.EventType.PUT, 1L);
assertThat(actual.getType(), is(Type.ADDED));
}

@Test
void assertWatchDelete() {
doAnswer(invocationOnMock -> {
Watch.Listener listener = (Watch.Listener) invocationOnMock.getArguments()[2];
listener.onNext(buildWatchResponse(WatchEvent.EventType.DELETE));
return mock(Watch.Watcher.class);
}).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class));
repository.watch("key1", event -> {
});
verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class));
void assertWatchWithPutAndVersionMoreThanOne() throws ExecutionException, InterruptedException, TimeoutException {
DataChangedEvent actual = assertWatch(WatchEvent.EventType.PUT, 2L);
assertThat(actual.getType(), is(Type.UPDATED));
}

@Test
void assertWatchWithDeleteAndVersionOne() throws ExecutionException, InterruptedException, TimeoutException {
DataChangedEvent actual = assertWatch(WatchEvent.EventType.DELETE, 1L);
assertThat(actual.getType(), is(Type.DELETED));
}

@Test
void assertWatchIgnored() {
doAnswer(invocationOnMock -> {
Watch.Listener listener = (Watch.Listener) invocationOnMock.getArguments()[2];
listener.onNext(buildWatchResponse(WatchEvent.EventType.UNRECOGNIZED));
listener.onNext(buildWatchResponse(WatchEvent.EventType.UNRECOGNIZED, 1L));
return mock(Watch.Watcher.class);
}).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class));
repository.watch("key1", event -> {
});
CompletableFuture<DataChangedEvent> actual = new CompletableFuture<>();
repository.watch("key1", actual::complete);
assertFalse(actual.isDone());
verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class));
}

Expand All @@ -215,8 +218,8 @@ void assertPersist() {
verify(kv).put(any(ByteSequence.class), any(ByteSequence.class));
}

@Test
void assertClose() {
@AfterAll
void closeRepositoryAfterAllTests() {
repository.close();
verify(client).close();
}
Expand Down Expand Up @@ -269,13 +272,26 @@ void assertGetChildrenKeysWhenThrowExecutionException() throws ExecutionExceptio
}
}

private DataChangedEvent assertWatch(final WatchEvent.EventType eventType, final long version) throws ExecutionException, InterruptedException, TimeoutException {
doAnswer(invocationOnMock -> {
Watch.Listener listener = (Watch.Listener) invocationOnMock.getArguments()[2];
listener.onNext(buildWatchResponse(eventType, version));
return mock(Watch.Watcher.class);
}).when(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class));
CompletableFuture<DataChangedEvent> changedEventFuture = new CompletableFuture<>();
repository.watch("key1", changedEventFuture::complete);
verify(watch).watch(any(ByteSequence.class), any(WatchOption.class), any(Watch.Listener.class));
return changedEventFuture.get(5L, TimeUnit.SECONDS);
}

@SneakyThrows({NoSuchFieldException.class, SecurityException.class, IllegalAccessException.class})
private WatchResponse buildWatchResponse(final WatchEvent.EventType eventType) {
private WatchResponse buildWatchResponse(final WatchEvent.EventType eventType, final long version) {
WatchResponse result = new WatchResponse(mock(io.etcd.jetcd.api.WatchResponse.class), ByteSequence.EMPTY);
List<WatchEvent> events = new LinkedList<>();
io.etcd.jetcd.api.KeyValue keyValue1 = io.etcd.jetcd.api.KeyValue.newBuilder()
.setKey(ByteString.copyFromUtf8("key1"))
.setValue(ByteString.copyFromUtf8("value1")).build();
.setValue(ByteString.copyFromUtf8("value1"))
.setVersion(version).build();
KeyValue keyValue = new KeyValue(keyValue1, ByteSequence.EMPTY);
events.add(new WatchEvent(keyValue, mock(KeyValue.class), eventType));
Plugins.getMemberAccessor().set(WatchResponse.class.getDeclaredField("events"), result, events);
Expand Down
Loading