2121 EmailVerificationMethod ,
2222 EncryptionOptions ,
2323 OidcIdTokenVerification ,
24+ Padding ,
2425 PassphraseVerification ,
2526 PhoneNumberVerification ,
2627 PhoneNumberVerificationMethod ,
@@ -405,6 +406,23 @@ async def test_empty_message(self, tmp_path: Path, app: Dict[str, str]) -> None:
405406 assert len (result ) == 0
406407 await alice .session .stop ()
407408
409+ @pytest .mark .asyncio
410+ async def test_encrypt_with_padding (
411+ self , tmp_path : Path , app : Dict [str , str ]
412+ ) -> None :
413+ alice = await create_user_session (tmp_path , app )
414+ chunk_size = 1024 ** 2
415+ message = bytearray (
416+ 3 * chunk_size + 2
417+ ) # three big chunks plus a little something
418+ input_stream = InMemoryAsyncStream (message )
419+ async with await alice .session .encrypt_stream (
420+ input_stream , EncryptionOptions (padding_step = 500 )
421+ ) as encrypted_stream :
422+ encrypted_message = await encrypted_stream .read ()
423+ assert len (encrypted_message ) == 3146248
424+ await alice .session .stop ()
425+
408426
409427@pytest .mark .asyncio
410428async def test_encrypt_decrypt (tmp_path : Path , app : Dict [str , str ]) -> None :
@@ -426,6 +444,79 @@ async def test_encrypt_decrypt_empty(tmp_path: Path, app: Dict[str, str]) -> Non
426444 await alice .session .stop ()
427445
428446
447+ SIMPLE_ENCRYPTION_OVERHEAD = 17
448+
449+
450+ SIMPLE_PADDED_ENCRYPTION_OVERHEAD = SIMPLE_ENCRYPTION_OVERHEAD + 1
451+
452+
453+ @pytest .mark .asyncio
454+ async def test_auto_padding_is_default (tmp_path : Path , app : Dict [str , str ]) -> None :
455+ alice = await create_user_session (tmp_path , app )
456+ message = b"my clear data is clear!"
457+ length_with_padme = 24
458+ encrypted_data = await alice .session .encrypt (message )
459+ assert len (encrypted_data ) - SIMPLE_PADDED_ENCRYPTION_OVERHEAD == length_with_padme
460+ clear_data = await alice .session .decrypt (encrypted_data )
461+ assert clear_data == message
462+ await alice .session .stop ()
463+
464+
465+ @pytest .mark .asyncio
466+ async def test_padding_opt_auto (tmp_path : Path , app : Dict [str , str ]) -> None :
467+ alice = await create_user_session (tmp_path , app )
468+ message = b"my clear data is clear!"
469+ length_with_padme = 24
470+ encrypted_data = await alice .session .encrypt (
471+ message , EncryptionOptions (padding_step = Padding .AUTO )
472+ )
473+ assert len (encrypted_data ) - SIMPLE_PADDED_ENCRYPTION_OVERHEAD == length_with_padme
474+ clear_data = await alice .session .decrypt (encrypted_data )
475+ assert clear_data == message
476+ await alice .session .stop ()
477+
478+
479+ @pytest .mark .asyncio
480+ async def test_padding_opt_disable (tmp_path : Path , app : Dict [str , str ]) -> None :
481+ alice = await create_user_session (tmp_path , app )
482+ message = b"I love you"
483+ encrypted_data = await alice .session .encrypt (
484+ message , EncryptionOptions (padding_step = Padding .OFF )
485+ )
486+ assert len (encrypted_data ) == len (message ) + SIMPLE_ENCRYPTION_OVERHEAD
487+ clear_data = await alice .session .decrypt (encrypted_data )
488+ assert clear_data == message
489+ await alice .session .stop ()
490+
491+
492+ @pytest .mark .asyncio
493+ async def test_padding_opt_enable (tmp_path : Path , app : Dict [str , str ]) -> None :
494+ alice = await create_user_session (tmp_path , app )
495+ message = b"I love you"
496+ step = 13
497+ encrypted_data = await alice .session .encrypt (
498+ message , EncryptionOptions (padding_step = step )
499+ )
500+ assert (len (encrypted_data ) - SIMPLE_PADDED_ENCRYPTION_OVERHEAD ) % step == 0
501+ clear_data = await alice .session .decrypt (encrypted_data )
502+ assert clear_data == message
503+ await alice .session .stop ()
504+
505+
506+ def test_padding_opt_error (tmp_path : Path , app : Dict [str , str ]) -> None :
507+ with pytest .raises (error .InvalidArgument ):
508+ EncryptionOptions (padding_step = 0 )
509+
510+ with pytest .raises (error .InvalidArgument ):
511+ EncryptionOptions (padding_step = 1 )
512+
513+ with pytest .raises (error .InvalidArgument ):
514+ EncryptionOptions (padding_step = - 1 )
515+
516+ with pytest .raises (error .InvalidArgument ):
517+ EncryptionOptions (padding_step = 2.42 ) # type: ignore
518+
519+
429520@pytest .mark .asyncio
430521async def test_share_during_encrypt (tmp_path : Path , app : Dict [str , str ]) -> None :
431522 alice = await create_user_session (tmp_path , app )
@@ -593,6 +684,81 @@ async def test_share_with_encryption_session_without_self(
593684 await bob .session .stop ()
594685
595686
687+ ENCRYPTION_SESSION_OVERHEAD = 57
688+
689+
690+ ENCRYPTION_SESSION_PADDED_OVERHEAD = ENCRYPTION_SESSION_OVERHEAD + 1
691+
692+
693+ @pytest .mark .asyncio
694+ async def test_encryption_session_auto_padding_by_default (
695+ tmp_path : Path , app : Dict [str , str ]
696+ ) -> None :
697+ alice = await create_user_session (tmp_path , app )
698+ message = b"my clear data is clear!"
699+ length_with_padme = 24
700+ enc_session = await alice .session .create_encryption_session ()
701+ encrypted = await enc_session .encrypt (message )
702+ assert len (encrypted ) - ENCRYPTION_SESSION_PADDED_OVERHEAD == length_with_padme
703+
704+ decrypted = await alice .session .decrypt (encrypted )
705+ assert decrypted == message
706+ await alice .session .stop ()
707+
708+
709+ @pytest .mark .asyncio
710+ async def test_encryption_session_auto_padding (
711+ tmp_path : Path , app : Dict [str , str ]
712+ ) -> None :
713+ alice = await create_user_session (tmp_path , app )
714+ message = b"my clear data is clear!"
715+ length_with_padme = 24
716+ enc_session = await alice .session .create_encryption_session (
717+ EncryptionOptions (padding_step = Padding .AUTO )
718+ )
719+ encrypted = await enc_session .encrypt (message )
720+ assert len (encrypted ) - ENCRYPTION_SESSION_PADDED_OVERHEAD == length_with_padme
721+
722+ decrypted = await alice .session .decrypt (encrypted )
723+ assert decrypted == message
724+ await alice .session .stop ()
725+
726+
727+ @pytest .mark .asyncio
728+ async def test_encryption_session_no_padding (
729+ tmp_path : Path , app : Dict [str , str ]
730+ ) -> None :
731+ alice = await create_user_session (tmp_path , app )
732+ message = b"Ceci n'est pas un test"
733+ enc_session = await alice .session .create_encryption_session (
734+ EncryptionOptions (padding_step = Padding .OFF )
735+ )
736+ encrypted = await enc_session .encrypt (message )
737+ assert len (encrypted ) - ENCRYPTION_SESSION_OVERHEAD == len (message )
738+
739+ decrypted = await alice .session .decrypt (encrypted )
740+ assert decrypted == message
741+ await alice .session .stop ()
742+
743+
744+ @pytest .mark .asyncio
745+ async def test_encryption_session_padding_step (
746+ tmp_path : Path , app : Dict [str , str ]
747+ ) -> None :
748+ alice = await create_user_session (tmp_path , app )
749+ message = b"Ceci n'est pas un test"
750+ step = 13
751+ enc_session = await alice .session .create_encryption_session (
752+ EncryptionOptions (padding_step = step )
753+ )
754+ encrypted = await enc_session .encrypt (message )
755+ assert (len (encrypted ) - ENCRYPTION_SESSION_PADDED_OVERHEAD ) % step == 0
756+
757+ decrypted = await alice .session .decrypt (encrypted )
758+ assert decrypted == message
759+ await alice .session .stop ()
760+
761+
596762@pytest .mark .asyncio
597763async def test_encryption_session_streams (tmp_path : Path , app : Dict [str , str ]) -> None :
598764 alice = await create_user_session (tmp_path , app )
0 commit comments