55import uuid
66
77import aiopg
8- from aiopg .sa .engine import APGCompiler_psycopg2
9- from sqlalchemy .dialects .postgresql .psycopg2 import PGDialect_psycopg2
108from sqlalchemy .engine .cursor import CursorResultMetaData
119from sqlalchemy .engine .interfaces import Dialect , ExecutionContext
1210from sqlalchemy .engine .row import Row
1311from sqlalchemy .sql import ClauseElement
1412from sqlalchemy .sql .ddl import DDLElement
1513
16- from databases .core import DatabaseURL
14+ from databases .backends .common .records import Record , Row , create_column_maps
15+ from databases .backends .compilers .psycopg import PGCompiler_psycopg
16+ from databases .backends .dialects .psycopg import PGDialect_psycopg
17+ from databases .core import LOG_EXTRA , DatabaseURL
1718from databases .interfaces import (
1819 ConnectionBackend ,
1920 DatabaseBackend ,
20- Record ,
21+ Record as RecordInterface ,
2122 TransactionBackend ,
2223)
2324
@@ -34,10 +35,10 @@ def __init__(
3435 self ._pool : typing .Union [aiopg .Pool , None ] = None
3536
3637 def _get_dialect (self ) -> Dialect :
37- dialect = PGDialect_psycopg2 (
38+ dialect = PGDialect_psycopg (
3839 json_serializer = json .dumps , json_deserializer = lambda x : x
3940 )
40- dialect .statement_compiler = APGCompiler_psycopg2
41+ dialect .statement_compiler = PGCompiler_psycopg
4142 dialect .implicit_returning = True
4243 dialect .supports_native_enum = True
4344 dialect .supports_smallserial = True # 9.2+
@@ -117,15 +118,18 @@ async def release(self) -> None:
117118 await self ._database ._pool .release (self ._connection )
118119 self ._connection = None
119120
120- async def fetch_all (self , query : ClauseElement ) -> typing .List [Record ]:
121+ async def fetch_all (self , query : ClauseElement ) -> typing .List [RecordInterface ]:
121122 assert self ._connection is not None , "Connection is not acquired"
122- query_str , args , context = self ._compile (query )
123+ query_str , args , result_columns , context = self ._compile (query )
124+ column_maps = create_column_maps (result_columns )
125+ dialect = self ._dialect
126+
123127 cursor = await self ._connection .cursor ()
124128 try :
125129 await cursor .execute (query_str , args )
126130 rows = await cursor .fetchall ()
127131 metadata = CursorResultMetaData (context , cursor .description )
128- return [
132+ rows = [
129133 Row (
130134 metadata ,
131135 metadata ._processors ,
@@ -135,32 +139,36 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[Record]:
135139 )
136140 for row in rows
137141 ]
142+ return [Record (row , result_columns , dialect , column_maps ) for row in rows ]
138143 finally :
139144 cursor .close ()
140145
141- async def fetch_one (self , query : ClauseElement ) -> typing .Optional [Record ]:
146+ async def fetch_one (self , query : ClauseElement ) -> typing .Optional [RecordInterface ]:
142147 assert self ._connection is not None , "Connection is not acquired"
143- query_str , args , context = self ._compile (query )
148+ query_str , args , result_columns , context = self ._compile (query )
149+ column_maps = create_column_maps (result_columns )
150+ dialect = self ._dialect
144151 cursor = await self ._connection .cursor ()
145152 try :
146153 await cursor .execute (query_str , args )
147154 row = await cursor .fetchone ()
148155 if row is None :
149156 return None
150157 metadata = CursorResultMetaData (context , cursor .description )
151- return Row (
158+ row = Row (
152159 metadata ,
153160 metadata ._processors ,
154161 metadata ._keymap ,
155162 Row ._default_key_style ,
156163 row ,
157164 )
165+ return Record (row , result_columns , dialect , column_maps )
158166 finally :
159167 cursor .close ()
160168
161169 async def execute (self , query : ClauseElement ) -> typing .Any :
162170 assert self ._connection is not None , "Connection is not acquired"
163- query_str , args , context = self ._compile (query )
171+ query_str , args , _ , _ = self ._compile (query )
164172 cursor = await self ._connection .cursor ()
165173 try :
166174 await cursor .execute (query_str , args )
@@ -173,7 +181,7 @@ async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
173181 cursor = await self ._connection .cursor ()
174182 try :
175183 for single_query in queries :
176- single_query , args , context = self ._compile (single_query )
184+ single_query , args , _ , _ = self ._compile (single_query )
177185 await cursor .execute (single_query , args )
178186 finally :
179187 cursor .close ()
@@ -182,36 +190,38 @@ async def iterate(
182190 self , query : ClauseElement
183191 ) -> typing .AsyncGenerator [typing .Any , None ]:
184192 assert self ._connection is not None , "Connection is not acquired"
185- query_str , args , context = self ._compile (query )
193+ query_str , args , result_columns , context = self ._compile (query )
194+ column_maps = create_column_maps (result_columns )
195+ dialect = self ._dialect
186196 cursor = await self ._connection .cursor ()
187197 try :
188198 await cursor .execute (query_str , args )
189199 metadata = CursorResultMetaData (context , cursor .description )
190200 async for row in cursor :
191- yield Row (
201+ record = Row (
192202 metadata ,
193203 metadata ._processors ,
194204 metadata ._keymap ,
195205 Row ._default_key_style ,
196206 row ,
197207 )
208+ yield Record (record , result_columns , dialect , column_maps )
198209 finally :
199210 cursor .close ()
200211
201212 def transaction (self ) -> TransactionBackend :
202213 return AiopgTransaction (self )
203214
204- def _compile (
205- self , query : ClauseElement
206- ) -> typing .Tuple [str , dict , CompilationContext ]:
215+ def _compile (self , query : ClauseElement ) -> typing .Tuple [str , list , tuple ]:
207216 compiled = query .compile (
208217 dialect = self ._dialect , compile_kwargs = {"render_postcompile" : True }
209218 )
210-
211219 execution_context = self ._dialect .execution_ctx_cls ()
212220 execution_context .dialect = self ._dialect
213221
214222 if not isinstance (query , DDLElement ):
223+ compiled_params = sorted (compiled .params .items ())
224+
215225 args = compiled .construct_params ()
216226 for key , val in args .items ():
217227 if key in compiled ._bind_processors :
@@ -224,11 +234,23 @@ def _compile(
224234 compiled ._ad_hoc_textual ,
225235 compiled ._loose_column_name_matching ,
226236 )
237+
238+ mapping = {
239+ key : "$" + str (i ) for i , (key , _ ) in enumerate (compiled_params , start = 1 )
240+ }
241+ compiled_query = compiled .string % mapping
242+ result_map = compiled ._result_columns
243+
227244 else :
228245 args = {}
246+ result_map = None
247+ compiled_query = compiled .string
229248
230- logger .debug ("Query: %s\n Args: %s" , compiled .string , args )
231- return compiled .string , args , CompilationContext (execution_context )
249+ query_message = compiled_query .replace (" \n " , " " ).replace ("\n " , " " )
250+ logger .debug (
251+ "Query: %s Args: %s" , query_message , repr (tuple (args )), extra = LOG_EXTRA
252+ )
253+ return compiled .string , args , result_map , CompilationContext (execution_context )
232254
233255 @property
234256 def raw_connection (self ) -> aiopg .connection .Connection :
0 commit comments