Skip to content

Commit 288bfcc

Browse files
authored
Merge pull request #993: Weak reference for stream element
2 parents a247bf8 + 90d262d commit 288bfcc

4 files changed

Lines changed: 223 additions & 147 deletions

File tree

core/src/main/java/cz/o2/proxima/core/storage/StreamElement.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
2222
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
2323
import java.io.Serializable;
24+
import java.lang.ref.WeakReference;
2425
import java.util.Date;
2526
import java.util.Objects;
2627
import java.util.Optional;
@@ -246,7 +247,7 @@ public static StreamElement deleteWildcard(
246247

247248
private final boolean deleteWildcard;
248249

249-
private transient Object parsed;
250+
private transient WeakReference<Object> parsedRef;
250251
private transient String cachedUuid;
251252

252253
protected StreamElement(
@@ -298,11 +299,8 @@ protected StreamElement(
298299

299300
public String getUuid() {
300301
if (cachedUuid == null) {
301-
if (uuid != null) {
302-
cachedUuid = uuid;
303-
} else {
304-
cachedUuid = key + ":" + attribute + ":" + sequentialId;
305-
}
302+
cachedUuid =
303+
Objects.requireNonNullElseGet(uuid, () -> key + ":" + attribute + ":" + sequentialId);
306304
}
307305
return cachedUuid;
308306
}
@@ -356,14 +354,17 @@ public <T> Optional<T> getParsed() {
356354
if (value == null) {
357355
return Optional.empty();
358356
}
359-
if (parsed == null) {
357+
if (parsedRef == null) {
358+
parsedRef = new WeakReference<>(null);
359+
}
360+
if (parsedRef.get() == null) {
360361
attributeDescriptor.getValueSerializer().deserialize(value).ifPresent(this::setParsed);
361362
}
362-
return (Optional<T>) Optional.ofNullable(parsed);
363+
return (Optional<T>) Optional.ofNullable(parsedRef.get());
363364
}
364365

365366
protected final void setParsed(Object parsed) {
366-
this.parsed = parsed;
367+
this.parsedRef = new WeakReference<>(parsed);
367368
}
368369

369370
@Override

direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java

Lines changed: 30 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import cz.o2.proxima.core.storage.StreamElement;
2424
import cz.o2.proxima.core.storage.commitlog.Position;
2525
import cz.o2.proxima.core.util.ExceptionUtils;
26+
import cz.o2.proxima.core.util.Optionals;
2627
import cz.o2.proxima.core.util.Pair;
2728
import cz.o2.proxima.direct.core.CommitCallback;
2829
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
@@ -108,20 +109,15 @@ protected void onCache(StreamElement element, boolean overwrite) {
108109
final Pair<Long, Payload> oldVal;
109110
synchronized (cache) {
110111
oldVal = cache.get(element.getKey(), attrName, Long.MAX_VALUE);
111-
long sequentialId = element.hasSequentialId() ? element.getSequentialId() : 0L;
112-
updated =
113-
cache.put(
114-
element.getKey(),
115-
attrName,
116-
element.getStamp(),
117-
sequentialId,
118-
overwrite,
119-
parsed.orElse(null));
112+
updated = cache.put(element, overwrite);
120113
}
121114
if (updated) {
122115
updateCallback.accept(
123116
element,
124-
oldVal != null ? Pair.of(oldVal.getFirst(), oldVal.getSecond().getData()) : null);
117+
oldVal != null && !oldVal.getSecond().getData().isDelete()
118+
? Pair.of(
119+
oldVal.getFirst(), Optionals.get(oldVal.getSecond().getData().getParsed()))
120+
: null);
125121
}
126122
}
127123
}
@@ -276,7 +272,7 @@ public <T> Optional<KeyValue<T>> get(
276272
if (desc.isWildcard()) {
277273
// check for wildcard delete
278274
Pair<Long, Payload> wildcard = cache.get(key, desc.toAttributePrefix(), stamp);
279-
if (wildcard != null && wildcard.getSecond().getData() == null) {
275+
if (wildcard != null && wildcard.getSecond().getData().isDelete()) {
280276
// this is delete
281277
// move the required stamp after the delete
282278
deleteStamp = wildcard.getFirst();
@@ -285,7 +281,7 @@ public <T> Optional<KeyValue<T>> get(
285281
final long filterStamp = deleteStamp;
286282
return Optional.ofNullable(cache.get(key, attribute, stamp))
287283
.filter(e -> e.getFirst() >= filterStamp)
288-
.flatMap(e -> Optional.ofNullable(toKv(key, attribute, e)));
284+
.flatMap(e -> Optional.ofNullable(toKv(e)));
289285
}
290286
}
291287

@@ -339,7 +335,7 @@ private void scanWildcardPrefix(
339335
return null;
340336
},
341337
(attr, e) -> {
342-
KeyValue<Object> kv = toKv(key, attr, e);
338+
KeyValue<Object> kv = toKv(e);
343339
if (kv != null) {
344340
if (missing.getAndDecrement() != 0) {
345341
consumer.accept(kv);
@@ -368,45 +364,33 @@ public void close() {
368364
cache.clear();
369365
}
370366

371-
private @Nullable <T> KeyValue<T> toKv(
372-
String key, String attribute, @Nullable Pair<Long, Payload> p) {
373-
374-
Optional<AttributeDescriptor<Object>> attrDesc = entity.findAttribute(attribute, true);
375-
if (attrDesc.isPresent()) {
376-
return toKv(key, attribute, attrDesc.get(), p);
377-
}
378-
log.warn("Missing attribute {} in entity {}", attribute, entity);
379-
return null;
380-
}
381-
382367
@SuppressWarnings("unchecked")
383-
private @Nullable <T> KeyValue<T> toKv(
384-
String key, String attribute, AttributeDescriptor<?> attr, @Nullable Pair<Long, Payload> p) {
385-
386-
if (p == null || p.getSecond() == null || p.getSecond().getData() == null) {
368+
private @Nullable <T> KeyValue<T> toKv(@Nullable Pair<Long, Payload> p) {
369+
if (p == null || p.getSecond() == null || p.getSecond().getData().isDelete()) {
387370
return null;
388371
}
389-
if (p.getSecond().getSeqId() > 0) {
372+
StreamElement data = p.getSecond().getData();
373+
if (data.getSequentialId() > 0) {
390374
return KeyValue.of(
391-
entity,
392-
(AttributeDescriptor<T>) attr,
393-
p.getSecond().getSeqId(),
394-
key,
395-
attribute,
396-
new RawOffset(attribute),
397-
(T) p.getSecond().getData(),
398-
null,
399-
p.getFirst());
375+
data.getEntityDescriptor(),
376+
(AttributeDescriptor<T>) data.getAttributeDescriptor(),
377+
data.getSequentialId(),
378+
data.getKey(),
379+
data.getAttribute(),
380+
new RawOffset(data.getAttribute()),
381+
(T) data.getParsed().orElse(null),
382+
data.getValue(),
383+
data.getStamp());
400384
}
401385
return KeyValue.of(
402-
entity,
403-
(AttributeDescriptor<T>) attr,
404-
key,
405-
attribute,
406-
new RawOffset(attribute),
407-
(T) p.getSecond().getData(),
408-
null,
409-
p.getFirst());
386+
data.getEntityDescriptor(),
387+
(AttributeDescriptor<T>) data.getAttributeDescriptor(),
388+
data.getKey(),
389+
data.getAttribute(),
390+
new RawOffset(data.getAttribute()),
391+
(T) data.getParsed().orElse(null),
392+
data.getValue(),
393+
data.getStamp());
410394
}
411395

412396
@Override

direct/core/src/main/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCache.java

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import cz.o2.proxima.core.functional.UnaryFunction;
2121
import cz.o2.proxima.core.repository.AttributeDescriptor;
2222
import cz.o2.proxima.core.repository.EntityDescriptor;
23+
import cz.o2.proxima.core.storage.StreamElement;
2324
import cz.o2.proxima.core.util.Pair;
2425
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
2526
import java.io.Serializable;
@@ -28,6 +29,7 @@
2829
import java.util.Map;
2930
import java.util.NavigableMap;
3031
import java.util.Objects;
32+
import java.util.Optional;
3133
import java.util.Set;
3234
import java.util.TreeMap;
3335
import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,8 +46,7 @@ class TimeBoundedVersionedCache implements Serializable {
4446

4547
@Value
4648
static class Payload {
47-
@Nullable Object data;
48-
long seqId;
49+
StreamElement data;
4950
boolean overridable;
5051
}
5152

@@ -63,7 +64,6 @@ static class Payload {
6364

6465
@Nullable
6566
synchronized Pair<Long, Payload> get(String key, String attribute, long stamp) {
66-
6767
NavigableMap<String, NavigableMap<Long, Payload>> attrMap;
6868
attrMap = cache.get(key);
6969
if (attrMap != null) {
@@ -107,16 +107,15 @@ synchronized void scan(
107107
return;
108108
}
109109
String lastParent = null;
110-
Pair<Long, Payload> parentEntry = null;
110+
Pair<Long, Payload> parentEntry;
111111
long parentTombstoneStamp = stamp;
112112
for (Map.Entry<String, NavigableMap<Long, Payload>> e : attrMap.tailMap(offset).entrySet()) {
113-
114113
if (e.getKey().startsWith(prefix)) {
115114
if (!e.getKey().equals(offset)) {
116115
if (lastParent == null || !e.getKey().startsWith(lastParent)) {
117116
lastParent = parentRecordExtractor.apply(e.getKey());
118117
parentEntry = lastParent == null ? null : get(key, lastParent, stamp);
119-
boolean isDelete = parentEntry != null && parentEntry.getSecond().getData() == null;
118+
boolean isDelete = parentEntry != null && parentEntry.getSecond().getData().isDelete();
120119
parentTombstoneStamp = isDelete ? parentEntry.getFirst() : -1;
121120
}
122121
Map.Entry<Long, Payload> floorEntry = e.getValue().floorEntry(stamp);
@@ -194,45 +193,56 @@ synchronized void keys(int offset, int limit, Consumer<String> keyConsumer) {
194193
}
195194
}
196195

197-
synchronized boolean put(
198-
String key,
199-
String attribute,
200-
long stamp,
201-
long sequentialId,
202-
boolean overwrite,
203-
@Nullable Object value) {
204-
196+
synchronized boolean put(StreamElement element, boolean overwrite) {
205197
AtomicBoolean updated = new AtomicBoolean();
206198
cache.compute(
207-
key,
208-
(k, attrMap) -> {
209-
if (attrMap == null) {
210-
attrMap = new TreeMap<>();
211-
}
212-
NavigableMap<Long, Payload> valueMap =
213-
attrMap.computeIfAbsent(attribute, tmp -> new TreeMap<>());
214-
if (valueMap.isEmpty() || valueMap.firstKey() - keepDuration < stamp) {
215-
final Payload oldPayload = valueMap.get(stamp);
216-
if (overwrite || oldPayload == null || oldPayload.overridable) {
217-
logPayloadUpdateIfNecessary(key, attribute, stamp, value);
218-
Payload newPayload = new Payload(value, sequentialId, !overwrite);
219-
valueMap.put(stamp, newPayload);
220-
updated.set(!newPayload.equals(oldPayload));
221-
}
222-
}
223-
long first;
224-
while (!valueMap.isEmpty() && (first = valueMap.firstKey()) + keepDuration < stamp) {
225-
valueMap.remove(first);
226-
}
227-
return attrMap;
228-
});
199+
element.getKey(),
200+
(k, attrMap) -> updateAttrMapByElement(element, overwrite, updated, attrMap));
229201
return updated.get();
230202
}
231203

232-
private void logPayloadUpdateIfNecessary(
233-
String key, String attribute, long stamp, @Nullable Object value) {
204+
private NavigableMap<String, NavigableMap<Long, Payload>> updateAttrMapByElement(
205+
StreamElement element,
206+
boolean overwrite,
207+
AtomicBoolean updated,
208+
@Nullable NavigableMap<String, NavigableMap<Long, Payload>> attrMap) {
209+
210+
if (attrMap == null) {
211+
attrMap = new TreeMap<>();
212+
}
213+
String attribute =
214+
element.isDeleteWildcard()
215+
? element.getAttributeDescriptor().toAttributePrefix()
216+
: element.getAttribute();
217+
long stamp = element.getStamp();
218+
219+
NavigableMap<Long, Payload> valueMap =
220+
attrMap.computeIfAbsent(attribute, tmp -> new TreeMap<>());
221+
if (valueMap.isEmpty() || valueMap.firstKey() - keepDuration < stamp) {
222+
final Payload oldPayload = valueMap.get(stamp);
223+
if (overwrite || oldPayload == null || oldPayload.overridable) {
224+
logPayloadUpdateIfNecessary(attribute, stamp, element);
225+
Payload newPayload = new Payload(element, !overwrite);
226+
valueMap.put(stamp, newPayload);
227+
Object oldParsed =
228+
Optional.ofNullable(oldPayload == null ? null : oldPayload.getData())
229+
.flatMap(StreamElement::getParsed)
230+
.orElse(null);
231+
updated.set(!element.getParsed().equals(oldParsed));
232+
}
233+
}
234+
long first;
235+
while (!valueMap.isEmpty() && (first = valueMap.firstKey()) + keepDuration < stamp) {
236+
valueMap.remove(first);
237+
}
238+
return attrMap;
239+
}
240+
241+
private void logPayloadUpdateIfNecessary(String attribute, long stamp, StreamElement element) {
234242

235243
if (log.isDebugEnabled()) {
244+
String key = element.getKey();
245+
Object value = element.getParsed().orElse(null);
236246
AttributeDescriptor<Object> attrDesc = entity.findAttribute(attribute, true).orElse(null);
237247
if (attrDesc != null) {
238248
log.debug(

0 commit comments

Comments
 (0)