@@ -877,6 +877,22 @@ def test_uuid6_test_vectors(self):
877877 equal ((u .int >> 80 ) & 0xffff , 0x232a )
878878 equal ((u .int >> 96 ) & 0xffff_ffff , 0x1ec9_414c )
879879
880+ def check_uuid7 (
881+ self ,
882+ u ,
883+ time_ms = None ,
884+ counter_hi = None , counter_lo = None ,
885+ tail = None
886+ ):
887+ if time_ms is not None :
888+ self .assertEqual (u .time , time_ms )
889+ if counter_hi is not None :
890+ self .assertEqual ((u .int >> 64 ) & 0xfff , counter_hi )
891+ if counter_lo is not None :
892+ self .assertEqual ((u .int >> 32 ) & 0x3fff_ffff , counter_lo )
893+ if tail is not None :
894+ self .assertEqual (u .int & 0xffff_ffff , tail )
895+
880896 def test_uuid7 (self ):
881897 equal = self .assertEqual
882898 u = self .uuid .uuid7 ()
@@ -1102,6 +1118,120 @@ def test_uuid7_overflow_counter(self):
11021118 equal (v .time , unix_ts_ms )
11031119 self .assertTrue (self .uuid ._last_counter_v7_overflow )
11041120
1121+ def test_uuid7_multiple_counter_overflows (self ):
1122+ # Tests when counter overflows multiple times within the same frame.
1123+ # See https://github.com/python/cpython/issues/138862.
1124+ equal = self .assertEqual
1125+
1126+ t0_ms = 1 + random .getrandbits (24 )
1127+
1128+ counter_max_value = 0x3ff_ffff_ffff
1129+ counter_max_value_hi = (counter_max_value >> 30 ) & 0x0fff
1130+ counter_max_value_lo = (counter_max_value & 0x3fff_ffff )
1131+
1132+ random_tail = int .from_bytes (b'\x11 ' * 4 )
1133+ tail1 = tail3a = tail3b = random_tail
1134+ tail2a = 1 + random .getrandbits (16 )
1135+ tail2b = 2 * tail2a
1136+
1137+ counter1 = counter2a = counter_max_value
1138+ counter1_hi = counter2a_hi = counter_max_value_hi
1139+ counter1_lo = counter2a_lo = counter_max_value_lo
1140+
1141+ counter2b = random .getrandbits (40 )
1142+ counter2b_hi = (counter2b >> 30 ) & 0x0fff
1143+ counter2b_lo = (counter2b & 0x3fff_ffff )
1144+ self .assertLess (counter2b , counter_max_value - 3 )
1145+
1146+ def patch_os_urandom (wraps = True ):
1147+ if wraps :
1148+ return mock .patch ('os.urandom' , wraps = lambda n : b'\x11 ' * n )
1149+ return mock .patch ('os.urandom' )
1150+
1151+ def patch_get_counter_and_tail (c , t ):
1152+ return mock .patch .object (
1153+ self .uuid ,
1154+ "_uuid7_get_counter_and_tail" ,
1155+ return_value = (c , t ),
1156+ )
1157+
1158+ def check_invariants (t , c , * , overflow ):
1159+ equal (self .uuid ._last_timestamp_v7 , t )
1160+ equal (self .uuid ._last_counter_v7 , c )
1161+ self .assertIs (self .uuid ._last_counter_v7_overflow , overflow )
1162+
1163+ with (
1164+ mock .patch .multiple (
1165+ self .uuid ,
1166+ _last_timestamp_v7 = t0_ms ,
1167+ _last_counter_v7 = counter_max_value - 1 ,
1168+ _last_counter_v7_overflow = False ,
1169+ ),
1170+ mock .patch ('time.time_ns' , return_value = 1_000_000 * t0_ms ),
1171+ ):
1172+ # All the calls in this block to uuid7() are always assumed
1173+ # to be within the same logical millisecond but the timestamp
1174+ # that is used for the UUIDv7 objects will be altered (and
1175+ # considered in the future).
1176+
1177+ # u1's counter is now the maximal value it can have.
1178+ # For the next call, we will need to jump 1ms in the
1179+ # future and pick a new counter (in our case, it will
1180+ # be an overflowing one).
1181+ with patch_os_urandom () as urand :
1182+ u1 = self .uuid .uuid7 ()
1183+ urand .assert_called_once_with (4 )
1184+ self .check_uuid7 (u1 , t0_ms , counter1_hi , counter1_lo , tail1 )
1185+ # For now, we are not yet in an overflow (but all subsequent
1186+ # calls will be in an overflow state even if we normally
1187+ # increment the counters). The overflow state is only cleared
1188+ # when the physical millisecond catches up to the logical one.
1189+ check_invariants (u1 .time , counter1 , overflow = False )
1190+ del u1
1191+
1192+ # u1's counter is maximal, so we enter the overflow state
1193+ # and jump 1ms in the future; the randomized counter is
1194+ # still one that would cause an overflow at the next call.
1195+ with (
1196+ patch_os_urandom (wraps = False ) as urand ,
1197+ patch_get_counter_and_tail (counter_max_value , tail2a ),
1198+ ):
1199+ u2a = self .uuid .uuid7 ()
1200+ urand .assert_not_called ()
1201+ self .check_uuid7 (u2a , t0_ms + 1 , counter2a_hi , counter2a_lo , tail2a )
1202+ check_invariants (u2a .time , counter2a , overflow = True )
1203+ del u2a
1204+
1205+ # u2a's counter was the maximal value so we need to update
1206+ # the timestamp and pick a new counter again (this time,
1207+ # it will be a small value that we can increment later).
1208+ with (
1209+ patch_os_urandom (wraps = False ) as urand ,
1210+ patch_get_counter_and_tail (counter2b , tail2b ),
1211+ ):
1212+ u2b = self .uuid .uuid7 ()
1213+ urand .assert_not_called ()
1214+ self .check_uuid7 (u2b , t0_ms + 2 , counter2b_hi , counter2b_lo , tail2b )
1215+ check_invariants (u2b .time , counter2b , overflow = True )
1216+ del u2b
1217+
1218+ # u2a's counter was small enough that we can increment it;
1219+ # we are still in the future but we don't need to advance
1220+ # the timestamp again.
1221+ with patch_os_urandom () as urand :
1222+ u3a = self .uuid .uuid7 ()
1223+ urand .assert_called_once_with (4 )
1224+ self .check_uuid7 (u3a , t0_ms + 2 , counter2b_hi , counter2b_lo + 1 , tail3a )
1225+ check_invariants (u3a .time , counter2b + 1 , overflow = True )
1226+ del u3a
1227+
1228+ with patch_os_urandom () as urand :
1229+ u3b = self .uuid .uuid7 ()
1230+ urand .assert_called_once_with (4 )
1231+ self .check_uuid7 (u3b , t0_ms + 2 , counter2b_hi , counter2b_lo + 2 , tail3b )
1232+ check_invariants (u3b .time , counter2b + 2 , overflow = True )
1233+ del u3b
1234+
11051235 def test_uuid8 (self ):
11061236 equal = self .assertEqual
11071237 u = self .uuid .uuid8 ()
0 commit comments