11import logging
2- import psycopg2
32import re
4-
53from datetime import datetime , timezone
4+ from typing import IO , AnyStr , List , Optional
5+
6+ import psycopg
7+ from django .core .management import CommandParser
68from django .core .management .base import BaseCommand
7- from typing import IO , List , AnyStr , Optional
89
9- from usaspending_api .broker .helpers .last_load_date import get_last_load_date , update_last_load_date
10- from usaspending_api .common .helpers .date_helper import datetime_command_line_argument_type
10+ from usaspending_api .broker .helpers .last_load_date import (
11+ get_last_load_date ,
12+ update_last_load_date ,
13+ )
14+ from usaspending_api .common .helpers .date_helper import (
15+ datetime_command_line_argument_type ,
16+ )
1117from usaspending_api .common .helpers .etl_helpers import update_c_to_d_linkages
1218from usaspending_api .common .helpers .sql_helpers import get_database_dsn_string
1319from usaspending_api .common .retrieve_file_from_uri import RetrieveFileFromUri
14- from usaspending_api .etl .award_helpers import update_awards , update_procurement_awards , prune_empty_awards
15- from usaspending_api .etl .transaction_loaders .fpds_loader import load_fpds_transactions , failed_ids , delete_stale_fpds
16- from usaspending_api .transactions .transaction_delete_journal_helpers import retrieve_deleted_fpds_transactions
20+ from usaspending_api .etl .award_helpers import (
21+ prune_empty_awards ,
22+ update_awards ,
23+ update_procurement_awards ,
24+ )
25+ from usaspending_api .etl .transaction_loaders .fpds_loader import (
26+ delete_stale_fpds ,
27+ failed_ids ,
28+ load_fpds_transactions ,
29+ )
30+ from usaspending_api .transactions .transaction_delete_journal_helpers import (
31+ retrieve_deleted_fpds_transactions ,
32+ )
1733
1834logger = logging .getLogger ("script" )
1935
@@ -28,12 +44,13 @@ class Command(BaseCommand):
2844 modified_award_ids = []
2945
3046 @staticmethod
31- def get_cursor_for_date_query (connection , date , count = False ):
47+ def get_cursor_for_date_query (
48+ connection : psycopg .Connection , date : datetime , count : bool = False
49+ ) -> psycopg .Cursor :
50+ db_cursor = connection .cursor ()
3251 if count :
33- db_cursor = connection .cursor ()
3452 db_query = ALL_FPDS_QUERY .format ("COUNT(*)" )
3553 else :
36- db_cursor = connection .cursor ("fpds_load" , cursor_factory = psycopg2 .extras .DictCursor )
3754 db_query = ALL_FPDS_QUERY .format ("detached_award_procurement_id" )
3855
3956 if date :
@@ -54,7 +71,7 @@ def load_fpds_incrementally(self, date: Optional[datetime], chunk_size: int = CH
5471 stale_awards = delete_stale_fpds (detached_award_procurement_ids )
5572 self .modified_award_ids .extend (stale_awards )
5673
57- with psycopg2 .connect (dsn = get_database_dsn_string ()) as connection :
74+ with psycopg .connect (get_database_dsn_string ()) as connection :
5875 logger .info ("Fetching records to update" )
5976 total_records = self .get_cursor_for_date_query (connection , date , True ).fetchall ()[0 ][0 ]
6077 records_processed = 0
@@ -93,7 +110,7 @@ def load_fpds_from_file(self, file_path: str) -> None:
93110 logger .info (f"Total transaction IDs in file: { total_count } " )
94111
95112 @staticmethod
96- def update_award_records (awards , skip_cd_linkage = True ):
113+ def update_award_records (awards : list , skip_cd_linkage : bool = True ) -> None :
97114 if awards :
98115 unique_awards = set (awards )
99116 logger .info (f"{ len (unique_awards )} award records impacted by transaction DML operations" )
@@ -107,7 +124,7 @@ def update_award_records(awards, skip_cd_linkage=True):
107124 else :
108125 logger .info ("No award records to update" )
109126
110- def add_arguments (self , parser ) :
127+ def add_arguments (self , parser : CommandParser ) -> None :
111128 mutually_exclusive_group = parser .add_mutually_exclusive_group (required = True )
112129
113130 mutually_exclusive_group .add_argument (
@@ -131,16 +148,17 @@ def add_arguments(self, parser):
131148 "--file" ,
132149 metavar = "FILEPATH" ,
133150 type = str ,
134- help = "Load/Reload transactions using the detached_award_procurement_id list stored at this file path (one ID per line) "
135- "to reload, one ID per line. Nonexistent IDs will be ignored." ,
151+ help = "Load/Reload transactions using the detached_award_procurement_id list stored at this file path "
152+ "(one ID per line) to reload, one ID per line. Nonexistent IDs will be ignored." ,
136153 )
137154 mutually_exclusive_group .add_argument (
138155 "--reload-all" ,
139156 action = "store_true" ,
140- help = "Script will load or reload all FPDS records in source tables, from all time. This does NOT clear the USAspending database first" ,
157+ help = "Script will load or reload all FPDS records in source tables, from all time. "
158+ "This does NOT clear the USAspending database first" ,
141159 )
142160
143- def handle (self , * args , ** options ):
161+ def handle (self , * args , ** options ) -> None :
144162
145163 # Record script execution start time to update the FPDS last updated date in DB as appropriate
146164 update_time = datetime .now (timezone .utc )
@@ -173,7 +191,8 @@ def handle(self, *args, **options):
173191 raise SystemExit (1 )
174192
175193 if options ["reload_all" ] or options ["since_last_load" ]:
176- # we wait until after the load finishes to update the load date because if this crashes we'll need to load again
194+ # we wait until after the load finishes to update the load date
195+ # because if this crashes we'll need to load again
177196 update_last_load_date ("fpds" , update_time )
178197
179- logger .info (f "Successfully Completed" )
198+ logger .info ("Successfully Completed" )
0 commit comments