Skip to content

Commit 88ce7d6

Browse files
authored
Merge pull request #2992 from ras-al-jil/ras-al-jil-feature-eventbridge-scheduler-ai-agent-trigger
New serverless pattern - eventbridge-scheduler-ai-agent-trigger
2 parents 943612c + b102109 commit 88ce7d6

8 files changed

Lines changed: 1109 additions & 0 deletions

File tree

37.6 KB
Loading
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Amazon EventBridge Scheduler to Amazon Bedrock AI Agent
2+
3+
This pattern demonstrates how to trigger an Amazon Bedrock AI Agent on a recurring schedule using Amazon EventBridge Scheduler. An orchestrator AWS Lambda function, invoked by the scheduler, sends a task payload to the Bedrock Agent, which processes the input, generates an execution summary, and persists the result to a Amazon DynamoDB table via an action group Lambda.
4+
5+
Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/eventbridge-scheduler-ai-agent-trigger
6+
7+
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.
8+
9+
## Requirements
10+
11+
* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
12+
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
13+
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
14+
* [Terraform](https://www.terraform.io/downloads.html) >= 1.0 installed
15+
16+
## Architecture
17+
18+
![Architecture Diagram](Architecture.png)
19+
20+
The pattern deploys the following resources:
21+
22+
1. **Amazon EventBridge Scheduler** – Triggers the orchestrator Lambda on a recurring schedule (default: `rate(1 hour)`).
23+
2. **Orchestrator Lambda** (Python 3.14) – Receives the scheduler event and invokes the Bedrock Agent with a task payload.
24+
3. **Amazon Bedrock Agent** – Processes the task payload, generates an execution summary using a foundation model (default: Claude 3 Haiku), and calls the action group.
25+
4. **Action Group Lambda** (Python 3.14) – Persists execution records to DynamoDB.
26+
5. **Amazon DynamoDB Table** – Stores task execution records.
27+
6. **Amazon SQS Dead-Letter Queue** – Captures failed scheduler invocations after retries are exhausted.
28+
29+
## Deployment Instructions
30+
31+
1. Clone the repository:
32+
```
33+
git clone https://github.com/aws-samples/serverless-patterns
34+
```
35+
1. Change directory to the pattern directory:
36+
```
37+
cd serverless-patterns/eventbridge-scheduler-ai-agent-trigger
38+
```
39+
1. Initialize Terraform:
40+
```
41+
terraform init
42+
```
43+
1. Deploy the infrastructure:
44+
```
45+
terraform apply -auto-approve
46+
```
47+
During the prompts, provide values for:
48+
* `aws_region` – AWS region (e.g. `us-east-1`)
49+
* `prefix` – Unique prefix for all resource names
50+
51+
1. Note the outputs from the deployment. These contain the resource names and ARNs used for testing.
52+
53+
## How it works
54+
55+
1. EventBridge Scheduler fires on the configured schedule and invokes the orchestrator Lambda with a JSON payload containing `taskType`, `scheduleName`, and `scheduledTime`.
56+
2. The orchestrator Lambda calls `bedrock-agent-runtime:InvokeAgent` with the payload, targeting the agent alias.
57+
3. The Bedrock Agent parses the payload, generates an executive summary using the foundation model, and calls the `recordTaskExecution` action group.
58+
4. The action group Lambda writes the execution record (task ID, type, scheduled time, summary, and recorded timestamp) to the DynamoDB table.
59+
5. If the scheduler invocation fails after 3 retries, the event is sent to the SQS dead-letter queue.
60+
61+
## Testing
62+
63+
1. Replace `<prefix>` with the prefix chosen during deployment and invoke the orchestrator Lambda function manually:
64+
```
65+
aws lambda invoke \
66+
--function-name <prefix>-agent-orchestrator \
67+
--payload '{"taskType":"scheduled-report","scheduleName":"manual-test","scheduledTime":"2026-03-13T10:00:00Z"}' \
68+
--cli-binary-format raw-in-base64-out \
69+
output.json
70+
```
71+
2. Check the DynamoDB table for the new execution record:
72+
```
73+
aws dynamodb scan --table-name <prefix>-agent-task-executions
74+
```
75+
76+
## Cleanup
77+
78+
1. Destroy the stack:
79+
```
80+
terraform destroy --auto-approve
81+
```
82+
1. Confirm all resources have been removed:
83+
```
84+
terraform show
85+
```
86+
----
87+
Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
88+
89+
SPDX-License-Identifier: MIT-0
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import boto3
2+
import json
3+
import os
4+
import logging
5+
from datetime import datetime, timezone
6+
7+
logger = logging.getLogger()
8+
logger.setLevel(logging.INFO)
9+
10+
dynamodb = boto3.resource("dynamodb")
11+
table = dynamodb.Table(os.environ["DYNAMODB_TABLE"])
12+
13+
14+
def lambda_handler(event, context):
15+
"""
16+
Bedrock Agent Action Group Lambda.
17+
Called by the agent to persist task execution records in DynamoDB.
18+
"""
19+
logger.info("Action group event: %s", json.dumps(event))
20+
21+
api_path = event.get("apiPath", "")
22+
action_group = event.get("actionGroup", "")
23+
http_method = event.get("httpMethod", "")
24+
params = _extract_parameters(event)
25+
26+
logger.info("API path: %s | params: %s", api_path, json.dumps(params))
27+
28+
if api_path == "/record-task-execution":
29+
result = _record_task_execution(params)
30+
elif api_path == "/get-last-execution":
31+
result = _get_last_execution(params)
32+
else:
33+
result = {
34+
"statusCode": 400,
35+
"body": json.dumps({"error": f"Unknown API path: {api_path}"}),
36+
}
37+
38+
return {
39+
"messageVersion": "1.0",
40+
"response": {
41+
"actionGroup": action_group,
42+
"apiPath": api_path,
43+
"httpMethod": http_method,
44+
"httpStatusCode": result["statusCode"],
45+
"responseBody": {
46+
"application/json": {"body": result["body"]}
47+
},
48+
},
49+
}
50+
51+
52+
# ──────────────────────────────────────────
53+
# Action handlers
54+
# ──────────────────────────────────────────
55+
56+
def _record_task_execution(params: dict) -> dict:
57+
"""Write an execution record to DynamoDB."""
58+
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
59+
60+
item = {
61+
"TaskId": params.get("taskId", f"task-{now}"),
62+
"TaskType": params.get("taskType", "unknown"),
63+
"ScheduledTime": params.get("scheduledTime", now),
64+
"ExecutionSummary": params.get("executionSummary", ""),
65+
"Status": "COMPLETED",
66+
"RecordedAt": now,
67+
}
68+
69+
table.put_item(Item=item)
70+
logger.info("Recorded task execution: %s", item["TaskId"])
71+
72+
return {
73+
"statusCode": 200,
74+
"body": json.dumps(
75+
{
76+
"message": f"Task execution {item['TaskId']} recorded successfully",
77+
"taskId": item["TaskId"],
78+
"recordedAt": now,
79+
}
80+
),
81+
}
82+
83+
84+
def _get_last_execution(params: dict) -> dict:
85+
"""Scan for the most recent execution (simple approach for demo)."""
86+
task_type = params.get("taskType", "scheduled-report")
87+
88+
response = table.scan(
89+
FilterExpression="TaskType = :tt",
90+
ExpressionAttributeValues={":tt": task_type},
91+
Limit=10,
92+
)
93+
94+
items = sorted(
95+
response.get("Items", []),
96+
key=lambda x: x.get("RecordedAt", ""),
97+
reverse=True,
98+
)
99+
100+
if items:
101+
last = items[0]
102+
return {
103+
"statusCode": 200,
104+
"body": json.dumps(
105+
{
106+
"taskId": last["TaskId"],
107+
"taskType": last["TaskType"],
108+
"scheduledTime": last["ScheduledTime"],
109+
"executionSummary": last.get("ExecutionSummary", ""),
110+
"recordedAt": last["RecordedAt"],
111+
}
112+
),
113+
}
114+
115+
return {
116+
"statusCode": 404,
117+
"body": json.dumps(
118+
{"message": f"No executions found for task type: {task_type}"}
119+
),
120+
}
121+
122+
123+
# ──────────────────────────────────────────
124+
# Helpers
125+
# ──────────────────────────────────────────
126+
127+
def _extract_parameters(event: dict) -> dict:
128+
"""Pull parameters from the Bedrock Agent request body and/or parameters list."""
129+
params = {}
130+
131+
# From requestBody (POST actions)
132+
properties = (
133+
event.get("requestBody", {})
134+
.get("content", {})
135+
.get("application/json", {})
136+
.get("properties", [])
137+
)
138+
for prop in properties:
139+
params[prop["name"]] = prop.get("value", "")
140+
141+
# From top-level parameters (GET actions)
142+
for param in event.get("parameters", []):
143+
params[param["name"]] = param.get("value", "")
144+
145+
return params
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
{
2+
"openapi": "3.0.0",
3+
"info": {
4+
"title": "Scheduled Task Execution API",
5+
"version": "1.0.0",
6+
"description": "Actions for recording and retrieving scheduled AI agent task executions"
7+
},
8+
"paths": {
9+
"/record-task-execution": {
10+
"post": {
11+
"operationId": "recordTaskExecution",
12+
"summary": "Record a scheduled task execution in the tracking database",
13+
"description": "Persists a task execution record with task ID, type, timestamp, and an AI-generated summary to DynamoDB",
14+
"requestBody": {
15+
"required": true,
16+
"content": {
17+
"application/json": {
18+
"schema": {
19+
"type": "object",
20+
"required": [
21+
"taskId",
22+
"taskType",
23+
"scheduledTime",
24+
"executionSummary"
25+
],
26+
"properties": {
27+
"taskId": {
28+
"type": "string",
29+
"description": "Unique identifier for this task execution — combine scheduleName and scheduledTime"
30+
},
31+
"taskType": {
32+
"type": "string",
33+
"description": "The category of the scheduled task (e.g. scheduled-report)"
34+
},
35+
"scheduledTime": {
36+
"type": "string",
37+
"description": "ISO 8601 UTC timestamp when the task was scheduled to run"
38+
},
39+
"executionSummary": {
40+
"type": "string",
41+
"description": "AI-generated summary describing the task execution and its outcome"
42+
}
43+
}
44+
}
45+
}
46+
}
47+
},
48+
"responses": {
49+
"200": {
50+
"description": "Execution recorded successfully",
51+
"content": {
52+
"application/json": {
53+
"schema": {
54+
"type": "object",
55+
"properties": {
56+
"message": { "type": "string" },
57+
"taskId": { "type": "string" },
58+
"recordedAt": { "type": "string" }
59+
}
60+
}
61+
}
62+
}
63+
}
64+
}
65+
}
66+
},
67+
"/get-last-execution": {
68+
"get": {
69+
"operationId": "getLastExecution",
70+
"summary": "Get the most recent task execution for a given task type",
71+
"description": "Retrieves the latest execution record from DynamoDB filtered by task type",
72+
"parameters": [
73+
{
74+
"name": "taskType",
75+
"in": "query",
76+
"required": true,
77+
"schema": { "type": "string" },
78+
"description": "Task type to look up (e.g. scheduled-report)"
79+
}
80+
],
81+
"responses": {
82+
"200": {
83+
"description": "Last execution found",
84+
"content": {
85+
"application/json": {
86+
"schema": {
87+
"type": "object",
88+
"properties": {
89+
"taskId": { "type": "string" },
90+
"taskType": { "type": "string" },
91+
"scheduledTime": { "type": "string" },
92+
"executionSummary": { "type": "string" },
93+
"recordedAt": { "type": "string" }
94+
}
95+
}
96+
}
97+
}
98+
},
99+
"404": {
100+
"description": "No executions found for the given task type"
101+
}
102+
}
103+
}
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)