-
Notifications
You must be signed in to change notification settings - Fork 255
Expand file tree
/
Copy pathAerospikeTestOperations.java
More file actions
127 lines (108 loc) · 4.01 KB
/
AerospikeTestOperations.java
File metadata and controls
127 lines (108 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.playtika.testcontainer.aerospike;
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.Value;
import org.springframework.util.StringUtils;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Test helper around an Aerospike instance running inside a Testcontainers container.
 *
 * <p>It offers two groups of operations:
 * <ul>
 *     <li>moving the test clock (via {@code DateTimeUtils}) and asking the
 *     {@link ExpiredDocumentsCleaner} to clean documents for the new "now";</li>
 *     <li>inspecting, aborting and asserting on server-side scan jobs by running
 *     {@code asinfo} inside the container.</li>
 * </ul>
 */
@RequiredArgsConstructor
public class AerospikeTestOperations {

    private final ExpiredDocumentsCleaner expiredDocumentsCleaner;
    private final GenericContainer<?> aerospikeContainer;

    /**
     * Moves the test clock forward by {@code duration} plus one extra minute, counted from the
     * current {@code DateTimeUtils.now()}, and triggers the expired documents cleanup for that moment.
     *
     * @param duration amount of time to add to the current test time
     */
    public void addDuration(Duration duration) {
        timeTravel(DateTimeUtils.now().plus(duration).plusMinutes(1));
    }

    /**
     * Moves the test clock to the given moment and triggers the expired documents cleanup for it.
     *
     * @param futureTime target time; must not be before the current {@code DateTimeUtils.now()}
     *                   (a time equal to "now" is accepted)
     * @throws IllegalArgumentException if {@code futureTime} is before the current time
     */
    public void timeTravelTo(OffsetDateTime futureTime) {
        OffsetDateTime now = DateTimeUtils.now();
        if (futureTime.isBefore(now)) {
            throw new IllegalArgumentException("Time should be in future. Now is: " + now + " time is:" + futureTime);
        }
        timeTravel(futureTime);
    }

    /**
     * Undoes previous time travelling by delegating to {@code DateTimeUtils.setCurrentMillisSystem()}.
     */
    public void rollbackTime() {
        DateTimeUtils.setCurrentMillisSystem();
    }

    /**
     * Fixes the test clock at {@code newNow} and asks the cleaner to clean documents
     * expired before that moment.
     */
    private void timeTravel(OffsetDateTime newNow) {
        long newNowMillis = newNow.toInstant().toEpochMilli();
        DateTimeUtils.setCurrentMillisFixed(newNowMillis);
        expiredDocumentsCleaner.cleanExpiredDocumentsBefore(newNowMillis);
    }

    /**
     * Runs {@code asinfo -v query-show} inside the container and parses its standard output.
     * More at https://www.aerospike.com/docs/guide/scan.html
     *
     * @return performed scans on aerospike server instance; empty list when the command prints nothing.
     * @throws IllegalStateException if the command output contains a token that is not a {@code key=value} pair
     */
    @SneakyThrows
    public List<ScanJob> getScans() {
        Container.ExecResult execResult = aerospikeContainer.execInContainer("asinfo", "-v", "query-show");
        return getScanJobs(execResult.getStdout());
    }

    /**
     * Splits the raw {@code asinfo} output into {@code ;}-separated job descriptions and parses each of them.
     * Segments that are empty after trimming (e.g. produced by a trailing separator or a stray
     * carriage return) are skipped instead of being handed to the parser.
     */
    private List<ScanJob> getScanJobs(String stdout) {
        if (!StringUtils.hasText(stdout)) {
            return Collections.emptyList();
        }
        return Arrays.stream(stdout.replace("\n", "").split(";"))
                .map(String::trim)
                .filter(job -> !job.isEmpty())
                .map(this::parseToObScanJobObject)
                .collect(Collectors.toList());
    }

    /**
     * Parses a single job description of the form {@code key1=value1:key2=value2:...}.
     * Keys that are not present in the description leave the corresponding {@link ScanJob} field {@code null}.
     */
    private ScanJob parseToObScanJobObject(String job) {
        Map<String, String> pairsMap = Arrays.stream(job.split(":"))
                .filter(pair -> !pair.isEmpty())
                .map(pair -> toKeyValue(pair, job))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return ScanJob.builder()
                .module(pairsMap.get("module"))
                .set(pairsMap.get("set"))
                .udfFunction(pairsMap.get("udf-function"))
                .status(pairsMap.get("status"))
                .trid(pairsMap.get("trid"))
                .namespace(pairsMap.get("ns"))
                .build();
    }

    /**
     * Splits one {@code key=value} token on the first {@code '='} only, so that an empty value
     * ({@code "set="}) yields an empty string and a value containing {@code '='} is kept intact.
     *
     * @throws IllegalStateException if the token contains no {@code '='} at all
     */
    private static Map.Entry<String, String> toKeyValue(String pair, String job) {
        // limit = 2 keeps trailing empty strings, so "key=" produces ["key", ""]
        String[] kv = pair.split("=", 2);
        if (kv.length < 2) {
            throw new IllegalStateException(
                    "Unexpected token '" + pair + "' (expected key=value) in asinfo job description: " + job);
        }
        return new AbstractMap.SimpleEntry<>(kv[0], kv[1]);
    }

    /**
     * Runs {@code asinfo -v scan-abort-all:} inside the container and asserts that
     * its standard output contains {@code OK}.
     */
    @SneakyThrows
    public void killAllScans() {
        Container.ExecResult execResult = aerospikeContainer.execInContainer("asinfo", "-v", "scan-abort-all:");
        assertThat(execResult.getStdout())
                .as("Scan jobs killed")
                .contains("OK");
    }

    /**
     * Asserts that {@link #getScans()} reports no scan jobs at all.
     */
    public void assertNoScans() {
        assertNoScans(scanJob -> true);
    }

    /**
     * Asserts that none of the jobs reported by {@link #getScans()} matches the given predicate.
     *
     * @param scanJobPredicate selects the scan jobs that must not be present
     */
    public void assertNoScans(Predicate<ScanJob> scanJobPredicate) {
        List<ScanJob> scanJobs = getScans().stream()
                .filter(scanJobPredicate)
                .collect(Collectors.toList());
        assertThat(scanJobs)
                .as("Scan jobs")
                .isEmpty();
    }

    /**
     * Asserts that no reported scan job targets the given set.
     *
     * @param setName set name to look for; {@code null} matches jobs whose description has no {@code set} key
     */
    public void assertNoScansForSet(String setName) {
        // Objects.equals: a null setName must not blow up only when some job happens to exist
        assertNoScans(job -> Objects.equals(setName, job.set));
    }

    /**
     * Immutable view of one job description returned by {@code asinfo}.
     * Each field holds the raw string value of the corresponding key, or {@code null} when the key is absent.
     */
    @Value
    @Builder
    public static class ScanJob {
        /** Value of the {@code module} key. */
        String module;
        /** Value of the {@code set} key. */
        String set;
        /** Value of the {@code udf-function} key. */
        String udfFunction;
        /** Value of the {@code status} key. */
        String status;
        /** Value of the {@code trid} key. */
        String trid;
        /** Value of the {@code ns} key. */
        String namespace;
    }
}