Skip to content
This repository was archived by the owner on Nov 8, 2020. It is now read-only.

Commit 4e8d214

Browse files
author
无名
committed
postgresql的jsonb替换为json,提高插入性能
1 parent b97c257 commit 4e8d214

5 files changed

Lines changed: 16 additions & 16 deletions

File tree

src/Ray.PostgreSQL/Core/PSQLBuildService.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public async Task CreateEventTable(EventSubTable subTable)
5757
{stateIdSql},
5858
UniqueId varchar(250) null,
5959
TypeCode varchar(100) not null,
60-
Data jsonb not null,
60+
Data json not null,
6161
Version int8 not null,
6262
Timestamp int8 not null,
6363
constraint {subTable.SubTable}_id_unique unique(StateId,TypeCode,UniqueId)
@@ -91,7 +91,7 @@ public async Task CreateEventArchiveTable()
9191
{stateIdSql},
9292
UniqueId varchar(250) null,
9393
TypeCode varchar(100) not null,
94-
Data jsonb not null,
94+
Data json not null,
9595
Version int8 not null,
9696
Timestamp int8 not null,
9797
constraint {storageOptions.EventArchiveTable}_id_unique unique(StateId,TypeCode,UniqueId)
@@ -130,7 +130,7 @@ Id varchar(50) not null PRIMARY KEY,
130130
EndTimestamp int8 not null,
131131
Index int4 not null,
132132
EventIsCleared bool not null,
133-
Data jsonb not null,
133+
Data json not null,
134134
IsOver bool not null,
135135
Version int8 not null)WITH (OIDS=FALSE);
136136
CREATE INDEX IF NOT EXISTS {storageOptions.SnapshotArchiveTable}_StateId ON {storageOptions.SnapshotArchiveTable} USING btree(StateId)";
@@ -146,7 +146,7 @@ public async Task CreateSnapshotTable()
146146
var sql = $@"
147147
CREATE TABLE if not exists {storageOptions.SnapshotTable}(
148148
{stateIdSql},
149-
Data jsonb not null,
149+
Data json not null,
150150
Version int8 not null,
151151
StartTimestamp int8 not null,
152152
LatestMinEventTimestamp int8 not null,

src/Ray.PostgreSQL/Storage/ArchiveStorage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public ArchiveStorage(IServiceProvider serviceProvider, ISerializer serializer,
3838
getByIdSql = $"select * FROM {tableName} where id=@Id";
3939
getListByStateIdSql = $"select Id,StartVersion,EndVersion,StartTimestamp,EndTimestamp,Index,EventIsCleared FROM {tableName} where stateid=@StateId";
4040
getLatestByStateIdSql = $"select Id,StartVersion,EndVersion,StartTimestamp,EndTimestamp,Index,EventIsCleared FROM {tableName} where stateid=@StateId order by index desc limit 1";
41-
insertSql = $"INSERT into {tableName}(Id,stateid,StartVersion,EndVersion,StartTimestamp,EndTimestamp,Index,EventIsCleared,data,IsOver,Version)VALUES(@Id,@StateId,@StartVersion,@EndVersion,@StartTimestamp,@EndTimestamp,@Index,@EventIsCleared,(@Data)::jsonb,@IsOver,@Version)";
41+
insertSql = $"INSERT into {tableName}(Id,stateid,StartVersion,EndVersion,StartTimestamp,EndTimestamp,Index,EventIsCleared,data,IsOver,Version)VALUES(@Id,@StateId,@StartVersion,@EndVersion,@StartTimestamp,@EndTimestamp,@Index,@EventIsCleared,(@Data)::json,@IsOver,@Version)";
4242
updateOverSql = $"update {tableName} set IsOver=@IsOver where stateid=@StateId";
4343
updateEventIsClearSql = $"update {tableName} set EventIsCleared=true where id=@Id";
4444
}

src/Ray.PostgreSQL/Storage/DistributedTxStorage.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void CreateEventSubRecordTable()
4343
CREATE TABLE if not exists {options.Value.TableName}(
4444
UnitName varchar(500) not null,
4545
TransactionId int8 not null,
46-
Data jsonb not null,
46+
Data json not null,
4747
Status int2 not null)WITH (OIDS=FALSE);
4848
CREATE UNIQUE INDEX IF NOT EXISTS UnitName_TransId ON {options.Value.TableName} USING btree(UnitName, TransactionId)";
4949
using (var connection = CreateConnection())
@@ -117,7 +117,7 @@ private async Task BatchProcessing(List<AsyncInputEvent<AppendInput, bool>> wrap
117117
writer.StartRow();
118118
writer.Write(wrapper.Value.UnitName, NpgsqlDbType.Varchar);
119119
writer.Write(wrapper.Value.TransactionId, NpgsqlDbType.Bigint);
120-
writer.Write(wrapper.Value.Data, NpgsqlDbType.Jsonb);
120+
writer.Write(wrapper.Value.Data, NpgsqlDbType.Json);
121121
writer.Write((short)wrapper.Value.Status, NpgsqlDbType.Smallint);
122122
}
123123
writer.Complete();
@@ -127,7 +127,7 @@ private async Task BatchProcessing(List<AsyncInputEvent<AppendInput, bool>> wrap
127127
}
128128
catch
129129
{
130-
var saveSql = $"INSERT INTO {options.Value.TableName}(UnitName,TransactionId,Data,Status) VALUES(@UnitName,@TransactionId,(@Data)::jsonb,@Status) ON CONFLICT ON CONSTRAINT UnitName_TransId DO NOTHING";
130+
var saveSql = $"INSERT INTO {options.Value.TableName}(UnitName,TransactionId,Data,Status) VALUES(@UnitName,@TransactionId,(@Data)::json,@Status) ON CONFLICT ON CONSTRAINT UnitName_TransId DO NOTHING";
131131
using (var conn = CreateConnection())
132132
{
133133
await conn.OpenAsync();

src/Ray.PostgreSQL/Storage/EventStorage.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ await Task.Run(async () =>
5050
while (reader.StartRow() != -1)
5151
{
5252
var typeCode = reader.Read<string>(NpgsqlDbType.Varchar);
53-
var data = reader.Read<string>(NpgsqlDbType.Jsonb);
53+
var data = reader.Read<string>(NpgsqlDbType.Json);
5454
var version = reader.Read<long>(NpgsqlDbType.Bigint);
5555
var timestamp = reader.Read<long>(NpgsqlDbType.Bigint);
5656
if (version <= endVersion && version >= startVersion)
@@ -93,7 +93,7 @@ await Task.Run(async () =>
9393
{
9494
while (reader.StartRow() != -1)
9595
{
96-
var data = reader.Read<string>(NpgsqlDbType.Jsonb);
96+
var data = reader.Read<string>(NpgsqlDbType.Json);
9797
var version = reader.Read<long>(NpgsqlDbType.Bigint);
9898
var timestamp = reader.Read<long>(NpgsqlDbType.Bigint);
9999
if (version >= startVersion && serializer.Deserialize(type, Encoding.Default.GetBytes(data)) is IEvent evt)
@@ -170,7 +170,7 @@ async Task BatchCopy(string tableName, List<AsyncInputEvent<BatchAppendTransport
170170
writer.Write(wrapper.Value.Event.StateId);
171171
writer.Write(wrapper.Value.UniqueId, NpgsqlDbType.Varchar);
172172
writer.Write(wrapper.Value.Event.Event.GetType().FullName, NpgsqlDbType.Varchar);
173-
writer.Write(Encoding.Default.GetString(wrapper.Value.BytesTransport.EventBytes), NpgsqlDbType.Jsonb);
173+
writer.Write(Encoding.Default.GetString(wrapper.Value.BytesTransport.EventBytes), NpgsqlDbType.Json);
174174
writer.Write(wrapper.Value.Event.Base.Version, NpgsqlDbType.Bigint);
175175
writer.Write(wrapper.Value.Event.Base.Timestamp, NpgsqlDbType.Bigint);
176176
}
@@ -183,7 +183,7 @@ async Task BatchCopy(string tableName, List<AsyncInputEvent<BatchAppendTransport
183183
{
184184
logger.LogError(ex, ex.Message);
185185
var saveSql = saveSqlDict.GetOrAdd(tableName,
186-
key => $"INSERT INTO {key}(stateid,uniqueId,typecode,data,version,timestamp) VALUES(@StateId,@UniqueId,@TypeCode,(@Data)::jsonb,@Version,@Timestamp) ON CONFLICT ON CONSTRAINT {key}_id_unique DO NOTHING");
186+
key => $"INSERT INTO {key}(stateid,uniqueId,typecode,data,version,timestamp) VALUES(@StateId,@UniqueId,@TypeCode,(@Data)::json,@Version,@Timestamp) ON CONFLICT ON CONSTRAINT {key}_id_unique DO NOTHING");
187187
await BatchInsert(saveSql, wrapperList);
188188
}
189189
}
@@ -268,7 +268,7 @@ await Task.Run(async () =>
268268
writer.Write(wrapper.FullyEvent.StateId);
269269
writer.Write(wrapper.UniqueId, NpgsqlDbType.Varchar);
270270
writer.Write(wrapper.FullyEvent.Event.GetType().FullName, NpgsqlDbType.Varchar);
271-
writer.Write(Encoding.Default.GetString(wrapper.BytesTransport.EventBytes), NpgsqlDbType.Jsonb);
271+
writer.Write(Encoding.Default.GetString(wrapper.BytesTransport.EventBytes), NpgsqlDbType.Json);
272272
writer.Write(wrapper.FullyEvent.Base.Version, NpgsqlDbType.Bigint);
273273
writer.Write(wrapper.FullyEvent.Base.Timestamp, NpgsqlDbType.Bigint);
274274
}
@@ -296,7 +296,7 @@ await Task.Run(async () =>
296296
foreach (var group in groups)
297297
{
298298
var saveSql = saveSqlDict.GetOrAdd(group.Key,
299-
key => $"INSERT INTO {key}(stateid,uniqueId,typecode,data,version,timestamp) VALUES(@StateId,@UniqueId,@TypeCode,(@Data)::jsonb,@Version,@Timestamp)");
299+
key => $"INSERT INTO {key}(stateid,uniqueId,typecode,data,version,timestamp) VALUES(@StateId,@UniqueId,@TypeCode,(@Data)::json,@Version,@Timestamp)");
300300
await conn.ExecuteAsync(saveSql, group.Select(g => new
301301
{
302302
g.t.FullyEvent.StateId,

src/Ray.PostgreSQL/Storage/SnapshotStorage.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ public SnapshotStorage(ISerializer serializer, StorageOptions config)
2727
this.config = config;
2828
deleteSql = $"DELETE FROM {this.config.SnapshotTable} where stateid=@StateId";
2929
getByIdSql = $"select * FROM {this.config.SnapshotTable} where stateid=@StateId";
30-
insertSql = $"INSERT into {this.config.SnapshotTable}(stateid,data,version,StartTimestamp,LatestMinEventTimestamp,IsLatest,IsOver)VALUES(@StateId,(@Data)::jsonb,@Version,@StartTimestamp,@LatestMinEventTimestamp,@IsLatest,@IsOver)";
31-
updateSql = $"update {this.config.SnapshotTable} set data=(@Data)::jsonb,version=@Version,LatestMinEventTimestamp=@LatestMinEventTimestamp,IsLatest=@IsLatest,IsOver=@IsOver where stateid=@StateId";
30+
insertSql = $"INSERT into {this.config.SnapshotTable}(stateid,data,version,StartTimestamp,LatestMinEventTimestamp,IsLatest,IsOver)VALUES(@StateId,(@Data)::json,@Version,@StartTimestamp,@LatestMinEventTimestamp,@IsLatest,@IsOver)";
31+
updateSql = $"update {this.config.SnapshotTable} set data=(@Data)::json,version=@Version,LatestMinEventTimestamp=@LatestMinEventTimestamp,IsLatest=@IsLatest,IsOver=@IsOver where stateid=@StateId";
3232
updateOverSql = $"update {this.config.SnapshotTable} set IsOver=@IsOver where stateid=@StateId";
3333
updateIsLatestSql = $"update {this.config.SnapshotTable} set IsLatest=@IsLatest where stateid=@StateId";
3434
updateLatestTimestampSql = $"update {this.config.SnapshotTable} set LatestMinEventTimestamp=@LatestMinEventTimestamp where stateid=@StateId";

0 commit comments

Comments
 (0)