@@ -203,4 +203,122 @@ describe('ConsumerManagerService', () => {
203203 expect ( loggerServiceMock . error ) . toHaveBeenCalledWith ( 'Error while consuming message' , internalError ) ;
204204 expect ( errorHelpers . sendError ) . toHaveBeenCalledWith ( messagePeerService , { reason : 'internal_error' , source : message . payload } ) ;
205205 } ) ;
206+
207+ describe ( 'semver-compatible dispatch' , ( ) => {
208+ it ( 'should fall back to the highest declared minor when the incoming minor is newer' , async ( ) => {
209+ const handlerV10 = jest . fn ( ) ;
210+ const consumer : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { '1.0' : handlerV10 } } ;
211+ service . register ( consumer ) ;
212+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : '1.1' } , from : 'a' , to : [ ] } ;
213+ messagesSubjectMock . next ( message ) ;
214+ await jest . runAllTimersAsync ( ) ;
215+ expect ( handlerV10 ) . toHaveBeenCalledWith ( message ) ;
216+ expect ( loggerServiceMock . warn ) . not . toHaveBeenCalled ( ) ;
217+ } ) ;
218+
219+ it ( 'should prefer the exact minor when declared' , async ( ) => {
220+ const handlerV10 = jest . fn ( ) ;
221+ const handlerV11 = jest . fn ( ) ;
222+ const consumer : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { '1.0' : handlerV10 , 1.1 : handlerV11 } } ;
223+ service . register ( consumer ) ;
224+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : '1.1' } , from : 'a' , to : [ ] } ;
225+ messagesSubjectMock . next ( message ) ;
226+ await jest . runAllTimersAsync ( ) ;
227+ expect ( handlerV11 ) . toHaveBeenCalledWith ( message ) ;
228+ expect ( handlerV10 ) . not . toHaveBeenCalled ( ) ;
229+ } ) ;
230+
231+ it ( 'should pick the highest compatible minor lower than or equal to the incoming minor' , async ( ) => {
232+ const handlerV10 = jest . fn ( ) ;
233+ const handlerV12 = jest . fn ( ) ;
234+ const handlerV15 = jest . fn ( ) ;
235+ const consumer : BasicMessageConsumer = {
236+ type : 'base_message' ,
237+ supportedVersions : { '1.0' : handlerV10 , 1.2 : handlerV12 , 1.5 : handlerV15 }
238+ } ;
239+ service . register ( consumer ) ;
240+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : '1.3' } , from : 'a' , to : [ ] } ;
241+ messagesSubjectMock . next ( message ) ;
242+ await jest . runAllTimersAsync ( ) ;
243+ expect ( handlerV12 ) . toHaveBeenCalledWith ( message ) ;
244+ expect ( handlerV10 ) . not . toHaveBeenCalled ( ) ;
245+ expect ( handlerV15 ) . not . toHaveBeenCalled ( ) ;
246+ } ) ;
247+
248+ it ( 'should not cross major version boundaries when the consumer only declares an older major' , async ( ) => {
249+ const handlerV10 = jest . fn ( ) ;
250+ const handlerV11 = jest . fn ( ) ;
251+ const consumer : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { '1.0' : handlerV10 , 1.1 : handlerV11 } } ;
252+ service . register ( consumer ) ;
253+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : '2.0' } , from : 'a' , to : [ ] } ;
254+ messagesSubjectMock . next ( message ) ;
255+ await jest . runAllTimersAsync ( ) ;
256+ expect ( handlerV10 ) . not . toHaveBeenCalled ( ) ;
257+ expect ( handlerV11 ) . not . toHaveBeenCalled ( ) ;
258+ expect ( loggerServiceMock . warn ) . toHaveBeenCalledWith ( 'No consumer found for message version: 2.0' ) ;
259+ expect ( errorHelpers . sendError ) . toHaveBeenCalledWith ( messagePeerService , { reason : 'version_mismatch' , source : message . payload } ) ;
260+ } ) ;
261+
262+ it ( 'should not match a consumer that only declares a newer major than the incoming message' , async ( ) => {
263+ const handlerV20 = jest . fn ( ) ;
264+ const consumer : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { '2.0' : handlerV20 } } ;
265+ service . register ( consumer ) ;
266+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : '1.5' } , from : 'a' , to : [ ] } ;
267+ messagesSubjectMock . next ( message ) ;
268+ await jest . runAllTimersAsync ( ) ;
269+ expect ( handlerV20 ) . not . toHaveBeenCalled ( ) ;
270+ expect ( loggerServiceMock . warn ) . toHaveBeenCalledWith ( 'No consumer found for message version: 1.5' ) ;
271+ expect ( errorHelpers . sendError ) . toHaveBeenCalledWith ( messagePeerService , { reason : 'version_mismatch' , source : message . payload } ) ;
272+ } ) ;
273+
274+ it ( 'should dispatch within the matching major when the consumer declares multiple majors' , async ( ) => {
275+ const handlerV10 = jest . fn ( ) ;
276+ const handlerV20 = jest . fn ( ) ;
277+ const consumer : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { '1.0' : handlerV10 , '2.0' : handlerV20 } } ;
278+ service . register ( consumer ) ;
279+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : '2.3' } , from : 'a' , to : [ ] } ;
280+ messagesSubjectMock . next ( message ) ;
281+ await jest . runAllTimersAsync ( ) ;
282+ expect ( handlerV20 ) . toHaveBeenCalledWith ( message ) ;
283+ expect ( handlerV10 ) . not . toHaveBeenCalled ( ) ;
284+ } ) ;
285+
286+ it ( 'should not match a consumer declaring only a higher minor within the same major' , async ( ) => {
287+ const handlerV11 = jest . fn ( ) ;
288+ const consumer : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { 1.1 : handlerV11 } } ;
289+ service . register ( consumer ) ;
290+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : '1.0' } , from : 'a' , to : [ ] } ;
291+ messagesSubjectMock . next ( message ) ;
292+ await jest . runAllTimersAsync ( ) ;
293+ expect ( handlerV11 ) . not . toHaveBeenCalled ( ) ;
294+ expect ( errorHelpers . sendError ) . toHaveBeenCalledWith ( messagePeerService , { reason : 'version_mismatch' , source : message . payload } ) ;
295+ } ) ;
296+
297+ it ( 'should reject malformed incoming version strings' , async ( ) => {
298+ const handlerV10 = jest . fn ( ) ;
299+ const consumer : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { '1.0' : handlerV10 } } ;
300+ service . register ( consumer ) ;
301+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : 'not-a-version' } , from : 'a' , to : [ ] } ;
302+ messagesSubjectMock . next ( message ) ;
303+ await jest . runAllTimersAsync ( ) ;
304+ expect ( handlerV10 ) . not . toHaveBeenCalled ( ) ;
305+ expect ( errorHelpers . sendError ) . toHaveBeenCalledWith ( messagePeerService , { reason : 'version_mismatch' , source : message . payload } ) ;
306+ } ) ;
307+
308+ it ( 'should resolve each consumer to its own best-matching minor when multiple consumers share a type' , async ( ) => {
309+ const handlerAV10 = jest . fn ( ) ;
310+ const handlerBV10 = jest . fn ( ) ;
311+ const handlerBV11 = jest . fn ( ) ;
312+ const consumerA : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { '1.0' : handlerAV10 } } ;
313+ const consumerB : BasicMessageConsumer = { type : 'base_message' , supportedVersions : { '1.0' : handlerBV10 , 1.1 : handlerBV11 } } ;
314+ service . register ( consumerA ) ;
315+ service . register ( consumerB ) ;
316+ const message : RoutedMessage < VersionedMessage > = { payload : { type : 'base_message' , version : '1.1' } , from : 'a' , to : [ ] } ;
317+ messagesSubjectMock . next ( message ) ;
318+ await jest . runAllTimersAsync ( ) ;
319+ expect ( handlerAV10 ) . toHaveBeenCalledWith ( message ) ;
320+ expect ( handlerBV11 ) . toHaveBeenCalledWith ( message ) ;
321+ expect ( handlerBV10 ) . not . toHaveBeenCalled ( ) ;
322+ } ) ;
323+ } ) ;
206324} ) ;
0 commit comments