88from datetime import datetime
99
1010import psycopg2
11+ from psycopg2 import sql
1112from django .conf import settings
1213from django .core .files .base import File
1314from jinja2 import Environment , FileSystemLoader
@@ -153,7 +154,7 @@ def postgres_connection(self):
153154 self ._postgres_connection .autocommit = True
154155 return self ._postgres_connection
155156
156- def execute_sql_queries (self , statements : list [str ] ):
157+ def execute_sql_queries (self , statements : " list[sql.Composable]" ):
157158 """This method is used to execute the sql queries."""
158159 try :
159160 cursor = self .postgres_connection .cursor ()
@@ -185,21 +186,29 @@ def _grant_schema_permissions_on_new_db(self):
185186 cursor = new_db_connection .cursor ()
186187
187188 schemas = ["raw" , "dev" , "stg" , "prod" ]
189+ user_ident = sql .Identifier (self .user_name )
188190 for schema in schemas :
189- cursor .execute (f"GRANT USAGE ON SCHEMA { schema } TO { self .user_name } ;" )
191+ schema_ident = sql .Identifier (schema )
192+ cursor .execute (sql .SQL ("GRANT USAGE ON SCHEMA {} TO {};" ).format (schema_ident , user_ident ))
190193 cursor .execute (
191- f"GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA { schema } TO { self .user_name } ;"
194+ sql .SQL ("GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA {} TO {};" ).format (
195+ schema_ident , user_ident
196+ )
192197 )
193198 cursor .execute (
194- f"ALTER DEFAULT PRIVILEGES IN SCHEMA { schema } "
195- f"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO { self .user_name } ;"
199+ sql .SQL ("ALTER DEFAULT PRIVILEGES IN SCHEMA {} "
200+ "GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {};" ).format (
201+ schema_ident , user_ident
202+ )
196203 )
197204 # Transfer schema ownership for non-source schemas so the project user
198205 # can DROP and recreate tables during transformation runs.
199206 # Schema owners can drop any object within their schema in PostgreSQL.
200207 # raw schema stays read-only (source data from template).
201208 for schema in ["dev" , "stg" , "prod" ]:
202- cursor .execute (f"ALTER SCHEMA { schema } OWNER TO { self .user_name } ;" )
209+ cursor .execute (
210+ sql .SQL ("ALTER SCHEMA {} OWNER TO {};" ).format (sql .Identifier (schema ), user_ident )
211+ )
203212
204213 cursor .close ()
205214 logging .info (f"Schema permissions granted on database { self .database_name } " )
@@ -286,14 +295,15 @@ def clear_existing_db(self):
286295 cd .delete ()
287296
288297 # Clearing existing users and databases
289- terminate_session = (
290- f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity " f"WHERE datname = '{ self .database_name } ';"
291- )
292- drop_database = f"drop database if exists { self .database_name } ;"
293- drop_user = f"drop user if exists { self .user_name } ;"
298+ db_ident = sql .Identifier (self .database_name )
299+ user_ident = sql .Identifier (self .user_name )
300+ terminate_session = sql .SQL (
301+ "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = {};"
302+ ).format (sql .Literal (self .database_name ))
303+ drop_database = sql .SQL ("DROP DATABASE IF EXISTS {};" ).format (db_ident )
304+ drop_user = sql .SQL ("DROP USER IF EXISTS {};" ).format (user_ident )
294305 sql_statements = [
295306 terminate_session ,
296- # revoke_privilege,
297307 drop_database ,
298308 drop_user ,
299309 ]
@@ -302,17 +312,22 @@ def clear_existing_db(self):
302312
303313 def create_new_database (self ):
304314 """This method is used to create a new database."""
305- create_db_query = f"CREATE DATABASE { self .database_name } ;"
306- user_check_query = f"SELECT 1 FROM pg_roles WHERE rolname = { self .user_name } "
307- create_user_query = f"CREATE USER { self .user_name } WITH ENCRYPTED PASSWORD '{ self .password } ';"
308- grant_role_query = f"GRANT ALL PRIVILEGES ON DATABASE { self .database_name } TO { self .user_name } ;"
315+ db_ident = sql .Identifier (self .database_name )
316+ user_ident = sql .Identifier (self .user_name )
317+ create_db_query = sql .SQL ("CREATE DATABASE {};" ).format (db_ident )
318+ create_user_query = sql .SQL ("CREATE USER {} WITH ENCRYPTED PASSWORD {};" ).format (
319+ user_ident , sql .Literal (self .password )
320+ )
321+ grant_role_query = sql .SQL ("GRANT ALL PRIVILEGES ON DATABASE {} TO {};" ).format (db_ident , user_ident )
309322 statements = [create_db_query ]
310323 if not self .user_exist ():
311324 statements .append (create_user_query )
312325 statements .append (grant_role_query )
313326 if self ._clone_db :
314327 logging .info (f"creating(cloning) new sample db with the name - { self .database_name } " )
315- create_template_db_query = f"CREATE DATABASE { self .database_name } TEMPLATE { self .master_db_name } ;"
328+ create_template_db_query = sql .SQL ("CREATE DATABASE {} TEMPLATE {};" ).format (
329+ db_ident , sql .Identifier (self .master_db_name )
330+ )
316331 statements [0 ] = create_template_db_query
317332 try :
318333 self .execute_sql_queries (statements = statements )
@@ -347,9 +362,11 @@ def user_exist(self) -> bool:
347362 def grant_permissions (self ):
348363 """This method is used to grant the permissions to the user and
349364 database."""
365+ user_ident = sql .Identifier (self .user_name )
366+ db_ident = sql .Identifier (self .database_name )
350367 statements = [
351- f "CREATE USER { self . user_name } WITH ENCRYPTED PASSWORD ' { self . password } ';" ,
352- f "GRANT ALL PRIVILEGES ON DATABASE { self . database_name } TO { self . user_name } ;" ,
368+ sql . SQL ( "CREATE USER {} WITH ENCRYPTED PASSWORD {};" ). format ( user_ident , sql . Literal ( self . password )) ,
369+ sql . SQL ( "GRANT ALL PRIVILEGES ON DATABASE {} TO {};" ). format ( db_ident , user_ident ) ,
353370 ]
354371 self .execute_sql_queries (statements = statements )
355372 logging .info (f"new sample db and user created with the name - { self .user_name } database - { self .database_name } " )
@@ -410,8 +427,9 @@ def create_schemas(self):
410427 # Create schemas using raw SQL
411428 schemas = ["raw" , "dev" , "stg" , "prod" ]
412429 for schema in schemas :
413- statements .append (f"DROP SCHEMA IF EXISTS { schema } CASCADE;" )
414- statements .append (f"CREATE SCHEMA IF NOT EXISTS { schema } ;" )
430+ schema_ident = sql .Identifier (schema )
431+ statements .append (sql .SQL ("DROP SCHEMA IF EXISTS {} CASCADE;" ).format (schema_ident ))
432+ statements .append (sql .SQL ("CREATE SCHEMA IF NOT EXISTS {};" ).format (schema_ident ))
415433 self .execute_sql_queries (statements = statements )
416434 logging .info ("schemas created successfully" )
417435 except Exception as e :
0 commit comments