
Best Practices Guide

This guide provides best practices for using the MatrixOne Python SDK effectively in production environments, with emphasis on using the SDK's high-level APIs and avoiding raw SQL.

Warning

🚨 CRITICAL: Column Naming Convention

Always use lowercase with underscores (snake_case) for column names!

MatrixOne does not support SQL standard double-quoted identifiers in queries, which causes issues with camelCase column names when using SQLAlchemy ORM.

Examples:

# ❌ DON'T: CamelCase column names (will fail in SELECT queries)
class User(Base):
    userName = Column(String(50))      # CREATE succeeds, SELECT fails!
    userId = Column(Integer)           # Will cause SQL syntax errors

# ✅ DO: Use lowercase with underscores (snake_case)
class User(Base):
    user_name = Column(String(50))     # Works perfectly
    user_id = Column(Integer)          # All operations succeed

Why this matters:

  • ✅ CREATE TABLE works with both styles (uses backticks)
  • ✅ INSERT works with both styles
  • ❌ SELECT fails with camelCase (uses double quotes, not supported by MatrixOne)

Generated SQL comparison:

-- CamelCase generates:
SELECT "userName" FROM user  -- ❌ Fails with syntax error!

-- snake_case generates:
SELECT user_name FROM user   -- ✅ Works perfectly!
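If you are migrating existing models that use camelCase attributes, a small helper (plain Python, not part of the SDK) can derive compliant snake_case names:

```python
import re

def to_snake_case(name: str) -> str:
    """Convert a camelCase or PascalCase identifier to snake_case."""
    # Insert an underscore before each capitalized word, then lowercase everything
    s = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    s = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s)
    return s.lower()

print(to_snake_case("userName"))    # user_name
print(to_snake_case("userId"))      # user_id
print(to_snake_case("HTTPStatus"))  # http_status
```

Apply this to attribute names once at model-definition time so all generated SELECT statements stay MatrixOne-compatible.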

SDK-First Development Philosophy

The MatrixOne Python SDK provides rich, high-level APIs designed to replace raw SQL and complex SQLAlchemy code. Always prefer SDK APIs for better maintainability, type safety, and feature integration.

Table Operations Best Practices

Creating Tables with SDK API

from matrixone import Client
from matrixone.config import get_connection_params

# Connect using environment configuration
client = Client()
host, port, user, password, database = get_connection_params()
client.connect(host=host, port=port, user=user, password=password, database=database)

# ✅ GOOD: Use create_table API with dictionary
# Note: Always use snake_case for column names!
client.create_table("users", {
    "id": "int",
    "user_name": "varchar(100)",      # ✅ snake_case
    "email": "varchar(255)",
    "created_at": "timestamp",        # ✅ snake_case
    "age": "int"
}, primary_key="id")

# ✅ GOOD: Use create_table with ORM model
from sqlalchemy import Column, Integer, String, DateTime
from matrixone.orm import declarative_base

Base = declarative_base()

class Product(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String(200))
    price = Column(Integer)
    category = Column(String(50))
    created_at = Column(DateTime)     # ✅ snake_case

client.create_table(Product)

# ❌ AVOID: Raw SQL for table creation
# client.execute("CREATE TABLE users (id INT, username VARCHAR(100)...)")

Data Insertion Best Practices

# ✅ GOOD: Single insert using SDK API
# Note: Use snake_case for column names
client.insert("users", {
    "id": 1,
    "user_name": "alice",        # ✅ snake_case
    "email": "alice@example.com",
    "age": 25
})

# ✅ GOOD: Batch insert for multiple rows - MUCH FASTER
users_data = [
    {"id": 2, "user_name": "bob", "email": "bob@example.com", "age": 30},
    {"id": 3, "user_name": "charlie", "email": "charlie@example.com", "age": 28},
    {"id": 4, "user_name": "diana", "email": "diana@example.com", "age": 32}
]
client.batch_insert("users", users_data)

# ✅ GOOD: Insert with model class
client.insert(Product, {
    "id": 101,
    "name": "Laptop",
    "price": 999,
    "category": "Electronics"
})

# ❌ AVOID: Manual SQL INSERT statements
# client.execute("INSERT INTO users VALUES (1, 'alice', 'alice@example.com', 25)")

# ❌ AVOID: Loop with individual inserts (use batch_insert instead)
# for user in users_data:
#     client.execute(f"INSERT INTO users ...")  # Slow!

Query Building Best Practices

from matrixone.orm import logical_in
from sqlalchemy import func

# ✅ GOOD: Use query builder for simple queries
results = client.query("users").filter("age > 25").all()

# ✅ GOOD: Use where with parameters (prevents SQL injection)
results = client.query("users").where("email = ?", "alice@example.com").all()

# ✅ GOOD: Use logical_in for flexible IN queries
results = client.query("users").filter(logical_in("age", [25, 28, 30])).all()

# ✅ GOOD: Chain multiple filters
results = (client.query("users")
           .filter("age > 20")
           .filter(logical_in("user_name", ["alice", "bob"]))
           .order_by("created_at DESC")
           .limit(10)
           .all())

# ✅ GOOD: Use aggregation functions
result = (client.query("users")
          .select("age", func.count("id").label("count"))
          .group_by("age")
          .having(func.count("id") > 1)
          .all())

# ✅ GOOD: Use explain for query analysis
explain_result = client.query("users").filter("age > 25").explain(verbose=True)
print(explain_result)

# ❌ AVOID: Complex raw SQL that query builder can handle
# client.execute("SELECT age, COUNT(id) as count FROM users
#                 WHERE age > 20 GROUP BY age HAVING COUNT(id) > 1")

Update and Delete Operations

# ✅ GOOD: Update using query builder
client.query("users").update({"age": 26}).filter("id = 1").execute()

# ✅ GOOD: Update multiple columns
client.query("users").update({
    "user_name": "alice_updated",
    "age": 27
}).filter("id = 1").execute()

# ✅ GOOD: Delete using query builder
client.query("users").filter("id = 999").delete()

# ✅ GOOD: Bulk delete with conditions
client.query("users").filter("age < 18").delete()

# ❌ AVOID: Raw UPDATE/DELETE SQL
# client.execute("UPDATE users SET age = 26 WHERE id = 1")
# client.execute("DELETE FROM users WHERE id = 999")

Vector Operations Best Practices

Creating Vector Tables

from matrixone.sqlalchemy_ext import create_vector_column
from matrixone.orm import declarative_base
from sqlalchemy import Column, Integer, String, Text

Base = declarative_base()

# ✅ GOOD: Define vector table with ORM
class Document(Base):
    __tablename__ = 'documents'
    id = Column(Integer, primary_key=True)
    title = Column(String(200))
    content = Column(Text)
    category = Column(String(50))
    # Vector column for embeddings
    embedding = create_vector_column(384, 'f32')  # 384-dim f32 vectors

client.create_table(Document)

# ✅ GOOD: Create vector table using dictionary API
client.create_table("articles", {
    "id": "int",
    "title": "varchar(200)",
    "content": "text",
    "embedding": "vecf32(768)"  # 768-dimensional vectors
}, primary_key="id")

IVF Index Management

import numpy as np

# ✅ GOOD: Enable IVF indexing
client.vector_ops.enable_ivf()

# ✅ GOOD: Create IVF index with optimal parameters
client.vector_ops.create_ivf(
    "documents",  # Table name as positional argument
    name="idx_embedding_ivf",
    column="embedding",
    lists=100,  # Use sqrt(N) to 4*sqrt(N) where N is total vectors
    op_type="vector_l2_ops"
)

# ✅ GOOD: Insert vectors using SDK API
vectors_data = []
for i in range(100):
    vectors_data.append({
        "id": i,
        "title": f"Document {i}",
        "content": f"Content for document {i}",
        "embedding": np.random.rand(384).tolist()
    })
client.batch_insert("documents", vectors_data)

# ✅ CRITICAL: Monitor IVF index health regularly
stats = client.vector_ops.get_ivf_stats("documents", "embedding")

counts = stats['distribution']['centroid_count']
balance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')

print(f"Total centroids: {len(counts)}")
print(f"Total vectors: {sum(counts)}")
print(f"Balance ratio: {balance_ratio:.2f}")

# Rebuild if imbalanced
if balance_ratio > 2.5:
    print("⚠️  Index needs rebuilding")
    client.vector_ops.drop("documents", "idx_embedding_ivf")
    client.vector_ops.create_ivf("documents", "idx_embedding_ivf", "embedding", lists=100)

# ❌ AVOID: Raw SQL for vector indexing
# client.execute("CREATE INDEX idx_embedding ON documents USING ivf (embedding vector_l2_ops) LISTS = 100")
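The min/max guard in the balance-ratio computation above is easy to get wrong when repeated; a tiny helper (plain Python, illustrative only) centralizes it:

```python
def balance_ratio(centroid_counts):
    """max/min vectors per centroid; inf when empty or any centroid has no vectors."""
    if not centroid_counts or min(centroid_counts) == 0:
        return float('inf')
    return max(centroid_counts) / min(centroid_counts)

print(balance_ratio([100, 120, 95]))  # ~1.26 -> healthy
print(balance_ratio([100, 0, 95]))    # inf   -> rebuild
```

Feed it `stats['distribution']['centroid_count']` wherever the guide checks index health.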

Vector Similarity Search

# ✅ GOOD: Use vector_ops.similarity_search API
query_vector = np.random.rand(384).tolist()

results = client.vector_ops.similarity_search(
    "documents",  # Table name as positional argument
    vector_column="embedding",
    query_vector=query_vector,
    limit=10,
    distance_type="l2"
)

print(f"Found {len(results.rows)} similar documents")
for row in results.rows:
    print(f"ID: {row[0]}, Title: {row[1]}, Distance: {row[-1]:.4f}")

# ✅ GOOD: Similarity search with filters
results = client.vector_ops.similarity_search(
    "documents",  # Table name as positional argument
    vector_column="embedding",
    query_vector=query_vector,
    limit=10,
    distance_type="cosine",
    filter_conditions="category = 'technology'"
)

# ✅ GOOD: Range search for distance threshold
results = client.vector_ops.range_search(
    "documents",  # Table name as positional argument
    vector_column="embedding",
    query_vector=query_vector,
    max_distance=0.5,
    distance_type="l2"
)

# ❌ AVOID: Raw SQL for vector search
# client.execute("SELECT * FROM documents ORDER BY l2_distance(embedding, ?) LIMIT 10")

HNSW Index for High Accuracy

# ✅ GOOD: Enable and create HNSW index
client.vector_ops.enable_hnsw()

client.vector_ops.create_hnsw(
    "documents",  # Table name as positional argument
    name="idx_embedding_hnsw",
    column="embedding",
    m=16,                  # Connections per node (higher = better recall, slower build)
    ef_construction=200,   # Build-time search depth (higher = better quality)
    op_type="vector_l2_ops"
)

# Search works the same way - SDK automatically uses the best index
results = client.vector_ops.similarity_search(
    "documents",  # Table name as positional argument
    vector_column="embedding",
    query_vector=query_vector,
    limit=10
)

Fulltext Search Best Practices

Fulltext Index Creation

# ✅ GOOD: Enable fulltext indexing first
client.fulltext_index.enable_fulltext()

# ✅ GOOD: Create fulltext index using SDK API
client.fulltext_index.create(
    "documents",  # Table name as positional argument
    name="ftidx_content",
    columns=["title", "content"]
)

# ✅ GOOD: Create index with specific algorithm
from matrixone import FulltextAlgorithmType

client.fulltext_index.create(
    "documents",  # Table name as positional argument
    name="ftidx_advanced",
    columns=["content"],
    algorithm=FulltextAlgorithmType.BM25
)

# ❌ AVOID: Raw SQL for fulltext index creation
# client.execute("CREATE FULLTEXT INDEX ftidx_content ON documents(title, content)
#                 WITH PARSER ngram ALGORITHM = BM25")

Fulltext Search Operations

from matrixone.sqlalchemy_ext.fulltext_search import boolean_match
from matrixone.orm import logical_in

# ✅ GOOD: Boolean search with encourage (like search)
results = client.query(
    "documents.title",
    "documents.content",
    boolean_match("title", "content").encourage("machine learning")
).execute()

print(f"Found {len(results.rows)} results")
for row in results.rows:
    print(f"Title: {row[0]}, Content: {row[1]}")

# ✅ GOOD: Boolean search with must/should operators
ft_filter = (boolean_match("title", "content")
             .must("python")
             .should("tensorflow", "pytorch")
             .must_not("deprecated"))

results = client.query(
    "documents.title",
    "documents.content",
    ft_filter
).execute()

# ✅ GOOD: Combine fulltext with regular filters
results = (client.query(
               "documents.title",
               "documents.content",
               boolean_match("title", "content").encourage("deep learning")
           )
           .filter("created_at > '2024-01-01'")
           .limit(10)
           .execute())

Metadata Analysis Best Practices

Table Metadata Scanning

# ✅ GOOD: Use metadata.scan for comprehensive table analysis
metadata = client.metadata.scan(
    dbname="test",
    tablename="users"
)

print("Column Analysis:")
for row in metadata:
    print(f"\n{row.column_name} ({row.data_type}):")
    print(f"  - Nullable: {row.is_nullable}")
    print(f"  - Null count: {row.null_count}")
    print(f"  - Distinct values: {row.distinct_count}")
    print(f"  - Min: {row.min_value}, Max: {row.max_value}")
    print(f"  - Avg length: {row.avg_length}")

# ✅ GOOD: Get table-level statistics
stats = client.metadata.get_table_brief_stats(
    dbname="test",
    tablename="users"
)

print(f"\nTable Statistics:")
print(f"  - Total rows: {stats.row_count}")
print(f"  - Size: {stats.size_bytes / 1024 / 1024:.2f} MB")
print(f"  - Columns: {stats.column_count}")

# ✅ GOOD: Scan all tables in a database
all_metadata = client.metadata.scan(dbname="test")

for row in all_metadata:
    print(f"{row.table_name}.{row.column_name}: {row.data_type}")

Transaction Management

Basic Transaction Usage

# ✅ GOOD: Use transaction context manager
with client.transaction() as tx:
    # Insert user
    tx.insert("users", {
        "id": 100,
        "user_name": "transaction_user",
        "email": "tx@example.com",
        "age": 30
    })

    # Insert related order
    tx.insert("orders", {
        "id": 1,
        "user_id": 100,
        "amount": 99.99,
        "status": "pending"
    })

    # Transaction commits automatically if no exception

# On exception, transaction rolls back automatically
try:
    with client.transaction() as tx:
        tx.insert("users", {"id": 101, "user_name": "test"})
        raise Exception("Something went wrong")
        tx.insert("orders", {"id": 2, "user_id": 101})  # Never executed
except Exception as e:
    print(f"Transaction rolled back: {e}")

Advanced Transaction Patterns

# ✅ GOOD: Transaction with query operations
with client.transaction() as tx:
    # Check balance
    result = tx.query("accounts").filter("id = 1").one()
    balance = result[2]  # Assuming balance is 3rd column

    if balance >= 100:
        # Deduct from account
        tx.query("accounts").update({
            "balance": balance - 100
        }).filter("id = 1").execute()

        # Record transaction
        tx.insert("transactions", {
            "account_id": 1,
            "amount": -100,
            "type": "withdrawal"
        })

# ✅ GOOD: Batch operations in transaction
with client.transaction() as tx:
    # Batch insert multiple records
    users = [
        {"id": i, "user_name": f"user{i}", "email": f"user{i}@example.com"}
        for i in range(200, 300)
    ]
    tx.batch_insert("users", users)

    # Update related statistics
    tx.execute("UPDATE user_stats SET total_users = total_users + 100")

Snapshot and PITR Operations

Snapshot Management

from matrixone.snapshot import SnapshotLevel

# ✅ GOOD: Create cluster-level snapshot
snapshot = client.snapshots.create(
    name="cluster_backup_20240101",
    level=SnapshotLevel.CLUSTER
)
print(f"Created snapshot: {snapshot.name}")

# ✅ GOOD: Create account-level snapshot
snapshot = client.snapshots.create(
    name="account_backup",
    level=SnapshotLevel.ACCOUNT
)

# ✅ GOOD: Create database-level snapshot
snapshot = client.snapshots.create(
    name="db_backup_test",
    level=SnapshotLevel.DATABASE,
    database_name="test"
)

# ✅ GOOD: List all snapshots
snapshots = client.snapshots.list()
for snap in snapshots:
    print(f"{snap.name}: {snap.level}")

# ✅ GOOD: Drop snapshot when no longer needed
client.snapshots.drop("old_backup_20231201")

Table Cloning

# ✅ GOOD: Clone database
client.clone.clone_database(
    target_database="test_copy",
    source_database="test",
    snapshot_name="db_backup_test"
)

# ✅ GOOD: Clone table
client.clone.clone_table(
    target_table="users_backup",
    source_table="users",
    snapshot_name="db_backup_test",
    if_not_exists=True
)

# ✅ GOOD: Clone table within transaction
with client.transaction() as tx:
    tx.clone.clone_table(
        target_table="users_temp",
        source_table="users",
        snapshot_name="db_backup_test"
    )

Point-in-Time Recovery (PITR)

# ✅ GOOD: Create PITR for cluster
pitr_name = "pitr_backup"
pitr = client.pitr.create_cluster_pitr(
    name=pitr_name,
    range_value=1,
    range_unit="d"  # days
)

# ✅ GOOD: List PITR snapshots
pitr_list = client.pitr.list()
for pitr_item in pitr_list:
    print(f"PITR: {pitr_item}")

# ✅ GOOD: Drop PITR when done
try:
    client.pitr.drop_cluster_pitr(pitr_name)
except Exception as e:
    print(f"PITR cleanup: {e}")

Account and User Management

Account Operations

from matrixone.account import AccountManager

# ✅ GOOD: Initialize AccountManager
account_manager = AccountManager(client)

# ✅ GOOD: Create account (name, admin_name, admin_password)
account = account_manager.create_account(
    "test_account",
    "admin_user",
    "secure_password_123"
)
print(f"Created account: {account.name}")

# ✅ GOOD: List accounts
accounts = account_manager.list_accounts()
for acc in accounts:
    print(f"Account: {acc.name}, Status: {acc.status}")

# ✅ GOOD: Drop account
account_manager.drop_account("test_account")

User and Role Management

from matrixone.account import AccountManager

account_manager = AccountManager(client)

# ✅ GOOD: Create user (user_name, password)
user = account_manager.create_user("developer", "dev_password_123")
print(f"Created user: {user.name}")

# ✅ GOOD: Create role (role_name)
role = account_manager.create_role("data_analyst")
print(f"Created role: {role.name}")

# ✅ GOOD: Grant privileges on specific table (optional)
# Note: the table must exist; qualify as database.table when needed
account_manager.grant_privilege(
    "SELECT",           # privilege
    "TABLE",            # object_type
    "users",            # object_name
    to_role="data_analyst"
)

# ✅ GOOD: Grant role to user (role_name, to_user)
account_manager.grant_role("data_analyst", "developer")
print("Granted role to user")

# ✅ GOOD: List users
users = account_manager.list_users()
for user in users:
    print(f"User: {user.name}")

# ✅ GOOD: List roles
roles = account_manager.list_roles()
for role in roles:
    print(f"Role: {role.name}")

# ✅ GOOD: Clean up
account_manager.drop_user("developer")
account_manager.drop_role("data_analyst")

Pub/Sub Operations

Publication and Subscription Management

# ✅ GOOD: List publications
publications = client.pubsub.list_publications()
for pub in publications:
    print(f"Publication: {pub}")

# ✅ GOOD: List subscriptions
subscriptions = client.pubsub.list_subscriptions()
for sub in subscriptions:
    print(f"Subscription: {sub}")

# ✅ GOOD: Drop subscription (if exists)
try:
    client.pubsub.drop_subscription("test_subscription")
except Exception as e:
    print(f"Drop subscription: {e}")

# ✅ GOOD: Drop publication (if exists)
try:
    client.pubsub.drop_publication("test_publication")
except Exception as e:
    print(f"Drop publication: {e}")

Async Operations Best Practices

Basic Async Usage

import asyncio
from matrixone import AsyncClient
from matrixone.config import get_connection_params

async def async_operations():
    # ✅ GOOD: Use AsyncClient for async operations
    client = AsyncClient()
    host, port, user, password, database = get_connection_params()
    await client.connect(host=host, port=port, user=user, password=password, database=database)

    try:
        # Create table
        await client.create_table("async_users", {
            "id": "int",
            "username": "varchar(100)",
            "email": "varchar(255)"
        }, primary_key="id")

        # Batch insert
        users = [
            {"id": i, "username": f"user{i}", "email": f"user{i}@example.com"}
            for i in range(100)
        ]
        await client.batch_insert("async_users", users)

        # Query
        results = await client.query("async_users").filter("id < 10").all()
        print(f"Found {len(results.rows)} users")

    finally:
        await client.disconnect()

asyncio.run(async_operations())

Async Vector Operations

import numpy as np

async def async_vector_operations():
    client = AsyncClient()
    await client.connect(...)

    try:
        # Create vector table
        await client.create_table("async_documents", {
            "id": "int",
            "title": "varchar(200)",
            "embedding": "vecf32(384)"
        }, primary_key="id")

        # Enable and create IVF index
        await client.vector_ops.enable_ivf()
        await client.vector_ops.create_ivf(
            "async_documents",
            "idx_embedding",
            "embedding",
            lists=50
        )

        # Batch insert vectors
        docs = []
        for i in range(100):
            docs.append({
                "id": i,
                "title": f"Document {i}",
                "embedding": np.random.rand(384).tolist()
            })
        await client.batch_insert("async_documents", docs)

        # Similarity search
        query_vector = np.random.rand(384).tolist()
        results = await client.vector_ops.similarity_search(
            "async_documents",
            "embedding",
            query_vector,
            limit=10
        )

        # Monitor index health
        stats = await client.vector_ops.get_ivf_stats("async_documents", "embedding")
        counts = stats['distribution']['centroid_count']
        print(f"Index health: {len(counts)} centroids, {sum(counts)} vectors")

    finally:
        await client.disconnect()

Async Transactions

async def async_transaction():
    client = AsyncClient()
    await client.connect(...)

    try:
        # ✅ GOOD: Async transaction
        async with client.transaction() as tx:
            await tx.insert("users", {
                "id": 500,
                "user_name": "async_user",
                "email": "async@example.com"
            })

            await tx.insert("orders", {
                "id": 1000,
                "user_id": 500,
                "amount": 199.99
            })
            # Auto-commit on success

    finally:
        await client.disconnect()

Performance Optimization

Batch Operations

# ✅ GOOD: Always use batch_insert for multiple rows
# This is 10-100x faster than individual inserts
large_dataset = []
for i in range(10000):
    large_dataset.append({
        "id": i,
        "data": f"row_{i}",
        "value": i * 2
    })

client.batch_insert("large_table", large_dataset)

# ❌ AVOID: Loop with individual inserts
# for row in large_dataset:
#     client.insert("large_table", row)  # Very slow!

Connection Pooling

from matrixone import Client

# ✅ GOOD: Configure connection pooling for production
client = Client(
    pool_size=20,           # Number of connections in pool
    max_overflow=40,        # Additional connections when pool is full
    pool_timeout=30,        # Wait time for connection
    pool_recycle=3600,      # Recycle connections after 1 hour
    connection_timeout=30,  # Connection establishment timeout
    query_timeout=300       # Query execution timeout
)

Query Optimization

# ✅ GOOD: Use EXPLAIN to analyze queries
explain_result = client.query("users").filter("age > 25").explain(verbose=True)
print(explain_result)

# ✅ GOOD: Add indexes for frequently queried columns
client.execute("CREATE INDEX idx_users_age ON users(age)")
client.execute("CREATE INDEX idx_users_email ON users(email)")

# ✅ GOOD: Use limit for large result sets
results = client.query("users").order_by("created_at DESC").limit(100).all()

# ✅ GOOD: Filter before ordering/grouping
results = (client.query("users")
           .filter("created_at > '2024-01-01'")  # Filter first
           .order_by("user_name")                # Then order
           .limit(50)                            # Then limit
           .all())

Monitoring and Logging

from matrixone.logging import MatrixOneLogger, LogLevel

# ✅ GOOD: Configure logging for production
logger = MatrixOneLogger(
    level=LogLevel.INFO,        # INFO level for production
    log_file="matrixone.log",   # Log to file
    max_bytes=10485760,         # 10MB max file size
    backup_count=5              # Keep 5 backup files
)

client = Client(
    logger=logger,
    sql_log_mode='simple',        # Simple SQL logging
    slow_query_threshold=1.0      # Log queries > 1 second
)

Index Maintenance Best Practices

Critical for Production: Regular index maintenance ensures optimal performance, especially for vector indexes.

IVF Index Creation Timing

Important

Critical Issue: Index Creation Timing

IVF indexes should be created AFTER inserting initial data for optimal clustering:

# ✅ CORRECT ORDER:
client.create_table(Document)
client.batch_insert(Document, initial_data)  # Insert first
client.vector_ops.create_ivf("documents", "idx", "embedding", lists=50)  # Index last

# Then continue normal operations
client.insert(Document, new_doc)  # ✅ IVF supports dynamic updates
# ❌ AVOID: Creating index on empty table
client.create_table(Document)
client.vector_ops.create_ivf("documents", "idx", "embedding", lists=50)
client.batch_insert(Document, data)  # Poor initial clustering

Why? Initial data helps the IVF algorithm create better-balanced clusters.

Key Difference from HNSW:

  • IVF: Insert data → Create index → Continue updates ✅ (dynamic)
  • HNSW: Insert ALL data → Create index → Read-only 🚧 (static, updates coming soon)

IVF Index Health Monitoring

import math
from datetime import datetime

def monitor_ivf_health(client, table_name, column_name, expected_lists):
    """
    Monitor IVF index health - CRITICAL for production vector search.

    Args:
        client: MatrixOne client
        table_name: Table with IVF index
        column_name: Vector column name
        expected_lists: Expected number of centroids
    """
    # ✅ GOOD: Get comprehensive IVF statistics
    stats = client.vector_ops.get_ivf_stats(table_name, column_name)

    distribution = stats['distribution']
    centroid_counts = distribution['centroid_count']

    # Calculate health metrics
    total_centroids = len(centroid_counts)
    total_vectors = sum(centroid_counts)
    min_count = min(centroid_counts) if centroid_counts else 0
    max_count = max(centroid_counts) if centroid_counts else 0
    avg_count = total_vectors / total_centroids if total_centroids > 0 else 0

    # ⭐ KEY METRIC: Balance ratio
    balance_ratio = max_count / min_count if min_count > 0 else float('inf')

    # Health assessment
    print(f"\n{'='*60}")
    print(f"IVF Health Report - {table_name}.{column_name}")
    print(f"Timestamp: {datetime.now().isoformat()}")
    print(f"{'='*60}")
    print(f"Total Centroids:  {total_centroids} (expected: {expected_lists})")
    print(f"Total Vectors:    {total_vectors}")
    print(f"Avg/Centroid:     {avg_count:.2f}")
    print(f"Balance Ratio:    {balance_ratio:.2f}")

    # Status assessment (threshold: <2.0 good, >2.5 rebuild)
    if balance_ratio < 2.0:
        status = "✅ HEALTHY"
        action = "Continue monitoring"
    elif balance_ratio < 2.5:
        status = "⚠️  FAIR"
        action = "Plan rebuild"
    else:
        status = "❌ CRITICAL"
        action = "Rebuild immediately"

    print(f"Status:           {status}")
    print(f"Action:           {action}")
    print(f"{'='*60}\n")

    return {
        'balance_ratio': balance_ratio,
        'total_vectors': total_vectors,
        'status': status,
        'action': action
    }

# ✅ GOOD: Regular health checks (schedule daily/weekly)
health = monitor_ivf_health(
    client,
    "documents",
    "embedding",
    expected_lists=100
)

# ✅ GOOD: Automated alerting
if health['balance_ratio'] > 2.5:
    # Send alert (email, Slack, PagerDuty, etc.)
    print(f"🚨 ALERT: Index needs attention! Balance ratio: {health['balance_ratio']:.2f}")

IVF Index Rebuild Strategy

def rebuild_ivf_index(client, table_name, column_name, index_name):
    """
    Rebuild IVF index with optimal parameters.

    When to rebuild:
    - Balance ratio > 2.5
    - After bulk inserts (>20% new data)
    - Query performance degradation
    - After major deletes or updates
    """
    print(f"Rebuilding IVF index: {table_name}.{column_name}")

    # ✅ GOOD: Get current stats before rebuild
    old_stats = client.vector_ops.get_ivf_stats(table_name, column_name)
    old_counts = old_stats['distribution']['centroid_count']
    total_vectors = sum(old_counts)
    old_balance = max(old_counts) / min(old_counts) if min(old_counts) > 0 else float('inf')

    print(f"  Old stats: {total_vectors} vectors, balance {old_balance:.2f}")

    # ✅ GOOD: Calculate optimal lists parameter
    # Rule: lists = √N to 4×√N (where N = total vectors)
    optimal_lists = int(math.sqrt(total_vectors) * 2)  # Using 2×√N
    optimal_lists = max(10, min(optimal_lists, 1000))  # Clamp between 10-1000

    print(f"  Calculated optimal lists: {optimal_lists}")

    # ✅ GOOD: Drop and recreate index
    try:
        # Drop old index
        client.vector_ops.drop(table_name, index_name)
        print(f"  ✓ Dropped old index")

        # Recreate with optimal parameters
        client.vector_ops.create_ivf(
            table_name,
            name=index_name,
            column=column_name,
            lists=optimal_lists,
            op_type="vector_l2_ops"
        )
        print(f"  ✓ Created new index with {optimal_lists} lists")

        # ✅ GOOD: Verify new index health
        import time
        time.sleep(2)  # Give index time to stabilize

        new_stats = client.vector_ops.get_ivf_stats(table_name, column_name)
        new_counts = new_stats['distribution']['centroid_count']
        new_balance = max(new_counts) / min(new_counts) if min(new_counts) > 0 else float('inf')

        improvement = ((old_balance - new_balance) / old_balance * 100)

        print(f"\nRebuild Results:")
        print(f"  Old balance: {old_balance:.2f}")
        print(f"  New balance: {new_balance:.2f}")
        print(f"  Improvement: {improvement:.1f}%")

        if new_balance < 2.0:
            print(f"  ✅ Index is now healthy!")
        else:
            print(f"  ⚠️  Consider adjusting lists parameter")

    except Exception as e:
        print(f"  ❌ Rebuild failed: {e}")
        raise

# Usage in production
# ✅ GOOD: Schedule during low-traffic periods
# ✅ GOOD: Check health first, rebuild only if needed
health = monitor_ivf_health(client, "documents", "embedding", expected_lists=100)
if health['balance_ratio'] > 2.5:
    rebuild_ivf_index(client, "documents", "embedding", "idx_embedding_ivf")

IVF Index Parameter Selection

import math

# ✅ GOOD: Calculate optimal lists (guideline: <1K: 10-20, 1K-100K: 50-200, >100K: √N to 4×√N)
total_vectors = 50000
optimal_lists = int(math.sqrt(total_vectors) * 2)  # Using 2×√N ≈ 447 lists

client.vector_ops.create_ivf(
    "large_table",
    name="idx_vectors",
    column="embedding",
    lists=optimal_lists,
    op_type="vector_l2_ops"
)
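The 2×√N rule above, combined with the 10-1000 clamp used in the rebuild example earlier, can be wrapped in a small helper (illustrative, not an SDK function):

```python
import math

def choose_ivf_lists(total_vectors: int) -> int:
    """2*sqrt(N) lists, clamped to the 10-1000 range."""
    lists = int(math.sqrt(total_vectors) * 2)
    return max(10, min(lists, 1000))

print(choose_ivf_lists(50_000))     # 447
print(choose_ivf_lists(25))         # 10  (small tables hit the floor)
print(choose_ivf_lists(1_000_000))  # 1000 (large tables hit the ceiling)
```

Pass the result as the `lists` argument to `create_ivf`, and recompute it whenever the table grows substantially.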

Fulltext Index Maintenance

from matrixone import FulltextAlgorithmType, FulltextParserType

# ✅ GOOD: BM25 for most cases, choose parser by content type
client.fulltext_index.create("articles", "idx_content", ["title", "content"],
                             algorithm=FulltextAlgorithmType.BM25)

# For Chinese: NGRAM parser
client.fulltext_index.create("chinese_docs", "idx_cn", "content",
                             algorithm=FulltextAlgorithmType.BM25,
                             parser=FulltextParserType.NGRAM)

# For JSON: JSON parser (indexes values, not keys)
client.fulltext_index.create("json_docs", "idx_json", "data",
                             algorithm=FulltextAlgorithmType.BM25,
                             parser=FulltextParserType.JSON)

HNSW Index Considerations

from sqlalchemy import BigInteger, Column
from matrixone.sqlalchemy_ext import create_vector_column

# ✅ GOOD: HNSW requires BigInteger primary key
class Document(Base):
    __tablename__ = 'documents'
    id = Column(BigInteger, primary_key=True)  # Must be BigInteger
    embedding = create_vector_column(128, 'f32')

# ✅ GOOD: Current workflow
client.create_table(Document)
client.batch_insert(Document, all_documents)  # Insert data first

client.vector_ops.enable_hnsw()
client.vector_ops.create_hnsw(Document, "idx_embedding", "embedding", m=16)

# 🚧 Coming Soon: Dynamic updates after index creation
# Current workaround: Drop index → Modify data → Recreate index

Batch Operation Size Optimization

# ✅ GOOD: Optimal batch sizes for different operations

# For inserts: 1000-10000 rows per batch
batch_size = 5000
for i in range(0, len(large_dataset), batch_size):
    batch = large_dataset[i:i + batch_size]
    client.batch_insert("table_name", batch)
    print(f"Inserted batch {i//batch_size + 1}")

# For vector data: smaller batches (vectors are larger)
vector_batch_size = 1000
for i in range(0, len(vector_data), vector_batch_size):
    batch = vector_data[i:i + vector_batch_size]
    client.batch_insert("vectors_table", batch)

# ❌ AVOID: Too large batches (memory issues)
# client.batch_insert("table", million_rows)  # May cause OOM

# ❌ AVOID: Too small batches (performance issues)
# for row in data:
#     client.insert("table", row)  # Very slow!
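The slicing loops above repeat the same pattern; a tiny generator keeps the chunking logic in one place. A sketch (the 5000 batch size is the illustrative figure from above):

```python
def chunked(rows, size):
    """Yield successive slices of `rows`, each no longer than `size`."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

# Usage sketch:
# for batch in chunked(large_dataset, 5000):
#     client.batch_insert("table_name", batch)
```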

Error Handling Best Practices

Comprehensive Exception Handling

import time

from matrixone.exceptions import (
    ConnectionError,
    QueryError,
    TransactionError,
    SnapshotError
)

# ✅ GOOD: Handle specific exceptions
try:
    client.connect(...)
except ConnectionError as e:
    print(f"Failed to connect: {e}")
    # Implement retry logic or fallback

# ✅ GOOD: Handle transaction errors
try:
    with client.transaction() as tx:
        tx.insert("users", {...})
        tx.insert("orders", {...})
except TransactionError as e:
    print(f"Transaction failed: {e}")
    # Transaction auto-rolled back

# ✅ GOOD: Handle query errors with retry
max_retries = 3
for attempt in range(max_retries):
    try:
        results = client.query("users").all()
        break
    except QueryError as e:
        if attempt == max_retries - 1:
            raise
        print(f"Query failed, retrying ({attempt + 1}/{max_retries})...")
        time.sleep(1)
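The retry loop above can be generalized into a decorator so call sites do not re-implement it. A sketch with exponential backoff; wrapping a MatrixOne query with it (as in the commented usage) is an assumption about your call site, not SDK-provided behavior:

```python
import functools
import time

def with_retries(max_retries=3, base_delay=1.0, exceptions=(Exception,)):
    """Retry decorator with exponential backoff (base_delay, 2x, 4x, ...)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator

# Usage sketch:
# @with_retries(max_retries=3, exceptions=(QueryError,))
# def fetch_users():
#     return client.query("users").all()
```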

Resource Cleanup

from contextlib import contextmanager

# ✅ GOOD: Use context managers for automatic cleanup
@contextmanager
def get_client():
    client = Client()
    client.connect(...)
    try:
        yield client
    finally:
        client.disconnect()

# Usage
with get_client() as client:
    results = client.query("users").all()
    # Client automatically disconnected

Testing Best Practices

Unit Testing with SDK

import pytest
from matrixone import Client
from matrixone.config import get_connection_params

@pytest.fixture
def test_client():
    """Fixture for test client"""
    client = Client()
    host, port, user, password, database = get_connection_params()
    client.connect(host=host, port=port, user=user, password=password, database=database)
    yield client
    client.disconnect()

@pytest.fixture
def test_table(test_client):
    """Fixture for test table"""
    table_name = "test_users"
    test_client.create_table(table_name, {
        "id": "int",
        "username": "varchar(100)",
        "email": "varchar(255)"
    }, primary_key="id")
    yield table_name
    test_client.drop_table(table_name)

def test_insert_and_query(test_client, test_table):
    """Test insert and query operations"""
    # Insert data
    test_client.insert(test_table, {
        "id": 1,
        "username": "testuser",
        "email": "test@example.com"
    })

    # Query data
    results = test_client.query(test_table).filter("id = 1").all()
    assert len(results.rows) == 1
    assert results.rows[0][1] == "testuser"

Summary of SDK Features

Essential SDK APIs to Use:

Table Operations:

- client.create_table() - Create tables
- client.drop_table() - Drop tables
- client.insert() - Insert single row
- client.batch_insert() - Batch insert (fastest)
- client.query() - Query builder

Vector Operations:

- client.vector_ops.create_ivf() - Create IVF index
- client.vector_ops.create_hnsw() - Create HNSW index
- client.vector_ops.similarity_search() - Vector search
- client.vector_ops.get_ivf_stats() - Monitor index health (CRITICAL)
- client.vector_ops.drop() - Drop index

Fulltext Operations:

- client.fulltext_index.enable_fulltext() - Enable fulltext
- client.fulltext_index.create() - Create fulltext index
- client.fulltext_index.drop() - Drop fulltext index
- Use boolean_match() in queries with encourage(), must(), should()

Metadata Operations:

- client.metadata.scan() - Scan table metadata
- client.metadata.get_table_brief_stats() - Get table statistics

Snapshot Operations:

- client.snapshots.create() - Create snapshot
- client.snapshots.list() - List snapshots
- client.snapshots.drop() - Drop snapshot
- client.clone.clone_database() - Clone database
- client.clone.clone_table() - Clone table

Transaction Operations:

- with client.transaction() as tx: - Transaction context

Account Operations:

- AccountManager(client) - Initialize account manager
- account_manager.create_account() - Create account
- account_manager.create_user() - Create user
- account_manager.create_role() - Create role
- account_manager.grant_role() - Grant role to user
- account_manager.drop_user() / drop_role() - Clean up

Pub/Sub Operations:

- client.pubsub.list_publications() - List publications
- client.pubsub.list_subscriptions() - List subscriptions
- client.pubsub.drop_publication() - Drop publication
- client.pubsub.drop_subscription() - Drop subscription

Remember: Always prefer SDK APIs over raw SQL for better maintainability, type safety, and feature integration!