Skip to content

Commit 4f24604

Browse files
committed
refactor: task queue system
Changes include: - Added ITaskContext interface for handling WebSocket and Http tasks - Created specialized task context classes for different WebSocket events - Implemented ThreadSafeQueue<ITaskContext *> g_TaskQueue for thread-safe task management - Added WebsocketExtension::AddWsTaskToQueue method to queue tasks - Extract duplicate remote address logic to static helper method
1 parent 57edf17 commit 4f24604

13 files changed

Lines changed: 376 additions & 192 deletions

pushbuild.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
0x0
22
0x1
33
0x2
4-
0x3
4+
0x3
5+
0x4

scripting/websocket_discord.sp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ void onMessage(WebSocket ws, const YYJSON message, int wireSize)
108108

109109
if (strcmp(events, "MESSAGE_CREATE") == 0)
110110
{
111-
int len = message.PtrGetLength("/d/content");
111+
int len = message.PtrGetLength("/d/content");
112112

113-
char[] content = new char[len];
113+
char[] content = new char[len];
114114
message.PtrGetString("/d/content", content, len);
115-
115+
116116
PrintToServer("content: %s", content);
117117
}
118118
}

src/extension.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,30 @@ WsServerHandler g_WsServerHandler;
1111
JSONHandler g_JSONHandler;
1212
HttpHandler g_HttpHandler;
1313

14-
ThreadSafeQueue<std::function<void()>> g_TaskQueue;
14+
ThreadSafeQueue<ITaskContext *> g_TaskQueue;
1515

1616
static void OnGameFrame(bool simulating) {
17-
std::function<void()> task;
1817
int count = 0;
19-
while (g_TaskQueue.TryPop(task) && count < MAX_PROCESS) {
20-
task();
21-
count++;
18+
ITaskContext *context = nullptr;
19+
20+
while (count < MAX_PROCESS) {
21+
if (!g_TaskQueue.TryPop(context)) {
22+
break;
23+
}
24+
25+
if (context) {
26+
context->OnCompleted();
27+
delete context;
28+
count++;
29+
}
2230
}
2331
}
2432

33+
void WebsocketExtension::AddTaskToQueue(ITaskContext *context)
34+
{
35+
g_TaskQueue.Push(context);
36+
}
37+
2538
bool WebsocketExtension::SDK_OnLoad(char* error, size_t maxlen, bool late)
2639
{
2740
sharesys->AddNatives(myself, ws_natives);

src/extension.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <IXWebSocketServer.h>
88
#include <IXHttpClient.h>
99
#include <yyjsonwrapper.h>
10+
#include <task_context.h>
1011
#include <ws_client.h>
1112
#include <ws_server.h>
1213
#include <http_request.h>
@@ -19,6 +20,8 @@ class WebsocketExtension : public SDKExtension
1920
virtual bool SDK_OnLoad(char *error, size_t maxlength, bool late);
2021
virtual void SDK_OnUnload();
2122
virtual YYJsonWrapper *GetJSONPointer(IPluginContext *pContext, Handle_t handle);
23+
public:
24+
void AddTaskToQueue(ITaskContext *context);
2225
};
2326

2427
class WsClientHandler : public IHandleTypeDispatch
@@ -51,7 +54,7 @@ extern WsClientHandler g_WsClientHandler;
5154
extern WsServerHandler g_WsServerHandler;
5255
extern JSONHandler g_JSONHandler;
5356
extern HttpHandler g_HttpHandler;
54-
extern ThreadSafeQueue<std::function<void()>> g_TaskQueue;
57+
extern ThreadSafeQueue<ITaskContext *> g_TaskQueue;
5558

5659
extern const sp_nativeinfo_t ws_natives[];
5760
extern const sp_nativeinfo_t ws_natives_server[];

src/http_request.cpp

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,22 @@ void HttpRequest::onResponse(const ix::HttpResponsePtr response)
5959
return;
6060
}
6161

62-
g_TaskQueue.Push([this, response]()
63-
{
64-
HandleError err;
65-
HandleSecurity sec(nullptr, myself->GetIdentity());
62+
HttpResponseTaskContext *context = new HttpResponseTaskContext(this, response);
63+
g_WebsocketExt.AddTaskToQueue(context);
64+
}
65+
66+
void HttpResponseTaskContext::OnCompleted()
67+
{
68+
HandleError err;
69+
HandleSecurity sec(nullptr, myself->GetIdentity());
6670

67-
this->pResponseForward->PushCell(this->m_httpclient_handle);
68-
this->pResponseForward->PushString(response->body.c_str());
69-
this->pResponseForward->PushCell(response->statusCode);
70-
this->pResponseForward->PushCell(response->body.size());
71-
this->pResponseForward->Execute(nullptr);
71+
m_client->pResponseForward->PushCell(m_client->m_httpclient_handle);
72+
m_client->pResponseForward->PushString(m_response->body.c_str());
73+
m_client->pResponseForward->PushCell(m_response->statusCode);
74+
m_client->pResponseForward->PushCell(m_response->body.size());
75+
m_client->pResponseForward->Execute(nullptr);
7276

73-
handlesys->FreeHandle(this->m_httpclient_handle, &sec);
74-
});
77+
handlesys->FreeHandle(m_client->m_httpclient_handle, &sec);
7578
}
7679

7780
bool HttpRequest::Perform()

src/http_request.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,17 @@ class HttpRequest
2828
Handle_t m_httpclient_handle = BAD_HANDLE;
2929

3030
IChangeableForward *pResponseForward = nullptr;
31+
};
32+
33+
class HttpResponseTaskContext : public ITaskContext
34+
{
35+
public:
36+
HttpResponseTaskContext(HttpRequest* client, const ix::HttpResponsePtr& response)
37+
: m_client(client), m_response(response) {}
38+
39+
virtual void OnCompleted() override;
40+
41+
private:
42+
HttpRequest* m_client;
43+
ix::HttpResponsePtr m_response;
3144
};

src/smsdk_config.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
#define SMEXT_CONF_NAME "SourceMod WebSocket Extension"
55
#define SMEXT_CONF_DESCRIPTION "Provide JSON and WebSocket Native"
6-
#define SMEXT_CONF_VERSION "1.0.5"
6+
#define SMEXT_CONF_VERSION "1.0.4"
77
#define SMEXT_CONF_AUTHOR "ProjectSky"
88
#define SMEXT_CONF_URL "https://github.com/ProjectSky/sm-ext-websocket"
99
#define SMEXT_CONF_LOGTAG "websocket"

src/task_context.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
class ITaskContext
2+
{
3+
public:
4+
virtual void OnCompleted() = 0;
5+
virtual ~ITaskContext() {}
6+
};

src/ws_client.cpp

Lines changed: 80 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -55,57 +55,8 @@ void WebSocketClient::OnMessage(const std::string& message)
5555
return;
5656
}
5757

58-
g_TaskQueue.Push([this, message]()
59-
{
60-
const size_t messageLength = message.length() + 1;
61-
62-
switch (m_callback_type)
63-
{
64-
case Websocket_STRING:
65-
{
66-
pMessageForward->PushCell(m_websocket_handle);
67-
pMessageForward->PushString(message.c_str());
68-
pMessageForward->PushCell(messageLength);
69-
pMessageForward->Execute(nullptr);
70-
break;
71-
}
72-
case WebSocket_JSON:
73-
{
74-
auto pYYJsonWrapper = CreateWrapper();
75-
76-
yyjson_read_err readError;
77-
yyjson_doc *idoc = yyjson_read_opts(const_cast<char*>(message.c_str()), message.length(), 0, nullptr, &readError);
78-
79-
if (readError.code)
80-
{
81-
yyjson_doc_free(idoc);
82-
smutils->LogError(myself, "parse JSON message error (%u): %s at position: %d", readError.code, readError.msg, readError.pos);
83-
return;
84-
}
85-
86-
pYYJsonWrapper->m_pDocument = WrapImmutableDocument(idoc);
87-
pYYJsonWrapper->m_pVal = yyjson_doc_get_root(idoc);
88-
89-
HandleError err;
90-
HandleSecurity pSec(nullptr, myself->GetIdentity());
91-
m_json_handle = handlesys->CreateHandleEx(g_htJSON, pYYJsonWrapper.release(), &pSec, nullptr, &err);
92-
93-
if (!m_json_handle)
94-
{
95-
smutils->LogError(myself, "Could not create JSON handle (error %d)", err);
96-
return;
97-
}
98-
99-
pMessageForward->PushCell(m_websocket_handle);
100-
pMessageForward->PushCell(m_json_handle);
101-
pMessageForward->PushCell(messageLength);
102-
pMessageForward->Execute(nullptr);
103-
104-
handlesys->FreeHandle(m_json_handle, &pSec);
105-
break;
106-
}
107-
}
108-
});
58+
WsMessageTaskContext *context = new WsMessageTaskContext(this, message);
59+
g_WebsocketExt.AddTaskToQueue(context);
10960
}
11061

11162
void WebSocketClient::OnOpen(ix::WebSocketOpenInfo openInfo)
@@ -115,11 +66,8 @@ void WebSocketClient::OnOpen(ix::WebSocketOpenInfo openInfo)
11566
return;
11667
}
11768

118-
g_TaskQueue.Push([this, openInfo]()
119-
{
120-
pOpenForward->PushCell(m_websocket_handle);
121-
pOpenForward->Execute(nullptr);
122-
});
69+
WsOpenTaskContext *context = new WsOpenTaskContext(this, openInfo);
70+
g_WebsocketExt.AddTaskToQueue(context);
12371
}
12472

12573
void WebSocketClient::OnClose(ix::WebSocketCloseInfo closeInfo)
@@ -129,15 +77,8 @@ void WebSocketClient::OnClose(ix::WebSocketCloseInfo closeInfo)
12977
return;
13078
}
13179

132-
// TODO: Fixed crash when unload extension after connecting
133-
// 2024/06/30 - 23:09 - Fixed
134-
g_TaskQueue.Push([this, closeInfo]()
135-
{
136-
pCloseForward->PushCell(m_websocket_handle);
137-
pCloseForward->PushCell(closeInfo.code);
138-
pCloseForward->PushString(closeInfo.reason.c_str());
139-
pCloseForward->Execute(nullptr);
140-
});
80+
WsCloseTaskContext *context = new WsCloseTaskContext(this, closeInfo);
81+
g_WebsocketExt.AddTaskToQueue(context);
14182
}
14283

14384
void WebSocketClient::OnError(ix::WebSocketErrorInfo errorInfo)
@@ -147,10 +88,79 @@ void WebSocketClient::OnError(ix::WebSocketErrorInfo errorInfo)
14788
return;
14889
}
14990

150-
g_TaskQueue.Push([this, errorInfo]()
91+
WsErrorTaskContext *context = new WsErrorTaskContext(this, errorInfo);
92+
g_WebsocketExt.AddTaskToQueue(context);
93+
}
94+
95+
void WsMessageTaskContext::OnCompleted()
96+
{
97+
const size_t messageLength = m_message.length() + 1;
98+
99+
switch (m_client->m_callback_type)
151100
{
152-
pErrorForward->PushCell(m_websocket_handle);
153-
pErrorForward->PushString(errorInfo.reason.c_str());
154-
pErrorForward->Execute(nullptr);
155-
});
101+
case Websocket_STRING:
102+
{
103+
m_client->pMessageForward->PushCell(m_client->m_websocket_handle);
104+
m_client->pMessageForward->PushString(m_message.c_str());
105+
m_client->pMessageForward->PushCell(messageLength);
106+
m_client->pMessageForward->Execute(nullptr);
107+
break;
108+
}
109+
case WebSocket_JSON:
110+
{
111+
auto pYYJsonWrapper = CreateWrapper();
112+
113+
yyjson_read_err readError;
114+
yyjson_doc *idoc = yyjson_read_opts(const_cast<char*>(m_message.c_str()), messageLength, 0, nullptr, &readError);
115+
116+
if (readError.code)
117+
{
118+
yyjson_doc_free(idoc);
119+
smutils->LogError(myself, "parse JSON message error (%u): %s at position: %d", readError.code, readError.msg, readError.pos);
120+
return;
121+
}
122+
123+
pYYJsonWrapper->m_pDocument = WrapImmutableDocument(idoc);
124+
pYYJsonWrapper->m_pVal = yyjson_doc_get_root(idoc);
125+
126+
HandleError err;
127+
HandleSecurity pSec(nullptr, myself->GetIdentity());
128+
m_client->m_json_handle = handlesys->CreateHandleEx(g_htJSON, pYYJsonWrapper.release(), &pSec, nullptr, &err);
129+
130+
if (!m_client->m_json_handle)
131+
{
132+
smutils->LogError(myself, "Could not create JSON handle (error %d)", err);
133+
return;
134+
}
135+
136+
m_client->pMessageForward->PushCell(m_client->m_websocket_handle);
137+
m_client->pMessageForward->PushCell(m_client->m_json_handle);
138+
m_client->pMessageForward->PushCell(messageLength);
139+
m_client->pMessageForward->Execute(nullptr);
140+
141+
handlesys->FreeHandle(m_client->m_json_handle, &pSec);
142+
break;
143+
}
144+
}
145+
}
146+
147+
void WsOpenTaskContext::OnCompleted()
148+
{
149+
m_client->pOpenForward->PushCell(m_client->m_websocket_handle);
150+
m_client->pOpenForward->Execute(nullptr);
151+
}
152+
153+
void WsCloseTaskContext::OnCompleted()
154+
{
155+
m_client->pCloseForward->PushCell(m_client->m_websocket_handle);
156+
m_client->pCloseForward->PushCell(m_closeInfo.code);
157+
m_client->pCloseForward->PushString(m_closeInfo.reason.c_str());
158+
m_client->pCloseForward->Execute(nullptr);
159+
}
160+
161+
void WsErrorTaskContext::OnCompleted()
162+
{
163+
m_client->pErrorForward->PushCell(m_client->m_websocket_handle);
164+
m_client->pErrorForward->PushString(m_errorInfo.reason.c_str());
165+
m_client->pErrorForward->Execute(nullptr);
156166
}

src/ws_client.h

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,56 @@ class WebSocketClient
3434
IChangeableForward *pOpenForward = nullptr;
3535
IChangeableForward *pCloseForward = nullptr;
3636
IChangeableForward *pErrorForward = nullptr;
37+
};
38+
39+
class WsMessageTaskContext : public ITaskContext
40+
{
41+
public:
42+
WsMessageTaskContext(WebSocketClient* client, const std::string& message)
43+
: m_client(client), m_message(message) {}
44+
45+
virtual void OnCompleted() override;
46+
47+
private:
48+
WebSocketClient* m_client;
49+
std::string m_message;
50+
};
51+
52+
class WsOpenTaskContext : public ITaskContext
53+
{
54+
public:
55+
WsOpenTaskContext(WebSocketClient* client, ix::WebSocketOpenInfo openInfo)
56+
: m_client(client), m_openInfo(openInfo) {}
57+
58+
virtual void OnCompleted() override;
59+
60+
private:
61+
WebSocketClient* m_client;
62+
ix::WebSocketOpenInfo m_openInfo;
63+
};
64+
65+
class WsCloseTaskContext : public ITaskContext
66+
{
67+
public:
68+
WsCloseTaskContext(WebSocketClient* client, ix::WebSocketCloseInfo closeInfo)
69+
: m_client(client), m_closeInfo(closeInfo) {}
70+
71+
virtual void OnCompleted() override;
72+
73+
private:
74+
WebSocketClient* m_client;
75+
ix::WebSocketCloseInfo m_closeInfo;
76+
};
77+
78+
class WsErrorTaskContext : public ITaskContext
79+
{
80+
public:
81+
WsErrorTaskContext(WebSocketClient* client, ix::WebSocketErrorInfo errorInfo)
82+
: m_client(client), m_errorInfo(errorInfo) {}
83+
84+
virtual void OnCompleted() override;
85+
86+
private:
87+
WebSocketClient* m_client;
88+
ix::WebSocketErrorInfo m_errorInfo;
3789
};

0 commit comments

Comments
 (0)