Skip to content

Commit 7674981

Browse files
author
Ahmad Noman Musleh
committed
Changed the ReadTcp method to use while loop instead of Rx
1 parent f2814c9 commit 7674981

3 files changed

Lines changed: 89 additions & 35 deletions

File tree

src/Notebook.Sample/notebook.ipynb

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"metadata": {
7+
"dotnet_interactive": {
8+
"language": "csharp"
9+
}
10+
},
11+
"outputs": [
12+
{
13+
"data": {
14+
"text/html": [
15+
"<div><div></div><div></div><div><strong>Installed Packages</strong><ul><li><span>Spotware.OpenAPI.Net, 1.3.5</span></li></ul></div></div>"
16+
]
17+
},
18+
"metadata": {},
19+
"output_type": "display_data"
20+
}
21+
],
22+
"source": [
23+
"#r \"nuget: Spotware.OpenAPI.Net\""
24+
]
25+
},
26+
{
27+
"cell_type": "code",
28+
"execution_count": null,
29+
"metadata": {
30+
"dotnet_interactive": {
31+
"language": "csharp"
32+
}
33+
},
34+
"outputs": [],
35+
"source": [
36+
"using OpenAPI.Net;\n",
37+
"using OpenAPI.Net.Auth;\n",
38+
"using OpenAPI.Net.Helpers;"
39+
]
40+
}
41+
],
42+
"metadata": {
43+
"kernelspec": {
44+
"display_name": ".NET (C#)",
45+
"language": "C#",
46+
"name": ".net-csharp"
47+
},
48+
"language_info": {
49+
"name": "C#"
50+
}
51+
},
52+
"nbformat": 4,
53+
"nbformat_minor": 2
54+
}

src/OpenAPI.Net/OpenAPI.Net.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<PackageTags>cTrader, Open API, Spotware</PackageTags>
1010
<Description>A .NET RX library for Spotware Open API</Description>
1111
<PackageId>Spotware.OpenAPI.Net</PackageId>
12-
<Version>1.3.5</Version>
12+
<Version>1.3.6-rc0</Version>
1313
<Platforms>AnyCPU</Platforms>
1414
<Company>Spotware</Company>
1515
<Authors>Spotware</Authors>

src/OpenAPI.Net/OpenClient.cs

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,7 @@ private async Task ConnectTcp()
181181

182182
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(false);
183183

184-
_listenerDisposable = Observable.DoWhile(Observable.FromAsync(ReadTcp), () => !IsDisposed)
185-
.Where(message => message != null)
186-
.ObserveOn(TaskPoolScheduler.Default)
187-
.Subscribe(OnNext);
184+
_ = ReadTcp();
188185
}
189186

190187
/// <summary>
@@ -349,51 +346,54 @@ public void Dispose()
349346
/// <summary>
350347
/// This method will read the TCP stream for incoming messages
351348
/// </summary>
352-
/// <returns>Task<ProtoMessage></returns>
353-
private async Task<ProtoMessage> ReadTcp()
349+
/// <returns>Task</returns>
350+
private async Task ReadTcp()
354351
{
355-
try
352+
while (!IsDisposed)
356353
{
357-
var lengthArray = new byte[sizeof(int)];
354+
try
355+
{
356+
var lengthArray = new byte[sizeof(int)];
358357

359-
var readBytes = 0;
358+
var readBytes = 0;
360359

361-
do
362-
{
363-
var count = lengthArray.Length - readBytes;
360+
do
361+
{
362+
var count = lengthArray.Length - readBytes;
364363

365-
readBytes += await _sslStream.ReadAsync(lengthArray, readBytes, count).ConfigureAwait(false);
366-
}
367-
while (readBytes < lengthArray.Length);
364+
readBytes += await _sslStream.ReadAsync(lengthArray, readBytes, count).ConfigureAwait(false);
365+
}
366+
while (readBytes < lengthArray.Length);
368367

369-
Array.Reverse(lengthArray);
368+
Array.Reverse(lengthArray);
370369

371-
var length = BitConverter.ToInt32(lengthArray, 0);
370+
var length = BitConverter.ToInt32(lengthArray, 0);
372371

373-
if (length <= 0) return null;
372+
if (length <= 0) continue;
374373

375-
var data = new byte[length];
374+
var data = new byte[length];
376375

377-
readBytes = 0;
376+
readBytes = 0;
378377

379-
do
380-
{
381-
var count = data.Length - readBytes;
378+
do
379+
{
380+
var count = data.Length - readBytes;
382381

383-
readBytes += await _sslStream.ReadAsync(data, readBytes, count).ConfigureAwait(false);
384-
}
385-
while (readBytes < length);
382+
readBytes += await _sslStream.ReadAsync(data, readBytes, count).ConfigureAwait(false);
383+
}
384+
while (readBytes < length);
386385

387-
return ProtoMessage.Parser.ParseFrom(data);
388-
}
389-
catch (Exception ex)
390-
{
391-
var readException = new ReadException(ex);
386+
var message = ProtoMessage.Parser.ParseFrom(data);
392387

393-
OnError(readException);
394-
}
388+
OnNext(message);
389+
}
390+
catch (Exception ex)
391+
{
392+
var readException = new ReadException(ex);
395393

396-
return null;
394+
OnError(readException);
395+
}
396+
}
397397
}
398398

399399
/// <summary>

0 commit comments

Comments
 (0)