@@ -166,7 +166,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
166166 final TTLMap <String , int > _reliableReceivedState = TTLMap <String , int >(30000 );
167167 bool _isReconnecting = false ;
168168
169- Future <void >? _publisherConnectionFuture ;
169+ Completer <void >? _publisherConnectionCompleter ;
170170
171171 String ? _reliableParticipantKey (lk_models.DataPacket packet) {
172172 if (packet.hasParticipantSid () && packet.participantSid.isNotEmpty) {
@@ -487,11 +487,12 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
487487 }
488488
489489 Future <void > _publisherEnsureConnected () async {
490- if ((await publisher? .pc.getConnectionState ())? .isConnected () != true ) {
490+ final state = await publisher? .pc.getConnectionState ();
491+ if (state? .isConnected () != true ) {
491492 logger.fine ('Publisher is not connected...' );
492493
493494 // start negotiation
494- if (await publisher ? .pc. getConnectionState () != rtc.RTCPeerConnectionState .RTCPeerConnectionStateConnecting ) {
495+ if (state != rtc.RTCPeerConnectionState .RTCPeerConnectionStateConnecting ) {
495496 await negotiate ();
496497 }
497498 if (! lkPlatformIsTest ()) {
@@ -505,14 +506,41 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
505506 }
506507
507508 @internal
508- Future <void > ensurePublisherConnected () async {
509- _publisherConnectionFuture ?? = _publisherEnsureConnected ();
510- try {
511- await _publisherConnectionFuture;
512- } catch (_) {
513- _publisherConnectionFuture = null ;
514- rethrow ;
509+ Future <void > ensurePublisherConnected () {
510+ final existing = _publisherConnectionCompleter;
511+ if (existing != null && ! existing.isCompleted) {
512+ return existing.future;
513+ }
514+
515+ final completer = Completer <void >();
516+ _publisherConnectionCompleter = completer;
517+
518+ unawaited (
519+ _publisherEnsureConnected ().then ((_) {
520+ if (! completer.isCompleted) {
521+ completer.complete ();
522+ }
523+ }, onError: (Object error, StackTrace stackTrace) {
524+ if (! completer.isCompleted) {
525+ completer.completeError (error, stackTrace);
526+ }
527+ }).whenComplete (() {
528+ if (identical (_publisherConnectionCompleter, completer)) {
529+ _publisherConnectionCompleter = null ;
530+ }
531+ }),
532+ );
533+
534+ return completer.future;
535+ }
536+
537+ void _resetPublisherConnection () {
538+ final completer = _publisherConnectionCompleter;
539+ if (completer != null && ! completer.isCompleted) {
540+ completer
541+ .completeError (ConnectException ('Publisher connection reset' , reason: ConnectionErrorReason .InternalError ));
515542 }
543+ _publisherConnectionCompleter = null ;
516544 }
517545
518546 lk_models.EncryptedPacketPayload ? asEncryptablePacket (lk_models.DataPacket packet) {
@@ -652,7 +680,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
652680 rtc.RTCPeerConnectionState .RTCPeerConnectionStateFailed ,
653681 rtc.RTCPeerConnectionState .RTCPeerConnectionStateDisconnected
654682 ].contains (state)) {
655- _publisherConnectionFuture = null ;
683+ _resetPublisherConnection () ;
656684 }
657685 events.emit (EnginePublisherPeerStateUpdatedEvent (
658686 state: state,
@@ -1130,7 +1158,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
11301158 await publisher? .dispose ();
11311159 publisher = null ;
11321160
1133- _publisherConnectionFuture = null ;
1161+ _resetPublisherConnection () ;
11341162
11351163 await subscriber? .dispose ();
11361164 subscriber = null ;
0 commit comments