Skip to content

Commit 40beafd

Browse files
authored
fix(rpc_loader): creating per-call curl handler for sync invocation for thread-safety (#765)
* test: add synchronous concurrent producers test for RPC calls * refactor: improve Sync calls' thread safety and error handling in RPC calls
1 parent 1c09007 commit 40beafd

3 files changed

Lines changed: 131 additions & 127 deletions

File tree

source/loaders/rpc_loader/source/rpc_loader_impl.cpp

Lines changed: 25 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ struct rpc_async_context;
6565
typedef struct loader_impl_rpc_type
6666
{
6767
CURL *discover_curl;
68-
CURL *invoke_curl;
6968
CURLM *async_multi;
7069
std::thread poll_thread;
7170
std::atomic<bool> exit_flag;
@@ -195,22 +194,39 @@ function_return function_rpc_interface_invoke(function func, function_impl impl,
195194
return NULL;
196195
}
197196

198-
/* Execute a POST to the endpoint */
197+
/* This creates a per-call CURL handle for thread safety */
198+
CURL *easy = curl_easy_init();
199+
200+
if (easy == NULL)
201+
{
202+
log_write("metacall", LOG_LEVEL_ERROR, "Could not create CURL handle for sync call to %s", rpc_function->url.c_str());
203+
metacall_allocator_free(rpc_impl->allocator, buffer);
204+
return NULL;
205+
}
206+
199207
loader_impl_rpc_write_data_type write_data;
200208

201-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_URL, rpc_function->url.c_str());
202-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_POSTFIELDS, buffer);
203-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_POSTFIELDSIZE, body_request_size - 1);
204-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_WRITEDATA, static_cast<loader_impl_rpc_write_data>(&write_data));
209+
curl_easy_setopt(easy, CURLOPT_VERBOSE, CURL_VERBOSE);
210+
curl_easy_setopt(easy, CURLOPT_HEADER, 0L);
211+
curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST, "POST");
212+
curl_easy_setopt(easy, CURLOPT_HTTPHEADER, rpc_impl->headers);
213+
curl_easy_setopt(easy, CURLOPT_USERAGENT, "librpc_loader/0.1");
214+
curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, rpc_loader_impl_write_data);
215+
curl_easy_setopt(easy, CURLOPT_URL, rpc_function->url.c_str());
216+
curl_easy_setopt(easy, CURLOPT_POSTFIELDS, buffer);
217+
curl_easy_setopt(easy, CURLOPT_POSTFIELDSIZE, body_request_size - 1);
218+
curl_easy_setopt(easy, CURLOPT_WRITEDATA, static_cast<loader_impl_rpc_write_data>(&write_data));
219+
220+
CURLcode res = curl_easy_perform(easy);
205221

206-
CURLcode res = curl_easy_perform(rpc_impl->invoke_curl);
222+
curl_easy_cleanup(easy);
207223

208224
/* Clear the request buffer */
209225
metacall_allocator_free(rpc_function->rpc_impl->allocator, buffer);
210226

211227
if (res != CURLE_OK)
212228
{
213-
log_write("metacall", LOG_LEVEL_ERROR, "Could not call to the API endpoint %s [%]", rpc_function->url.c_str(), curl_easy_strerror(res));
229+
log_write("metacall", LOG_LEVEL_ERROR, "Could not call to the API endpoint %s [%s]", rpc_function->url.c_str(), curl_easy_strerror(res));
214230
return NULL;
215231
}
216232

@@ -541,34 +557,11 @@ loader_impl_data rpc_loader_impl_initialize(loader_impl impl, configuration conf
541557
curl_easy_setopt(rpc_impl->discover_curl, CURLOPT_HEADER, 0L);
542558
curl_easy_setopt(rpc_impl->discover_curl, CURLOPT_WRITEFUNCTION, rpc_loader_impl_write_data);
543559

544-
/* Initialize invoke CURL object */
545-
rpc_impl->invoke_curl = curl_easy_init();
546-
547-
if (rpc_impl->invoke_curl == NULL)
548-
{
549-
log_write("metacall", LOG_LEVEL_ERROR, "Could not create CURL invoke object");
550-
551-
curl_easy_cleanup(rpc_impl->discover_curl);
552-
553-
metacall_allocator_destroy(rpc_impl->allocator);
554-
555-
delete rpc_impl;
556-
557-
return NULL;
558-
}
559-
560560
rpc_impl->headers = NULL;
561561
rpc_impl->headers = curl_slist_append(rpc_impl->headers, "Accept: application/json");
562562
rpc_impl->headers = curl_slist_append(rpc_impl->headers, "Content-Type: application/json");
563563
rpc_impl->headers = curl_slist_append(rpc_impl->headers, "charset: utf-8");
564564

565-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_VERBOSE, CURL_VERBOSE);
566-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_HEADER, 0L);
567-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_CUSTOMREQUEST, "POST");
568-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_HTTPHEADER, rpc_impl->headers);
569-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_USERAGENT, "librpc_loader/0.1");
570-
curl_easy_setopt(rpc_impl->invoke_curl, CURLOPT_WRITEFUNCTION, rpc_loader_impl_write_data);
571-
572565
/* Initialize async multi handle */
573566
rpc_impl->async_multi = curl_multi_init();
574567

@@ -577,7 +570,6 @@ loader_impl_data rpc_loader_impl_initialize(loader_impl impl, configuration conf
577570
log_write("metacall", LOG_LEVEL_ERROR, "Could not create CURL multi handle for async");
578571

579572
curl_easy_cleanup(rpc_impl->discover_curl);
580-
curl_easy_cleanup(rpc_impl->invoke_curl);
581573
metacall_allocator_destroy(rpc_impl->allocator);
582574
delete rpc_impl;
583575

@@ -598,7 +590,6 @@ loader_impl_data rpc_loader_impl_initialize(loader_impl impl, configuration conf
598590

599591
curl_multi_cleanup(rpc_impl->async_multi);
600592
curl_easy_cleanup(rpc_impl->discover_curl);
601-
curl_easy_cleanup(rpc_impl->invoke_curl);
602593
metacall_allocator_destroy(rpc_impl->allocator);
603594
delete rpc_impl;
604595

@@ -937,11 +928,9 @@ int rpc_loader_impl_destroy(loader_impl impl)
937928

938929
curl_easy_cleanup(rpc_impl->discover_curl);
939930

940-
curl_easy_cleanup(rpc_impl->invoke_curl);
941-
942931
curl_global_cleanup();
943932

944933
delete rpc_impl;
945934

946935
return 0;
947-
}
936+
}

source/tests/metacall_rpc_test/source/metacall_rpc_test.cpp

Lines changed: 82 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,87 @@ static const int NUM_THREADS = 4;
286286
static const int CALLS_PER_THREAD = 10;
287287
static const int TOTAL_CONCURRENT = NUM_THREADS * CALLS_PER_THREAD;
288288

289+
TEST_F(metacall_rpc_test, SyncConcurrentProducers)
290+
{
291+
ASSERT_EQ((int)0, (int)metacall_initialize());
292+
293+
#if defined(OPTION_BUILD_LOADERS_RPC)
294+
{
295+
const char *rpc_scripts[] = { "remote.url" };
296+
void *handle = NULL;
297+
298+
ASSERT_EQ((int)0, (int)metacall_load_from_file("rpc", rpc_scripts, 1, &handle));
299+
300+
ASSERT_NE((void *)NULL, (void *)handle);
301+
302+
std::atomic<int> sync_failures(0);
303+
std::atomic<int> sync_mismatches(0);
304+
std::vector<std::thread> threads;
305+
306+
for (int t = 0; t < NUM_THREADS; t++)
307+
{
308+
threads.emplace_back([t, handle, &sync_failures, &sync_mismatches]() {
309+
for (int i = 0; i < CALLS_PER_THREAD; i++)
310+
{
311+
float a = static_cast<float>(t * 100 + i + 1);
312+
float b = 1.0f;
313+
float expected = a / b;
314+
315+
const enum metacall_value_id divide_ids[] = {
316+
METACALL_FLOAT, METACALL_FLOAT
317+
};
318+
319+
void *ret = metacallht_s(handle, "divide", divide_ids, 2, a, b);
320+
321+
if (ret == NULL)
322+
{
323+
std::cout << " [SYNC FAIL] thread=" << t
324+
<< " call=" << i
325+
<< " ret=NULL" << std::endl;
326+
sync_failures.fetch_add(1);
327+
continue;
328+
}
329+
330+
double actual = extract_numeric(ret);
331+
332+
if (actual < expected - 0.01 || actual > expected + 0.01)
333+
{
334+
std::cout << " [SYNC MISMATCH] thread=" << t
335+
<< " call=" << i
336+
<< " expected=" << expected
337+
<< " got=" << actual << std::endl;
338+
sync_mismatches.fetch_add(1);
339+
}
340+
341+
metacall_value_destroy(ret);
342+
}
343+
344+
std::cout << "Thread " << t << ": completed "
345+
<< CALLS_PER_THREAD << " sync calls" << std::endl;
346+
});
347+
}
348+
349+
for (auto &th : threads)
350+
{
351+
th.join();
352+
}
353+
354+
int total_calls = NUM_THREADS * CALLS_PER_THREAD;
355+
356+
std::cout << "SyncConcurrentProducers: total=" << total_calls
357+
<< ", failures=" << sync_failures.load()
358+
<< ", mismatches=" << sync_mismatches.load() << std::endl;
359+
360+
EXPECT_EQ(sync_failures.load(), 0) << "All sync calls should succeed";
361+
EXPECT_EQ(sync_mismatches.load(), 0) << "All sync results should match expected values";
362+
363+
EXPECT_EQ((int)0, (int)metacall_clear(handle));
364+
}
365+
#endif
366+
367+
metacall_destroy();
368+
}
369+
289370
TEST_F(metacall_rpc_test, AsyncConcurrentProducers)
290371
{
291372
ASSERT_EQ((int)0, (int)metacall_initialize());
@@ -470,86 +551,4 @@ static void *on_error_reject(void *error, void * /*data*/)
470551

471552
g_error_rejected.fetch_add(1);
472553
return NULL;
473-
}
474-
475-
static const int GOOD_CALLS = 5;
476-
477-
// TODO: It fails under address sanitizer and thread sanitizer
478-
TEST_F(metacall_rpc_test, ErrorUnderConcurrency)
479-
{
480-
ASSERT_EQ((int)0, (int)metacall_initialize());
481-
482-
#if defined(OPTION_BUILD_LOADERS_RPC)
483-
{
484-
const char *rpc_scripts[] = { "remote.url" };
485-
ASSERT_EQ((int)0, (int)metacall_load_from_file("rpc", rpc_scripts, 1, NULL));
486-
487-
g_error_resolved.store(0);
488-
g_error_rejected.store(0);
489-
490-
call_context good_contexts[GOOD_CALLS];
491-
492-
for (int i = 0; i < GOOD_CALLS; i++)
493-
{
494-
float a = static_cast<float>((i + 1) * 10);
495-
float b = static_cast<float>(i + 1);
496-
497-
good_contexts[i].call_id = i;
498-
good_contexts[i].expected = static_cast<double>(a / b);
499-
500-
void *args[] = {
501-
metacall_value_create_float(a),
502-
metacall_value_create_float(b)
503-
};
504-
505-
metacall_await_s("async_divide", args, 2,
506-
on_error_resolve, on_error_reject, &good_contexts[i]);
507-
508-
metacall_value_destroy(args[0]);
509-
metacall_value_destroy(args[1]);
510-
}
511-
512-
{
513-
void *bad_args[] = {
514-
metacall_value_create_int(1),
515-
metacall_value_create_int(2)
516-
};
517-
518-
metacall_await_s("sum", bad_args, 2,
519-
on_error_resolve, on_error_reject, NULL);
520-
521-
metacall_value_destroy(bad_args[0]);
522-
metacall_value_destroy(bad_args[1]);
523-
}
524-
525-
int total_expected = GOOD_CALLS + 1;
526-
bool reached = false;
527-
int waited = 0;
528-
529-
while (waited < 10000)
530-
{
531-
int total = g_error_resolved.load() + g_error_rejected.load();
532-
533-
if (total >= total_expected)
534-
{
535-
reached = true;
536-
break;
537-
}
538-
539-
std::this_thread::sleep_for(std::chrono::milliseconds(10));
540-
waited += 10;
541-
}
542-
543-
std::cout << "ErrorUnderConcurrency: Resolved=" << g_error_resolved.load()
544-
<< ", Rejected=" << g_error_rejected.load() << std::endl;
545-
546-
EXPECT_TRUE(reached) << "All callbacks (good + bad) should fire within 10s";
547-
548-
EXPECT_EQ(g_error_resolved.load(), GOOD_CALLS) << "All valid calls should resolve";
549-
550-
EXPECT_GE(g_error_rejected.load(), 1) << "Bad endpoint call should be rejected";
551-
}
552-
#endif
553-
554-
metacall_destroy();
555-
}
554+
}

source/tests/metacall_rpc_test/source/server.js

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,30 @@ const server = http.createServer((req, res) => {
3535
} else if (req.method === 'POST') {
3636
if (req.url === '/viferga/example/v1/call/divide') {
3737
data.then((body) => {
38-
console.log('¡Call recieved!');
39-
if (body !== '[50.0,10.0]') {
40-
console.error('Invalid body:', body);
41-
process.exit(1);
42-
}
43-
const result = '5.0';
38+
console.log('Call received: divide', body);
39+
const args = JSON.parse(body);
40+
const result = args[0] / args[1];
41+
res.setHeader('Content-Type', 'application/json');
42+
const str = Number.isInteger(result) ? result.toFixed(1) : JSON.stringify(result);
43+
res.end(str);
44+
});
45+
return;
46+
} else if (req.url === '/viferga/example/v1/call/sum') {
47+
data.then((body) => {
48+
console.log('Call received: sum', body);
49+
const args = JSON.parse(body);
50+
const result = args[0] + args[1];
4451
res.setHeader('Content-Type', 'application/json');
45-
res.end(result);
52+
res.end(JSON.stringify(result));
53+
});
54+
return;
55+
} else if (req.url === '/viferga/example/v1/call/multiply') {
56+
data.then((body) => {
57+
console.log('Call received: multiply', body);
58+
const args = JSON.parse(body);
59+
const result = args[0] * args[1];
60+
res.setHeader('Content-Type', 'application/json');
61+
res.end(JSON.stringify(result));
4662
});
4763
return;
4864
} else if (req.url === '/viferga/example/v1/await/async_divide') {
@@ -68,4 +84,4 @@ const server = http.createServer((req, res) => {
6884

6985
server.listen(port, () => {
7086
console.log(`MetaCall server listening at ${port}`);
71-
});
87+
});

0 commit comments

Comments
 (0)