Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cli/magic-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ function getTypedEnvVar<T>(
fs.readFileSync("./bin/config.json").toString("utf8")
);
options.prefix = config.prefix;
options.directSend = config.directSend;
options.createCMKs = config.createCMKs;
options.retainOnDelete = config.retainOnDelete;
options.ddbDeletionProtection = config.ddbDeletionProtection;
Expand Down Expand Up @@ -802,6 +803,12 @@ async function processCreateOptions(options: any): Promise<void> {
: "Only letters, numbers, and dashes are allowed. The max length is 10 characters.";
},
},
{
type: "confirm",
name: "directSend",
message: "Do you want lambda handlers to send directly to client",
initial: options.directSend ?? false,
},
{
type: "confirm",
name: "existingVpc",
Expand Down Expand Up @@ -1820,6 +1827,7 @@ async function processCreateOptions(options: any): Promise<void> {
// Create the config object
const config = {
prefix: answers.prefix,
directSend: answers.directSend,
createCMKs: answers.createCMKs,
retainOnDelete: answers.retainOnDelete,
ddbDeletionProtection: answers.ddbDeletionProtection,
Expand Down
11 changes: 11 additions & 0 deletions docs/guide/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ The configuration will allow you to define what AWS resource to create. To get a
### Prefix
Set a prefix for the resource names, including the CloudFormation stack name. This is useful if you plan to deploy this project multiple times in the same AWS account and region.

### Direct Send
When enabled, Lambda handlers send tokens directly to clients via AppSync, bypassing the SNS/SQS messaging path. This reduces latency for token streaming in LLM responses.

**Default:** Disabled

**When to enable:**
- You need lower latency for streaming responses
- Your deployment has high token throughput requirements

**Note:** This feature requires AppSync GraphQL API permissions for Lambda functions.

### Use an existing Amazon VPC
Add the project to an existing [Amazon Virtual Private Cloud (Amazon VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html). Note the VPC has to have private subnets that can connect to the Internet. (For example when crawling a website to populate a RAG Workspace.)

Expand Down
3 changes: 3 additions & 0 deletions lib/aws-genai-llm-chatbot-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export class AwsGenAILLMChatbotStack extends cdk.Stack {
byUserIdIndex: chatBotApi.byUserIdIndex,
applicationTable: chatBotApi.applicationTable,
chatbotFilesBucket: chatBotApi.filesBucket,
graphqlApi: chatBotApi.graphqlApi,
});

// Route all incoming messages targeted to langchain to the langchain model interface queue
Expand Down Expand Up @@ -125,6 +126,7 @@ export class AwsGenAILLMChatbotStack extends cdk.Stack {
sessionsTable: chatBotApi.sessionsTable,
byUserIdIndex: chatBotApi.byUserIdIndex,
chatbotFilesBucket: chatBotApi.filesBucket,
graphqlApi: chatBotApi.graphqlApi,
}
);

Expand Down Expand Up @@ -164,6 +166,7 @@ export class AwsGenAILLMChatbotStack extends cdk.Stack {
sessionsTable: chatBotApi.sessionsTable,
byUserIdIndex: chatBotApi.byUserIdIndex,
chatbotFilesBucket: chatBotApi.filesBucket,
graphqlApi: chatBotApi.graphqlApi,
});

// Route all incoming messages targeted to idefics to the idefics model interface queue
Expand Down
6 changes: 6 additions & 0 deletions lib/model-interfaces/bedrock-agents/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as logs from "aws-cdk-lib/aws-logs";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as sns from "aws-cdk-lib/aws-sns";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as appsync from "aws-cdk-lib/aws-appsync";
import { Construct } from "constructs";
import * as path from "path";
import { Shared } from "../../shared";
Expand All @@ -19,6 +20,7 @@ interface BedrockAgentsInterfaceProps {
readonly sessionsTable: dynamodb.Table;
readonly byUserIdIndex: string;
readonly chatbotFilesBucket: s3.Bucket;
readonly graphqlApi: appsync.GraphqlApi;
}

export class BedrockAgentsInterface extends Construct {
Expand Down Expand Up @@ -60,6 +62,8 @@ export class BedrockAgentsInterface extends Construct {
SESSIONS_BY_USER_ID_INDEX_NAME: props.byUserIdIndex,
CHATBOT_FILES_BUCKET_NAME: props.chatbotFilesBucket.bucketName,
MESSAGES_TOPIC_ARN: props.messagesTopic.topicArn,
APPSYNC_ENDPOINT: props.graphqlApi.graphqlUrl,
DIRECT_SEND: props.config.directSend ? "true" : "false",
},
});

Expand All @@ -74,6 +78,8 @@ export class BedrockAgentsInterface extends Construct {
props.sessionsTable.grantReadWriteData(requestHandler);
props.chatbotFilesBucket.grantRead(requestHandler);
props.messagesTopic.grantPublish(requestHandler);
props.graphqlApi.grantQuery(requestHandler);
props.graphqlApi.grantMutation(requestHandler);

requestHandler.addToRolePolicy(
new iam.PolicyStatement({
Expand Down
7 changes: 6 additions & 1 deletion lib/model-interfaces/idefics/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as s3 from "aws-cdk-lib/aws-s3";
import { CfnEndpoint } from "aws-cdk-lib/aws-sagemaker";
import * as sns from "aws-cdk-lib/aws-sns";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as appsync from "aws-cdk-lib/aws-appsync";
import { Construct } from "constructs";
import * as path from "path";
import { Shared } from "../../shared";
Expand All @@ -23,6 +24,7 @@ interface IdeficsInterfaceProps {
readonly sessionsTable: dynamodb.Table;
readonly byUserIdIndex: string;
readonly chatbotFilesBucket: s3.Bucket;
readonly graphqlApi: appsync.GraphqlApi;
}

export class IdeficsInterface extends Construct {
Expand Down Expand Up @@ -68,6 +70,8 @@ export class IdeficsInterface extends Construct {
MESSAGES_TOPIC_ARN: props.messagesTopic.topicArn,
CHATBOT_FILES_BUCKET_NAME: props.chatbotFilesBucket.bucketName,
CHATBOT_FILES_PRIVATE_API: api?.url ?? "",
APPSYNC_ENDPOINT: props.graphqlApi.graphqlUrl,
DIRECT_SEND: props.config.directSend ? "true" : "false",
},
}
);
Expand All @@ -79,7 +83,8 @@ export class IdeficsInterface extends Construct {
})
);
}

props.graphqlApi.grantQuery(requestHandler);
props.graphqlApi.grantMutation(requestHandler);
props.chatbotFilesBucket.grantReadWrite(requestHandler);
props.sessionsTable.grantReadWriteData(requestHandler);
props.messagesTopic.grantPublish(requestHandler);
Expand Down
7 changes: 6 additions & 1 deletion lib/model-interfaces/langchain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as logs from "aws-cdk-lib/aws-logs";
import { CfnEndpoint } from "aws-cdk-lib/aws-sagemaker";
import * as sns from "aws-cdk-lib/aws-sns";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as appsync from "aws-cdk-lib/aws-appsync";
import { Construct } from "constructs";
import * as path from "path";
import { RagEngines } from "../../rag-engines";
Expand All @@ -24,6 +25,7 @@ interface LangChainInterfaceProps {
readonly byUserIdIndex: string;
readonly applicationTable: dynamodb.Table;
readonly chatbotFilesBucket: s3.Bucket;
readonly graphqlApi: appsync.GraphqlApi;
}

export class LangChainInterface extends Construct {
Expand Down Expand Up @@ -59,6 +61,8 @@ export class LangChainInterface extends Construct {
APPLICATIONS_TABLE_NAME: props.applicationTable.tableName,
API_KEYS_SECRETS_ARN: props.shared.apiKeysSecret.secretArn,
MESSAGES_TOPIC_ARN: props.messagesTopic.topicArn,
APPSYNC_ENDPOINT: props.graphqlApi.graphqlUrl,
DIRECT_SEND: props.config.directSend ? "true" : "false",
WORKSPACES_TABLE_NAME:
props.ragEngines?.workspacesTable.tableName ?? "",
WORKSPACES_BY_OBJECT_TYPE_INDEX_NAME:
Expand Down Expand Up @@ -259,7 +263,8 @@ export class LangChainInterface extends Construct {
}
}
}

props.graphqlApi.grantMutation(requestHandler);
props.graphqlApi.grantQuery(requestHandler);
props.sessionsTable.grantReadWriteData(requestHandler);
props.applicationTable.grantReadWriteData(requestHandler);
props.messagesTopic.grantPublish(requestHandler);
Expand Down
86 changes: 86 additions & 0 deletions lib/shared/layers/python-sdk/python/genai_core/utils/appsync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import json
import os
from urllib.parse import urlparse
import boto3
from botocore.awsrequest import AWSRequest
from botocore.auth import SigV4Auth
from aws_lambda_powertools import Logger, Tracer
import requests
import datetime

# Module-wide Powertools logger and tracer used by the functions in this module.
logger = Logger()
tracer = Tracer()

# Both values are read from the environment when this module is imported;
# a missing variable raises KeyError at import time, not at first use.
AWS_REGION = os.environ["AWS_REGION"]
APPSYNC_ENDPOINT = os.environ["APPSYNC_ENDPOINT"]


def query(user_id, session_id, data_object):
    """Build the ``publishResponse`` GraphQL mutation document.

    Args:
        user_id: Value for the mutation's ``userId`` argument.
        session_id: Value for the mutation's ``sessionId`` argument.
        data_object: JSON-serializable object. It is serialized to JSON text
            and passed as the string-valued ``data`` argument.

    Returns:
        The mutation document as a string.

    Raises:
        TypeError: If ``data_object`` cannot be serialized by ``json.dumps``.
    """
    # First dumps: object -> JSON text.
    # Second dumps: JSON text -> quoted string literal in which quotes,
    # backslashes and control characters are all escaped. Escaping only the
    # double quotes (as a plain str.replace would) corrupts any payload that
    # already contains a backslash escape, e.g. a token holding `"` or a
    # newline, and yields an unparsable mutation.
    data_literal = json.dumps(json.dumps(data_object))

    # The identifiers are rendered as escaped string literals as well, so a
    # quote or backslash inside them cannot terminate the literal early.
    session_literal = json.dumps(str(session_id))
    user_literal = json.dumps(str(user_id))

    return f"""mutation Mutation {{
  publishResponse(
    data: {data_literal},
    sessionId: {session_literal},
    userId: {user_literal}
  ) {{
    data
    sessionId
    userId
  }}
}}"""


@tracer.capture_method
def direct_send_to_client(data):
    """POST a SigV4-signed ``publishResponse`` mutation for ``data`` to AppSync.

    The mutation text comes from ``query`` using ``data["userId"]``,
    ``data["data"]["sessionId"]`` and the whole ``data`` object. The request
    is sent to ``APPSYNC_ENDPOINT`` with a 5 second timeout.

    Returns:
        The ``requests`` response object. A non-200 status is logged as an
        error but the response is still returned to the caller.

    Raises:
        Exception: Anything raised while sending is logged and re-raised.
    """
    mutation = query(
        user_id=data["userId"],
        session_id=data["data"]["sessionId"],
        data_object=data,
    )
    http_method = "POST"
    endpoint = APPSYNC_ENDPOINT
    endpoint_host = urlparse(APPSYNC_ENDPOINT).netloc
    aws_session = boto3.Session()

    # Unsigned request skeleton; the body is attached below, before signing.
    signed_request = AWSRequest(
        http_method,
        endpoint,
        headers={"Host": endpoint_host, "Content-Type": "application/json"},
    )
    signed_request.context["timestamp"] = datetime.datetime.utcnow().strftime(
        "%Y%m%dT%H%M%SZ"
    )

    sigv4 = SigV4Auth(aws_session.get_credentials(), "appsync", AWS_REGION)

    # The body must be set on the request before add_auth so that it is
    # covered by the signature.
    body = json.dumps({"query": mutation.strip(), "variables": {}})
    signed_request.data = body.encode("utf-8")
    sigv4.add_auth(signed_request)

    try:
        reply = requests.request(
            http_method,
            endpoint,
            headers=dict(signed_request.headers),
            data=body,
            timeout=5,
        )
        if reply.status_code == 200:
            return reply

        logger.error(
            "AppSync request failed",
            status=reply.status_code,
            error=reply.text,
        )
        return reply
    except Exception as exc:
        logger.error("AppSync request exception", error=str(exc))
        raise
17 changes: 13 additions & 4 deletions lib/shared/layers/python-sdk/python/genai_core/utils/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
from decimal import Decimal

import boto3

from genai_core.utils.appsync import direct_send_to_client
from genai_core.types import ChatbotAction
from ..types import Direction

sns = boto3.client("sns")

topic_arn = os.environ["MESSAGES_TOPIC_ARN"]
direct = os.environ.get("DIRECT_SEND", "false").lower() == "true"


# Custom JSON encoder to handle bytes, EventStream, and other non-serializable types
class CustomJSONEncoder(json.JSONEncoder):
Expand All @@ -29,7 +35,10 @@ def send_to_client(detail, topic_arn=None):
if not topic_arn:
topic_arn = os.environ["MESSAGES_TOPIC_ARN"]

sns.publish(
TopicArn=topic_arn,
Message=json.dumps(detail, cls=CustomJSONEncoder),
)
if direct and detail["action"] == ChatbotAction.LLM_NEW_TOKEN.value:
direct_send_to_client(detail)
else:
sns.publish(
TopicArn=topic_arn,
Message=json.dumps(detail, cls=CustomJSONEncoder),
)
1 change: 1 addition & 0 deletions lib/shared/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export interface ModelConfig {

export interface SystemConfig {
prefix: string;
directSend?: boolean;
createCMKs?: boolean;
retainOnDelete?: boolean;
ddbDeletionProtection?: boolean;
Expand Down
58 changes: 58 additions & 0 deletions tests/shared/test_appsync_direct_send.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pytest
from unittest.mock import patch, MagicMock
import json


@pytest.fixture
def sample_message():
    """Outgoing ``llm_new_token`` message used as input by the tests below."""
    token = {"runId": "run-789", "sequenceNumber": 1, "value": "Hello"}
    payload = {"sessionId": "test-session-456", "token": token}
    message = dict(
        type="text",
        action="llm_new_token",
        userId="test-user-123",
        timestamp="1234567890",
        data=payload,
        direction="OUT",
    )
    return message


@patch("genai_core.utils.appsync.requests.request")
@patch("genai_core.utils.appsync.boto3.Session")
def test_direct_send_formats_mutation_correctly(mock_session, mock_request, sample_message):
    """A successful send returns the response and posts a publishResponse mutation."""
    from genai_core.utils.appsync import direct_send_to_client

    # Stubbed HTTP 200 reply from the patched requests.request.
    fake_body = json.dumps({"data": {"publishResponse": {"data": "success"}}}).encode()
    mock_request.return_value = MagicMock(status_code=200, content=fake_body)

    # Credentials handed to the signer by the patched boto3 session.
    mock_session.return_value.get_credentials.return_value = MagicMock()

    result = direct_send_to_client(sample_message)

    assert result.status_code == 200
    assert mock_request.called

    # Inspect the JSON body that was passed to requests.request(data=...).
    sent_payload = json.loads(mock_request.call_args[1]["data"])
    graphql_text = sent_payload["query"]
    for expected_fragment in ("mutation Mutation", "publishResponse"):
        assert expected_fragment in graphql_text


@patch("genai_core.utils.appsync.requests.request")
@patch("genai_core.utils.appsync.boto3.Session")
def test_direct_send_handles_errors(mock_session, mock_request, sample_message):
    """An exception raised by the HTTP call propagates out of direct_send_to_client."""
    from genai_core.utils.appsync import direct_send_to_client

    mock_request.side_effect = Exception("Network error")
    mock_session.return_value.get_credentials.return_value = MagicMock()

    with pytest.raises(Exception, match="Network error"):
        direct_send_to_client(sample_message)
Loading
Loading