Successfully added a full async API to the QueryWeaver library, providing high-performance async/await support for applications that can benefit from concurrency.
Created a complete async version of the QueryWeaver client with:
- Same Interface: All methods match the sync API but with `async`/`await`
- Context Manager Support: `async with` for automatic resource cleanup
- Concurrent Operations: Multiple operations can run simultaneously
- Performance Benefits: Non-blocking I/O for better throughput
All major operations are now available in async versions:
- `async load_database()` - Load database schemas asynchronously
- `async text_to_sql()` - Generate SQL with async processing
- `async query()` - Full query processing with async execution
- `async get_database_schema()` - Retrieve schema information asynchronously
Context manager usage:

```python
async with AsyncQueryWeaverClient(...) as client:
    await client.load_database(...)
    sql = await client.text_to_sql(...)
# Automatically closed when exiting context
```

Loading several databases concurrently:

```python
await asyncio.gather(
    client.load_database("db1", "postgresql://..."),
    client.load_database("db2", "mysql://..."),
    client.load_database("db3", "postgresql://..."),
)
```

Processing several queries concurrently:

```python
queries = ["query 1", "query 2", "query 3"]
results = await asyncio.gather(*[
    client.text_to_sql("mydb", query) for query in queries
])
```

Processing in batches:

```python
async def process_in_batches(queries, batch_size=5):
    results = []
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]
        batch_results = await asyncio.gather(*[
            client.text_to_sql("mydb", q) for q in batch
        ])
        results.extend(batch_results)  # Keep each batch's output
        await asyncio.sleep(0.1)  # Brief pause between batches
    return results
```

- Composition over Inheritance: `AsyncQueryWeaverClient` uses the sync client for initialization logic, then provides its own async methods
- Native Async: All I/O operations use the existing async infrastructure from QueryWeaver core
- Same API Surface: Method signatures match the sync version for easy migration
- Resource Management: Proper cleanup with context managers
- Non-blocking Operations: All database and AI operations are non-blocking
- Error Handling: Same exception types and error handling as sync API
- Memory Efficiency: Shared state with sync client where possible
- Type Hints: Full type annotation support
- Context Managers: `async with` support for automatic cleanup
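The composition and context-manager points above can be illustrated with a minimal sketch. It does not reproduce the actual `AsyncQueryWeaverClient` implementation: `SyncClientStub`, `AsyncClientSketch`, and their attributes are hypothetical names invented for illustration, and `asyncio.sleep(0)` stands in for real non-blocking I/O.

```python
import asyncio


class SyncClientStub:
    """Hypothetical stand-in for the sync client: owns configuration and state."""

    def __init__(self, falkordb_url: str):
        self.falkordb_url = falkordb_url
        self.loaded_databases = set()


class AsyncClientSketch:
    """Hypothetical async wrapper that composes the sync client instead of subclassing it."""

    def __init__(self, falkordb_url: str):
        # Reuse the sync client's initialization logic through composition.
        self._sync = SyncClientStub(falkordb_url)
        self.closed = False

    async def load_database(self, name: str, url: str) -> bool:
        await asyncio.sleep(0)  # Placeholder for real non-blocking I/O
        self._sync.loaded_databases.add(name)
        return True

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "AsyncClientSketch":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and when the body raises.
        await self.close()


async def demo() -> AsyncClientSketch:
    async with AsyncClientSketch("redis://localhost:6379/0") as client:
        await client.load_database("mydb", "postgresql://...")
    return client
```

Because `__aexit__` always awaits `close()`, the client is released even if a call inside the `async with` body fails.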
Basic usage:

```python
import asyncio
from queryweaver import AsyncQueryWeaverClient

async def main():
    async with AsyncQueryWeaverClient(
        falkordb_url="redis://localhost:6379/0",
        openai_api_key="your-api-key"
    ) as client:
        await client.load_database("mydb", "postgresql://...")
        sql = await client.text_to_sql("mydb", "Show all customers")
        result = await client.query("mydb", "Count orders")

asyncio.run(main())
```

High-volume processing:

```python
async def process_many_queries():
    async with AsyncQueryWeaverClient(...) as client:
        await client.load_database("mydb", "postgresql://...")
        # Process 100 queries concurrently in batches
        queries = [f"Query {i}" for i in range(100)]
        results = []
        for i in range(0, len(queries), 10):  # Batches of 10
            batch = queries[i:i+10]
            # With return_exceptions=True, failures are returned as exception objects
            batch_results = await asyncio.gather(*[
                client.text_to_sql("mydb", query) for query in batch
            ], return_exceptions=True)
            results.extend(batch_results)
        return results
```

Mixing the sync and async APIs:

```python
# You can use both APIs in the same application
import asyncio
from queryweaver import QueryWeaverClient, AsyncQueryWeaverClient

# Sync API for simple operations
sync_client = QueryWeaverClient(...)
sync_client.load_database("mydb", "postgresql://...")

# Async API for high-performance operations
async def process_batch():
    async_client = AsyncQueryWeaverClient(...)
    async_client._loaded_databases = sync_client._loaded_databases  # Share state
    queries = ["query1", "query2", "query3"]
    return await asyncio.gather(*[
        async_client.text_to_sql("mydb", q) for q in queries
    ])
```

- Multiple Queries: Process many queries simultaneously
- Database Loading: Load multiple database schemas in parallel
- I/O Overlap: Hide network latency with concurrent operations
- Memory: Shared state between sync and async clients where possible
- Connections: Async operations don't block threads
- Throughput: Higher query throughput for batch operations, because requests overlap instead of running one after another
- Event Loop: Integrates with existing async applications
- Backpressure: Built-in support for rate limiting with batching
- Resource Management: Proper cleanup with context managers
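The I/O-overlap benefit listed above can be seen in a small timing sketch. `fake_text_to_sql` is a hypothetical stand-in that only sleeps to simulate network latency; it is not the QueryWeaver API, and real speedups depend on the backend and any rate limits.

```python
import asyncio
import time


async def fake_text_to_sql(question: str, latency: float = 0.05) -> str:
    """Hypothetical stand-in for a network-bound call."""
    await asyncio.sleep(latency)
    return f"SELECT 1 -- {question}"


async def run_sequential(questions):
    # Each call waits for the previous one to finish.
    return [await fake_text_to_sql(q) for q in questions]


async def run_concurrent(questions):
    # All calls are in flight at once; total time is roughly one latency.
    return await asyncio.gather(*(fake_text_to_sql(q) for q in questions))


async def compare(questions):
    start = time.perf_counter()
    seq = await run_sequential(questions)
    seq_time = time.perf_counter() - start

    start = time.perf_counter()
    conc = await run_concurrent(questions)
    conc_time = time.perf_counter() - start
    return seq, conc, seq_time, conc_time
```

Both paths return the same results in the same order; only the elapsed time differs.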
Comprehensive test suite added:
- Unit Tests: All async methods tested with mocking
- Context Manager Tests: Async context manager functionality
- Concurrency Tests: Parallel operation testing
- Error Handling: Exception propagation in async context
- Integration Tests: Real async operation testing
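As an illustration of the mocking approach mentioned above, the following sketch tests a small helper against `unittest.mock.AsyncMock`. It is not taken from `tests/test_async_library_api.py`; `generate_all` is a hypothetical helper, and the mock only assumes the client exposes an awaitable `text_to_sql(database, question)`.

```python
import asyncio
from unittest.mock import AsyncMock


async def generate_all(client, database: str, questions):
    """Hypothetical code under test: fan questions out to an async client."""
    return await asyncio.gather(
        *(client.text_to_sql(database, q) for q in questions)
    )


def test_generate_all_uses_client():
    client = AsyncMock()
    # A plain function as side_effect supplies the awaited return value.
    client.text_to_sql.side_effect = lambda db, q: f"SQL for {q}"

    result = asyncio.run(generate_all(client, "mydb", ["a", "b"]))

    assert result == ["SQL for a", "SQL for b"]
    assert client.text_to_sql.await_count == 2
    client.text_to_sql.assert_any_await("mydb", "a")
```

No network or database is needed, so tests of this shape can run in any CI environment.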
New files:
- `examples/async_library_usage.py` - Comprehensive async examples
- `tests/test_async_library_api.py` - Async API unit tests

Modified files:
- `queryweaver.py` - Added AsyncQueryWeaverClient class
- `__init__.py` - Export async classes
- `docs/library-usage.md` - Added async documentation
Migrating from sync to async:

```python
# Sync version
client = QueryWeaverClient(...)
client.load_database("mydb", "postgresql://...")
sql = client.text_to_sql("mydb", "query")

# Async version
async with AsyncQueryWeaverClient(...) as client:
    await client.load_database("mydb", "postgresql://...")
    sql = await client.text_to_sql("mydb", "query")
```

Sequential versus concurrent processing:

```python
# Sequential processing (slow)
results = []
for query in queries:
    result = await client.text_to_sql("mydb", query)
    results.append(result)

# Concurrent processing (fast)
results = await asyncio.gather(*[
    client.text_to_sql("mydb", query) for query in queries
])
```

Best practices:

- Use Context Managers: Always use `async with` for automatic cleanup
- Batch Operations: Process multiple queries concurrently when possible
- Rate Limiting: Use batches to avoid overwhelming the system
- Error Handling: Use `return_exceptions=True` in `asyncio.gather()` for robust error handling
- Resource Management: Call `await client.close()` if not using context managers
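The rate-limiting and error-handling practices above can be combined in one sketch. It uses `asyncio.Semaphore` as an alternative to fixed-size batches, which is a substitute technique rather than something the library is known to provide. `flaky_text_to_sql` is a hypothetical stand-in for `client.text_to_sql`.

```python
import asyncio


async def flaky_text_to_sql(question: str) -> str:
    """Hypothetical stand-in for client.text_to_sql; fails on empty input."""
    await asyncio.sleep(0)
    if not question:
        raise ValueError("empty question")
    return f"SELECT 1 -- {question}"


async def run_bounded(questions, max_concurrency: int = 3):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(question):
        async with semaphore:  # At most max_concurrency calls in flight
            return await flaky_text_to_sql(question)

    # return_exceptions=True keeps one failure from cancelling the rest.
    outcomes = await asyncio.gather(
        *(guarded(q) for q in questions), return_exceptions=True
    )
    pairs = list(zip(questions, outcomes))
    successes = [(q, r) for q, r in pairs if not isinstance(r, BaseException)]
    failures = [(q, r) for q, r in pairs if isinstance(r, BaseException)]
    return successes, failures
```

Separating successes from failures after the gather lets the caller retry or log only the questions that failed.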
The async API provides a foundation for:
- Connection Pooling: Async database connection pools
- Streaming Results: Async generators for large result sets
- Real-time Processing: WebSocket integration for real-time queries
- Distributed Processing: Integration with async task queues
- Monitoring: Async metrics and monitoring integration
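As a rough illustration of the streaming idea listed above, the sketch below shows what an async generator over results might look like. Nothing here exists in QueryWeaver today: `fake_query_rows` and `stream_rows` are hypothetical names, and the rows are synthetic.

```python
import asyncio
from typing import AsyncIterator, Iterable


async def fake_query_rows(question: str) -> list:
    """Hypothetical stand-in for a query that returns rows."""
    await asyncio.sleep(0)
    return [f"{question}-row{i}" for i in range(3)]


async def stream_rows(questions: Iterable[str]) -> AsyncIterator[str]:
    # Yield rows as each query completes instead of building one large list.
    for question in questions:
        for row in await fake_query_rows(question):
            yield row


async def collect(questions):
    # Consumers iterate with `async for`; collected here only for demonstration.
    return [row async for row in stream_rows(questions)]
```

A consumer would normally process each row inside an `async for` loop rather than collecting everything, which keeps memory use bounded for large result sets.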
The async API improves throughput for applications that process many queries or otherwise benefit from concurrent operations, because I/O-bound calls overlap instead of running one at a time. It keeps the same simple, intuitive interface as the sync API while enabling async/await patterns.