Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0
The Reporting module saves document processing data to reporting storage in structured formats for analytics and reporting. It supports saving evaluation results, metering data, and document sections as Parquet files in S3, with automatic AWS Glue table creation and partition projection for efficient querying with Amazon Athena.
The SaveReportingData class is the main component of the reporting module. It provides methods to save different types of document data to a reporting bucket in Parquet format, including automated cost calculation capabilities.
```python
from idp_common.reporting import SaveReportingData
from idp_common.models import Document

# Initialize the SaveReportingData class with a reporting bucket
reporter = SaveReportingData(reporting_bucket="my-reporting-bucket")
```
```python
# Initialize with pricing configuration for cost calculation
config = {
    "pricing": [
        {
            "name": "bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0",
            "units": [
                {"name": "inputTokens", "price": "3.0e-6"},
                {"name": "outputTokens", "price": "1.5e-5"}
            ]
        }
    ]
}

reporter = SaveReportingData(
    reporting_bucket="my-reporting-bucket",
    database_name="my-glue-database",  # Optional for Glue table creation
    config=config  # Optional for cost calculations
)

# Save specific data types for a document
results = reporter.save(document, data_to_save=["evaluation_results", "metering"])
```

- `reporting_bucket` (str): S3 bucket name for storing reporting data
- `database_name` (str, optional): AWS Glue database name for automatic table creation and updates
- `config` (Dict[str, Any], optional): Configuration dictionary containing pricing data and other settings
When config is provided with pricing information, the system automatically calculates unit costs and estimated costs for all metering data.
- Modular Design: Each data type has its own processing method, making it easy to add support for new data types
- Parquet Format: Data is saved in Parquet format, which is optimized for analytics workloads
- Hierarchical Storage: Data is organized in a hierarchical structure by date/document with unique timestamped filenames
- Flexible Schema: Each data type has its own schema definition, allowing for specialized data structures
- Dynamic Schema Inference: Automatically infers schemas for document sections without predefined structures
- Partition Projection: Uses AWS Glue partition projection for efficient querying without MSCK REPAIR operations
- Automatic Table Discovery: AWS Glue Crawler automatically discovers new section types and creates tables
- Unique File Naming: Timestamp-based filenames prevent overwrites when documents are reprocessed
- Error Handling: Comprehensive error handling with detailed logging and graceful failure recovery
- AWS Athena Ready: All data is immediately queryable through Amazon Athena with optimized partition pruning
- Cost Calculation: Automated unit cost and estimated cost calculation for metering data based on configuration
- Pricing Configuration: Dynamic loading and caching of pricing data from configuration sources
- Cost Analytics: Enhanced metering data with cost information for detailed financial analysis
The reporting module includes comprehensive cost calculation capabilities that automatically enhance metering data with cost information:
Pricing data is loaded from the configuration dictionary passed to the constructor. The configuration supports multiple services and units:
```python
config = {
    "pricing": [
        {
            "name": "bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0",
            "units": [
                {"name": "inputTokens", "price": "3.0e-6"},
                {"name": "outputTokens", "price": "1.5e-5"},
                {"name": "cacheReadInputTokens", "price": "1.5e-6"},
                {"name": "cacheWriteInputTokens", "price": "1.875e-5"}
            ]
        },
        {
            "name": "textract/analyze_document",
            "units": [
                {"name": "pages", "price": "0.0015"}
            ]
        },
        {
            "name": "bedrock/us.amazon.nova-pro-v1:0",
            "units": [
                {"name": "inputTokens", "price": "8.0e-7"},
                {"name": "outputTokens", "price": "3.2e-6"}
            ]
        }
    ]
}

# Initialize with cost calculation enabled
reporter = SaveReportingData(
    reporting_bucket="my-reporting-bucket",
    database_name="my-glue-database",
    config=config
)
```

- Configuration Loading: Pricing data is loaded from the config dictionary and cached for performance
- Service/Unit Matching: System attempts exact match for service_api/unit combinations
- Fuzzy Matching: If exact match fails, uses partial matching for common patterns
- Cost Calculation: `estimated_cost = value × unit_cost` for each metering record
- Fallback Handling: Missing pricing defaults to $0.0 with warning logs
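The matching-and-calculation steps above can be sketched in plain Python. This is an illustrative sketch, not the module's implementation: the helper names (`lookup_unit_cost`, `apply_costs`) are hypothetical, and the partial-matching rule here is deliberately simplified.

```python
# Pricing map shaped like the configuration example above, after parsing.
PRICING = {
    "bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0": {
        "inputTokens": 3.0e-6,
        "outputTokens": 1.5e-5,
    },
    "textract/analyze_document": {"pages": 0.0015},
}

def lookup_unit_cost(service_api: str, unit: str) -> float:
    """Exact match first, then a (simplified) partial match, then $0.0."""
    units = PRICING.get(service_api)
    if units and unit in units:
        return units[unit]
    # Simplified fuzzy fallback: any configured service containing the name.
    for name, units in PRICING.items():
        if service_api in name and unit in units:
            return units[unit]
    return 0.0  # missing pricing defaults to $0.0 (a warning would be logged)

def apply_costs(record: dict) -> dict:
    """Enhance one metering record with unit_cost and estimated_cost."""
    unit_cost = lookup_unit_cost(record["service_api"], record["unit"])
    return {**record,
            "unit_cost": unit_cost,
            "estimated_cost": record["value"] * unit_cost}

enhanced = apply_costs({
    "service_api": "bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0",
    "unit": "inputTokens",
    "value": 1000,
})
# estimated_cost is 1000 × 3.0e-6, i.e. roughly $0.003
```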
When cost calculation is enabled, metering records include additional fields:
| Field | Type | Description |
|---|---|---|
| unit_cost | double | Cost per unit in USD (e.g., $0.000003 per token) |
| estimated_cost | double | Total calculated cost (value × unit_cost) |
The following methods support the cost calculation functionality:
The `_get_pricing_from_config` method loads and caches pricing data from the configuration dictionary.
Returns: Dictionary mapping service names to unit costs
Example:
```python
# Automatically called during metering data processing
pricing_map = reporter._get_pricing_from_config()
# Result: {
#     "bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0": {
#         "inputTokens": 0.000003,
#         "outputTokens": 0.000015
#     }
# }
```

The `_get_unit_cost` method retrieves the unit cost for a specific service API and unit combination.
Parameters:
- `service_api`: The service identifier (e.g., "bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0")
- `unit`: The unit of measurement (e.g., "inputTokens", "pages")
Returns: Unit cost in USD, or 0.0 if not found
Example:
```python
# Get cost per input token for Claude
cost = reporter._get_unit_cost(
    "bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0",
    "inputTokens"
)
# Result: 0.000003
```

The `clear_pricing_cache` method clears the cached pricing data to force a reload from configuration.
Example:
```python
# Clear cache after configuration update
reporter.clear_pricing_cache()
```

Creates or updates the AWS Glue table for metering data with the enhanced schema, including the cost fields.
Parameters:
- `schema`: PyArrow schema including `unit_cost` and `estimated_cost` columns
Returns: True if table was created or updated, False otherwise
With cost calculation enabled, you can perform detailed cost analysis:
```python
from idp_common.reporting import SaveReportingData
from idp_common.models import Document

# Initialize with cost calculation
config = {"pricing": [...]}  # Your pricing configuration
reporter = SaveReportingData(
    reporting_bucket="my-reporting-bucket",
    config=config
)

# Process document with cost calculation
document = Document.from_dict(document_data)
results = reporter.save(document, data_to_save=["metering"])

# The metering data now includes:
# - unit_cost: Cost per token/page/operation
# - estimated_cost: Total cost for each operation
```

- Caching: Pricing configuration is cached after first load to avoid repeated parsing
- Lazy Loading: Pricing data is only loaded when first metering record is processed
- Memory Efficient: Cache stores only processed pricing data, not raw configuration
- Error Handling: Invalid pricing entries are skipped with warning logs
The system provides detailed logging for cost calculation operations:
```
INFO - Found 15 pricing entries in configuration
INFO - Successfully loaded 45 pricing entries from configuration
INFO - Using partial match for bedrock/claude-3/inputTokens: bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0/inputTokens = $0.000003
WARNING - No unit cost mapping found for service_api='custom/service', unit='unknown_unit'. Using $0.0
```
The module supports saving document evaluation results, which include:
- Document-level metrics: Overall accuracy, precision, recall, F1 score, etc.
- Section-level metrics: Metrics for each document section
- Attribute-level details: Detailed information about each attribute, including expected vs. actual values, match status, and confidence
```python
# Save evaluation results for a document
result = reporter.save_evaluation_results(document)
```

The module supports saving document processing metering data for cost tracking and analytics:
```python
# Save metering data for a document
result = reporter.save_metering_data(document)
```

The module supports saving document section extraction results as Parquet files with dynamic schema inference. This functionality processes each section in the document, loads the extraction results from S3, and saves them in a structured, partitioned format suitable for analytics.
- Dynamic Schema Inference: Automatically constructs PyArrow schemas from JSON data without requiring predefined schemas
- Flexible Data Handling: Supports various JSON structures (objects, arrays, primitives)
- Nested JSON Flattening: Converts nested objects to flat structure using dot notation
- Partition Structure: Organizes data with section_type and date-based partitioning
- Error Resilience: Continues processing other sections if individual sections fail
- Comprehensive Logging: Detailed logging for monitoring and debugging
```python
# Save document sections along with other data types
data_to_save = ["sections"]  # New option
results = reporter.save(document, data_to_save)

# Or combine with existing functionality
data_to_save = ["evaluation_results", "metering", "sections"]
results = reporter.save(document, data_to_save)
```

For the sections functionality to work, your Document object must have:
- Sections: A list of `Section` objects in `document.sections`
- Extraction Results: Each section should have an `extraction_result_uri` pointing to a JSON file in S3
- Classification: Each section should have a `classification` field for partitioning
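The requirements above can be verified with a small pre-flight check before calling `save`. The sketch below uses an illustrative `Section` stand-in rather than `idp_common.models.Section`, and the helper name `sections_ready` is hypothetical:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Section:  # illustrative stand-in for idp_common.models.Section
    section_id: str
    classification: Optional[str] = None
    extraction_result_uri: Optional[str] = None

def sections_ready(sections: List[Section]) -> List[str]:
    """Return the IDs of sections missing a URI or a classification."""
    return [s.section_id for s in sections
            if not s.extraction_result_uri or not s.classification]

bad = sections_ready([
    Section("1", "invoice", "s3://bucket/doc/section_1.json"),
    Section("2", None, "s3://bucket/doc/section_2.json"),
])
# bad == ["2"]  (section 2 has no classification)
```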
Schema Inference: The method dynamically infers PyArrow schemas by analyzing the JSON data:
- Strings: Mapped to `pa.string()`
- Integers: Mapped to `pa.int64()`
- Floats: Mapped to `pa.float64()`
- Booleans: Mapped to `pa.bool_()`
- Lists/Objects: Converted to JSON strings and mapped to `pa.string()`
- Mixed Types: Defaults to `pa.string()`
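The mapping above can be sketched as a simple type dispatch. To keep the example dependency-free it returns the PyArrow factory names as strings; the module itself builds real PyArrow `DataType` objects:

```python
def infer_pa_type(value) -> str:
    """Map a JSON value to the PyArrow type name per the rules above."""
    if isinstance(value, bool):   # check bool before int: bool subclasses int
        return "pa.bool_()"
    if isinstance(value, int):
        return "pa.int64()"
    if isinstance(value, float):
        return "pa.float64()"
    if isinstance(value, str):
        return "pa.string()"
    if isinstance(value, (list, dict)):
        return "pa.string()"      # serialized to a JSON string
    return "pa.string()"          # mixed/unknown types default to string

record = {"name": "ACME", "total": 12.5, "pages": 3, "paid": True, "items": [1, 2]}
schema = {key: infer_pa_type(value) for key, value in record.items()}
# e.g. schema["pages"] == "pa.int64()" and schema["paid"] == "pa.bool_()"
```

Note the boolean check comes first: in Python `isinstance(True, int)` is true, so checking `int` first would misclassify booleans.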
JSON Flattening: Nested JSON structures are flattened using dot notation:
```
// Input JSON
{
  "customer": {
    "name": "John Doe",
    "address": {
      "street": "123 Main St",
      "city": "Anytown"
    }
  },
  "items": ["item1", "item2"]
}

// Flattened Output
{
  "customer.name": "John Doe",
  "customer.address.street": "123 Main St",
  "customer.address.city": "Anytown",
  "items": "[\"item1\", \"item2\"]" // Arrays become JSON strings
}
```

Metadata Fields: Each record includes the following metadata fields:
- `section_id`: The unique identifier of the section
- `document_id`: The document identifier
- `section_classification`: The section's classification/type
- `section_confidence`: The confidence score for the section
- `timestamp`: The timestamp when the document was processed
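The dot-notation flattening shown earlier can be sketched in a few lines. The function name `flatten_json` is illustrative, not the module's actual helper:

```python
import json

def flatten_json(obj: dict, prefix: str = "") -> dict:
    """Flatten nested objects to dot-notation keys; arrays become JSON strings."""
    flat = {}
    for key, value in obj.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_json(value, path))  # recurse into nested objects
        elif isinstance(value, list):
            flat[path] = json.dumps(value)          # arrays become JSON strings
        else:
            flat[path] = value
    return flat

flat = flatten_json({
    "customer": {"name": "John Doe",
                 "address": {"street": "123 Main St", "city": "Anytown"}},
    "items": ["item1", "item2"],
})
# flat["customer.address.city"] == "Anytown"
# flat["items"] == '["item1", "item2"]'
```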
Data is stored in S3 with the following structure:
```
reporting-bucket/
├── evaluation_metrics/
│   ├── document_metrics/
│   │   └── date=YYYY-MM-DD/
│   │       ├── doc-id_20240115_143052_123_results.parquet
│   │       ├── doc-id_20240115_143127_456_results.parquet  # Same doc, reprocessed
│   │       └── another-doc-id_20240115_144001_789_results.parquet
│   ├── section_metrics/
│   │   └── date=YYYY-MM-DD/
│   │       ├── doc-id_20240115_143052_123_results.parquet
│   │       └── another-doc-id_20240115_144001_789_results.parquet
│   └── attribute_metrics/
│       └── date=YYYY-MM-DD/
│           ├── doc-id_20240115_143052_123_results.parquet
│           └── another-doc-id_20240115_144001_789_results.parquet
├── metering/
│   └── date=YYYY-MM-DD/
│       ├── doc-id_20240115_143052_123_results.parquet
│       └── another-doc-id_20240115_144001_789_results.parquet
└── document_sections/
    ├── invoice/
    │   └── date=YYYY-MM-DD/
    │       ├── doc-id_section_1.parquet
    │       └── doc-id_section_4.parquet
    ├── receipt/
    │   └── date=YYYY-MM-DD/
    │       └── doc-id_section_2.parquet
    └── bank_statement/
        └── date=YYYY-MM-DD/
            └── doc-id_section_3.parquet
```
All files use a unique timestamp-based naming convention to prevent overwrites:
- Format: `{escaped_doc_id}_{timestamp}_results.parquet`
- Timestamp: `YYYYMMDD_HHMMSS_mmm` (includes milliseconds)
- Example: `invoice-123_20240115_143052_123_results.parquet`
This ensures that if the same document is processed multiple times in the same day, each processing attempt creates a separate file, preserving all data for analysis and debugging.
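A minimal sketch of this naming convention follows; `escape_doc_id` is a hypothetical helper illustrating one way to make a document ID safe for S3 keys, not the module's actual escaping logic:

```python
import re
from datetime import datetime, timezone

def escape_doc_id(doc_id: str) -> str:
    """Replace characters that are awkward in S3 keys (illustrative rule)."""
    return re.sub(r"[^A-Za-z0-9_-]", "_", doc_id)

def results_filename(doc_id: str, now: datetime) -> str:
    # YYYYMMDD_HHMMSS_mmm: truncate microseconds to milliseconds
    ts = now.strftime("%Y%m%d_%H%M%S_") + f"{now.microsecond // 1000:03d}"
    return f"{escape_doc_id(doc_id)}_{ts}_results.parquet"

name = results_filename(
    "invoice-123",
    datetime(2024, 1, 15, 14, 30, 52, 123000, tzinfo=timezone.utc),
)
# name == "invoice-123_20240115_143052_123_results.parquet"
```

Because the timestamp carries millisecond precision, two runs over the same document on the same day produce distinct keys instead of overwriting each other.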
This structure is designed to be compatible with AWS Glue and Amazon Athena for analytics. The document sections are partitioned by section_type (classification) as the first partition level, followed by a single date-based partition using the format YYYY-MM-DD. Each file is uniquely named with the document ID and section ID to avoid conflicts, and the document ID is included as a column in the Parquet data for filtering and analysis.
The new single date partition structure provides several advantages:
- Simplified Queries: Natural date range queries like `WHERE date BETWEEN '2024-01-01' AND '2024-01-31'`
- Efficient Pruning: Athena can efficiently prune partitions based on date ranges
- Cleaner Organization: Single date partition is easier to understand and maintain
- Better Performance: Reduced partition overhead compared to three-level partitioning
- Future-Proof: Easier to extend and modify partition strategies
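Under this layout, a section's S3 prefix is a pure function of its classification and the processing date. The sketch below is illustrative (`build_section_prefix` is not the module's API); the lowercase normalization is the behavior described in this document:

```python
from datetime import date

def build_section_prefix(classification: str, day: date) -> str:
    """Derive the partitioned S3 prefix for a document section."""
    section_type = classification.lower()  # section types are normalized to lowercase
    return f"document_sections/{section_type}/date={day.isoformat()}/"

prefix = build_section_prefix("W2", date(2024, 1, 15))
# prefix == "document_sections/w2/date=2024-01-15/"
```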
The reporting module is designed to work seamlessly with AWS Glue and Amazon Athena:
The reporting module provides two types of automatic table creation:
- Evaluation Tables: `document_evaluations`, `section_evaluations`, `attribute_evaluations`
- Metering Table: `metering`
- Created during stack deployment via CloudFormation
When processing documents with new section types, the SaveReportingData class automatically:
- Creates New Tables: Generates a Glue table for each unique section type (e.g., `document_sections_invoice`, `document_sections_w2`)
- Updates Schemas: Adds new columns when new fields are detected in extraction results
- Configures Partitions: Sets up partition projection for efficient date-based queries
- Normalizes Names: Converts section types to lowercase for S3 path consistency (e.g., "W2" → "w2")
This automatic table creation eliminates manual table management and ensures data is immediately queryable in Athena.
All tables use AWS Glue partition projection to eliminate the need for MSCK REPAIR TABLE operations:
```
# Example partition projection for date-based partitioning
projection.enabled: "true"
projection.date.type: "date"
projection.date.format: "yyyy-MM-dd"
projection.date.range: "2024-01-01,2030-12-31"
projection.date.interval: "1"
projection.date.interval.unit: "DAYS"
storage.location.template: "s3://bucket/path/date=${date}/"
```

The document sections crawler automatically:
- Discovers New Section Types: Creates tables for new document classifications
- Updates Schemas: Adapts to changes in extraction result structures
- Configurable Schedule: Runs manually, every 15 minutes, hourly, or daily
- Partition Discovery: Automatically discovers new date partitions
- No Manual Maintenance: Tables and partitions are automatically managed
- Immediate Availability: New data is queryable immediately without manual intervention
- Optimized Performance: Partition projection provides efficient query performance
- Schema Evolution: Automatic adaptation to changing data structures
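The projection settings shown earlier live in the Glue table's `Parameters` map. The sketch below builds that map as plain data; the bucket and prefix are placeholders, and in practice the dict would go inside the `TableInput` passed to `glue_client.create_table` (boto3):

```python
def projection_parameters(bucket: str, prefix: str) -> dict:
    """Glue table Parameters enabling date-based partition projection."""
    return {
        "projection.enabled": "true",
        "projection.date.type": "date",
        "projection.date.format": "yyyy-MM-dd",
        "projection.date.range": "2024-01-01,2030-12-31",
        "projection.date.interval": "1",
        "projection.date.interval.unit": "DAYS",
        # ${date} is substituted by Athena at query time, not by Python
        "storage.location.template": f"s3://{bucket}/{prefix}/date=${{date}}/",
    }

params = projection_parameters("my-reporting-bucket", "metering")
# params["storage.location.template"] == "s3://my-reporting-bucket/metering/date=${date}/"
```

Because Athena computes partition locations from these parameters, no `MSCK REPAIR TABLE` or crawler run is needed for new date partitions to become queryable.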
To add support for a new data type:
- Add a new method to the `SaveReportingData` class:
```python
def save_document_metadata(self, document: Document) -> Optional[Dict[str, Any]]:
    """
    Save document metadata to the reporting bucket.

    Args:
        document: Document object containing metadata

    Returns:
        Dict with status and message, or None if no metadata
    """
    # Define schema specific to document metadata
    metadata_schema = pa.schema([
        ('document_id', pa.string()),
        ('input_key', pa.string()),
        ('created_at', pa.timestamp('ms')),
        ('page_count', pa.int32()),
        ('file_size', pa.int64()),
        # Add other metadata fields as needed
    ])

    # Implementation for saving document metadata
    # ...

    return {
        'statusCode': 200,
        'body': "Successfully saved document metadata"
    }
```

- Update the `save` method to call this new method when the data type is requested:
savemethod to call this new method when the data type is requested:
```python
if 'document_metadata' in data_to_save:
    logger.info("Processing document metadata")
    result = self.save_document_metadata(document)
    if result:
        results.append(result)
```

The module is designed to be used in Lambda functions for saving document data to reporting storage:
```python
from idp_common.models import Document
from idp_common.reporting import SaveReportingData

def handler(event, context):
    # Extract parameters from the event
    document_dict = event.get('document')
    reporting_bucket = event.get('reporting_bucket')
    data_to_save = event.get('data_to_save', [])

    # Convert document dict to Document object
    document = Document.from_dict(document_dict)

    # Use the SaveReportingData class to save the data
    reporter = SaveReportingData(reporting_bucket)
    results = reporter.save(document, data_to_save)

    # Return success if all operations completed
    return {
        'statusCode': 200,
        'body': "Successfully saved data to reporting bucket"
    }
```

The reporting module has the following dependencies:
- `boto3`: For AWS S3 operations
- `pyarrow`: For Parquet file creation and schema definition
- `idp_common.models`: For the Document data model
When deploying Lambda functions that use this module, ensure that these dependencies are included in the deployment package.