Skip to content

Commit 045135d

Browse files
authored
Merge pull request #3550 from vladpaiu/onreply_async
Add onreply_route async capabilities
2 parents 640132b + 6811373 commit 045135d

3 files changed

Lines changed: 297 additions & 77 deletions

File tree

modules/tm/async.c

Lines changed: 206 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "h_table.h"
3131
#include "t_lookup.h"
3232
#include "t_msgbuilder.h"
33+
#include "t_cancel.h"
3334

3435

3536
typedef struct _async_tm_ctx {
@@ -52,9 +53,15 @@ typedef struct _async_tm_ctx {
5253
/* e2e ACK */
5354
struct cell *e2eack_t;
5455

56+
/* reply related things */
57+
struct sip_msg *reply;
58+
int reply_length;
59+
int branch;
60+
5561
} async_tm_ctx;
5662

5763
extern int return_code; /* from action.c, return code */
64+
extern int _tm_branch_index; /* TM branch index */
5865

5966

6067

@@ -68,10 +75,7 @@ static inline void run_resume_route( struct script_route_ref * resume_route,
6875
exec_post_req_cb(msg);
6976
}
7077

71-
72-
/* function triggered from reactor in order to continue the processing
73-
*/
74-
int t_resume_async(int fd, void *param, int was_timeout)
78+
int t_resume_async_request(int fd, void*param, int was_timeout)
7579
{
7680
static struct sip_msg faked_req;
7781
static struct ua_client uac;
@@ -85,15 +89,9 @@ int t_resume_async(int fd, void *param, int was_timeout)
8589
int route;
8690

8791
if (valid_async_fd(fd))
88-
LM_DBG("resuming on fd %d, transaction %p \n", fd, t);
92+
LM_DBG("resuming request on fd %d, transaction %p \n", fd, t);
8993
else
90-
LM_DBG("resuming without a fd, transaction %p \n", t);
91-
92-
if (current_processing_ctx) {
93-
LM_CRIT("BUG - a context is already set (%p), overwriting it...\n",
94-
current_processing_ctx);
95-
set_global_context(NULL);
96-
}
94+
LM_DBG("resuming request without a fd, transaction %p \n", t);
9795

9896
/* prepare for resume route, by filling in a phony UAC structure to
9997
* trigger the inheritance of the branch specific values */
@@ -219,6 +217,184 @@ int t_resume_async(int fd, void *param, int was_timeout)
219217
set_global_context(NULL);
220218

221219
return 0;
220+
}
221+
222+
int t_resume_async_reply(int fd, void*param, int was_timeout)
223+
{
224+
async_tm_ctx *ctx = (async_tm_ctx *)param;
225+
struct cell *backup_t;
226+
struct usr_avp **backup_list;
227+
struct cell *t= ctx->t;
228+
int route;
229+
int msg_status;
230+
int last_uac_status;
231+
int branch;
232+
struct ua_client *reply_uac;
233+
234+
if (valid_async_fd(fd))
235+
LM_DBG("resuming reply on fd %d, transaction %p \n", fd, t);
236+
else
237+
LM_DBG("resuming reply without a fd, transaction %p \n", t);
238+
239+
240+
/* enviroment setting */
241+
current_processing_ctx = ctx->msg_ctx;
242+
backup_t = get_t();
243+
244+
/* make available the avp list from transaction */
245+
backup_list = set_avp_list( &t->user_avps );
246+
247+
/* avoid double lookup of our branch, just get it from ctx */
248+
branch=ctx->branch;
249+
250+
/* fake transaction */
251+
set_t( t );
252+
253+
msg_status=ctx->reply->REPLY_STATUS;
254+
reply_uac=&t->uac[branch];
255+
LM_DBG("org. status uas=%d, uac[%d]=%d local=%d is_invite=%d)\n",
256+
t->uas.status, branch, reply_uac->last_received,
257+
is_local(t), is_invite(t));
258+
last_uac_status=reply_uac->last_received;
259+
260+
_tm_branch_index = branch;
261+
262+
async_status = ASYNC_DONE; /* assume default status as done */
263+
264+
/* call the resume function in order to read and handle data */
265+
return_code = ((async_resume_module*)
266+
(was_timeout ? ctx->async.timeout_f : ctx->async.resume_f))
267+
( (valid_async_fd(fd) ? fd: ASYNC_FD_NONE), ctx->reply,
268+
ctx->async.resume_param );
269+
270+
if (async_status==ASYNC_CONTINUE) {
271+
/* do not run the resume route */
272+
goto restore;
273+
} else if (async_status==ASYNC_DONE_NO_IO) {
274+
/* don't do any change on the fd, since the module handled everything */
275+
goto route;
276+
} else if (async_status==ASYNC_CHANGE_FD) {
277+
if (return_code<0) {
278+
LM_ERR("ASYNC_CHANGE_FD: given file descriptor shall be positive!\n");
279+
goto restore;
280+
} else if (return_code > 0 && valid_async_fd(fd) && return_code == fd) {
281+
/*trying to add the same fd; shall continue*/
282+
LM_CRIT("You are trying to replace the old fd with the same fd!"
283+
"Will act as in ASYNC_CONTINUE!\n");
284+
goto restore;
285+
}
286+
287+
/* if there was a file descriptor, remove it from the reactor */
288+
reactor_del_reader(fd, -1, IO_FD_CLOSING);
289+
fd=return_code;
290+
291+
/* insert the new fd inside the reactor */
292+
if(reactor_add_reader(fd,F_SCRIPT_ASYNC,RCT_PRIO_ASYNC,(void*)ctx)<0) {
293+
LM_ERR("failed to add async FD to reactor -> act in sync mode\n");
294+
do {
295+
async_status = ASYNC_DONE;
296+
return_code = ((async_resume_module*)ctx->async.resume_f)(
297+
fd, ctx->reply, ctx->async.resume_param );
298+
if (async_status == ASYNC_CHANGE_FD)
299+
fd=return_code;
300+
} while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
301+
goto route;
302+
}
303+
304+
/* changed fd; now restore old state */
305+
goto restore;
306+
}
307+
308+
if (valid_async_fd(fd)) {
309+
/* remove from reactor, we are done */
310+
reactor_del_reader(fd, -1, IO_FD_CLOSING);
311+
}
312+
313+
route:
314+
if (async_status == ASYNC_DONE_CLOSE_FD && valid_async_fd(fd))
315+
close(fd);
316+
317+
318+
/* we run the resume route under lock, if needed */
319+
if (onreply_avp_mode)
320+
LOCK_REPLIES(t);
321+
322+
/* run the resume_route (some type as the original one) */
323+
if (!ref_script_route_check_and_update(ctx->resume_route)) {
324+
LM_ERR("resume route [%s] not present in cfg anymore\n",
325+
ctx->resume_route->name.s);
326+
} else {
327+
swap_route_type(route, ctx->route_type);
328+
/* do not run any post script callback, we are a reply */
329+
run_resume_route( ctx->resume_route, ctx->reply, 0);
330+
set_route_type(route);
331+
}
332+
333+
/* DO TM suspended actions to decide if we need to relay */
334+
if (!onreply_avp_mode)
335+
/* if we haven't locked above to run route under lock,
336+
* lock things now as we process the reply */
337+
LOCK_REPLIES(t);
338+
339+
process_reply_and_timer(t,branch,msg_status,ctx->reply,last_uac_status,reply_uac);
340+
341+
t_unref(ctx->reply);
342+
343+
_tm_branch_index = 0;
344+
345+
/* cleanup any PKG lumps that we might have added in the resume route */
346+
del_notflaged_lumps( &(ctx->reply->add_rm), LUMPFLAG_SHMEM );
347+
del_notflaged_lumps( &(ctx->reply->body_lumps), LUMPFLAG_SHMEM );
348+
del_nonshm_lump_rpl( &(ctx->reply->reply_lump) );
349+
350+
351+
/* we need to cleaup the clone that we made
352+
* any internal callsback might try to do extra parsing which will
353+
* allocate pkg mem - free that now if outside our memory zone */
354+
clean_msg_clone(ctx->reply,ctx->reply,ctx->reply+ctx->reply_length);
355+
/* and free the message and context */
356+
free_cloned_msg(ctx->reply);
357+
if (ctx->resume_route)
358+
shm_free(ctx->resume_route);
359+
shm_free(ctx);
360+
361+
/* free also the processing ctx if still set
362+
* NOTE: it may become null if inside the run_resume_route
363+
* another async jump was made (and context attached again
364+
* to transaction) */
365+
if (current_processing_ctx) {
366+
context_destroy(CONTEXT_GLOBAL, current_processing_ctx);
367+
pkg_free(current_processing_ctx);
368+
}
369+
370+
restore:
371+
/* restore original environment */
372+
set_t(backup_t);
373+
/* restore original avp list */
374+
set_avp_list( backup_list );
375+
376+
current_processing_ctx = NULL;
377+
return 0;
378+
}
379+
380+
/* function triggered from reactor in order to continue the processing
381+
*/
382+
int t_resume_async(int fd, void *param, int was_timeout)
383+
{
384+
async_tm_ctx *ctx = (async_tm_ctx *)param;
385+
386+
if (current_processing_ctx) {
387+
LM_CRIT("BUG - a context is already set (%p), overwriting it...\n",
388+
current_processing_ctx);
389+
set_global_context(NULL);
390+
}
391+
392+
/* for now we only support async in REQUEST and ONREPLY routes,
393+
* dispatch to the correct resume function */
394+
if (ctx->reply) {
395+
return t_resume_async_reply(fd,param,was_timeout);
396+
} else
397+
return t_resume_async_request(fd,param,was_timeout);
222398
}
223399

224400

@@ -318,11 +494,27 @@ int t_handle_async(struct sip_msg *msg, struct action* a,
318494
goto sync;
319495
}
320496

321-
if (route_type!=REQUEST_ROUTE) {
322-
LM_WARN("async detected in non-request route, switching to sync\n");
497+
if (route_type == REQUEST_ROUTE) {
498+
/* for request route we allow async,
499+
* no prep needs to be done, just go there */
500+
goto async;
501+
} else if (route_type == ONREPLY_ROUTE) {
502+
/* for reply route we allow async,
503+
* but need to first clone the reply in shm */
504+
ctx->reply = sip_msg_cloner(msg,&ctx->reply_length,1);
505+
if (ctx->reply == NULL) {
506+
LM_ERR("Failed to store reply copy \n");
507+
goto failure;
508+
}
509+
ctx->branch = _tm_branch_index;
510+
goto async;
511+
} else {
512+
LM_WARN("async detected in non-request or reply route\n");
323513
goto sync;
324514
}
325515

516+
async:
517+
326518
ctx->resume_route = dup_ref_script_route_in_shm(resume_route, 0);
327519
if (!ref_script_route_is_valid(ctx->resume_route)) {
328520
LM_ERR("failed dup resume route -> act in sync mode\n");

0 commit comments

Comments
 (0)