# Import backend-specific test helpers
from tests.test_mongo_core import _test_mongetter

# Test 1: Automatic cleanup trigger in core.py (line 344->350)
# This test is replaced by the version in test_coverage_gaps_simple.py
# which doesn't require access to the internal core object
@pytest.mark.mongo
def test_mongo_allow_none_false():
    """Test MongoDB backend with allow_none=False and None return value.

    With ``allow_none=False`` the backend must treat a ``None`` result as
    uncacheable, so every call re-executes the wrapped function.
    """

    @cachier.cachier(mongetter=_test_mongetter, allow_none=False)
    def returns_none():
        return None

    # First call should execute and return None
    result1 = returns_none()
    assert result1 is None

    # Second call should also execute (not cached) because None is not allowed
    result2 = returns_none()
    assert result2 is None

    # Clear cache
    returns_none.clear_cache()
@@ -61,13 +60,14 @@ def test_redis_import_error_handling():
6160 """Test Redis backend when redis package is not available."""
6261 # This test is already covered by test_redis_import_warning
6362 # but let's ensure the specific lines are hit
64- with patch .dict (sys .modules , {' redis' : None }):
63+ with patch .dict (sys .modules , {" redis" : None }):
6564 # Force reload of redis core module
66- if ' cachier.cores.redis' in sys .modules :
67- del sys .modules [' cachier.cores.redis' ]
68-
65+ if " cachier.cores.redis" in sys .modules :
66+ del sys .modules [" cachier.cores.redis" ]
67+
6968 try :
7069 from cachier .cores .redis import _RedisCore
70+
7171 # If we get here, redis was imported successfully (shouldn't happen in test)
7272 pytest .skip ("Redis is installed, cannot test import error" )
7373 except ImportError as e :
@@ -80,32 +80,33 @@ def test_redis_import_error_handling():
def test_redis_corrupted_entry_handling():
    """Test Redis backend with corrupted cache entries.

    Plants an unpicklable value under a cachier key and verifies that a
    subsequent call still computes and returns the correct result instead
    of crashing on the corrupted entry.
    """
    import redis

    client = redis.Redis(host="localhost", port=6379, decode_responses=False)

    try:
        # Test connection
        client.ping()
    except redis.ConnectionError:
        pytest.skip("Redis server not available")

    @cachier.cachier(backend="redis", redis_client=client)
    def test_func(x):
        return x * 2

    # Clear cache
    test_func.clear_cache()

    # Manually insert corrupted data
    cache_key = "cachier:test_coverage_gaps:test_func:somehash"
    client.hset(cache_key, "value", b"corrupted_pickle_data")
    client.hset(cache_key, "time", str(time.time()).encode())
    client.hset(cache_key, "stale", b"0")
    client.hset(cache_key, "being_calculated", b"0")

    # Try to access - should handle corrupted data gracefully
    result = test_func(42)
    assert result == 84

    test_func.clear_cache()
111112
@@ -114,40 +115,41 @@ def test_func(x):
114115def test_redis_deletion_failure_during_eviction ():
115116 """Test Redis LRU eviction with deletion failures."""
116117 import redis
117- client = redis .Redis (host = 'localhost' , port = 6379 , decode_responses = False )
118-
118+
119+ client = redis .Redis (host = "localhost" , port = 6379 , decode_responses = False )
120+
119121 try :
120122 client .ping ()
121123 except redis .ConnectionError :
122124 pytest .skip ("Redis server not available" )
123-
125+
124126 @cachier .cachier (
125127 backend = "redis" ,
126128 redis_client = client ,
127- cache_size_limit = "100B" # Very small limit to trigger eviction
129+ cache_size_limit = "100B" , # Very small limit to trigger eviction
128130 )
129131 def test_func (x ):
130132 return "x" * 50 # Large result to fill cache quickly
131-
133+
132134 # Clear cache
133135 test_func .clear_cache ()
134-
136+
135137 # Fill cache to trigger eviction
136138 test_func (1 )
137-
139+
138140 # Mock delete to fail
139141 original_delete = client .delete
140142 delete_called = []
141-
143+
142144 def mock_delete (* args ):
143145 delete_called .append (args )
144146 # Fail on first delete attempt
145147 if len (delete_called ) == 1 :
146148 raise redis .RedisError ("Mocked deletion failure" )
147149 return original_delete (* args )
148-
150+
149151 client .delete = mock_delete
150-
152+
151153 try :
152154 # This should trigger eviction and handle the deletion failure
153155 test_func (2 )
@@ -170,50 +172,50 @@ def mock_delete(*args):
@pytest.mark.pickle
def test_pickle_timeout_during_wait():
    """Test calculation timeout while waiting in pickle backend.

    Launches two threads against a function slower than
    ``wait_for_calc_timeout``; the second caller should stop waiting for
    the first's in-flight calculation and still produce a result.
    """
    import queue
    import threading

    @cachier.cachier(
        backend="pickle",
        wait_for_calc_timeout=0.5,  # Short timeout
    )
    def slow_func(x):
        time.sleep(2)  # Longer than timeout
        return x * 2

    slow_func.clear_cache()

    res_queue = queue.Queue()

    def call_slow_func():
        try:
            res = slow_func(42)
            res_queue.put(("success", res))
        except Exception as e:
            res_queue.put(("error", e))

    # Start first thread that will take long
    thread1 = threading.Thread(target=call_slow_func)
    thread1.start()

    # Give it time to start processing
    time.sleep(0.1)

    # Start second thread that should timeout waiting
    thread2 = threading.Thread(target=call_slow_func)
    thread2.start()

    # Wait for threads
    thread1.join(timeout=3)
    thread2.join(timeout=3)

    # Check results - at least one should have succeeded
    results = []
    while not res_queue.empty():
        results.append(res_queue.get())

    assert len(results) >= 1

    slow_func.clear_cache()
219221
@@ -222,51 +224,52 @@ def call_slow_func():
222224
223225
# Test 13: Redis non-bytes timestamp handling (line 364)
@pytest.mark.redis
def test_redis_non_bytes_timestamp():
    """Test Redis backend with non-bytes timestamp values.

    Overwrites a cached entry's ``time`` field with a non-numeric string
    and verifies that stale-entry deletion handles the malformed
    timestamp gracefully instead of raising.
    """
    import redis

    from cachier.cores.redis import _RedisCore

    client = redis.Redis(host="localhost", port=6379, decode_responses=False)

    try:
        client.ping()
    except redis.ConnectionError:
        pytest.skip("Redis server not available")

    @cachier.cachier(
        backend="redis", redis_client=client, stale_after=timedelta(seconds=10)
    )
    def test_func(x):
        return x * 2

    # Clear cache
    test_func.clear_cache()

    # Create an entry
    test_func(1)

    # Manually modify timestamp to be a string instead of bytes
    keys = list(
        client.scan_iter(match="cachier:test_coverage_gaps:test_func:*")
    )
    if keys:
        # Force timestamp to be a string (non-bytes)
        client.hset(keys[0], "time", "not_a_number")

    # Create a separate core instance to test stale deletion
    core = _RedisCore(
        hash_func=None,
        redis_client=client,
        wait_for_calc_timeout=0,
    )
    core.set_func(test_func)

    # Try to delete stale entries - should handle non-bytes timestamp gracefully
    try:
        core.delete_stale_entries(timedelta(seconds=1))
    except Exception:
        pass  # Expected to handle gracefully

    test_func.clear_cache()