Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,7 @@ public static Props props(ClusterClientSettings settings, Materializer materiali

public interface Command {}

public static class Send implements Command {
public final String path;
public final Object msg;
public final boolean localAffinity;

public Send(String path, Object msg, boolean localAffinity) {
this.path = path;
this.msg = msg;
this.localAffinity = localAffinity;
}

public record Send(String path, Object msg, boolean localAffinity) implements Command {
/**
* Convenience constructor with `localAffinity` false
*/
Expand All @@ -92,17 +82,7 @@ public Send(String path, Object msg) {
/**
* More efficient than `Send` for single request-reply interaction
*/
public static class SendAsk implements Command {
public final String path;
public final Object msg;
public final boolean localAffinity;

public SendAsk(String path, Object msg, boolean localAffinity) {
this.path = path;
this.msg = msg;
this.localAffinity = localAffinity;
}

public record SendAsk(String path, Object msg, boolean localAffinity) implements Command {
/**
* Convenience constructor with `localAffinity` false
*/
Expand All @@ -111,25 +91,9 @@ public SendAsk(String path, Object msg) {
}
}

public static class SendToAll implements Command {
public final String path;
public final Object msg;
public record SendToAll(String path, Object msg) implements Command {}

public SendToAll(String path, Object msg) {
this.path = path;
this.msg = msg;
}
}

public static class Publish implements Command {
public final String topic;
public final Object msg;

public Publish(String topic, Object msg) {
this.topic = topic;
this.msg = msg;
}
}
public record Publish(String topic, Object msg) implements Command {}

private static ClusterClientReceptionistServiceClient createClientStub(ClusterClientSettings settings,
Materializer mat) {
Expand Down Expand Up @@ -161,29 +125,26 @@ private static CompletionStage<ActorRef> newSession(
)
.via(killSwitch.flow())
.map(msg -> {
if (msg instanceof Send) {
Send send = (Send) msg;
Payload payload = serialization.serializePayload(send.msg);
if (msg instanceof Send send) {
Payload payload = serialization.serializePayload(send.msg());
return Req.newBuilder()
.setSend(SendReq.newBuilder()
.setPath(send.path)
.setLocalAffinity(send.localAffinity)
.setPath(send.path())
.setLocalAffinity(send.localAffinity())
.setPayload(payload))
.build();
} else if (msg instanceof SendToAll) {
SendToAll sendToAll = (SendToAll) msg;
Payload payload = serialization.serializePayload(sendToAll.msg);
} else if (msg instanceof SendToAll sendToAll) {
Payload payload = serialization.serializePayload(sendToAll.msg());
return Req.newBuilder()
.setSendToAll(SendToAllReq.newBuilder()
.setPath(sendToAll.path)
.setPath(sendToAll.path())
.setPayload(payload))
.build();
} else if (msg instanceof Publish) {
Publish publish = (Publish) msg;
Payload payload = serialization.serializePayload(publish.msg);
} else if (msg instanceof Publish publish) {
Payload payload = serialization.serializePayload(publish.msg());
return Req.newBuilder()
.setPublish(PublishReq.newBuilder()
.setTopic(publish.topic)
.setTopic(publish.topic())
.setPayload(payload))
.build();
} else {
Expand Down Expand Up @@ -216,10 +177,10 @@ private static CompletionStage<Object> askSend(
ClusterClientReceptionistServiceClient receptionistServiceClient,
SendAsk send,
ClusterClientSerialization serialization) {
Payload payload = serialization.serializePayload(send.msg);
Payload payload = serialization.serializePayload(send.msg());
SendReq sendReq = SendReq.newBuilder()
.setPath(send.path)
.setLocalAffinity(send.localAffinity)
.setPath(send.path())
.setLocalAffinity(send.localAffinity())
.setPayload(payload)
.build();
return receptionistServiceClient.askSend(sendReq).thenApply( rsp ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,104 +27,26 @@ public class ReplicatedCache {

public interface Command {}

public static class PutInCache implements Command {
public final String key;
public final String value;
public record PutInCache(String key, String value) implements Command {}

public PutInCache(String key, String value) {
this.key = key;
this.value = value;
}
}

public static class GetFromCache implements Command {
public final String key;
public final ActorRef<Cached> replyTo;

public GetFromCache(String key, ActorRef<Cached> replyTo) {
this.key = key;
this.replyTo = replyTo;
}
}

public static class Cached {
public final String key;
public final Optional<String> value;

public Cached(String key, Optional<String> value) {
this.key = key;
this.value = value;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Cached other = (Cached) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
public record GetFromCache(String key, ActorRef<Cached> replyTo) implements Command {}

public record Cached(String key, Optional<String> value) {
@Override
public String toString() {
return "Cached [key=" + key + ", value=" + value + "]";
}

}

public static class Evict implements Command {
public final String key;

public Evict(String key) {
this.key = key;
}
}
public record Evict(String key) implements Command {}

private interface InternalCommand extends Command {}

private static class InternalGetResponse implements InternalCommand {
public final String key;
public final ActorRef<Cached> replyTo;
public final GetResponse<LWWMap<String, String>> rsp;

private InternalGetResponse(
String key, ActorRef<Cached> replyTo, GetResponse<LWWMap<String, String>> rsp
) {
this.key = key;
this.replyTo = replyTo;
this.rsp = rsp;
}
}

private static class InternalUpdateResponse implements InternalCommand {
public final UpdateResponse<LWWMap<String, String>> rsp;
private record InternalGetResponse(String key, ActorRef<Cached> replyTo, GetResponse<LWWMap<String, String>> rsp)
implements InternalCommand {}

private InternalUpdateResponse(UpdateResponse<LWWMap<String, String>> rsp) {
this.rsp = rsp;
}
}
private record InternalUpdateResponse(UpdateResponse<LWWMap<String, String>> rsp)
implements InternalCommand {}

public static Behavior<Command> create() {
return Behaviors.setup(context ->
Expand All @@ -147,9 +69,9 @@ public ReplicatedCache(
public Behavior<Command> createBehavior() {
return Behaviors
.receive(Command.class)
.onMessage(PutInCache.class, cmd -> receivePutInCache(cmd.key, cmd.value))
.onMessage(Evict.class, cmd -> receiveEvict(cmd.key))
.onMessage(GetFromCache.class, cmd -> receiveGetFromCache(cmd.key, cmd.replyTo))
.onMessage(PutInCache.class, cmd -> receivePutInCache(cmd.key(), cmd.value()))
.onMessage(Evict.class, cmd -> receiveEvict(cmd.key()))
.onMessage(GetFromCache.class, cmd -> receiveGetFromCache(cmd.key(), cmd.replyTo()))
.onMessage(InternalGetResponse.class, this::onInternalGetResponse)
.onMessage(InternalUpdateResponse.class, notUsed -> Behaviors.same())
.build();
Expand Down Expand Up @@ -194,12 +116,12 @@ private Behavior<Command> receiveGetFromCache(String key, ActorRef<Cached> reply
}

private Behavior<Command> onInternalGetResponse(InternalGetResponse msg) {
if (msg.rsp instanceof GetSuccess) {
Option<String> valueOption = ((GetSuccess<LWWMap<String, String>>) msg.rsp).get(dataKey(msg.key)).get(msg.key);
if (msg.rsp() instanceof GetSuccess<LWWMap<String, String>> success) {
Option<String> valueOption = success.get(dataKey(msg.key())).get(msg.key());
Optional<String> valueOptional = Optional.ofNullable(valueOption.isDefined() ? valueOption.get() : null);
msg.replyTo.tell(new Cached(msg.key, valueOptional));
} else if (msg.rsp instanceof NotFound) {
msg.replyTo.tell(new Cached(msg.key, Optional.empty()));
msg.replyTo().tell(new Cached(msg.key(), valueOptional));
} else if (msg.rsp() instanceof NotFound) {
msg.replyTo().tell(new Cached(msg.key(), Optional.empty()));
}
return Behaviors.same();
}
Expand Down
Loading
Loading