@@ -233,6 +233,71 @@ def test_setinputsizes(self):
233233 with self .assertRaises (NotImplementedError ):
234234 self .cursor .setinputsizes (10 , 20 )
235235
236+ def test_process_parameters_single_value (self ):
237+ """Test parameter processing with single value (not dict/list)"""
238+ test_uuid = uuid .uuid4 ()
239+
240+ # Test with UUID
241+ result = self .cursor ._process_parameters (test_uuid )
242+ self .assertEqual (result , test_uuid .bytes )
243+
244+ # Test with string
245+ result = self .cursor ._process_parameters ("test_string" )
246+ self .assertEqual (result , "test_string" )
247+
248+ def test_convert_named_params_replacement_failure (self ):
249+ """Test RuntimeError for failed parameter replacement"""
250+ query = "SELECT * FROM test WHERE id = :id"
251+ params = {"id" : "test" }
252+
253+ # Create a scenario where replacement fails
254+ with patch ("re.sub" ) as mock_sub :
255+ mock_sub .return_value = (
256+ query # Return unchanged query to simulate failure
257+ )
258+
259+ with self .assertRaises (RuntimeError ) as cm :
260+ self .cursor ._convert_named_params_with_casting (query , params )
261+
262+ self .assertIn ("Failed to replace parameter" , str (cm .exception ))
263+
264+ def test_convert_named_params_remaining_matches (self ):
265+ """Test RuntimeError for remaining named parameters"""
266+ query = "SELECT * FROM test WHERE id = :id"
267+ params = {"id" : "test" }
268+
269+ # Mock re.finditer to simulate remaining matches
270+ with patch ("re.finditer" ) as mock_finditer :
271+ # First call returns matches for initial processing
272+ # Second call returns matches for final validation
273+ mock_match = Mock ()
274+ mock_match .group .side_effect = lambda x : "id" if x == 1 else ":id"
275+ mock_match .start .return_value = 0
276+ mock_finditer .side_effect = [
277+ [mock_match ], # Initial matches
278+ [mock_match ], # Remaining matches for validation
279+ ]
280+
281+ with patch (
282+ "re.sub" , return_value = "SELECT * FROM test WHERE id = $1"
283+ ):
284+ with self .assertRaises (RuntimeError ) as cm :
285+ self .cursor ._convert_named_params_with_casting (
286+ query , params
287+ )
288+
289+ self .assertIn ("Conversion incomplete" , str (cm .exception ))
290+
291+ def test_executemany_coverage (self ):
292+ """Test executemany method for coverage"""
293+ operation = "INSERT INTO test VALUES ($1, $2)"
294+ seq_of_parameters = [[1 , "a" ], [2 , "b" ]]
295+
296+ # Test the sync wrapper by mocking await_only
297+ with patch ("psqlpy_sqlalchemy.connection.await_only" ) as mock_await :
298+ self .cursor .executemany (operation , seq_of_parameters )
299+ mock_await .assert_called_once ()
300+
236301
237302class TestAsyncAdaptPsqlpySSCursor (unittest .TestCase ):
238303 """Test cases for AsyncAdapt_psqlpy_ss_cursor"""
@@ -808,5 +873,129 @@ async def test_start_transaction_with_exception(self):
808873 self .assertFalse (self .connection ._started )
809874
810875
876+ class TestAsyncAdaptPsqlpySSCursorCoverage (unittest .TestCase ):
877+ """Additional tests for server-side cursor coverage"""
878+
879+ def setUp (self ):
880+ """Set up test fixtures"""
881+ self .mock_adapt_connection = Mock ()
882+ self .mock_connection = Mock ()
883+ self .mock_adapt_connection ._connection = self .mock_connection
884+ self .mock_adapt_connection .await_ = Mock ()
885+
886+ self .ss_cursor = AsyncAdapt_psqlpy_ss_cursor (
887+ self .mock_adapt_connection
888+ )
889+
890+ def test_fetchmany_with_size_none (self ):
891+ """Test fetchmany when size is None (uses arraysize)"""
892+ mock_cursor = Mock ()
893+ mock_result = Mock ()
894+ mock_result .row_factory = lambda x : [[("col1" , "value1" )]]
895+
896+ self .ss_cursor ._cursor = mock_cursor
897+ self .ss_cursor .arraysize = 5
898+ self .mock_adapt_connection .await_ .return_value = mock_result
899+
900+ result = self .ss_cursor .fetchmany (size = None )
901+
902+ # Should use arraysize when size is None
903+ mock_cursor .fetchmany .assert_called_with (size = 5 )
904+ self .assertEqual (result , [("value1" ,)])
905+
906+ def test_convert_result_exception (self ):
907+ """Test _convert_result with exception"""
908+ mock_result = Mock ()
909+ mock_result .row_factory .side_effect = Exception ("Conversion error" )
910+
911+ result = self .ss_cursor ._convert_result (mock_result )
912+
913+ # Should return empty tuple on exception
914+ self .assertEqual (result , tuple ())
915+
916+ def test_fetchone_exception (self ):
917+ """Test fetchone with exception"""
918+ mock_cursor = Mock ()
919+ self .ss_cursor ._cursor = mock_cursor
920+ self .mock_adapt_connection .await_ .side_effect = Exception (
921+ "Fetch error"
922+ )
923+
924+ result = self .ss_cursor .fetchone ()
925+
926+ # Should return None on exception
927+ self .assertIsNone (result )
928+
929+ def test_fetchmany_exception (self ):
930+ """Test fetchmany with exception"""
931+ mock_cursor = Mock ()
932+ self .ss_cursor ._cursor = mock_cursor
933+ self .mock_adapt_connection .await_ .side_effect = Exception (
934+ "Fetch error"
935+ )
936+
937+ result = self .ss_cursor .fetchmany ()
938+
939+ # Should return empty list on exception
940+ self .assertEqual (result , [])
941+
942+ def test_fetchall_exception (self ):
943+ """Test fetchall with exception"""
944+ mock_cursor = Mock ()
945+ self .ss_cursor ._cursor = mock_cursor
946+ self .mock_adapt_connection .await_ .side_effect = Exception (
947+ "Fetch error"
948+ )
949+
950+ result = self .ss_cursor .fetchall ()
951+
952+ # Should return empty list on exception
953+ self .assertEqual (result , [])
954+
955+
956+ class TestAsyncAdaptPsqlpyConnectionCoverage (unittest .TestCase ):
957+ """Additional tests for connection coverage"""
958+
959+ def setUp (self ):
960+ """Set up test fixtures"""
961+ self .mock_dbapi = Mock ()
962+ self .mock_connection = Mock ()
963+ self .connection = AsyncAdapt_psqlpy_connection (
964+ self .mock_dbapi , self .mock_connection
965+ )
966+
967+ def test_ping_recent (self ):
968+ """Test ping when recently pinged (within 30 seconds)"""
969+ import time
970+
971+ # Set last ping time to recent
972+ self .connection ._last_ping_time = time .time () - 10 # 10 seconds ago
973+ self .connection ._connection_valid = True
974+
975+ result = self .connection .ping ()
976+
977+ # Should return cached result without executing query
978+ self .assertTrue (result )
979+ self .mock_connection .execute .assert_not_called ()
980+
981+ def test_ping_exception (self ):
982+ """Test ping with exception"""
983+ import time
984+
985+ # Set last ping time to old
986+ self .connection ._last_ping_time = time .time () - 60 # 60 seconds ago
987+
988+ with patch ("psqlpy_sqlalchemy.connection.await_only" ) as mock_await :
989+ mock_await .side_effect = Exception ("Connection error" )
990+
991+ result = self .connection .ping ()
992+
993+ self .assertFalse (result )
994+ self .assertFalse (self .connection ._connection_valid )
995+ self .assertEqual (
996+ self .connection ._performance_stats ["connection_errors" ], 1
997+ )
998+
999+
8111000if __name__ == "__main__" :
8121001 unittest .main ()
0 commit comments