This guide covers snapshot creation, restoration, and point-in-time recovery (PITR) operations in the MatrixOne Python SDK, enabling data backup, disaster recovery, and version control.
MatrixOne's snapshot and restore system provides:
- Database Snapshots: Create point-in-time snapshots of entire databases
- Table Snapshots: Create snapshots of specific tables
- Incremental Snapshots: Efficient backup of changes since last snapshot
- Point-in-Time Recovery: Restore data to any specific timestamp
- Cross-Database Restore: Restore snapshots to different databases
- Snapshot Management: List, query, and delete snapshots
- Automated Backup: Schedule regular snapshot creation
- Transaction Support: Atomic snapshot operations with `client.session()`
Use client.session() for atomic snapshot and clone operations:
from matrixone import Client, SnapshotLevel
from matrixone.orm import Base, Column, Integer, String
from sqlalchemy import select, insert

# Define ORM model. `Base` is provided by matrixone.orm, so no separate
# declarative_base() call is needed (the original example called
# declarative_base() without importing it, which raises NameError).
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))

client = Client()
client.connect(database='test')

# Atomic snapshot + clone operation
with client.session() as session:
    # Create snapshot
    snapshot = session.snapshots.create(
        name='daily_backup',
        level=SnapshotLevel.DATABASE,
        database='production'
    )

    # Clone from snapshot atomically
    session.clone.clone_database(
        target_db='production_copy',
        source_db='production',
        snapshot_name='daily_backup'
    )
    # Both operations commit together or rollback on error
client.disconnect()

from matrixone import Client, SnapshotLevel
from sqlalchemy import select, insert, func

client = Client()
client.connect(database='test')

# Complex atomic operation: inserts, verification, snapshot and clone
# all commit together or roll back together.
with client.session() as session:
    # Insert data
    session.execute(insert(User).values(name='Alice', email='alice@example.com'))
    session.execute(insert(User).values(name='Bob', email='bob@example.com'))

    # Verify data count
    stmt = select(func.count(User.id))
    count = session.execute(stmt).scalar()
    print(f"Inserted {count} users")

    # Create snapshot after successful insert
    snapshot = session.snapshots.create(
        name='post_insert_snapshot',
        level=SnapshotLevel.DATABASE,
        database='test'
    )

    # Clone to backup database
    session.clone.clone_database(
        target_db='test_backup',
        source_db='test',
        snapshot_name='post_insert_snapshot'
    )
    # All operations are atomic
client.disconnect()

from matrixone import Client
from matrixone.config import get_connection_params

# Connect to MatrixOne
connection_params = get_connection_params()
client = Client(*connection_params)
# NOTE(review): params are passed to both the constructor and connect()
# here — confirm against the SDK's connection API which one consumes them.
client.connect(*connection_params)
# Get snapshot manager
snapshot_manager = client.snapshot

# Create a full database snapshot
snapshot = snapshot_manager.create(
    name="daily_backup_20240115",
    level=SnapshotLevel.DATABASE,
    database="production_db",
    description="Daily backup of production database"
)
print(f"Created snapshot: {snapshot.name}")

# Create snapshot with compression
snapshot = snapshot_manager.create(
    name="compressed_backup",
    level=SnapshotLevel.DATABASE,
    database="analytics_db",
    compression=True,
    description="Compressed backup for long-term storage"
)

# Create incremental snapshot (only changes since the base snapshot)
snapshot = snapshot_manager.create(
    name="incremental_backup",
    level=SnapshotLevel.DATABASE,
    database="production_db",
    incremental=True,
    base_snapshot="daily_backup_20240114"
)

# Create table snapshot
snapshot = snapshot_manager.create(
    name="users_table_backup",
    level=SnapshotLevel.TABLE,
    database="production_db",
    table="users",
    description="Backup of users table"
)

# Create snapshot of multiple tables
snapshot = snapshot_manager.create(
    name="multi_table_backup",
    level=SnapshotLevel.TABLE,
    database="production_db",
    tables=["users", "orders", "products"],
    description="Backup of critical tables"
)

# Create table snapshot with specific columns
snapshot = snapshot_manager.create(
    name="users_data_backup",
    level=SnapshotLevel.TABLE,
    database="production_db",
    table="users",
    columns=["id", "username", "email", "created_at"],
    description="Backup of essential user data"
)

# List all snapshots
snapshots = snapshot_manager.list()
for snapshot in snapshots:
    print(f"Snapshot: {snapshot.name}")
    print(f" Level: {snapshot.level}")
    print(f" Database: {snapshot.database}")
    print(f" Created: {snapshot.created_at}")
    print(f" Size: {snapshot.size_mb} MB")

# List snapshots for specific database
db_snapshots = snapshot_manager.list(database="production_db")
for snapshot in db_snapshots:
    print(f"Database snapshot: {snapshot.name}")

# List table snapshots
table_snapshots = snapshot_manager.list(level=SnapshotLevel.TABLE)
for snapshot in table_snapshots:
    print(f"Table snapshot: {snapshot.name}")

# Get specific snapshot
snapshot = snapshot_manager.get("daily_backup_20240115")
if snapshot:
    print("Snapshot details:")
    print(f" Name: {snapshot.name}")
    print(f" Level: {snapshot.level}")
    print(f" Database: {snapshot.database}")
    print(f" Created: {snapshot.created_at}")
    print(f" Size: {snapshot.size_mb} MB")
    print(f" Status: {snapshot.status}")

# Get snapshot metadata
metadata = snapshot_manager.get_metadata("daily_backup_20240115")
print(f"Metadata: {metadata}")

# Restore entire database from snapshot
restore_result = snapshot_manager.restore(
    snapshot_name="daily_backup_20240115",
    target_database="restored_production_db",
    overwrite=True
)
print(f"Restore completed: {restore_result.success}")

# Restore to existing database (replace data)
restore_result = snapshot_manager.restore(
    snapshot_name="daily_backup_20240115",
    target_database="production_db",
    overwrite=True,
    backup_existing=True
)

# Restore with specific options
restore_result = snapshot_manager.restore(
    snapshot_name="daily_backup_20240115",
    target_database="test_restore_db",
    overwrite=True,
    create_database=True,
    restore_permissions=True
)

# Restore specific table
restore_result = snapshot_manager.restore_table(
    snapshot_name="users_table_backup",
    target_database="restored_db",
    target_table="users",
    overwrite=True
)

# Restore table to different name
restore_result = snapshot_manager.restore_table(
    snapshot_name="users_table_backup",
    target_database="restored_db",
    target_table="users_backup",
    overwrite=True
)

# Restore multiple tables
restore_result = snapshot_manager.restore_tables(
    snapshot_name="multi_table_backup",
    target_database="restored_db",
    overwrite=True
)

# Restore cluster from snapshot
restore_result = client.restore.restore_cluster("my_snapshot")

# Restore database from snapshot
restore_result = client.restore.restore_database(
    "my_snapshot",
    "root",
    "restored_db"
)

# Get available recovery points
recovery_points = snapshot_manager.get_recovery_points(
    database="production_db",
    start_date="2024-01-15",
    end_date="2024-01-16"
)
for point in recovery_points:
    print(f"Recovery point: {point.timestamp}")
    print(f" Type: {point.type}")
    print(f" Available: {point.available}")

# Query data from snapshot
results = client.query_snapshot(
    snapshot_name="daily_backup_20240115",
    sql="SELECT COUNT(*) FROM users"
)
print(f"User count in snapshot: {results.rows[0][0]}")

# Query specific table from snapshot
results = client.query_snapshot(
    snapshot_name="users_table_backup",
    sql="SELECT * FROM users WHERE created_at > '2024-01-01'"
)

# Compare data between snapshots
old_count = client.query_snapshot(
    snapshot_name="daily_backup_20240114",
    sql="SELECT COUNT(*) FROM users"
).rows[0][0]
new_count = client.query_snapshot(
    snapshot_name="daily_backup_20240115",
    sql="SELECT COUNT(*) FROM users"
).rows[0][0]
print(f"User growth: {new_count - old_count}")

# Delete specific snapshot
snapshot_manager.delete("old_backup_20240101")
print("Snapshot deleted")

# Delete multiple snapshots
snapshots_to_delete = ["backup1", "backup2", "backup3"]
for snapshot_name in snapshots_to_delete:
    snapshot_manager.delete(snapshot_name)

# Delete snapshots older than specified date
deleted_count = snapshot_manager.delete_older_than(
    cutoff_date="2024-01-01",
    database="production_db"
)
print(f"Deleted {deleted_count} old snapshots")

# Cleanup with retention policy
cleanup_result = snapshot_manager.cleanup_retention_policy(
    database="production_db",
    keep_daily=7,    # Keep 7 daily snapshots
    keep_weekly=4,   # Keep 4 weekly snapshots
    keep_monthly=12  # Keep 12 monthly snapshots
)
print(f"Cleanup completed: {cleanup_result.deleted_count} snapshots deleted")

import asyncio
from matrixone import AsyncClient

async def async_snapshot_operations():
    """Create, restore and list snapshots using the async client."""
    # Connect asynchronously
    connection_params = get_connection_params()
    async_client = AsyncClient(*connection_params)
    await async_client.connect(*connection_params)

    # Get async snapshot manager
    snapshot_manager = async_client.snapshot

    # Async snapshot creation
    snapshot = await snapshot_manager.create_async(
        name="async_backup",
        level=SnapshotLevel.DATABASE,
        database="production_db"
    )

    # Async restore
    restore_result = await snapshot_manager.restore_async(
        snapshot_name="async_backup",
        target_database="async_restored_db",
        overwrite=True
    )

    # Async snapshot listing
    snapshots = await snapshot_manager.list_async()
    print(f"Found {len(snapshots)} snapshots")

    await async_client.disconnect()

# Run async operations
asyncio.run(async_snapshot_operations())

import schedule
import time
from datetime import datetime, timedelta


class AutomatedBackupSystem:
    """Creates scheduled daily/weekly snapshots of production_db and prunes old ones."""

    def __init__(self):
        self.client = Client(*get_connection_params())
        self.client.connect(*get_connection_params())
        self.snapshot_manager = self.client.snapshot

    def daily_backup(self):
        """Create daily backup of production database."""
        timestamp = datetime.now().strftime("%Y%m%d")
        snapshot_name = f"daily_backup_{timestamp}"
        try:
            snapshot = self.snapshot_manager.create(
                name=snapshot_name,
                level=SnapshotLevel.DATABASE,
                database="production_db",
                description=f"Daily backup for {timestamp}"
            )
            print(f"Daily backup created: {snapshot.name}")
            # Cleanup old backups (keep 30 days)
            self.cleanup_old_backups(days=30)
        except Exception as e:
            print(f"Daily backup failed: {e}")

    def weekly_backup(self):
        """Create weekly backup with compression."""
        timestamp = datetime.now().strftime("%Y%m%d")
        snapshot_name = f"weekly_backup_{timestamp}"
        try:
            snapshot = self.snapshot_manager.create(
                name=snapshot_name,
                level=SnapshotLevel.DATABASE,
                database="production_db",
                compression=True,
                description=f"Weekly backup for {timestamp}"
            )
            print(f"Weekly backup created: {snapshot.name}")
        except Exception as e:
            print(f"Weekly backup failed: {e}")

    def cleanup_old_backups(self, days=30):
        """Clean up backups older than the given number of days."""
        cutoff_date = datetime.now() - timedelta(days=days)
        deleted_count = self.snapshot_manager.delete_older_than(
            cutoff_date=cutoff_date,
            database="production_db"
        )
        print(f"Cleaned up {deleted_count} old backups")

    def start_scheduler(self):
        """Start the backup scheduler (blocks forever)."""
        # Schedule daily backup at 2 AM
        schedule.every().day.at("02:00").do(self.daily_backup)
        # Schedule weekly backup on Sunday at 3 AM
        schedule.every().sunday.at("03:00").do(self.weekly_backup)
        print("Backup scheduler started")
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

# Usage
backup_system = AutomatedBackupSystem()
# backup_system.start_scheduler()  # Uncomment to start scheduler

class DisasterRecoverySystem:
    """Restores a database from its most recent snapshot and validates the result."""

    def __init__(self):
        self.client = Client(*get_connection_params())
        self.client.connect(*get_connection_params())
        self.snapshot_manager = self.client.snapshot

    def get_latest_snapshot(self, database):
        """Get the most recent snapshot for a database, or None if there are none."""
        snapshots = self.snapshot_manager.list(database=database)
        if not snapshots:
            return None
        # Sort by creation time and return latest
        latest = max(snapshots, key=lambda s: s.created_at)
        return latest

    def emergency_restore(self, database, target_database=None):
        """Perform emergency restore from latest snapshot; returns the target db name."""
        latest_snapshot = self.get_latest_snapshot(database)
        if not latest_snapshot:
            raise Exception(f"No snapshots found for database {database}")
        target_db = target_database or f"{database}_restored"
        print(f"Emergency restore from snapshot: {latest_snapshot.name}")
        print(f"Target database: {target_db}")
        restore_result = self.snapshot_manager.restore(
            snapshot_name=latest_snapshot.name,
            target_database=target_db,
            overwrite=True,
            create_database=True
        )
        if restore_result.success:
            print(f"Emergency restore completed successfully")
            return target_db
        else:
            raise Exception(f"Emergency restore failed: {restore_result.error}")

    def point_in_time_recovery(self, database, target_timestamp, target_database=None):
        """Perform point-in-time recovery."""
        target_db = target_database or f"{database}_pitr_restored"
        print(f"Point-in-time recovery to: {target_timestamp}")
        print(f"Target database: {target_db}")
        # NOTE(review): this calls restore_database with a hard-coded snapshot
        # name and ignores `database`/`target_timestamp` — confirm the actual
        # PITR API and pass the timestamp through.
        restore_result = self.client.restore.restore_database(
            "my_snapshot",
            "root",
            target_db
        )
        if restore_result.success:
            print(f"Point-in-time recovery completed successfully")
            return target_db
        else:
            raise Exception(f"Point-in-time recovery failed: {restore_result.error}")

    def validate_restore(self, database):
        """Validate that restored database is reachable and has tables."""
        try:
            # Test basic connectivity
            result = self.client.execute(f"SELECT 1 FROM {database}.information_schema.tables LIMIT 1")
            # Test data integrity
            table_count = self.client.execute(f"SELECT COUNT(*) FROM {database}.information_schema.tables")
            print(f"Restored database has {table_count.rows[0][0]} tables")
            return True
        except Exception as e:
            print(f"Restore validation failed: {e}")
            return False

# Usage
dr_system = DisasterRecoverySystem()
# Emergency restore
restored_db = dr_system.emergency_restore("production_db")
# Validate restore
if dr_system.validate_restore(restored_db):
print("Restore validation successful")class DataMigrationSystem:
def __init__(self):
self.client = Client(*get_connection_params())
self.client.connect(*get_connection_params())
self.snapshot_manager = self.client.snapshot
def migrate_database(self, source_db, target_db, tables=None):
"""Migrate database using snapshots"""
# Create snapshot of source database
snapshot_name = f"migration_{source_db}_{int(time.time())}"
snapshot = self.snapshot_manager.create(
name=snapshot_name,
level=SnapshotLevel.DATABASE,
database=source_db,
description=f"Migration snapshot from {source_db}"
)
# Restore to target database
restore_result = self.snapshot_manager.restore(
snapshot_name=snapshot_name,
target_database=target_db,
overwrite=True,
create_database=True
)
if restore_result.success:
print(f"Database migration completed: {source_db} -> {target_db}")
# Cleanup migration snapshot
self.snapshot_manager.delete(snapshot_name)
return True
else:
print(f"Database migration failed: {restore_result.error}")
return False
def migrate_tables(self, source_db, target_db, tables):
"""Migrate specific tables"""
for table in tables:
snapshot_name = f"table_migration_{table}_{int(time.time())}"
# Create table snapshot
snapshot = self.snapshot_manager.create(
name=snapshot_name,
level=SnapshotLevel.TABLE,
database=source_db,
table=table
)
# Restore table
restore_result = self.snapshot_manager.restore_table(
snapshot_name=snapshot_name,
target_database=target_db,
target_table=table,
overwrite=True
)
if restore_result.success:
print(f"Table migration completed: {table}")
else:
print(f"Table migration failed: {table}")
# Cleanup
self.snapshot_manager.delete(snapshot_name)
# Usage
migration_system = DataMigrationSystem()
# Migrate entire database
migration_system.migrate_database("old_production", "new_production")
# Migrate specific tables
migration_system.migrate_tables("old_production", "new_production",
["users", "orders", "products"])Robust error handling for production applications:
from matrixone.exceptions import SnapshotError, RestoreError
try:
# Snapshot operations
snapshot = snapshot_manager.create(
name="test_backup",
level=SnapshotLevel.DATABASE,
database="test_db"
)
except SnapshotError as e:
print(f"Snapshot error: {e}")
except RestoreError as e:
print(f"Restore error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
# Retry mechanism for snapshot operations
def create_snapshot_with_retry(snapshot_manager, name, level, database, max_retries=3):
for attempt in range(max_retries):
try:
return snapshot_manager.create(name=name, level=level, database=database)
except Exception as e:
print(f"Snapshot attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoffBest practices for optimal performance:
# Batch snapshot operations
def batch_create_snapshots(snapshot_manager, databases, snapshot_prefix):
snapshots = []
for database in databases:
snapshot_name = f"{snapshot_prefix}_{database}_{int(time.time())}"
try:
snapshot = snapshot_manager.create(
name=snapshot_name,
level=SnapshotLevel.DATABASE,
database=database
)
snapshots.append(snapshot)
except Exception as e:
print(f"Failed to create snapshot for {database}: {e}")
return snapshots
# Efficient snapshot cleanup
def smart_cleanup(snapshot_manager, database, retention_days=30):
# Get all snapshots for database
snapshots = snapshot_manager.list(database=database)
# Sort by creation date
snapshots.sort(key=lambda s: s.created_at)
# Keep latest snapshot and apply retention policy
cutoff_date = datetime.now() - timedelta(days=retention_days)
snapshots_to_delete = [s for s in snapshots[:-1] if s.created_at < cutoff_date]
for snapshot in snapshots_to_delete:
snapshot_manager.delete(snapshot.name)
return len(snapshots_to_delete)Common issues and solutions:
- Snapshot creation failures
  - Verify database exists and is accessible
  - Check available disk space
  - Ensure proper permissions
- Restore failures
  - Verify snapshot exists and is valid
  - Check target database permissions
  - Ensure sufficient disk space for restore
- Performance issues
  - Use incremental snapshots for large databases
  - Schedule snapshots during low-activity periods
  - Consider compression for long-term storage
- Point-in-time recovery issues
  - Verify transaction logs are available
  - Check recovery point availability
  - Ensure proper timestamp format
For more information, see the :doc:`api/client` and :doc:`best_practices`.