-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworker_config_example.rs
More file actions
254 lines (221 loc) · 9.48 KB
/
worker_config_example.rs
File metadata and controls
254 lines (221 loc) · 9.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
// Copyright {{.Year}} Conductor OSS
// Licensed under the Apache License, Version 2.0. See LICENSE in the project root for license information.
use conductor::{
client::ConductorClient,
configuration::Configuration,
error::Result,
models::{StartWorkflowRequest, Task, WorkflowDef, WorkflowTask},
schema::generate_schema,
worker::{FnWorker, TaskHandler, WorkerOutput},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tracing::info;
/// Input schema for the configured task
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct ConfiguredTaskInput {
/// Name of the user to process
name: String,
/// Priority level (1-10)
priority: i32,
/// Optional tags for the task
#[serde(default)]
tags: Vec<String>,
}
/// Output schema for the configured task
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct ConfiguredTaskOutput {
/// Processing result
result: String,
/// Timestamp of completion
processed_at: String,
/// Worker that processed this task
worker_id: String,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("conductor=info".parse().unwrap()),
)
.init();
// Load configuration
let config = Configuration::default();
info!("Connecting to Conductor at {}", config.server_api_url);
// Create the Conductor client
let client = ConductorClient::new(config.clone())?;
// Register workflow
register_workflow(&client).await?;
// Create task handler
let mut handler = TaskHandler::new(config.clone())?;
// ==============================
// Worker 1: Fully Configured Worker
// ==============================
// This worker demonstrates all configuration options
let configured_worker = FnWorker::new("configured_task", |task: Task| async move {
let name = task
.get_input_string("name")
.unwrap_or_else(|| "Unknown".to_string());
let priority: i32 = task.get_input("priority").unwrap_or(5);
info!(
"[Configured Worker] Processing: name={}, priority={}",
name, priority
);
// Simulate processing based on priority
let delay = std::time::Duration::from_millis((100 * (11 - priority)) as u64);
tokio::time::sleep(delay).await;
Ok(WorkerOutput::completed_with_result(serde_json::json!({
"result": format!("Processed {} with priority {}", name, priority),
"processed_at": chrono::Utc::now().to_rfc3339(),
"worker_id": hostname::get()
.map(|h| h.to_string_lossy().into_owned())
.unwrap_or_else(|_| "unknown".to_string())
})))
})
// Configuration options (can be overridden by environment variables)
.with_thread_count(5) // Max concurrent executions
.with_poll_interval_millis(200) // Poll every 200ms
.with_domain("default") // Task routing domain
.with_identity("rust-worker-configured") // Worker identity
// JSON Schema for input/output validation
.with_input_schema(generate_schema::<ConfiguredTaskInput>(true)) // strict mode
.with_output_schema(generate_schema::<ConfiguredTaskOutput>(true));
// ==============================
// Worker 2: Worker with Type-Derived Schema
// ==============================
let schema_worker = FnWorker::new("schema_task", |task: Task| async move {
let input: ConfiguredTaskInput = task.get_input("input").unwrap_or(ConfiguredTaskInput {
name: "default".to_string(),
priority: 5,
tags: vec![],
});
info!(
"[Schema Worker] Input: name={}, priority={}, tags={:?}",
input.name, input.priority, input.tags
);
Ok(WorkerOutput::completed_with_result(serde_json::json!({
"result": format!("Processed {}", input.name),
"processed_at": chrono::Utc::now().to_rfc3339(),
"worker_id": "schema-worker"
})))
})
.with_thread_count(3)
// Use helper method to generate schema from type
.with_input_schema_from::<ConfiguredTaskInput>(true)
.with_output_schema_from::<ConfiguredTaskOutput>(true);
// ==============================
// Worker 3: Minimal Configuration
// ==============================
// This worker uses mostly defaults, demonstrating that configuration is optional
let minimal_worker = FnWorker::new("minimal_task", |task: Task| async move {
let data = task.get_input_string("data").unwrap_or_default();
info!("[Minimal Worker] Processing: {}", data);
Ok(WorkerOutput::completed_with_result(format!(
"Processed: {}",
data
)))
});
// Uses defaults: thread_count=1, poll_interval=100ms, no domain, no schema
// ==============================
// Worker 4: Domain-Specific Worker
// ==============================
// This worker only processes tasks from a specific domain
let domain_worker = FnWorker::new("domain_task", |_task: Task| async move {
info!("[Domain Worker] Processing task from domain");
Ok(WorkerOutput::completed_with_result("domain task completed"))
})
.with_domain("special_domain") // Only processes tasks routed to this domain
.with_thread_count(2);
// Add all workers
handler.add_worker(configured_worker);
handler.add_worker(schema_worker);
handler.add_worker(minimal_worker);
handler.add_worker(domain_worker);
// Start the handler (this will also register task definitions if configured)
info!("\nStarting task handler...");
info!("Configuration hierarchy:");
info!(" 1. Worker-specific env: CONDUCTOR_WORKER_<NAME>_<PROPERTY>");
info!(" 2. Global env: CONDUCTOR_WORKER_ALL_<PROPERTY>");
info!(" 3. Code defaults");
handler.start().await?;
// Print configuration summary
println!("\n{}", "=".repeat(70));
println!("Worker Configuration Example");
println!("{}", "=".repeat(70));
println!("\nRegistered Workers:");
println!(" 1. configured_task - Full configuration with JSON Schema");
println!(" - thread_count: 5 (or CONDUCTOR_WORKER_CONFIGURED_TASK_THREAD_COUNT)");
println!(" - poll_interval: 200ms");
println!(" - domain: default");
println!(" - input/output schema: enabled (strict mode)");
println!("\n 2. schema_task - Type-derived JSON Schema");
println!(" - Uses with_input_schema_from<T>()");
println!(" - Automatic schema from Rust struct");
println!("\n 3. minimal_task - Default configuration");
println!(" - thread_count: 1 (default)");
println!(" - poll_interval: 100ms (default)");
println!(" - no domain, no schema");
println!("\n 4. domain_task - Domain-specific routing");
println!(" - domain: special_domain");
println!(" - Only processes tasks with matching domain");
println!("\nEnvironment Variable Examples:");
println!(" CONDUCTOR_WORKER_ALL_THREAD_COUNT=10");
println!(" CONDUCTOR_WORKER_CONFIGURED_TASK_THREAD_COUNT=20");
println!(" CONDUCTOR_WORKER_CONFIGURED_TASK_PAUSED=true");
println!("{}", "=".repeat(70));
// Execute test workflow
let workflow_client = client.workflow_client();
let request = StartWorkflowRequest::new("worker_config_demo")
.with_version(1)
.with_input_value("name", "Test User")
.with_input_value("priority", 8);
info!("\nStarting test workflow...");
let workflow_id = workflow_client.start_workflow(&request).await?;
info!("Workflow started: {}", workflow_id);
info!("View at: {}", config.execution_url(&workflow_id));
// Wait for completion
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let workflow = workflow_client.get_workflow(&workflow_id, false).await?;
info!("Workflow status: {:?}", workflow.status);
// Keep running
info!("\nWorkers are running. Press Ctrl+C to stop...");
tokio::signal::ctrl_c().await.ok();
handler.stop().await?;
info!("Done!");
Ok(())
}
async fn register_workflow(client: &ConductorClient) -> Result<()> {
let metadata = client.metadata_client();
let workflow = WorkflowDef::new("worker_config_demo")
.with_description("Demonstrates worker configuration options")
.with_version(1)
.with_task(
WorkflowTask::simple("configured_task", "configured_ref")
.with_input_param("name", "${workflow.input.name}")
.with_input_param("priority", "${workflow.input.priority}"),
)
.with_task(
WorkflowTask::simple("schema_task", "schema_ref").with_input_param(
"input",
serde_json::json!({
"name": "${workflow.input.name}",
"priority": "${workflow.input.priority}",
"tags": ["demo", "config"]
}),
),
)
.with_task(
WorkflowTask::simple("minimal_task", "minimal_ref")
.with_input_param("data", "simple data"),
)
.with_output_param("configured_result", "${configured_ref.output}")
.with_output_param("schema_result", "${schema_ref.output}")
.with_output_param("minimal_result", "${minimal_ref.output}");
info!("Registering workflow: {}", workflow.name);
metadata
.register_or_update_workflow_def(&workflow, true)
.await?;
Ok(())
}