11import type { DockerService } from '../../docker.service.ts' ;
22import type { ConfigService } from '../../config.service.ts' ;
3+ import type { WsService } from '../../ws.service.ts' ;
34import type { LocalFilesRepository } from '../../../repositories/files/local-files.repository.ts' ;
45import {
56 Role ,
@@ -8,6 +9,11 @@ import {
89} from '../../../types/create-user.type.ts' ;
910import type { LKCreateUserBrowser } from '../../../types/com-modules/livekit.ts' ;
1011import { EmulatedFilePublishStreamService } from './emulated-file-publish-stream.service.ts' ;
12+ import type { JsonValue } from '../../../types/json.type.ts' ;
13+ import {
14+ ERRORS_FILE ,
15+ addSaveStatsToFileToQueue ,
16+ } from '../../../utils/stats-files.ts' ;
1117import * as fs from 'node:fs/promises' ;
1218
1319interface EmulatedContainerInfo {
@@ -27,25 +33,32 @@ interface FailedParticipantCreationContext extends Partial<EmulatedContainerInfo
2733export class EmulatedBrowserService {
2834 // Track container IDs for cleanup (browser-specific info)
2935 private readonly containerMap = new Map < string , EmulatedContainerInfo > ( ) ;
36+ private readonly healthCheckIntervals = new Map < string , NodeJS . Timeout > ( ) ;
37+ private readonly reportedHealthErrors = new Set < string > ( ) ;
3038
3139 private readonly dockerService : DockerService ;
3240 private readonly configService : ConfigService ;
41+ private readonly wsService : WsService ;
3342 private readonly localFilesRepository : LocalFilesRepository ;
3443 private readonly emulatedFilePublishStreamService : EmulatedFilePublishStreamService ;
3544
3645 private readonly LIVEKIT_CLI_IMAGE = 'livekit/livekit-cli' ;
3746 private readonly ROOM_EMPTY_TIMEOUT = 600 ; // 10 minutes
3847 private readonly CREATE_PARTICIPANT_MAX_ATTEMPTS = 3 ;
3948 private readonly CREATE_PARTICIPANT_RETRY_DELAY_MS = 1000 ;
49+ private readonly LIVEKIT_HEALTHCHECK_INTERVAL_MS = 5000 ;
50+ private readonly MAX_ERROR_LOG_CHARS = 3000 ;
4051
4152 constructor (
4253 dockerService : DockerService ,
4354 configService : ConfigService ,
55+ wsService : WsService ,
4456 localFilesRepository : LocalFilesRepository ,
4557 emulatedFilePublishStreamService : EmulatedFilePublishStreamService ,
4658 ) {
4759 this . dockerService = dockerService ;
4860 this . configService = configService ;
61+ this . wsService = wsService ;
4962 this . localFilesRepository = localFilesRepository ;
5063 this . emulatedFilePublishStreamService =
5164 emulatedFilePublishStreamService ;
@@ -67,16 +80,38 @@ export class EmulatedBrowserService {
6780 const streamingFilesExist =
6881 await this . localFilesRepository . existStreamingMediaFiles ( ) ;
6982 if ( ! streamingFilesExist ) {
83+ this . saveErrorToStats ( userId , sessionName , {
84+ event : 'EMULATED_PARTICIPANT_CREATION_ERROR' ,
85+ source : 'emulated-browser-service' ,
86+ participant : userId ,
87+ session : sessionName ,
88+ timestamp : new Date ( ) . toISOString ( ) ,
89+ reason : 'streaming-media-files-missing' ,
90+ error : 'Streaming media files (.h264, .ogg) not found.' ,
91+ } ) ;
7092 throw new Error (
7193 'Streaming media files (.h264, .ogg) not found. Run prepare_scripts/generate-streaming-media.sh first.' ,
7294 ) ;
7395 }
7496
75- // Ensure LiveKit CLI image is available
76- await this . ensureLivekitCliImage ( ) ;
97+ try {
98+ // Ensure LiveKit CLI image is available
99+ await this . ensureLivekitCliImage ( ) ;
77100
78- // Create room if it doesn't exist
79- await this . createRoomIfNeeded ( lkRequest , sessionName ) ;
101+ // Create room if it doesn't exist
102+ await this . createRoomIfNeeded ( lkRequest , sessionName ) ;
103+ } catch ( error ) {
104+ this . saveErrorToStats ( userId , sessionName , {
105+ event : 'EMULATED_PARTICIPANT_CREATION_ERROR' ,
106+ source : 'emulated-browser-service' ,
107+ participant : userId ,
108+ session : sessionName ,
109+ timestamp : new Date ( ) . toISOString ( ) ,
110+ reason : 'creation-preparation-failed' ,
111+ error : String ( error ) ,
112+ } ) ;
113+ throw error ;
114+ }
80115
81116 let lastError : unknown ;
82117
@@ -94,6 +129,17 @@ export class EmulatedBrowserService {
94129 ) ;
95130 } catch ( error ) {
96131 lastError = error ;
132+ this . saveErrorToStats ( userId , sessionName , {
133+ event : 'EMULATED_PARTICIPANT_CREATION_ERROR' ,
134+ source : 'emulated-browser-service' ,
135+ participant : userId ,
136+ session : sessionName ,
137+ timestamp : new Date ( ) . toISOString ( ) ,
138+ reason : 'creation-attempt-failed' ,
139+ attempt,
140+ maxAttempts : this . CREATE_PARTICIPANT_MAX_ATTEMPTS ,
141+ error : String ( error ) ,
142+ } ) ;
97143 console . warn (
98144 `Emulated participant join attempt ${ attempt } /${ this . CREATE_PARTICIPANT_MAX_ATTEMPTS } failed for ${ sessionName } /${ userId } : ${ String ( error ) } ` ,
99145 ) ;
@@ -189,6 +235,8 @@ export class EmulatedBrowserService {
189235 participantId ,
190236 ) ;
191237
238+ this . startParticipantHealthCheck ( connectionId ) ;
239+
192240 console . log (
193241 `Emulated participant created: connectionId=${ connectionId } , containerId=${ containerId } ` ,
194242 ) ;
@@ -212,6 +260,7 @@ export class EmulatedBrowserService {
212260 containerInfo : FailedParticipantCreationContext ,
213261 ) : Promise < void > {
214262 if ( containerInfo . connectionId ) {
263+ this . stopParticipantHealthCheck ( containerInfo . connectionId ) ;
215264 this . containerMap . delete ( containerInfo . connectionId ) ;
216265 }
217266
@@ -386,6 +435,8 @@ export class EmulatedBrowserService {
386435 return ;
387436 }
388437
438+ this . stopParticipantHealthCheck ( connectionId ) ;
439+
389440 // Stop and remove join container
390441 try {
391442 await this . dockerService . stopContainer ( containerInfo . containerId ) ;
@@ -484,6 +535,178 @@ export class EmulatedBrowserService {
484535 return this . containerMap . get ( connectionId ) ?. containerId ;
485536 }
486537
/**
 * Starts the periodic health check for an emulated participant.
 *
 * Any check already scheduled for `connectionId` is stopped first and the id
 * is removed from `reportedHealthErrors`, so a fresh failure can be reported
 * for it. The check then runs every `LIVEKIT_HEALTHCHECK_INTERVAL_MS`
 * milliseconds until `stopParticipantHealthCheck` is called for the same id.
 *
 * @param connectionId Key of the participant in `containerMap`.
 */
private startParticipantHealthCheck(connectionId: string): void {
  this.stopParticipantHealthCheck(connectionId);
  this.reportedHealthErrors.delete(connectionId);

  const interval = setInterval(() => {
    // Fire-and-forget: a timer callback cannot await. The check's own catch
    // block awaits handleUnhealthyParticipant, so the returned promise can
    // still reject; handle it here so it never becomes an unhandled rejection.
    this.runParticipantHealthCheck(connectionId).catch((error: unknown) => {
      console.error(
        `Unexpected healthcheck failure for ${connectionId}:`,
        error,
      );
    });
  }, this.LIVEKIT_HEALTHCHECK_INTERVAL_MS);

  this.healthCheckIntervals.set(connectionId, interval);
}
548+
/**
 * Cancels the periodic health check registered for `connectionId`, if any,
 * and forgets its timer handle. Does nothing when no check is registered.
 *
 * @param connectionId Key under which the timer was stored in
 *   `healthCheckIntervals`.
 */
private stopParticipantHealthCheck(connectionId: string): void {
  const timer = this.healthCheckIntervals.get(connectionId);
  if (!timer) {
    return;
  }

  clearInterval(timer);
  this.healthCheckIntervals.delete(connectionId);
}
556+
/**
 * Runs one health check pass for an emulated participant.
 *
 * The participant is handed to `handleUnhealthyParticipant` when, in order:
 *  1. its container is not running (`container-not-running`, with logs),
 *  2. its publish stream is flagged as failed (`socket-stream-failed`),
 *  3. its container logs contain a fatal indicator (`fatal-log-indicator`),
 *  4. the check itself throws (`healthcheck-execution-failed`).
 *
 * The pass is skipped when an error was already reported for the id, and the
 * timer is stopped when the participant is no longer in `containerMap`.
 *
 * Because the pass awaits several calls, the participant may be torn down
 * (or replaced) while it is in flight. After every await the pass re-checks
 * that it is still current and bails out otherwise, so an intentional removal
 * is not reported as a health failure.
 *
 * @param connectionId Key of the participant in `containerMap`.
 */
private async runParticipantHealthCheck(
  connectionId: string,
): Promise<void> {
  if (this.reportedHealthErrors.has(connectionId)) {
    return;
  }

  const containerInfo = this.containerMap.get(connectionId);
  if (!containerInfo) {
    this.stopParticipantHealthCheck(connectionId);
    return;
  }

  // True while monitoring is still active for this id AND the map still holds
  // the exact entry this pass started with. stopParticipantHealthCheck removes
  // the id from healthCheckIntervals, which makes this false.
  const isCurrent = (): boolean =>
    this.healthCheckIntervals.has(connectionId) &&
    this.containerMap.get(connectionId) === containerInfo;

  try {
    const [isRunning, streamFailed] = await Promise.all([
      this.dockerService.isContainerRunning(containerInfo.containerId),
      // Wrapped so the tuple is uniform whether isParticipantFailed returns a
      // plain value or a promise (its signature is not visible here).
      Promise.resolve(
        this.emulatedFilePublishStreamService.isParticipantFailed(
          containerInfo.participantId,
        ),
      ),
    ]);
    if (!isCurrent()) {
      return;
    }

    if (!isRunning) {
      const logs = await this.getContainerLogsSafely(
        containerInfo.containerId,
      );
      if (!isCurrent()) {
        return;
      }
      await this.handleUnhealthyParticipant(
        connectionId,
        'container-not-running',
        logs,
      );
      return;
    }

    if (streamFailed) {
      await this.handleUnhealthyParticipant(
        connectionId,
        'socket-stream-failed',
      );
      return;
    }

    const logs = await this.getContainerLogsSafely(containerInfo.containerId);
    if (isCurrent() && logs && this.hasFatalJoinIndicators(logs)) {
      await this.handleUnhealthyParticipant(
        connectionId,
        'fatal-log-indicator',
        logs,
      );
    }
  } catch (error) {
    if (!isCurrent()) {
      // The participant was stopped, removed or already handled while this
      // pass was running; the failure is not a health signal. Log it instead
      // of dropping it silently.
      console.warn(
        `Healthcheck for ${connectionId} aborted after participant teardown: ${String(error)}`,
      );
      return;
    }

    // The error text travels in the `logs` slot so it is included in the
    // reported payload.
    await this.handleUnhealthyParticipant(
      connectionId,
      'healthcheck-execution-failed',
      String(error),
    );
  }
}
620+
/**
 * Reports an unhealthy participant once and then tears it down.
 *
 * The id is added to `reportedHealthErrors` and its timer is stopped before
 * anything else, so further calls for the same id return immediately. The id
 * stays in that set until `startParticipantHealthCheck` is called for it
 * again.
 *
 * Reporting and cleanup are isolated from each other: a failure while
 * reporting is logged and cleanup still runs. Without that isolation a
 * throwing report would leave the participant running with its health check
 * already disabled. Cleanup failures are logged, never thrown.
 *
 * @param connectionId Key of the participant in `containerMap`.
 * @param reason Short machine-readable cause included in the report.
 * @param logs Optional diagnostic text (container logs or an error string).
 */
private async handleUnhealthyParticipant(
  connectionId: string,
  reason: string,
  logs?: string,
): Promise<void> {
  if (this.reportedHealthErrors.has(connectionId)) {
    return;
  }

  this.reportedHealthErrors.add(connectionId);
  this.stopParticipantHealthCheck(connectionId);

  const containerInfo = this.containerMap.get(connectionId);
  if (containerInfo) {
    try {
      this.reportHealthError(containerInfo, connectionId, reason, logs);
    } catch (error) {
      console.error(
        `Error reporting unhealthy participant ${connectionId}:`,
        error,
      );
    }
  }

  try {
    await this.deleteStreamManagerWithConnectionId(connectionId);
  } catch (error) {
    console.error(
      `Error cleaning unhealthy participant ${connectionId}:`,
      error,
    );
  }
}
647+
/**
 * Publishes a health error for an emulated participant.
 *
 * Builds an `EMULATED_PARTICIPANT_HEALTH_ERROR` payload, sends it as JSON
 * through `wsService`, queues it for the errors stats file, and logs a short
 * message to the console.
 *
 * @param containerInfo Tracked info of the affected participant.
 * @param connectionId Key of the participant in `containerMap`.
 * @param reason Short machine-readable cause of the failure.
 * @param logs Optional diagnostic text; included (truncated via
 *   `truncateLogs`) only when non-empty.
 */
private reportHealthError(
  containerInfo: EmulatedContainerInfo,
  connectionId: string,
  reason: string,
  logs?: string,
): void {
  const { userName, sessionName, participantId, containerId } = containerInfo;

  const payload: Record<string, string> = {
    event: 'EMULATED_PARTICIPANT_HEALTH_ERROR',
    source: 'livekit-cli-healthcheck',
    participant: userName,
    session: sessionName,
    timestamp: new Date().toISOString(),
    connectionId,
    participantId,
    containerId,
    reason,
    // `logs` stays the last key and is omitted entirely when absent or empty.
    ...(logs ? { logs: this.truncateLogs(logs) } : {}),
  };

  this.wsService.send(JSON.stringify(payload));
  this.saveErrorToStats(userName, sessionName, payload);
  console.error(`Healthcheck error reported for ${connectionId}: ${reason}`);
}
680+
/**
 * Queues an error record for the errors stats file.
 *
 * Thin wrapper over `addSaveStatsToFileToQueue` that always targets
 * `ERRORS_FILE`.
 *
 * @param participant Participant the error belongs to.
 * @param session Session the error belongs to.
 * @param errorData JSON-serializable error record.
 */
private saveErrorToStats(
  participant: string,
  session: string,
  errorData: Record<string, JsonValue>,
): void {
  const targetFile = ERRORS_FILE;
  addSaveStatsToFileToQueue(participant, session, targetFile, errorData);
}
688+
/**
 * Reads a container's logs without ever throwing.
 *
 * @param containerId Id of the container to read logs from.
 * @returns The logs returned by `dockerService.getLogsFromContainer`, or
 *   `undefined` when reading them fails (the failure is logged as a warning).
 */
private async getContainerLogsSafely(
  containerId: string,
): Promise<string | undefined> {
  let containerLogs: string | undefined;

  try {
    containerLogs = await this.dockerService.getLogsFromContainer(containerId);
  } catch (error) {
    console.warn(
      `Failed to read logs from container ${containerId}: ${String(error)}`,
    );
    containerLogs = undefined;
  }

  return containerLogs;
}
701+
/**
 * Limits log text to its last `MAX_ERROR_LOG_CHARS` characters.
 *
 * @param logs Full log text.
 * @returns `logs` unchanged when it fits the limit, otherwise only its tail.
 */
private truncateLogs(logs: string): string {
  const limit = this.MAX_ERROR_LOG_CHARS;
  const exceedsLimit = logs.length > limit;

  // Keep the tail: the most recent output is at the end of the text.
  return exceedsLimit ? logs.slice(-limit) : logs;
}
709+
487710 private async checkConnectionIsAliveAndCorrect (
488711 properties : UserJoinProperties ,
489712 joinContainerName : string ,
@@ -577,4 +800,20 @@ export class EmulatedBrowserService {
577800
578801 return hasConnectedToRoom || ( hasIceConnected && hasPeerConnected ) ;
579802 }
803+
/**
 * Tells whether container logs contain any known fatal marker.
 *
 * The logs are passed through `normalizeContainerLogs` and then searched for
 * each marker as a plain substring.
 * NOTE(review): the markers are lower-case, so normalization presumably
 * lower-cases the logs; confirm. Short markers such as 'fatal' match any
 * line containing that text.
 *
 * @param joinLogs Raw logs of the join container.
 * @returns `true` when at least one marker is present.
 */
private hasFatalJoinIndicators(joinLogs: string): boolean {
  const haystack = this.normalizeContainerLogs(joinLogs);
  const fatalMarkers = [
    'peer connection state changed: failed',
    'ice connection state changed: failed',
    'failed to join',
    'panic:',
    'fatal',
    'disconnected from room',
    'unpublished track',
    'could not get sample from provider',
  ];

  for (const marker of fatalMarkers) {
    if (haystack.includes(marker)) {
      return true;
    }
  }

  return false;
}
580819}
0 commit comments