From e694cd3fd196b605c8da0630bf8d8fa7115d5858 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Wed, 11 Mar 2026 14:04:14 -0400 Subject: [PATCH 01/14] add try except to debug download tests --- usaspending_api/common/tracing.py | 56 +++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/usaspending_api/common/tracing.py b/usaspending_api/common/tracing.py index 4c80f3da2e..f97d63a3b3 100644 --- a/usaspending_api/common/tracing.py +++ b/usaspending_api/common/tracing.py @@ -6,9 +6,10 @@ # Standard library imports import logging +from types import TracebackType # Typing imports -from typing import Callable, Optional +from typing import Callable, Optional, Type # OpenTelemetry imports from opentelemetry import trace @@ -22,7 +23,9 @@ def _activate_trace_filter(filter_class: Callable) -> None: if not hasattr(tracer, "_filters"): - _logger.warning("OpenTelemetry does not support direct filter activation on tracer") + _logger.warning( + "OpenTelemetry does not support direct filter activation on tracer" + ) else: if tracer._filters: tracer._filters.append(filter_class()) @@ -50,9 +53,13 @@ def drop(cls, span: trace.Span): span.set_status(Status(StatusCode.ERROR)) span.set_attribute(cls.EAGERLY_DROP_TRACE_KEY, True) - def process_trace(self, trace: trace): + def process_trace(self, trace_to_process: trace) -> Optional[trace]: """Drop trace if any span attribute has tag with key 'EAGERLY_DROP_TRACE'""" - return None if any(span.get_attribute(self.EAGERLY_DROP_TRACE_KEY) for span in trace) else trace + return ( + None + if any(span.get_attribute(self.EAGERLY_DROP_TRACE_KEY) for span in trace_to_process) + else trace_to_process + ) class SubprocessTrace: @@ -82,15 +89,26 @@ def __enter__(self) -> trace.Span: self.span.set_attribute(key, value) return self.span - def __exit__(self, exc_type, exc_val, exc_tb): - self.span.__exit__(exc_type, exc_val, exc_tb) - # End the span or handle any cleanup - if self.span: - if exc_type: - # Handle exception metadata - self.span.set_status(trace.Status(trace.StatusCode.ERROR, description=str(exc_val))) - self.span.record_exception(exc_val) - self.span.end() + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + try: + self.span.__exit__(exc_type, exc_val, exc_tb) + # End the span or handle any cleanup + if self.span: + if exc_type: + # Handle exception metadata + self.span.set_status( + trace.Status(trace.StatusCode.ERROR, description=str(exc_val)) + ) + self.span.record_exception(exc_val) + self.span.end() + except Exception as e: + _logger.exception(f"Failed to exit subprocess trace. {e}") + raise class OpenTelemetryLoggingTraceFilter: @@ -99,17 +117,19 @@ class OpenTelemetryLoggingTraceFilter: _log = logging.getLogger(f"{__name__}.OpenTelemetryLoggingTraceFilter") @classmethod - def activate(cls): + def activate(cls) -> None: _activate_trace_filter(cls) - def process_trace(self, trace): + def process_trace(self, trace_to_process: trace) -> trace: logged = False trace_id = "???" - for span in trace: + for span in trace_to_process: trace_id = span.context.trace_id or "???" - if not span.get_attribute(OpenTelemetryEagerlyDropTraceFilter.EAGERLY_DROP_TRACE_KEY): + if not span.get_attribute( + OpenTelemetryEagerlyDropTraceFilter.EAGERLY_DROP_TRACE_KEY + ): logged = True self._log.info(f"----[SPAN#{trace_id}]" + "-" * 40 + f"\n{span}") if logged: self._log.info(f"====[END TRACE#{trace_id}]" + "=" * 35) - return trace + return trace_to_process From 2b886e8f41c47fed4a7ffb4ec9440189f11bd69e Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Wed, 11 Mar 2026 14:45:36 -0400 Subject: [PATCH 02/14] cleanup --- usaspending_api/common/tracing.py | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/usaspending_api/common/tracing.py b/usaspending_api/common/tracing.py index f97d63a3b3..cdb7f1146e 100644 --- a/usaspending_api/common/tracing.py +++ b/usaspending_api/common/tracing.py @@ -45,22 +45,14 @@ class OpenTelemetryEagerlyDropTraceFilter: EAGERLY_DROP_TRACE_KEY = "EAGERLY_DROP_TRACE" @classmethod - def activate(cls): + def activate(cls) -> None: _activate_trace_filter(cls) @classmethod - def drop(cls, span: trace.Span): + def drop(cls, span: trace.Span) -> None: span.set_status(Status(StatusCode.ERROR)) span.set_attribute(cls.EAGERLY_DROP_TRACE_KEY, True) - def process_trace(self, trace_to_process: trace) -> Optional[trace]: - """Drop trace if any span attribute has tag with key 'EAGERLY_DROP_TRACE'""" - return ( - None - if any(span.get_attribute(self.EAGERLY_DROP_TRACE_KEY) for span in trace_to_process) - else trace_to_process - ) - class SubprocessTrace: """ @@ -119,17 +111,3 @@ class OpenTelemetryLoggingTraceFilter: @classmethod def activate(cls) -> None: _activate_trace_filter(cls) - - def process_trace(self, trace_to_process: trace) -> trace: - logged = False - trace_id = "???" - for span in trace_to_process: - trace_id = span.context.trace_id or "???" - if not span.get_attribute( - OpenTelemetryEagerlyDropTraceFilter.EAGERLY_DROP_TRACE_KEY - ): - logged = True - self._log.info(f"----[SPAN#{trace_id}]" + "-" * 40 + f"\n{span}") - if logged: - self._log.info(f"====[END TRACE#{trace_id}]" + "=" * 35) - return trace_to_process From fbe68e981ee8099b719b1af1f3d9c6e43e0fdb81 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Wed, 11 Mar 2026 16:35:57 -0400 Subject: [PATCH 03/14] join process back to parent process --- .../filestreaming/download_generation.py | 542 +++++++++++++----- 1 file changed, 388 insertions(+), 154 deletions(-) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index 96cab0970a..7a429ffc6c 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -20,24 +20,43 @@ from opentelemetry import trace from opentelemetry.trace import SpanKind -from usaspending_api.awards.v2.lookups.lookups import assistance_type_mapping, contract_type_mapping, idv_type_mapping -from usaspending_api.common.csv_helpers import count_rows_in_delimited_file, partition_large_delimited_file +from usaspending_api.awards.v2.lookups.lookups import ( + assistance_type_mapping, + contract_type_mapping, + idv_type_mapping, +) +from usaspending_api.common.csv_helpers import ( + count_rows_in_delimited_file, + partition_large_delimited_file, +) from usaspending_api.common.exceptions import InvalidParameterException from usaspending_api.common.helpers.orm_helpers import generate_raw_quoted_query -from usaspending_api.common.helpers.s3_helpers import download_s3_object, multipart_upload +from usaspending_api.common.helpers.s3_helpers import ( + download_s3_object, + multipart_upload, +) from usaspending_api.common.helpers.text_helpers import slugify_text_for_file_names from usaspending_api.common.tracing import SubprocessTrace from usaspending_api.download.download_utils import construct_data_date_range from usaspending_api.download.filestreaming import NAMING_CONFLICT_DISCRIMINATOR from usaspending_api.download.filestreaming.download_source import DownloadSource -from usaspending_api.download.filestreaming.file_description import build_file_description, save_file_description +from usaspending_api.download.filestreaming.file_description import ( + build_file_description, + save_file_description, +) from usaspending_api.download.filestreaming.zip_file import append_files_to_zip_file from usaspending_api.download.helpers import verify_requested_columns_available from usaspending_api.download.helpers import write_to_download_log as write_to_log -from usaspending_api.download.lookups import FILE_FORMATS, JOB_STATUS_DICT, VALUE_MAPPINGS +from usaspending_api.download.lookups import ( + FILE_FORMATS, + JOB_STATUS_DICT, + VALUE_MAPPINGS, +) from usaspending_api.download.models.download_job import DownloadJob from usaspending_api.download.models.download_job_lookup import DownloadJobLookup -from usaspending_api.search.filters.time_period.decorators import NEW_AWARDS_ONLY_KEYWORD +from usaspending_api.search.filters.time_period.decorators import ( + NEW_AWARDS_ONLY_KEYWORD, +) from usaspending_api.settings import MAX_DOWNLOAD_LIMIT DOWNLOAD_VISIBILITY_TIMEOUT = 60 * 10 @@ -52,7 +71,9 @@ tracer = trace.get_tracer_provider().get_tracer(__name__) -def generate_download(download_job: DownloadJob, origination: Optional[str] = None): +def generate_download( # noqa: PLR0912,PLR0915 + download_job: DownloadJob, origination: Optional[str] = None +) -> str | None: """Create data archive files from the download job object""" # Parse data from download_job @@ -82,14 +103,24 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No "download_job_id": str(download_job.download_job_id), "download_job_status": str(download_job.job_status.name), "download_file_name": str(download_job.file_name), - "download_file_size": download_job.file_size if download_job.file_size is not None else 0, - "number_of_rows": download_job.number_of_rows if download_job.number_of_rows is not None else 0, + "download_file_size": download_job.file_size + if download_job.file_size is not None + else 0, + "number_of_rows": download_job.number_of_rows + if download_job.number_of_rows is not None + else 0, "number_of_columns": ( - download_job.number_of_columns if download_job.number_of_columns is not None else 0 + download_job.number_of_columns + if download_job.number_of_columns is not None + else 0 ), - "error_message": download_job.error_message if download_job.error_message else "", + "error_message": download_job.error_message + if download_job.error_message + else "", "monthly_download": str(download_job.monthly_download), - "json_request": str(download_job.json_request) if download_job.json_request else "", + "json_request": str(download_job.json_request) + if download_job.json_request + else "", "file_name": str(file_name), } ) @@ -104,13 +135,17 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No ) as limit_exceeded: limit_exceeded.set_attributes( { - "message": f"Unable to process this download because it includes more than the current limit of {MAX_DOWNLOAD_LIMIT} records", + "message": ( + "Unable to process this download because it includes more than the current limit of" + f" {MAX_DOWNLOAD_LIMIT} records" + ), "limit": limit, } ) raise Exception( - f"Unable to process this download because it includes more than the current limit of {MAX_DOWNLOAD_LIMIT} records" + "Unable to process this download because it includes more than the current limit of" + f" {MAX_DOWNLOAD_LIMIT} records" ) # Create temporary files and working directory @@ -131,12 +166,26 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No source_column_count = len(source.columns(columns)) if source_column_count == 0: create_empty_data_file( - source, download_job, working_dir, piid, assistance_id, zip_file_path, file_format + source, + download_job, + working_dir, + piid, + assistance_id, + zip_file_path, + file_format, ) else: download_job.number_of_columns += source_column_count parse_source( - source, columns, download_job, working_dir, piid, assistance_id, zip_file_path, limit, file_format + source, + columns, + download_job, + working_dir, + piid, + assistance_id, + zip_file_path, + limit, + file_format, ) include_data_dictionary = json_request.get("include_data_dictionary") if include_data_dictionary: @@ -144,7 +193,9 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No include_file_description = json_request.get("include_file_description") if include_file_description: write_to_log(message="Adding file description to zip file") - file_description = build_file_description(include_file_description["source"], sources) + file_description = build_file_description( + include_file_description["source"], sources + ) file_description = file_description.replace("[AWARD_ID]", str(award_id)) file_description_path = save_file_description( working_dir, include_file_description["destination"], file_description @@ -167,7 +218,7 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No } ) fail_download(download_job, e, exc_msg) - raise InvalidParameterException(e) + raise InvalidParameterException(e) from e except Exception as e: # Set error message; job_status_id will be set in download_sqs_worker.handle() exc_msg = "An exception was raised while attempting to process the DownloadJob" @@ -191,7 +242,9 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No if working_dir and os.path.exists(working_dir): shutil.rmtree(working_dir) _kill_spawned_processes(download_job) - DownloadJobLookup.objects.filter(download_job_id=download_job.download_job_id).delete() + DownloadJobLookup.objects.filter( + download_job_id=download_job.download_job_id + ).delete() # push file to S3 bucket, if not local if not settings.IS_LOCAL: @@ -219,7 +272,13 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No "service": "aws.s3", "span_type": "WEB", "resource": ".".join( - [multipart_upload.__module__, (multipart_upload.__qualname__ or multipart_upload.__name__)] + [ + multipart_upload.__module__, + ( + multipart_upload.__qualname__ + or multipart_upload.__name__ + ), + ] ), } ) @@ -229,11 +288,16 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No try: bucket = settings.BULK_DOWNLOAD_S3_BUCKET_NAME region = settings.USASPENDING_AWS_REGION - s3_span.set_attributes({"bucket": bucket, "region": region, "file": zip_file_path}) + s3_span.set_attributes( + {"bucket": bucket, "region": region, "file": zip_file_path} + ) start_uploading = time.perf_counter() - multipart_upload(bucket, region, zip_file_path, os.path.basename(zip_file_path)) + multipart_upload( + bucket, region, zip_file_path, os.path.basename(zip_file_path) + ) write_to_log( - message=f"Uploading took {time.perf_counter() - start_uploading:.2f}s", download_job=download_job + message=f"Uploading took {time.perf_counter() - start_uploading:.2f}s", + download_job=download_job, ) except Exception as e: exc_msg = "An exception was raised while attempting to upload the file" @@ -253,7 +317,7 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No # Set error message; job_status_id will be set in download_sqs_worker.handle() fail_download(download_job, e, exc_msg) if isinstance(e, InvalidParameterException): - raise InvalidParameterException(e) + raise InvalidParameterException(e) from e else: raise Exception(download_job.error_message) from e finally: @@ -265,8 +329,10 @@ def generate_download(download_job: DownloadJob, origination: Optional[str] = No return finish_download(download_job) -def get_download_sources( - json_request: dict, download_job: DownloadJob = None, origination: Optional[str] = None +def get_download_sources( # noqa: PLR0912, PLR0915 + json_request: dict, + download_job: DownloadJob = None, + origination: Optional[str] = None, ) -> List[DownloadSource]: download_sources = [] for download_type in json_request["download_types"]: @@ -288,7 +354,8 @@ def get_download_sources( # download requests `new_awards_only` we only want to apply this # date type to the prime award summaries if json_request["filters"].get("time_period") is not None and ( - download_type == "sub_awards" or download_type == "elasticsearch_sub_awards" + download_type == "sub_awards" + or download_type == "elasticsearch_sub_awards" ): for time_period in filters["time_period"]: if time_period.get("date_type") == NEW_AWARDS_ONLY_KEYWORD: @@ -305,34 +372,58 @@ def get_download_sources( queryset = filter_function(filters) if filters.get("prime_and_sub_award_types") is not None: - award_type_codes = set(filters["prime_and_sub_award_types"][download_type]) + award_type_codes = set( + filters["prime_and_sub_award_types"][download_type] + ) else: award_type_codes = set(filters["award_type_codes"]) if ( - award_type_codes & (set(contract_type_mapping.keys()) | set(idv_type_mapping.keys())) + award_type_codes + & (set(contract_type_mapping.keys()) | set(idv_type_mapping.keys())) or "procurement" in award_type_codes ): # only generate d1 files if the user is asking for contract data d1_source = DownloadSource( - VALUE_MAPPINGS[download_type]["table_name"], "d1", download_type, agency_id, filters + VALUE_MAPPINGS[download_type]["table_name"], + "d1", + download_type, + agency_id, + filters, + ) + d1_filters = { + f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": True + } + d1_source.queryset = queryset & download_type_table.objects.filter( + **d1_filters ) - d1_filters = {f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": True} - d1_source.queryset = queryset & download_type_table.objects.filter(**d1_filters) download_sources.append(d1_source) - if award_type_codes & set(assistance_type_mapping.keys()) or ("grant" in award_type_codes): + if award_type_codes & set(assistance_type_mapping.keys()) or ( + "grant" in award_type_codes + ): # only generate d2 files if the user is asking for assistance data d2_source = DownloadSource( - VALUE_MAPPINGS[download_type]["table_name"], "d2", download_type, agency_id, filters + VALUE_MAPPINGS[download_type]["table_name"], + "d2", + download_type, + agency_id, + filters, + ) + d2_filters = { + f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": False + } + d2_source.queryset = queryset & download_type_table.objects.filter( + **d2_filters ) - d2_filters = {f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": False} - d2_source.queryset = queryset & download_type_table.objects.filter(**d2_filters) download_sources.append(d2_source) elif VALUE_MAPPINGS[download_type]["source_type"] == "account": # Account downloads - filters = {**json_request["filters"], **json_request.get("account_filters", {})} + filters = { + **json_request["filters"], + **json_request.get("account_filters", {}), + } if "is_fpds_join" in VALUE_MAPPINGS[download_type]: # Contracts @@ -343,7 +434,10 @@ def get_download_sources( agency_id, extra_file_type="Contracts_", ) - d1_filters = {**filters, f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": True} + d1_filters = { + **filters, + f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": True, + } d1_account_source.queryset = filter_function( download_type, VALUE_MAPPINGS[download_type]["table"], @@ -360,7 +454,10 @@ def get_download_sources( agency_id, extra_file_type="Assistance_", ) - d2_filters = {**filters, f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": False} + d2_filters = { + **filters, + f"{VALUE_MAPPINGS[download_type].get('is_fpds_join', '')}is_fpds": False, + } d2_account_source.queryset = filter_function( download_type, VALUE_MAPPINGS[download_type]["table"], @@ -388,7 +485,10 @@ def get_download_sources( else: account_source = DownloadSource( - VALUE_MAPPINGS[download_type]["table_name"], json_request["account_level"], download_type, agency_id + VALUE_MAPPINGS[download_type]["table_name"], + json_request["account_level"], + download_type, + agency_id, ) account_source.queryset = filter_function( download_type, @@ -408,70 +508,20 @@ def get_download_sources( ) disaster_source.award_category = json_request["award_category"] disaster_source.queryset = filter_function( - json_request["filters"], download_type, VALUE_MAPPINGS[download_type]["base_fields"] + json_request["filters"], + download_type, + VALUE_MAPPINGS[download_type]["base_fields"], ) download_sources.append(disaster_source) - verify_requested_columns_available(tuple(download_sources), json_request.get("columns", [])) + verify_requested_columns_available( + tuple(download_sources), json_request.get("columns", []) + ) return download_sources -def build_data_file_name(source, download_job, piid, assistance_id): - if download_job and download_job.monthly_download: - # For monthly archives, use the existing detailed zip filename for the data files - # e.g. FY(All)-012_Contracts_Delta_20191108.zip -> FY(All)-012_Contracts_Delta_20191108_%.csv - return strip_file_extension(download_job.file_name) - - file_name_pattern = VALUE_MAPPINGS[source.source_type]["download_name"] - timestamp = datetime.strftime(datetime.now(timezone.utc), "%Y-%m-%d_H%HM%MS%S") - - if source.is_for_idv or source.is_for_contract: - file_name_values = {"piid": slugify_text_for_file_names(piid, "UNKNOWN", 50)} - elif source.is_for_assistance: - file_name_values = {"assistance_id": slugify_text_for_file_names(assistance_id, "UNKNOWN", 50)} - elif source.source_type == "disaster_recipient": - file_name_values = {"award_category": source.award_category, "timestamp": timestamp} - else: - d_map = {"d1": "Contracts", "d2": "Assistance", "treasury_account": "TAS", "federal_account": "FA"} - - if source.agency_code == "all": - agency = "All" - else: - agency = str(source.agency_code) - - request = json.loads(download_job.json_request) - filters = request["filters"] - - if request.get("limit") or ( - request.get("request_type") == "disaster" and source.source_type in ("elasticsearch_awards", "sub_awards") - ): - agency = "" - elif source.file_type not in ("treasury_account", "federal_account"): - agency = f"{agency}_" - - if request.get("request_type") == "disaster": - account_filters = request["account_filters"] - current_fiscal_period = ( - f"FY{account_filters['latest_fiscal_year']}P{str(account_filters['latest_fiscal_period']).zfill(2)}" - ) - data_quarters = f"{current_fiscal_period}-Present" - else: - data_quarters = construct_data_date_range(filters) - - file_name_values = { - "agency": agency, - "data_quarters": data_quarters, - "level": d_map[source.file_type], - "timestamp": timestamp, - "type": d_map[source.file_type], - "extra_file_type": source.extra_file_type, - } - - return file_name_pattern.format(**file_name_values) - - -def parse_source( +def parse_source( # noqa: PLR0913 source: DownloadSource, columns: Optional[List[str]], download_job: DownloadJob, @@ -481,7 +531,7 @@ def parse_source( zip_file_path: str, limit: int, file_format: str, -): +) -> None: """Write to delimited text file(s) and zip file(s) using the source data""" data_file_name = build_data_file_name(source, download_job, piid, assistance_id) @@ -490,39 +540,65 @@ def parse_source( source.file_name = f"{data_file_name}.{extension}" source_path = os.path.join(working_dir, source.file_name) - write_to_log(message=f"Preparing to download data as {source.file_name}", download_job=download_job) + write_to_log( + message=f"Preparing to download data as {source.file_name}", + download_job=download_job, + ) # Generate the query file; values, limits, dates fixed - export_query = generate_export_query(source_query, limit, source, columns, file_format) - temp_file, temp_file_path = generate_export_query_temp_file(export_query, download_job) + export_query = generate_export_query( + source_query, limit, source, columns, file_format + ) + temp_file, temp_file_path = generate_export_query_temp_file( + export_query, download_job + ) start_time = time.perf_counter() try: # Create a separate process to run the PSQL command; wait - psql_process = multiprocessing.Process(target=execute_psql, args=(temp_file_path, source_path, download_job)) - write_to_log(message=f"Running {source.file_name} using psql", download_job=download_job) + psql_process = multiprocessing.Process( + target=execute_psql, args=(temp_file_path, source_path, download_job) + ) + write_to_log( + message=f"Running {source.file_name} using psql", download_job=download_job + ) psql_process.start() wait_for_process(psql_process, start_time, download_job) delim = FILE_FORMATS[file_format]["delimiter"] # Log how many rows we have - write_to_log(message="Counting rows in delimited text file", download_job=download_job) + write_to_log( + message="Counting rows in delimited text file", download_job=download_job + ) try: - number_of_rows = count_rows_in_delimited_file(filename=source_path, has_header=True, delimiter=delim) + number_of_rows = count_rows_in_delimited_file( + filename=source_path, has_header=True, delimiter=delim + ) download_job.number_of_rows += number_of_rows - write_to_log(message=f"Number of rows in text file: {number_of_rows}", download_job=download_job) + write_to_log( + message=f"Number of rows in text file: {number_of_rows}", + download_job=download_job, + ) except Exception: write_to_log( - message="Unable to obtain delimited text file line count", is_error=True, download_job=download_job + message="Unable to obtain delimited text file line count", + is_error=True, + download_job=download_job, ) download_job.save() # Create a separate process to split the large data files into smaller file and write to zip; wait zip_process = multiprocessing.Process( target=split_and_zip_data_files, - args=(zip_file_path, source_path, data_file_name, file_format, download_job), + args=( + zip_file_path, + source_path, + data_file_name, + file_format, + download_job, + ), ) zip_process.start() wait_for_process(zip_process, start_time, download_job) @@ -535,7 +611,81 @@ def parse_source( os.remove(temp_file_path) -def split_and_zip_data_files(zip_file_path, source_path, data_file_name, file_format, download_job=None): +def build_data_file_name( # noqa: PLR0912 + source: DownloadSource, download_job: DownloadJob, piid: str, assistance_id: str +) -> str: + if download_job and download_job.monthly_download: + # For monthly archives, use the existing detailed zip filename for the data files + # e.g. FY(All)-012_Contracts_Delta_20191108.zip -> FY(All)-012_Contracts_Delta_20191108_%.csv + return strip_file_extension(download_job.file_name) + + file_name_pattern = VALUE_MAPPINGS[source.source_type]["download_name"] + timestamp = datetime.strftime(datetime.now(timezone.utc), "%Y-%m-%d_H%HM%MS%S") + + if source.is_for_idv or source.is_for_contract: + file_name_values = {"piid": slugify_text_for_file_names(piid, "UNKNOWN", 50)} + elif source.is_for_assistance: + file_name_values = { + "assistance_id": slugify_text_for_file_names(assistance_id, "UNKNOWN", 50) + } + elif source.source_type == "disaster_recipient": + file_name_values = { + "award_category": source.award_category, + "timestamp": timestamp, + } + else: + d_map = { + "d1": "Contracts", + "d2": "Assistance", + "treasury_account": "TAS", + "federal_account": "FA", + } + + if source.agency_code == "all": + agency = "All" + else: + agency = str(source.agency_code) + + request = json.loads(download_job.json_request) + filters = request["filters"] + + if request.get("limit") or ( + request.get("request_type") == "disaster" + and source.source_type in ("elasticsearch_awards", "sub_awards") + ): + agency = "" + elif source.file_type not in ("treasury_account", "federal_account"): + agency = f"{agency}_" + + if request.get("request_type") == "disaster": + account_filters = request["account_filters"] + current_fiscal_period = ( + f"FY{account_filters['latest_fiscal_year']}" + f"P{str(account_filters['latest_fiscal_period']).zfill(2)}" + ) + data_quarters = f"{current_fiscal_period}-Present" + else: + data_quarters = construct_data_date_range(filters) + + file_name_values = { + "agency": agency, + "data_quarters": data_quarters, + "level": d_map[source.file_type], + "timestamp": timestamp, + "type": d_map[source.file_type], + "extra_file_type": source.extra_file_type, + } + + return file_name_pattern.format(**file_name_values) + + +def split_and_zip_data_files( + zip_file_path: str, + source_path: str, + data_file_name: str, + file_format: str, + download_job: DownloadJob | None = None, +) -> None: with SubprocessTrace( name=f"job.{JOB_TYPE}.download.zip", kind=SpanKind.INTERNAL, @@ -560,7 +710,10 @@ def split_and_zip_data_files(zip_file_path, source_path, data_file_name, file_fo extension = FILE_FORMATS[file_format]["extension"] output_template = f"{data_file_name}_%s.{extension}" - write_to_log(message="Beginning the delimited text file partition", download_job=download_job) + write_to_log( + message="Beginning the delimited text file partition", + download_job=download_job, + ) list_of_files = partition_large_delimited_file( download_job=download_job, file_path=source_path, @@ -574,12 +727,15 @@ def split_and_zip_data_files(zip_file_path, source_path, data_file_name, file_fo write_to_log(message=msg, download_job=download_job) # Zip the split files into one zipfile - write_to_log(message="Beginning zipping and compression", download_job=download_job) + write_to_log( + message="Beginning zipping and compression", download_job=download_job + ) log_time = time.perf_counter() append_files_to_zip_file(list_of_files, zip_file_path) write_to_log( - message=f"Writing to zipfile took {time.perf_counter() - log_time:.4f}s", download_job=download_job + message=f"Writing to zipfile took {time.perf_counter() - log_time:.4f}s", + download_job=download_job, ) except Exception as e: @@ -591,7 +747,7 @@ def split_and_zip_data_files(zip_file_path, source_path, data_file_name, file_fo raise e -def start_download(download_job): +def start_download(download_job: DownloadJob) -> str: # Update job attributes download_job.job_status_id = JOB_STATUS_DICT["running"] download_job.number_of_rows = 0 @@ -599,21 +755,29 @@ def start_download(download_job): download_job.file_size = 0 download_job.save() - write_to_log(message=f"Starting to process DownloadJob {download_job.download_job_id}", download_job=download_job) + write_to_log( + message=f"Starting to process DownloadJob {download_job.download_job_id}", + download_job=download_job, + ) return download_job.file_name -def finish_download(download_job): +def finish_download(download_job: DownloadJob) -> str: download_job.job_status_id = JOB_STATUS_DICT["finished"] download_job.save() - write_to_log(message=f"Finished processing DownloadJob {download_job.download_job_id}", download_job=download_job) + write_to_log( + message=f"Finished processing DownloadJob {download_job.download_job_id}", + download_job=download_job, + ) return download_job.file_name -def wait_for_process(process, start_time, download_job): +def wait_for_process( + process: multiprocessing.Process, start_time: float, download_job: DownloadJob +) -> float: """Wait for the process to complete, throw errors for timeouts or Process exceptions""" log_time = time.perf_counter() @@ -633,27 +797,43 @@ def wait_for_process(process, start_time, download_job): sleep_count += 1 over_time = (time.perf_counter() - start_time) >= MAX_VISIBILITY_TIMEOUT - if download_job and (not download_job.monthly_download and over_time) or process.exitcode != 0: + process_error = None + if ( + download_job + and (not download_job.monthly_download and over_time) + or process.exitcode != 0 + ): if process.is_alive(): # Process is running for longer than MAX_VISIBILITY_TIMEOUT, kill it write_to_log( - message=f"Attempting to terminate process (pid {process.pid})", download_job=download_job, is_error=True + message=f"Attempting to terminate process (pid {process.pid})", + download_job=download_job, + is_error=True, ) process.terminate() - e = TimeoutError( + process_error = TimeoutError( f"DownloadJob {download_job.download_job_id} lasted longer than {MAX_VISIBILITY_TIMEOUT / 3600} hours" ) else: # An error occurred in the process - e = Exception("Command failed. Please see the logs for details.") + process_error = Exception( + "Command failed. Please see the logs for details." + ) - raise e + process.join() + + if process_error: + raise process_error return time.perf_counter() - log_time def generate_export_query( - source_query: QuerySet, limit: int, source: DownloadSource, column_subset: Optional[List[str]], file_format: str + source_query: QuerySet, + limit: int, + source: DownloadSource, + column_subset: Optional[List[str]], + file_format: str, ) -> str: if limit: source_query = source_query[:limit] @@ -674,19 +854,29 @@ def generate_export_query( annotated_group_by_columns.append(annotation_select_reverse[val]) query_annotated = apply_annotations_to_sql( - generate_raw_quoted_query(source_query), selected_columns, annotated_group_by_columns + generate_raw_quoted_query(source_query), + selected_columns, + annotated_group_by_columns, ) options = FILE_FORMATS[file_format]["options"] return rf"\COPY ({query_annotated}) TO STDOUT {options}" -def generate_export_query_temp_file(export_query, download_job, temp_dir=None): - write_to_log(message=f"Saving PSQL Query: {export_query}", download_job=download_job, is_debug=True) +def generate_export_query_temp_file( + export_query: str, download_job: DownloadJob, temp_dir: str | None = None +) -> tuple[str, str]: + write_to_log( + message=f"Saving PSQL Query: {export_query}", + download_job=download_job, + is_debug=True, + ) dir_name = "/tmp" if temp_dir: dir_name = temp_dir # Create a unique temporary file to hold the raw query, using \copy - (temp_sql_file, temp_sql_file_path) = tempfile.mkstemp(prefix="bd_sql_", dir=dir_name) + (temp_sql_file, temp_sql_file_path) = tempfile.mkstemp( + prefix="bd_sql_", dir=dir_name + ) with open(temp_sql_file_path, "w") as file: file.write(export_query) @@ -695,8 +885,10 @@ def generate_export_query_temp_file(export_query, download_job, temp_dir=None): def apply_annotations_to_sql( - raw_query: str, aliases: List[str], annotated_group_by_columns: Optional[List[str]] = None -): + raw_query: str, + aliases: List[str], + annotated_group_by_columns: Optional[List[str]] = None, +) -> str: """ Django's ORM understandably doesn't allow aliases to be the same names as other fields available. However, if we want to use the efficiency of psql's COPY method and keep the column names, we need to allow these scenarios. This @@ -707,7 +899,9 @@ def apply_annotations_to_sql( DIRECT_SELECT_QUERY_REGEX = r'^[^ ]*\."[^"]*"$' # Django is pretty consistent with how it prints out queries # Create a list from the non-derived values between SELECT and FROM - selects_list = [val for val in select_statements if re.search(DIRECT_SELECT_QUERY_REGEX, val)] + selects_list = [ + val for val in select_statements if re.search(DIRECT_SELECT_QUERY_REGEX, val) + ] # Create a list from the derived values between SELECT and FROM aliased_list = [ @@ -742,7 +936,9 @@ def apply_annotations_to_sql( # It is assumed that all non-positional values in the GROUP BY are column names meaning the first number # matching the value of "idx" will be the position. However, this is not guaranteed in the rest of the # query, so we make sure to stop after the first match found. - second_half_query = second_half_query.replace(f" {idx}", f" {{idx_{idx}}}", 1) + second_half_query = second_half_query.replace( + f" {idx}", f" {{idx_{idx}}}", 1 + ) second_half_query = second_half_query.format( **{f"idx_{idx}": col_select for idx, col_select in group_by_to_replace} ) @@ -750,10 +946,13 @@ def apply_annotations_to_sql( # Match aliases with their values values_list = [ - f'{deriv_dict[alias] if alias in deriv_dict else selects_list.pop(0)} AS "{alias}"' for alias in aliases + f'{deriv_dict[alias] if alias in deriv_dict else selects_list.pop(0)} AS "{alias}"' + for alias in aliases ] - sql = raw_query.replace(_top_level_split(raw_query, "FROM")[0], f"SELECT {', '.join(values_list)} ", 1) + sql = raw_query.replace( + _top_level_split(raw_query, "FROM")[0], f"SELECT {', '.join(values_list)} ", 1 + ) if cte_sql: sql = f"{cte_sql} {sql}" @@ -768,7 +967,7 @@ def apply_annotations_to_sql( return sql.replace(NAMING_CONFLICT_DISCRIMINATOR, "") -def _select_columns(sql: str) -> Tuple[str, List[str]]: +def _select_columns(sql: str) -> Tuple[str, List[str]]: # noqa: PLR0912 in_quotes = False in_cte = False parens_depth = 0 @@ -811,7 +1010,7 @@ def _select_columns(sql: str) -> Tuple[str, List[str]]: return cte_sql, retval # this will almost certainly error out later. -def _top_level_split(sql, splitter): +def _top_level_split(sql: str, splitter: str) -> str: in_quotes = False parens_depth = 0 for index, char in enumerate(sql): @@ -830,7 +1029,9 @@ def _top_level_split(sql, splitter): raise Exception(f"SQL string ${sql} cannot be split on ${splitter}") -def execute_psql(temp_sql_file_path, source_path, download_job): +def execute_psql( + temp_sql_file_path: str, source_path: str, download_job: DownloadJob +) -> None: """Executes a single PSQL command within its own Subprocess""" download_sql = Path(temp_sql_file_path).read_text() if download_sql.startswith("\\COPY"): @@ -855,14 +1056,24 @@ def execute_psql(temp_sql_file_path, source_path, download_job): "download_job_id": str(download_job.download_job_id), "download_job_status": str(download_job.job_status.name), "download_file_name": str(download_job.file_name), - "download_file_size": download_job.file_size if download_job.file_size is not None else 0, - "number_of_rows": download_job.number_of_rows if download_job.number_of_rows is not None else 0, + "download_file_size": download_job.file_size + if download_job.file_size is not None + else 0, + "number_of_rows": download_job.number_of_rows + if download_job.number_of_rows is not None + else 0, "number_of_columns": ( - download_job.number_of_columns if download_job.number_of_columns is not None else 0 + download_job.number_of_columns + if download_job.number_of_columns is not None + else 0 ), - "error_message": download_job.error_message if download_job.error_message else "", + "error_message": download_job.error_message + if download_job.error_message + else "", "monthly_download": str(download_job.monthly_download), - "json_request": str(download_job.json_request) if download_job.json_request else "", + "json_request": str(download_job.json_request) + if download_job.json_request + else "", } ) @@ -870,15 +1081,25 @@ def execute_psql(temp_sql_file_path, source_path, download_job): log_time = time.perf_counter() temp_env = os.environ.copy() if download_job and not download_job.monthly_download: - # Since terminating the process isn't guaranteed to end the DB statement, add timeout to client connection + # Terminating the process isn't guaranteed to end the DB statement; add timeout to client connection temp_env["PGOPTIONS"] = ( f"--statement-timeout={settings.DOWNLOAD_DB_TIMEOUT_IN_HOURS}h " f"--work-mem={settings.DOWNLOAD_DB_WORK_MEM_IN_MB}MB" ) - cat_command = subprocess.Popen(["cat", temp_sql_file_path], stdout=subprocess.PIPE) + cat_command = subprocess.Popen( + ["cat", temp_sql_file_path], stdout=subprocess.PIPE + ) subprocess.check_output( - ["psql", "-q", "-o", source_path, retrieve_db_string(), "-v", "ON_ERROR_STOP=1"], + [ + "psql", + "-q", + "-o", + source_path, + retrieve_db_string(), + "-v", + "ON_ERROR_STOP=1", + ], stdin=cat_command.stdout, stderr=subprocess.STDOUT, env=temp_env, @@ -890,7 +1111,11 @@ def execute_psql(temp_sql_file_path, source_path, download_job): download_job=download_job, ) except subprocess.CalledProcessError as e: - write_to_log(message=f"PSQL Error: {e.output.decode()}", is_error=True, download_job=download_job) + write_to_log( + message=f"PSQL Error: {e.output.decode()}", + is_error=True, + download_job=download_job, + ) raise e except Exception as e: if not settings.IS_LOCAL: @@ -898,28 +1123,36 @@ def execute_psql(temp_sql_file_path, source_path, download_job): e.cmd = "[redacted psql command]" write_to_log(message=e, is_error=True, download_job=download_job) sql = subprocess.check_output(["cat", temp_sql_file_path]).decode() - write_to_log(message=f"Faulty SQL: {sql}", is_error=True, download_job=download_job) + write_to_log( + message=f"Faulty SQL: {sql}", is_error=True, download_job=download_job + ) raise e -def retrieve_db_string(): +def retrieve_db_string() -> str: """It is necessary for this to be a function so the test suite can mock the connection string""" return settings.DOWNLOAD_DATABASE_URL -def strip_file_extension(file_name): +def strip_file_extension(file_name: str) -> str: return os.path.splitext(os.path.basename(file_name))[0] -def fail_download(download_job, exception, message): +def fail_download( + download_job: DownloadJob, exception: Exception, message: str +) -> None: write_to_log(message=message, is_error=True, download_job=download_job) - stack_trace = "".join(traceback.format_exception(type(exception), value=exception, tb=exception.__traceback__)) + stack_trace = "".join( + traceback.format_exception( + type(exception), value=exception, tb=exception.__traceback__ + ) + ) download_job.error_message = f"{message}:\n{stack_trace}" download_job.job_status_id = JOB_STATUS_DICT["failed"] download_job.save() -def add_data_dictionary_to_zip(working_dir, zip_file_path): +def add_data_dictionary_to_zip(working_dir: str, zip_file_path: str) -> None: write_to_log(message="Adding data dictionary to zip file") data_dictionary_file_name = "Data_Dictionary_Crosswalk.xlsx" data_dictionary_file_path = os.path.join(working_dir, data_dictionary_file_name) @@ -933,7 +1166,7 @@ def add_data_dictionary_to_zip(working_dir, zip_file_path): append_files_to_zip_file([data_dictionary_file_path], zip_file_path) -def _kill_spawned_processes(download_job=None): +def _kill_spawned_processes(download_job: DownloadJob = None) -> None: """Cleanup (kill) any spawned child processes during this job run""" job = ps.Process(os.getpid()) for spawn_of_job in job.children(recursive=True): @@ -949,7 +1182,7 @@ def _kill_spawned_processes(download_job=None): pass -def create_empty_data_file( +def create_empty_data_file( # noqa: PLR0913 source: DownloadSource, download_job: DownloadJob, working_dir: str, @@ -963,7 +1196,8 @@ def create_empty_data_file( source.file_name = f"{data_file_name}.{extension}" source_path = os.path.join(working_dir, source.file_name) write_to_log( - message=f"Skipping download of {source.file_name} due to no valid columns provided", download_job=download_job + message=f"Skipping download of {source.file_name} due to no valid columns provided", + download_job=download_job, ) Path(source_path).touch() append_files_to_zip_file([source_path], zip_file_path) From 8feba44c3173fd81b350a55f4c0d4f27b29ac445 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Wed, 11 Mar 2026 21:57:26 -0400 Subject: [PATCH 04/14] more verbose logging on cleanup of processes --- .../filestreaming/download_generation.py | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index 7a429ffc6c..66998fabdf 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -797,7 +797,7 @@ def wait_for_process( sleep_count += 1 over_time = (time.perf_counter() - start_time) >= MAX_VISIBILITY_TIMEOUT - process_error = None + if ( download_job and (not download_job.monthly_download and over_time) @@ -811,19 +811,16 @@ def wait_for_process( is_error=True, ) process.terminate() - process_error = TimeoutError( + e = TimeoutError( f"DownloadJob {download_job.download_job_id} lasted longer than {MAX_VISIBILITY_TIMEOUT / 3600} hours" ) else: # An error occurred in the process - process_error = Exception( + e = Exception( "Command failed. Please see the logs for details." ) - process.join() - - if process_error: - raise process_error + raise e return time.perf_counter() - log_time @@ -1166,20 +1163,32 @@ def add_data_dictionary_to_zip(working_dir: str, zip_file_path: str) -> None: append_files_to_zip_file([data_dictionary_file_path], zip_file_path) -def _kill_spawned_processes(download_job: DownloadJob = None) -> None: +def _kill_spawned_processes(download_job: DownloadJob) -> None: """Cleanup (kill) any spawned child processes during this job run""" job = ps.Process(os.getpid()) for spawn_of_job in job.children(recursive=True): write_to_log( - message=f"Attempting to terminate child process with PID [{spawn_of_job.pid}] and name " - f"[{spawn_of_job.name}]", + message=f"Attempting to terminate child process with PID [{spawn_of_job.pid}], name " + f"[{spawn_of_job.name()}], and status [{spawn_of_job.status()}]", download_job=download_job, is_error=True, ) try: spawn_of_job.kill() + write_to_log( + message=f"Successfully terminated child process with PID[{spawn_of_job.pid}]", + download_job=download_job, + is_error=True + ) except ps.NoSuchProcess: pass + except Exception as e: + write_to_log( + message=f"Failed to terminate child process with PID[{spawn_of_job.pid}]", + download_job=download_job, + is_error=True + ) + raise e def create_empty_data_file( # noqa: PLR0913 From c7d08f5cd44e88ba3a2b58b34436a4332681f2e0 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Thu, 12 Mar 2026 10:36:56 -0400 Subject: [PATCH 05/14] add wait_procs to cleanup of child processes --- .../filestreaming/download_generation.py | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index 66998fabdf..0288ee7bee 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -816,9 +816,7 @@ def wait_for_process( ) else: # An error occurred in the process - e = Exception( - "Command failed. Please see the logs for details." - ) + e = Exception("Command failed. Please see the logs for details.") raise e @@ -1166,7 +1164,8 @@ def add_data_dictionary_to_zip(working_dir: str, zip_file_path: str) -> None: def _kill_spawned_processes(download_job: DownloadJob) -> None: """Cleanup (kill) any spawned child processes during this job run""" job = ps.Process(os.getpid()) - for spawn_of_job in job.children(recursive=True): + child_processes = job.children(recursive=True) + for spawn_of_job in child_processes: write_to_log( message=f"Attempting to terminate child process with PID [{spawn_of_job.pid}], name " f"[{spawn_of_job.name()}], and status [{spawn_of_job.status()}]", @@ -1175,20 +1174,17 @@ def _kill_spawned_processes(download_job: DownloadJob) -> None: ) try: spawn_of_job.kill() - write_to_log( - message=f"Successfully terminated child process with PID[{spawn_of_job.pid}]", - download_job=download_job, - is_error=True - ) except ps.NoSuchProcess: pass - except Exception as e: - write_to_log( - message=f"Failed to terminate child process with PID[{spawn_of_job.pid}]", - download_job=download_job, - is_error=True - ) - raise e + + ps.wait_procs( + child_processes, + callback=lambda proc: write_to_log( + message=f"Terminated child process with PID[{proc.pid}]", + download_job=download_job, + is_error=True, + ), + ) def create_empty_data_file( # noqa: PLR0913 From 69bee9102ac91737ed7ce01e8b2197d409cf5733 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Thu, 12 Mar 2026 12:22:49 -0400 Subject: [PATCH 06/14] close subprocess containing download sql --- usaspending_api/download/filestreaming/download_generation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index 0288ee7bee..98b0c85760 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -1100,6 +1100,10 @@ def execute_psql( env=temp_env, ) + # Wait for initial process to close and terminate to free up resources + cat_command.stdout.close() + cat_command.wait() + duration = time.perf_counter() - log_time write_to_log( message=f"Wrote {os.path.basename(source_path)}, took {duration:.4f} seconds", From 6f8ca43d911a1045a8a3c4a7fc484d58a2a649f4 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Thu, 12 Mar 2026 14:24:45 -0400 Subject: [PATCH 07/14] verbose logging for debugging --- .../download/filestreaming/download_generation.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index 98b0c85760..d1872746be 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -803,6 +803,12 @@ def wait_for_process( and (not download_job.monthly_download and over_time) or process.exitcode != 0 ): + logger.info("**********") + logger.info(f"DownloadJob error: {download_job.error_message}") + logger.info(f"Is overtime: {over_time}") + logger.info(f"Process Exitcode: {process.exitcode}") + logger.info(f"Process repr: {repr(process)}") + logger.info("**********") if process.is_alive(): # Process is running for longer than MAX_VISIBILITY_TIMEOUT, kill it write_to_log( From 041e0ca0e02163d0d4e9c77bb3d74de05447e983 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Fri, 13 Mar 2026 11:14:45 -0400 Subject: [PATCH 08/14] verbose logging for debugging --- .../filestreaming/download_generation.py | 170 +++++++++--------- 1 file changed, 88 insertions(+), 82 deletions(-) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index d1872746be..a4a01dcf91 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -1046,92 +1046,98 @@ def execute_psql( service="bulk-download", ) - with subprocess_trace as span: - span.set_attributes( - { - "service": "bulk-download", - "resource": str(download_sql), - "span_type": "Internal", - "source_path": str(source_path), - # download job details - "download_job_id": str(download_job.download_job_id), - "download_job_status": str(download_job.job_status.name), - "download_file_name": str(download_job.file_name), - "download_file_size": download_job.file_size - if download_job.file_size is not None - else 0, - "number_of_rows": download_job.number_of_rows - if download_job.number_of_rows is not None - else 0, - "number_of_columns": ( - download_job.number_of_columns - if download_job.number_of_columns is not None - else 0 - ), - "error_message": download_job.error_message - if download_job.error_message - else "", - "monthly_download": str(download_job.monthly_download), - "json_request": str(download_job.json_request) - if download_job.json_request - else "", - } - ) + try: + with subprocess_trace as span: + span.set_attributes( + { + "service": "bulk-download", + "resource": str(download_sql), + "span_type": "Internal", + "source_path": str(source_path), + # download job details + "download_job_id": str(download_job.download_job_id), + "download_job_status": str(download_job.job_status.name), + "download_file_name": str(download_job.file_name), + "download_file_size": download_job.file_size + if download_job.file_size is not None + else 0, + "number_of_rows": download_job.number_of_rows + if download_job.number_of_rows is not None + else 0, + "number_of_columns": ( + download_job.number_of_columns + if download_job.number_of_columns is not None + else 0 + ), + "error_message": download_job.error_message + if download_job.error_message + else "", + "monthly_download": str(download_job.monthly_download), + "json_request": str(download_job.json_request) + if download_job.json_request + else "", + } + ) - try: - log_time = time.perf_counter() - temp_env = os.environ.copy() - if download_job and not download_job.monthly_download: - # Terminating the process isn't guaranteed to end the DB statement; add timeout to client connection - temp_env["PGOPTIONS"] = ( - f"--statement-timeout={settings.DOWNLOAD_DB_TIMEOUT_IN_HOURS}h " - f"--work-mem={settings.DOWNLOAD_DB_WORK_MEM_IN_MB}MB" - ) + try: + log_time = time.perf_counter() + temp_env = os.environ.copy() + if download_job and not download_job.monthly_download: + # Terminating the process isn't guaranteed to end the DB statement; add timeout to client connection + temp_env["PGOPTIONS"] = ( + f"--statement-timeout={settings.DOWNLOAD_DB_TIMEOUT_IN_HOURS}h " + f"--work-mem={settings.DOWNLOAD_DB_WORK_MEM_IN_MB}MB" + ) - cat_command = subprocess.Popen( - ["cat", temp_sql_file_path], stdout=subprocess.PIPE - ) - subprocess.check_output( - [ - "psql", - "-q", - "-o", - source_path, - retrieve_db_string(), - "-v", - "ON_ERROR_STOP=1", - ], - stdin=cat_command.stdout, - stderr=subprocess.STDOUT, - env=temp_env, - ) + cat_command = subprocess.Popen( + ["cat", temp_sql_file_path], stdout=subprocess.PIPE + ) + subprocess.check_output( + [ + "psql", + "-q", + "-o", + source_path, + retrieve_db_string(), + "-v", + "ON_ERROR_STOP=1", + ], + stdin=cat_command.stdout, + stderr=subprocess.STDOUT, + env=temp_env, + ) - # Wait for initial process to close and terminate to free up resources - cat_command.stdout.close() - cat_command.wait() + # Wait for initial process to close and terminate to free up resources + cat_command.stdout.close() + cat_command.wait() - duration = time.perf_counter() - log_time - write_to_log( - message=f"Wrote {os.path.basename(source_path)}, took {duration:.4f} seconds", - download_job=download_job, - ) - except subprocess.CalledProcessError as e: - write_to_log( - message=f"PSQL Error: {e.output.decode()}", - is_error=True, - download_job=download_job, - ) - raise e - except Exception as e: - if not settings.IS_LOCAL: - # Not logging the command as it can contain the database connection string - e.cmd = "[redacted psql command]" - write_to_log(message=e, is_error=True, download_job=download_job) - sql = subprocess.check_output(["cat", temp_sql_file_path]).decode() - write_to_log( - message=f"Faulty SQL: {sql}", is_error=True, download_job=download_job - ) - raise e + duration = time.perf_counter() - log_time + write_to_log( + message=f"Wrote {os.path.basename(source_path)}, took {duration:.4f} seconds", + download_job=download_job, + ) + except subprocess.CalledProcessError as e: + write_to_log( + message=f"PSQL Error: {e.output.decode()}", + is_error=True, + download_job=download_job, + ) + raise e + except Exception as e: + if not settings.IS_LOCAL: + # Not logging the command as it can contain the database connection string + e.cmd = "[redacted psql command]" + write_to_log(message=e, is_error=True, download_job=download_job) + sql = subprocess.check_output(["cat", temp_sql_file_path]).decode() + write_to_log( + message=f"Faulty SQL: {sql}", + is_error=True, + download_job=download_job, + ) + raise e + except Exception as e: + write_to_log(message=e, is_error=True, download_job=download_job) + raise e def retrieve_db_string() -> str: From 2414ad0442ff529f834f4a0be8057e394d87d119 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Fri, 13 Mar 2026 11:15:40 -0400 Subject: [PATCH 09/14] verbose logging for debugging --- .../download/filestreaming/download_generation.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index a4a01dcf91..dc99bbf5a7 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -1136,7 +1136,11 @@ def execute_psql( ) raise e except Exception as e: - write_to_log(message=e, is_error=True, download_job=download_job) + write_to_log( + message=f"Failed in subprocess with error: {e}", + is_error=True, + download_job=download_job, + ) raise e From 7e4a439b769f1f64b679d845b45e545f8baaeb27 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Fri, 13 Mar 2026 15:49:15 -0400 Subject: [PATCH 10/14] Add memory and disk usage to github action for debugging --- .../workflows/test-non-spark-integration.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/.github/workflows/test-non-spark-integration.yaml b/.github/workflows/test-non-spark-integration.yaml index 52a40a221f..21076019a8 100644 --- a/.github/workflows/test-non-spark-integration.yaml +++ b/.github/workflows/test-non-spark-integration.yaml @@ -33,6 +33,14 @@ jobs: name: Run runs-on: ${{ vars.RUNNER_VERSION }} steps: + - name: Check memory output + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h + - name: Checkout Source Repository uses: actions/checkout@v4 with: @@ -64,3 +72,11 @@ jobs: include-glob: '**/tests/integration/*' marker: '(not signal_handling and not spark)' working-directory: ./usaspending-api + + - name: Check memory output + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h From d32faf96ee95972c8745dbdfea3e753c550b3269 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Fri, 13 Mar 2026 15:55:27 -0400 Subject: [PATCH 11/14] Add memory and disk usage to github action for debugging --- .github/workflows/test-non-spark-integration.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/test-non-spark-integration.yaml b/.github/workflows/test-non-spark-integration.yaml index 21076019a8..270c2e788d 100644 --- a/.github/workflows/test-non-spark-integration.yaml +++ b/.github/workflows/test-non-spark-integration.yaml @@ -34,6 +34,8 @@ jobs: runs-on: ${{ vars.RUNNER_VERSION }} steps: - name: Check memory output + shell: bash + working-directory: ./usaspending-api if: always() run: | echo "Memory Usage:" @@ -74,6 +76,8 @@ jobs: working-directory: ./usaspending-api - name: Check memory output + shell: bash + working-directory: ./usaspending-api if: always() run: | echo "Memory Usage:" From 9657dbe93c426864ed68cee902402c956ac65eaf Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Fri, 13 Mar 2026 16:04:01 -0400 Subject: [PATCH 12/14] Add memory and disk usage to github action for debugging --- .../workflows/test-non-spark-integration.yaml | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/workflows/test-non-spark-integration.yaml b/.github/workflows/test-non-spark-integration.yaml index 270c2e788d..bc57058a87 100644 --- a/.github/workflows/test-non-spark-integration.yaml +++ b/.github/workflows/test-non-spark-integration.yaml @@ -33,16 +33,6 @@ jobs: name: Run runs-on: ${{ vars.RUNNER_VERSION }} steps: - - name: Check memory output - shell: bash - working-directory: ./usaspending-api - if: always() - run: | - echo "Memory Usage:" - free -h - echo "Disk Usage:" - df -h - - name: Checkout Source Repository uses: actions/checkout@v4 with: @@ -55,6 +45,16 @@ jobs: path: data-act-broker-backend ref: ${{ needs.Setup-Broker-Branch.outputs.branch }} + - name: Check memory output + shell: bash + working-directory: ./usaspending-api + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h + - name: Init Python Environment uses: ./usaspending-api/.github/actions/init-python-environment with: From 636e18f6fd98599c1e432783897dd8948b4c8212 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Mon, 16 Mar 2026 10:25:54 -0400 Subject: [PATCH 13/14] add more debugging --- .../workflows/test-non-spark-integration.yaml | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/.github/workflows/test-non-spark-integration.yaml b/.github/workflows/test-non-spark-integration.yaml index bc57058a87..b5a69d3cb3 100644 --- a/.github/workflows/test-non-spark-integration.yaml +++ b/.github/workflows/test-non-spark-integration.yaml @@ -60,6 +60,16 @@ jobs: with: working-directory: ./usaspending-api + - name: Check memory output + shell: bash + working-directory: ./usaspending-api + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h + - name: Init Test Environment uses: ./usaspending-api/.github/actions/init-test-environment with: @@ -67,6 +77,16 @@ jobs: is-spark-test: false working-directory: ./usaspending-api + - name: Check memory output + shell: bash + working-directory: ./usaspending-api + if: always() + run: | + echo "Memory Usage:" + free -h + echo "Disk Usage:" + df -h + - name: Run Test Cases uses: ./usaspending-api/.github/actions/run-pytest with: From 6944f8116ebcc8b63052e63207a77b83bd9e61a5 Mon Sep 17 00:00:00 2001 From: Seth Stoudenmier Date: Fri, 3 Apr 2026 11:57:17 -0400 Subject: [PATCH 14/14] test join in wait_for_process --- .../download/filestreaming/download_generation.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index dc99bbf5a7..3c37329f1f 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -797,7 +797,7 @@ def wait_for_process( sleep_count += 1 over_time = (time.perf_counter() - start_time) >= MAX_VISIBILITY_TIMEOUT - + err = None if ( download_job and (not download_job.monthly_download and over_time) @@ -817,14 +817,16 @@ def wait_for_process( is_error=True, ) process.terminate() - e = TimeoutError( + err = TimeoutError( f"DownloadJob {download_job.download_job_id} lasted longer than {MAX_VISIBILITY_TIMEOUT / 3600} hours" ) else: # An error occurred in the process - e = Exception("Command failed. Please see the logs for details.") + err = Exception("Command failed. Please see the logs for details.") - raise e + process.join() + if err: + raise err return time.perf_counter() - log_time