Skip to content

Commit 5e1587f

Browse files
authored
Revert "Fix CQ recovery gap and stale callback contamination (#17734) (#17820)" (#17826)
This reverts commit 24be79a.
1 parent b0033af commit 5e1587f

16 files changed

Lines changed: 106 additions & 635 deletions

File tree

iotdb-core/confignode/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@
137137
<groupId>org.apache.commons</groupId>
138138
<artifactId>commons-lang3</artifactId>
139139
</dependency>
140+
<dependency>
141+
<groupId>commons-codec</groupId>
142+
<artifactId>commons-codec</artifactId>
143+
</dependency>
140144
<dependency>
141145
<groupId>org.apache.thrift</groupId>
142146
<artifactId>libthrift</artifactId>

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,11 @@
2121

2222
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
2323

24-
import java.util.Optional;
25-
2624
import static org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ;
2725

2826
public class ShowCQPlan extends ConfigPhysicalReadPlan {
2927

30-
private final String cqId;
31-
3228
public ShowCQPlan() {
33-
this(null);
34-
}
35-
36-
public ShowCQPlan(String cqId) {
3729
super(SHOW_CQ);
38-
this.cqId = cqId;
39-
}
40-
41-
public Optional<String> getCqId() {
42-
return Optional.ofNullable(cqId);
4330
}
4431
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
3535

3636
private String cqId;
3737

38-
private String cqToken;
38+
private String md5;
3939

4040
public ActiveCQPlan() {
4141
super(ACTIVE_CQ);
4242
}
4343

44-
public ActiveCQPlan(String cqId, String cqToken) {
44+
public ActiveCQPlan(String cqId, String md5) {
4545
super(ACTIVE_CQ);
4646
Validate.notNull(cqId);
47-
Validate.notNull(cqToken);
47+
Validate.notNull(md5);
4848
this.cqId = cqId;
49-
this.cqToken = cqToken;
49+
this.md5 = md5;
5050
}
5151

5252
public String getCqId() {
5353
return cqId;
5454
}
5555

56-
public String getCqToken() {
57-
return cqToken;
56+
public String getMd5() {
57+
return md5;
5858
}
5959

6060
@Override
6161
protected void serializeImpl(DataOutputStream stream) throws IOException {
6262
stream.writeShort(getType().getPlanType());
6363
ReadWriteIOUtils.write(cqId, stream);
64-
ReadWriteIOUtils.write(cqToken, stream);
64+
ReadWriteIOUtils.write(md5, stream);
6565
}
6666

6767
@Override
6868
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
6969
cqId = ReadWriteIOUtils.readString(buffer);
70-
cqToken = ReadWriteIOUtils.readString(buffer);
70+
md5 = ReadWriteIOUtils.readString(buffer);
7171
}
7272

7373
@Override
@@ -82,11 +82,11 @@ public boolean equals(Object o) {
8282
return false;
8383
}
8484
ActiveCQPlan that = (ActiveCQPlan) o;
85-
return cqId.equals(that.cqId) && cqToken.equals(that.cqToken);
85+
return cqId.equals(that.cqId) && md5.equals(that.md5);
8686
}
8787

8888
@Override
8989
public int hashCode() {
90-
return Objects.hash(super.hashCode(), cqId, cqToken);
90+
return Objects.hash(super.hashCode(), cqId, md5);
9191
}
9292
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,29 @@ public class AddCQPlan extends ConfigPhysicalPlan {
3737

3838
private TCreateCQReq req;
3939

40-
private String cqToken;
40+
private String md5;
4141

4242
private long firstExecutionTime;
4343

4444
public AddCQPlan() {
4545
super(ADD_CQ);
4646
}
4747

48-
public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) {
48+
public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) {
4949
super(ADD_CQ);
5050
Validate.notNull(req);
51-
Validate.notNull(cqToken);
51+
Validate.notNull(md5);
5252
this.req = req;
53-
this.cqToken = cqToken;
53+
this.md5 = md5;
5454
this.firstExecutionTime = firstExecutionTime;
5555
}
5656

5757
public TCreateCQReq getReq() {
5858
return req;
5959
}
6060

61-
public String getCqToken() {
62-
return cqToken;
61+
public String getMd5() {
62+
return md5;
6363
}
6464

6565
public long getFirstExecutionTime() {
@@ -70,14 +70,14 @@ public long getFirstExecutionTime() {
7070
protected void serializeImpl(DataOutputStream stream) throws IOException {
7171
stream.writeShort(getType().getPlanType());
7272
ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
73-
ReadWriteIOUtils.write(cqToken, stream);
73+
ReadWriteIOUtils.write(md5, stream);
7474
ReadWriteIOUtils.write(firstExecutionTime, stream);
7575
}
7676

7777
@Override
7878
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
7979
req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer);
80-
cqToken = ReadWriteIOUtils.readString(buffer);
80+
md5 = ReadWriteIOUtils.readString(buffer);
8181
firstExecutionTime = ReadWriteIOUtils.readLong(buffer);
8282
}
8383

@@ -95,11 +95,11 @@ public boolean equals(Object o) {
9595
AddCQPlan addCQPlan = (AddCQPlan) o;
9696
return firstExecutionTime == addCQPlan.firstExecutionTime
9797
&& Objects.equals(req, addCQPlan.req)
98-
&& Objects.equals(cqToken, addCQPlan.cqToken);
98+
&& Objects.equals(md5, addCQPlan.md5);
9999
}
100100

101101
@Override
102102
public int hashCode() {
103-
return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime);
103+
return Objects.hash(super.hashCode(), req, md5, firstExecutionTime);
104104
}
105105
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan {
3737
private String cqId;
3838

3939
// may be null in user call of drop CQ
40-
private String cqToken;
40+
private String md5;
4141

4242
public DropCQPlan() {
4343
super(DROP_CQ);
@@ -49,33 +49,33 @@ public DropCQPlan(String cqId) {
4949
this.cqId = cqId;
5050
}
5151

52-
public DropCQPlan(String cqId, String cqToken) {
52+
public DropCQPlan(String cqId, String md5) {
5353
super(DROP_CQ);
5454
Validate.notNull(cqId);
55-
Validate.notNull(cqToken);
55+
Validate.notNull(md5);
5656
this.cqId = cqId;
57-
this.cqToken = cqToken;
57+
this.md5 = md5;
5858
}
5959

6060
public String getCqId() {
6161
return cqId;
6262
}
6363

64-
public Optional<String> getCqToken() {
65-
return Optional.ofNullable(cqToken);
64+
public Optional<String> getMd5() {
65+
return Optional.ofNullable(md5);
6666
}
6767

6868
@Override
6969
protected void serializeImpl(DataOutputStream stream) throws IOException {
7070
stream.writeShort(getType().getPlanType());
7171
ReadWriteIOUtils.write(cqId, stream);
72-
ReadWriteIOUtils.write(cqToken, stream);
72+
ReadWriteIOUtils.write(md5, stream);
7373
}
7474

7575
@Override
7676
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
7777
cqId = ReadWriteIOUtils.readString(buffer);
78-
cqToken = ReadWriteIOUtils.readString(buffer);
78+
md5 = ReadWriteIOUtils.readString(buffer);
7979
}
8080

8181
@Override
@@ -90,11 +90,11 @@ public boolean equals(Object o) {
9090
return false;
9191
}
9292
DropCQPlan that = (DropCQPlan) o;
93-
return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken);
93+
return cqId.equals(that.cqId) && Objects.equals(md5, that.md5);
9494
}
9595

9696
@Override
9797
public int hashCode() {
98-
return Objects.hash(super.hashCode(), cqId, cqToken);
98+
return Objects.hash(super.hashCode(), cqId, md5);
9999
}
100100
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,20 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan {
3737

3838
private long executionTime;
3939

40-
private String cqToken;
40+
// may be null in user call of drop CQ
41+
private String md5;
4142

4243
public UpdateCQLastExecTimePlan() {
4344
super(UPDATE_CQ_LAST_EXEC_TIME);
4445
}
4546

46-
public UpdateCQLastExecTimePlan(String cqId, long executionTime, String cqToken) {
47+
public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) {
4748
super(UPDATE_CQ_LAST_EXEC_TIME);
4849
Validate.notNull(cqId);
49-
Validate.notNull(cqToken);
50+
Validate.notNull(md5);
5051
this.cqId = cqId;
5152
this.executionTime = executionTime;
52-
this.cqToken = cqToken;
53+
this.md5 = md5;
5354
}
5455

5556
public String getCqId() {
@@ -60,23 +61,23 @@ public long getExecutionTime() {
6061
return executionTime;
6162
}
6263

63-
public String getCqToken() {
64-
return cqToken;
64+
public String getMd5() {
65+
return md5;
6566
}
6667

6768
@Override
6869
protected void serializeImpl(DataOutputStream stream) throws IOException {
6970
stream.writeShort(getType().getPlanType());
7071
ReadWriteIOUtils.write(cqId, stream);
7172
ReadWriteIOUtils.write(executionTime, stream);
72-
ReadWriteIOUtils.write(cqToken, stream);
73+
ReadWriteIOUtils.write(md5, stream);
7374
}
7475

7576
@Override
7677
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
7778
cqId = ReadWriteIOUtils.readString(buffer);
7879
executionTime = ReadWriteIOUtils.readLong(buffer);
79-
cqToken = ReadWriteIOUtils.readString(buffer);
80+
md5 = ReadWriteIOUtils.readString(buffer);
8081
}
8182

8283
@Override
@@ -91,13 +92,11 @@ public boolean equals(Object o) {
9192
return false;
9293
}
9394
UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o;
94-
return executionTime == that.executionTime
95-
&& cqId.equals(that.cqId)
96-
&& cqToken.equals(that.cqToken);
95+
return executionTime == that.executionTime && cqId.equals(that.cqId) && md5.equals(that.md5);
9796
}
9897

9998
@Override
10099
public int hashCode() {
101-
return Objects.hash(super.hashCode(), cqId, executionTime, cqToken);
100+
return Objects.hash(super.hashCode(), cqId, executionTime, md5);
102101
}
103102
}

0 commit comments

Comments
 (0)