-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathdemo_asynMaster.dos
More file actions
49 lines (42 loc) · 2.19 KB
/
Copy pathdemo_asynMaster.dos
File metadata and controls
49 lines (42 loc) · 2.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 主集群异步复制Demo
*/
login("admin", "123456")
rpc(getControllerAlias(), startClusterReplication) // 主集群的控制节点上执行,启用异步复制功能
// rpc(getControllerAlias(), stopClusterReplication) // 主集群的控制节点上执行,关闭异步复制功能
// 创建数据库
dbName = "dfs://testDB"
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
db = database(dbName, VALUE, 2023.01.01..2023.12.31)
setDatabaseForClusterReplication(db, true) // 手动开启新建数据库的异步复制功能,默认为 false
// 查询异步复制状态
schema(db) // 可以通过查看 clusterReplicationEnabled 的值来确认库的异步复制是否开启
// getDatabaseClusterReplicationStatus() // 也可以通过该函数查看所有 database 的异步复制开启状态
// 在主集群上执行库表的 DDL,DML 操作(异步复制只支持DFS表/库的DDL、DML操作)
tbName = "testTB"
colNames = `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`Volume`Amount
colTypes = [SYMBOL, DATETIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE]
schemaTable = table(1:0, colNames, colTypes)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime) // 创建分区表
n = 1210000 // 模拟数据并写入分区表
randPrice = round(10+rand(1.0, 100), 2)
randVolume = 100+rand(100, 100)
SecurityID = lpad(string(take(0..4999, 5000)), 6, `0)
DateTime = (2023.01.08T09:30:00 + take(0..120, 121)*60).join(2023.01.08T13:00:00 + take(0..120, 121)*60)
PreClosePx = rand(randPrice, n)
OpenPx = rand(randPrice, n)
HighPx = rand(randPrice, n)
LowPx = rand(randPrice, n)
LastPx = rand(randPrice, n)
Volume = int(rand(randVolume, n))
Amount = round(LastPx*Volume, 2)
tmp = cj(table(SecurityID), table(DateTime))
t = tmp.join!(table(PreClosePx, OpenPx, HighPx, LowPx, LastPx, Volume, Amount))
dbName = "dfs://testDB"
tbName = "testTB"
loadTable(dbName, tbName).append!(t)
// 状态查看
rpc(getControllerAlias(), getMasterReplicationStatus) // 控制节点上执行,提交任务后可以查看任务发送队列的状态
rpc(getControllerAlias(), getRecentSlaveReplicationInfo) // 控制节点上执行,查看各个连接的从集群的回放状态