-
Notifications
You must be signed in to change notification settings - Fork 333
Expand file tree
/
Copy pathTransactionInfo.java
More file actions
87 lines (70 loc) · 2.67 KB
/
TransactionInfo.java
File metadata and controls
87 lines (70 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package datadog.trace.api.datastreams;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Immutable record of a transaction observed at a named checkpoint.
 *
 * <p>Checkpoint names are interned into small integer ids through a process-wide cache so that
 * each serialized transaction only carries a one-byte checkpoint id. The id-to-name mapping is
 * accumulated in a separate byte blob, exposed by {@link #getCheckpointIdCacheBytes()}.
 *
 * <p>Both encodings prefix variable-length text with a single length byte, so text longer than
 * {@value #MAX_ENCODED_TEXT_SIZE} UTF-8 bytes is truncated.
 */
public final class TransactionInfo implements InboxItem {
  /**
   * Largest length representable in the single (unsigned) length byte. A value of 256 would be
   * narrowed to 0 by the {@code (byte)} cast and produce an unparseable record.
   */
  private static final int MAX_ENCODED_TEXT_SIZE = 0xFF;

  /** Maximum number of UTF-8 bytes of the transaction id written by {@link #getBytes()}. */
  private static final int MAX_ID_SIZE = MAX_ENCODED_TEXT_SIZE;

  /** Checkpoint name to checkpoint id, shared by all instances. */
  private static final Map<String, Integer> CACHE = new ConcurrentHashMap<>();

  /**
   * Concatenation of one entry per known checkpoint: {@code [id:1][nameLength:1][name:nameLength]}.
   * The array is replaced on every append and never mutated in place.
   */
  private static volatile byte[] CACHE_BYTES = new byte[0];

  /** Source of checkpoint ids; the first id handed out is 1. */
  private static final AtomicInteger ID_COUNTER = new AtomicInteger(1);

  private final String id;
  private final long timestamp;
  private final int checkpointId;

  /**
   * Creates a transaction record, registering {@code checkpoint} in the shared cache if it has not
   * been seen before.
   *
   * @param id transaction identifier
   * @param timestamp timestamp associated with the transaction; stored and serialized as-is
   * @param checkpoint checkpoint name; must not be {@code null} because it is used as a {@link
   *     ConcurrentHashMap} key
   */
  public TransactionInfo(String id, long timestamp, String checkpoint) {
    this.id = id;
    this.timestamp = timestamp;
    this.checkpointId = CACHE.computeIfAbsent(checkpoint, TransactionInfo::generateCheckpointId);
  }

  /** Returns the transaction identifier passed to the constructor. */
  public String getId() {
    return id;
  }

  /** Returns the timestamp passed to the constructor. */
  public long getTimestamp() {
    return timestamp;
  }

  /** Returns the integer id assigned to this transaction's checkpoint name. */
  public int getCheckpointId() {
    return checkpointId;
  }

  /**
   * Allocates the next checkpoint id and appends the corresponding {@code [id][length][name]}
   * entry to the cache blob.
   *
   * @param checkpoint checkpoint name being registered
   * @return the newly allocated id
   */
  private static int generateCheckpointId(String checkpoint) {
    int newId = ID_COUNTER.getAndIncrement();

    byte[] nameBytes = checkpoint.getBytes(StandardCharsets.UTF_8);
    // The length is written as one byte, so the name must be cut to what that byte can express;
    // otherwise the length prefix would wrap while the full name is still copied.
    int nameLen = utf8PrefixLength(nameBytes, MAX_ENCODED_TEXT_SIZE);

    byte[] entry = new byte[2 + nameLen];
    // NOTE(review): only the low 8 bits of the id are serialized, so ids above 255 alias earlier
    // ones in the encoded form - confirm the expected number of distinct checkpoints.
    entry[0] = (byte) newId;
    entry[1] = (byte) nameLen;
    System.arraycopy(nameBytes, 0, entry, 2, nameLen);
    appendCacheBytes(entry);
    return newId;
  }

  /**
   * Returns how many leading bytes of {@code utf8} to keep so that at most {@code maxLen} bytes
   * remain and no multi-byte character is split.
   *
   * @param utf8 well-formed UTF-8 bytes, as produced by {@link String#getBytes(java.nio.charset.Charset)}
   * @param maxLen maximum number of bytes to keep
   * @return a length in {@code [0, min(utf8.length, maxLen)]} that ends on a character boundary
   */
  private static int utf8PrefixLength(byte[] utf8, int maxLen) {
    if (utf8.length <= maxLen) {
      return utf8.length;
    }
    int len = maxLen;
    // utf8[len] is the first dropped byte; while it is a continuation byte (10xxxxxx) the cut
    // would land inside a character, so back off to that character's lead byte.
    while (len > 0 && (utf8[len] & 0xC0) == 0x80) {
      len--;
    }
    return len;
  }

  /**
   * Publishes a new cache blob consisting of the current blob followed by {@code bytes}.
   * Synchronized so that concurrent appends cannot overwrite each other.
   */
  private static synchronized void appendCacheBytes(byte[] bytes) {
    byte[] current = CACHE_BYTES; // single volatile read
    byte[] newCacheBytes = new byte[current.length + bytes.length];
    System.arraycopy(current, 0, newCacheBytes, 0, current.length);
    System.arraycopy(bytes, 0, newCacheBytes, current.length, bytes.length);
    CACHE_BYTES = newCacheBytes;
  }

  /**
   * Serializes this transaction as {@code [checkpointId:1][timestamp:8, big-endian][idLength:1][id:idLength]}.
   *
   * <p>The id is encoded as UTF-8 and truncated to at most {@value #MAX_ENCODED_TEXT_SIZE} bytes
   * on a character boundary.
   *
   * @return a freshly allocated array holding the encoded record
   */
  public byte[] getBytes() {
    byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
    // long ids will be truncated
    int idLen = utf8PrefixLength(idBytes, MAX_ID_SIZE);
    ByteBuffer buffer = ByteBuffer.allocate(1 + Long.BYTES + 1 + idLen).order(ByteOrder.BIG_ENDIAN);
    buffer.put((byte) checkpointId);
    buffer.putLong(timestamp);
    buffer.put((byte) idLen);
    buffer.put(idBytes, 0, idLen);
    return buffer.array();
  }

  // @VisibleForTesting
  /** Clears the checkpoint cache and blob, and restarts id allocation at 1. */
  static synchronized void resetCache() {
    CACHE.clear();
    CACHE_BYTES = new byte[0];
    ID_COUNTER.set(1);
  }

  /**
   * Returns the current checkpoint cache blob (see the entry layout on {@code CACHE_BYTES}).
   *
   * @return the internal array itself, not a copy; callers must treat it as read-only
   */
  public static byte[] getCheckpointIdCacheBytes() {
    // CACHE_BYTES are never modified outside of this class, so it's safe to pass the
    // value directly without cloning
    // no-dd-sa:java-best-practices/return-internal-array
    return CACHE_BYTES;
  }
}