Complete API reference for task management operations in the Conductor Rust SDK. This covers task polling, updating, logging, and queue management.
- Quick Start
- TaskClient API
- Polling Tasks
- Updating Tasks
- Task Logging
- Queue Management
- Models Reference
- Best Practices
use conductor::{Configuration, ConductorClient};
use conductor::models::{Task, TaskResult, TaskResultStatus};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Configuration::new("http://localhost:8080/api");
let client = ConductorClient::new(config)?;
let task_client = client.task_client();
// Poll for a task
if let Some(task) = task_client.poll_task("my_task", Some("worker-1"), None).await? {
println!("Got task: {}", task.task_id);
// Process the task...
let result = TaskResult::completed(&task.task_id, &task.workflow_instance_id)
.with_output_value("result", "success");
// Update the task
task_client.update_task(&result).await?;
}
Ok(())
}| Method | Endpoint | Description |
|---|---|---|
poll_task() |
GET /tasks/poll/{taskType} |
Poll for a single task |
batch_poll() |
GET /tasks/poll/batch/{taskType} |
Poll for multiple tasks |
| Method | Endpoint | Description |
|---|---|---|
get_task() |
GET /tasks/{taskId} |
Get task by ID |
update_task() |
POST /tasks |
Update task with result |
update_task_with_retry() |
POST /tasks |
Update with retry logic |
update_task_by_ref_name() |
POST /tasks/{wfId}/{taskRef}/{status} |
Update by reference name |
update_task_sync() |
POST /tasks/{wfId}/{taskRef}/{status}/sync |
Update and get workflow |
| Method | Endpoint | Description |
|---|---|---|
add_task_log() |
POST /tasks/{taskId}/log |
Add log message to task |
get_task_logs() |
GET /tasks/{taskId}/log |
Get all task logs |
| Method | Endpoint | Description |
|---|---|---|
get_queue_sizes() |
GET /tasks/queue/sizes |
Get queue sizes for task types |
get_queue_size_for_task() |
GET /tasks/queue/sizes |
Get queue size for one task type |
get_task_poll_data() |
GET /tasks/queue/polldata/{taskType} |
Get poll data for task type |
get_all_poll_data() |
GET /tasks/queue/polldata/all |
Get all poll data |
get_tasks_in_progress() |
GET /tasks/in_progress/{taskType} |
Get in-progress tasks |
remove_task_from_queue() |
DELETE /tasks/queue/{taskType}/{taskId} |
Remove task from queue |
// Basic poll
let task = task_client.poll_task("my_task", None, None).await?;
// With worker ID
let task = task_client.poll_task(
"my_task",
Some("worker-1"),
None,
).await?;
// With domain routing
let task = task_client.poll_task(
"my_task",
Some("worker-1"),
Some("production"),
).await?;
if let Some(task) = task {
println!("Task ID: {}", task.task_id);
println!("Workflow ID: {}", task.workflow_instance_id);
println!("Input: {:?}", task.input_data);
}Poll multiple tasks at once for higher throughput:
use std::time::Duration;
// Poll up to 10 tasks with 100ms timeout
let tasks = task_client.batch_poll(
"my_task",
Some("worker-1"),
None, // domain
10, // count
Duration::from_millis(100), // timeout
).await?;
println!("Received {} tasks", tasks.len());
for task in tasks {
// Process each task
}use conductor::models::{TaskResult, TaskResultStatus};
// Create task result
let result = TaskResult::completed(&task.task_id, &task.workflow_instance_id)
.with_worker_id("worker-1")
.with_output_value("result", "processed")
.with_output_value("count", 42);
// Update the task
task_client.update_task(&result).await?;// Completed
let result = TaskResult::completed(&task.task_id, &task.workflow_instance_id)
.with_output_value("success", true);
// Failed (will retry based on task definition)
let result = TaskResult::failed(&task.task_id, &task.workflow_instance_id)
.with_reason("Database connection failed");
// Failed with terminal error (no retry)
let result = TaskResult {
task_id: task.task_id.clone(),
workflow_instance_id: task.workflow_instance_id.clone(),
status: TaskResultStatus::FailedWithTerminalError,
reason_for_incompletion: Some("Invalid input - cannot process".to_string()),
..Default::default()
};
// In Progress (for long-running tasks)
let result = TaskResult::in_progress(&task.task_id, &task.workflow_instance_id)
.with_callback_after_seconds(30)
.with_output_value("progress", 50);Automatically retry on transient failures:
let result = TaskResult::completed(&task.task_id, &task.workflow_instance_id)
.with_output_value("result", "done");
// Will retry up to 3 times with exponential backoff
task_client.update_task_with_retry(&result, 3).await?;Update a task using workflow ID and task reference name:
use serde_json::json;
let response = task_client.update_task_by_ref_name(
&workflow_id,
"task_reference_name",
TaskResultStatus::Completed,
json!({ "result": "success" }),
Some("worker-1"),
).await?;Update task and get the resulting workflow state:
let workflow = task_client.update_task_sync(
&workflow_id,
"task_reference_name",
TaskResultStatus::Completed,
json!({ "result": "done" }),
Some("worker-1"),
).await?;
println!("Workflow status: {:?}", workflow.status);// Add a single log entry
task_client.add_task_log(&task.task_id, "Starting processing").await?;
// Log progress
for i in 0..10 {
task_client.add_task_log(
&task.task_id,
&format!("Processing batch {}/10", i + 1),
).await?;
// Process batch...
}
// Log completion
task_client.add_task_log(&task.task_id, "Processing complete").await?;let logs = task_client.get_task_logs(&task.task_id).await?;
for log in logs {
println!("[{}] {}", log.created_time, log.log);
}// Get sizes for multiple task types
let sizes = task_client.get_queue_sizes(&["task1", "task2", "task3"]).await?;
for (task_type, size) in sizes {
println!("{}: {} tasks in queue", task_type, size);
}
// Get size for a single task type
let size = task_client.get_queue_size_for_task("my_task").await?;
println!("Queue depth: {}", size);Monitor worker polling activity:
// Get poll data for a specific task type
let poll_data = task_client.get_task_poll_data("my_task").await?;
for data in poll_data {
println!("Queue: {}", data.queue_name);
if let Some(worker) = &data.worker_id {
println!(" Last worker: {}", worker);
}
println!(" Last poll: {}", data.last_poll_time);
}
// Get all poll data
let all_data = task_client.get_all_poll_data().await?;let tasks = task_client.get_tasks_in_progress(
"my_task",
None, // start_key for pagination
Some(100), // count
).await?;
println!("Tasks in progress: {}", tasks.len());
for task in tasks {
println!(" {} - started at {}", task.task_id, task.start_time);
}Manually remove a task from the queue:
task_client.remove_task_from_queue("my_task", &task_id).await?;The task object returned from polling:
pub struct Task {
pub task_id: String,
pub workflow_instance_id: String,
pub task_def_name: String,
pub reference_task_name: String,
pub workflow_type: String,
pub status: TaskStatus,
pub input_data: HashMap<String, Value>,
pub output_data: HashMap<String, Value>,
pub scheduled_time: i64,
pub start_time: i64,
pub end_time: Option<i64>,
pub retry_count: i32,
pub poll_count: i32,
pub correlation_id: Option<String>,
pub reason_for_incompletion: Option<String>,
// ... additional fields
}
impl Task {
// Get typed input value
pub fn get_input<T: DeserializeOwned>(&self, key: &str) -> Option<T>;
// Get input as string
pub fn get_input_string(&self, key: &str) -> Option<String>;
// Get input as i32
pub fn get_input_i32(&self, key: &str) -> Option<i32>;
}Result for updating a task:
pub struct TaskResult {
pub task_id: String,
pub workflow_instance_id: String,
pub worker_id: Option<String>,
pub status: TaskResultStatus,
pub output_data: HashMap<String, Value>,
pub reason_for_incompletion: Option<String>,
pub callback_after_seconds: i64,
pub logs: Vec<String>,
}
impl TaskResult {
// Create completed result
pub fn completed(task_id: &str, workflow_id: &str) -> Self;
// Create failed result
pub fn failed(task_id: &str, workflow_id: &str) -> Self;
// Create in-progress result
pub fn in_progress(task_id: &str, workflow_id: &str) -> Self;
// Builder methods
pub fn with_worker_id(self, id: impl Into<String>) -> Self;
pub fn with_output_value(self, key: impl Into<String>, value: impl Serialize) -> Self;
pub fn with_output(self, output: HashMap<String, Value>) -> Self;
pub fn with_reason(self, reason: impl Into<String>) -> Self;
pub fn with_callback_after_seconds(self, seconds: i64) -> Self;
pub fn with_log(self, log: impl Into<String>) -> Self;
}pub enum TaskResultStatus {
Completed, // Task completed successfully
Failed, // Task failed (may retry)
FailedWithTerminalError, // Task failed (no retry)
InProgress, // Task still running
}pub struct TaskExecLog {
pub log: String,
pub task_id: String,
pub created_time: i64,
}pub struct PollData {
pub queue_name: String,
pub domain: Option<String>,
pub worker_id: Option<String>,
pub last_poll_time: i64,
}// Instead of polling one at a time
loop {
if let Some(task) = task_client.poll_task("task", Some("worker"), None).await? {
process(task).await;
}
}
// Use batch polling
loop {
let tasks = task_client.batch_poll("task", Some("worker"), None, 10, Duration::from_millis(100)).await?;
for task in tasks {
tokio::spawn(process(task));
}
}async fn safe_update(client: &TaskClient, result: &TaskResult) -> Result<()> {
match client.update_task_with_retry(result, 3).await {
Ok(_) => Ok(()),
Err(e) => {
// Log the failure for debugging
eprintln!("Failed to update task {}: {}", result.task_id, e);
// Task will timeout and be retried by Conductor
Err(e)
}
}
}async fn process_with_logging(client: &TaskClient, task: Task) -> Result<TaskResult> {
client.add_task_log(&task.task_id, "Starting task processing").await.ok();
// Validate input
let input = match task.get_input::<MyInput>("data") {
Some(data) => data,
None => {
client.add_task_log(&task.task_id, "ERROR: Missing required input 'data'").await.ok();
return Ok(TaskResult::failed(&task.task_id, &task.workflow_instance_id)
.with_reason("Missing required input"));
}
};
client.add_task_log(&task.task_id, &format!("Processing: {:?}", input)).await.ok();
// Process...
client.add_task_log(&task.task_id, "Task completed successfully").await.ok();
Ok(TaskResult::completed(&task.task_id, &task.workflow_instance_id))
}async fn health_check(client: &TaskClient) -> bool {
let size = client.get_queue_size_for_task("critical_task").await.unwrap_or(0);
if size > 1000 {
eprintln!("WARNING: Queue depth is high: {}", size);
// Trigger alert or scale workers
}
size < 5000 // Return healthy if under threshold
}async fn long_running_task(client: &TaskClient, task: Task) -> Result<TaskResult> {
let total_items: i32 = task.get_input("total_items").unwrap_or(0);
let processed: i32 = task.get_input("_processed").unwrap_or(0);
// Process a batch
let batch_size = 100;
let new_processed = processed + batch_size;
if new_processed < total_items {
// More work to do - report progress and callback
Ok(TaskResult::in_progress(&task.task_id, &task.workflow_instance_id)
.with_callback_after_seconds(30)
.with_output_value("_processed", new_processed)
.with_output_value("progress", (new_processed * 100) / total_items))
} else {
// Done
Ok(TaskResult::completed(&task.task_id, &task.workflow_instance_id)
.with_output_value("total_processed", new_processed))
}
}- Worker Implementation - High-level worker abstraction
- Workflow Management - Workflow operations
- Metadata Management - Task definitions