Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0
The Rule Validation Service component provides functionality to validate extracted document information against predefined business rules and compliance criteria using a two-step LLM-based evaluation approach.
The rule validation service uses a two-step approach:
- Fact Extraction: Extracts relevant facts from document sections
- Orchestrator: Consolidates facts and makes final compliance decisions
This architecture enables better handling of large documents, separation of fact-finding from decision-making, and more accurate compliance determinations from multiple evidence sources.
- Two-Step Validation Workflow:
- Fact extraction from document sections with intelligent chunking
- Orchestrator consolidates facts across sections for final decisions
- Separation of evidence gathering from compliance determination
- Asynchronous Processing: Handles multiple rule types and rules concurrently using asyncio
- Rate Limiting: Built-in semaphore-based rate limiting for API calls to prevent throttling
- Intelligent Text Chunking:
- Page-aware chunking that preserves page boundaries
- Configurable overlap (default 10%) for context preservation
- Automatic fallback to character-based chunking
- Chunking always occurs for fact extraction, orchestrator always runs
- Customizable Recommendations:
- User-defined recommendation options (e.g., Pass/Fail/Info Not Found)
- Dynamic statistics generation based on actual recommendations
- Prompt integration with custom options
- Comprehensive Tracking:
- Token usage and cost tracking
- Detailed timing metrics
- Supporting page references for each rule evaluation
- Robust Error Handling: Graceful degradation with fallback responses and detailed error logging
- Pydantic Validation: Strong data validation for inputs and outputs
- JSON Response Parsing: Intelligent parsing of LLM responses including markdown code block handling
- Orchestrated Consolidation:
- Aggregates fact extraction results across all document sections
- Generates consolidated compliance decisions per rule
- Creates comprehensive summaries with recommendation counts
- Produces both JSON and Markdown output formats
-
RuleValidationService (
service.py):- Section-level fact extraction
- Intelligent page-aware chunking
- Concurrent rule processing with rate limiting
- Extracts facts with citations and relevance
- Returns
FactExtractionResponsewith extracted_facts and extraction_summary
-
RuleValidationOrchestratorService (
orchestrator.py):- Loads fact extraction results from S3
- Consolidates facts across sections using LLM
- Makes final compliance decisions per rule
- Generates overall statistics with dynamic recommendation counts
- Creates JSON and Markdown summary outputs
- Always runs (even for single sections) to make compliance decisions
Document Sections
↓
Fact Extraction (RuleValidationService)
↓ (stores extracted facts in S3)
↓ (multiple chunks per section if needed)
Orchestrator Consolidation (RuleValidationOrchestratorService)
↓ (LLM consolidates facts → compliance decision)
Final Compliance Decision
↓
Output (JSON + Markdown)
Step 1: Fact Extraction
- Input: Document text + Rule
- Output:
FactExtractionResponseextracted_facts: List of facts with citationsextraction_summary: Summary of findings
- LLM focuses on finding relevant evidence
- No compliance decision made at this stage
Step 2: Orchestrator
- Input: All extracted facts from all chunks/sections
- Output:
LLMResponserecommendation: Pass/Fail/Information Not Foundreasoning: Compliance determination explanationsupporting_pages: Aggregated page references
- LLM makes final compliance decision
- Consolidates evidence from multiple sources
- Always runs (even for single section documents)
Response model from fact extraction step:
- rule_type: Type of rule being evaluated
- rule: The specific rule description
- extracted_facts: List of facts with citation and relevance
- extraction_summary: Summary of extracted evidence
Validated response model from orchestrator with automatic data cleaning:
- rule_type: Type of rule being evaluated (e.g., "global_periods", "same_day_service_rules")
- rule: The specific rule description being validated
- supporting_pages: List of page IDs that support the recommendation
- recommendation: Validated recommendation (customizable, default: Pass/Fail/Info Not Found)
- reasoning: Cleaned explanation text with specific page citations
Features:
- Automatic validation of recommendation against configured options
- Reasoning text cleaned of special characters
- Supporting pages validated as list format
- Whitespace automatically stripped
rule_typeandruleadded by code (not from LLM response)
{
"section_id": "section_1",
"chunking_occurred": True, # True if section was chunked
"chunks_created": 2, # Number of chunks (0 = no chunking, 2+ = chunked)
"responses": {
"rule_type_1": [
{
"rule_type": "global_periods",
"rule": "Rule description",
"supporting_pages": ["1", "3"],
"recommendation": "Pass",
"reasoning": "Detailed reasoning with page citations"
}
]
}
}{
"document_id": "doc_123",
"overall_status": "COMPLETE",
"total_rule_types": 4,
"rule_summary": {
"rule_type_1": {
"status": "COMPLETE",
"total_rules": 5,
"Pass": 4,
"Fail": 1
}
},
"overall_statistics": {
"total_rules": 20,
"recommendation_counts": {
"Pass": 15,
"Fail": 3,
"Info Not Found": 2
}
},
"supporting_pages": ["1", "2", "3", "4"],
"rule_details": {
"rule_type_1": {
"total_rules": 5,
"recommendation_counts": {"Pass": 4, "Fail": 1},
"rules": [...]
}
}
}rule_validation:
model: us.anthropic.claude-3-5-sonnet-20240620-v1:0
temperature: "0.0"
top_k: "20"
top_p: "0.0"
max_tokens: "4096"
semaphore: 5 # Concurrent API calls
# Customizable recommendation options
recommendation_options: |-
Pass: The requirement criteria are fully met.
Fail: The requirement is partially met or requires additional information.
Info Not Found: No relevant data exists in the user history.
system_prompt: |
You are a specialized evaluator for medical coding compliance...
task_prompt: |
<<CACHEPOINT>>
<options>
{recommendation_options}
</options>
<document-text>
{DOCUMENT_TEXT}
</document-text>
<rule-type>{rule_type}</rule-type>
<rule>{rule}</rule>rule_validation_orchestrator:
model: us.anthropic.claude-3-5-sonnet-20240620-v1:0
temperature: "0.0"
top_k: "20"
top_p: "0.0"
max_tokens: "4096"
system_prompt: |
You are a specialized evaluator that consolidates rule validation results...
task_prompt: |
<initial_response>
{initial_responses}
</initial_response>
<<CACHEPOINT>>
<options>
{recommendation_options}
</options>
<criteria>
<rule_type>{rule_type}</rule_type>
<rule>{rule}</rule>
</criteria>rule_classes:
- $schema: https://json-schema.org/draft/2020-12/schema
x-aws-idp-rule-type: global_periods
type: object
rule_properties:
minor_surgery_000_010:
type: string
description: >-
If a procedure has a global period of 000 or 010 days...
major_surgery_090:
type: string
description: >-
If a procedure has a global period of 090 days...
$id: global_periodsimport yaml
from idp_common.models import Document
from idp_common.rule_validation import RuleValidationService
# Load configuration from YAML
with open('config.yaml', 'r') as f:
config = yaml.safe_load(f)
# Initialize service with region and config
service = RuleValidationService(
region="us-east-1",
config=config
)
# Process document (processes all sections)
document = Document(...) # Document with classified sections
result = service.validate_document(document)
# Results stored in S3 at:
# s3://{output_bucket}/{document.id}/rule_validation/sections/section_{id}_responses.jsonfrom idp_common.rule_validation import RuleValidationOrchestratorService
# Initialize orchestrator
orchestrator = RuleValidationOrchestratorService(config=config)
# Consolidate all section results
updated_document = orchestrator.consolidate_and_save(
document=document,
config=config,
multiple_sections=True
)
# Access consolidated results
print(f"Consolidated summary: {updated_document.rule_validation_result.output_uri}")
print(f"Sections processed: {updated_document.rule_validation_result.metadata['sections_processed']}")
# Results saved to s3://{output_bucket}/{document.id}/rule_validation/consolidated/:
# - consolidated_summary.json (with dynamic recommendation_counts)
# - consolidated_summary.md (Markdown report)
# - Aggregated supporting page IDs# In configuration
config = {
"rule_validation": {
"recommendation_options": """
Approved: All requirements are satisfied.
Rejected: Requirements are not met.
Pending: Additional information needed.
"""
}
}
# Statistics will automatically use these custom options:
# {
# "recommendation_counts": {
# "Approved": 10,
# "Rejected": 2,
# "Pending": 3
# }
# }For each document section:
- Extract text from section pages with page markers
- Check if chunking is needed based on token limits
- If chunking required:
- Split by page boundaries
- Add overlap from previous chunk
- Preserve page markers
- Evaluate all rules concurrently using asyncio:
- Each rule type processed in parallel
- Each rule within a type processed in parallel
- Semaphore controls concurrent API calls
- Store section responses in S3
For each rule type:
- Load all section responses from S3
- Group responses by rule
- For each rule with multiple responses (processed concurrently):
- Send to LLM for consolidation
- LLM analyzes all evidence
- Generates single consolidated recommendation
- Aggregates supporting pages
- Save consolidated responses per rule type
- Load all consolidated rule type responses
- Generate statistics:
- Count total rules
- Dynamically count each recommendation type
- Calculate per-rule-type breakdowns
- Collect all supporting pages
- Create JSON summary
- Generate Markdown report
- Save both formats to S3
Page numbers in supporting_pages are actually page IDs from the document structure:
# In service.py - page markers are added during text preparation
all_text += f"<page-number>{page_id}</page-number>\n{page_text}\n\n"
# LLM sees these markers and references them in responses
{
"supporting_pages": ["1", "3", "4"], # These are page IDs
"reasoning": "Evidence found on pages 1, 3, and 4..."
}
# Orchestrator aggregates all page IDs
"supporting_pages": ["1", "2", "3", "4", "5"] # Sorted unique page IDs# Splits text by page markers
page_pattern = r"<page-number>(\d+)</page-number>\s*"
pages = [(page_id, content), ...]
# Builds chunks respecting page boundaries
chunk_text = "<page-number>1</page-number>\nPage 1 content\n\n<page-number>2</page-number>\nPage 2 content"
# Adds overlap from previous chunk
if previous_chunk_pages:
# Include last page or partial content from previous chunk
overlap_text = build_overlap(previous_chunk_pages)
chunk_text = overlap_text + current_chunk_textIf no page markers found:
- Falls back to character-based chunking
- Uses configurable overlap percentage
- Preserves word boundaries
# If LLM response parsing fails
response_dict = {
"rule_type": rule_type,
"rule": rule,
"supporting_pages": [],
"recommendation": "Error",
"reasoning": f"Failed to parse response: {response_text}"
}- Invalid recommendations are caught by Pydantic validation
- Custom recommendations must match configured options
- Missing required fields trigger validation errors
# Semaphore controls concurrent API calls
semaphore: 5 # Max 5 concurrent requests
# In code
async with self.semaphore:
response = await self._invoke_model_async(...)- Prompt caching with
<<CACHEPOINT>>marker - Static content before marker is cached
- Dynamic content (document text, options) after marker
- Reduces token costs for repeated evaluations
# Automatic token tracking
self.token_metrics = utils.merge_metering_data(
self.token_metrics,
metering or {}
)
# Includes:
# - inputTokens, outputTokens, totalTokens
# - requests count
# - Per-context breakdown{
"document_id": "doc_123",
"overall_status": "COMPLETE",
"total_rule_types": 4,
"rule_summary": {...},
"overall_statistics": {
"total_rules": 20,
"recommendation_counts": {
"Pass": 15,
"Fail": 3,
"Info Not Found": 2
}
},
"supporting_pages": ["1", "2", "3"],
"rule_details": {...},
"generated_at": "2026-01-22T19:00:00"
}# Rule Validation Summary
**Document ID**: doc_123
**Status**: COMPLETE
**Total Rule Types**: 4
**Total Rules**: 20
## Overall Statistics
**Recommendation Counts:**
- **Pass:** 15
- **Fail:** 3
- **Info Not Found:** 2
**Supporting Pages:** 1, 2, 3
## Rule Type: global_periods
**Total Rules:** 5
**Pass:** 4
**Fail:** 1
### Rules
| Rule | Recommendation | Reasoning | Supporting Pages |
|------|----------------|-----------|------------------|
| Minor surgery rule | Pass | Evidence found... | 1, 3 |
...The rule validation service integrates with the IDP pipeline through:
- Document Model: Uses Document object with classified sections
- S3 Storage: Stores intermediate and final results in S3
- Step Functions: Orchestrated through AWS Step Functions workflow
- Lambda Functions:
rule-validation-function: Executes section-level evaluationrule-validation-orchestration-function: Executes orchestration
- Status Tracking: Updates document status through pipeline
Use Jupyter notebooks to test and validate rule validation functionality. See notebooks/misc/e2e-holistic-packet-classification-rule-validation.ipynb for a complete end-to-end example.
# Install IDP common package
%pip install -e "../../lib/idp_common_pkg[dev, all]"
import yaml
from idp_common.models import Document, Status
from idp_common.rule_validation import RuleValidationService
from idp_common import s3
# Load configuration
config_path = "../../config_library/pattern-2/rule-validation/config.yaml"
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
print(f"Model: {config['rule_validation']['model']}")# Initialize service
rule_validation_service = RuleValidationService(
region=region,
config=config
)
# Process each section individually
section_results = []
for section in document.sections:
print(f"Processing section {section.section_id} ({section.classification})")
# Create document with only this section
section_document = Document(
id=document.id,
input_key=document.input_key,
input_bucket=document.input_bucket,
output_bucket=document.output_bucket,
pages=document.pages,
sections=[section], # Only this section
status=document.status,
metering=document.metering.copy() if document.metering else {}
)
# Create fresh service instance (avoids asyncio semaphore issues in notebooks)
section_service = RuleValidationService(region=region, config=config)
# Process the section
section_result = section_service.validate_document(section_document)
section_results.append(section_result)
print(f"Completed section {section.section_id}")# Check section results
for i, section_result in enumerate(section_results):
section_id = document.sections[i].section_id
if hasattr(section_result, 'rule_validation_result'):
rv_result = section_result.rule_validation_result
section_uri = rv_result.metadata.get('section_output_uri')
print(f"Section {section_id}: {section_uri}")
# Load and view section responses
section_data = s3.get_json_content(section_uri)
print(f" Rules evaluated: {len(section_data.get('responses', {}))}")from idp_common.rule_validation import RuleValidationOrchestratorService
# Initialize orchestrator
orchestrator = RuleValidationOrchestratorService(config=config)
# Consolidate all section results
updated_document = orchestrator.consolidate_and_save(
document=document,
config=config,
multiple_sections=True
)
print("Consolidation complete")
print(f"Summary URI: {updated_document.rule_validation_result.output_uri}")
# View consolidated summary
summary_uri = updated_document.rule_validation_result.output_uri
summary = s3.get_json_content(summary_uri)
print("\nOverall Statistics:")
print(json.dumps(summary['overall_statistics'], indent=2))
print("\nRule Summary:")
for rule_type, stats in summary['rule_summary'].items():
print(f"\n{rule_type}:")
print(f" Total rules: {stats['total_rules']}")
for rec, count in stats.items():
if rec not in ['status', 'total_rules']:
print(f" {rec}: {count}")# Modify config for custom recommendations
config['rule_validation']['recommendation_options'] = """
Compliant: Fully meets regulatory requirements.
Non-Compliant: Does not meet requirements.
Requires Review: Manual review needed.
"""
# Reinitialize service with custom config
custom_service = RuleValidationService(region=region, config=config)
# Process with custom recommendations
result = custom_service.validate_document(section_document)
# Statistics will use custom options:
# {"Compliant": 10, "Non-Compliant": 2, "Requires Review": 3}Important Notes:
- Create a fresh
RuleValidationServiceinstance for each section in notebooks to avoid asyncio semaphore issues - Section results are stored in S3 at
{document_id}/rule_validation/sections/section_{id}_responses.json - Consolidated results are at
{document_id}/rule_validation/consolidated/ - The orchestrator's
consolidate_and_save()method handles async operations internally - no need to useasyncio.run()
For complete workflow examples, see notebooks/misc/e2e-holistic-packet-classification-rule-validation.ipynb.
- Place
{recommendation_options}AFTER<<CACHEPOINT>>for proper caching - Use temperature 0 for deterministic rule evaluation
- Adjust semaphore based on API rate limits
- Define clear, distinct recommendation options
- Write specific, measurable rule descriptions
- Include clear success criteria
- Reference specific document sections or fields
- Provide examples in rule descriptions
- Use page-level classification to create focused sections
- Limit section size to avoid excessive chunking
- Monitor token usage and adjust max_tokens if needed
- Use appropriate models (faster models for simple rules)
- Check section response files in S3 for intermediate results
- Review consolidated responses for rule-level details
- Examine Markdown report for human-readable summary
- Monitor CloudWatch logs for detailed execution traces
- v2.0: Added rule validation orchestrator with dynamic recommendations
- v1.5: Implemented page-aware chunking
- v1.0: Initial release with section-level evaluation
- GenAI IDP Accelerator Team