Skip to content

Commit 90d262d

Browse files
committed
[proxima-direct-core] #356 cache StreamElement in TimeBoundedVersionedCache
1 parent 0982b75 commit 90d262d

3 files changed

Lines changed: 211 additions & 136 deletions

File tree

direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java

Lines changed: 30 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import cz.o2.proxima.core.storage.StreamElement;
2424
import cz.o2.proxima.core.storage.commitlog.Position;
2525
import cz.o2.proxima.core.util.ExceptionUtils;
26+
import cz.o2.proxima.core.util.Optionals;
2627
import cz.o2.proxima.core.util.Pair;
2728
import cz.o2.proxima.direct.core.CommitCallback;
2829
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
@@ -108,20 +109,15 @@ protected void onCache(StreamElement element, boolean overwrite) {
108109
final Pair<Long, Payload> oldVal;
109110
synchronized (cache) {
110111
oldVal = cache.get(element.getKey(), attrName, Long.MAX_VALUE);
111-
long sequentialId = element.hasSequentialId() ? element.getSequentialId() : 0L;
112-
updated =
113-
cache.put(
114-
element.getKey(),
115-
attrName,
116-
element.getStamp(),
117-
sequentialId,
118-
overwrite,
119-
parsed.orElse(null));
112+
updated = cache.put(element, overwrite);
120113
}
121114
if (updated) {
122115
updateCallback.accept(
123116
element,
124-
oldVal != null ? Pair.of(oldVal.getFirst(), oldVal.getSecond().getData()) : null);
117+
oldVal != null && !oldVal.getSecond().getData().isDelete()
118+
? Pair.of(
119+
oldVal.getFirst(), Optionals.get(oldVal.getSecond().getData().getParsed()))
120+
: null);
125121
}
126122
}
127123
}
@@ -276,7 +272,7 @@ public <T> Optional<KeyValue<T>> get(
276272
if (desc.isWildcard()) {
277273
// check for wildcard delete
278274
Pair<Long, Payload> wildcard = cache.get(key, desc.toAttributePrefix(), stamp);
279-
if (wildcard != null && wildcard.getSecond().getData() == null) {
275+
if (wildcard != null && wildcard.getSecond().getData().isDelete()) {
280276
// this is delete
281277
// move the required stamp after the delete
282278
deleteStamp = wildcard.getFirst();
@@ -285,7 +281,7 @@ public <T> Optional<KeyValue<T>> get(
285281
final long filterStamp = deleteStamp;
286282
return Optional.ofNullable(cache.get(key, attribute, stamp))
287283
.filter(e -> e.getFirst() >= filterStamp)
288-
.flatMap(e -> Optional.ofNullable(toKv(key, attribute, e)));
284+
.flatMap(e -> Optional.ofNullable(toKv(e)));
289285
}
290286
}
291287

@@ -339,7 +335,7 @@ private void scanWildcardPrefix(
339335
return null;
340336
},
341337
(attr, e) -> {
342-
KeyValue<Object> kv = toKv(key, attr, e);
338+
KeyValue<Object> kv = toKv(e);
343339
if (kv != null) {
344340
if (missing.getAndDecrement() != 0) {
345341
consumer.accept(kv);
@@ -368,45 +364,33 @@ public void close() {
368364
cache.clear();
369365
}
370366

371-
private @Nullable <T> KeyValue<T> toKv(
372-
String key, String attribute, @Nullable Pair<Long, Payload> p) {
373-
374-
Optional<AttributeDescriptor<Object>> attrDesc = entity.findAttribute(attribute, true);
375-
if (attrDesc.isPresent()) {
376-
return toKv(key, attribute, attrDesc.get(), p);
377-
}
378-
log.warn("Missing attribute {} in entity {}", attribute, entity);
379-
return null;
380-
}
381-
382367
@SuppressWarnings("unchecked")
383-
private @Nullable <T> KeyValue<T> toKv(
384-
String key, String attribute, AttributeDescriptor<?> attr, @Nullable Pair<Long, Payload> p) {
385-
386-
if (p == null || p.getSecond() == null || p.getSecond().getData() == null) {
368+
private @Nullable <T> KeyValue<T> toKv(@Nullable Pair<Long, Payload> p) {
369+
if (p == null || p.getSecond() == null || p.getSecond().getData().isDelete()) {
387370
return null;
388371
}
389-
if (p.getSecond().getSeqId() > 0) {
372+
StreamElement data = p.getSecond().getData();
373+
if (data.getSequentialId() > 0) {
390374
return KeyValue.of(
391-
entity,
392-
(AttributeDescriptor<T>) attr,
393-
p.getSecond().getSeqId(),
394-
key,
395-
attribute,
396-
new RawOffset(attribute),
397-
(T) p.getSecond().getData(),
398-
null,
399-
p.getFirst());
375+
data.getEntityDescriptor(),
376+
(AttributeDescriptor<T>) data.getAttributeDescriptor(),
377+
data.getSequentialId(),
378+
data.getKey(),
379+
data.getAttribute(),
380+
new RawOffset(data.getAttribute()),
381+
(T) data.getParsed().orElse(null),
382+
data.getValue(),
383+
data.getStamp());
400384
}
401385
return KeyValue.of(
402-
entity,
403-
(AttributeDescriptor<T>) attr,
404-
key,
405-
attribute,
406-
new RawOffset(attribute),
407-
(T) p.getSecond().getData(),
408-
null,
409-
p.getFirst());
386+
data.getEntityDescriptor(),
387+
(AttributeDescriptor<T>) data.getAttributeDescriptor(),
388+
data.getKey(),
389+
data.getAttribute(),
390+
new RawOffset(data.getAttribute()),
391+
(T) data.getParsed().orElse(null),
392+
data.getValue(),
393+
data.getStamp());
410394
}
411395

412396
@Override

direct/core/src/main/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCache.java

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.NavigableMap;
3131
import java.util.Objects;
32+
import java.util.Optional;
3233
import java.util.Set;
3334
import java.util.TreeMap;
3435
import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,7 +64,6 @@ static class Payload {
6364

6465
@Nullable
6566
synchronized Pair<Long, Payload> get(String key, String attribute, long stamp) {
66-
6767
NavigableMap<String, NavigableMap<Long, Payload>> attrMap;
6868
attrMap = cache.get(key);
6969
if (attrMap != null) {
@@ -107,16 +107,15 @@ synchronized void scan(
107107
return;
108108
}
109109
String lastParent = null;
110-
Pair<Long, Payload> parentEntry = null;
110+
Pair<Long, Payload> parentEntry;
111111
long parentTombstoneStamp = stamp;
112112
for (Map.Entry<String, NavigableMap<Long, Payload>> e : attrMap.tailMap(offset).entrySet()) {
113-
114113
if (e.getKey().startsWith(prefix)) {
115114
if (!e.getKey().equals(offset)) {
116115
if (lastParent == null || !e.getKey().startsWith(lastParent)) {
117116
lastParent = parentRecordExtractor.apply(e.getKey());
118117
parentEntry = lastParent == null ? null : get(key, lastParent, stamp);
119-
boolean isDelete = parentEntry != null && parentEntry.getSecond().getData() == null;
118+
boolean isDelete = parentEntry != null && parentEntry.getSecond().getData().isDelete();
120119
parentTombstoneStamp = isDelete ? parentEntry.getFirst() : -1;
121120
}
122121
Map.Entry<Long, Payload> floorEntry = e.getValue().floorEntry(stamp);
@@ -194,45 +193,56 @@ synchronized void keys(int offset, int limit, Consumer<String> keyConsumer) {
194193
}
195194
}
196195

197-
synchronized boolean put(
198-
String key,
199-
String attribute,
200-
long stamp,
201-
long sequentialId,
202-
boolean overwrite,
203-
@Nullable Object value) {
204-
196+
synchronized boolean put(StreamElement element, boolean overwrite) {
205197
AtomicBoolean updated = new AtomicBoolean();
206198
cache.compute(
207-
key,
208-
(k, attrMap) -> {
209-
if (attrMap == null) {
210-
attrMap = new TreeMap<>();
211-
}
212-
NavigableMap<Long, Payload> valueMap =
213-
attrMap.computeIfAbsent(attribute, tmp -> new TreeMap<>());
214-
if (valueMap.isEmpty() || valueMap.firstKey() - keepDuration < stamp) {
215-
final Payload oldPayload = valueMap.get(stamp);
216-
if (overwrite || oldPayload == null || oldPayload.overridable) {
217-
logPayloadUpdateIfNecessary(key, attribute, stamp, value);
218-
Payload newPayload = new Payload(value, sequentialId, !overwrite);
219-
valueMap.put(stamp, newPayload);
220-
updated.set(!newPayload.equals(oldPayload));
221-
}
222-
}
223-
long first;
224-
while (!valueMap.isEmpty() && (first = valueMap.firstKey()) + keepDuration < stamp) {
225-
valueMap.remove(first);
226-
}
227-
return attrMap;
228-
});
199+
element.getKey(),
200+
(k, attrMap) -> updateAttrMapByElement(element, overwrite, updated, attrMap));
229201
return updated.get();
230202
}
231203

232-
private void logPayloadUpdateIfNecessary(
233-
String key, String attribute, long stamp, @Nullable Object value) {
204+
private NavigableMap<String, NavigableMap<Long, Payload>> updateAttrMapByElement(
205+
StreamElement element,
206+
boolean overwrite,
207+
AtomicBoolean updated,
208+
@Nullable NavigableMap<String, NavigableMap<Long, Payload>> attrMap) {
209+
210+
if (attrMap == null) {
211+
attrMap = new TreeMap<>();
212+
}
213+
String attribute =
214+
element.isDeleteWildcard()
215+
? element.getAttributeDescriptor().toAttributePrefix()
216+
: element.getAttribute();
217+
long stamp = element.getStamp();
218+
219+
NavigableMap<Long, Payload> valueMap =
220+
attrMap.computeIfAbsent(attribute, tmp -> new TreeMap<>());
221+
if (valueMap.isEmpty() || valueMap.firstKey() - keepDuration < stamp) {
222+
final Payload oldPayload = valueMap.get(stamp);
223+
if (overwrite || oldPayload == null || oldPayload.overridable) {
224+
logPayloadUpdateIfNecessary(attribute, stamp, element);
225+
Payload newPayload = new Payload(element, !overwrite);
226+
valueMap.put(stamp, newPayload);
227+
Object oldParsed =
228+
Optional.ofNullable(oldPayload == null ? null : oldPayload.getData())
229+
.flatMap(StreamElement::getParsed)
230+
.orElse(null);
231+
updated.set(!element.getParsed().equals(oldParsed));
232+
}
233+
}
234+
long first;
235+
while (!valueMap.isEmpty() && (first = valueMap.firstKey()) + keepDuration < stamp) {
236+
valueMap.remove(first);
237+
}
238+
return attrMap;
239+
}
240+
241+
private void logPayloadUpdateIfNecessary(String attribute, long stamp, StreamElement element) {
234242

235243
if (log.isDebugEnabled()) {
244+
String key = element.getKey();
245+
Object value = element.getParsed().orElse(null);
236246
AttributeDescriptor<Object> attrDesc = entity.findAttribute(attribute, true).orElse(null);
237247
if (attrDesc != null) {
238248
log.debug(

0 commit comments

Comments
 (0)