Skip to content

Commit fcdfff3

Browse files
ccr_feature
1 parent 910c424 commit fcdfff3

6 files changed

Lines changed: 245 additions & 3 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,17 @@ public void addCreateTableRecord(CreateTableRecord createTableRecord) {
230230
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, createTableRecord);
231231
}
232232

233+
public void addCreateDatabaseRecord(CreateDatabaseRecord createDatabaseRecord) {
234+
long dbId = createDatabaseRecord.getDbId();
235+
List<Long> tableIds = Lists.newArrayList();
236+
long commitSeq = createDatabaseRecord.getCommitSeq();
237+
long timestamp = System.currentTimeMillis();
238+
TBinlogType type = TBinlogType.CREATE_DATABASE;
239+
String data = createDatabaseRecord.toJson();
240+
241+
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, createDatabaseRecord);
242+
}
243+
233244
public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
234245
long dbId = dropPartitionInfo.getDbId();
235246
List<Long> tableIds = Lists.newArrayList();
@@ -241,6 +252,17 @@ public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long com
241252
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, dropPartitionInfo);
242253
}
243254

255+
public void addDropDatabaseRecord(DropDatabaseRecord record) {
256+
long dbId = record.getDbId();
257+
List<Long> tableIds = Lists.newArrayList();
258+
long commitSeq = record.getCommitSeq();
259+
long timestamp = System.currentTimeMillis();
260+
TBinlogType type = TBinlogType.DROP_DATABASE;
261+
String data = record.toJson();
262+
263+
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
264+
}
265+
244266
public void addDropTableRecord(DropTableRecord record) {
245267
long dbId = record.getDbId();
246268
List<Long> tableIds = Lists.newArrayList();
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.binlog;
19+
20+
import org.apache.doris.catalog.Database;
21+
import org.apache.doris.catalog.Env;
22+
import org.apache.doris.common.io.Writable;
23+
import org.apache.doris.persist.gson.GsonUtils;
24+
25+
import com.google.common.collect.Maps;
26+
import com.google.gson.annotations.SerializedName;
27+
import org.apache.logging.log4j.LogManager;
28+
import org.apache.logging.log4j.Logger;
29+
30+
import java.io.DataInput;
31+
import java.io.DataOutput;
32+
import java.io.IOException;
33+
import java.util.Map;
34+
35+
/**
36+
* 记录创建数据库的binlog记录
37+
*/
38+
public class CreateDatabaseRecord {
39+
private static final Logger LOG = LogManager.getLogger(CreateDatabaseRecord.class);
40+
41+
@SerializedName(value = "commitSeq")
42+
private long commitSeq;
43+
@SerializedName(value = "dbId")
44+
private long dbId;
45+
@SerializedName(value = "dbName")
46+
private String dbName;
47+
@SerializedName(value = "sql")
48+
private String sql;
49+
@SerializedName(value = "properties")
50+
private Map<String, String> properties;
51+
52+
public CreateDatabaseRecord(long commitSeq, Database db) {
53+
this.commitSeq = commitSeq;
54+
this.dbId = db.getId();
55+
this.dbName = db.getFullName();
56+
57+
// 构建创建数据库的SQL语句
58+
StringBuilder sqlBuilder = new StringBuilder();
59+
sqlBuilder.append("CREATE DATABASE IF NOT EXISTS `");
60+
sqlBuilder.append(dbName);
61+
sqlBuilder.append("`");
62+
63+
// 获取数据库的所有属性
64+
this.properties = db.getDbProperties().getProperties();
65+
66+
// 添加数据库属性到SQL语句
67+
if (properties != null && !properties.isEmpty()) {
68+
sqlBuilder.append(" PROPERTIES (");
69+
boolean first = true;
70+
for (Map.Entry<String, String> entry : properties.entrySet()) {
71+
if (!first) {
72+
sqlBuilder.append(", ");
73+
}
74+
sqlBuilder.append("\"").append(entry.getKey()).append("\"=\"").append(entry.getValue()).append("\"");
75+
first = false;
76+
}
77+
sqlBuilder.append(")");
78+
}
79+
80+
this.sql = sqlBuilder.toString();
81+
}
82+
83+
public long getCommitSeq() {
84+
return commitSeq;
85+
}
86+
87+
public long getDbId() {
88+
return dbId;
89+
}
90+
91+
public String getDbName() {
92+
return dbName;
93+
}
94+
95+
public String getSql() {
96+
return sql;
97+
}
98+
99+
public String toJson() {
100+
return GsonUtils.GSON.toJson(this);
101+
}
102+
103+
@Override
104+
public String toString() {
105+
return toJson();
106+
}
107+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.binlog;
19+
20+
import org.apache.doris.persist.DropDbInfo;
21+
import org.apache.doris.persist.gson.GsonUtils;
22+
23+
import com.google.gson.annotations.SerializedName;
24+
25+
/**
26+
* 记录删除数据库的binlog记录
27+
*/
28+
public class DropDatabaseRecord {
29+
@SerializedName(value = "commitSeq")
30+
private long commitSeq;
31+
@SerializedName(value = "dbName")
32+
private String dbName;
33+
@SerializedName(value = "forceDrop")
34+
private boolean forceDrop;
35+
@SerializedName(value = "rawSql")
36+
private String rawSql;
37+
@SerializedName(value = "dbId")
38+
private long dbId;
39+
40+
public DropDatabaseRecord() {
41+
}
42+
43+
public DropDatabaseRecord(long commitSeq, DropDbInfo dropDbInfo) {
44+
this.commitSeq = commitSeq;
45+
this.dbName = dropDbInfo.getDbName();
46+
this.forceDrop = dropDbInfo.isForceDrop();
47+
this.dbId = dropDbInfo.getDbId();
48+
this.rawSql = this.forceDrop
49+
? String.format("DROP DATABASE IF EXISTS `%s` FORCE", this.dbName)
50+
: String.format("DROP DATABASE IF EXISTS `%s`", this.dbName);
51+
}
52+
53+
public long getCommitSeq() {
54+
return commitSeq;
55+
}
56+
57+
public String getDbName() {
58+
return dbName;
59+
}
60+
61+
public boolean isForceDrop() {
62+
return forceDrop;
63+
}
64+
65+
public String getRawSql() {
66+
return rawSql;
67+
}
68+
69+
public long getDbId() {
70+
return dbId;
71+
}
72+
73+
public void setDbId(long dbId) {
74+
this.dbId = dbId;
75+
}
76+
77+
public String toJson() {
78+
return GsonUtils.GSON.toJson(this);
79+
}
80+
81+
public static DropDatabaseRecord fromJson(String json) {
82+
return GsonUtils.GSON.fromJson(json, DropDatabaseRecord.class);
83+
}
84+
85+
@Override
86+
public String toString() {
87+
return toJson();
88+
}
89+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
569569
idToDb.remove(db.getId());
570570
fullNameToDb.remove(db.getFullName());
571571
DropDbInfo info = new DropDbInfo(dbName, stmt.isForceDrop(), recycleTime);
572+
info.setDbId(db.getId());
572573
Env.getCurrentEnv().getEditLog().logDropDb(info);
573574
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), db.getId());
574575
} finally {

fe/fe-core/src/main/java/org/apache/doris/persist/DropDbInfo.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class DropDbInfo implements Writable, GsonPostProcessable {
3636
private boolean forceDrop = false;
3737
@SerializedName(value = "recycleTime")
3838
private long recycleTime = 0;
39+
@SerializedName(value = "dbId")
40+
private long dbId = 0;
3941

4042
public DropDbInfo() {
4143
this("", false, 0);
@@ -58,6 +60,14 @@ public boolean isForceDrop() {
5860
public Long getRecycleTime() {
5961
return recycleTime;
6062
}
63+
64+
public long getDbId() {
65+
return dbId;
66+
}
67+
68+
public void setDbId(long dbId) {
69+
this.dbId = dbId;
70+
}
6171

6272
@Deprecated
6373
private void readFields(DataInput in) throws IOException {
@@ -88,7 +98,8 @@ public boolean equals(Object obj) {
8898

8999
return (dbName.equals(info.getDbName()))
90100
&& (forceDrop == info.isForceDrop())
91-
&& (recycleTime == info.getRecycleTime());
101+
&& (recycleTime == info.getRecycleTime())
102+
&& (dbId == info.getDbId());
92103
}
93104

94105
@Override

fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import org.apache.doris.backup.Repository;
2626
import org.apache.doris.backup.RestoreJob;
2727
import org.apache.doris.binlog.AddPartitionRecord;
28+
import org.apache.doris.binlog.CreateDatabaseRecord;
2829
import org.apache.doris.binlog.CreateTableRecord;
30+
import org.apache.doris.binlog.DropDatabaseRecord;
2931
import org.apache.doris.binlog.DropTableRecord;
3032
import org.apache.doris.binlog.UpsertRecord;
3133
import org.apache.doris.blockrule.SqlBlockRule;
@@ -191,12 +193,18 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
191193
}
192194
case OperationType.OP_CREATE_DB: {
193195
Database db = (Database) journal.getData();
196+
LOG.info("Begin to unprotect create db = " + db.getName());
197+
CreateDatabaseRecord record = new CreateDatabaseRecord(logId, db);
194198
env.replayCreateDb(db);
199+
env.getBinlogManager().addCreateDatabaseRecord(record);
195200
break;
196201
}
197202
case OperationType.OP_DROP_DB: {
198203
DropDbInfo dropDbInfo = (DropDbInfo) journal.getData();
204+
LOG.info("Begin to unprotect drop db = " + dropDbInfo.getDbName());
205+
DropDatabaseRecord record = new DropDatabaseRecord(logId, dropDbInfo);
199206
env.replayDropDb(dropDbInfo.getDbName(), dropDbInfo.isForceDrop(), dropDbInfo.getRecycleTime());
207+
env.getBinlogManager().addDropDatabaseRecord(record);
200208
break;
201209
}
202210
case OperationType.OP_ALTER_DB: {
@@ -1408,11 +1416,15 @@ public void logSaveTransactionId(long transactionId) {
14081416
}
14091417

14101418
public void logCreateDb(Database db) {
1411-
logEdit(OperationType.OP_CREATE_DB, db);
1419+
long logId = logEdit(OperationType.OP_CREATE_DB, db);
1420+
CreateDatabaseRecord record = new CreateDatabaseRecord(logId, db);
1421+
Env.getCurrentEnv().getBinlogManager().addCreateDatabaseRecord(record);
14121422
}
14131423

14141424
public void logDropDb(DropDbInfo dropDbInfo) {
1415-
logEdit(OperationType.OP_DROP_DB, dropDbInfo);
1425+
long logId = logEdit(OperationType.OP_DROP_DB, dropDbInfo);
1426+
DropDatabaseRecord record = new DropDatabaseRecord(logId, dropDbInfo);
1427+
Env.getCurrentEnv().getBinlogManager().addDropDatabaseRecord(record);
14161428
}
14171429

14181430
public void logEraseDb(long dbId) {

0 commit comments

Comments
 (0)