This page provides complete examples for common use cases.
Note: These examples assume you have already configured your credentials via environment variables or called
setup_origin_quantum_account_tool.
This example demonstrates the full workflow from account setup to result retrieval.
import asyncio
async def bell_state_experiment():
# Step 1: Setup account
result = await setup_origin_quantum_account_tool(
api_key="your_key",
server_url="https://your_server"
)
if result["status"] != "success":
print(f"Setup failed: {result['message']}")
return
print(f"Connected! Available devices: {result['available_devices']}")
# Step 2: Define Bell state circuit
circuit = """QINIT 2
CREG 2
H q[0]
CNOT q[0],q[1]
MEASURE q[0],c[0]
MEASURE q[1],c[1]"""
# Step 3: Submit sampling task
result = await sample_tool(
circuit=circuit,
device_id="20",
shots=1000
)
task_id = result["task_id"]
print(f"Task submitted: {task_id}")
# Step 4: Poll for completion
while True:
status = await get_task_status_tool(task_id)
print(f"Status: {status['task_status']}")
if status["task_status"] == "DONE":
break
elif status["task_status"] == "FAILED":
print("Task failed!")
return
await asyncio.sleep(2)
# Step 5: Get results
results = await get_task_results_tool(task_id)
print(f"Results: {results['results']}")
# Expected: ~50% 00, ~50% 11
# Run the experiment
asyncio.run(bell_state_experiment())Generate truly random numbers using quantum superposition.
async def quantum_random_number():
# 4-qubit random number generator (0-15)
circuit = """QINIT 4
CREG 4
H q[0]
H q[1]
H q[2]
H q[3]
MEASURE q[0],c[0]
MEASURE q[1],c[1]
MEASURE q[2],c[2]
MEASURE q[3],c[3]"""
result = await sample_tool(
circuit=circuit,
device_id="20",
shots=1
)
# Get the random 4-bit number
results = await get_task_results_tool(result["task_id"])
# Parse results to get random number
return results
asyncio.run(quantum_random_number())Calculate the expectation value of an observable.
async def expectation_estimation():
# Circuit WITHOUT measurements (for estimation)
circuit = """QINIT 2
CREG 2
H q[0]
CNOT q[0],q[1]"""
# Define observable as dictionary
observable = {"Z0 Z1": 1.0}
result = await estimate_tool(
circuit=circuit,
observable=observable,
device_id="20"
)
task_id = result["task_id"]
# Wait for completion
while True:
status = await get_task_status_tool(task_id)
if status["task_status"] == "DONE":
break
await asyncio.sleep(2)
results = await get_task_results_tool(task_id)
print(f"Expectation value: {results['results']}")
asyncio.run(expectation_estimation())Run multiple circuits efficiently.
async def batch_sampling():
circuits = [
# Bell state
"""QINIT 2
CREG 2
H q[0]
CNOT q[0],q[1]
MEASURE q[0],c[0]
MEASURE q[1],c[1]""",
# |01⟩ state
"""QINIT 2
CREG 2
X q[0]
MEASURE q[0],c[0]
MEASURE q[1],c[1]""",
# |10⟩ state
"""QINIT 2
CREG 2
X q[1]
MEASURE q[0],c[0]
MEASURE q[1],c[1]"""
]
result = await batch_sample_tool(
circuits=circuits,
device_id="20",
shots=1000
)
print(f"Batch task ID: {result['task_id']}")
print(f"Number of circuits: {result['num_circuits']}")
asyncio.run(batch_sampling())Evaluate multiple circuits against multiple observables.
async def multi_objective_optimization():
# Step 1: Create binding with circuits and observables
binding = await create_circuit_observable_binding_tool(
circuits=[
# Strategy A: Bell state
"QINIT 2\nCREG 2\nH q[0]\nCNOT q[0],q[1]",
# Strategy B: |00⟩ state
"QINIT 2\nCREG 2\n",
# Strategy C: |++⟩ state
"QINIT 2\nCREG 2\nH q[0]\nH q[1]"
],
observables=[
{"Z0 Z1": 1.0}, # Correlation measure
{"X0": 0.5} # X-basis measure
]
)
binding_id = binding["binding_id"]
print(f"Created binding: {binding_id}")
# Step 2: Add product rule (all combinations)
# This creates: (0,0), (0,1), (1,0), (1,1), (2,0), (2,1)
await add_product_rule_tool(
binding_id=binding_id,
circuit_indices=[0, 1, 2],
observable_indices=[0, 1]
)
# Step 3: Execute estimation
result = await estimate_with_binding_tool(
binding_id=binding_id,
device_id="20"
)
print(f"Task ID: {result['task_id']}")
# Step 4: Cleanup (optional)
await delete_binding_tool(binding_id)
asyncio.run(multi_objective_optimization())async def specific_pairs():
binding = await create_circuit_observable_binding_tool(
circuits=[
"QINIT 2\nCREG 2\nH q[0]\nCNOT q[0],q[1]",
"QINIT 2\nCREG 2\nX q[0]\nCNOT q[0],q[1]"
],
observables=[
{"Z0 Z1": 1.0},
{"X0 X1": 0.5}
]
)
# Add specific pairs: (circuit_0, observable_1), (circuit_1, observable_0)
await add_zip_rule_tool(
binding_id=binding["binding_id"],
circuit_indices=[0, 1],
observable_indices=[1, 0]
)
result = await estimate_with_binding_tool(
binding_id=binding["binding_id"],
device_id="20"
)
asyncio.run(specific_pairs())async def task_management():
# List recent tasks
tasks = await list_my_tasks_tool(limit=10)
print("Recent tasks:")
for task in tasks["tasks"]:
print(f" {task['task_id']}: {task['status']}")
# Cancel a specific task (if still pending/running)
if tasks["tasks"]:
task_id = tasks["tasks"][0]["task_id"]
result = await cancel_task_tool(task_id)
print(f"Cancellation: {result['message']}")
asyncio.run(task_management())async def select_best_device():
devices = await list_qpu_devices_tool()
# Find operational device with shortest queue
best_device = None
for device in devices["devices"]:
if device["operational"] and not device.get("is_virtual", False):
if best_device is None or device["pending_jobs"] < best_device["pending_jobs"]:
best_device = device
if best_device:
print(f"Selected: {best_device['name']} (ID: {best_device['id']})")
print(f" Qubits: {best_device['num_qubits']}")
print(f" Queue: {best_device['pending_jobs']} jobs")
# Get detailed properties
props = await get_qpu_properties_tool(best_device["id"])
print(f" Gates: {props.get('basis_gates', [])}")
asyncio.run(select_best_device())See the examples/ directory for complete LangChain agent examples.
import os
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain.agents import create_react_agent
async def create_quantum_agent():
# Configure MCP client
mcp_client = MultiServerMCPClient({
"qpanda3-runtime": {
"transport": "stdio",
"command": "qpanda3-runtime-mcp-server",
"args": [],
"env": {
"QPANDA3_API_KEY": os.getenv("QPANDA3_API_KEY", ""),
"QPANDA3_SERVER_URL": os.getenv("QPANDA3_SERVER_URL", ""),
}
}
})
# Use persistent session
async with mcp_client.session("qpanda3-runtime") as session:
# Load MCP tools
tools = await load_mcp_tools(session)
# Create LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Create agent with tools
agent = create_react_agent(llm, tools)
# Run a query
response = await agent.ainvoke({
"input": "List all available QPU devices"
})
print(response)
asyncio.run(create_quantum_agent())async def robust_execution():
# Submit task
result = await sample_tool(
circuit="QINIT 2\nCREG 2\nH q[0]\nCNOT q[0],q[1]\nMEASURE q[0],c[0]\nMEASURE q[1],c[1]",
device_id="20",
shots=1000
)
if result["status"] == "error":
print(f"Submission failed: {result['message']}")
return
task_id = result["task_id"]
# Poll with timeout
max_attempts = 60 # 2 minutes max
for attempt in range(max_attempts):
status = await get_task_status_tool(task_id)
if status["task_status"] == "DONE":
results = await get_task_results_tool(task_id)
print(f"Success: {results['results']}")
return
elif status["task_status"] == "FAILED":
print(f"Task failed: {status.get('error_message', 'Unknown error')}")
return
await asyncio.sleep(2)
# Timeout - cancel the task
await cancel_task_tool(task_id)
print("Task timed out and was cancelled")
asyncio.run(robust_execution())