Skip to content

Commit ca33278

Browse files
committed
Fix SessionDataSet sync-over-async deadlock and migrate to 4-arg RowRecord constructor
- Add HasNextAsync() and FetchResultsAsync() to SessionDataSet to avoid sync-over-async deadlock in FetchResults() - Mark HasNext() as [Obsolete] with guidance to use HasNextAsync() - Update ConstructOneRow() to use 4-arg RowRecord constructor with dataTypes - Fix IoTDBCommand.BindParameters to use 4-arg RowRecord constructor - Add #pragma warning suppress in IoTDBDataReader for required sync calls - Update SessionPool.CheckTimeSeriesExistsAsync to use HasNextAsync() - Migrate all sample code to use await HasNextAsync() and 4-arg RowRecord
1 parent 26d96b1 commit ca33278

15 files changed

+1838
-1710
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<OutputType>Exe</OutputType>
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<PackageReference Include="Apache.IoTDB" Version="1.0.0.3" />
11+
<ProjectReference Include="..\src\Apache.IoTDB\Apache.IoTDB.csproj" />
1212
</ItemGroup>
1313

14-
</Project>
14+
</Project>

Apache-IoTDB-Client-CSharp-UserCase/Program.cs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
using System;
2121
using System.Collections.Generic;
22-
using System.Threading;
2322
using System.Threading.Tasks;
2423
using Apache.IoTDB;
2524
using Apache.IoTDB.DataStructure;
@@ -32,10 +31,19 @@ class Program
3231
static int port = 6667;
3332
static int pool_size = 2;
3433

34+
static SessionPool CreateSessionPool()
35+
{
36+
return new SessionPool.Builder()
37+
.SetHost(host)
38+
.SetPort(port)
39+
.SetPoolSize(pool_size)
40+
.Build();
41+
}
42+
3543

3644
static async Task OpenAndCloseSessionPool()
3745
{
38-
var session_pool = new SessionPool(host, port, pool_size);
46+
var session_pool = CreateSessionPool();
3947
await session_pool.Open(false);
4048
if (session_pool.IsOpen())
4149
{
@@ -50,7 +58,7 @@ static async Task OpenAndCloseSessionPool()
5058

5159
static async Task CreateTimeseries()
5260
{
53-
var session_pool = new SessionPool(host, port, pool_size);
61+
var session_pool = CreateSessionPool();
5462
await session_pool.Open(false);
5563

5664
await session_pool.DeleteDatabaseAsync("root.ln.wf01.wt01");
@@ -63,20 +71,21 @@ static async Task CreateTimeseries()
6371

6472
static async Task InsertRecord()
6573
{
66-
var session_pool = new SessionPool(host, port, pool_size);
74+
var session_pool = CreateSessionPool();
6775
await session_pool.Open(false);
6876
long timestamp = 1;
6977
var values = new List<object> { true, (double)1.1, "test" };
7078
var measures = new List<string> { "status", "temperature", "hardware" };
71-
var rowRecord = new RowRecord(timestamp, values, measures);
79+
var dataTypes = new List<TSDataType> { TSDataType.BOOLEAN, TSDataType.DOUBLE, TSDataType.TEXT };
80+
var rowRecord = new RowRecord(timestamp, values, measures, dataTypes);
7281
var status = await session_pool.InsertRecordAsync("root.ln.wf01.wt01", rowRecord);
7382

7483
await session_pool.Close();
7584
}
7685

7786
static async Task InsertTablet()
7887
{
79-
var session_pool = new SessionPool(host, port, pool_size);
88+
var session_pool = CreateSessionPool();
8089
await session_pool.Open(false);
8190
var device_id = "root.ln.wf01.wt01";
8291
var measurement_lst = new List<string> { "status", "temperature", "hardware" };
@@ -95,11 +104,11 @@ static async Task InsertTablet()
95104

96105
static async Task ExecuteQueryStatement()
97106
{
98-
var session_pool = new SessionPool(host, port, pool_size);
107+
var session_pool = CreateSessionPool();
99108
await session_pool.Open(false);
100109
var res = await session_pool.ExecuteQueryStatementAsync("select * from root.ln.wf01.wt01");
101110
res.ShowTableNames();
102-
while (res.HasNext())
111+
while (await res.HasNextAsync())
103112
{
104113
Console.WriteLine(res.Next());
105114
}

samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs

Lines changed: 397 additions & 386 deletions
Large diffs are not rendered by default.

samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs

Lines changed: 121 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -25,100 +25,100 @@
2525

2626
namespace Apache.IoTDB.Samples
2727
{
28-
public partial class SessionPoolTest
29-
{
30-
public async Task TestInsertAlignedTablet()
28+
public partial class SessionPoolTest
3129
{
32-
var session_pool = new SessionPool(host, port, poolSize);
33-
var status = 0;
34-
await session_pool.Open(false);
35-
if (debug) session_pool.OpenDebugMode();
30+
public async Task TestInsertAlignedTablet()
31+
{
32+
var session_pool = new SessionPool(host, port, poolSize);
33+
var status = 0;
34+
await session_pool.Open(false);
35+
if (debug) session_pool.OpenDebugMode();
3636

37-
System.Diagnostics.Debug.Assert(session_pool.IsOpen());
38-
await session_pool.DeleteDatabaseAsync(testDatabaseName);
39-
var device_id = string.Format("{0}.{1}", testDatabaseName, testDevice);
40-
var measurement_lst = new List<string>
37+
System.Diagnostics.Debug.Assert(session_pool.IsOpen());
38+
await session_pool.DeleteDatabaseAsync(testDatabaseName);
39+
var device_id = string.Format("{0}.{1}", testDatabaseName, testDevice);
40+
var measurement_lst = new List<string>
4141
{ testMeasurements[1],
4242
testMeasurements[2],
4343
testMeasurements[3]
4444
};
45-
var value_lst = new List<List<object>>
45+
var value_lst = new List<List<object>>
4646
{
4747
new() {"iotdb", true, (int) 12}, new() {"c#", false, (int) 13},
4848
new() {"client", true, (int) 14}
4949
};
50-
var timestamp_lst = new List<long> { 1, 2, 3 };
51-
var datatype_lst = new List<TSDataType> { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 };
52-
var tablet = new Tablet(device_id, measurement_lst, datatype_lst, value_lst, timestamp_lst);
53-
status = await session_pool.InsertAlignedTabletAsync(tablet);
54-
System.Diagnostics.Debug.Assert(status == 0);
55-
var res = await session_pool.ExecuteQueryStatementAsync(
56-
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15");
57-
res.ShowTableNames();
58-
while (res.HasNext()) Console.WriteLine(res.Next());
50+
var timestamp_lst = new List<long> { 1, 2, 3 };
51+
var datatype_lst = new List<TSDataType> { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 };
52+
var tablet = new Tablet(device_id, measurement_lst, datatype_lst, value_lst, timestamp_lst);
53+
status = await session_pool.InsertAlignedTabletAsync(tablet);
54+
System.Diagnostics.Debug.Assert(status == 0);
55+
var res = await session_pool.ExecuteQueryStatementAsync(
56+
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15");
57+
res.ShowTableNames();
58+
while (await res.HasNextAsync()) Console.WriteLine(res.Next());
5959

60-
await res.Close();
61-
// large data test
62-
value_lst = new List<List<object>>() { };
63-
timestamp_lst = new List<long>() { };
64-
var tasks = new List<Task<int>>();
65-
var start_ms = DateTime.Now.Ticks / 10000;
66-
for (var timestamp = 4; timestamp <= fetchSize * processedSize; timestamp++)
67-
{
68-
timestamp_lst.Add(timestamp);
69-
value_lst.Add(new List<object>() { "iotdb", true, (int)timestamp });
70-
if (timestamp % fetchSize == 0)
71-
{
72-
tablet = new Tablet(device_id, measurement_lst, datatype_lst, value_lst, timestamp_lst);
73-
tasks.Add(session_pool.InsertAlignedTabletAsync(tablet));
74-
value_lst = new List<List<object>>() { };
75-
timestamp_lst = new List<long>() { };
76-
}
77-
}
78-
Console.WriteLine(tasks.Count);
60+
await res.Close();
61+
// large data test
62+
value_lst = new List<List<object>>() { };
63+
timestamp_lst = new List<long>() { };
64+
var tasks = new List<Task<int>>();
65+
var start_ms = DateTime.Now.Ticks / 10000;
66+
for (var timestamp = 4; timestamp <= fetchSize * processedSize; timestamp++)
67+
{
68+
timestamp_lst.Add(timestamp);
69+
value_lst.Add(new List<object>() { "iotdb", true, (int)timestamp });
70+
if (timestamp % fetchSize == 0)
71+
{
72+
tablet = new Tablet(device_id, measurement_lst, datatype_lst, value_lst, timestamp_lst);
73+
tasks.Add(session_pool.InsertAlignedTabletAsync(tablet));
74+
value_lst = new List<List<object>>() { };
75+
timestamp_lst = new List<long>() { };
76+
}
77+
}
78+
Console.WriteLine(tasks.Count);
7979

80-
Task.WaitAll(tasks.ToArray());
81-
var end_ms = DateTime.Now.Ticks / 10000;
82-
Console.WriteLine(string.Format("total tablet insert time is {0}", end_ms - start_ms));
83-
res = await session_pool.ExecuteQueryStatementAsync(
84-
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice));
85-
res.ShowTableNames();
86-
var res_count = 0;
87-
while (res.HasNext())
88-
{
89-
res.Next();
90-
res_count += 1;
91-
}
80+
Task.WaitAll(tasks.ToArray());
81+
var end_ms = DateTime.Now.Ticks / 10000;
82+
Console.WriteLine(string.Format("total tablet insert time is {0}", end_ms - start_ms));
83+
res = await session_pool.ExecuteQueryStatementAsync(
84+
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice));
85+
res.ShowTableNames();
86+
var res_count = 0;
87+
while (await res.HasNextAsync())
88+
{
89+
res.Next();
90+
res_count += 1;
91+
}
9292

93-
await res.Close();
94-
Console.WriteLine(res_count + " " + fetchSize * processedSize);
95-
System.Diagnostics.Debug.Assert(res_count == fetchSize * processedSize);
96-
status = await session_pool.DeleteDatabaseAsync(testDatabaseName);
97-
System.Diagnostics.Debug.Assert(status == 0);
98-
await session_pool.Close();
99-
Console.WriteLine("TestInsertAlignedTablet Passed!");
100-
}
93+
await res.Close();
94+
Console.WriteLine(res_count + " " + fetchSize * processedSize);
95+
System.Diagnostics.Debug.Assert(res_count == fetchSize * processedSize);
96+
status = await session_pool.DeleteDatabaseAsync(testDatabaseName);
97+
System.Diagnostics.Debug.Assert(status == 0);
98+
await session_pool.Close();
99+
Console.WriteLine("TestInsertAlignedTablet Passed!");
100+
}
101101

102-
public async Task TestInsertAlignedTablets()
103-
{
104-
var session_pool = new SessionPool(host, port, poolSize);
105-
var status = 0;
106-
await session_pool.Open(false);
107-
if (debug) session_pool.OpenDebugMode();
102+
public async Task TestInsertAlignedTablets()
103+
{
104+
var session_pool = new SessionPool(host, port, poolSize);
105+
var status = 0;
106+
await session_pool.Open(false);
107+
if (debug) session_pool.OpenDebugMode();
108108

109-
System.Diagnostics.Debug.Assert(session_pool.IsOpen());
110-
await session_pool.DeleteDatabaseAsync(testDatabaseName);
111-
var device_id = new List<string>()
109+
System.Diagnostics.Debug.Assert(session_pool.IsOpen());
110+
await session_pool.DeleteDatabaseAsync(testDatabaseName);
111+
var device_id = new List<string>()
112112
{
113113
string.Format("{0}.{1}", testDatabaseName, testDevices[1]),
114114
string.Format("{0}.{1}", testDatabaseName, testDevices[2])
115115
};
116-
var measurements_lst = new List<List<string>>()
116+
var measurements_lst = new List<List<string>>()
117117
{
118118
new() {testMeasurements[1], testMeasurements[2], testMeasurements[3] },
119119
new() {testMeasurements[1], testMeasurements[2], testMeasurements[3] }
120120
};
121-
var values_lst = new List<List<List<object>>>()
121+
var values_lst = new List<List<List<object>>>()
122122
{
123123
new()
124124
{
@@ -131,65 +131,65 @@ public async Task TestInsertAlignedTablets()
131131
new List<object>() {"client_2", true, (int) 3}
132132
}
133133
};
134-
var datatype_lst = new List<List<TSDataType>>()
134+
var datatype_lst = new List<List<TSDataType>>()
135135
{
136136
new() {TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32},
137137
new() {TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32}
138138
};
139-
var timestamp_lst = new List<List<long>>()
139+
var timestamp_lst = new List<List<long>>()
140140
{new() {2, 1, 3}, new() {3, 1, 2}};
141-
var tablets = new List<Tablet>() { };
142-
for (var i = 0; i < device_id.Count; i++)
143-
{
144-
var tablet = new Tablet(device_id[i], measurements_lst[i], datatype_lst[i], values_lst[i], timestamp_lst[i]);
145-
tablets.Add(tablet);
146-
}
141+
var tablets = new List<Tablet>() { };
142+
for (var i = 0; i < device_id.Count; i++)
143+
{
144+
var tablet = new Tablet(device_id[i], measurements_lst[i], datatype_lst[i], values_lst[i], timestamp_lst[i]);
145+
tablets.Add(tablet);
146+
}
147147

148-
status = await session_pool.InsertAlignedTabletsAsync(tablets);
149-
System.Diagnostics.Debug.Assert(status == 0);
150-
var res = await session_pool.ExecuteQueryStatementAsync(
151-
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15");
152-
res.ShowTableNames();
153-
while (res.HasNext()) Console.WriteLine(res.Next());
148+
status = await session_pool.InsertAlignedTabletsAsync(tablets);
149+
System.Diagnostics.Debug.Assert(status == 0);
150+
var res = await session_pool.ExecuteQueryStatementAsync(
151+
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15");
152+
res.ShowTableNames();
153+
while (await res.HasNextAsync()) Console.WriteLine(res.Next());
154154

155-
// large data test
156-
var tasks = new List<Task<int>>();
157-
// tablets = new List<Tablet>() { };
158-
for (var timestamp = 4; timestamp <= processedSize * fetchSize; timestamp++)
159-
{
160-
var local_device_id = string.Format("{0}.{1}", testDatabaseName, testDevices[1]);
161-
var local_measurements = new List<string>()
155+
// large data test
156+
var tasks = new List<Task<int>>();
157+
// tablets = new List<Tablet>() { };
158+
for (var timestamp = 4; timestamp <= processedSize * fetchSize; timestamp++)
159+
{
160+
var local_device_id = string.Format("{0}.{1}", testDatabaseName, testDevices[1]);
161+
var local_measurements = new List<string>()
162162
{testMeasurements[1], testMeasurements[2], testMeasurements[3]};
163-
var local_value = new List<List<object>>() { new() { "iotdb", true, (int)timestamp } };
164-
var local_data_type = new List<TSDataType>() { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 };
165-
var local_timestamp = new List<long> { timestamp };
166-
var tablet = new Tablet(local_device_id, local_measurements, local_data_type, local_value, local_timestamp);
167-
tablets.Add(tablet);
168-
if (timestamp % fetchSize == 0)
169-
{
170-
tasks.Add(session_pool.InsertAlignedTabletsAsync(tablets));
171-
tablets = new List<Tablet>() { };
172-
}
173-
}
163+
var local_value = new List<List<object>>() { new() { "iotdb", true, (int)timestamp } };
164+
var local_data_type = new List<TSDataType>() { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 };
165+
var local_timestamp = new List<long> { timestamp };
166+
var tablet = new Tablet(local_device_id, local_measurements, local_data_type, local_value, local_timestamp);
167+
tablets.Add(tablet);
168+
if (timestamp % fetchSize == 0)
169+
{
170+
tasks.Add(session_pool.InsertAlignedTabletsAsync(tablets));
171+
tablets = new List<Tablet>() { };
172+
}
173+
}
174174

175-
Task.WaitAll(tasks.ToArray());
176-
res = await session_pool.ExecuteQueryStatementAsync(
177-
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]));
178-
res.ShowTableNames();
179-
var res_count = 0;
180-
while (res.HasNext())
181-
{
182-
res.Next();
183-
res_count += 1;
184-
}
175+
Task.WaitAll(tasks.ToArray());
176+
res = await session_pool.ExecuteQueryStatementAsync(
177+
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]));
178+
res.ShowTableNames();
179+
var res_count = 0;
180+
while (await res.HasNextAsync())
181+
{
182+
res.Next();
183+
res_count += 1;
184+
}
185185

186-
await res.Close();
187-
Console.WriteLine(res_count + " " + fetchSize * processedSize);
188-
System.Diagnostics.Debug.Assert(res_count == fetchSize * processedSize);
189-
status = await session_pool.DeleteDatabaseAsync(testDatabaseName);
190-
System.Diagnostics.Debug.Assert(status == 0);
191-
await session_pool.Close();
192-
Console.WriteLine("TestInsertAlignedTablets Passed!");
186+
await res.Close();
187+
Console.WriteLine(res_count + " " + fetchSize * processedSize);
188+
System.Diagnostics.Debug.Assert(res_count == fetchSize * processedSize);
189+
status = await session_pool.DeleteDatabaseAsync(testDatabaseName);
190+
System.Diagnostics.Debug.Assert(status == 0);
191+
await session_pool.Close();
192+
Console.WriteLine("TestInsertAlignedTablets Passed!");
193+
}
193194
}
194-
}
195195
}

0 commit comments

Comments
 (0)