Skip to content

Commit 1102330

Browse files
committed
fix (tasks): triggered tasks composio setup
1 parent cae5ade commit 1102330

3 files changed

Lines changed: 21 additions & 13 deletions

File tree

src/server/main/integrations/routes.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -420,27 +420,33 @@ async def composio_webhook(request: Request):
420420
"""
421421
try:
422422
payload = await request.json()
423-
user_id = payload.get("userId")
424-
trigger_slug = payload.get("triggerSlug")
425-
event_data = payload.get("payload")
423+
# Correctly parse the payload based on the provided sample structure
424+
event_data = payload.get("data")
425+
if not event_data:
426+
raise HTTPException(status_code=400, detail="Webhook payload missing 'data' object.")
426427

427-
if not all([user_id, trigger_slug, event_data]):
428-
raise HTTPException(status_code=400, detail="Missing required fields in webhook payload.")
428+
user_id = event_data.get("user_id")
429+
trigger_type = payload.get("type")
429430

431+
if not all([user_id, trigger_type, event_data]):
432+
raise HTTPException(status_code=400, detail="Missing required fields (user_id, type) in webhook payload.")
433+
434+
# Map from Composio's trigger type to our internal service_name and event_type
435+
# Using lowercase to match the sample payload
430436
service_name_map = {
431-
"GOOGLECALENDAR_GOOGLE_CALENDAR_EVENT_SYNC_TRIGGER": "gcalendar",
432-
"GMAIL_NEW_GMAIL_MESSAGE": "gmail"
437+
"googlecalendar_google_calendar_event_sync_trigger": "gcalendar",
438+
"gmail_new_gmail_message": "gmail"
433439
}
434440
event_type_map = {
435-
"GOOGLECALENDAR_GOOGLE_CALENDAR_EVENT_SYNC_TRIGGER": "new_event",
436-
"GMAIL_NEW_GMAIL_MESSAGE": "new_email"
441+
"googlecalendar_google_calendar_event_sync_trigger": "new_event",
442+
"gmail_new_gmail_message": "new_email"
437443
}
438444

439-
service_name = service_name_map.get(trigger_slug)
440-
event_type = event_type_map.get(trigger_slug)
445+
service_name = service_name_map.get(trigger_type)
446+
event_type = event_type_map.get(trigger_type)
441447

442448
if not service_name:
443-
logger.warning(f"Received webhook for unhandled trigger slug: {trigger_slug}")
449+
logger.warning(f"Received webhook for unhandled trigger type: {trigger_type}")
444450
return JSONResponse(content={"status": "ignored", "reason": "unhandled trigger"})
445451

446452
logger.info(f"Received Composio trigger for user '{user_id}' - Service: '{service_name}', Event: '{event_type}'")
@@ -461,7 +467,7 @@ async def composio_webhook(request: Request):
461467
logger.info(f"Event for user '{user_id}' was discarded by the pre-filter.")
462468
return JSONResponse(content={"status": "ignored", "reason": "pre-filter discard"})
463469

464-
# 3. Dispatch to Celery worker
470+
# 3. Dispatch to a Celery task that handles finding and running all matching triggered workflows.
465471
execute_triggered_task.delay(
466472
user_id=user_id,
467473
source=service_name,

src/server/main/tasks/prompts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
- `source`: The service that triggers the workflow (e.g., "gmail", "gcalendar").
2828
- `event`: The specific event (e.g., "new_email", "new_event").
2929
- `filter`: A dictionary of conditions to match (e.g., `{{"from": "boss@example.com"}}`).
30+
The task will execute *after* the trigger occurs, using the event data (like the email content) as context.
3031
- CRUCIAL DISTINCTION: Differentiate between the *task's execution time* (`run_at`) and the *event's time* mentioned in the prompt. A task to arrange a future event (e.g., 'book a flight for next month', 'schedule a meeting for Friday') should be executed *now* to make the arrangement. Therefore, its `run_at` should be null, since setting run_at to null makes the task run immediately. The future date belongs in the task `description`.
3132
- Ambiguity: Phrases like "weekly hourly" are ambiguous. Interpret "weekly" as the frequency and ignore "hourly".
3233
- Use the current time and user's timezone to resolve relative dates like "tomorrow", "next Friday at 2pm", etc. correctly.

src/server/workers/planner/prompts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
6. Anticipate Information Gaps: If crucial information is still missing after checking context, the first step should be to use a tool to find it (e.g., `internet_search` for public information, `gpeople` for contacts, `memory` for personal information, `gcalendar` for upcoming events and so on).
1616
7. Output a Clear Plan: Your final output must be a single, valid JSON object containing a concise description of the overall goal and a list of specific, actionable steps for the executor.
1717
8. If the task is scheduled or recurring, only plan for an indivisual occurrence, not the entire series. The executor will handle scheduling. For example, if the user asks you to "Send a news summary every day at 8 AM", your plan should only include the steps for the indivisual run such as "Search for the news", "Summarize the news", "Send the news on WhatsApp". The executor will then handle the scheduling for future occurrences. Do NOT include any steps using the `gcalendar` tool to create a recurring or scheduled events.
18+
9. If the task is triggered by an event (like a new email or calendar event), your plan should focus on the immediate actions to take after that event has occured, such as "Search for the email", "Extract relevant information", "Send a summary to the user". The executor will handle the triggering mechanism. The executor is receiving the incoming email or calendar event as input, along with your plan so write each plan step as if the event has already occured and you are now processing it. For example, if the triggered task is to send a summary for each new email, your plan should be to directly read the email and summarize it, you do not need to search for the email as it is part of the input trigger.
1819
1920
Here is the complete list of services (tools) available to the executor agent, that you can use in your plan:
2021
{{

0 commit comments

Comments
 (0)