Skip to content

Commit 025f5ab

Browse files
committed
bugfix: 修复多个协程同时抛出异常时处理不正确的 BUG
1 parent 8d2a390 commit 025f5ab

7 files changed

Lines changed: 201 additions & 181 deletions

File tree

src/extensions/redis.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,17 @@ static void on_complete(redisAsyncContext *c, void *r, void *privdata) {
6161

6262
redisReply *reply = r;
6363
if (reply == NULL) {
64-
lgx_co_throw_s(vm, co, c->errstr);
64+
lgx_co_throw_s(co, c->errstr);
6565
return;
6666
}
6767

6868
switch (reply->type) {
6969
case REDIS_REPLY_ERROR:
70-
lgx_co_throw_s(vm, co, "%.*s", reply->len, reply->str);
70+
lgx_co_throw_s(co, "%.*s", reply->len, reply->str);
7171
break;
7272
case REDIS_REPLY_STATUS:
7373
if (redis->ctx->err) {
74-
lgx_co_throw_s(vm, co, "%s (%d)", redis->ctx->errstr, redis->ctx->err);
74+
lgx_co_throw_s(co, "%s (%d)", redis->ctx->errstr, redis->ctx->err);
7575
} else {
7676
LGX_RETURN_STRING(lgx_str_new(reply->str, reply->len));
7777
}
@@ -121,7 +121,7 @@ static void on_connect(const redisAsyncContext *c, int status) {
121121
lgx_co_resume(vm, redis->co);
122122

123123
if (status != REDIS_OK) {
124-
lgx_co_throw_s(vm, redis->co, c->errstr);
124+
lgx_co_throw_s(redis->co, c->errstr);
125125
} else {
126126
// 写入返回值
127127
LGX_RETURN_TRUE();

src/extensions/std_net_socket.c

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ static void do_accept(lgx_socket_t *sock) {
192192
if ( wbt_nonblocking(conn_sock) == -1 ) {
193193
wbt_close_socket(conn_sock);
194194
sock->co = NULL;
195-
lgx_co_throw_s(vm, co, "wbt_nonblocking() failed");
195+
lgx_co_throw_s(co, "wbt_nonblocking() failed");
196196
return;
197197
}
198198
}
@@ -203,15 +203,15 @@ static void do_accept(lgx_socket_t *sock) {
203203
if (setsockopt(conn_sock, IPPROTO_TCP, TCP_NODELAY, (char *)&on, sizeof(on)) != 0) {
204204
wbt_close_socket(conn_sock);
205205
sock->co = NULL;
206-
lgx_co_throw_s(vm, co, "setsockopt(TCP_NODELAY) failed");
206+
lgx_co_throw_s(co, "setsockopt(TCP_NODELAY) failed");
207207
return;
208208
}
209209

210210
lgx_socket_t *newsock = xcalloc(1, sizeof(lgx_socket_t));
211211
if (!newsock) {
212212
wbt_close_socket(conn_sock);
213213
sock->co = NULL;
214-
lgx_co_throw_s(vm, co, "xcalloc() failed");
214+
lgx_co_throw_s(co, "xcalloc() failed");
215215
return;
216216
}
217217

@@ -220,7 +220,7 @@ static void do_accept(lgx_socket_t *sock) {
220220
wbt_close_socket(conn_sock);
221221
xfree(newsock);
222222
sock->co = NULL;
223-
lgx_co_throw_s(vm, co, "lgx_res_new() failed");
223+
lgx_co_throw_s(co, "lgx_res_new() failed");
224224
return;
225225
}
226226

@@ -236,22 +236,22 @@ static void do_accept(lgx_socket_t *sock) {
236236
wbt_close_socket(conn_sock);
237237
lgx_res_delete(res);
238238
sock->co = NULL;
239-
lgx_co_throw_s(vm, co, "lgx_obj_set() failed");
239+
lgx_co_throw_s(co, "lgx_obj_set() failed");
240240
return;
241241
}
242242

243243
newsock->obj = newobj;
244244
newsock->fd = conn_sock;
245245

246246
sock->co = NULL;
247-
LGX_RETURN_OBJECT(newobj);
247+
lgx_co_return_object(co, newobj);
248248
} else {
249249
wbt_err_t err = wbt_socket_errno;
250250

251251
if (err == WBT_EAGAIN) {
252252
if (!(co && co->status == CO_RUNNING)) {
253253
sock->co = NULL;
254-
lgx_co_throw_s(vm, co, "unexpected EAGAIN");
254+
lgx_co_throw_s(co, "unexpected EAGAIN");
255255
return;
256256
}
257257

@@ -266,7 +266,7 @@ static void do_accept(lgx_socket_t *sock) {
266266

267267
if((p_ev = wbt_event_add(vm->events, &ev)) == NULL) {
268268
sock->co = NULL;
269-
lgx_co_throw_s(vm, co, "wbt_event_add() failed");
269+
lgx_co_throw_s(co, "wbt_event_add() failed");
270270
return;
271271
}
272272

@@ -275,7 +275,7 @@ static void do_accept(lgx_socket_t *sock) {
275275
lgx_co_suspend(vm);
276276
} else {
277277
sock->co = NULL;
278-
lgx_co_throw_s(vm, co, strerror(err));
278+
lgx_co_throw_s(co, strerror(err));
279279
}
280280
}
281281
}
@@ -328,7 +328,7 @@ static wbt_status on_timeout(wbt_timer_t *timer) {
328328

329329
lgx_socket_t *sock = ev->ctx;
330330
lgx_co_resume(sock->co->vm, sock->co);
331-
lgx_co_throw_s(sock->co->vm, sock->co, "timeout");
331+
lgx_co_throw_s(sock->co, "timeout");
332332

333333
wbt_event_del(sock->co->vm->events, ev);
334334

@@ -359,7 +359,7 @@ static void do_send(lgx_socket_t *sock) {
359359
goto wait_send;
360360
} else {
361361
sock->co = NULL;
362-
lgx_co_throw_s(vm, co, strerror(err));
362+
lgx_co_throw_s(co, strerror(err));
363363
return;
364364
}
365365
}
@@ -375,7 +375,7 @@ static void do_send(lgx_socket_t *sock) {
375375

376376
// 发送完毕
377377
sock->co = NULL;
378-
LGX_RETURN_TRUE();
378+
lgx_co_return_true(co);
379379
return;
380380

381381
wait_send:;
@@ -391,7 +391,7 @@ wait_send:;
391391

392392
if((p_ev = wbt_event_add(vm->events, &ev)) == NULL) {
393393
sock->co = NULL;
394-
lgx_co_throw_s(vm, co, "wbt_event_add() failed");
394+
lgx_co_throw_s(co, "wbt_event_add() failed");
395395
return;
396396
}
397397

@@ -465,15 +465,15 @@ static void do_recv(lgx_socket_t *sock) {
465465
xfree(sock->buffer);
466466
sock->buffer= NULL;
467467
sock->length = sock->offset = 0;
468-
lgx_co_throw_s(vm, co, strerror(err));
468+
lgx_co_throw_s(co, strerror(err));
469469
return;
470470
}
471471
} else if (nread == 0) {
472472
sock->co = NULL;
473473
xfree(sock->buffer);
474474
sock->buffer= NULL;
475475
sock->length = sock->offset = 0;
476-
lgx_co_throw_s(vm, co, "connection closed by peer");
476+
lgx_co_throw_s(co, "connection closed by peer");
477477
return;
478478
}
479479

@@ -488,7 +488,7 @@ static void do_recv(lgx_socket_t *sock) {
488488
str->is_ref = 0;
489489

490490
sock->co = NULL;
491-
LGX_RETURN_STRING(str);
491+
lgx_co_return_string(co, str);
492492
return;
493493

494494
wait_recv:;
@@ -504,7 +504,7 @@ wait_recv:;
504504

505505
if((p_ev = wbt_event_add(vm->events, &ev)) == NULL) {
506506
sock->co = NULL;
507-
lgx_co_throw_s(vm, co, "wbt_event_add() failed");
507+
lgx_co_throw_s(co, "wbt_event_add() failed");
508508
return;
509509
}
510510

src/extensions/std_net_tcp_client.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ static wbt_status on_close(wbt_event_t *ev) {
151151

152152
// 写入返回值
153153
if (!client->is_return) {
154-
lgx_co_throw_s(client->vm, client->co, "get failed");
154+
lgx_co_throw_s(client->co, "get failed");
155155
}
156156

157157
xfree(client);

src/interpreter/coroutine.c

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
#include "../common/str.h"
44
#include "../common/obj.h"
55
#include "../common/res.h"
6+
#include "../common/exception.h"
67
#include "coroutine.h"
78
#include "gc.h"
89

10+
#define LGX_MAX_STACK_SIZE (256 << 8)
11+
912
int lgx_co_stack_init(lgx_co_stack_t *stack, unsigned size) {
1013
stack->size = size;
1114
stack->base = 0;
@@ -429,5 +432,167 @@ int lgx_co_await(lgx_vm_t *vm) {
429432

430433
co->on_yield = NULL;
431434

435+
return 0;
436+
}
437+
438+
// 输出协程的当前调用栈
439+
int lgx_co_backtrace(lgx_co_t *co) {
440+
unsigned int base = co->stack.base;
441+
while (1) {
442+
printf(" <function:%d> %d\n", co->stack.buf[base].v.fun->addr, base);
443+
444+
if (base != 0) {
445+
base = co->stack.buf[base+3].v.l;
446+
} else {
447+
break;
448+
}
449+
}
450+
451+
return 0;
452+
}
453+
454+
void lgx_co_throw(lgx_co_t *co, lgx_val_t *e) {
455+
unsigned pc = co->pc - 1;
456+
457+
// 匹配最接近的 try-catch 块
458+
unsigned long long int key = ((unsigned long long int)pc + 1) << 32;
459+
460+
wbt_str_t k;
461+
k.str = (char *)&key;
462+
k.len = sizeof(key);
463+
464+
lgx_exception_t *exception;
465+
lgx_exception_block_t *block = NULL;
466+
wbt_rb_node_t *n = wbt_rb_get_lesser(co->vm->exception, &k);
467+
if (n) {
468+
exception = (lgx_exception_t *)n->value.str;
469+
if (pc >= exception->try_block.start && pc <= exception->try_block.end) {
470+
// 匹配参数类型符合的 catch block
471+
lgx_exception_block_t *b;
472+
wbt_list_for_each_entry(b, lgx_exception_block_t, &exception->catch_blocks, head) {
473+
if (b->e->type == T_UNDEFINED) {
474+
block = b;
475+
break;
476+
} else if (b->e->type == e->type) {
477+
if (b->e->type == T_OBJECT) {
478+
if (lgx_obj_is_same_class(b->e->v.obj, e->v.obj) == 0) {
479+
block = b;
480+
break;
481+
}
482+
} else {
483+
block = b;
484+
break;
485+
}
486+
}
487+
}
488+
}
489+
}
490+
491+
if (block) {
492+
// 跳转到指定的 catch block
493+
co->pc = block->start;
494+
495+
// 把异常变量写入到 catch block 的参数中
496+
//printf("%d\n",block->e->u.c.reg);
497+
lgx_gc_ref_del(&co->stack.buf[co->stack.base + block->e->u.c.reg]);
498+
co->stack.buf[co->stack.base + block->e->u.c.reg] = *e;
499+
} else {
500+
// 没有匹配的 catch 块,递归向上寻找
501+
unsigned base = co->stack.base;
502+
lgx_val_t *regs = co->stack.buf + base;
503+
504+
assert(regs[0].type == T_FUNCTION);
505+
assert(regs[2].type == T_LONG);
506+
assert(regs[3].type == T_LONG);
507+
508+
if (regs[2].v.l >= 0) {
509+
// 释放所有局部变量和临时变量
510+
int n;
511+
for (n = 0; n < regs[0].v.fun->stack_size; n ++) {
512+
lgx_gc_ref_del(&regs[n]);
513+
regs[n].type = T_UNDEFINED;
514+
}
515+
516+
// 切换执行堆栈
517+
co->stack.base = regs[3].v.l;
518+
519+
// 在函数调用点重新抛出异常
520+
co->pc = regs[2].v.l;
521+
lgx_co_throw(co, e);
522+
} else {
523+
// 遍历调用栈依然未能找到匹配的 catch 块,退出当前协程
524+
printf("[uncaught exception] [%llu] ", co->id);
525+
lgx_val_print(e);
526+
printf("\n");
527+
528+
lgx_gc_ref_del(e);
529+
// TODO 写入到协程返回值中?
530+
531+
lgx_co_backtrace(co);
532+
533+
co->pc = regs[0].v.fun->end;
534+
}
535+
}
536+
}
537+
538+
void lgx_co_throw_s(lgx_co_t *co, const char *fmt, ...) {
539+
char *buf = (char *)xmalloc(128);
540+
541+
va_list args;
542+
va_start(args, fmt);
543+
int len = vsnprintf(buf, 128, fmt, args);
544+
va_end(args);
545+
546+
lgx_val_t e;
547+
e.type = T_STRING;
548+
e.v.str = lgx_str_new_ref(buf, 128);
549+
e.v.str->length = len;
550+
e.v.str->is_ref = 0;
551+
552+
lgx_gc_ref_add(&e);
553+
554+
lgx_co_throw(co, &e);
555+
}
556+
557+
void lgx_co_throw_v(lgx_co_t *co, lgx_val_t *v) {
558+
lgx_val_t e;
559+
e = *v;
560+
561+
// 把原始变量标记为 undefined,避免 exception 值被释放
562+
v->type = T_UNDEFINED;
563+
564+
lgx_co_throw(co, &e);
565+
}
566+
567+
// 确保堆栈上有足够的剩余空间
568+
int lgx_co_checkstack(lgx_co_t *co, unsigned int stack_size) {
569+
lgx_co_stack_t *stack = &co->stack;
570+
lgx_val_t *regs = &stack->buf[stack->base];
571+
572+
assert(regs[0].type == T_FUNCTION);
573+
574+
if (stack->base + regs[0].v.fun->stack_size + stack_size < stack->size) {
575+
return 0;
576+
}
577+
578+
unsigned int size = stack->size;
579+
while (stack->base + regs[0].v.fun->stack_size + stack_size >= size) {
580+
size *= 2;
581+
}
582+
583+
if (size > LGX_MAX_STACK_SIZE) {
584+
return 1;
585+
}
586+
587+
lgx_val_t *s = (lgx_val_t *)xrealloc(stack->buf, size * sizeof(lgx_val_t));
588+
if (!s) {
589+
return 1;
590+
}
591+
// 初始化新空间
592+
memset(s + stack->size, 0, (size - stack->size) * sizeof(lgx_val_t));
593+
594+
stack->buf = s;
595+
stack->size = size;
596+
432597
return 0;
433598
}

src/interpreter/coroutine.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,12 @@ int lgx_co_set_object(lgx_co_t *co, unsigned pos, lgx_obj_t *obj);
3939
lgx_obj_t* lgx_co_obj_new(lgx_vm_t *vm, lgx_co_t *co);
4040
int lgx_co_await(lgx_vm_t *vm);
4141

42+
int lgx_co_backtrace(lgx_co_t *co);
43+
44+
void lgx_co_throw(lgx_co_t *co, lgx_val_t *e);
45+
void lgx_co_throw_s(lgx_co_t *co, const char *fmt, ...);
46+
void lgx_co_throw_v(lgx_co_t *co, lgx_val_t *v);
47+
48+
int lgx_co_checkstack(lgx_co_t *co, unsigned int stack_size);
49+
4250
#endif // LGX_COROUTINE_H

0 commit comments

Comments
 (0)