@@ -25,6 +25,7 @@ abstract class AbstractDriver implements Driver
2525
2626 private \Fiber $ callbackFiber ;
2727 private \Fiber $ queueFiber ;
28+ private \Closure $ errorCallback ;
2829
2930 /** @var Callback[] */
3031 private array $ callbacks = [];
@@ -51,15 +52,18 @@ abstract class AbstractDriver implements Driver
5152
5253 private bool $ running = false ;
5354
55+ private bool $ inFiber = false ;
56+
5457 private \stdClass $ internalSuspensionMarker ;
5558
5659 public function __construct ()
5760 {
5861 $ this ->internalSuspensionMarker = new \stdClass ();
5962 $ this ->createCallbackFiber ();
6063 $ this ->createQueueFiber ();
64+ $ this ->createErrorCallback ();
6165 /** @psalm-suppress InvalidArgument */
62- $ this ->interruptCallback = \Closure::fromCallable ([$ this , 'interrupt ' ]);
66+ $ this ->interruptCallback = \Closure::fromCallable ([$ this , 'setInterrupt ' ]);
6367 }
6468
6569 /**
@@ -84,9 +88,14 @@ public function run(): void
8488 }
8589
8690 $ this ->running = true ;
91+ $ this ->inFiber = \Fiber::getCurrent () !== null ;
8792
8893 try {
8994 while ($ this ->running ) {
95+ if ($ this ->interrupt ) {
96+ $ this ->invokeInterrupt ();
97+ }
98+
9099 $ this ->invokeMicrotasks ();
91100
92101 if ($ this ->isEmpty ()) {
@@ -97,6 +106,7 @@ public function run(): void
97106 }
98107 } finally {
99108 $ this ->running = false ;
109+ $ this ->inFiber = false ;
100110 }
101111 }
102112
@@ -594,10 +604,7 @@ protected function invokeCallback(Callback $callback): void
594604 }
595605
596606 if ($ this ->interrupt ) {
597- $ interrupt = $ this ->interrupt ;
598- $ this ->interrupt = null ;
599-
600- \Fiber::suspend ($ interrupt );
607+ $ this ->invokeInterrupt ();
601608 }
602609
603610 if ($ this ->microQueue ) {
@@ -615,10 +622,12 @@ protected function invokeCallback(Callback $callback): void
615622 protected function error (\Throwable $ exception ): void
616623 {
617624 if ($ this ->errorHandler === null ) {
618- throw $ exception ;
625+ $ this ->setInterrupt (static fn () => throw $ exception );
626+ return ;
619627 }
620628
621- ($ this ->errorHandler )($ exception );
629+ $ fiber = new \Fiber ($ this ->errorCallback );
630+ $ fiber ->start ($ this ->errorHandler , $ exception );
622631 }
623632
624633 /**
@@ -698,18 +707,31 @@ private function invokeMicrotasks(): void
698707 }
699708
700709 if ($ this ->interrupt ) {
701- $ interrupt = $ this ->interrupt ;
702- $ this ->interrupt = null ;
703-
704- \Fiber::suspend ($ interrupt );
710+ $ this ->invokeInterrupt ();
705711 }
706712 }
707713 }
708714 }
709715
710- private function interrupt (callable $ callback ): void
716+ private function setInterrupt (callable $ interrupt ): void
711717 {
712- $ this ->interrupt = $ callback ;
718+ \assert ($ this ->interrupt === null );
719+ $ this ->interrupt = $ interrupt ;
720+ }
721+
722+ private function invokeInterrupt (): void
723+ {
724+ \assert ($ this ->interrupt !== null );
725+
726+ $ interrupt = $ this ->interrupt ;
727+ $ this ->interrupt = null ;
728+
729+ if (!$ this ->inFiber ) {
730+ $ interrupt ();
731+ throw new \Error ('Interrupt must throw if not executing in a fiber ' );
732+ }
733+
734+ \Fiber::suspend ($ interrupt );
713735 }
714736
715737 private function createCallbackFiber (): void
@@ -749,4 +771,15 @@ private function createQueueFiber(): void
749771
750772 $ this ->queueFiber ->start ();
751773 }
774+
775+ private function createErrorCallback (): void
776+ {
777+ $ this ->errorCallback = function (callable $ errorHandler , \Throwable $ exception ): void {
778+ try {
779+ $ errorHandler ($ exception );
780+ } catch (\Throwable $ exception ) {
781+ $ this ->interrupt = static fn () => throw $ exception ;
782+ }
783+ };
784+ }
752785}
0 commit comments