Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions .github/workflows/test-non-spark-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,62 @@ jobs:
path: data-act-broker-backend
ref: ${{ needs.Setup-Broker-Branch.outputs.branch }}

- name: Check memory output
shell: bash
working-directory: ./usaspending-api
if: always()
run: |
echo "Memory Usage:"
free -h
echo "Disk Usage:"
df -h

- name: Init Python Environment
uses: ./usaspending-api/.github/actions/init-python-environment
with:
working-directory: ./usaspending-api

- name: Check memory output
shell: bash
working-directory: ./usaspending-api
if: always()
run: |
echo "Memory Usage:"
free -h
echo "Disk Usage:"
df -h

- name: Init Test Environment
uses: ./usaspending-api/.github/actions/init-test-environment
with:
is-integration-test: true
is-spark-test: false
working-directory: ./usaspending-api

- name: Check memory output
shell: bash
working-directory: ./usaspending-api
if: always()
run: |
echo "Memory Usage:"
free -h
echo "Disk Usage:"
df -h

- name: Run Test Cases
uses: ./usaspending-api/.github/actions/run-pytest
with:
cov-report-name: 'non-spark-integration-tests'
include-glob: '**/tests/integration/*'
marker: '(not signal_handling and not spark)'
working-directory: ./usaspending-api

- name: Check memory output
shell: bash
working-directory: ./usaspending-api
if: always()
run: |
echo "Memory Usage:"
free -h
echo "Disk Usage:"
df -h
58 changes: 28 additions & 30 deletions usaspending_api/common/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

# Standard library imports
import logging
from types import TracebackType

# Typing imports
from typing import Callable, Optional
from typing import Callable, Optional, Type

# OpenTelemetry imports
from opentelemetry import trace
Expand All @@ -22,7 +23,9 @@

def _activate_trace_filter(filter_class: Callable) -> None:
if not hasattr(tracer, "_filters"):
_logger.warning("OpenTelemetry does not support direct filter activation on tracer")
_logger.warning(
"OpenTelemetry does not support direct filter activation on tracer"
)
else:
if tracer._filters:
tracer._filters.append(filter_class())
Expand All @@ -42,18 +45,14 @@ class OpenTelemetryEagerlyDropTraceFilter:
EAGERLY_DROP_TRACE_KEY = "EAGERLY_DROP_TRACE"

@classmethod
def activate(cls):
def activate(cls) -> None:
_activate_trace_filter(cls)

@classmethod
def drop(cls, span: trace.Span):
def drop(cls, span: trace.Span) -> None:
span.set_status(Status(StatusCode.ERROR))
span.set_attribute(cls.EAGERLY_DROP_TRACE_KEY, True)

def process_trace(self, trace: trace):
"""Drop trace if any span attribute has tag with key 'EAGERLY_DROP_TRACE'"""
return None if any(span.get_attribute(self.EAGERLY_DROP_TRACE_KEY) for span in trace) else trace


class SubprocessTrace:
"""
Expand Down Expand Up @@ -82,15 +81,26 @@ def __enter__(self) -> trace.Span:
self.span.set_attribute(key, value)
return self.span

def __exit__(self, exc_type, exc_val, exc_tb):
self.span.__exit__(exc_type, exc_val, exc_tb)
# End the span or handle any cleanup
if self.span:
if exc_type:
# Handle exception metadata
self.span.set_status(trace.Status(trace.StatusCode.ERROR, description=str(exc_val)))
self.span.record_exception(exc_val)
self.span.end()
def __exit__(
self,
exc_type: Type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
try:
self.span.__exit__(exc_type, exc_val, exc_tb)
# End the span or handle any cleanup
if self.span:
if exc_type:
# Handle exception metadata
self.span.set_status(
trace.Status(trace.StatusCode.ERROR, description=str(exc_val))
)
self.span.record_exception(exc_val)
self.span.end()
except Exception as e:
_logger.exception(f"Failed to exit subprocess trace. {e}")
raise


class OpenTelemetryLoggingTraceFilter:
Expand All @@ -99,17 +109,5 @@ class OpenTelemetryLoggingTraceFilter:
_log = logging.getLogger(f"{__name__}.OpenTelemetryLoggingTraceFilter")

@classmethod
def activate(cls):
def activate(cls) -> None:
_activate_trace_filter(cls)

def process_trace(self, trace):
logged = False
trace_id = "???"
for span in trace:
trace_id = span.context.trace_id or "???"
if not span.get_attribute(OpenTelemetryEagerlyDropTraceFilter.EAGERLY_DROP_TRACE_KEY):
logged = True
self._log.info(f"----[SPAN#{trace_id}]" + "-" * 40 + f"\n{span}")
if logged:
self._log.info(f"====[END TRACE#{trace_id}]" + "=" * 35)
return trace
Loading
Loading