Skip to content

Commit acea863

Browse files
author
“寧々”
committed
Init Push
1 parent 622c9c8 commit acea863

24 files changed

+2108
-0
lines changed

ReservoirServer.sln

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.30907.101
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReservoirServer", "ReservoirServer\ReservoirServer.csproj", "{A15DF483-824F-4918-A327-E2D173ADA443}"
7+
EndProject
8+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ActiveMQTester", "..\ActiveMQTester\ActiveMQTester.csproj", "{2D06635B-1178-41C8-9ACF-B7AF54D7FF2A}"
9+
EndProject
10+
Global
11+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
12+
Debug|Any CPU = Debug|Any CPU
13+
Release|Any CPU = Release|Any CPU
14+
EndGlobalSection
15+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
16+
{A15DF483-824F-4918-A327-E2D173ADA443}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
17+
{A15DF483-824F-4918-A327-E2D173ADA443}.Debug|Any CPU.Build.0 = Debug|Any CPU
18+
{A15DF483-824F-4918-A327-E2D173ADA443}.Release|Any CPU.ActiveCfg = Release|Any CPU
19+
{A15DF483-824F-4918-A327-E2D173ADA443}.Release|Any CPU.Build.0 = Release|Any CPU
20+
{2D06635B-1178-41C8-9ACF-B7AF54D7FF2A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
21+
{2D06635B-1178-41C8-9ACF-B7AF54D7FF2A}.Debug|Any CPU.Build.0 = Debug|Any CPU
22+
{2D06635B-1178-41C8-9ACF-B7AF54D7FF2A}.Release|Any CPU.ActiveCfg = Release|Any CPU
23+
{2D06635B-1178-41C8-9ACF-B7AF54D7FF2A}.Release|Any CPU.Build.0 = Release|Any CPU
24+
EndGlobalSection
25+
GlobalSection(SolutionProperties) = preSolution
26+
HideSolutionNode = FALSE
27+
EndGlobalSection
28+
GlobalSection(ExtensibilityGlobals) = postSolution
29+
SolutionGuid = {1A25DE5C-C3D6-4C7C-9D42-BFA030EBF309}
30+
EndGlobalSection
31+
EndGlobal

ReservoirServer/AMQCommunicator.cs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading.Tasks;
5+
using ReservoirServer.Driver;
6+
7+
namespace ReservoirServer
8+
{
9+
public delegate void OnAMQMessageReceivedDlg(string message);
10+
class AMQCommunicator
11+
{
12+
protected SimpleQueueSender _sender = null;
13+
protected SimpleTopicSubscriber _subscriber = null;
14+
protected string platform_id = "";
15+
private string uri;
16+
private string queue_name;
17+
private string topic_name;
18+
private object qsend_locker = new object();
19+
public string PlatformID => platform_id;
20+
21+
public event OnAMQMessageReceivedDlg OnSubscribeReceived;
22+
23+
public AMQCommunicator(string uri,string queue_name,string topic_name,string platformID)
24+
{
25+
this.uri = uri;
26+
this.queue_name = queue_name;
27+
this.topic_name = topic_name;
28+
platform_id = platformID;
29+
}
30+
31+
//Init will start communication
32+
public void Initialization()
33+
{
34+
Dispose();
35+
_sender = new SimpleQueueSender(queue_name, uri);
36+
_subscriber = new SimpleTopicSubscriber(topic_name, uri, "xxx", "zzz", null);
37+
_subscriber.OnMessageReceived += _subscriber_OnMessageReceived;
38+
}
39+
40+
public void SendQueue(string data)
41+
{
42+
//lock(qsend_locker)
43+
//{
44+
_sender.SendMessage(data, null);
45+
//}
46+
}
47+
48+
[System.Diagnostics.Conditional("DEBUG")]
49+
public void MultiSendTest()
50+
{
51+
Parallel.For(0, 25, (i) =>
52+
{
53+
string str = "";
54+
for (int j = 0; j < 30; j++)
55+
str += (char)('a' + i % 26);
56+
_sender.SendMessage(str, null);
57+
});
58+
}
59+
60+
61+
public void Dispose()
62+
{
63+
if(_sender != null)
64+
_sender.Dispose();
65+
if(_subscriber != null)
66+
_subscriber.Dispose();
67+
_sender = null;
68+
_subscriber = null;
69+
}
70+
71+
private void _subscriber_OnMessageReceived(string message)
72+
{
73+
OnSubscribeReceived?.Invoke(message);
74+
}
75+
}
76+
}

ReservoirServer/BoxList.cs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
using ReservoirServer.Enterty;
2+
using System;
3+
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
5+
using System.Text;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace ReservoirServer
10+
{
11+
class BoxList
12+
{
13+
ConcurrentDictionary<string, Box> dic;
14+
Mutex locker = new Mutex();
15+
16+
int _maxclients = 100;
17+
public int MaxClients => _maxclients;
18+
19+
private byte maxcore = 0;
20+
private int _threshold = 100;
21+
22+
public Action<Action<KeyValuePair<string, Box>>> SmartTraverseMethod
23+
{
24+
get
25+
{
26+
if (maxcore < 0)
27+
return SerialTraverse;
28+
else
29+
{
30+
if(Count > _threshold)
31+
return ParallelTraverse;
32+
else
33+
return SerialTraverse;
34+
}
35+
}
36+
}
37+
38+
public BoxList(int maxclients,byte maxcore)
39+
{
40+
_maxclients = maxclients;
41+
this.maxcore = maxcore;
42+
_threshold = 1000; //TODO: Read from ini
43+
dic = new ConcurrentDictionary<string, Box>(10, _maxclients);
44+
}
45+
46+
private int _count = 0;
47+
public int Count
48+
{
49+
get
50+
{
51+
locker.WaitOne();
52+
int rt = _count;
53+
locker.ReleaseMutex();
54+
return rt;
55+
}
56+
}
57+
58+
public Box this[string id]
59+
{
60+
get
61+
{
62+
bool succ = dic.TryGetValue(id, out Box rt);
63+
return succ ? rt : null;
64+
}
65+
}
66+
67+
public bool Remove(string id)
68+
{
69+
bool rt = dic.TryRemove(id, out _);
70+
if (rt)
71+
{
72+
locker.WaitOne();
73+
_count--;
74+
locker.ReleaseMutex();
75+
}
76+
return rt;
77+
}
78+
79+
public bool AddNew(Box box)
80+
{
81+
locker.WaitOne();
82+
bool rt;
83+
if (_count >= MaxClients)
84+
{
85+
locker.ReleaseMutex();
86+
return false;
87+
}
88+
else
89+
{
90+
rt = dic.TryAdd(box.ID, box);
91+
if (rt)
92+
_count++;
93+
locker.ReleaseMutex();
94+
95+
}
96+
return rt;
97+
98+
}
99+
100+
public void SerialTraverse(Action<KeyValuePair<string, Box>> func)
101+
{
102+
foreach (var pair in dic)
103+
func(pair);
104+
}
105+
106+
public void ParallelTraverse(Action<KeyValuePair<string, Box>> func)
107+
{
108+
if(maxcore > 0)
109+
{
110+
ParallelOptions opt = new ParallelOptions() { MaxDegreeOfParallelism = maxcore };
111+
Parallel.ForEach(dic, opt, func);
112+
}
113+
else
114+
{
115+
Parallel.ForEach(dic, func);
116+
}
117+
}
118+
119+
public void ParallelTraverse(Action<KeyValuePair<string, Box>> func,byte coreusage)
120+
{
121+
if (coreusage > 0)
122+
{
123+
ParallelOptions opt = new ParallelOptions() { MaxDegreeOfParallelism = coreusage };
124+
Parallel.ForEach(dic, opt, func);
125+
}
126+
else
127+
{
128+
Parallel.ForEach(dic, func);
129+
}
130+
}
131+
132+
133+
}
134+
}

0 commit comments

Comments
 (0)