@@ -257,6 +257,32 @@ def test_29bits_normal_fixed(self):
257257 self .assertEqual (address .get_rx_arbitraton_id (isotp .TargetAddressType .Physical ), rxid_physical )
258258 self .assertEqual (address .get_rx_arbitraton_id (isotp .TargetAddressType .Functional ), rxid_functional )
259259
260+ def test_29bits_normal_fixed_custom_id (self ):
261+ ta = 0x55
262+ sa = 0xAA
263+ rxid_physical = 0x1F40AA55
264+ rxid_functional = 0x1F41AA55
265+ txid_physical = 0x1F4055AA
266+ txid_functional = 0x1F4155AA
267+
268+ p_id = 0x1F400000
269+ f_id = 0x1F410000
270+
271+ address = isotp .Address (isotp .AddressingMode .NormalFixed_29bits , target_address = ta , source_address = sa , physical_id = p_id , functional_id = f_id )
272+
273+ self .assertTrue (address .is_for_me (Message (rxid_physical , extended_id = True )))
274+ self .assertTrue (address .is_for_me (Message (rxid_functional , extended_id = True )))
275+ self .assertFalse (address .is_for_me (Message (txid_physical , extended_id = True )))
276+ self .assertFalse (address .is_for_me (Message (txid_functional , extended_id = True )))
277+ self .assertFalse (address .is_for_me (Message (arbitration_id = (rxid_physical ) & 0x7FF , extended_id = False )))
278+ self .assertFalse (address .is_for_me (Message (arbitration_id = rxid_physical + 1 , extended_id = True )))
279+ self .assertFalse (address .is_for_me (Message (arbitration_id = (rxid_physical + 1 )& 0x7FF , extended_id = False )))
280+
281+ self .assertEqual (address .get_tx_arbitraton_id (isotp .TargetAddressType .Physical ), txid_physical )
282+ self .assertEqual (address .get_tx_arbitraton_id (isotp .TargetAddressType .Functional ), txid_functional )
283+ self .assertEqual (address .get_rx_arbitraton_id (isotp .TargetAddressType .Physical ), rxid_physical )
284+ self .assertEqual (address .get_rx_arbitraton_id (isotp .TargetAddressType .Functional ), rxid_functional )
285+
260286 def test_29bits_normal_fixed_through_layer (self ):
261287 functional = isotp .TargetAddressType .Functional
262288 physical = isotp .TargetAddressType .Physical
@@ -334,6 +360,86 @@ def test_29bits_normal_fixed_through_layer(self):
334360 self .assertEqual (msg .data , bytearray ([0x21 , 0x0A , 0x0B ]))
335361 self .assertTrue (msg .is_extended_id )
336362
363+ def test_29bits_normal_fixed_custom_id_through_layer (self ):
364+ functional = isotp .TargetAddressType .Functional
365+ physical = isotp .TargetAddressType .Physical
366+ ta = 0x55
367+ sa = 0xAA
368+ rxid_physical = 0x1F40AA55
369+ rxid_functional = 0x1F41AA55
370+ txid_physical = 0x1F4055AA
371+ txid_functional = 0x1F4155AA
372+
373+ p_id = 0x1F400000
374+ f_id = 0x1F410000
375+
376+ address = isotp .Address (isotp .AddressingMode .NormalFixed_29bits , target_address = ta , source_address = sa , physical_id = p_id , functional_id = f_id )
377+ layer = isotp .TransportLayer (txfn = self .stack_txfn , rxfn = self .stack_rxfn , address = address , params = {'stmin' :0 , 'blocksize' :0 })
378+
379+ # Receive Single frame - Physical
380+ self .simulate_rx_msg (Message (arbitration_id = rxid_physical , data = bytearray ([0x03 , 0x01 , 0x02 , 0x03 ]), extended_id = True ))
381+ layer .process ()
382+ frame = layer .recv ()
383+ self .assertIsNotNone (frame )
384+ self .assertEqual (frame , b'\x01 \x02 \x03 ' )
385+
386+ # Receive Single frame - Functional
387+ layer .reset ()
388+ self .simulate_rx_msg (Message (arbitration_id = rxid_functional , data = bytearray ([0x03 , 0x01 , 0x02 , 0x03 ]), extended_id = True ))
389+ layer .process ()
390+ frame = layer .recv ()
391+ self .assertIsNotNone (frame )
392+ self .assertEqual (frame , b'\x01 \x02 \x03 ' )
393+
394+ # Receive multiframe - Physical
395+ layer .reset ()
396+ self .simulate_rx_msg (Message (arbitration_id = rxid_physical , data = bytearray ([0x10 , 0x08 , 0x01 , 0x02 , 0x03 , 0x04 , 0x05 , 0x06 ]), extended_id = True ))
397+ layer .process ()
398+ self .assert_sent_flow_control (stmin = 0 , blocksize = 0 )
399+ self .simulate_rx_msg (Message (arbitration_id = rxid_physical , data = bytearray ([0x21 , 0x07 , 0x08 ]), extended_id = True ))
400+ layer .process ()
401+ frame = layer .recv ()
402+ self .assertIsNotNone (frame )
403+ self .assertEqual (frame , b'\x01 \x02 \x03 \x04 \x05 \x06 \x07 \x08 ' )
404+
405+ #Transmit single frame - Physical
406+ layer .reset ()
407+ layer .send (b'\x04 \x05 \x06 ' , physical )
408+ layer .process ()
409+ msg = self .get_tx_can_msg ()
410+ self .assertIsNotNone (msg )
411+ self .assertEqual (msg .arbitration_id , txid_physical )
412+ self .assertEqual (msg .data , bytearray ([0x03 , 0x04 , 0x05 , 0x06 ]))
413+ self .assertTrue (msg .is_extended_id )
414+
415+ #Transmit single frame - Functional
416+ layer .reset ()
417+ layer .send (b'\x04 \x05 \x06 ' , functional )
418+ layer .process ()
419+ msg = self .get_tx_can_msg ()
420+ self .assertIsNotNone (msg )
421+ self .assertEqual (msg .arbitration_id , txid_functional )
422+ self .assertEqual (msg .data , bytearray ([0x03 , 0x04 , 0x05 , 0x06 ]))
423+ self .assertTrue (msg .is_extended_id )
424+
425+ # Transmit multiframe - Physical
426+ layer .reset ()
427+ layer .send (b'\x04 \x05 \x06 \x07 \x08 \x09 \x0A \x0B ' , physical )
428+ layer .process ()
429+ msg = self .get_tx_can_msg ()
430+ self .assertIsNotNone (msg )
431+ self .assertEqual (msg .arbitration_id , txid_physical )
432+ self .assertEqual (msg .data , bytearray ([0x10 , 0x08 , 0x04 , 0x05 , 0x06 , 0x07 , 0x08 , 0x09 ]))
433+ self .assertTrue (msg .is_extended_id )
434+
435+ self .simulate_rx_msg (Message (arbitration_id = rxid_physical , data = self .make_flow_control_data (flow_status = 0 , stmin = 0 , blocksize = 0 ), extended_id = True ))
436+ layer .process ()
437+ msg = self .get_tx_can_msg ()
438+ self .assertIsNotNone (msg )
439+ self .assertEqual (msg .arbitration_id , txid_physical )
440+ self .assertEqual (msg .data , bytearray ([0x21 , 0x0A , 0x0B ]))
441+ self .assertTrue (msg .is_extended_id )
442+
337443 def test_11bits_extended (self ):
338444 txid = 0x123
339445 rxid = 0x456
@@ -658,6 +764,37 @@ def test_29bits_mixed(self):
658764 self .assertEqual (address .get_rx_arbitraton_id (isotp .TargetAddressType .Physical ), rxid_physical )
659765 self .assertEqual (address .get_rx_arbitraton_id (isotp .TargetAddressType .Functional ), rxid_functional )
660766
767+ def test_29bits_mixed_custom_id (self ):
768+ ta = 0x55
769+ sa = 0xAA
770+ ae = 0x99
771+ rxid_physical = 0x1F4EAA55
772+ rxid_functional = 0x1F4DAA55
773+ txid_physical = 0x1F4E55AA
774+ txid_functional = 0x1F4D55AA
775+
776+ p_id = 0x1F4E0000
777+ f_id = 0x1F4D0000
778+
779+ address = isotp .Address (isotp .AddressingMode .Mixed_29bits , source_address = sa , target_address = ta , address_extension = ae , physical_id = p_id , functional_id = f_id )
780+
781+ self .assertFalse (address .is_for_me (Message (rxid_physical , extended_id = True ))) # No data
782+ self .assertFalse (address .is_for_me (Message (rxid_functional , extended_id = True ))) # No data
783+ self .assertFalse (address .is_for_me (Message (txid_physical , extended_id = True ))) # No data
784+ self .assertFalse (address .is_for_me (Message (txid_functional , extended_id = True ))) # No data
785+
786+ self .assertTrue (address .is_for_me (Message (rxid_physical , data = bytearray ([ae ]), extended_id = True )))
787+ self .assertFalse (address .is_for_me (Message (rxid_physical , data = bytearray ([ae ]), extended_id = False )))
788+ self .assertTrue (address .is_for_me (Message (rxid_functional , data = bytearray ([ae ]), extended_id = True )))
789+ self .assertFalse (address .is_for_me (Message (rxid_functional , data = bytearray ([ae ]), extended_id = False )))
790+ self .assertFalse (address .is_for_me (Message (txid_physical , data = bytearray ([ae ]), extended_id = True )))
791+ self .assertFalse (address .is_for_me (Message (txid_functional , data = bytearray ([ae ]), extended_id = True )))
792+
793+ self .assertEqual (address .get_tx_arbitraton_id (isotp .TargetAddressType .Physical ), txid_physical )
794+ self .assertEqual (address .get_tx_arbitraton_id (isotp .TargetAddressType .Functional ), txid_functional )
795+ self .assertEqual (address .get_rx_arbitraton_id (isotp .TargetAddressType .Physical ), rxid_physical )
796+ self .assertEqual (address .get_rx_arbitraton_id (isotp .TargetAddressType .Functional ), rxid_functional )
797+
661798 def test_29bits_mixed_through_layer (self ):
662799 functional = isotp .TargetAddressType .Functional
663800 physical = isotp .TargetAddressType .Physical
@@ -735,3 +872,84 @@ def test_29bits_mixed_through_layer(self):
735872 self .assertEqual (msg .arbitration_id , txid_physical )
736873 self .assertEqual (msg .data , bytearray ([ae , 0x21 , 0x09 , 0x0A , 0x0B ]))
737874 self .assertTrue (msg .is_extended_id )
875+
876+ def test_29bits_mixed_custom_id_through_layer (self ):
877+ functional = isotp .TargetAddressType .Functional
878+ physical = isotp .TargetAddressType .Physical
879+ ta = 0x55
880+ sa = 0xAA
881+ ae = 0x99
882+ rxid_physical = 0x1F4EAA55
883+ rxid_functional = 0x1F4DAA55
884+ txid_physical = 0x1F4E55AA
885+ txid_functional = 0x1F4D55AA
886+
887+ p_id = 0x1F4E0000
888+ f_id = 0x1F4D0000
889+
890+ address = isotp .Address (isotp .AddressingMode .Mixed_29bits , source_address = sa , target_address = ta , address_extension = ae , physical_id = p_id , functional_id = f_id )
891+ layer = isotp .TransportLayer (txfn = self .stack_txfn , rxfn = self .stack_rxfn , address = address , params = {'stmin' :0 , 'blocksize' :0 })
892+
893+ # Receive Single frame - Physical
894+ self .simulate_rx_msg (Message (arbitration_id = rxid_physical , data = bytearray ([ae , 0x03 , 0x01 , 0x02 , 0x03 ]), extended_id = True ))
895+ layer .process ()
896+ frame = layer .recv ()
897+ self .assertIsNotNone (frame )
898+ self .assertEqual (frame , b'\x01 \x02 \x03 ' )
899+
900+ # Receive Single frame - Functional
901+ layer .reset ()
902+ self .simulate_rx_msg (Message (arbitration_id = rxid_functional , data = bytearray ([ae , 0x03 , 0x01 , 0x02 , 0x03 ]), extended_id = True ))
903+ layer .process ()
904+ frame = layer .recv ()
905+ self .assertIsNotNone (frame )
906+ self .assertEqual (frame , b'\x01 \x02 \x03 ' )
907+
908+ # Receive multiframe - Physical
909+ layer .reset ()
910+ self .simulate_rx_msg (Message (arbitration_id = rxid_physical , data = bytearray ([ae , 0x10 , 0x08 , 0x01 , 0x02 , 0x03 , 0x04 , 0x05 ]), extended_id = True ))
911+ layer .process ()
912+ self .assert_sent_flow_control (prefix = [ae ], stmin = 0 , blocksize = 0 )
913+ self .simulate_rx_msg (Message (arbitration_id = rxid_physical , data = bytearray ([ae , 0x21 , 0x06 , 0x07 , 0x08 ]), extended_id = True ))
914+ layer .process ()
915+ frame = layer .recv ()
916+ self .assertIsNotNone (frame )
917+ self .assertEqual (frame , b'\x01 \x02 \x03 \x04 \x05 \x06 \x07 \x08 ' )
918+
919+ #Transmit single frame - Physical
920+ layer .reset ()
921+ layer .send (b'\x04 \x05 \x06 ' , physical )
922+ layer .process ()
923+ msg = self .get_tx_can_msg ()
924+ self .assertIsNotNone (msg )
925+ self .assertEqual (msg .arbitration_id , txid_physical )
926+ self .assertEqual (msg .data , bytearray ([ae , 0x03 , 0x04 , 0x05 , 0x06 ]))
927+ self .assertTrue (msg .is_extended_id )
928+
929+ #Transmit single frame - Functional
930+ layer .reset ()
931+ layer .send (b'\x04 \x05 \x06 ' , functional )
932+ layer .process ()
933+ msg = self .get_tx_can_msg ()
934+ self .assertIsNotNone (msg )
935+ self .assertEqual (msg .arbitration_id , txid_functional )
936+ self .assertEqual (msg .data , bytearray ([ae , 0x03 , 0x04 , 0x05 , 0x06 ]))
937+ self .assertTrue (msg .is_extended_id )
938+
939+ # Transmit multiframe - Physical
940+ layer .reset ()
941+ layer .send (b'\x04 \x05 \x06 \x07 \x08 \x09 \x0A \x0B ' , physical )
942+ layer .process ()
943+ msg = self .get_tx_can_msg ()
944+ self .assertIsNotNone (msg )
945+ self .assertEqual (msg .arbitration_id , txid_physical )
946+ self .assertEqual (msg .data , bytearray ([ae , 0x10 , 0x08 , 0x04 , 0x05 , 0x06 , 0x07 , 0x08 ]))
947+ self .assertTrue (msg .is_extended_id )
948+
949+ self .simulate_rx_msg (Message (arbitration_id = rxid_physical , data = self .make_flow_control_data (flow_status = 0 , stmin = 0 , blocksize = 0 , prefix = [ae ]), extended_id = True ))
950+ layer .process ()
951+ msg = self .get_tx_can_msg ()
952+ self .assertIsNotNone (msg )
953+ self .assertEqual (msg .arbitration_id , txid_physical )
954+ self .assertEqual (msg .data , bytearray ([ae , 0x21 , 0x09 , 0x0A , 0x0B ]))
955+ self .assertTrue (msg .is_extended_id )
0 commit comments