Skip to content

Latest commit

 

History

History
380 lines (300 loc) · 9.35 KB

File metadata and controls

380 lines (300 loc) · 9.35 KB

Examples

This page provides complete examples for common use cases.

Note: These examples assume you have already configured your credentials via environment variables or called setup_origin_quantum_account_tool.

Complete Workflow: Bell State Experiment

This example demonstrates the full workflow from account setup to result retrieval.

import asyncio

async def bell_state_experiment():
    # Step 1: Setup account
    result = await setup_origin_quantum_account_tool(
        api_key="your_key",
        server_url="https://your_server"
    )

    if result["status"] != "success":
        print(f"Setup failed: {result['message']}")
        return

    print(f"Connected! Available devices: {result['available_devices']}")

    # Step 2: Define Bell state circuit
    circuit = """QINIT 2
CREG 2
H q[0]
CNOT q[0],q[1]
MEASURE q[0],c[0]
MEASURE q[1],c[1]"""

    # Step 3: Submit sampling task
    result = await sample_tool(
        circuit=circuit,
        device_id="20",
        shots=1000
    )
    task_id = result["task_id"]
    print(f"Task submitted: {task_id}")

    # Step 4: Poll for completion
    while True:
        status = await get_task_status_tool(task_id)
        print(f"Status: {status['task_status']}")

        if status["task_status"] == "DONE":
            break
        elif status["task_status"] == "FAILED":
            print("Task failed!")
            return

        await asyncio.sleep(2)

    # Step 5: Get results
    results = await get_task_results_tool(task_id)
    print(f"Results: {results['results']}")
    # Expected: ~50% 00, ~50% 11

# Run the experiment
asyncio.run(bell_state_experiment())

Quantum Random Number Generator

Generate truly random numbers using quantum superposition.

async def quantum_random_number():
    # 4-qubit random number generator (0-15)
    circuit = """QINIT 4
CREG 4
H q[0]
H q[1]
H q[2]
H q[3]
MEASURE q[0],c[0]
MEASURE q[1],c[1]
MEASURE q[2],c[2]
MEASURE q[3],c[3]"""

    result = await sample_tool(
        circuit=circuit,
        device_id="20",
        shots=1
    )

    # Get the random 4-bit number
    results = await get_task_results_tool(result["task_id"])
    # Parse results to get random number
    return results

asyncio.run(quantum_random_number())

Expectation Estimation (VQE-style)

Calculate the expectation value of an observable.

async def expectation_estimation():
    # Circuit WITHOUT measurements (for estimation)
    circuit = """QINIT 2
CREG 2
H q[0]
CNOT q[0],q[1]"""

    # Define observable as dictionary
    observable = {"Z0 Z1": 1.0}

    result = await estimate_tool(
        circuit=circuit,
        observable=observable,
        device_id="20"
    )

    task_id = result["task_id"]

    # Wait for completion
    while True:
        status = await get_task_status_tool(task_id)
        if status["task_status"] == "DONE":
            break
        await asyncio.sleep(2)

    results = await get_task_results_tool(task_id)
    print(f"Expectation value: {results['results']}")

asyncio.run(expectation_estimation())

Batch Sampling

Run multiple circuits efficiently.

async def batch_sampling():
    circuits = [
        # Bell state
        """QINIT 2
CREG 2
H q[0]
CNOT q[0],q[1]
MEASURE q[0],c[0]
MEASURE q[1],c[1]""",
        # |01⟩ state
        """QINIT 2
CREG 2
X q[0]
MEASURE q[0],c[0]
MEASURE q[1],c[1]""",
        # |10⟩ state
        """QINIT 2
CREG 2
X q[1]
MEASURE q[0],c[0]
MEASURE q[1],c[1]"""
    ]

    result = await batch_sample_tool(
        circuits=circuits,
        device_id="20",
        shots=1000
    )

    print(f"Batch task ID: {result['task_id']}")
    print(f"Number of circuits: {result['num_circuits']}")

asyncio.run(batch_sampling())

Multi-Objective Decision (CircuitObservableBinding)

Evaluate multiple circuits against multiple observables.

async def multi_objective_optimization():
    # Step 1: Create binding with circuits and observables
    binding = await create_circuit_observable_binding_tool(
        circuits=[
            # Strategy A: Bell state
            "QINIT 2\nCREG 2\nH q[0]\nCNOT q[0],q[1]",
            # Strategy B: |00⟩ state
            "QINIT 2\nCREG 2\n",
            # Strategy C: |++⟩ state
            "QINIT 2\nCREG 2\nH q[0]\nH q[1]"
        ],
        observables=[
            {"Z0 Z1": 1.0},  # Correlation measure
            {"X0": 0.5}       # X-basis measure
        ]
    )

    binding_id = binding["binding_id"]
    print(f"Created binding: {binding_id}")

    # Step 2: Add product rule (all combinations)
    # This creates: (0,0), (0,1), (1,0), (1,1), (2,0), (2,1)
    await add_product_rule_tool(
        binding_id=binding_id,
        circuit_indices=[0, 1, 2],
        observable_indices=[0, 1]
    )

    # Step 3: Execute estimation
    result = await estimate_with_binding_tool(
        binding_id=binding_id,
        device_id="20"
    )

    print(f"Task ID: {result['task_id']}")

    # Step 4: Cleanup (optional)
    await delete_binding_tool(binding_id)

asyncio.run(multi_objective_optimization())

Using Zip Rule for Specific Pairs

async def specific_pairs():
    binding = await create_circuit_observable_binding_tool(
        circuits=[
            "QINIT 2\nCREG 2\nH q[0]\nCNOT q[0],q[1]",
            "QINIT 2\nCREG 2\nX q[0]\nCNOT q[0],q[1]"
        ],
        observables=[
            {"Z0 Z1": 1.0},
            {"X0 X1": 0.5}
        ]
    )

    # Add specific pairs: (circuit_0, observable_1), (circuit_1, observable_0)
    await add_zip_rule_tool(
        binding_id=binding["binding_id"],
        circuit_indices=[0, 1],
        observable_indices=[1, 0]
    )

    result = await estimate_with_binding_tool(
        binding_id=binding["binding_id"],
        device_id="20"
    )

asyncio.run(specific_pairs())

Task Management

async def task_management():
    # List recent tasks
    tasks = await list_my_tasks_tool(limit=10)

    print("Recent tasks:")
    for task in tasks["tasks"]:
        print(f"  {task['task_id']}: {task['status']}")

    # Cancel a specific task (if still pending/running)
    if tasks["tasks"]:
        task_id = tasks["tasks"][0]["task_id"]
        result = await cancel_task_tool(task_id)
        print(f"Cancellation: {result['message']}")

asyncio.run(task_management())

Device Selection

async def select_best_device():
    devices = await list_qpu_devices_tool()

    # Find operational device with shortest queue
    best_device = None
    for device in devices["devices"]:
        if device["operational"] and not device.get("is_virtual", False):
            if best_device is None or device["pending_jobs"] < best_device["pending_jobs"]:
                best_device = device

    if best_device:
        print(f"Selected: {best_device['name']} (ID: {best_device['id']})")
        print(f"  Qubits: {best_device['num_qubits']}")
        print(f"  Queue: {best_device['pending_jobs']} jobs")

        # Get detailed properties
        props = await get_qpu_properties_tool(best_device["id"])
        print(f"  Gates: {props.get('basis_gates', [])}")

asyncio.run(select_best_device())

LangChain Integration

See the examples/ directory for complete LangChain agent examples.

import os
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain.agents import create_react_agent

async def create_quantum_agent():
    # Configure MCP client
    mcp_client = MultiServerMCPClient({
        "qpanda3-runtime": {
            "transport": "stdio",
            "command": "qpanda3-runtime-mcp-server",
            "args": [],
            "env": {
                "QPANDA3_API_KEY": os.getenv("QPANDA3_API_KEY", ""),
                "QPANDA3_SERVER_URL": os.getenv("QPANDA3_SERVER_URL", ""),
            }
        }
    })

    # Use persistent session
    async with mcp_client.session("qpanda3-runtime") as session:
        # Load MCP tools
        tools = await load_mcp_tools(session)

        # Create LLM
        llm = ChatOpenAI(model="gpt-4o", temperature=0)

        # Create agent with tools
        agent = create_react_agent(llm, tools)

        # Run a query
        response = await agent.ainvoke({
            "input": "List all available QPU devices"
        })
        print(response)

asyncio.run(create_quantum_agent())

Error Handling

async def robust_execution():
    # Submit task
    result = await sample_tool(
        circuit="QINIT 2\nCREG 2\nH q[0]\nCNOT q[0],q[1]\nMEASURE q[0],c[0]\nMEASURE q[1],c[1]",
        device_id="20",
        shots=1000
    )

    if result["status"] == "error":
        print(f"Submission failed: {result['message']}")
        return

    task_id = result["task_id"]

    # Poll with timeout
    max_attempts = 60  # 2 minutes max
    for attempt in range(max_attempts):
        status = await get_task_status_tool(task_id)

        if status["task_status"] == "DONE":
            results = await get_task_results_tool(task_id)
            print(f"Success: {results['results']}")
            return

        elif status["task_status"] == "FAILED":
            print(f"Task failed: {status.get('error_message', 'Unknown error')}")
            return

        await asyncio.sleep(2)

    # Timeout - cancel the task
    await cancel_task_tool(task_id)
    print("Task timed out and was cancelled")

asyncio.run(robust_execution())