Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions docs/IDENTITY_POLICY.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,21 @@ AgentCore Policy is a service that controls what your AI agents are allowed to d

## Architecture / Flow

The identity propagation flow has six steps:
The identity propagation flow has seven steps:

```
1. User logs in → Frontend gets JWT from Cognito
2. Frontend sends request → Runtime validates JWT, extracts user_id (sub claim)
3. Runtime calls Cognito /oauth2/token with aws_client_metadata containing user_id
4. Cognito V3 Pre-Token Lambda fires → reads user_id → injects department/role claims into M2M token
5. Runtime calls Gateway tool with the enriched M2M token
6. Gateway's CUSTOM_JWT Authorizer maps token claims to Cedar principal tags → Policy Engine evaluates Cedar policy → allow or deny
3. Runtime resolves the user's email from the sub (Cognito ListUsers), since the access token carries no email claim
4. Runtime calls Cognito /oauth2/token with aws_client_metadata containing user_id (sub) and email
5. Cognito V3 Pre-Token Lambda fires → reads email → injects department/role claims into M2M token
6. Runtime calls Gateway tool with the enriched M2M token
7. Gateway's CUSTOM_JWT Authorizer maps token claims to Cedar principal tags → Policy Engine evaluates Cedar policy → allow or deny
```

Key security property: the `user_id` comes from the validated JWT in the Runtime's Session Context (`sub` claim), not from the LLM or request payload. This ensures the identity chain is cryptographically secure end-to-end.
Key security property: the `user_id` comes from the validated JWT in the Runtime's Session Context (`sub` claim), not from the LLM or request payload. This ensures the identity chain is cryptographically secure end-to-end. The email used for group assignment is resolved server-side from that same `sub` (never taken from the payload), so it inherits the same integrity guarantee.

> **Why resolve the email separately?** The `sub` claim is an opaque UUID, so it can never contain a substring like `fastprojectadmin`. The Cognito **access token** sent to the Runtime does not include an `email` claim either. To drive email-based group assignment, the Runtime looks the email up from the `sub` via the Cognito `ListUsers` API (see `get_user_email` in `patterns/utils/auth.py`) and propagates it as `verified_email`.

## Components

Expand All @@ -70,7 +73,7 @@ The Cognito User Pool is configured with `featurePlan: ESSENTIALS`. This is requ

This Lambda fires on every token generation event (both user login and M2M). It only processes M2M flows (`TokenGeneration_ClientCredentials`) and skips user login flows.

For M2M flows, it reads `verified_user_id` from `clientMetadata` and assigns department/role claims based on the user's identity:
For M2M flows, it reads `verified_email` from `clientMetadata` and assigns department/role claims based on the email (the `verified_user_id` sub is also available as a stable identifier):

| User Email Contains | Department | Role |
|---------------------|------------|------|
Expand Down Expand Up @@ -300,11 +303,12 @@ def lambda_handler(event, context):

# Existing user identity logic (unchanged)
meta = event["request"].get("clientMetadata", {})
user_id = meta.get("verified_user_id", "")
user_id = meta.get("verified_user_id", "") # Cognito sub (UUID)
user_email = meta.get("verified_email", "") # resolved from sub server-side

if "fastprojectadmin" in user_id.lower():
if "fastprojectadmin" in user_email.lower():
department, role = "finance", "admin"
elif "fastuser" in user_id.lower():
elif "fastuser" in user_email.lower():
department, role = "engineering", "developer"
else:
department, role = "guest", "viewer"
Expand Down
35 changes: 28 additions & 7 deletions infra-cdk/lambdas/pretoken-v3/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,26 @@
are passed through unchanged.

Custom claims injected (application-defined, not standard JWT/OIDC claims):
- user_id: The authenticated user's ID (e.g., "yourname@company.com")
- user_id: The authenticated user's stable Cognito sub (a UUID)
- department: The user's department (e.g., "finance")
- role: The user's role (e.g., "admin")

These claim names are arbitrary — you can define any names you need.
Just ensure the names match between this Lambda's output and the Cedar
policy's principal.getTag() references.

The verified_user_id is read from clientMetadata, which is passed via the
Two values are read from clientMetadata, both passed via the
aws_client_metadata parameter in the direct Cognito /oauth2/token call
(see patterns/utils/auth.py — get_gateway_access_token).
(see patterns/utils/auth.py — get_gateway_access_token):
- verified_user_id: the Cognito sub (UUID), a stable opaque identifier
- verified_email: the user's email, resolved from the sub server-side

Group assignment below is keyed off the EMAIL (verified_email), not the sub.
The JWT sub claim is an opaque UUID and never contains a substring like
"fastprojectadmin", so matching against the sub would assign every user to
the default "guest" group. The email is resolved from the sub in
get_gateway_access_token (the access token sent to the Runtime carries no
email claim) and passed here as verified_email.

Group assignment is hardcoded for demo purposes:
- fastprojectadmin@* → department: "finance", role: "admin"
Expand Down Expand Up @@ -52,31 +61,43 @@ def lambda_handler(event: dict, context: dict) -> dict:
print("[PRE-TOKEN] Not a Client Credentials flow - skipping")
return event

# Get verified user_id from clientMetadata
# This is passed via aws_client_metadata in the direct Cognito /oauth2/token call
# Read identity values from clientMetadata. Both are passed via
# aws_client_metadata in the direct Cognito /oauth2/token call.
# verified_user_id: the Cognito sub (UUID), a stable opaque identifier
# verified_email: the user's email, used for the demo group mapping
meta = event["request"].get("clientMetadata", {})
user_id = meta.get("verified_user_id", "")
user_email = meta.get("verified_email", "")

if user_id:
print("[PRE-TOKEN] Processing M2M token - verified_user_id received")
else:
print("[PRE-TOKEN] Processing M2M token - no verified_user_id in metadata")
if not user_email:
print(
"[PRE-TOKEN] No verified_email in metadata - "
"group assignment will fall back to the default group"
)

# Demo identity assignment for Cedar policy evaluation.
# Replace this logic with a DynamoDB lookup, directory service query,
# or other identity provider for real deployments.
#
# NOTE: assignment is keyed off the EMAIL, not the sub. The sub is an opaque
# UUID and never contains "fastprojectadmin"/"fastuser", so matching against
# it would send everyone to the default "guest" group.
#
# The Cedar policy (gateway/policies/policy.cedar) has two versions:
# V1: permits all departments including "guest"
# V2: permits only "finance" and "engineering" (guest is denied)
#
# To test different access levels, change the assignment logic below
# and update the Cedar policy to match.
if "fastprojectadmin" in user_id.lower():
if "fastprojectadmin" in user_email.lower():
department = "finance"
role = "admin"
print("[PRE-TOKEN] Assigned: department=finance, role=admin")
elif "fastuser" in user_id.lower():
elif "fastuser" in user_email.lower():
department = "engineering"
role = "developer"
print("[PRE-TOKEN] Assigned: department=engineering, role=developer")
Expand Down
16 changes: 16 additions & 0 deletions infra-cdk/lib/backend-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,22 @@ export class BackendStack extends cdk.NestedStack {
})
)

// Add Cognito permission to resolve a user's email from their sub.
// The access token sent to the Runtime carries no email claim, so the
// agent resolves the email via ListUsers (filtered by sub) to drive
// email-based group assignment in the Pre-Token Lambda. See
// patterns/utils/auth.py — get_user_email.
agentRole.addToPolicy(
new iam.PolicyStatement({
sid: "CognitoListUsersForEmailLookup",
effect: iam.Effect.ALLOW,
actions: ["cognito-idp:ListUsers"],
resources: [
`arn:aws:cognito-idp:${this.region}:${this.account}:userpool/${this.userPoolId}`,
],
})
)

// Add Code Interpreter permissions
agentRole.addToPolicy(
new iam.PolicyStatement({
Expand Down
79 changes: 75 additions & 4 deletions patterns/utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,60 @@ def extract_user_id_from_context(context: RequestContext) -> str:
return user_id


def get_user_email(user_id: str) -> str:
"""
Resolve a user's email address from their Cognito ``sub`` (user ID).

The JWT ``sub`` claim is an opaque UUID, not the email address. When group
assignment in the Pre-Token Lambda is driven by the email (e.g. the demo
mapping ``fastprojectadmin`` -> finance), the email must be looked up
separately. The access token sent to the Runtime does not carry an ``email``
claim, so this resolves it via the Cognito ``ListUsers`` API filtered by
``sub``.

Args:
user_id (str): The authenticated user's ID (``sub`` claim from the
validated JWT).

Returns:
str: The user's email address, or an empty string if it cannot be
resolved (so callers can fall back to the ``sub`` without failing
the request).
"""
stack_name = os.environ["STACK_NAME"]
region = os.environ.get(
"AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
)

try:
user_pool_id = get_ssm_parameter(f"/{stack_name}/cognito-user-pool-id")
cognito = boto3.client("cognito-idp", region_name=region)
# ListUsers with a sub filter is the documented way to find a user by
# their immutable sub. AdminGetUser requires the username, which for a
# pool with email as the username attribute is itself the sub-derived
# UUID, so ListUsers keeps this robust across pool configurations.
response = cognito.list_users(
UserPoolId=user_pool_id,
Filter=f'sub = "{user_id}"',
Limit=1,
)
users = response.get("Users", [])
if not users:
logger.warning("No Cognito user found for sub: %s", user_id)
return ""
for attr in users[0].get("Attributes", []):
if attr["Name"] == "email":
return attr["Value"]
logger.warning("Cognito user %s has no email attribute", user_id)
return ""
except Exception as e:
# Email resolution is best-effort: never fail the request just because
# the lookup failed. Callers fall back to the sub (which yields the
# default group in the Pre-Token Lambda).
logger.warning("Failed to resolve email for sub %s: %s", user_id, e)
return ""


def get_secret(secret_name: str) -> str:
"""
Fetch a secret value from AWS Secrets Manager.
Expand Down Expand Up @@ -142,6 +196,12 @@ def get_gateway_access_token(user_id: str) -> str:
(extracted by extract_user_id_from_context). This ensures the identity chain
is cryptographically secure end-to-end.

The user_id is the opaque Cognito ``sub`` (a UUID), so it is unsuitable for
email-based group assignment on its own. The user's email is resolved from
the ``sub`` (see get_user_email) and propagated as ``verified_email`` so the
Pre-Token Lambda can assign department/role from the email. The ``sub`` is
still propagated as ``verified_user_id`` for use as a stable identifier.

Args:
user_id (str): The authenticated user's ID (sub claim from validated JWT).

Expand Down Expand Up @@ -182,10 +242,21 @@ def get_gateway_access_token(user_id: str) -> str:
"Content-Type": "application/x-www-form-urlencoded",
}

# Include aws_client_metadata with verified_user_id so the Cognito V3
# Pre-Token Lambda can read it and inject user-specific claims into the
# M2M access token. This is the bridge between user auth and M2M auth.
client_metadata = json.dumps({"verified_user_id": user_id})
# Resolve the user's email from the sub. The access token carries no email
# claim, and the sub is a UUID, so email-based group assignment in the
# Pre-Token Lambda needs this explicit lookup.
user_email = get_user_email(user_id)
# Avoid logging the email itself (PII); log only whether it was resolved.
logger.info("Email resolved for group assignment: %s", bool(user_email))

# Include aws_client_metadata so the Cognito V3 Pre-Token Lambda can read it
# and inject user-specific claims into the M2M access token. This is the
# bridge between user auth and M2M auth.
# verified_user_id: the stable Cognito sub (UUID)
# verified_email: the email, used for the demo's email-based group mapping
client_metadata = json.dumps(
{"verified_user_id": user_id, "verified_email": user_email}
)

data = {
"grant_type": "client_credentials",
Expand Down
Loading