Skip to content

Commit cb8fd4c

Browse files
authored
Reapply "Fix CQ recovery gap and stale callback contamination (#17734) (#17820)" (#17826) (#17829)
This reverts commit 5e1587f.
1 parent 5e1587f commit cb8fd4c

16 files changed

Lines changed: 635 additions & 106 deletions

File tree

iotdb-core/confignode/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,6 @@
137137
<groupId>org.apache.commons</groupId>
138138
<artifactId>commons-lang3</artifactId>
139139
</dependency>
140-
<dependency>
141-
<groupId>commons-codec</groupId>
142-
<artifactId>commons-codec</artifactId>
143-
</dependency>
144140
<dependency>
145141
<groupId>org.apache.thrift</groupId>
146142
<artifactId>libthrift</artifactId>

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,24 @@
2121

2222
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
2323

24+
import java.util.Optional;
25+
2426
import static org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ;
2527

2628
public class ShowCQPlan extends ConfigPhysicalReadPlan {
2729

30+
private final String cqId;
31+
2832
public ShowCQPlan() {
33+
this(null);
34+
}
35+
36+
public ShowCQPlan(String cqId) {
2937
super(SHOW_CQ);
38+
this.cqId = cqId;
39+
}
40+
41+
public Optional<String> getCqId() {
42+
return Optional.ofNullable(cqId);
3043
}
3144
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
3535

3636
private String cqId;
3737

38-
private String md5;
38+
private String cqToken;
3939

4040
public ActiveCQPlan() {
4141
super(ACTIVE_CQ);
4242
}
4343

44-
public ActiveCQPlan(String cqId, String md5) {
44+
public ActiveCQPlan(String cqId, String cqToken) {
4545
super(ACTIVE_CQ);
4646
Validate.notNull(cqId);
47-
Validate.notNull(md5);
47+
Validate.notNull(cqToken);
4848
this.cqId = cqId;
49-
this.md5 = md5;
49+
this.cqToken = cqToken;
5050
}
5151

5252
public String getCqId() {
5353
return cqId;
5454
}
5555

56-
public String getMd5() {
57-
return md5;
56+
public String getCqToken() {
57+
return cqToken;
5858
}
5959

6060
@Override
6161
protected void serializeImpl(DataOutputStream stream) throws IOException {
6262
stream.writeShort(getType().getPlanType());
6363
ReadWriteIOUtils.write(cqId, stream);
64-
ReadWriteIOUtils.write(md5, stream);
64+
ReadWriteIOUtils.write(cqToken, stream);
6565
}
6666

6767
@Override
6868
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
6969
cqId = ReadWriteIOUtils.readString(buffer);
70-
md5 = ReadWriteIOUtils.readString(buffer);
70+
cqToken = ReadWriteIOUtils.readString(buffer);
7171
}
7272

7373
@Override
@@ -82,11 +82,11 @@ public boolean equals(Object o) {
8282
return false;
8383
}
8484
ActiveCQPlan that = (ActiveCQPlan) o;
85-
return cqId.equals(that.cqId) && md5.equals(that.md5);
85+
return cqId.equals(that.cqId) && cqToken.equals(that.cqToken);
8686
}
8787

8888
@Override
8989
public int hashCode() {
90-
return Objects.hash(super.hashCode(), cqId, md5);
90+
return Objects.hash(super.hashCode(), cqId, cqToken);
9191
}
9292
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,29 @@ public class AddCQPlan extends ConfigPhysicalPlan {
3737

3838
private TCreateCQReq req;
3939

40-
private String md5;
40+
private String cqToken;
4141

4242
private long firstExecutionTime;
4343

4444
public AddCQPlan() {
4545
super(ADD_CQ);
4646
}
4747

48-
public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) {
48+
public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) {
4949
super(ADD_CQ);
5050
Validate.notNull(req);
51-
Validate.notNull(md5);
51+
Validate.notNull(cqToken);
5252
this.req = req;
53-
this.md5 = md5;
53+
this.cqToken = cqToken;
5454
this.firstExecutionTime = firstExecutionTime;
5555
}
5656

5757
public TCreateCQReq getReq() {
5858
return req;
5959
}
6060

61-
public String getMd5() {
62-
return md5;
61+
public String getCqToken() {
62+
return cqToken;
6363
}
6464

6565
public long getFirstExecutionTime() {
@@ -70,14 +70,14 @@ public long getFirstExecutionTime() {
7070
protected void serializeImpl(DataOutputStream stream) throws IOException {
7171
stream.writeShort(getType().getPlanType());
7272
ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
73-
ReadWriteIOUtils.write(md5, stream);
73+
ReadWriteIOUtils.write(cqToken, stream);
7474
ReadWriteIOUtils.write(firstExecutionTime, stream);
7575
}
7676

7777
@Override
7878
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
7979
req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer);
80-
md5 = ReadWriteIOUtils.readString(buffer);
80+
cqToken = ReadWriteIOUtils.readString(buffer);
8181
firstExecutionTime = ReadWriteIOUtils.readLong(buffer);
8282
}
8383

@@ -95,11 +95,11 @@ public boolean equals(Object o) {
9595
AddCQPlan addCQPlan = (AddCQPlan) o;
9696
return firstExecutionTime == addCQPlan.firstExecutionTime
9797
&& Objects.equals(req, addCQPlan.req)
98-
&& Objects.equals(md5, addCQPlan.md5);
98+
&& Objects.equals(cqToken, addCQPlan.cqToken);
9999
}
100100

101101
@Override
102102
public int hashCode() {
103-
return Objects.hash(super.hashCode(), req, md5, firstExecutionTime);
103+
return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime);
104104
}
105105
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan {
3737
private String cqId;
3838

3939
// may be null in user call of drop CQ
40-
private String md5;
40+
private String cqToken;
4141

4242
public DropCQPlan() {
4343
super(DROP_CQ);
@@ -49,33 +49,33 @@ public DropCQPlan(String cqId) {
4949
this.cqId = cqId;
5050
}
5151

52-
public DropCQPlan(String cqId, String md5) {
52+
public DropCQPlan(String cqId, String cqToken) {
5353
super(DROP_CQ);
5454
Validate.notNull(cqId);
55-
Validate.notNull(md5);
55+
Validate.notNull(cqToken);
5656
this.cqId = cqId;
57-
this.md5 = md5;
57+
this.cqToken = cqToken;
5858
}
5959

6060
public String getCqId() {
6161
return cqId;
6262
}
6363

64-
public Optional<String> getMd5() {
65-
return Optional.ofNullable(md5);
64+
public Optional<String> getCqToken() {
65+
return Optional.ofNullable(cqToken);
6666
}
6767

6868
@Override
6969
protected void serializeImpl(DataOutputStream stream) throws IOException {
7070
stream.writeShort(getType().getPlanType());
7171
ReadWriteIOUtils.write(cqId, stream);
72-
ReadWriteIOUtils.write(md5, stream);
72+
ReadWriteIOUtils.write(cqToken, stream);
7373
}
7474

7575
@Override
7676
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
7777
cqId = ReadWriteIOUtils.readString(buffer);
78-
md5 = ReadWriteIOUtils.readString(buffer);
78+
cqToken = ReadWriteIOUtils.readString(buffer);
7979
}
8080

8181
@Override
@@ -90,11 +90,11 @@ public boolean equals(Object o) {
9090
return false;
9191
}
9292
DropCQPlan that = (DropCQPlan) o;
93-
return cqId.equals(that.cqId) && Objects.equals(md5, that.md5);
93+
return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken);
9494
}
9595

9696
@Override
9797
public int hashCode() {
98-
return Objects.hash(super.hashCode(), cqId, md5);
98+
return Objects.hash(super.hashCode(), cqId, cqToken);
9999
}
100100
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,19 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan {
3737

3838
private long executionTime;
3939

40-
// may be null in user call of drop CQ
41-
private String md5;
40+
private String cqToken;
4241

4342
public UpdateCQLastExecTimePlan() {
4443
super(UPDATE_CQ_LAST_EXEC_TIME);
4544
}
4645

47-
public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) {
46+
public UpdateCQLastExecTimePlan(String cqId, long executionTime, String cqToken) {
4847
super(UPDATE_CQ_LAST_EXEC_TIME);
4948
Validate.notNull(cqId);
50-
Validate.notNull(md5);
49+
Validate.notNull(cqToken);
5150
this.cqId = cqId;
5251
this.executionTime = executionTime;
53-
this.md5 = md5;
52+
this.cqToken = cqToken;
5453
}
5554

5655
public String getCqId() {
@@ -61,23 +60,23 @@ public long getExecutionTime() {
6160
return executionTime;
6261
}
6362

64-
public String getMd5() {
65-
return md5;
63+
public String getCqToken() {
64+
return cqToken;
6665
}
6766

6867
@Override
6968
protected void serializeImpl(DataOutputStream stream) throws IOException {
7069
stream.writeShort(getType().getPlanType());
7170
ReadWriteIOUtils.write(cqId, stream);
7271
ReadWriteIOUtils.write(executionTime, stream);
73-
ReadWriteIOUtils.write(md5, stream);
72+
ReadWriteIOUtils.write(cqToken, stream);
7473
}
7574

7675
@Override
7776
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
7877
cqId = ReadWriteIOUtils.readString(buffer);
7978
executionTime = ReadWriteIOUtils.readLong(buffer);
80-
md5 = ReadWriteIOUtils.readString(buffer);
79+
cqToken = ReadWriteIOUtils.readString(buffer);
8180
}
8281

8382
@Override
@@ -92,11 +91,13 @@ public boolean equals(Object o) {
9291
return false;
9392
}
9493
UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o;
95-
return executionTime == that.executionTime && cqId.equals(that.cqId) && md5.equals(that.md5);
94+
return executionTime == that.executionTime
95+
&& cqId.equals(that.cqId)
96+
&& cqToken.equals(that.cqToken);
9697
}
9798

9899
@Override
99100
public int hashCode() {
100-
return Objects.hash(super.hashCode(), cqId, executionTime, md5);
101+
return Objects.hash(super.hashCode(), cqId, executionTime, cqToken);
101102
}
102103
}

0 commit comments

Comments
 (0)