@@ -433,6 +433,284 @@ describe('rclnodejs/web — HTTP transport (call + publish)', function () {
433433 } ) ;
434434} ) ;
435435
436+ // ===================================================================
437+ // SSE subscribe + CORS — opt-in HTTP behaviour (sse: true, cors: …).
438+ // ===================================================================
439+ describe ( 'rclnodejs/web — HTTP transport (SSE subscribe + CORS)' , function ( ) {
440+ this . timeout ( 60 * 1000 ) ;
441+
442+ let node ;
443+ let runtime ;
444+ let base ; // "http://127.0.0.1:<port>"
445+ let serviceImpl ;
446+
447+ before ( async function ( ) {
448+ await rclnodejs . init ( ) ;
449+ node = rclnodejs . createNode ( 'http_sse_test_node' ) ;
450+ rclnodejs . spin ( node ) ;
451+
452+ serviceImpl = node . createService (
453+ 'example_interfaces/srv/AddTwoInts' ,
454+ '/sse_add' ,
455+ ( request , response ) => {
456+ const reply = response . template ;
457+ reply . sum = request . a + request . b ;
458+ response . send ( reply ) ;
459+ }
460+ ) ;
461+
462+ runtime = createRuntime ( {
463+ node,
464+ transport : new HttpTransport ( {
465+ port : 0 ,
466+ host : '127.0.0.1' ,
467+ sse : true ,
468+ cors : true ,
469+ } ) ,
470+ } ) ;
471+ runtime . expose ( {
472+ call : { '/sse_add' : 'example_interfaces/srv/AddTwoInts' } ,
473+ publish : { '/sse_chatter' : 'std_msgs/msg/String' } ,
474+ subscribe : { '/sse_chatter' : 'std_msgs/msg/String' } ,
475+ } ) ;
476+ await runtime . start ( ) ;
477+ base = `http://127.0.0.1:${ runtime . transports [ 0 ] . port } ` ;
478+ } ) ;
479+
480+ after ( async function ( ) {
481+ if ( runtime ) await runtime . stop ( ) ;
482+ if ( node && serviceImpl ) {
483+ try {
484+ node . destroyService ( serviceImpl ) ;
485+ } catch ( _ ) { }
486+ }
487+ rclnodejs . shutdown ( ) ;
488+ } ) ;
489+
490+ describe ( 'SSE subscribe' , function ( ) {
491+ it ( 'GET subscribe streams a ready event then message events' , async function ( ) {
492+ const ac = new AbortController ( ) ;
493+ const res = await fetch ( base + '/capability/subscribe/sse_chatter' , {
494+ headers : { accept : 'text/event-stream' } ,
495+ signal : ac . signal ,
496+ } ) ;
497+ assert . strictEqual ( res . status , 200 ) ;
498+ assert . match ( res . headers . get ( 'content-type' ) || '' , / t e x t \/ e v e n t - s t r e a m / ) ;
499+ const sse = sseReader ( res ) ;
500+ try {
501+ // The runtime emits `ready` once the dispatcher acknowledges the
502+ // subscribe (headers are deferred until then).
503+ const ready = await sse . next ( ( e ) => e . event === 'ready' ) ;
504+ assert . strictEqual ( JSON . parse ( ready . data ) . capability , '/sse_chatter' ) ;
505+
506+ // Publish over HTTP until a delivery streams back. (Subscriptions
507+ // only see samples published after they're established, so we
508+ // retry on an interval to avoid a startup race.)
509+ const pump = setInterval ( ( ) => {
510+ fetch ( base + '/capability/publish/sse_chatter' , {
511+ method : 'POST' ,
512+ headers : { 'content-type' : 'application/json' } ,
513+ body : JSON . stringify ( { data : 'sse-hello' } ) ,
514+ } ) . catch ( ( ) => { } ) ;
515+ } , 100 ) ;
516+ try {
517+ const msg = await sse . next ( ( e ) => e . event === 'message' ) ;
518+ assert . strictEqual ( JSON . parse ( msg . data ) . data , 'sse-hello' ) ;
519+ } finally {
520+ clearInterval ( pump ) ;
521+ }
522+ } finally {
523+ await sse . cancel ( ) ;
524+ ac . abort ( ) ;
525+ }
526+ } ) ;
527+
528+ it ( 'rejects a non-GET subscribe with 405 + allow: GET' , async function ( ) {
529+ const res = await fetch ( base + '/capability/subscribe/sse_chatter' , {
530+ method : 'POST' ,
531+ headers : { 'content-type' : 'application/json' } ,
532+ body : '{}' ,
533+ } ) ;
534+ assert . strictEqual ( res . status , 405 ) ;
535+ assert . strictEqual ( res . headers . get ( 'allow' ) , 'GET' ) ;
536+ const body = await res . json ( ) ;
537+ assert . strictEqual ( body . code , 'method_not_allowed' ) ;
538+ } ) ;
539+
540+ it ( 'rejects an unexposed SSE subscribe as a JSON error (not a stream)' , async function ( ) {
541+ // The 200 text/event-stream headers are deferred until the
542+ // dispatcher acks, so a rejected subscribe surfaces as a normal
543+ // JSON error with the mapped status — never a half-open stream.
544+ const res = await fetch ( base + '/capability/subscribe/nope' , {
545+ headers : { accept : 'text/event-stream' } ,
546+ } ) ;
547+ assert . strictEqual ( res . status , 404 ) ;
548+ assert . match ( res . headers . get ( 'content-type' ) || '' , / a p p l i c a t i o n \/ j s o n / ) ;
549+ const body = await res . json ( ) ;
550+ assert . strictEqual ( body . code , 'not_exposed' ) ;
551+ } ) ;
552+ } ) ;
553+
554+ describe ( 'CORS' , function ( ) {
555+ it ( 'answers OPTIONS preflight on capability routes with 204 + headers' , async function ( ) {
556+ const res = await fetch ( base + '/capability/call/sse_add' , {
557+ method : 'OPTIONS' ,
558+ headers : {
559+ origin : 'http://localhost:8080' ,
560+ 'access-control-request-method' : 'POST' ,
561+ 'access-control-request-headers' : 'content-type, authorization' ,
562+ } ,
563+ } ) ;
564+ assert . strictEqual ( res . status , 204 ) ;
565+ assert . strictEqual ( res . headers . get ( 'access-control-allow-origin' ) , '*' ) ;
566+ assert . match (
567+ res . headers . get ( 'access-control-allow-methods' ) || '' ,
568+ / P O S T /
569+ ) ;
570+ // Reflects the browser's requested headers so Authorization / custom
571+ // headers pass preflight rather than being limited to content-type.
572+ assert . strictEqual (
573+ res . headers . get ( 'access-control-allow-headers' ) ,
574+ 'content-type, authorization'
575+ ) ;
576+ } ) ;
577+
578+ it ( 'falls back to content-type when no request-headers hint is sent' , async function ( ) {
579+ const res = await fetch ( base + '/capability/call/sse_add' , {
580+ method : 'OPTIONS' ,
581+ headers : {
582+ origin : 'http://localhost:8080' ,
583+ 'access-control-request-method' : 'POST' ,
584+ } ,
585+ } ) ;
586+ assert . strictEqual ( res . status , 204 ) ;
587+ assert . strictEqual (
588+ res . headers . get ( 'access-control-allow-headers' ) ,
589+ 'content-type'
590+ ) ;
591+ } ) ;
592+
593+ it ( 'adds CORS headers to actual (non-preflight) responses' , async function ( ) {
594+ const res = await fetch ( base + '/capability/call/sse_add' , {
595+ method : 'POST' ,
596+ headers : {
597+ 'content-type' : 'application/json' ,
598+ origin : 'http://localhost:8080' ,
599+ } ,
600+ body : JSON . stringify ( { a : '1n' , b : '2n' } ) ,
601+ } ) ;
602+ assert . strictEqual ( res . status , 200 ) ;
603+ assert . strictEqual ( res . headers . get ( 'access-control-allow-origin' ) , '*' ) ;
604+ assert . strictEqual ( ( await res . json ( ) ) . sum , '3n' ) ;
605+ } ) ;
606+
607+ it ( 'does not answer OPTIONS preflight for non-capability paths' , async function ( ) {
608+ const res = await fetch ( base + '/not-a-capability' , {
609+ method : 'OPTIONS' ,
610+ headers : { origin : 'http://localhost:8080' } ,
611+ } ) ;
612+ assert . strictEqual ( res . status , 404 ) ;
613+ } ) ;
614+ } ) ;
615+
616+ describe ( 'CORS allow-list' , function ( ) {
617+ let listRuntime ;
618+ let listBase ;
619+
620+ before ( async function ( ) {
621+ listRuntime = createRuntime ( {
622+ node,
623+ transport : new HttpTransport ( {
624+ port : 0 ,
625+ host : '127.0.0.1' ,
626+ cors : [ 'http://allowed.example' ] ,
627+ } ) ,
628+ } ) ;
629+ listRuntime . expose ( {
630+ call : { '/sse_add' : 'example_interfaces/srv/AddTwoInts' } ,
631+ } ) ;
632+ await listRuntime . start ( ) ;
633+ listBase = `http://127.0.0.1:${ listRuntime . transports [ 0 ] . port } ` ;
634+ } ) ;
635+
636+ after ( async function ( ) {
637+ if ( listRuntime ) await listRuntime . stop ( ) ;
638+ } ) ;
639+
640+ it ( 'echoes an allowed origin and sets Vary: Origin' , async function ( ) {
641+ const res = await fetch ( listBase + '/capability/call/sse_add' , {
642+ method : 'POST' ,
643+ headers : {
644+ 'content-type' : 'application/json' ,
645+ origin : 'http://allowed.example' ,
646+ } ,
647+ body : JSON . stringify ( { a : '2n' , b : '3n' } ) ,
648+ } ) ;
649+ assert . strictEqual ( res . status , 200 ) ;
650+ assert . strictEqual (
651+ res . headers . get ( 'access-control-allow-origin' ) ,
652+ 'http://allowed.example'
653+ ) ;
654+ assert . match ( res . headers . get ( 'vary' ) || '' , / O r i g i n / i) ;
655+ } ) ;
656+
657+ it ( 'omits allow-origin for an origin not on the list' , async function ( ) {
658+ const res = await fetch ( listBase + '/capability/call/sse_add' , {
659+ method : 'POST' ,
660+ headers : {
661+ 'content-type' : 'application/json' ,
662+ origin : 'http://evil.example' ,
663+ } ,
664+ body : JSON . stringify ( { a : '2n' , b : '3n' } ) ,
665+ } ) ;
666+ assert . strictEqual ( res . status , 200 ) ;
667+ assert . strictEqual ( res . headers . get ( 'access-control-allow-origin' ) , null ) ;
668+ } ) ;
669+ } ) ;
670+ } ) ;
671+
672+ // Minimal Server-Sent Events reader over a fetch() response body. Parses
673+ // `event:`/`data:` lines into `{event, data}` records and exposes a
674+ // `next(predicate)` that resolves with the first matching record. Keep-
675+ // alive comment lines (`:`-prefixed) are ignored.
676+ function sseReader ( res ) {
677+ const reader = res . body . getReader ( ) ;
678+ const decoder = new TextDecoder ( ) ;
679+ const state = { buffer : '' , event : 'message' , data : '' } ;
680+
681+ async function next ( predicate , timeoutMs = 8000 ) {
682+ const deadline = Date . now ( ) + timeoutMs ;
683+ for ( ; ; ) {
684+ let nl ;
685+ while ( ( nl = state . buffer . indexOf ( '\n' ) ) >= 0 ) {
686+ const line = state . buffer . slice ( 0 , nl ) . replace ( / \r $ / , '' ) ;
687+ state . buffer = state . buffer . slice ( nl + 1 ) ;
688+ if ( line === '' ) {
689+ const ev = { event : state . event , data : state . data } ;
690+ state . event = 'message' ;
691+ state . data = '' ;
692+ if ( ev . data !== '' && predicate ( ev ) ) return ev ;
693+ continue ;
694+ }
695+ if ( line . startsWith ( ':' ) ) continue ; // keep-alive comment
696+ const ci = line . indexOf ( ':' ) ;
697+ const field = ci === - 1 ? line : line . slice ( 0 , ci ) ;
698+ const val = ci === - 1 ? '' : line . slice ( ci + 1 ) . replace ( / ^ / , '' ) ;
699+ if ( field === 'event' ) state . event = val ;
700+ else if ( field === 'data' ) state . data += val ;
701+ }
702+ if ( Date . now ( ) > deadline ) {
703+ throw new Error ( 'sseReader: timed out waiting for event' ) ;
704+ }
705+ const { value, done } = await reader . read ( ) ;
706+ if ( done ) throw new Error ( 'sseReader: stream ended early' ) ;
707+ state . buffer += decoder . decode ( value , { stream : true } ) ;
708+ }
709+ }
710+
711+ return { next, cancel : ( ) => reader . cancel ( ) . catch ( ( ) => { } ) } ;
712+ }
713+
436714function waitFor ( predicate , timeoutMs ) {
437715 const started = Date . now ( ) ;
438716 return new Promise ( ( resolve , reject ) => {
0 commit comments