@@ -38,6 +38,7 @@ import {
3838 Effect ,
3939 Exit ,
4040 FileSystem ,
41+ Fiber ,
4142 Layer ,
4243 Path ,
4344 Ref ,
@@ -331,6 +332,13 @@ class RouteRequestError extends Schema.TaggedErrorClass<RouteRequestError>()("Ro
331332 message : Schema . String ,
332333} ) { }
333334
335+ class GitActionStoppedError extends Schema . TaggedErrorClass < GitActionStoppedError > ( ) (
336+ "GitActionStoppedError" ,
337+ {
338+ message : Schema . String ,
339+ } ,
340+ ) { }
341+
334342export const createServer = Effect . fn ( function * ( ) : Effect . fn . Return <
335343 http . Server ,
336344 ServerLifecycleError ,
@@ -374,6 +382,89 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
374382 const clients = yield * Ref . make ( new Set < WebSocket > ( ) ) ;
375383 const logger = createLogger ( "ws" ) ;
376384 const readiness = yield * makeServerReadiness ;
385+ type ActiveGitRequestKind = "pull" | "stacked_action" ;
386+ type ActiveGitRequestHandle = {
387+ readonly kind : ActiveGitRequestKind ;
388+ readonly cwd : string ;
389+ readonly actionId : string | null ;
390+ readonly fiber : Fiber . Fiber < unknown , unknown > ;
391+ } ;
392+ const activeGitRequests = new WeakMap < WebSocket , Set < ActiveGitRequestHandle > > ( ) ;
393+
394+ const registerActiveGitRequest = ( ws : WebSocket , handle : ActiveGitRequestHandle ) =>
395+ Effect . sync ( ( ) => {
396+ const handles = activeGitRequests . get ( ws ) ?? new Set < ActiveGitRequestHandle > ( ) ;
397+ handles . add ( handle ) ;
398+ activeGitRequests . set ( ws , handles ) ;
399+ } ) ;
400+
401+ const unregisterActiveGitRequest = ( ws : WebSocket , handle : ActiveGitRequestHandle ) =>
402+ Effect . sync ( ( ) => {
403+ const handles = activeGitRequests . get ( ws ) ;
404+ if ( ! handles ) {
405+ return ;
406+ }
407+ handles . delete ( handle ) ;
408+ if ( handles . size === 0 ) {
409+ activeGitRequests . delete ( ws ) ;
410+ }
411+ } ) ;
412+
413+ const interruptActiveGitRequests = ( ws : WebSocket ) =>
414+ Effect . gen ( function * ( ) {
415+ const handles = Array . from ( activeGitRequests . get ( ws ) ?? [ ] ) ;
416+ activeGitRequests . delete ( ws ) ;
417+ for ( const handle of handles ) {
418+ yield * Fiber . interrupt ( handle . fiber ) . pipe ( Effect . ignore ) ;
419+ }
420+ } ) ;
421+
422+ const stopActiveGitRequest = (
423+ ws : WebSocket ,
424+ input : { cwd : string ; actionId ?: string | undefined } ,
425+ ) =>
426+ Effect . gen ( function * ( ) {
427+ const handles = Array . from ( activeGitRequests . get ( ws ) ?? [ ] ) ;
428+ const handle =
429+ input . actionId != null
430+ ? handles . find (
431+ ( candidate ) => candidate . cwd === input . cwd && candidate . actionId === input . actionId ,
432+ )
433+ : handles . find ( ( candidate ) => candidate . cwd === input . cwd ) ;
434+
435+ if ( ! handle ) {
436+ return ;
437+ }
438+
439+ yield * Fiber . interrupt ( handle . fiber ) ;
440+ } ) ;
441+
442+ const runTrackedGitRequest = < A , E > (
443+ ws : WebSocket ,
444+ meta : { kind : ActiveGitRequestKind ; cwd : string ; actionId ?: string | undefined } ,
445+ effect : Effect . Effect < A , E , never > ,
446+ interruptedMessage : string ,
447+ ) : Effect . Effect < A , E | GitActionStoppedError > =>
448+ Effect . gen ( function * ( ) {
449+ const fiber = yield * Effect . forkScoped ( effect ) ;
450+ const handle : ActiveGitRequestHandle = {
451+ kind : meta . kind ,
452+ cwd : meta . cwd ,
453+ actionId : meta . actionId ?? null ,
454+ fiber,
455+ } ;
456+ yield * registerActiveGitRequest ( ws , handle ) ;
457+ const exit = yield * Fiber . await ( fiber ) . pipe (
458+ Effect . ensuring ( unregisterActiveGitRequest ( ws , handle ) ) ,
459+ ) ;
460+ if ( Exit . isSuccess ( exit ) ) {
461+ return exit . value ;
462+ }
463+ if ( Cause . hasInterruptsOnly ( exit . cause ) ) {
464+ return yield * new GitActionStoppedError ( { message : interruptedMessage } ) ;
465+ }
466+ return yield * Effect . failCause ( exit . cause as Cause . Cause < E > ) ;
467+ } ) as Effect . Effect < A , E | GitActionStoppedError , never > ;
377468
378469 function logOutgoingPush ( push : WsPushEnvelopeBase , recipients : number ) {
379470 if ( ! logWebSocketEvents ) return ;
@@ -1117,24 +1208,40 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
11171208 const body = stripRequestTag ( request . body ) ;
11181209 const snapshot = yield * projectionReadModelQuery . getSnapshot ( ) ;
11191210 const gitEnv = yield * resolveRuntimeEnvironment ( { cwd : body . cwd , readModel : snapshot } ) ;
1120- return yield * git
1121- . syncCurrentBranch ( body . cwd )
1122- . pipe ( Effect . provideService ( RuntimeEnv , gitEnv ) ) ;
1211+ return yield * runTrackedGitRequest (
1212+ ws ,
1213+ { kind : "pull" , cwd : body . cwd } ,
1214+ git . syncCurrentBranch ( body . cwd ) . pipe ( Effect . provideService ( RuntimeEnv , gitEnv ) ) ,
1215+ "Git pull stopped." ,
1216+ ) ;
1217+ }
1218+
1219+ case WS_METHODS . gitStopAction : {
1220+ const body = stripRequestTag ( request . body ) ;
1221+ yield * stopActiveGitRequest ( ws , body ) ;
1222+ return { } ;
11231223 }
11241224
11251225 case WS_METHODS . gitRunStackedAction : {
11261226 const body = stripRequestTag ( request . body ) ;
11271227 const snapshot = yield * projectionReadModelQuery . getSnapshot ( ) ;
11281228 const gitEnv = yield * resolveRuntimeEnvironment ( { cwd : body . cwd , readModel : snapshot } ) ;
1129- return yield * gitManager
1130- . runStackedAction ( body , {
1131- actionId : body . actionId ,
1132- progressReporter : {
1133- publish : ( event ) =>
1134- pushBus . publishClient ( ws , WS_CHANNELS . gitActionProgress , event ) . pipe ( Effect . asVoid ) ,
1135- } ,
1136- } )
1137- . pipe ( Effect . provideService ( RuntimeEnv , gitEnv ) ) ;
1229+ return yield * runTrackedGitRequest (
1230+ ws ,
1231+ { kind : "stacked_action" , cwd : body . cwd , actionId : body . actionId } ,
1232+ gitManager
1233+ . runStackedAction ( body , {
1234+ actionId : body . actionId ,
1235+ progressReporter : {
1236+ publish : ( event ) =>
1237+ pushBus
1238+ . publishClient ( ws , WS_CHANNELS . gitActionProgress , event )
1239+ . pipe ( Effect . asVoid ) ,
1240+ } ,
1241+ } )
1242+ . pipe ( Effect . provideService ( RuntimeEnv , gitEnv ) ) ,
1243+ "Git action stopped." ,
1244+ ) ;
11381245 }
11391246
11401247 case WS_METHODS . gitResolvePullRequest : {
@@ -1702,6 +1809,17 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
17021809 } ;
17031810 }
17041811
1812+ if (
1813+ ( request . body . _tag === WS_METHODS . gitRunStackedAction ||
1814+ request . body . _tag === WS_METHODS . gitPull ) &&
1815+ Schema . is ( GitActionStoppedError ) ( squashed )
1816+ ) {
1817+ return {
1818+ message : redactSensitiveText ( squashed . message ) ,
1819+ code : "git_action_stopped" ,
1820+ } ;
1821+ }
1822+
17051823 if ( squashed instanceof Error ) {
17061824 return { message : redactSensitiveText ( squashed . message ) } ;
17071825 }
@@ -1798,19 +1916,25 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
17981916
17991917 ws . on ( "close" , ( ) => {
18001918 void runPromise (
1801- Ref . update ( clients , ( clients ) => {
1802- clients . delete ( ws ) ;
1803- return clients ;
1804- } ) ,
1919+ Effect . all ( [
1920+ interruptActiveGitRequests ( ws ) ,
1921+ Ref . update ( clients , ( clients ) => {
1922+ clients . delete ( ws ) ;
1923+ return clients ;
1924+ } ) ,
1925+ ] ) . pipe ( Effect . asVoid ) ,
18051926 ) ;
18061927 } ) ;
18071928
18081929 ws . on ( "error" , ( ) => {
18091930 void runPromise (
1810- Ref . update ( clients , ( clients ) => {
1811- clients . delete ( ws ) ;
1812- return clients ;
1813- } ) ,
1931+ Effect . all ( [
1932+ interruptActiveGitRequests ( ws ) ,
1933+ Ref . update ( clients , ( clients ) => {
1934+ clients . delete ( ws ) ;
1935+ return clients ;
1936+ } ) ,
1937+ ] ) . pipe ( Effect . asVoid ) ,
18141938 ) ;
18151939 } ) ;
18161940 } ) ;
0 commit comments