Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,21 @@ RESEND_API_KEY=
RESEND_WEBHOOK_SECRET=
RESEND_WEBHOOK_PUBLISH_TO_REDIS=false
#RESEND_WEBHOOK_QUEUE_MAXLEN=100
#RESEND_WEBHOOK_LOCK_EXPIRE=10
##RESEND_WEBHOOK_ATTACHMENTS_DOWNLOAD_TIMEOUT=10.0
##RESEND_ATTACHMENTS_S3_ACCESS_KEY_ID=s3_access_key_id
##RESEND_ATTACHMENTS_S3_ACCESS_SECRET=s3_access_secret
##RESEND_ATTACHMENTS_S3_REGION=
##RESEND_ATTACHMENTS_S3_ENDPOINT_URL=https://s3.oss-cn-hangzhou.aliyuncs.com
##RESEND_ATTACHMENTS_S3_CONN_TIMEOUT=4.5
##RESEND_ATTACHMENTS_S3_SIGNATURE_VERSION=s3
##RESEND_ATTACHMENTS_S3_ADDRESSING_STYLE=virtual
##RESEND_ATTACHMENTS_S3_BUCKET=resend-attachments
##RESEND_ATTACHMENTS_S3_PREFIX=origin
## 0 for no presigned URL
##RESEND_ATTACHMENTS_S3_PRESIGNED_EXPIRE=3600
##RESEND_ATTACHMENTS_S3_MULTIPART_THRESHOLD=1073741824
RESEND_WEBHOOK_LOCK_EXPIRE=10
RESEND_WEBHOOK_ATTACHMENTS_DOWNLOAD_TIMEOUT=10.0

# S3
S3_ACCESS_KEY_ID=
S3_ACCESS_SECRET=""
S3_REGION=""
S3_BUCKET="resend_attachments"
S3_ENDPOINT_URL=https://s3.oss-cn-hangzhou.aliyuncs.com
S3_CONN_TIMEOUT=4.5
S3_SIGNATURE_VERSION=s3
S3_ADDRESSING_STYLE=virtual
S3_KEY_PREFIX=""
S3_MULTIPART_THRESHOLD=1073741824
S3_PRESIGNED_EXPIRE=3600 # 0 for no presigned URL

# AI API
#AI_API_KEY=openai_api_key
Expand Down
24 changes: 13 additions & 11 deletions app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,19 @@ class Settings(BaseSettings):
resend_webhook_queue_maxlen: int | None = 100 # Redis Streams max length
resend_webhook_lock_expire: int = 10 # Webhook lock expire time
resend_webhook_attachments_download_timeout: float = 10.0
resend_attachments_s3_access_key_id: str | None = None
resend_attachments_s3_access_secret: SecretStr = SecretStr('')
resend_attachments_s3_region: str = ''
resend_attachments_s3_endpoint_url: HttpUrl | None = None # Aliyun OSS
resend_attachments_s3_conn_timeout: float = 4.5
resend_attachments_s3_signature_version: str = 's3'
resend_attachments_s3_addressing_style: str = 'virtual'
resend_attachments_s3_bucket: str = 'resend-attachments'
resend_attachments_s3_prefix: str = ''
resend_attachments_s3_presigned_expire: int = 3600
resend_attachments_s3_multipart_threshold: int = 1024**3

# S3
s3_access_key_id: str | None = None
s3_access_secret: SecretStr = SecretStr('')
s3_region: str = ''
s3_endpoint_url: HttpUrl | None = None # Aliyun OSS
s3_conn_timeout: float = 4.5
s3_signature_version: str = 's3'
s3_addressing_style: str = 'virtual'
s3_bucket: str = 'resend_attachments'
s3_key_prefix: str = ''
s3_multipart_threshold: int = 1024**3
s3_presigned_expire: int = 3600

# AI API
ai_api_key: SecretStr = SecretStr('')
Expand Down
11 changes: 6 additions & 5 deletions app/webhook/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from app.dependencies import get_redis_session
from app.settings import get_settings
from app.utils import pid_str
from task.celery_worker import handle_resend_email_received
from task.celery_worker import handle_resend_email_attachments_to_s3

settings = get_settings()

Expand Down Expand Up @@ -58,11 +58,11 @@ async def resend_webhook(

# Check if the email is being processed
message_id = json_data['data']['message_id']
message_lock_ck = f'{ck_prefix}:message:{message_id}'
if await redis_session.exists(message_lock_ck):
ck_message_lock = f'{ck_prefix}:message:{message_id}'
if await redis_session.exists(ck_message_lock):
logger.debug(f'Message [{message_id}] is being processed')
return JSONResponse({'success': True, 'task_id': ''})
await redis_session.set(message_lock_ck, '1', ex=settings.resend_webhook_lock_expire)
await redis_session.set(ck_message_lock, '1', ex=settings.resend_webhook_lock_expire)

task = None
match event_type:
Expand All @@ -72,7 +72,8 @@ async def resend_webhook(
logger.info(f'Email received [{email_id}] from {email_from}')

# Process the email
task = handle_resend_email_received.delay(json_data, message_lock_ck)
# task = handle_resend_email_received.delay(json_data, ck_message_lock)
task = handle_resend_email_attachments_to_s3(json_data, ck_message_lock)
logger.debug(f'Email received task {task.id} delayed')
case _:
pass
Expand Down
184 changes: 113 additions & 71 deletions task/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from redis import Redis
from sqlmodel import Session as SQLSession
from sqlmodel import create_engine
from types_boto3_s3.client import S3Client

from app.db_models import EmailAttachment, EmailWebhookEnum, EmailWebhookEventTypeEnum
from app.settings import get_settings
Expand Down Expand Up @@ -64,15 +63,28 @@
)
celery_app.config_from_object('task.celeryconfig')


# S3
s3_session = boto3.Session(
aws_access_key_id=settings.s3_access_key_id,
aws_secret_access_key=settings.s3_access_secret.get_secret_value(),
region_name=settings.s3_region,
)


ai_client = OpenAI(
api_key=settings.ai_api_key.get_secret_value(),
base_url=settings.ai_api_base_url.encoded_string() if settings.ai_api_base_url else None,
max_retries=settings.ai_api_max_retries,
)


class TaskResult(TypedDict):
ok: bool
msg: str


class HandleResendEmailReceivedResult(TypedDict):
save_to_s3: bool
ai_file_ids: dict[str, str | None]


Expand All @@ -86,46 +98,109 @@ def do_something() -> None:
logger.debug('do_something')


@celery_app.task
def handle_resend_email_attachments_to_s3(
email_data: dict[str, Any], ck_message_lock: str
) -> TaskResult:
"""Handle Resend email attachments to S3 (Save attachments to S3)."""
if settings.s3_access_key_id is None or settings.s3_access_secret.get_secret_value() is None:
return {'ok': False, 'msg': 'S3 access key ID/secret is not set'}

bucket_name = settings.s3_bucket
s3_region = settings.s3_region
s3_endpoint_url = settings.s3_endpoint_url
s3_conn_timeout = settings.s3_conn_timeout

email_id = email_data['data']['email_id']
logger.debug(f'Saving attachments to S3 for email [{email_id}]: {email_data}')

if s3_endpoint_url:
s3_client = s3_session.client(
's3',
endpoint_url=s3_endpoint_url.encoded_string(),
config=S3Config(
signature_version=settings.s3_signature_version,
s3={'addressing_style': settings.s3_addressing_style}, # pyright: ignore[reportArgumentType]
connect_timeout=s3_conn_timeout,
),
)
else:
s3_client = s3_session.client(
's3',
region_name=s3_region,
config=S3Config(connect_timeout=s3_conn_timeout),
)

attachment_list = email_data['data']['attachments']
download_timeout_config = httpx.Timeout(settings.resend_webhook_attachments_download_timeout)
ck_file_digest = f'{settings.cache_prefix}:file_digest_sha256'
with (
httpx.Client(timeout=download_timeout_config) as http_client,
SQLSession(sql_db_engine) as sql_session,
):
for attachment in attachment_list:
attachment_id = attachment['id']
content_type = attachment['content_type']
file_ext = content_type.split('/')[-1]
file_name = f'resend_{email_id}_{attachment_id}.{file_ext}'

# Fetch attachments from Resend
attachment_detail = resend.Emails.Receiving.Attachments.get(email_id, attachment_id)
attachment_response = http_client.get(attachment_detail['download_url'])

# Calculate the SHA-256 hash of the attachment
sha256_hash = sha256(attachment_response.content)
file_digest = sha256_hash.hexdigest()

s3_key = '/'.join([settings.s3_key_prefix, file_name])
s3_client.upload_fileobj(
BytesIO(attachment_response.content),
bucket_name,
s3_key,
Config=S3TransferConfig(multipart_threshold=settings.s3_multipart_threshold),
)

sql_session.add(
EmailAttachment(
webhook=EmailWebhookEnum.RESEND,
webhook_event_type=EmailWebhookEventTypeEnum.EMAIL_RECEIVED,
message_id=email_data['data']['message_id'],
email_id=email_id,
attachment_id=attachment_id,
email_subject=email_data['data']['subject'],
email_from=email_data['data']['from'],
email_to=email_data['data']['to'],
filename=attachment['filename'],
content_type=content_type,
file_size=attachment_detail['size'],
created_at=email_data['data']['created_at'],
s3_region=s3_region,
s3_bucket=bucket_name,
s3_key=s3_key,
)
)

redis_client.hset(ck_file_digest, file_name, file_digest)

sql_session.commit()

# Release the message lock
redis_client.delete(ck_message_lock)

return {'ok': True, 'msg': 'Attachments saved to S3'}


@celery_app.task
def handle_resend_email_received(
email_data: dict[str, Any], message_lock_ck: str
) -> HandleResendEmailReceivedResult:
"""Handle Resend email received event."""
save_to_s3 = settings.resend_attachments_s3_access_key_id is not None
email_id = email_data['data']['email_id']
logger.debug(f'Processing email [{email_id}]: {email_data}')

# Save attachment to S3
s3_client: S3Client | None = None
bucket_name = settings.resend_attachments_s3_bucket
if save_to_s3:
boto3_session = boto3.Session(
aws_access_key_id=settings.resend_attachments_s3_access_key_id,
aws_secret_access_key=settings.resend_attachments_s3_access_secret.get_secret_value(),
region_name=settings.resend_attachments_s3_region,
)
if settings.resend_attachments_s3_endpoint_url:
s3_client = boto3_session.client(
's3',
endpoint_url=settings.resend_attachments_s3_endpoint_url.encoded_string(),
config=S3Config(
signature_version=settings.resend_attachments_s3_signature_version,
s3={'addressing_style': settings.resend_attachments_s3_addressing_style}, # pyright: ignore[reportArgumentType]
connect_timeout=settings.resend_attachments_s3_conn_timeout,
),
)
else:
s3_client = boto3_session.client(
's3',
region_name=settings.resend_attachments_s3_region,
config=S3Config(
connect_timeout=settings.resend_attachments_s3_conn_timeout,
),
)

attachment_list = email_data['data']['attachments']
ai_file_ids: dict[str, str | None] = {}
download_timeout_config = httpx.Timeout(settings.resend_webhook_attachments_download_timeout)
ai_file_ids: dict[str, str | None] = {}
ck_file_digest = f'{settings.cache_prefix}:file_digest'
ck_ai_files = f'{settings.cache_prefix}:ai:files'
with (
Expand All @@ -138,44 +213,13 @@ def handle_resend_email_received(
file_ext = content_type.split('/')[-1]
file_name = f'resend_{email_id}_{attachment_id}.{file_ext}'

# Save attachment to S3
if s3_client:
attachment_detail = resend.Emails.Receiving.Attachments.get(email_id, attachment_id)
attachment_response = http_client.get(attachment_detail['download_url'])

# Calculate the SHA-256 hash of the attachment
sha256_hash = sha256(attachment_response.content)
file_digest = sha256_hash.hexdigest()

bucket_key = '/'.join([settings.resend_attachments_s3_prefix, file_name])
s3_client.upload_fileobj(
BytesIO(attachment_response.content),
bucket_name,
bucket_key,
Config=S3TransferConfig(
multipart_threshold=settings.resend_attachments_s3_multipart_threshold
),
)
# Fetch attachments from Resend
attachment_detail = resend.Emails.Receiving.Attachments.get(email_id, attachment_id)
attachment_response = http_client.get(attachment_detail['download_url'])

sql_session.add(
EmailAttachment(
webhook=EmailWebhookEnum.RESEND,
webhook_event_type=EmailWebhookEventTypeEnum.EMAIL_RECEIVED,
message_id=email_data['data']['message_id'],
email_id=email_id,
attachment_id=attachment_id,
email_subject=email_data['data']['subject'],
email_from=email_data['data']['from'],
email_to=email_data['data']['to'],
filename=attachment['filename'],
content_type=content_type,
file_size=attachment_detail['size'],
created_at=email_data['data']['created_at'],
s3_region=settings.resend_attachments_s3_region,
s3_bucket=bucket_name,
s3_key=bucket_key,
)
)
# Calculate the SHA-256 hash of the attachment
sha256_hash = sha256(attachment_response.content)
file_digest = sha256_hash.hexdigest()

# Upload attachments to AI
ai_file_id = redis_client.hget(ck_ai_files, file_name)
Expand Down Expand Up @@ -207,7 +251,6 @@ def handle_resend_email_received(
)
ai_file_id = ai_fileobj.id
redis_client.hset(ck_ai_files, file_name, ai_file_id)
redis_client.hset(ck_file_digest, file_name, file_digest)

ai_file_ids[file_name] = ai_file_id

Expand All @@ -217,6 +260,5 @@ def handle_resend_email_received(
redis_client.delete(message_lock_ck)

return {
'save_to_s3': save_to_s3,
'ai_file_ids': ai_file_ids,
}
Loading