1515from conductor .client .workflow .conductor_workflow import ConductorWorkflow
1616from conductor .client .workflow .task .dynamic_task import DynamicTask
1717from conductor .client .workflow .task .human_task import HumanTask
18- from conductor .client .workflow .task .llm_tasks .llm_chat_complete import LlmChatComplete , ChatMessage
18+ from conductor .client .workflow .task .llm_tasks .llm_chat_complete import (
19+ LlmChatComplete ,
20+ ChatMessage ,
21+ )
1922from conductor .client .workflow .task .simple_task import SimpleTask
2023from conductor .client .workflow .task .sub_workflow_task import SubWorkflowTask
2124from conductor .client .workflow .task .switch_task import SwitchTask
@@ -34,48 +37,53 @@ def start_workers(api_config):
3437 return task_handler
3538
3639
37- @worker_task (task_definition_name = ' get_customer_list' )
40+ @worker_task (task_definition_name = " get_customer_list" )
3841def get_customer_list () -> List [Customer ]:
3942 customers = []
4043 for i in range (100 ):
41- customer_name = '' .join (random .choices (string .ascii_uppercase +
42- string .digits , k = 5 ))
44+ customer_name = "" .join (
45+ random .choices (string .ascii_uppercase + string .digits , k = 5 )
46+ )
4347 spend = random .randint (a = 100000 , b = 9000000 )
4448 customers .append (
45- Customer (id = i , name = 'Customer ' + customer_name ,
46- annual_spend = spend ,
47- country = 'US' )
49+ Customer (
50+ id = i , name = "Customer " + customer_name , annual_spend = spend , country = "US"
51+ )
4852 )
4953 return customers
5054
5155
52- @worker_task (task_definition_name = ' get_top_n' )
56+ @worker_task (task_definition_name = " get_top_n" )
5357def get_top_n_customers (n : int , customers : List [Customer ]) -> List [Customer ]:
5458 customers .sort (key = lambda x : x .annual_spend , reverse = True )
5559 end = min (n + 1 , len (customers ))
56- return customers [1 : end ]
60+ return customers [1 :end ]
5761
5862
59- @worker_task (task_definition_name = ' generate_promo_code' )
63+ @worker_task (task_definition_name = " generate_promo_code" )
6064def get_top_n_customers () -> str :
61- res = '' .join (random .choices (string .ascii_uppercase +
62- string .digits , k = 5 ))
65+ res = "" .join (random .choices (string .ascii_uppercase + string .digits , k = 5 ))
6366 return res
6467
6568
66- @worker_task (task_definition_name = ' send_email' )
69+ @worker_task (task_definition_name = " send_email" )
6770def send_email (customer : list [Customer ], promo_code : str ) -> str :
68- return f' Sent { promo_code } to { len (customer )} customers'
71+ return f" Sent { promo_code } to { len (customer )} customers"
6972
7073
71- @worker_task (task_definition_name = ' create_workflow' )
74+ @worker_task (task_definition_name = " create_workflow" )
7275def create_workflow (steps : list [str ], inputs : Dict [str , object ]) -> dict :
7376 executor = OrkesClients ().get_workflow_executor ()
74- workflow = ConductorWorkflow (executor = executor , name = ' copilot_execution' , version = 1 )
77+ workflow = ConductorWorkflow (executor = executor , name = " copilot_execution" , version = 1 )
7578
7679 for step in steps :
77- if step == 'review' :
78- task = HumanTask (task_ref_name = 'review' , display_name = 'review email' , form_version = 0 , form_template = 'email_review' )
80+ if step == "review" :
81+ task = HumanTask (
82+ task_ref_name = "review" ,
83+ display_name = "review email" ,
84+ form_version = 0 ,
85+ form_template = "email_review" ,
86+ )
7987 task .input_parameters .update (inputs [step ])
8088 workflow >> task
8189 else :
@@ -84,13 +92,13 @@ def create_workflow(steps: list[str], inputs: Dict[str, object]) -> dict:
8492 workflow >> task
8593
8694 workflow .register (overwrite = True )
87- print (f' \n \n \n Registered workflow by name { workflow .name } \n ' )
95+ print (f" \n \n \n Registered workflow by name { workflow .name } \n " )
8896 return workflow .to_workflow_def ().toJSON ()
8997
9098
9199def main ():
92- llm_provider = ' openai_saas'
93- chat_complete_model = ' gpt-4'
100+ llm_provider = " openai_saas"
101+ chat_complete_model = " gpt-4"
94102 api_config = Configuration ()
95103 api_config .apply_logging_config ()
96104 clients = OrkesClients (configuration = api_config )
@@ -100,11 +108,11 @@ def main():
100108 task_handler = start_workers (api_config = api_config )
101109
102110 # register our two tasks
103- metadata_client .register_task_def (task_def = TaskDef (name = ' get_weather' ))
104- metadata_client .register_task_def (task_def = TaskDef (name = ' get_price_from_amazon' ))
111+ metadata_client .register_task_def (task_def = TaskDef (name = " get_weather" ))
112+ metadata_client .register_task_def (task_def = TaskDef (name = " get_price_from_amazon" ))
105113
106114 # Define and associate prompt with the AI integration
107- prompt_name = ' chat_function_instructions'
115+ prompt_name = " chat_function_instructions"
108116 prompt_text = """
109117 You are a helpful assistant that can answer questions using tools provided.
110118 You have the following tools specified as functions in python:
@@ -151,47 +159,72 @@ def main():
151159 # description='openai config',
152160 # config=open_ai_config)
153161
154- orchestrator .add_prompt_template (prompt_name , prompt_text , ' chat instructions' )
162+ orchestrator .add_prompt_template (prompt_name , prompt_text , " chat instructions" )
155163
156164 # associate the prompts
157- orchestrator .associate_prompt_template (prompt_name , llm_provider , [chat_complete_model ])
165+ orchestrator .associate_prompt_template (
166+ prompt_name , llm_provider , [chat_complete_model ]
167+ )
158168
159- wf = ConductorWorkflow (name = 'my_function_chatbot' , version = 1 , executor = workflow_executor )
169+ wf = ConductorWorkflow (
170+ name = "my_function_chatbot" , version = 1 , executor = workflow_executor
171+ )
160172
161- user_input = WaitTask (task_ref_name = ' get_user_input' )
173+ user_input = WaitTask (task_ref_name = " get_user_input" )
162174
163- chat_complete = LlmChatComplete (task_ref_name = 'chat_complete_ref' ,
164- llm_provider = llm_provider , model = chat_complete_model ,
165- instructions_template = prompt_name ,
166- messages = [
167- ChatMessage ( role = 'user' ,
168- message = user_input .output (' query' ))
169- ] ,
170- max_tokens = 2048 )
175+ chat_complete = LlmChatComplete (
176+ task_ref_name = "chat_complete_ref" ,
177+ llm_provider = llm_provider ,
178+ model = chat_complete_model ,
179+ instructions_template = prompt_name ,
180+ messages = [ ChatMessage ( role = "user" , message = user_input .output (" query" ))],
181+ max_tokens = 2048 ,
182+ )
171183
172- function_call = DynamicTask (task_reference_name = 'fn_call_ref' , dynamic_task = 'SUB_WORKFLOW' )
173- function_call .input_parameters ['steps' ] = chat_complete .output ('function_parameters.steps' )
174- function_call .input_parameters ['inputs' ] = chat_complete .output ('function_parameters.inputs' )
175- function_call .input_parameters ['subWorkflowName' ] = 'copilot_execution'
176- function_call .input_parameters ['subWorkflowVersion' ] = 1
184+ function_call = DynamicTask (
185+ task_reference_name = "fn_call_ref" , dynamic_task = "SUB_WORKFLOW"
186+ )
187+ function_call .input_parameters ["steps" ] = chat_complete .output (
188+ "function_parameters.steps"
189+ )
190+ function_call .input_parameters ["inputs" ] = chat_complete .output (
191+ "function_parameters.inputs"
192+ )
193+ function_call .input_parameters ["subWorkflowName" ] = "copilot_execution"
194+ function_call .input_parameters ["subWorkflowVersion" ] = 1
177195
178- sub_workflow = SubWorkflowTask (task_ref_name = 'execute_workflow' , workflow_name = 'copilot_execution' , version = 1 )
196+ sub_workflow = SubWorkflowTask (
197+ task_ref_name = "execute_workflow" , workflow_name = "copilot_execution" , version = 1
198+ )
179199
180- create = create_workflow (task_ref_name = 'create_workflow' , steps = chat_complete .output ('result.function_parameters.steps' ),
181- inputs = chat_complete .output ('result.function_parameters.inputs' ))
182- call_function = SwitchTask (task_ref_name = 'to_call_or_not' , case_expression = chat_complete .output ('result.function' ))
183- call_function .switch_case ('create_workflow' , [create , sub_workflow ])
200+ create = create_workflow (
201+ task_ref_name = "create_workflow" ,
202+ steps = chat_complete .output ("result.function_parameters.steps" ),
203+ inputs = chat_complete .output ("result.function_parameters.inputs" ),
204+ )
205+ call_function = SwitchTask (
206+ task_ref_name = "to_call_or_not" ,
207+ case_expression = chat_complete .output ("result.function" ),
208+ )
209+ call_function .switch_case ("create_workflow" , [create , sub_workflow ])
184210
185- call_one_fun = DynamicTask (task_reference_name = 'call_one_fun_ref' , dynamic_task = chat_complete .output ('result.function' ))
186- call_one_fun .input_parameters ['inputs' ] = chat_complete .output ('result.function_parameters' )
187- call_one_fun .input_parameters ['dynamicTaskInputParam' ] = 'inputs'
211+ call_one_fun = DynamicTask (
212+ task_reference_name = "call_one_fun_ref" ,
213+ dynamic_task = chat_complete .output ("result.function" ),
214+ )
215+ call_one_fun .input_parameters ["inputs" ] = chat_complete .output (
216+ "result.function_parameters"
217+ )
218+ call_one_fun .input_parameters ["dynamicTaskInputParam" ] = "inputs"
188219
189220 call_function .default_case ([call_one_fun ])
190221
191222 wf >> user_input >> chat_complete >> call_function
192223
193224 # let's make sure we don't run it for more than 2 minutes -- avoid runaway loops
194- wf .timeout_seconds (120 ).timeout_policy (timeout_policy = TimeoutPolicy .TIME_OUT_WORKFLOW )
225+ wf .timeout_seconds (120 ).timeout_policy (
226+ timeout_policy = TimeoutPolicy .TIME_OUT_WORKFLOW
227+ )
195228 message = """
196229 I am a helpful bot that can help with your customer management.
197230
@@ -202,34 +235,46 @@ def main():
202235 3. Get the list of top N customers and send them a promo code
203236 """
204237 print (message )
205- workflow_run = wf .execute (wait_until_task_ref = user_input .task_reference_name , wait_for_seconds = 120 )
238+ workflow_run = wf .execute (
239+ wait_until_task_ref = user_input .task_reference_name , wait_for_seconds = 120
240+ )
206241 workflow_id = workflow_run .workflow_id
207- query = input ('>> ' )
208- input_task = workflow_run .get_task (task_reference_name = user_input .task_reference_name )
209- workflow_run = workflow_client .update_state (workflow_id = workflow_id ,
210- update_requesst = WorkflowStateUpdate (
211- task_reference_name = user_input .task_reference_name ,
212- task_result = TaskResult (task_id = input_task .task_id , output_data = {
213- 'query' : query
214- }, status = TaskResultStatus .COMPLETED )
215- ),
216- wait_for_seconds = 30 )
242+ query = input (">> " )
243+ input_task = workflow_run .get_task (
244+ task_reference_name = user_input .task_reference_name
245+ )
246+ workflow_run = workflow_client .update_state (
247+ workflow_id = workflow_id ,
248+ update_requesst = WorkflowStateUpdate (
249+ task_reference_name = user_input .task_reference_name ,
250+ task_result = TaskResult (
251+ task_id = input_task .task_id ,
252+ output_data = {"query" : query },
253+ status = TaskResultStatus .COMPLETED ,
254+ ),
255+ ),
256+ wait_for_seconds = 30 ,
257+ )
217258
218259 task_handler .stop_processes ()
219- output = json .dumps (workflow_run .output ['result' ], indent = 3 )
220- print (f"""
260+ output = json .dumps (workflow_run .output ["result" ], indent = 3 )
261+ print (
262+ f"""
221263
222264 { output }
223265
224- """ )
266+ """
267+ )
225268
226- print (f"""
269+ print (
270+ f"""
227271 See the complete execution graph here:
228272
229273 http://localhost:5001/execution/{ workflow_id }
230274
231- """ )
275+ """
276+ )
232277
233278
234- if __name__ == ' __main__' :
279+ if __name__ == " __main__" :
235280 main ()
0 commit comments