Core services layer for Graph API operations, providing database management, connection pooling, data staging, admission control, and observability.
The core/ directory contains the foundational services that power the Graph API, organized by database technology:
- LadybugDB Services - Embedded graph database management, connection pooling, and service orchestration
- DuckDB Services - High-performance data staging via intermediate tables
- LanceDB Services - Vector storage and vector table management
- Shared Services - Admission control, task management, and metrics collection
```
core/
├── README.md                    # This file
├── __init__.py                  # Core service exports
│
├── ladybug/                     # LadybugDB embedded graph database
│   ├── __init__.py              # LadybugDB exports
│   ├── engine.py                # Low-level database driver
│   ├── pool.py                  # Connection pooling
│   ├── manager.py               # Database lifecycle and schema
│   ├── service.py               # Service orchestration
│   ├── config.py                # LadybugDB configuration
│   └── materialization_lock.py  # Single-writer materialization lock
│
├── duckdb/                      # DuckDB data staging
│   ├── __init__.py              # DuckDB exports
│   ├── pool.py                  # DuckDB connection pooling
│   └── manager.py               # Staging table management
│
├── lance/                       # LanceDB vector storage
│   ├── __init__.py              # LanceDB exports
│   └── manager.py               # Vector table management
│
└── [shared services]            # Technology-agnostic services
    ├── admission_control.py     # CPU/memory-based backpressure
    ├── backup_service.py        # Backup/restore service
    ├── memory_manager.py        # Memory budget management
    ├── migration_service.py     # Graph schema migration service
    ├── metrics_collector.py     # Performance monitoring
    ├── task_manager.py          # Async task coordination
    ├── task_sse.py              # Server-Sent Events
    └── utils.py                 # Shared utilities
```
Location: core/ladybug/
LadybugDB is our primary embedded graph database, providing high-performance graph operations with direct local access:
Architecture Layers:
- Engine (`engine.py`) - Low-level driver interfacing with the embedded database
- Connection Pool (`pool.py`) - Efficient connection reuse and lifecycle management
- Database Manager (`manager.py`) - Database lifecycle, schema management, and queries
- Service (`service.py`) - High-level orchestration and cluster coordination
Key Features:
- Embedded Architecture - No network overhead, direct file system access
- Multi-Database Support - Multiple independent databases per instance
- Connection Pooling - Automatic resource management and cleanup
- Schema Management - DDL execution and validation
- Thread Safety - Safe concurrent access with proper locking
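Multi-database support comes down to giving every graph its own file. The sketch below is a hypothetical helper (`database_path` is not part of the module's API) showing one way to map a graph ID onto `<base_path>/<graph_id>.lbug` while refusing IDs that could escape the base directory. The `kg` plus lowercase-alphanumeric rule is an assumption for illustration; the real ID validation may differ.

```python
import re
from pathlib import Path

# Hypothetical ID rule for this sketch: "kg" followed by lowercase letters/digits.
_GRAPH_ID = re.compile(r"kg[0-9a-z]+")


def database_path(base_path: str, graph_id: str) -> Path:
    """Map a graph ID to its own .lbug file under base_path.

    Rejecting anything outside the allowed pattern keeps IDs such as
    "../other" from resolving to a file outside the database directory.
    """
    if not _GRAPH_ID.fullmatch(graph_id):
        raise ValueError(f"invalid graph id: {graph_id!r}")
    return Path(base_path) / f"{graph_id}.lbug"


if __name__ == "__main__":
    print(database_path("/data/lbug-dbs", "kg123"))
```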
Usage:
```python
from robosystems.graph_api.core.ladybug import (
    Engine,
    LadybugConnectionPool,
    LadybugDatabaseManager,
    LadybugService,
    get_ladybug_service
)
from robosystems.graph_api.models.database import QueryRequest

# High-level service access (recommended)
service = get_ladybug_service()
response = service.execute_query(QueryRequest(
    database="kg123",
    cypher="MATCH (n) RETURN n LIMIT 10"
))

# Low-level engine access (for advanced use cases)
engine = Engine("/data/lbug-dbs/kg123.lbug")
result = engine.execute_query("MATCH (n:Entity) RETURN n.name")
```

Location: core/duckdb/
High-performance data staging system for validating and preparing data before graph ingestion:
Primary Classes:
- `DuckDBConnectionPool` - Connection pooling for DuckDB
- `DuckDBTableManager` - Staging table creation and querying
Key Features:
- S3 Integration - Direct access to S3 Parquet files via httpfs
- SQL Validation - Query and validate data before ingestion
- Automatic Deduplication - Node and relationship table deduplication
- Materialized Tables - Fast querying of staged data
Workflow:
User Uploads → S3 Storage → DuckDB Staging → Validation → Graph Database
Usage:
```python
from robosystems.graph_api.core.duckdb import (
    DuckDBTableManager,
    DuckDBConnectionPool
)

manager = DuckDBTableManager(staging_path="./data/staging")
manager.create_table(
    graph_id="kg123",
    table_name="entities",
    s3_pattern="s3://bucket/data/*.parquet"
)

# Validate data with SQL
result = manager.query(
    graph_id="kg123",
    sql="SELECT COUNT(*) FROM entities WHERE name IS NOT NULL"
)
```

File: ladybug/engine.py
Low-level driver for LadybugDB database operations:
Key Classes:
- `Engine` - Main database connection and query execution
- `Repository` - High-level repository abstraction
Key Features:
- Direct Database Access - Embedded database without network overhead
- Cypher Query Execution - Full Cypher query language support
- Parameterized Queries - Safe query execution with parameters
- Error Handling - Comprehensive exception handling
- Schema Operations - DDL execution for schema management
Usage:
```python
from robosystems.graph_api.core.ladybug import Engine

engine = Engine("/data/lbug-dbs/kg123.lbug")
result = engine.execute_query(
    "MATCH (n:Entity) WHERE n.id = $id RETURN n",
    {"id": "entity-123"}
)
```

File: ladybug/pool.py
Efficient connection pooling for LadybugDB with automatic lifecycle management:
Primary Class: LadybugConnectionPool
Key Features:
- Resource Efficiency - Reuse connections across requests
- Automatic Cleanup - Idle connection cleanup with configurable TTL
- Thread Safety - Safe concurrent access with proper locking
- Health Checks - Connection validation before use
- Per-Database Pools - Separate connection pools per database
- LRU Eviction - Least-recently-used connection eviction
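To make the pooling features concrete, here is a minimal, engine-agnostic sketch of a per-database pool with idle timeout, maximum connection lifetime (TTL), validation before reuse, and LRU eviction. `SketchConnectionPool` and its injected `connect`/`close`/`is_healthy` callables are hypothetical stand-ins, not `LadybugConnectionPool` internals, and for simplicity the cap applies only to idle connections kept per database.

```python
import threading
import time
from collections import deque
from contextlib import contextmanager
from typing import Any, Callable, Deque, Dict, Iterator, List, Optional


class _Entry:
    """A pooled connection plus the timestamps used for expiry."""

    def __init__(self, conn: Any, now: float) -> None:
        self.conn = conn
        self.created = now
        self.last_used = now


class SketchConnectionPool:
    """Hypothetical per-database pool: idle timeout, TTL, LRU eviction."""

    def __init__(self, connect: Callable[[str], Any], close: Callable[[Any], None],
                 is_healthy: Callable[[Any], bool], max_idle_per_db: int = 5,
                 idle_timeout_s: float = 15 * 60, ttl_s: float = 60 * 60,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._connect, self._close, self._is_healthy = connect, close, is_healthy
        self._max_idle = max_idle_per_db
        self._idle_timeout, self._ttl = idle_timeout_s, ttl_s
        self._clock = clock
        self._lock = threading.Lock()
        self._idle: Dict[str, Deque[_Entry]] = {}  # left end = least recently used

    def _fresh(self, entry: _Entry, now: float) -> bool:
        return (now - entry.last_used < self._idle_timeout
                and now - entry.created < self._ttl)

    @contextmanager
    def get_connection(self, db_name: str) -> Iterator[Any]:
        now = self._clock()
        entry: Optional[_Entry] = None
        stale: List[_Entry] = []
        with self._lock:
            idle = self._idle.setdefault(db_name, deque())
            while idle:
                candidate = idle.pop()  # most recently used first
                if self._fresh(candidate, now):
                    entry = candidate
                    break
                stale.append(candidate)
        for old in stale:
            self._close(old.conn)  # expired by idle timeout or TTL
        if entry is not None and not self._is_healthy(entry.conn):
            self._close(entry.conn)  # failed validation, open a new one instead
            entry = None
        if entry is None:
            entry = _Entry(self._connect(db_name), now)
        try:
            yield entry.conn
        finally:
            entry.last_used = self._clock()
            evicted: List[_Entry] = []
            with self._lock:
                idle = self._idle.setdefault(db_name, deque())
                idle.append(entry)
                while len(idle) > self._max_idle:
                    evicted.append(idle.popleft())  # LRU eviction
            for old in evicted:
                self._close(old.conn)
```

Opening connections outside the lock keeps one slow `connect` call from blocking every other database's requests.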
Configuration:
```python
from robosystems.graph_api.core.ladybug import LadybugConnectionPool

pool = LadybugConnectionPool(
    base_path="/data/lbug-dbs",
    max_connections_per_db=5,
    idle_timeout_minutes=15,
    connection_ttl_minutes=60
)

with pool.get_connection("kg123") as conn:
    result = conn.execute("MATCH (n) RETURN n LIMIT 10")
```

File: ladybug/manager.py
Complete database lifecycle management including creation, deletion, schema management, and querying:
Primary Class: LadybugDatabaseManager
Key Features:
- Database Lifecycle - Create, delete, and manage graph databases
- Schema Management - DDL execution and validation
- Multi-Database Support - Handle multiple databases per instance
- Connection Pool Integration - Automatic connection management
- Database Info - Size, health, and metadata queries
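The lifecycle rules above (a per-instance database cap, and closing connections before deletion, which the best practices also call for) can be sketched with a small in-memory registry. `SketchDatabaseRegistry` is hypothetical and tracks names only; the real manager also creates files, applies schemas, and reports size and health.

```python
from typing import Callable, List, Set


class SketchDatabaseRegistry:
    """Hypothetical registry of databases on one instance.

    close_connections(graph_id) stands in for draining that database's
    connection pool; it runs before the database is forgotten.
    """

    def __init__(self, max_databases: int,
                 close_connections: Callable[[str], None]) -> None:
        self.max_databases = max_databases
        self._close_connections = close_connections
        self._databases: Set[str] = set()

    def create(self, graph_id: str) -> None:
        if graph_id in self._databases:
            raise ValueError(f"{graph_id} already exists")
        if len(self._databases) >= self.max_databases:
            raise RuntimeError("instance is at max_databases capacity")
        self._databases.add(graph_id)

    def delete(self, graph_id: str) -> None:
        if graph_id not in self._databases:
            raise KeyError(graph_id)
        # Close pooled connections first so no open handle points at
        # files that are about to be removed.
        self._close_connections(graph_id)
        self._databases.discard(graph_id)

    def names(self) -> List[str]:
        return sorted(self._databases)
```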
Usage:
```python
from robosystems.graph_api.core.ladybug import LadybugDatabaseManager

manager = LadybugDatabaseManager(
    base_path="/data/lbug-dbs",
    max_databases=100
)

# Create database with schema
response = manager.create_database(
    graph_id="kg123",
    schema_type="entity",
    read_only=False
)

# Get database information
info = manager.get_database_info("kg123")
```

File: ladybug/service.py
High-level service orchestration coordinating all LadybugDB operations:
Primary Class: LadybugService
Key Features:
- Unified Interface - Single entry point for all operations
- Query Execution - Cypher query execution with metrics
- Health Monitoring - System health and resource tracking
- Cluster Information - Node metadata and topology
- Service Discovery - Node identification and registration
Usage:
```python
from robosystems.graph_api.core.ladybug import get_ladybug_service
from robosystems.graph_api.models.database import QueryRequest

service = get_ladybug_service()

# Execute query
response = service.execute_query(QueryRequest(
    database="kg123",
    cypher="MATCH (n:Entity) RETURN n.name, n.id LIMIT 10",
    parameters={}
))

# Get cluster health
health = service.get_cluster_health()

# Get cluster information
info = service.get_cluster_info()
```

Files: duckdb/manager.py, duckdb/pool.py
High-performance data ingestion via intermediate DuckDB staging tables:
Primary Classes:
- `DuckDBTableManager` - Staging table creation and query execution
- `DuckDBConnectionPool` - DuckDB connection pooling
Key Features:
- S3 Integration - Direct S3 file access via httpfs extension
- SQL Validation - Query and validate data before graph ingestion
- Materialized Tables - Fast querying of staged Parquet files
- Automatic Deduplication - Node and relationship table deduplication
- Connection Pooling - Efficient DuckDB connection management
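Validation before ingestion usually means a few aggregate checks against the staged table. This sketch uses Python's built-in `sqlite3` purely as a stand-in for DuckDB so it runs without extra packages; the SQL is plain enough to run on DuckDB unchanged. `validate_staged_table` and its report keys are hypothetical.

```python
import sqlite3
from typing import Dict, Sequence


def validate_staged_table(conn: sqlite3.Connection, table: str, key: str,
                          required: Sequence[str]) -> Dict[str, int]:
    """Count problems that should block ingestion of a staged table.

    Table and column names are interpolated, not bound as parameters,
    so they are assumed to be trusted identifiers.
    """
    report: Dict[str, int] = {}
    for column in required:
        # Rows missing a required value
        (nulls,) = conn.execute(
            f'SELECT COUNT(*) FROM "{table}" WHERE "{column}" IS NULL'
        ).fetchone()
        report[f"null_{column}"] = nulls
    # Keys that appear more than once (would create duplicate nodes)
    (dupes,) = conn.execute(
        f'SELECT COUNT(*) FROM (SELECT "{key}" FROM "{table}" '
        f'GROUP BY "{key}" HAVING COUNT(*) > 1)'
    ).fetchone()
    report["duplicate_keys"] = dupes
    return report
```

A non-zero value in the report is a signal to fix the source files before copying anything into the graph.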
Usage:
```python
from robosystems.graph_api.core.duckdb import DuckDBTableManager

manager = DuckDBTableManager(staging_path="./data/staging")

# Create staging table from S3
manager.create_table(
    graph_id="kg123",
    table_name="Entity",
    s3_pattern="s3://bucket/entities/*.parquet",
    columns=["id", "name", "type"]
)

# Query staging table
result = manager.query(
    graph_id="kg123",
    sql="SELECT type, COUNT(*) as count FROM Entity GROUP BY type"
)
```

File: admission_control.py
CPU and memory-based backpressure management to prevent system overload:
Primary Class: AdmissionController
Key Features:
- Resource Monitoring - Track CPU and memory utilization
- Adaptive Throttling - Reject requests when resources are constrained
- Configurable Thresholds - CPU and memory warning/critical levels
- Graceful Degradation - Return 503 Service Unavailable during overload
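The warning/critical thresholds suggest a two-level classification. The sketch below is an assumption about how such a decision could be made, not the `AdmissionController` logic: only critical pressure rejects, while warning is merely reported. In practice the CPU and memory percentages might come from something like `psutil.cpu_percent()` and `psutil.virtual_memory().percent`.

```python
from dataclasses import dataclass
from enum import Enum


class Pressure(Enum):
    OK = "ok"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass(frozen=True)
class Thresholds:
    # Defaults mirror the configuration example in this README
    cpu_warning: float = 70.0
    cpu_critical: float = 85.0
    memory_warning: float = 75.0
    memory_critical: float = 90.0


def classify(cpu_percent: float, memory_percent: float,
             t: Thresholds = Thresholds()) -> Pressure:
    """The worse of the two resources decides the level."""
    if cpu_percent >= t.cpu_critical or memory_percent >= t.memory_critical:
        return Pressure.CRITICAL
    if cpu_percent >= t.cpu_warning or memory_percent >= t.memory_warning:
        return Pressure.WARNING
    return Pressure.OK


def should_admit(cpu_percent: float, memory_percent: float,
                 t: Thresholds = Thresholds()) -> bool:
    # Hypothetical policy: reject only at critical; warning is a logging signal.
    return classify(cpu_percent, memory_percent, t) is not Pressure.CRITICAL
```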
Configuration:
```python
from fastapi import HTTPException  # assumes the Graph API is a FastAPI app

from robosystems.graph_api.core.admission_control import AdmissionController

admission = AdmissionController(
    cpu_warning=70.0,
    cpu_critical=85.0,
    memory_warning=75.0,
    memory_critical=90.0
)

# Check before processing request
if not admission.should_admit():
    raise HTTPException(status_code=503, detail="Service overloaded")
```

Files: task_manager.py, task_sse.py
Async operation coordination with Server-Sent Events for real-time progress:
Primary Classes:
- `TaskManager` - Task coordination and tracking
- `TaskSSE` - Server-Sent Events for progress streaming
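`TaskSSE` streams progress as Server-Sent Events. On the wire, an SSE message is a set of `field: value` lines (`id`, `event`, `data`) terminated by a blank line. The helper below is a hypothetical sketch of that framing, not the module's code; it assumes event names and IDs contain no newlines.

```python
import json
from typing import Any, Dict, List, Optional


def format_sse(data: Dict[str, Any], event: Optional[str] = None,
               event_id: Optional[str] = None) -> str:
    """Encode one Server-Sent Events message.

    json.dumps (without indent) emits no raw newlines, so the whole
    payload fits on a single data: line.
    """
    lines: List[str] = []
    if event_id is not None:
        lines.append(f"id: {event_id}")  # lets clients resume via Last-Event-ID
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"  # blank line terminates the event
```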
Key Features:
- Task Tracking - Manage long-running operations
- Progress Updates - Real-time status via Server-Sent Events
- Valkey-Backed - Distributed task state storage
- Automatic Cleanup - Task lifecycle management
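The task lifecycle (create, update, complete) can be mimicked in memory. This sketch is hypothetical: it stores records in a dict instead of Valkey and emulates key expiry by checking `expires_at` on read, but it reuses the method names and keyword arguments from the usage example.

```python
import asyncio
import time
import uuid
from typing import Any, Dict, Optional


class InMemoryTaskManager:
    """Hypothetical stand-in: task records live in a dict, not Valkey."""

    def __init__(self, ttl_seconds: float = 3600) -> None:
        self._ttl = ttl_seconds
        self._tasks: Dict[str, Dict[str, Any]] = {}
        self._lock = asyncio.Lock()

    async def create_task(self, task_type: str,
                          metadata: Optional[Dict[str, Any]] = None) -> str:
        task_id = str(uuid.uuid4())
        async with self._lock:
            self._tasks[task_id] = {
                "type": task_type,
                "status": "pending",
                "metadata": dict(metadata or {}),
                "expires_at": time.time() + self._ttl,
            }
        return task_id

    async def update_task(self, task_id: str, status: str,
                          metadata: Optional[Dict[str, Any]] = None) -> None:
        async with self._lock:
            task = self._tasks[task_id]
            task["status"] = status
            task["metadata"].update(metadata or {})  # merge progress info

    async def complete_task(self, task_id: str, result: Dict[str, Any]) -> None:
        async with self._lock:
            task = self._tasks[task_id]
            task["status"] = "completed"
            task["result"] = result

    async def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        async with self._lock:
            task = self._tasks.get(task_id)
            if task is not None and task["expires_at"] < time.time():
                del self._tasks[task_id]  # expired, like a key TTL lapsing
                return None
            return dict(task) if task is not None else None
```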
Usage:
```python
from robosystems.graph_api.core.task_manager import (
    backup_task_manager,
    restore_task_manager
)

# The calls below must run inside an async function

# Create backup task
task_id = await backup_task_manager.create_task(
    task_type="backup",
    metadata={"database": "kg123"}
)

# Update progress
await backup_task_manager.update_task(
    task_id,
    status="running",
    metadata={"progress": 50, "message": "Compressing backup..."}
)

# Complete task
await backup_task_manager.complete_task(
    task_id,
    result={"backup_size": 1024000, "location": "s3://..."}
)
```

File: metrics_collector.py
Performance monitoring and observability for graph operations:
Primary Class: LadybugMetricsCollector
Key Features:
- Query Metrics - Execution time, row counts, error rates
- Database Metrics - Size, table counts, connection stats
- System Metrics - CPU, memory, disk usage
- Time-Series Data - Historical performance tracking
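Per-database query metrics reduce to counters plus a list of durations. `SketchMetrics` is hypothetical and simplified (no time-series retention, no system metrics) and uses a nearest-rank 95th percentile. Unlike `LadybugMetricsCollector.record_query` in the usage example, it does not take the query text.

```python
import math
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class _QueryStats:
    durations_ms: List[float] = field(default_factory=list)
    rows: int = 0
    errors: int = 0


class SketchMetrics:
    """Hypothetical per-graph query metrics kept in memory."""

    def __init__(self) -> None:
        self._stats: Dict[str, _QueryStats] = defaultdict(_QueryStats)

    def record_query(self, graph_id: str, duration_ms: float,
                     row_count: int, error: bool = False) -> None:
        s = self._stats[graph_id]
        s.durations_ms.append(duration_ms)
        s.rows += row_count
        if error:
            s.errors += 1

    def get_database_stats(self, graph_id: str) -> Dict[str, float]:
        s = self._stats[graph_id]
        n = len(s.durations_ms)
        if n == 0:
            return {"queries": 0, "avg_ms": 0.0, "p95_ms": 0.0,
                    "rows": 0, "error_rate": 0.0}
        ordered = sorted(s.durations_ms)
        return {
            "queries": n,
            "avg_ms": sum(ordered) / n,
            # Nearest-rank percentile: the ceil(0.95 * n)-th smallest value
            "p95_ms": ordered[math.ceil(0.95 * n) - 1],
            "rows": s.rows,
            "error_rate": s.errors / n,
        }
```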
Usage:
```python
from robosystems.graph_api.core.metrics_collector import LadybugMetricsCollector

metrics = LadybugMetricsCollector()
# cypher_query, execution_time and row_count describe a query that has already run
metrics.record_query("kg123", cypher_query, execution_time, row_count)
stats = metrics.get_database_stats("kg123")
```

Core services are initialized at application startup:
```python
from robosystems.graph_api.core import (
    init_ladybug_service,
    initialize_connection_pool
)
from robosystems.middleware.graph.types import NodeType, RepositoryType

# Initialize LadybugDB service
ladybug_service = init_ladybug_service(
    base_path="/data/lbug-dbs",
    max_databases=100,
    read_only=False,
    node_type=NodeType.WRITER,
    repository_type=RepositoryType.ENTITY
)

# Initialize connection pool
connection_pool = initialize_connection_pool(
    base_path="/data/lbug-dbs",
    max_connections_per_db=10,
    idle_timeout_minutes=15
)
```

Request Flow:

- API Request → Routers receive the HTTP request
- Authentication → Middleware validates the JWT or API key
- Authorization → Graph access permissions are verified
- Service Routing → `LadybugService` coordinates the operation
- Admission Check → `AdmissionController` verifies resources
- Connection Acquisition → `LadybugConnectionPool` provides a connection
- Query Execution → `Engine` executes the Cypher query
- Metrics Collection → `LadybugMetricsCollector` records performance
- Response → Results are returned to the client
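The request flow from admission onward can be read as one function. In this sketch every stage is an injected callable, so the names (`handle_query`, `Overloaded`, and the `admit`/`get_connection`/`execute`/`record` parameters) are hypothetical; authentication, authorization, and routing are assumed to have already run.

```python
import time
from typing import Any, Callable, ContextManager, Dict, List


class Overloaded(Exception):
    """Raised when admission control rejects a request (maps to HTTP 503)."""


def handle_query(graph_id: str, cypher: str, *,
                 admit: Callable[[], bool],
                 get_connection: Callable[[str], ContextManager[Any]],
                 execute: Callable[[Any, str], List[Dict[str, Any]]],
                 record: Callable[[str, float, int, bool], None]) -> List[Dict[str, Any]]:
    """Admission check -> connection acquisition -> execution -> metrics."""
    if not admit():
        raise Overloaded("service overloaded")
    start = time.perf_counter()
    rows: List[Dict[str, Any]] = []
    failed = True
    try:
        with get_connection(graph_id) as conn:
            rows = execute(conn, cypher)
        failed = False
        return rows
    finally:
        # Runs on success and on error, so failures show up in error rates
        elapsed_ms = (time.perf_counter() - start) * 1000
        record(graph_id, elapsed_ms, len(rows), failed)
```

Rejected requests are not recorded in this sketch; counting them separately is a design choice, not a requirement.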
Data Ingestion Flow:

- Upload → Parquet files to S3 (via API)
- Staging → `DuckDBTableManager` creates a materialized table from S3
- Validation → User queries the staging table with SQL
- Transformation → Optional data cleanup and validation
- Ingestion → Copy from DuckDB staging to the LadybugDB graph
- Verification → Confirm data integrity in the graph
- Cleanup → Staging table remains for incremental updates
Each graph database is isolated and independently managed:

```python
# Each tenant gets their own database
tenant_a_db = "kg1a2b3c4d5e6f7a8b"  # "kg" + 16-char hex ID
tenant_b_db = "kg9f8e7d6c5b4a3021"  # Separate database

# Databases are isolated at the file system level
# /data/lbug-dbs/kg1a2b3c4d5e6f7a8b.lbug
# /data/lbug-dbs/kg9f8e7d6c5b4a3021.lbug

# Connection pools are per-database (pool is a LadybugConnectionPool)
with pool.get_connection(tenant_a_db) as conn:
    # Tenant A operations
    pass

with pool.get_connection(tenant_b_db) as conn:
    # Tenant B operations (completely isolated)
    pass
```

Core services are configured via environment variables:
```bash
# Database directory
LBUG_DATABASE_PATH=/data/lbug-dbs

# Instance limits
LBUG_DATABASES_PER_INSTANCE=10

# Node configuration
LBUG_NODE_TYPE=writer        # writer, reader, shared_master
LBUG_REPOSITORY_TYPE=entity  # entity, shared
LBUG_READ_ONLY=false

# Pool configuration
LBUG_MAX_CONNECTIONS_PER_DB=10
LBUG_IDLE_TIMEOUT_MINUTES=15
LBUG_CONNECTION_TTL_MINUTES=60

# Health checks
LBUG_HEALTH_CHECK_INTERVAL=30

# Staging configuration
DUCKDB_STAGING_PATH=./data/staging
DUCKDB_MAX_MEMORY=4GB
DUCKDB_THREADS=4

# S3 integration
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=us-east-1

# Resource thresholds
CPU_WARNING_THRESHOLD=70
CPU_CRITICAL_THRESHOLD=85
MEMORY_WARNING_THRESHOLD=75
MEMORY_CRITICAL_THRESHOLD=90

# Valkey configuration for task state (redis:// URL scheme)
VALKEY_URL=redis://localhost:6379
TASK_MANAGER_DB=3
TASK_TTL_SECONDS=3600
```

Best Practices:

LadybugDB Service:
- Always use connection pooling - Never create direct Engine instances in request handlers
- Set appropriate database limits - Configure `max_databases` based on instance capacity
- Monitor resource usage - Track CPU, memory, and disk I/O
- Use read-only mode - Enable for reader instances to prevent writes

Connection Pooling:
- Configure pool sizes - Set `max_connections_per_db` based on expected concurrency
- Set reasonable TTLs - Balance connection reuse against resource cleanup
- Monitor pool metrics - Track connection creation, reuse, and eviction rates
- Handle connection errors - Implement retry logic for transient failures

Database Management:
- Validate schema before creation - Run schema validation before creating a database
- Close connections on deletion - Always close all connections before deleting a database
- Use atomic operations - Ensure database operations are atomic and recoverable
- Implement proper cleanup - Clean up resources on database deletion

DuckDB Staging:
- Validate before ingestion - Always query and validate staged data before graph ingestion
- Use materialized tables - Don't use views for ingestion (poor performance)
- Keep staging tables - Retain tables for incremental loading and reprocessing
- Monitor staging disk usage - Clean up old staging tables periodically

Admission Control:
- Set appropriate thresholds - Configure based on actual instance capacity
- Monitor continuously - Track resource trends to adjust thresholds
- Implement graceful degradation - Return meaningful errors during overload
- Test under load - Validate admission control under realistic load conditions

Error Handling:
- Catch specific exceptions - Handle `ConnectionError` and `QueryError` separately
- Log errors with context - Include graph_id, query, and operation details
- Implement retries - Retry transient failures with exponential backoff
- Return user-friendly errors - Sanitize error messages before returning them to the client
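For the retry advice, a common pattern is exponential backoff with full jitter: the delay ceiling doubles per attempt up to a cap, and each actual delay is drawn uniformly below it. This is a generic sketch; `retry_with_backoff` is hypothetical, and it retries Python's built-in `ConnectionError` by default since the project's own exception types are not shown here.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(fn: Callable[[], T], *,
                       retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
                       attempts: int = 4, base_delay: float = 0.1,
                       max_delay: float = 2.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying only the listed (transient) exception types.

    The delay ceiling is base_delay * 2**attempt, capped at max_delay;
    full jitter keeps many clients from retrying in lockstep.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

Errors outside `retry_on` (for example, a malformed query) propagate immediately, which matches the advice to treat connection and query errors separately.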
Related Documentation:
- Graph API README - Complete Graph API overview
- LadybugDB README - Detailed LadybugDB documentation
- DuckDB Staging README - DuckDB staging system documentation
- Client Factory - Client routing system

Support:
- Issues: robosystems/issues
- Source Code: /robosystems/graph_api/core/
- API Docs: http://localhost:8001/docs