Skip to content

Commit 69a785e

Browse files
committed
[event_routing] added new sync wait_for_event() function
This is exeactly the same as the async function, but does inline blocking/waiting until the event is received. The function is very useful in script routes that does not have async support
1 parent 9834744 commit 69a785e

4 files changed

Lines changed: 168 additions & 12 deletions

File tree

modules/event_routing/doc/event_routing_admin.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,39 @@ route[reg_done] {
176176
</example>
177177
</section>
178178

179+
<section id="func_wait_for_event" xreflabel="wait_for_event()">
180+
<title>
181+
<function moreinfo="none">wait_for_event(event,filter,timeout)</function>
182+
</title>
183+
<para>
184+
Exactly as the async <xref linkend="afunc_wait_for_event"/> function,
185+
but sync/blocking version. The script execution will block and wait
186+
until the event is delivered or the timeout hits
187+
</para>
188+
<para>
189+
The function return 1 upon success (an event was received), -1 in error
190+
case or -2 in timeout case (no event was received).
191+
</para>
192+
<para>
193+
This function can be used from any type of route.
194+
</para>
195+
<example>
196+
<title><function>wait_for_event</function> usage</title>
197+
<programlisting format="linespecific">
198+
...
199+
# block until the callee to register
200+
$avp(filter) = "aor="+$rU+"@"+$rd
201+
wait_for_event("E_UL_AOR_INSERT",$avp(filter), 40);
202+
if ($rc>0) {
203+
xlog("user $avp(aor) is now registered\n");
204+
lookup("location");
205+
t_relay();
206+
}
207+
</programlisting>
208+
</example>
209+
</section>
210+
211+
179212
</section>
180213

181214

modules/event_routing/ebr_data.c

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ int add_ebr_subscription( struct sip_msg *msg, ebr_event *ev,
323323
sub->tm.hash = 0;
324324
sub->tm.label = 0;
325325
}
326+
326327
LM_DBG("transaction reference is %X:%X\n",sub->tm.hash,sub->tm.label);
327328

328329
/* link subscription to the event */
@@ -332,7 +333,7 @@ int add_ebr_subscription( struct sip_msg *msg, ebr_event *ev,
332333
lock_release( &(ev->lock) );
333334

334335
LM_DBG("new subscription [%s] on event %.*s/%d successfully added from "
335-
"process %d\n", (flags&EBR_SUBS_TYPE_WAIT)?"WAIT":"NOTIFY",
336+
"process %d\n", EBR_SUBS_TYPE(sub),
336337
ev->event_name.len, ev->event_name.s, ev->event_id, process_no);
337338

338339
return 0;
@@ -481,8 +482,7 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params)
481482
/* discard expired subscriptions */
482483
if (sub->expire<my_time) {
483484
LM_DBG("subscription type [%s] from process %d(pid %d) on "
484-
"event <%.*s> expired at %d\n",
485-
(sub->flags&EBR_SUBS_TYPE_WAIT)?"WAIT":"NOTIFY",
485+
"event <%.*s> expired at %d\n", EBR_SUBS_TYPE(sub),
486486
sub->proc_no, pt[sub->proc_no].pid,
487487
sub->event->event_name.len, sub->event->event_name.s,
488488
sub->expire );
@@ -505,6 +505,20 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params)
505505
shm_free(job);
506506
continue; /* keep it and try next time */
507507
}
508+
} else
509+
/* resume if an sync/blocking WAIT */
510+
if (sub->flags&EBR_SUBS_TYPE_SWAIT) {
511+
struct swait_pack *swait_data = (struct swait_pack*)sub->data;
512+
cond_lock(&swait_data->cond);
513+
cond_signal(&swait_data->cond);
514+
cond_unlock(&swait_data->cond);
515+
/* the "swait_data" will be freed by the waiting proc,
516+
* we will free here only the subcription (without the
517+
* data field) */
518+
sub->data = NULL; /*just to avod earlier free*/
519+
/* setting swait_data->ret_avps to -1 serves as an
520+
* indication of a timeout */
521+
swait_data->ret_avps = ((void*)(long)-1);
508522
}
509523

510524
/* unlink it */
@@ -541,8 +555,7 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params)
541555
if (matches) {
542556

543557
LM_DBG("subscription type [%s]from process %d(pid %d) matched "
544-
"event, generating notification via IPC\n",
545-
(sub->flags&EBR_SUBS_TYPE_WAIT)?"WAIT":"NOTIFY",
558+
"event, generating notification via IPC\n", EBR_SUBS_TYPE(sub),
546559
sub->proc_no, pt[sub->proc_no].pid);
547560

548561
/* convert the EVI params into AVP (only once) */
@@ -582,14 +595,34 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params)
582595
if (job->data) shm_free(job->data);
583596
shm_free(job);
584597
}
585-
} else {
598+
} else
599+
if (sub->flags&EBR_SUBS_TYPE_WAIT) {
586600
/* sent the event notification via IPC to resume on the
587601
* subscribing process */
588602
if (ipc_send_job( sub->proc_no, ebr_ipc_type , (void*)job)<0) {
589603
LM_ERR("failed to send job via IPC, skipping...\n");
590604
shm_free(job);
591605
}
592606

607+
/* unlink it */
608+
if (sub_prev) sub_prev->next = sub_next;
609+
else ev->subs = sub_next;
610+
/* free it */
611+
free_ebr_subscription(sub);
612+
/* do not count us as prev, as we are removed */
613+
sub = sub_prev;
614+
} else
615+
/* resume if an sync/blocking WAIT */
616+
if (sub->flags&EBR_SUBS_TYPE_SWAIT) {
617+
struct swait_pack *swait_data = (struct swait_pack*)sub->data;
618+
swait_data->ret_avps = job->avps;
619+
shm_free(job); /* we need only the AVPs in this scenario */
620+
cond_lock(&swait_data->cond);
621+
cond_signal(&swait_data->cond);
622+
cond_unlock(&swait_data->cond);
623+
/* and destroy the subscription as it is only one time
624+
* triggering */
625+
sub->data = NULL; /* just to be sure it is not freed here */
593626
/* unlink it */
594627
if (sub_prev) sub_prev->next = sub_next;
595628
else ev->subs = sub_next;
@@ -651,8 +684,7 @@ void ebr_timeout(unsigned int ticks, void* param)
651684
continue;
652685

653686
LM_DBG("subscription type [%s] from process %d(pid %d) on "
654-
"event <%.*s> expired at %d, now %d\n",
655-
(sub->flags&EBR_SUBS_TYPE_WAIT)?"WAIT":"NOTIFY",
687+
"event <%.*s> expired at %d, now %d\n", EBR_SUBS_TYPE(sub),
656688
sub->proc_no, pt[sub->proc_no].pid,
657689
sub->event->event_name.len, sub->event->event_name.s,
658690
sub->expire, my_time );

modules/event_routing/ebr_data.h

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#define _MODULE_EBR_H
2525

2626
#include "../../locking.h"
27+
#include "../../lib/cond.h"
2728
#include "../tm/t_lookup.h"
2829

2930
#define EVI_ROUTING_NAME "routing"
@@ -42,8 +43,14 @@ struct _ebr_event;
4243

4344
#define EBR_SUBS_TYPE_WAIT (1<<0)
4445
#define EBR_SUBS_TYPE_NOTY (1<<1)
45-
#define EBR_DATA_TYPE_ROUT (1<<2)
46-
#define EBR_DATA_TYPE_FUNC (1<<3)
46+
#define EBR_SUBS_TYPE_SWAIT (1<<2)
47+
#define EBR_DATA_TYPE_ROUT (1<<3)
48+
#define EBR_DATA_TYPE_FUNC (1<<4)
49+
50+
#define EBR_SUBS_TYPE(_s) \
51+
( ((_s)->flags&EBR_SUBS_TYPE_WAIT)?"WAIT": \
52+
(((_s)->flags&EBR_SUBS_TYPE_SWAIT)?"SWAIT":"NOTIFY") )
53+
4754

4855
typedef struct usr_avp *(*ebr_pack_params_cb) (evi_params_t *params);
4956

@@ -55,8 +62,8 @@ typedef struct _ebr_subscription {
5562
ebr_pack_params_cb pack_params;
5663
void *data;
5764
int expire;
58-
/* Transaction ID data */
59-
struct tm_id tm ;
65+
/* Transaction ID data , used for NOTIFY */
66+
struct tm_id tm;
6067
struct _ebr_subscription *next;
6168
} ebr_subscription;
6269

@@ -70,6 +77,13 @@ typedef struct _ebr_event {
7077
struct _ebr_event *next;
7178
} ebr_event;
7279

80+
81+
struct swait_pack {
82+
gen_cond_t cond;
83+
struct usr_avp *ret_avps;
84+
};
85+
86+
7387
ebr_event * search_ebr_event( const str *name );
7488

7589
ebr_event * add_ebr_event( const str *name );

modules/event_routing/event_routing.c

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "../../ut.h"
2323
#include "../../ipc.h"
2424
#include "../../mod_fix.h"
25+
#include "../../lib/cond.h"
2526
#include "../../evi/evi_transport.h"
2627
#include "../../evi/evi_modules.h"
2728
#include "../tm/tm_load.h"
@@ -40,6 +41,8 @@ static int mod_init(void);
4041
static int cfg_validate(void);
4142
static int notify_on_event(struct sip_msg *msg, ebr_event* event,
4243
pv_spec_t *avp_filter, void *route, int *timeout);
44+
static int wait_for_event_sync(struct sip_msg* msg,
45+
ebr_event* event, pv_spec_t* avp_filter, int* timeout);
4346
static int wait_for_event(struct sip_msg* msg, async_ctx *ctx,
4447
ebr_event* event, pv_spec_t* avp_filter, int* timeout);
4548

@@ -83,6 +86,11 @@ static const cmd_export_t cmds[]={
8386
{CMD_PARAM_STR, fix_notification_route, free_notification_route},
8487
{CMD_PARAM_INT, 0 ,0}, {0,0,0}},
8588
EVENT_ROUTE|REQUEST_ROUTE|ONREPLY_ROUTE|FAILURE_ROUTE|BRANCH_ROUTE},
89+
{"wait_for_event", (cmd_function)wait_for_event_sync, {
90+
{CMD_PARAM_STR, fix_event_name, 0},
91+
{CMD_PARAM_VAR, fixup_check_avp, 0},
92+
{CMD_PARAM_INT, 0 ,0}, {0,0,0}},
93+
ALL_ROUTES},
8694
{"ebr_bind", (cmd_function)ebr_bind, {{0,0,0}}, 0},
8795
{0,0,{{0,0,0}},0}
8896
};
@@ -431,6 +439,75 @@ int api_wait_for_event(struct sip_msg *msg, async_ctx *ctx,
431439
return _wait_for_event(msg, ctx, event, filters_cpy, timeout, pack_params);
432440
}
433441

442+
443+
static int wait_for_event_sync(struct sip_msg* msg,
444+
ebr_event* event, pv_spec_t* avp_filter, int* timeout)
445+
{
446+
struct swait_pack *swait_data;
447+
ebr_filter *filters;
448+
int rc = -1;
449+
450+
if (pack_ebr_filters(msg, avp_filter->pvp.pvn.u.isname.name.n,
451+
&filters) < 0) {
452+
LM_ERR("failed to build list of EBR filters\n");
453+
goto done;
454+
}
455+
456+
if (event->event_id == -1) {
457+
/* do the init of the event*/
458+
if (init_ebr_event(event) < 0) {
459+
LM_ERR("failed to init event\n");
460+
goto done;
461+
}
462+
}
463+
464+
swait_data = (struct swait_pack*)shm_malloc( sizeof(struct swait_pack) );
465+
if (swait_data==NULL) {
466+
LM_ERR("failed to allocated SWAIT data\n");
467+
goto done;
468+
}
469+
if (cond_init(&swait_data->cond) != 0) {
470+
LM_ERR("could not initialize subscription cond\n");
471+
goto done;
472+
}
473+
swait_data->ret_avps = NULL; /*to be populated on resume*/
474+
475+
/* we have a valid EBR event here, let's subscribe on it */
476+
if (add_ebr_subscription(msg, event, filters,
477+
*timeout, NULL, (void*)swait_data, EBR_SUBS_TYPE_SWAIT) < 0) {
478+
LM_ERR("failed to add ebr subscription for event %d\n",
479+
event->event_id);
480+
goto done1;
481+
}
482+
483+
/* now just wait on the codition */
484+
cond_lock(&swait_data->cond);
485+
cond_wait(&swait_data->cond);
486+
cond_unlock(&swait_data->cond);
487+
488+
/* timeout or event received ? */
489+
if (swait_data->ret_avps==((void*)(long)-1) ) {
490+
/* was a timeout */
491+
rc = -2;
492+
} else {
493+
/* event data received */
494+
if (swait_data->ret_avps!=NULL)
495+
/* push the received event AVPs into the current list */
496+
ebr_resume_from_wait( NULL, NULL, (void*)swait_data->ret_avps);
497+
/* success */
498+
rc = 1;
499+
}
500+
501+
return rc == 0 ? 1 : rc;
502+
done1:
503+
cond_destroy(&swait_data->cond);
504+
done:
505+
shm_free(swait_data);
506+
return rc;
507+
}
508+
509+
510+
434511
/************ implementation of the EVI transport API *******************/
435512

436513
static int ebr_match(evi_reply_sock *sock1, evi_reply_sock *sock2)

0 commit comments

Comments
 (0)