1+ """Tests to cover specific coverage gaps identified in the codebase."""
2+
3+ import os
4+ import pickle
5+ import sys
6+ import tempfile
7+ import time
8+ from datetime import datetime , timedelta
9+ from unittest .mock import Mock , patch
10+
11+ import pytest
12+
13+ import cachier
14+ from cachier .config import CacheEntry , _global_params
15+
16+ # Import backend-specific test helpers
17+ from tests .test_mongo_core import _test_mongetter
18+
19+
# Test 1: Automatic cleanup trigger in core.py (line 344->350)
# This test is replaced by the version in test_coverage_gaps_simple.py
# which doesn't require access to the internal core object


# Test 2: MongoDB allow_none=False handling (line 99)
@pytest.mark.mongo
def test_mongo_allow_none_false():
    """Test MongoDB backend with allow_none=False and None return value."""

    @cachier.cachier(mongetter=_test_mongetter, allow_none=False)
    def returns_none():
        return None

    # With allow_none=False a None result must never be stored, so both
    # calls run the function body and both come back as None.
    first = returns_none()
    assert first is None
    second = returns_none()
    assert second is None

    # Leave no cached state behind for other tests.
    returns_none.clear_cache()
44+


# Test 3: MongoDB delete_stale_entries (lines 162-163)
# Removed - redundant with test_mongo_delete_stale_direct in test_coverage_gaps_simple.py


# Test 4: Pickle _clear_being_calculated_all_cache_files (lines 183-189)
# Removed - redundant with test_pickle_clear_being_calculated_separate_files in test_coverage_gaps_simple.py


# Test 5: Pickle save_cache with hash_str (line 205)
# Removed - redundant with test_pickle_save_with_hash_str in test_coverage_gaps_simple.py


# Test 6: Redis import error handling (lines 14-15)
@pytest.mark.redis
def test_redis_import_error_handling():
    """Test Redis backend when redis package is not available."""
    # Mapping 'redis' to None in sys.modules makes any `import redis`
    # raise ImportError, simulating an environment without the package.
    with patch.dict(sys.modules, {'redis': None}):
        # Drop the cached cachier module so the import below re-executes it.
        if 'cachier.cores.redis' in sys.modules:
            del sys.modules['cachier.cores.redis']

        try:
            from cachier.cores.redis import _RedisCore  # noqa: F401
        except ImportError as err:
            # Expected path - verify the failure mentions redis.
            assert "No module named 'redis'" in str(err) or "redis" in str(err)
        else:
            # If we get here, redis was imported successfully (shouldn't happen in test)
            pytest.skip("Redis is installed, cannot test import error")
76+

# Test 7: Redis corrupted entry handling (lines 112-114)
@pytest.mark.redis
def test_redis_corrupted_entry_handling():
    """Test Redis backend with corrupted cache entries."""
    import redis

    client = redis.Redis(host='localhost', port=6379, decode_responses=False)
    try:
        client.ping()  # verify a server is reachable
    except redis.ConnectionError:
        pytest.skip("Redis server not available")

    @cachier.cachier(backend="redis", redis_client=client)
    def test_func(x):
        return x * 2

    test_func.clear_cache()

    # Plant an entry whose pickled value is garbage so reading it back
    # exercises the corrupted-entry code path.
    cache_key = "cachier:test_coverage_gaps:test_func:somehash"
    fields = {
        "value": b"corrupted_pickle_data",
        "time": str(time.time()).encode(),
        "stale": b"0",
        "being_calculated": b"0",
    }
    for name, payload in fields.items():
        client.hset(cache_key, name, payload)

    # The wrapper should ignore the garbage and recompute the result.
    assert test_func(42) == 84

    test_func.clear_cache()
110+

# Test 8: Redis deletion failure during eviction (lines 133-135)
@pytest.mark.redis
def test_redis_deletion_failure_during_eviction():
    """Test Redis LRU eviction with deletion failures."""
    import redis

    client = redis.Redis(host='localhost', port=6379, decode_responses=False)
    try:
        client.ping()
    except redis.ConnectionError:
        pytest.skip("Redis server not available")

    @cachier.cachier(
        backend="redis",
        redis_client=client,
        cache_size_limit="100B",  # Very small limit to trigger eviction
    )
    def test_func(x):
        return "x" * 50  # Large result to fill cache quickly

    test_func.clear_cache()

    # Populate the cache so the next insert must evict something.
    test_func(1)

    # Wrap client.delete so its first invocation blows up, letting us
    # confirm the eviction path tolerates deletion errors.
    real_delete = client.delete
    attempts = []

    def failing_delete(*args):
        attempts.append(args)
        if len(attempts) == 1:
            raise redis.RedisError("Mocked deletion failure")
        return real_delete(*args)

    client.delete = failing_delete

    try:
        # Triggers eviction; the deletion failure must be handled, not raised.
        test_func(2)
        # Verify delete was attempted
        assert len(attempts) > 0
    finally:
        client.delete = real_delete
        test_func.clear_cache()
159+

# Test 9: SQL allow_none=False handling (line 128)
# Removed - redundant with test_sql_allow_none_false_not_stored in test_coverage_gaps_simple.py


# Test 10: SQL delete_stale_entries (lines 302-312)
# Removed - redundant with test_sql_delete_stale_direct in test_coverage_gaps_simple.py


# Test 11: Pickle timeout during wait (line 398)
@pytest.mark.pickle
def test_pickle_timeout_during_wait():
    """Test calculation timeout while waiting in pickle backend."""
    import queue
    import threading

    @cachier.cachier(
        backend="pickle",
        wait_for_calc_timeout=0.5,  # Short timeout
    )
    def slow_func(x):
        time.sleep(2)  # Longer than timeout
        return x * 2

    slow_func.clear_cache()

    outcomes = queue.Queue()

    def worker():
        # Record either the computed value or the raised exception.
        try:
            outcomes.put(("success", slow_func(42)))
        except Exception as exc:
            outcomes.put(("error", exc))

    # First caller starts the slow computation...
    first = threading.Thread(target=worker)
    first.start()
    time.sleep(0.1)  # give it time to start processing

    # ...second caller should hit the wait-then-timeout path.
    second = threading.Thread(target=worker)
    second.start()

    first.join(timeout=3)
    second.join(timeout=3)

    # At least one of the two calls must have produced an outcome.
    collected = []
    while not outcomes.empty():
        collected.append(outcomes.get())
    assert len(collected) >= 1

    slow_func.clear_cache()
218+

# Test 12: Redis stale deletion with cache size tracking (lines 374-375, 380)
# Removed - redundant with test_redis_stale_delete_size_tracking in test_coverage_gaps_simple.py


# Test 13: Redis non-bytes timestamp handling (line 364)
@pytest.mark.redis
def test_redis_non_bytes_timestamp():
    """Test Redis backend with non-bytes timestamp values."""
    import redis

    from cachier.cores.redis import _RedisCore

    client = redis.Redis(host='localhost', port=6379, decode_responses=False)
    try:
        client.ping()
    except redis.ConnectionError:
        pytest.skip("Redis server not available")

    @cachier.cachier(
        backend="redis",
        redis_client=client,
        stale_after=timedelta(seconds=10),
    )
    def test_func(x):
        return x * 2

    test_func.clear_cache()
    test_func(1)  # create one cached entry

    # Overwrite the entry's timestamp with a plain string so stale
    # deletion encounters a non-bytes "time" field.
    entry_keys = list(
        client.scan_iter(match="cachier:test_coverage_gaps:test_func:*")
    )
    if entry_keys:
        client.hset(entry_keys[0], "time", "not_a_number")

    # Use a separate core instance to drive stale deletion directly.
    core = _RedisCore(
        hash_func=None,
        redis_client=client,
        wait_for_calc_timeout=0,
    )
    core.set_func(test_func)

    # Must not blow up on the malformed timestamp.
    try:
        core.delete_stale_entries(timedelta(seconds=1))
    except Exception:
        pass  # Expected to handle gracefully

    test_func.clear_cache()