Skip to content

Commit b9979a3

Browse files
Add AppSync events model
1 parent 18f20d2 commit b9979a3

4 files changed

Lines changed: 63 additions & 4 deletions

File tree

aws_lambda_powertools/utilities/parser/models/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@
2727
RequestContextV2AuthorizerJwt,
2828
RequestContextV2Http,
2929
)
30-
from .appsync import (
31-
AppSyncResolverEventModel,
32-
)
30+
from .appsync import AppSyncResolverEventModel
31+
from .appsync_events import AppSyncResolverEventsModel
3332
from .bedrock_agent import (
3433
BedrockAgentEventModel,
3534
BedrockAgentFunctionEventModel,
@@ -170,6 +169,7 @@
170169
"AlbRequestContext",
171170
"AlbRequestContextData",
172171
"AppSyncResolverEventModel",
172+
"AppSyncResolverEventsModel",
173173
"DynamoDBStreamModel",
174174
"EventBridgeModel",
175175
"DynamoDBStreamChangedRecordModel",
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from typing import Any, Dict, List, Literal, Optional
2+
3+
from pydantic import BaseModel
4+
5+
from aws_lambda_powertools.utilities.parser.models.appsync import AppSyncIdentity, AppSyncRequestModel
6+
7+
8+
class AppSyncEventsInfoChannelModel(BaseModel):
9+
path: str
10+
segments: List[str]
11+
12+
13+
class AppSyncEventsInfoChannelNamespaceModel(BaseModel):
14+
name: str
15+
16+
17+
class AppSyncEventsInfoModel(BaseModel):
18+
channel: AppSyncEventsInfoChannelModel
19+
channelNamespace: AppSyncEventsInfoChannelNamespaceModel
20+
operation: Literal["PUBLISH", "SUBSCRIBE"]
21+
22+
23+
class AppSyncEventsEventModel(BaseModel):
24+
id: str
25+
payload: Dict[str, Any]
26+
27+
28+
class AppSyncResolverEventsModel(BaseModel):
29+
identity: Optional[AppSyncIdentity] = None
30+
request: AppSyncRequestModel
31+
info: AppSyncEventsInfoModel
32+
prev: Optional[str] = None
33+
outErrors: Optional[List[str]] = None
34+
stash: Optional[Dict[str, Any]] = None
35+
events: Optional[List[AppSyncEventsEventModel]] = None

tests/events/appSyncEventsEvent.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"identity":"None",
2+
"identity": null,
33
"result":"None",
44
"request":{
55
"headers": {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import pytest
2+
3+
from aws_lambda_powertools.utilities.parser import ValidationError, parse
4+
from aws_lambda_powertools.utilities.parser.models import AppSyncResolverEventsModel
5+
from tests.functional.utils import load_event
6+
7+
8+
def test_appsync_event_model_parses_successfully():
9+
"""
10+
Validate that a valid AppSync resolver events is correctly parsed by the model.
11+
"""
12+
event = load_event("appSyncEventsEvent.json")
13+
parsed_event = parse(event=event, model=AppSyncResolverEventsModel)
14+
15+
assert isinstance(parsed_event, AppSyncResolverEventsModel)
16+
17+
18+
def test_appsync_event_model_invalid_payload_raises():
19+
"""
20+
Validate that parsing an invalid AppSync resolver events payload raises a ValidationError.
21+
"""
22+
invalid_event = {"invalid": "event"}
23+
with pytest.raises(ValidationError):
24+
parse(event=invalid_event, model=AppSyncResolverEventsModel)

0 commit comments

Comments
 (0)