diff --git a/.github/workflows/test-non-spark-integration.yaml b/.github/workflows/test-non-spark-integration.yaml index 52a40a221f..b5a69d3cb3 100644 --- a/.github/workflows/test-non-spark-integration.yaml +++ b/.github/workflows/test-non-spark-integration.yaml @@ -45,11 +45,31 @@ jobs: path: data-act-broker-backend ref: ${{ needs.Setup-Broker-Branch.outputs.branch }} + - name: Check memory output + shell: bash + working-directory: ./usaspending-api + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h + - name: Init Python Environment uses: ./usaspending-api/.github/actions/init-python-environment with: working-directory: ./usaspending-api + - name: Check memory output + shell: bash + working-directory: ./usaspending-api + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h + - name: Init Test Environment uses: ./usaspending-api/.github/actions/init-test-environment with: @@ -57,6 +77,16 @@ jobs: is-spark-test: false working-directory: ./usaspending-api + - name: Check memory output + shell: bash + working-directory: ./usaspending-api + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h + - name: Run Test Cases uses: ./usaspending-api/.github/actions/run-pytest with: @@ -64,3 +94,13 @@ jobs: include-glob: '**/tests/integration/*' marker: '(not signal_handling and not spark)' working-directory: ./usaspending-api + + - name: Check memory output + shell: bash + working-directory: ./usaspending-api + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h diff --git a/usaspending_api/common/tracing.py b/usaspending_api/common/tracing.py index 4c80f3da2e..cdb7f1146e 100644 --- a/usaspending_api/common/tracing.py +++ b/usaspending_api/common/tracing.py @@ -6,9 +6,10 @@ # Standard library imports import logging +from types import TracebackType # Typing imports -from typing import Callable, Optional +from typing import Callable, Optional, Type # OpenTelemetry imports from opentelemetry import trace @@ -22,7 +23,9 @@ def _activate_trace_filter(filter_class: Callable) -> None: if not hasattr(tracer, "_filters"): - _logger.warning("OpenTelemetry does not support direct filter activation on tracer") + _logger.warning( + "OpenTelemetry does not support direct filter activation on tracer" + ) else: if tracer._filters: tracer._filters.append(filter_class()) @@ -42,18 +45,14 @@ class OpenTelemetryEagerlyDropTraceFilter: EAGERLY_DROP_TRACE_KEY = "EAGERLY_DROP_TRACE" @classmethod - def activate(cls): + def activate(cls) -> None: _activate_trace_filter(cls) @classmethod - def drop(cls, span: trace.Span): + def drop(cls, span: trace.Span) -> None: span.set_status(Status(StatusCode.ERROR)) span.set_attribute(cls.EAGERLY_DROP_TRACE_KEY, True) - def process_trace(self, trace: trace): - """Drop trace if any span attribute has tag with key 'EAGERLY_DROP_TRACE'""" - return None if any(span.get_attribute(self.EAGERLY_DROP_TRACE_KEY) for span in trace) else trace - class SubprocessTrace: """ @@ -82,15 +81,26 @@ def __enter__(self) -> trace.Span: self.span.set_attribute(key, value) return self.span - def __exit__(self, exc_type, exc_val, exc_tb): - self.span.__exit__(exc_type, exc_val, exc_tb) - # End the span or handle any cleanup - if self.span: - if exc_type: - # Handle exception metadata - self.span.set_status(trace.Status(trace.StatusCode.ERROR, description=str(exc_val))) - self.span.record_exception(exc_val) - self.span.end() + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + try: + self.span.__exit__(exc_type, exc_val, exc_tb) + # End the span or handle any cleanup + if self.span: + if exc_type: + # Handle exception metadata + self.span.set_status( + trace.Status(trace.StatusCode.ERROR, description=str(exc_val)) + ) + self.span.record_exception(exc_val) + self.span.end() + except Exception as e: + _logger.exception(f"Failed to exit subprocess trace. {e}") + raise class OpenTelemetryLoggingTraceFilter: @@ -99,17 +109,5 @@ class OpenTelemetryLoggingTraceFilter: _log = logging.getLogger(f"{__name__}.OpenTelemetryLoggingTraceFilter") @classmethod - def activate(cls): + def activate(cls) -> None: _activate_trace_filter(cls) - - def process_trace(self, trace): - logged = False - trace_id = "???" - for span in trace: - trace_id = span.context.trace_id or "???" - if not span.get_attribute(OpenTelemetryEagerlyDropTraceFilter.EAGERLY_DROP_TRACE_KEY): - logged = True - self._log.info(f"----[SPAN#{trace_id}]" + "-" * 40 + f"\n{span}") - if logged: - self._log.info(f"====[END TRACE#{trace_id}]" + "=" * 35) - return trace diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index 96cab0970a..3c37329f1f 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -20,24 +20,43 @@ from opentelemetry import trace from opentelemetry.trace import SpanKind -from usaspending_api.awards.v2.lookups.lookups import assistance_type_mapping, contract_type_mapping, idv_type_mapping -from usaspending_api.common.csv_helpers import count_rows_in_delimited_file, partition_large_delimited_file +from usaspending_api.awards.v2.lookups.lookups import ( + assistance_type_mapping, + contract_type_mapping, + idv_type_mapping, +) +from usaspending_api.common.csv_helpers import ( + count_rows_in_delimited_file, + partition_large_delimited_file, +) from usaspending_api.common.exceptions import InvalidParameterException from usaspending_api.common.helpers.orm_helpers import generate_raw_quoted_query -from usaspending_api.common.helpers.s3_helpers import download_s3_object, multipart_upload +from usaspending_api.common.helpers.s3_helpers import ( + download_s3_object, + multipart_upload, +) from usaspending_api.common.helpers.text_helpers import slugify_text_for_file_names from usaspending_api.common.tracing import SubprocessTrace from usaspending_api.download.download_utils import construct_data_date_range from usaspending_api.download.filestreaming import NAMING_CONFLICT_DISCRIMINATOR from usaspending_api.download.filestreaming.download_source import DownloadSource -from usaspending_api.download.filestreaming.file_description import build_file_description, save_file_description +from usaspending_api.download.filestreaming.file_description import ( + build_file_description, + save_file_description, +) from usaspending_api.download.filestreaming.zip_file import append_files_to_zip_file from usaspending_api.download.helpers import verify_requested_columns_available from usaspending_api.download.helpers import write_to_download_log as write_to_log -from usaspending_api.download.lookups import FILE_FORMATS, JOB_STATUS_DICT, VALUE_MAPPINGS +from usaspending_api.download.lookups import ( + FILE_FORMATS, + JOB_STATUS_DICT, + VALUE_MAPPINGS, +) from usaspending_api.download.models.download_job import DownloadJob from usaspending_api.download.models.download_job_lookup import DownloadJobLookup -from usaspending_api.search.filters.time_period.decorators import NEW_AWARDS_ONLY_KEYWORD +from usaspending_api.search.filters.time_period.decorators import ( + NEW_AWARDS_ONLY_KEYWORD, +) from usaspending_api.settings import MAX_DOWNLOAD_LIMIT DOWNLOAD_VISIBILITY_TIMEOUT = 60 * 10 @@ -52,7 +71,9 @@ tracer = trace.get_tracer_provider().get_tracer(__name__) -def generate_download(download_job: DownloadJob, origination: Optional[str] = None): +def generate_download( # noqa: PLR0912,PLR0915 + download_job: DownloadJob, origination: Optional[str] = None +) -> str | None: """Create data archive files from the download job object""" # Parse data from download_job @@ -82,14 +103,24 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No "download_job_id": str(download_job.download_job_id), "download_job_status": str(download_job.job_status.name), "download_file_name": str(download_job.file_name), - "download_file_size": download_job.file_size if download_job.file_size is not None else 0, - "number_of_rows": download_job.number_of_rows if download_job.number_of_rows is not None else 0, + "download_file_size": download_job.file_size + if download_job.file_size is not None + else 0, + "number_of_rows": download_job.number_of_rows + if download_job.number_of_rows is not None + else 0, "number_of_columns": ( - download_job.number_of_columns if download_job.number_of_columns is not None else 0 + download_job.number_of_columns + if download_job.number_of_columns is not None + else 0 ), - "error_message": download_job.error_message if download_job.error_message else "", + "error_message": download_job.error_message + if download_job.error_message + else "", "monthly_download": str(download_job.monthly_download), - "json_request": str(download_job.json_request) if download_job.json_request else "", + "json_request": str(download_job.json_request) + if download_job.json_request + else "", "file_name": str(file_name), } ) @@ -104,13 +135,17 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No ) as limit_exceeded: limit_exceeded.set_attributes( { - "message": f"Unable to process this download because it includes more than the current limit of {MAX_DOWNLOAD_LIMIT} records", + "message": ( + "Unable to process this download because it includes more than the current limit of" + f" {MAX_DOWNLOAD_LIMIT} records" + ), "limit": limit, } ) raise Exception( - f"Unable to process this download because it includes more than the current limit of {MAX_DOWNLOAD_LIMIT} records" + "Unable to process this download because it includes more than the current limit of" + f" {MAX_DOWNLOAD_LIMIT} records" ) # Create temporary files and working directory @@ -131,12 +166,26 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No source_column_count = len(source.columns(columns)) if source_column_count == 0: create_empty_data_file( - source, download_job, working_dir, piid, assistance_id, zip_file_path, file_format + source, + download_job, + working_dir, + piid, + assistance_id, + zip_file_path, + file_format, ) else: download_job.number_of_columns += source_column_count parse_source( - source, columns, download_job, working_dir, piid, assistance_id, zip_file_path, limit, file_format + source, + columns, + download_job, + working_dir, + piid, + assistance_id, + zip_file_path, + limit, + file_format, ) include_data_dictionary = json_request.get("include_data_dictionary") if include_data_dictionary: @@ -144,7 +193,9 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No include_file_description = json_request.get("include_file_description") if include_file_description: write_to_log(message="Adding file description to zip file") - file_description = build_file_description(include_file_description["source"], sources) + file_description = build_file_description( + include_file_description["source"], sources + ) file_description = file_description.replace("[AWARD_ID]", str(award_id)) file_description_path = save_file_description( working_dir, include_file_description["destination"], file_description @@ -167,7 +218,7 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No } ) fail_download(download_job, e, exc_msg) - raise InvalidParameterException(e) + raise InvalidParameterException(e) from e except Exception as e: # Set error message; job_status_id will be set in download_sqs_worker.handle() exc_msg = "An exception was raised while attempting to process the DownloadJob" @@ -191,7 +242,9 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No if working_dir and os.path.exists(working_dir): shutil.rmtree(working_dir) _kill_spawned_processes(download_job) - DownloadJobLookup.objects.filter(download_job_id=download_job.download_job_id).delete() + DownloadJobLookup.objects.filter( + download_job_id=download_job.download_job_id + ).delete() # push file to S3 bucket, if not local if not settings.IS_LOCAL: @@ -219,7 +272,13 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No "service": "aws.s3", "span_type": "WEB", "resource": ".".join( - [multipart_upload.__module__, (multipart_upload.__qualname__ or multipart_upload.__name__)] + [ + multipart_upload.__module__, + ( + multipart_upload.__qualname__ + or multipart_upload.__name__ + ), + ] ), } ) @@ -229,11 +288,16 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No try: bucket = settings.BULK_DOWNLOAD_S3_BUCKET_NAME region = settings.USASPENDING_AWS_REGION - s3_span.set_attributes({"bucket": bucket, "region": region, "file": zip_file_path}) + s3_span.set_attributes( + {"bucket": bucket, "region": region, "file": zip_file_path} + ) start_uploading = time.perf_counter() - multipart_upload(bucket, region, zip_file_path, os.path.basename(zip_file_path)) + multipart_upload( + bucket, region, zip_file_path, os.path.basename(zip_file_path) + ) write_to_log( - message=f"Uploading took {time.perf_counter() - start_uploading:.2f}s", download_job=download_job + message=f"Uploading took {time.perf_counter() - start_uploading:.2f}s", + download_job=download_job, ) except Exception as e: exc_msg = "An exception was raised while attempting to upload the file" @@ -253,7 +317,7 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No # Set error message; job_status_id will be set in download_sqs_worker.handle() fail_download(download_job, e, exc_msg) if isinstance(e, InvalidParameterException): - raise InvalidParameterException(e) + raise InvalidParameterException(e) from e else: raise Exception(download_job.error_message) from e finally: @@ -265,8 +329,10 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No return finish_download(download_job) -def get_download_sources( - json_request: dict, download_job: DownloadJob = None, origination: Optional[str] = None +def get_download_sources( # noqa: PLR0912, PLR0915 + json_request: dict, + download_job: DownloadJob = None, + origination: Optional[str] = None, ) -> List[DownloadSource]: download_sources = [] for download_type in json_request["download_types"]: @@ -288,7 +354,8 @@ def get_download_sources( # download requests `new_awards_only` we only want to apply this # date type to the prime award summaries if json_request["filters"].get("time_period") is not None and ( - download_type == "sub_awards" or download_type == "elasticsearch_sub_awards" + download_type == "sub_awards" + or download_type == "elasticsearch_sub_awards" ): for time_period in filters["time_period"]: if time_period.get("date_type") == NEW_AWARDS_ONLY_KEYWORD: @@ -305,34 +372,58 @@ def get_download_sources( queryset = filter_function(filters) if filters.get("prime_and_sub_award_types") is not None: - award_type_codes = set(filters["prime_and_sub_award_types"][download_type]) + award_type_codes = set( + filters["prime_and_sub_award_types"][download_type] + ) else: award_type_codes = set(filters["award_type_codes"]) if ( - award_type_codes & (set(contract_type_mapping.keys()) | set(idv_type_mapping.keys())) + award_type_codes + & (set(contract_type_mapping.keys()) | set(idv_type_mapping.keys())) or "procurement" in award_type_codes ): # only generate d1 files if the user is asking for contract data d1_source = DownloadSource( - VALUE_MAPPINGS[download_type]["table_name"], "d1", download_type, agency_id, filters + VALUE_MAPPINGS[download_type]["table_name"], + "d1", + download_type, + agency_id, + filters, + ) + d1_filters = { + f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": True + } + d1_source.queryset = queryset & download_type_table.objects.filter( + **d1_filters ) - d1_filters = {f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": True} - d1_source.queryset = queryset & download_type_table.objects.filter(**d1_filters) download_sources.append(d1_source) - if award_type_codes & set(assistance_type_mapping.keys()) or ("grant" in award_type_codes): + if award_type_codes & set(assistance_type_mapping.keys()) or ( + "grant" in award_type_codes + ): # only generate d2 files if the user is asking for assistance data d2_source = DownloadSource( - VALUE_MAPPINGS[download_type]["table_name"], "d2", download_type, agency_id, filters + VALUE_MAPPINGS[download_type]["table_name"], + "d2", + download_type, + agency_id, + filters, + ) + d2_filters = { + f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": False + } + d2_source.queryset = queryset & download_type_table.objects.filter( + **d2_filters ) - d2_filters = {f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": False} - d2_source.queryset = queryset & download_type_table.objects.filter(**d2_filters) download_sources.append(d2_source) elif VALUE_MAPPINGS[download_type]["source_type"] == "account": # Account downloads - filters = {**json_request["filters"], **json_request.get("account_filters", {})} + filters = { + **json_request["filters"], + **json_request.get("account_filters", {}), + } if "is_fpds_join" in VALUE_MAPPINGS[download_type]: # Contracts @@ -343,7 +434,10 @@ def get_download_sources( agency_id, extra_file_type="Contracts_", ) - d1_filters = {**filters, f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": True} + d1_filters = { + **filters, + f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": True, + } d1_account_source.queryset = filter_function( download_type, VALUE_MAPPINGS[download_type]["table"], @@ -360,7 +454,10 @@ def get_download_sources( agency_id, extra_file_type="Assistance_", ) - d2_filters = {**filters, f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": False} + d2_filters = { + **filters, + f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": False, + } d2_account_source.queryset = filter_function( download_type, VALUE_MAPPINGS[download_type]["table"], @@ -388,7 +485,10 @@ def get_download_sources( else: account_source = DownloadSource( - VALUE_MAPPINGS[download_type]["table_name"], json_request["account_level"], download_type, agency_id + VALUE_MAPPINGS[download_type]["table_name"], + json_request["account_level"], + download_type, + agency_id, ) account_source.queryset = filter_function( download_type, @@ -408,70 +508,20 @@ def get_download_sources( ) disaster_source.award_category = json_request["award_category"] disaster_source.queryset = filter_function( - json_request["filters"], download_type, VALUE_MAPPINGS[download_type]["base_fields"] + json_request["filters"], + download_type, + VALUE_MAPPINGS[download_type]["base_fields"], ) download_sources.append(disaster_source) - verify_requested_columns_available(tuple(download_sources), json_request.get("columns", [])) + verify_requested_columns_available( + tuple(download_sources), json_request.get("columns", []) + ) return download_sources -def build_data_file_name(source, download_job, piid, assistance_id): - if download_job and download_job.monthly_download: - # For monthly archives, use the existing detailed zip filename for the data files - # e.g. FY(All)-012_Contracts_Delta_20191108.zip -> FY(All)-012_Contracts_Delta_20191108_%.csv - return strip_file_extension(download_job.file_name) - - file_name_pattern = VALUE_MAPPINGS[source.source_type]["download_name"] - timestamp = datetime.strftime(datetime.now(timezone.utc), "%Y-%m-%d_H%HM%MS%S") - - if source.is_for_idv or source.is_for_contract: - file_name_values = {"piid": slugify_text_for_file_names(piid, "UNKNOWN", 50)} - elif source.is_for_assistance: - file_name_values = {"assistance_id": slugify_text_for_file_names(assistance_id, "UNKNOWN", 50)} - elif source.source_type == "disaster_recipient": - file_name_values = {"award_category": source.award_category, "timestamp": timestamp} - else: - d_map = {"d1": "Contracts", "d2": "Assistance", "treasury_account": "TAS", "federal_account": "FA"} - - if source.agency_code == "all": - agency = "All" - else: - agency = str(source.agency_code) - - request = json.loads(download_job.json_request) - filters = request["filters"] - - if request.get("limit") or ( - request.get("request_type") == "disaster" and source.source_type in ("elasticsearch_awards", "sub_awards") - ): - agency = "" - elif source.file_type not in ("treasury_account", "federal_account"): - agency = f"{agency}_" - - if request.get("request_type") == "disaster": - account_filters = request["account_filters"] - current_fiscal_period = ( - f"FY{account_filters['latest_fiscal_year']}P{str(account_filters['latest_fiscal_period']).zfill(2)}" - ) - data_quarters = f"{current_fiscal_period}-Present" - else: - data_quarters = construct_data_date_range(filters) - - file_name_values = { - "agency": agency, - "data_quarters": data_quarters, - "level": d_map[source.file_type], - "timestamp": timestamp, - "type": d_map[source.file_type], - "extra_file_type": source.extra_file_type, - } - - return file_name_pattern.format(**file_name_values) - - -def parse_source( +def parse_source( # noqa: PLR0913 source: DownloadSource, columns: Optional[List[str]], download_job: DownloadJob, @@ -481,7 +531,7 @@ def parse_source( zip_file_path: str, limit: int, file_format: str, -): +) -> None: """Write to delimited text file(s) and zip file(s) using the source data""" data_file_name = build_data_file_name(source, download_job, piid, assistance_id) @@ -490,39 +540,65 @@ def parse_source( source.file_name = f"{data_file_name}.{extension}" source_path = os.path.join(working_dir, source.file_name) - write_to_log(message=f"Preparing to download data as {source.file_name}", download_job=download_job) + write_to_log( + message=f"Preparing to download data as {source.file_name}", + download_job=download_job, + ) # Generate the query file; values, limits, dates fixed - export_query = generate_export_query(source_query, limit, source, columns, file_format) - temp_file, temp_file_path = generate_export_query_temp_file(export_query, download_job) + export_query = generate_export_query( + source_query, limit, source, columns, file_format + ) + temp_file, temp_file_path = generate_export_query_temp_file( + export_query, download_job + ) start_time = time.perf_counter() try: # Create a separate process to run the PSQL command; wait - psql_process = multiprocessing.Process(target=execute_psql, args=(temp_file_path, source_path, download_job)) - write_to_log(message=f"Running {source.file_name} using psql", download_job=download_job) + psql_process = multiprocessing.Process( + target=execute_psql, args=(temp_file_path, source_path, download_job) + ) + write_to_log( + message=f"Running {source.file_name} using psql", download_job=download_job + ) psql_process.start() wait_for_process(psql_process, start_time, download_job) delim = FILE_FORMATS[file_format]["delimiter"] # Log how many rows we have - write_to_log(message="Counting rows in delimited text file", download_job=download_job) + write_to_log( + message="Counting rows in delimited text file", download_job=download_job + ) try: - number_of_rows = count_rows_in_delimited_file(filename=source_path, has_header=True, delimiter=delim) + number_of_rows = count_rows_in_delimited_file( + filename=source_path, has_header=True, delimiter=delim + ) download_job.number_of_rows += number_of_rows - write_to_log(message=f"Number of rows in text file: {number_of_rows}", download_job=download_job) + write_to_log( + message=f"Number of rows in text file: {number_of_rows}", + download_job=download_job, + ) except Exception: write_to_log( - message="Unable to obtain delimited text file line count", is_error=True, download_job=download_job + message="Unable to obtain delimited text file line count", + is_error=True, + download_job=download_job, ) download_job.save() # Create a separate process to split the large data files into smaller file and write to zip; wait zip_process = multiprocessing.Process( target=split_and_zip_data_files, - args=(zip_file_path, source_path, data_file_name, file_format, download_job), + args=( + zip_file_path, + source_path, + data_file_name, + file_format, + download_job, + ), ) zip_process.start() wait_for_process(zip_process, start_time, download_job) @@ -535,7 +611,81 @@ def parse_source( os.remove(temp_file_path) -def split_and_zip_data_files(zip_file_path, source_path, data_file_name, file_format, download_job=None): +def build_data_file_name( # noqa: PLR0912 + source: DownloadSource, download_job: DownloadJob, piid: str, assistance_id: str +) -> str: + if download_job and download_job.monthly_download: + # For monthly archives, use the existing detailed zip filename for the data files + # e.g. FY(All)-012_Contracts_Delta_20191108.zip -> FY(All)-012_Contracts_Delta_20191108_%.csv + return strip_file_extension(download_job.file_name) + + file_name_pattern = VALUE_MAPPINGS[source.source_type]["download_name"] + timestamp = datetime.strftime(datetime.now(timezone.utc), "%Y-%m-%d_H%HM%MS%S") + + if source.is_for_idv or source.is_for_contract: + file_name_values = {"piid": slugify_text_for_file_names(piid, "UNKNOWN", 50)} + elif source.is_for_assistance: + file_name_values = { + "assistance_id": slugify_text_for_file_names(assistance_id, "UNKNOWN", 50) + } + elif source.source_type == "disaster_recipient": + file_name_values = { + "award_category": source.award_category, + "timestamp": timestamp, + } + else: + d_map = { + "d1": "Contracts", + "d2": "Assistance", + "treasury_account": "TAS", + "federal_account": "FA", + } + + if source.agency_code == "all": + agency = "All" + else: + agency = str(source.agency_code) + + request = json.loads(download_job.json_request) + filters = request["filters"] + + if request.get("limit") or ( + request.get("request_type") == "disaster" + and source.source_type in ("elasticsearch_awards", "sub_awards") + ): + agency = "" + elif source.file_type not in ("treasury_account", "federal_account"): + agency = f"{agency}_" + + if request.get("request_type") == "disaster": + account_filters = request["account_filters"] + current_fiscal_period = ( + f"FY{account_filters['latest_fiscal_year']}" + f"P{str(account_filters['latest_fiscal_period']).zfill(2)}" + ) + data_quarters = f"{current_fiscal_period}-Present" + else: + data_quarters = construct_data_date_range(filters) + + file_name_values = { + "agency": agency, + "data_quarters": data_quarters, + "level": d_map[source.file_type], + "timestamp": timestamp, + "type": d_map[source.file_type], + "extra_file_type": source.extra_file_type, + } + + return file_name_pattern.format(**file_name_values) + + +def split_and_zip_data_files( + zip_file_path: str, + source_path: str, + data_file_name: str, + file_format: str, + download_job: DownloadJob | None = None, +) -> None: with SubprocessTrace( name=f"job.{JOB_TYPE}.download.zip", kind=SpanKind.INTERNAL, @@ -560,7 +710,10 @@ def split_and_zip_data_files(zip_file_path, source_path, data_file_name, file_fo extension = FILE_FORMATS[file_format]["extension"] output_template = f"{data_file_name}_%s.{extension}" - write_to_log(message="Beginning the delimited text file partition", download_job=download_job) + write_to_log( + message="Beginning the delimited text file partition", + download_job=download_job, + ) list_of_files = partition_large_delimited_file( download_job=download_job, file_path=source_path, @@ -574,12 +727,15 @@ def split_and_zip_data_files(zip_file_path, source_path, data_file_name, file_fo write_to_log(message=msg, download_job=download_job) # Zip the split files into one zipfile - write_to_log(message="Beginning zipping and compression", download_job=download_job) + write_to_log( + message="Beginning zipping and compression", download_job=download_job + ) log_time = time.perf_counter() append_files_to_zip_file(list_of_files, zip_file_path) write_to_log( - message=f"Writing to zipfile took {time.perf_counter() - log_time:.4f}s", download_job=download_job + message=f"Writing to zipfile took {time.perf_counter() - log_time:.4f}s", + download_job=download_job, ) except Exception as e: @@ -591,7 +747,7 @@ def split_and_zip_data_files(zip_file_path, source_path, data_file_name, file_fo raise e -def start_download(download_job): +def start_download(download_job: DownloadJob) -> str: # Update job attributes download_job.job_status_id = JOB_STATUS_DICT["running"] download_job.number_of_rows = 0 @@ -599,21 +755,29 @@ def start_download(download_job): download_job.file_size = 0 download_job.save() - write_to_log(message=f"Starting to process DownloadJob {download_job.download_job_id}", download_job=download_job) + write_to_log( + message=f"Starting to process DownloadJob {download_job.download_job_id}", + download_job=download_job, + ) return download_job.file_name -def finish_download(download_job): +def finish_download(download_job: DownloadJob) -> str: download_job.job_status_id = JOB_STATUS_DICT["finished"] download_job.save() - write_to_log(message=f"Finished processing DownloadJob {download_job.download_job_id}", download_job=download_job) + write_to_log( + message=f"Finished processing DownloadJob {download_job.download_job_id}", + download_job=download_job, + ) return download_job.file_name -def wait_for_process(process, start_time, download_job): +def wait_for_process( + process: multiprocessing.Process, start_time: float, download_job: DownloadJob +) -> float: """Wait for the process to complete, throw errors for timeouts or Process exceptions""" log_time = time.perf_counter() @@ -633,27 +797,46 @@ def wait_for_process(process, start_time, download_job): sleep_count += 1 over_time = (time.perf_counter() - start_time) >= MAX_VISIBILITY_TIMEOUT - if download_job and (not download_job.monthly_download and over_time) or process.exitcode != 0: + err = None + if ( + download_job + and (not download_job.monthly_download and over_time) + or process.exitcode != 0 + ): + logger.info("**********") + logger.info(f"DownloadJob error: {download_job.error_message}") + logger.info(f"Is overtime: {over_time}") + logger.info(f"Process Exitcode: {process.exitcode}") + logger.info(f"Process repr: {repr(process)}") + logger.info("**********") if process.is_alive(): # Process is running for longer than MAX_VISIBILITY_TIMEOUT, kill it write_to_log( - message=f"Attempting to terminate process (pid {process.pid})", download_job=download_job, is_error=True + message=f"Attempting to terminate process (pid {process.pid})", + download_job=download_job, + is_error=True, ) process.terminate() - e = TimeoutError( + err = TimeoutError( f"DownloadJob {download_job.download_job_id} lasted longer than {MAX_VISIBILITY_TIMEOUT / 3600} hours" ) else: # An error occurred in the process - e = Exception("Command failed. Please see the logs for details.") + err = Exception("Command failed. Please see the logs for details.") - raise e + process.join() + if err: + raise err return time.perf_counter() - log_time def generate_export_query( - source_query: QuerySet, limit: int, source: DownloadSource, column_subset: Optional[List[str]], file_format: str + source_query: QuerySet, + limit: int, + source: DownloadSource, + column_subset: Optional[List[str]], + file_format: str, ) -> str: if limit: source_query = source_query[:limit] @@ -674,19 +857,29 @@ def generate_export_query( annotated_group_by_columns.append(annotation_select_reverse[val]) query_annotated = apply_annotations_to_sql( - generate_raw_quoted_query(source_query), selected_columns, annotated_group_by_columns + generate_raw_quoted_query(source_query), + selected_columns, + annotated_group_by_columns, ) options = FILE_FORMATS[file_format]["options"] return rf"\COPY ({query_annotated}) TO STDOUT {options}" -def generate_export_query_temp_file(export_query, download_job, temp_dir=None): - write_to_log(message=f"Saving PSQL Query: {export_query}", download_job=download_job, is_debug=True) +def generate_export_query_temp_file( + export_query: str, download_job: DownloadJob, temp_dir: str | None = None +) -> tuple[str, str]: + write_to_log( + message=f"Saving PSQL Query: {export_query}", + download_job=download_job, + is_debug=True, + ) dir_name = "/tmp" if temp_dir: dir_name = temp_dir # Create a unique temporary file to hold the raw query, using \copy - (temp_sql_file, temp_sql_file_path) = tempfile.mkstemp(prefix="bd_sql_", dir=dir_name) + (temp_sql_file, temp_sql_file_path) = tempfile.mkstemp( + prefix="bd_sql_", dir=dir_name + ) with open(temp_sql_file_path, "w") as file: file.write(export_query) @@ -695,8 +888,10 @@ def generate_export_query_temp_file(export_query, download_job, temp_dir=None): def apply_annotations_to_sql( - raw_query: str, aliases: List[str], annotated_group_by_columns: Optional[List[str]] = None -): + raw_query: str, + aliases: List[str], + annotated_group_by_columns: Optional[List[str]] = None, +) -> str: """ Django's ORM understandably doesn't allow aliases to be the same names as other fields available. However, if we want to use the efficiency of psql's COPY method and keep the column names, we need to allow these scenarios. This @@ -707,7 +902,9 @@ def apply_annotations_to_sql( DIRECT_SELECT_QUERY_REGEX = r'^[^ ]*\."[^"]*"$' # Django is pretty consistent with how it prints out queries # Create a list from the non-derived values between SELECT and FROM - selects_list = [val for val in select_statements if re.search(DIRECT_SELECT_QUERY_REGEX, val)] + selects_list = [ + val for val in select_statements if re.search(DIRECT_SELECT_QUERY_REGEX, val) + ] # Create a list from the derived values between SELECT and FROM aliased_list = [ @@ -742,7 +939,9 @@ def apply_annotations_to_sql( # It is assumed that all non-positional values in the GROUP BY are column names meaning the first number # matching the value of "idx" will be the position. However, this is not guaranteed in the rest of the # query, so we make sure to stop after the first match found. - second_half_query = second_half_query.replace(f" {idx}", f" {{idx_{idx}}}", 1) + second_half_query = second_half_query.replace( + f" {idx}", f" {{idx_{idx}}}", 1 + ) second_half_query = second_half_query.format( **{f"idx_{idx}": col_select for idx, col_select in group_by_to_replace} ) @@ -750,10 +949,13 @@ def apply_annotations_to_sql( # Match aliases with their values values_list = [ - f'{deriv_dict[alias] if alias in deriv_dict else selects_list.pop(0)} AS "{alias}"' for alias in aliases + f'{deriv_dict[alias] if alias in deriv_dict else selects_list.pop(0)} AS "{alias}"' + for alias in aliases ] - sql = raw_query.replace(_top_level_split(raw_query, "FROM")[0], f"SELECT {', '.join(values_list)} ", 1) + sql = raw_query.replace( + _top_level_split(raw_query, "FROM")[0], f"SELECT {', '.join(values_list)} ", 1 + ) if cte_sql: sql = f"{cte_sql} {sql}" @@ -768,7 +970,7 @@ def apply_annotations_to_sql( return sql.replace(NAMING_CONFLICT_DISCRIMINATOR, "") -def _select_columns(sql: str) -> Tuple[str, List[str]]: +def _select_columns(sql: str) -> Tuple[str, List[str]]: # noqa: PLR0912 in_quotes = False in_cte = False parens_depth = 0 @@ -811,7 +1013,7 @@ def _select_columns(sql: str) -> Tuple[str, List[str]]: return cte_sql, retval # this will almost certainly error out later. -def _top_level_split(sql, splitter): +def _top_level_split(sql: str, splitter: str) -> str: in_quotes = False parens_depth = 0 for index, char in enumerate(sql): @@ -830,7 +1032,9 @@ def _top_level_split(sql, splitter): raise Exception(f"SQL string ${sql} cannot be split on ${splitter}") -def execute_psql(temp_sql_file_path, source_path, download_job): +def execute_psql( + temp_sql_file_path: str, source_path: str, download_job: DownloadJob +) -> None: """Executes a single PSQL command within its own Subprocess""" download_sql = Path(temp_sql_file_path).read_text() if download_sql.startswith("\\COPY"): @@ -844,82 +1048,128 @@ def execute_psql(temp_sql_file_path, source_path, download_job): service="bulk-download", ) - with subprocess_trace as span: - span.set_attributes( - { - "service": "bulk-download", - "resource": str(download_sql), - "span_type": "Internal", - "source_path": str(source_path), - # download job details - "download_job_id": str(download_job.download_job_id), - "download_job_status": str(download_job.job_status.name), - "download_file_name": str(download_job.file_name), - "download_file_size": download_job.file_size if download_job.file_size is not None else 0, - "number_of_rows": download_job.number_of_rows if download_job.number_of_rows is not None else 0, - "number_of_columns": ( - download_job.number_of_columns if download_job.number_of_columns is not None else 0 - ), - "error_message": download_job.error_message if download_job.error_message else "", - "monthly_download": str(download_job.monthly_download), - "json_request": str(download_job.json_request) if download_job.json_request else "", - } - ) + try: + with subprocess_trace as span: + span.set_attributes( + { + "service": "bulk-download", + "resource": str(download_sql), + "span_type": "Internal", + "source_path": str(source_path), + # download job details + "download_job_id": str(download_job.download_job_id), + "download_job_status": str(download_job.job_status.name), + "download_file_name": str(download_job.file_name), + "download_file_size": download_job.file_size + if download_job.file_size is not None + else 0, + "number_of_rows": download_job.number_of_rows + if download_job.number_of_rows is not None + else 0, + "number_of_columns": ( + download_job.number_of_columns + if download_job.number_of_columns is not None + else 0 + ), + "error_message": download_job.error_message + if download_job.error_message + else "", + "monthly_download": str(download_job.monthly_download), + "json_request": str(download_job.json_request) + if download_job.json_request + else "", + } + ) - try: - log_time = time.perf_counter() - temp_env = os.environ.copy() - if download_job and not download_job.monthly_download: - # Since terminating the process isn't guaranteed to end the DB statement, add timeout to client connection - temp_env["PGOPTIONS"] = ( - f"--statement-timeout={settings.DOWNLOAD_DB_TIMEOUT_IN_HOURS}h " - f"--work-mem={settings.DOWNLOAD_DB_WORK_MEM_IN_MB}MB" + try: + log_time = time.perf_counter() + temp_env = os.environ.copy() + if download_job and not download_job.monthly_download: + # Terminating the process isn't guaranteed to end the DB statement; add timeout to client connection + temp_env["PGOPTIONS"] = ( + f"--statement-timeout={settings.DOWNLOAD_DB_TIMEOUT_IN_HOURS}h " + f"--work-mem={settings.DOWNLOAD_DB_WORK_MEM_IN_MB}MB" + ) + + cat_command = subprocess.Popen( + ["cat", temp_sql_file_path], stdout=subprocess.PIPE + ) + subprocess.check_output( + [ + "psql", + "-q", + "-o", + source_path, + retrieve_db_string(), + "-v", + "ON_ERROR_STOP=1", + ], + stdin=cat_command.stdout, + stderr=subprocess.STDOUT, + env=temp_env, ) - cat_command = subprocess.Popen(["cat", temp_sql_file_path], stdout=subprocess.PIPE) - subprocess.check_output( - ["psql", "-q", "-o", source_path, retrieve_db_string(), "-v", "ON_ERROR_STOP=1"], - stdin=cat_command.stdout, - stderr=subprocess.STDOUT, - env=temp_env, - ) + # Wait for initial process to close and terminate to free up resources + cat_command.stdout.close() + cat_command.wait() - duration = time.perf_counter() - log_time - write_to_log( - message=f"Wrote {os.path.basename(source_path)}, took {duration:.4f} seconds", - download_job=download_job, - ) - except subprocess.CalledProcessError as e: - write_to_log(message=f"PSQL Error: {e.output.decode()}", is_error=True, download_job=download_job) - raise e - except Exception as e: - if not settings.IS_LOCAL: - # Not logging the command as it can contain the database connection string - e.cmd = "[redacted psql command]" - write_to_log(message=e, is_error=True, download_job=download_job) - sql = subprocess.check_output(["cat", temp_sql_file_path]).decode() - write_to_log(message=f"Faulty SQL: {sql}", is_error=True, download_job=download_job) - raise e - - -def retrieve_db_string(): + duration = time.perf_counter() - log_time + write_to_log( + message=f"Wrote {os.path.basename(source_path)}, took {duration:.4f} seconds", + download_job=download_job, + ) + except subprocess.CalledProcessError as e: + write_to_log( + message=f"PSQL Error: {e.output.decode()}", + is_error=True, + download_job=download_job, + ) + raise e + except Exception as e: + if not settings.IS_LOCAL: + # Not logging the command as it can contain the database connection string + e.cmd = "[redacted psql command]" + write_to_log(message=e, is_error=True, download_job=download_job) + sql = subprocess.check_output(["cat", temp_sql_file_path]).decode() + write_to_log( + message=f"Faulty SQL: {sql}", + is_error=True, + download_job=download_job, + ) + raise e + except Exception as e: + write_to_log( + message=f"Failed in subprocess with error: {e}", + is_error=True, + download_job=download_job, + ) + raise e + + +def retrieve_db_string() -> str: """It is necessary for this to be a function so the test suite can mock the connection string""" return settings.DOWNLOAD_DATABASE_URL -def strip_file_extension(file_name): +def strip_file_extension(file_name: str) -> str: return os.path.splitext(os.path.basename(file_name))[0] -def fail_download(download_job, exception, message): +def fail_download( + download_job: DownloadJob, exception: Exception, message: str +) -> None: write_to_log(message=message, is_error=True, download_job=download_job) - stack_trace = "".join(traceback.format_exception(type(exception), value=exception, tb=exception.__traceback__)) + stack_trace = "".join( + traceback.format_exception( + type(exception), value=exception, tb=exception.__traceback__ + ) + ) download_job.error_message = f"{message}:\n{stack_trace}" download_job.job_status_id = JOB_STATUS_DICT["failed"] download_job.save() -def add_data_dictionary_to_zip(working_dir, zip_file_path): +def add_data_dictionary_to_zip(working_dir: str, zip_file_path: str) -> None: write_to_log(message="Adding data dictionary to zip file") data_dictionary_file_name = "Data_Dictionary_Crosswalk.xlsx" data_dictionary_file_path = os.path.join(working_dir, data_dictionary_file_name) @@ -933,13 +1183,14 @@ def add_data_dictionary_to_zip(working_dir, zip_file_path): append_files_to_zip_file([data_dictionary_file_path], zip_file_path) -def _kill_spawned_processes(download_job=None): +def _kill_spawned_processes(download_job: DownloadJob) -> None: """Cleanup (kill) any spawned child processes during this job run""" job = ps.Process(os.getpid()) - for spawn_of_job in job.children(recursive=True): + child_processes = job.children(recursive=True) + for spawn_of_job in child_processes: write_to_log( - message=f"Attempting to terminate child process with PID [{spawn_of_job.pid}] and name " - f"[{spawn_of_job.name}]", + message=f"Attempting to terminate child process with PID [{spawn_of_job.pid}], name " + f"[{spawn_of_job.name()}], and status [{spawn_of_job.status()}]", download_job=download_job, is_error=True, ) @@ -948,8 +1199,17 @@ def _kill_spawned_processes(download_job=None): except ps.NoSuchProcess: pass + ps.wait_procs( + child_processes, + callback=lambda proc: write_to_log( + message=f"Terminated child process with PID[{proc.pid}]", + download_job=download_job, + is_error=True, + ), + ) + -def create_empty_data_file( +def create_empty_data_file( # noqa: PLR0913 source: DownloadSource, download_job: DownloadJob, working_dir: str, @@ -963,7 +1223,8 @@ def create_empty_data_file( source.file_name = f"{data_file_name}.{extension}" source_path = os.path.join(working_dir, source.file_name) write_to_log( - message=f"Skipping download of {source.file_name} due to no valid columns provided", download_job=download_job + message=f"Skipping download of {source.file_name} due to no valid columns provided", + download_job=download_job, ) Path(source_path).touch() append_files_to_zip_file([source_path], zip_file_path)