Skip to content

Commit 2834437

Browse files
committed
redis-7.2
1 parent 0780f00 commit 2834437

18 files changed

Lines changed: 571 additions & 6 deletions

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
<dependency>
7171
<groupId>com.moilioncircle</groupId>
7272
<artifactId>redis-replicator</artifactId>
73-
<version>3.7.0</version>
73+
<version>3.8.0</version>
7474
<exclusions>
7575
<exclusion>
7676
<groupId>org.slf4j</groupId>
@@ -81,7 +81,7 @@
8181
<dependency>
8282
<groupId>com.moilioncircle</groupId>
8383
<artifactId>redis-rdb-cli-api</artifactId>
84-
<version>1.8.0</version>
84+
<version>1.9.0</version>
8585
</dependency>
8686

8787
<dependency>
@@ -102,7 +102,7 @@
102102
<dependency>
103103
<groupId>org.slf4j</groupId>
104104
<artifactId>slf4j-api</artifactId>
105-
<version>2.0.6</version>
105+
<version>2.0.7</version>
106106
</dependency>
107107
<dependency>
108108
<groupId>org.apache.logging.log4j</groupId>

src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/AbstractJsonRdbVisitor.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,33 @@ protected Event doApplySet(RedisInputStream in, int version, byte[] key, int typ
144144
return context.valueOf(new DummyKeyValuePair());
145145
}
146146

147+
@Override
148+
protected Event doApplySetListPack(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
149+
json(context, key, type, () -> {
150+
Outputs.write('[', out);
151+
BaseRdbParser parser = new BaseRdbParser(in);
152+
RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
153+
boolean flag = true;
154+
listPack.skip(4); // total-bytes
155+
int len = listPack.readInt(2);
156+
while (len > 0) {
157+
if (!flag) {
158+
Outputs.write(',', out);
159+
}
160+
byte[] element = listPackEntry(listPack);
161+
emitString(element);
162+
flag = false;
163+
len--;
164+
}
165+
int lpend = listPack.read(); // lp-end
166+
if (lpend != 255) {
167+
throw new AssertionError("listpack expect 255 but " + lpend);
168+
}
169+
Outputs.write(']', out);
170+
});
171+
return context.valueOf(new DummyKeyValuePair());
172+
}
173+
147174
@Override
148175
protected Event doApplyZSet(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
149176
json(context, key, type, () -> {
@@ -556,4 +583,22 @@ protected Event doApplyStreamListPacks2(RedisInputStream in, int version, byte[]
556583
});
557584
return context.valueOf(new DummyKeyValuePair());
558585
}
586+
587+
@Override
588+
protected Event doApplyStreamListPacks3(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
589+
json(context, key, type, () -> {
590+
Outputs.write('"', out);
591+
int ver = getVersion(version);
592+
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, ver, out, redis)) {
593+
if (ver < 11) {
594+
listener.write((byte) Constants.RDB_TYPE_STREAM_LISTPACKS);
595+
} else {
596+
listener.write((byte) type);
597+
}
598+
super.doApplyStreamListPacks3(in, ver, key, type, context, listener);
599+
}
600+
Outputs.write('"', out);
601+
});
602+
return context.valueOf(new DummyKeyValuePair());
603+
}
559604
}

src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/AbstractRctRdbVisitor.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,4 +155,52 @@ protected Event doApplyStreamListPacks2(RedisInputStream in, int version, byte[]
155155
if (version < 10) context.setValueRdbType(RDB_TYPE_STREAM_LISTPACKS);
156156
return context.valueOf(new DummyKeyValuePair());
157157
}
158+
159+
protected Event doApplyStreamListPacks3(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context, RawByteListener listener) throws IOException {
160+
SkipRdbParser skipParser = new SkipRdbParser(in);
161+
long listPacks = skipParser.rdbLoadLen().len;
162+
while (listPacks-- > 0) {
163+
skipParser.rdbLoadPlainStringObject();
164+
skipParser.rdbLoadPlainStringObject();
165+
}
166+
skipParser.rdbLoadLen(); // length
167+
skipParser.rdbLoadLen(); // lastId
168+
skipParser.rdbLoadLen(); // lastId
169+
if (version < 11) replicator.removeRawByteListener(listener);
170+
skipParser.rdbLoadLen(); // firstId
171+
skipParser.rdbLoadLen(); // firstId
172+
skipParser.rdbLoadLen(); // maxDeletedEntryId
173+
skipParser.rdbLoadLen(); // maxDeletedEntryId
174+
skipParser.rdbLoadLen(); // entriesAdded
175+
if (version < 11) replicator.addRawByteListener(listener);
176+
long groupCount = skipParser.rdbLoadLen().len;
177+
while (groupCount-- > 0) {
178+
skipParser.rdbLoadPlainStringObject();
179+
skipParser.rdbLoadLen();
180+
skipParser.rdbLoadLen();
181+
if (version < 11) replicator.removeRawByteListener(listener);
182+
skipParser.rdbLoadLen(); // entriesRead
183+
if (version < 11) replicator.addRawByteListener(listener);
184+
long groupPel = skipParser.rdbLoadLen().len;
185+
while (groupPel-- > 0) {
186+
in.skip(16);
187+
skipParser.rdbLoadMillisecondTime();
188+
skipParser.rdbLoadLen();
189+
}
190+
long consumerCount = skipParser.rdbLoadLen().len;
191+
while (consumerCount-- > 0) {
192+
skipParser.rdbLoadPlainStringObject();
193+
skipParser.rdbLoadMillisecondTime();
194+
if (version < 11) replicator.removeRawByteListener(listener);
195+
skipParser.rdbLoadMillisecondTime(); // activeTime
196+
if (version < 11) replicator.addRawByteListener(listener);
197+
long consumerPel = skipParser.rdbLoadLen().len;
198+
while (consumerPel-- > 0) {
199+
in.skip(16);
200+
}
201+
}
202+
}
203+
if (version < 11) context.setValueRdbType(RDB_TYPE_STREAM_LISTPACKS);
204+
return context.valueOf(new DummyKeyValuePair());
205+
}
158206
}

src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/CountRdbVisitor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ public Event doApplySet(RedisInputStream in, int version, byte[] key, int type,
110110
counter.compute(DataType.parse(type).getValue(), (k, v) -> v == null ? 1 : v + 1);
111111
return super.doApplySet(in, version, key, type, context);
112112
}
113+
114+
@Override
115+
public Event doApplySetListPack(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
116+
counter.compute(DataType.parse(type).getValue(), (k, v) -> v == null ? 1 : v + 1);
117+
return super.doApplySetListPack(in, version, key, type, context);
118+
}
113119

114120
@Override
115121
public Event doApplyZSet(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
@@ -206,4 +212,10 @@ public Event doApplyStreamListPacks2(RedisInputStream in, int version, byte[] ke
206212
counter.compute(DataType.parse(type).getValue(), (k, v) -> v == null ? 1 : v + 1);
207213
return super.doApplyStreamListPacks2(in, version, key, type, context);
208214
}
215+
216+
@Override
217+
public Event doApplyStreamListPacks3(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
218+
counter.compute(DataType.parse(type).getValue(), (k, v) -> v == null ? 1 : v + 1);
219+
return super.doApplyStreamListPacks3(in, version, key, type, context);
220+
}
209221
}

src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/DiffRdbVisitor.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,20 @@ public Event doApplySet(RedisInputStream in, int version, byte[] key, int type,
124124
return context.valueOf(new DummyKeyValuePair());
125125
}
126126

127+
@Override
128+
public Event doApplySetListPack(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
129+
escaper.encode(key, out);
130+
delimiter(out);
131+
expire(context.getExpiredType(), context.getExpiredValue());
132+
version = getVersion(version);
133+
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) {
134+
listener.write((byte) type);
135+
super.doApplySetListPack(in, version, key, type, context);
136+
}
137+
Outputs.write('\n', out);
138+
return context.valueOf(new DummyKeyValuePair());
139+
}
140+
127141
@Override
128142
public Event doApplyZSet(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
129143
escaper.encode(key, out);
@@ -351,4 +365,22 @@ public Event doApplyStreamListPacks2(RedisInputStream in, int version, byte[] ke
351365
Outputs.write('\n', out);
352366
return context.valueOf(new DummyKeyValuePair());
353367
}
368+
369+
@Override
370+
public Event doApplyStreamListPacks3(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
371+
escaper.encode(key, out);
372+
delimiter(out);
373+
expire(context.getExpiredType(), context.getExpiredValue());
374+
version = getVersion(version);
375+
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) {
376+
if (version < 11) {
377+
listener.write((byte) Constants.RDB_TYPE_STREAM_LISTPACKS);
378+
} else {
379+
listener.write((byte) type);
380+
}
381+
super.doApplyStreamListPacks3(in, version, key, type, context, listener);
382+
}
383+
Outputs.write('\n', out);
384+
return context.valueOf(new DummyKeyValuePair());
385+
}
354386
}

src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/DumpRdbVisitor.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static com.moilioncircle.redis.replicator.Constants.RDB_OPCODE_FUNCTION2;
2626
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_HASH;
2727
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST;
28+
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_SET;
2829
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET;
2930
import static com.moilioncircle.redis.replicator.rdb.BaseRdbParser.StringHelper.listPackEntry;
3031
import static java.nio.ByteBuffer.wrap;
@@ -173,6 +174,49 @@ protected Event doApplySet(RedisInputStream in, int version, byte[] key, int typ
173174
}
174175
}
175176

177+
@Override
178+
protected Event doApplySetListPack(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
179+
ByteBuffer ex = ZERO_BUF;
180+
if (context.getExpiredValue() != null) {
181+
long ms = context.getExpiredValue() - System.currentTimeMillis();
182+
if (ms <= 0) {
183+
return super.doApplySetListPack(in, version, key, type, context);
184+
} else {
185+
ex = wrap(String.valueOf(ms).getBytes());
186+
}
187+
}
188+
version = getVersion(version);
189+
try (LayeredOutputStream out = new LayeredOutputStream(configure)) {
190+
if (version < 11 /* since redis rdb version 11 */) {
191+
// downgrade to RDB_TYPE_SET
192+
BaseRdbParser parser = new BaseRdbParser(in);
193+
BaseRdbEncoder encoder = new BaseRdbEncoder();
194+
ByteBufferOutputStream out1 = new ByteBufferOutputStream(configure.getOutputBufferSize());
195+
RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
196+
listPack.skip(4); // total-bytes
197+
int len = listPack.readInt(2);
198+
long targetLen = len;
199+
while (len > 0) {
200+
byte[] element = listPackEntry(listPack);
201+
encoder.rdbGenericSaveStringObject(new ByteArray(element), out1);
202+
len--;
203+
}
204+
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper, false)) {
205+
listener.write((byte) RDB_TYPE_SET);
206+
listener.handle(encoder.rdbSaveLen(targetLen));
207+
listener.handle(out1.toByteBuffer());
208+
}
209+
} else {
210+
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) {
211+
listener.write((byte) type);
212+
super.doApplySetListPack(in, version, key, type, context);
213+
}
214+
}
215+
Protocols.restore(this.out, wrap(key), ex, out.toByteBuffers(), replace);
216+
return context.valueOf(new DummyKeyValuePair());
217+
}
218+
}
219+
176220
@Override
177221
protected Event doApplyZSet(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
178222
ByteBuffer ex = ZERO_BUF;
@@ -663,4 +707,30 @@ protected Event doApplyStreamListPacks2(RedisInputStream in, int version, byte[]
663707
return context.valueOf(new DummyKeyValuePair());
664708
}
665709
}
710+
711+
@Override
712+
protected Event doApplyStreamListPacks3(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
713+
ByteBuffer ex = ZERO_BUF;
714+
if (context.getExpiredValue() != null) {
715+
long ms = context.getExpiredValue() - System.currentTimeMillis();
716+
if (ms <= 0) {
717+
return super.doApplyStreamListPacks3(in, version, key, type, context);
718+
} else {
719+
ex = wrap(String.valueOf(ms).getBytes());
720+
}
721+
}
722+
version = getVersion(version);
723+
try (LayeredOutputStream out = new LayeredOutputStream(configure)) {
724+
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, escaper)) {
725+
if (version < 11) {
726+
listener.write((byte) Constants.RDB_TYPE_STREAM_LISTPACKS);
727+
} else {
728+
listener.write((byte) type);
729+
}
730+
super.doApplyStreamListPacks3(in, version, key, type, context, listener);
731+
}
732+
Protocols.restore(this.out, wrap(key), ex, out.toByteBuffers(), replace);
733+
return context.valueOf(new DummyKeyValuePair());
734+
}
735+
}
666736
}

src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/FormatterRdbVisitor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ protected Event doApplyList(RedisInputStream in, int version, byte[] key, int ty
106106
protected Event doApplySet(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
107107
return formatter.applySet(replicator, in, version, key, type, context);
108108
}
109+
110+
@Override
111+
protected Event doApplySetListPack(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
112+
return formatter.applySetListPack(replicator, in, version, key, type, context);
113+
}
109114

110115
@Override
111116
protected Event doApplyZSet(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
@@ -186,4 +191,9 @@ protected Event doApplyStreamListPacks(RedisInputStream in, int version, byte[]
186191
protected Event doApplyStreamListPacks2(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
187192
return formatter.applyStreamListPacks2(replicator, in, version, key, type, context);
188193
}
194+
195+
@Override
196+
protected Event doApplyStreamListPacks3(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
197+
return formatter.applyStreamListPacks3(replicator, in, version, key, type, context);
198+
}
189199
}

src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/KeyRdbVisitor.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ public Event doApplySet(RedisInputStream in, int version, byte[] key, int type,
5757
return super.doApplySet(in, version, key, type, context);
5858
}
5959

60+
@Override
61+
public Event doApplySetListPack(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
62+
quote(key, out);
63+
Outputs.write('\n', out);
64+
return super.doApplySetListPack(in, version, key, type, context);
65+
}
66+
6067
@Override
6168
public Event doApplyZSet(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
6269
quote(key, out);
@@ -168,4 +175,11 @@ public Event doApplyStreamListPacks2(RedisInputStream in, int version, byte[] ke
168175
Outputs.write('\n', out);
169176
return super.doApplyStreamListPacks2(in, version, key, type, context);
170177
}
178+
179+
@Override
180+
public Event doApplyStreamListPacks3(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
181+
quote(key, out);
182+
Outputs.write('\n', out);
183+
return super.doApplyStreamListPacks3(in, version, key, type, context);
184+
}
171185
}

src/main/java/com/moilioncircle/redis/rdb/cli/ext/rct/KeyValRdbVisitor.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,28 @@ public Event doApplySet(RedisInputStream in, int version, byte[] key, int type,
9696
return context.valueOf(new DummyKeyValuePair());
9797
}
9898

99+
@Override
100+
protected Event doApplySetListPack(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
101+
quote(key, out);
102+
delimiter(out);
103+
BaseRdbParser parser = new BaseRdbParser(in);
104+
RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
105+
listPack.skip(4); // total-bytes
106+
int len = listPack.readInt(2);
107+
while (len > 0) {
108+
byte[] element = listPackEntry(listPack);
109+
len--;
110+
quote(element, out);
111+
if (len - 1 > 0) delimiter(out);
112+
else Outputs.write('\n', out);
113+
}
114+
int lpend = listPack.read(); // lp-end
115+
if (lpend != 255) {
116+
throw new AssertionError("listpack expect 255 but " + lpend);
117+
}
118+
return context.valueOf(new DummyKeyValuePair());
119+
}
120+
99121
@Override
100122
public Event doApplyZSet(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
101123
quote(key, out);
@@ -474,4 +496,23 @@ public Event doApplyStreamListPacks2(RedisInputStream in, int version, byte[] ke
474496
Outputs.write('\n', out);
475497
return context.valueOf(new DummyKeyValuePair());
476498
}
499+
500+
@Override
501+
public Event doApplyStreamListPacks3(RedisInputStream in, int version, byte[] key, int type, ContextKeyValuePair context) throws IOException {
502+
quote(key, out);
503+
delimiter(out);
504+
out.write(configure.getQuote());
505+
version = getVersion(version);
506+
try (DumpRawByteListener listener = new DumpRawByteListener(replicator, version, out, redis)) {
507+
if (version < 10) {
508+
listener.write((byte) Constants.RDB_TYPE_STREAM_LISTPACKS);
509+
} else {
510+
listener.write((byte) type);
511+
}
512+
super.doApplyStreamListPacks3(in, version, key, type, context, listener);
513+
}
514+
out.write(configure.getQuote());
515+
Outputs.write('\n', out);
516+
return context.valueOf(new DummyKeyValuePair());
517+
}
477518
}

0 commit comments

Comments
 (0)