66#include < chrono>
77#include < httplib.h>
88#include < iostream>
9+ #include < mutex>
910#include < string>
1011#include < thread>
1112#include < vector>
@@ -15,11 +16,19 @@ using fastmcpp::server::SseServerWrapper;
1516
1617int main ()
1718{
18- auto handler = [](const Json& request) -> Json { return request; };
19- // Bind to any available port and start wrapper
19+ // Echo handler: returns a minimal JSON-RPC response carrying the posted value.
20+ auto handler = [](const Json& request) -> Json
21+ {
22+ Json response = {{" jsonrpc" , " 2.0" },
23+ {" id" , request.value (" id" , Json (nullptr ))},
24+ {" result" , request.value (" params" , Json::object ())}};
25+ return response;
26+ };
27+
28+ // Choose port with fallback range
2029 int port = -1 ;
2130 std::unique_ptr<SseServerWrapper> server;
22- for (int candidate = 18111 ; candidate <= 18131 ; ++candidate)
31+ for (int candidate = 18110 ; candidate <= 18130 ; ++candidate)
2332 {
2433 auto trial = std::make_unique<SseServerWrapper>(handler, " 127.0.0.1" , candidate, " /sse" ,
2534 " /messages" );
@@ -32,70 +41,85 @@ int main()
3241 }
3342 if (port < 0 || !server)
3443 {
35- std::cerr << " Failed to start SSE server" << std::endl;
44+ std::cerr << " Failed to start SSE server on candidates " << std::endl;
3645 return 1 ;
3746 }
47+
3848 std::this_thread::sleep_for (std::chrono::milliseconds (1000 ));
3949
40- // Skip strict probe; receiver will retry until connected
50+ // Do not hard-fail on probe; the receiver thread retries connections
4151
42- std::vector<int > seen;
43- std::mutex m;
52+ // Start SSE receiver
4453 std::atomic<bool > sse_connected{false };
45- std::string session_id;
54+ std::atomic<bool > have_endpoint{false };
55+ std::string message_endpoint;
56+ std::vector<int > seen;
57+ std::mutex seen_mutex;
58+ std::mutex endpoint_mutex;
59+
60+ httplib::Client sse_client (" 127.0.0.1" , port);
61+ sse_client.set_connection_timeout (std::chrono::seconds (10 ));
62+ sse_client.set_read_timeout (std::chrono::seconds (20 ));
4663
47- // NOTE: httplib::Client must be created in the same thread that uses it on Linux
4864 std::thread sse_thread (
49- [&, port ]()
65+ [&]()
5066 {
51- // Create client inside thread - httplib::Client is not thread-safe across threads on
52- // Linux
53- httplib::Client cli (" 127.0.0.1" , port);
54- cli.set_connection_timeout (std::chrono::seconds (10 ));
55- cli.set_read_timeout (std::chrono::seconds (20 ));
56-
67+ std::string buffer;
5768 auto receiver = [&](const char * data, size_t len)
5869 {
5970 sse_connected = true ;
60- std::string chunk (data, len);
61-
62- // Parse SSE endpoint event to extract session_id
63- if (chunk.find (" event: endpoint" ) != std::string::npos)
71+ buffer.append (data, len);
72+
73+ // Process complete SSE blocks separated by a blank line.
74+ // Each block can contain lines like:
75+ // event: endpoint
76+ // data: /messages?session_id=...
77+ // or:
78+ // data: {json}\n\n
79+ while (true )
6480 {
65- size_t data_pos = chunk.find (" data: " );
66- if (data_pos != std::string::npos)
67- {
68- size_t start = data_pos + 6 ;
69- size_t end = chunk.find_first_of (" \n\r " , start);
70- std::string endpoint_url = chunk.substr (start, end - start);
81+ size_t end = buffer.find (" \n\n " );
82+ if (end == std::string::npos)
83+ break ;
7184
72- size_t sid_pos = endpoint_url.find (" session_id=" );
73- if (sid_pos != std::string::npos)
85+ std::string block = buffer.substr (0 , end);
86+ buffer.erase (0 , end + 2 );
87+
88+ // Extract endpoint path if present
89+ if (block.find (" event: endpoint" ) != std::string::npos)
90+ {
91+ size_t data_pos = block.find (" data: " );
92+ if (data_pos != std::string::npos)
7493 {
75- size_t sid_start = sid_pos + 11 ;
76- size_t sid_end = endpoint_url.find_first_of (" &\n\r " , sid_start);
77- std::lock_guard<std::mutex> lock (m);
78- session_id = endpoint_url.substr (sid_start, sid_end - sid_start);
94+ size_t value_start = data_pos + 6 ;
95+ size_t value_end = block.find (' \n ' , value_start);
96+ std::string endpoint =
97+ block.substr (value_start, value_end == std::string::npos
98+ ? std::string::npos
99+ : value_end - value_start);
100+ {
101+ std::lock_guard<std::mutex> lock (endpoint_mutex);
102+ message_endpoint = endpoint;
103+ have_endpoint = !message_endpoint.empty ();
104+ }
79105 }
106+ continue ;
80107 }
81- }
82108
83- if (chunk.find (" data: " ) == 0 )
84- {
85- size_t start = 6 ;
86- size_t end = chunk.find (" \n\n " );
87- if (end != std::string::npos)
109+ // Parse "data: {json}" events and collect result.n values.
110+ if (block.rfind (" data: " , 0 ) == 0 )
88111 {
89- std::string json_str = chunk .substr (start, end - start );
112+ std::string json_str = block .substr (6 );
90113 try
91114 {
92115 Json j = Json::parse (json_str);
93- if (j.contains (" n" ))
116+ if (j.contains (" result" ) && j[" result" ].is_object () &&
117+ j[" result" ].contains (" n" ))
94118 {
95- std::lock_guard<std::mutex> lock (m );
96- seen.push_back (j[" n" ].get <int >());
119+ std::lock_guard<std::mutex> lock (seen_mutex );
120+ seen.push_back (j[" result " ][ " n" ].get <int >());
97121 if (seen.size () >= 3 )
98- return false ;
122+ return false ; // stop after 3
99123 }
100124 }
101125 catch (...)
@@ -105,9 +129,9 @@ int main()
105129 }
106130 return true ;
107131 };
108- for (int attempt = 0 ; attempt < 20 && !sse_connected; ++attempt)
132+ for (int attempt = 0 ; attempt < 60 && !sse_connected; ++attempt)
109133 {
110- auto res = cli .Get (" /sse" , receiver);
134+ auto res = sse_client .Get (" /sse" , receiver);
111135 if (!res)
112136 {
113137 std::this_thread::sleep_for (std::chrono::milliseconds (200 ));
@@ -118,6 +142,7 @@ int main()
118142 }
119143 });
120144
145+ // Wait for connection
121146 for (int i = 0 ; i < 500 && !sse_connected; ++i)
122147 std::this_thread::sleep_for (std::chrono::milliseconds (10 ));
123148 if (!sse_connected)
@@ -129,36 +154,29 @@ int main()
129154 return 1 ;
130155 }
131156
132- // Wait for session_id to be extracted
133- for (int i = 0 ; i < 100 ; ++i)
134- {
135- std::lock_guard<std::mutex> lock (m);
136- if (!session_id.empty ())
137- break ;
157+ // Wait for server to tell us the message endpoint (includes required session_id).
158+ for (int i = 0 ; i < 500 && !have_endpoint; ++i)
138159 std::this_thread::sleep_for (std::chrono::milliseconds (10 ));
139- }
140-
141- std::string sid;
142- {
143- std::lock_guard<std::mutex> lock (m);
144- sid = session_id;
145- }
146-
147- if (sid.empty ())
160+ if (!have_endpoint)
148161 {
149162 server->stop ();
150163 if (sse_thread.joinable ())
151164 sse_thread.join ();
152- std::cerr << " Failed to extract session_id " << std::endl;
165+ std::cerr << " Missing endpoint event " << std::endl;
153166 return 1 ;
154167 }
155168
169+ // Post three messages
156170 httplib::Client post (" 127.0.0.1" , port);
171+ std::string post_path;
172+ {
173+ std::lock_guard<std::mutex> lock (endpoint_mutex);
174+ post_path = message_endpoint;
175+ }
157176 for (int i = 1 ; i <= 3 ; ++i)
158177 {
159- Json j = Json{{" n" , i}};
160- std::string post_url = " /messages?session_id=" + sid;
161- auto res = post .Post (post_url, j.dump (), " application/json" );
178+ Json j = {{" jsonrpc" , " 2.0" }, {" id" , i}, {" method" , " echo" }, {" params" , {{" n" , i}}}};
179+ auto res = post .Post (post_path, j.dump (), " application/json" );
162180 if (!res || res->status != 200 )
163181 {
164182 server->stop ();
@@ -169,23 +187,35 @@ int main()
169187 }
170188 }
171189
190+ // Wait briefly for all events
172191 for (int i = 0 ; i < 200 ; ++i)
173192 {
174- std::lock_guard<std::mutex> lock (m);
175- if (seen.size () >= 3 )
176- break ;
193+ {
194+ std::lock_guard<std::mutex> lock (seen_mutex);
195+ if (seen.size () >= 3 )
196+ break ;
197+ }
177198 std::this_thread::sleep_for (std::chrono::milliseconds (10 ));
178199 }
179200
180201 server->stop ();
181202 if (sse_thread.joinable ())
182203 sse_thread.join ();
183204
184- if (seen.size () != 3 )
185205 {
186- std::cerr << " expected 3 events, got " << seen.size () << " \n " ;
187- return 1 ;
206+ std::lock_guard<std::mutex> lock (seen_mutex);
207+ if (seen.size () != 3 )
208+ {
209+ std::cerr << " expected 3 events, got " << seen.size () << " \n " ;
210+ return 1 ;
211+ }
212+ if (seen[0 ] != 1 || seen[1 ] != 2 || seen[2 ] != 3 )
213+ {
214+ std::cerr << " unexpected event sequence\n " ;
215+ return 1 ;
216+ }
188217 }
218+
189219 std::cout << " ok\n " ;
190220 return 0 ;
191221}
0 commit comments