Skip to content

Commit 66e1148

Browse files
committed
Add SQL table and more thorough example
1 parent 14c6b88 commit 66e1148

7 files changed

Lines changed: 931 additions & 15 deletions

File tree

examples/demos/procurement_agent/project/activities/activities.py

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
update_delivery_date_for_item_for_workflow,
1919
remove_delivery_item_for_workflow,
2020
update_project_end_date_for_workflow,
21+
create_procurement_item,
22+
update_procurement_item,
23+
delete_procurement_item,
24+
get_procurement_item_by_name,
25+
get_all_procurement_items,
2126
DatabaseError,
2227
DataCorruptionError,
2328
)
@@ -309,4 +314,252 @@ async def update_project_end_date(workflow_id: str, new_end_date: str) -> str:
309314
except Exception as e:
310315
# Unexpected error - log and let Temporal retry
311316
logger.error(f"Unexpected error updating project end date: {e}")
317+
raise
318+
319+
320+
@activity.defn
321+
async def create_procurement_item_activity(
322+
workflow_id: str,
323+
item: str,
324+
status: str,
325+
eta: str | None = None,
326+
date_arrived: str | None = None,
327+
purchase_order_id: str | None = None
328+
) -> str:
329+
"""
330+
Creates a new procurement item for tracking through the workflow.
331+
332+
Call this when:
333+
- A submittal is approved (Submittal_Approved event) - automatically after submittal approval
334+
- Human feedback requests creating a new procurement item
335+
336+
Args:
337+
workflow_id: The Temporal workflow ID
338+
item: The item name (e.g., "Steel Beams")
339+
status: Current status of the item (e.g., "submittal_approved")
340+
eta: Optional estimated time of arrival
341+
date_arrived: Optional date the item arrived
342+
purchase_order_id: Optional purchase order ID
343+
344+
Raises:
345+
ApplicationError: Non-retryable if data is invalid
346+
DatabaseError: Retryable if database connection fails
347+
"""
348+
logger.info(f"Creating procurement item for workflow {workflow_id}: {item} with status {status}")
349+
350+
try:
351+
await create_procurement_item(
352+
workflow_id=workflow_id,
353+
item=item,
354+
status=status,
355+
eta=eta,
356+
date_arrived=date_arrived,
357+
purchase_order_id=purchase_order_id
358+
)
359+
return f"Procurement item created: {item} with status {status}"
360+
361+
except DataCorruptionError as e:
362+
# Application error - invalid data, don't retry
363+
logger.error(f"Data corruption error creating procurement item: {e}")
364+
raise ApplicationError(
365+
f"Invalid data creating procurement item: {e}",
366+
type="DataCorruptionError",
367+
non_retryable=True
368+
) from e
369+
370+
except DatabaseError as e:
371+
# Platform error - database connection issue, let Temporal retry
372+
logger.warning(f"Database error creating procurement item (will retry): {e}")
373+
raise # Let Temporal retry with activity retry policy
374+
375+
except Exception as e:
376+
# Unexpected error - log and let Temporal retry
377+
logger.error(f"Unexpected error creating procurement item: {e}")
378+
raise
379+
380+
381+
@activity.defn
382+
async def update_procurement_item_activity(
383+
workflow_id: str,
384+
status: str | None = None,
385+
eta: str | None = None,
386+
date_arrived: str | None = None,
387+
purchase_order_id: str | None = None
388+
) -> str:
389+
"""
390+
Updates a procurement item's fields.
391+
392+
Call this when:
393+
- Any event occurs that changes the item's status (e.g., shipment departed, arrived, inspection scheduled/failed/passed)
394+
- Human feedback requests updating the procurement item
395+
- Purchase order is issued
396+
- ETA is updated
397+
- Item arrives at site
398+
399+
Args:
400+
workflow_id: The Temporal workflow ID
401+
status: Optional new status
402+
eta: Optional new estimated time of arrival
403+
date_arrived: Optional new arrival date
404+
purchase_order_id: Optional new purchase order ID
405+
406+
Raises:
407+
ApplicationError: Non-retryable if workflow_id invalid or item not found
408+
DatabaseError: Retryable if database connection fails
409+
"""
410+
logger.info(f"Updating procurement item for workflow {workflow_id}")
411+
412+
try:
413+
await update_procurement_item(
414+
workflow_id=workflow_id,
415+
status=status,
416+
eta=eta,
417+
date_arrived=date_arrived,
418+
purchase_order_id=purchase_order_id
419+
)
420+
return f"Procurement item updated for workflow {workflow_id}"
421+
422+
except DataCorruptionError as e:
423+
# Application error - item not found or invalid data, don't retry
424+
logger.error(f"Data corruption error updating procurement item: {e}")
425+
raise ApplicationError(
426+
f"Failed to update procurement item: {e}",
427+
type="DataCorruptionError",
428+
non_retryable=True
429+
) from e
430+
431+
except DatabaseError as e:
432+
# Platform error - database connection issue, let Temporal retry
433+
logger.warning(f"Database error updating procurement item (will retry): {e}")
434+
raise # Let Temporal retry with activity retry policy
435+
436+
except Exception as e:
437+
# Unexpected error - log and let Temporal retry
438+
logger.error(f"Unexpected error updating procurement item: {e}")
439+
raise
440+
441+
442+
@activity.defn
443+
async def delete_procurement_item_activity(workflow_id: str) -> str:
444+
"""
445+
Deletes a procurement item from the database.
446+
447+
Call this when:
448+
- Human feedback explicitly requests removing/deleting an item (e.g., "remove the steel beams")
449+
- Item is no longer needed in the project
450+
451+
Args:
452+
workflow_id: The Temporal workflow ID
453+
454+
Raises:
455+
ApplicationError: Non-retryable if workflow_id invalid or item not found
456+
DatabaseError: Retryable if database connection fails
457+
"""
458+
logger.info(f"Deleting procurement item for workflow {workflow_id}")
459+
460+
try:
461+
await delete_procurement_item(workflow_id)
462+
return f"Procurement item deleted for workflow {workflow_id}"
463+
464+
except DataCorruptionError as e:
465+
# Application error - item not found, don't retry
466+
logger.error(f"Data corruption error deleting procurement item: {e}")
467+
raise ApplicationError(
468+
f"Failed to delete procurement item: {e}",
469+
type="DataCorruptionError",
470+
non_retryable=True
471+
) from e
472+
473+
except DatabaseError as e:
474+
# Platform error - database connection issue, let Temporal retry
475+
logger.warning(f"Database error deleting procurement item (will retry): {e}")
476+
raise # Let Temporal retry with activity retry policy
477+
478+
except Exception as e:
479+
# Unexpected error - log and let Temporal retry
480+
logger.error(f"Unexpected error deleting procurement item: {e}")
481+
raise
482+
483+
484+
@activity.defn
485+
async def get_procurement_item_by_name_activity(workflow_id: str, item: str) -> str:
486+
"""
487+
Retrieves a procurement item by workflow ID and item name.
488+
489+
Call this when:
490+
- You need to check the status of a specific item
491+
- You need context about an item before making decisions
492+
- Human feedback requests information about a specific item
493+
494+
Args:
495+
workflow_id: The Temporal workflow ID
496+
item: The item name (e.g., "Steel Beams")
497+
498+
Returns:
499+
JSON string of the procurement item or message if not found
500+
501+
Raises:
502+
ApplicationError: Non-retryable if input data is invalid
503+
DatabaseError: Retryable if database connection fails
504+
"""
505+
logger.info(f"Getting procurement item for workflow {workflow_id}: {item}")
506+
507+
try:
508+
result = await get_procurement_item_by_name(workflow_id, item)
509+
510+
if result is None:
511+
return f"No procurement item found for workflow {workflow_id} with item name: {item}"
512+
513+
return json.dumps(result)
514+
515+
except DataCorruptionError as e:
516+
# Application error - invalid input, don't retry
517+
logger.error(f"Data corruption error getting procurement item: {e}")
518+
raise ApplicationError(
519+
f"Invalid input getting procurement item: {e}",
520+
type="DataCorruptionError",
521+
non_retryable=True
522+
) from e
523+
524+
except DatabaseError as e:
525+
# Platform error - database connection issue, let Temporal retry
526+
logger.warning(f"Database error getting procurement item (will retry): {e}")
527+
raise # Let Temporal retry with activity retry policy
528+
529+
except Exception as e:
530+
# Unexpected error - log and let Temporal retry
531+
logger.error(f"Unexpected error getting procurement item: {e}")
532+
raise
533+
534+
535+
@activity.defn
536+
async def get_all_procurement_items_activity() -> str:
537+
"""
538+
Retrieves all procurement items from the database.
539+
540+
Call this when:
541+
- You need an overview of all procurement items
542+
- You need to check the status of multiple items
543+
- Human feedback requests a summary of all items
544+
545+
Returns:
546+
JSON string of all procurement items
547+
548+
Raises:
549+
DatabaseError: Retryable if database connection fails
550+
"""
551+
logger.info("Getting all procurement items")
552+
553+
try:
554+
results = await get_all_procurement_items()
555+
return json.dumps(results)
556+
557+
except DatabaseError as e:
558+
# Platform error - database connection issue, let Temporal retry
559+
logger.warning(f"Database error getting all procurement items (will retry): {e}")
560+
raise # Let Temporal retry with activity retry policy
561+
562+
except Exception as e:
563+
# Unexpected error - log and let Temporal retry
564+
logger.error(f"Unexpected error getting all procurement items: {e}")
312565
raise

0 commit comments

Comments
 (0)