diff --git a/README.md b/README.md index bada313..76a96a3 100644 --- a/README.md +++ b/README.md @@ -70,20 +70,21 @@ RESEND_API_KEY= RESEND_WEBHOOK_SECRET= RESEND_WEBHOOK_PUBLISH_TO_REDIS=false #RESEND_WEBHOOK_QUEUE_MAXLEN=100 -#RESEND_WEBHOOK_LOCK_EXPIRE=10 -##RESEND_WEBHOOK_ATTACHMENTS_DOWNLOAD_TIMEOUT=10.0 -##RESEND_ATTACHMENTS_S3_ACCESS_KEY_ID=s3_access_key_id -##RESEND_ATTACHMENTS_S3_ACCESS_SECRET=s3_access_secret -##RESEND_ATTACHMENTS_S3_REGION= -##RESEND_ATTACHMENTS_S3_ENDPOINT_URL=https://s3.oss-cn-hangzhou.aliyuncs.com -##RESEND_ATTACHMENTS_S3_CONN_TIMEOUT=4.5 -##RESEND_ATTACHMENTS_S3_SIGNATURE_VERSION=s3 -##RESEND_ATTACHMENTS_S3_ADDRESSING_STYLE=virtual -##RESEND_ATTACHMENTS_S3_BUCKET=resend-attachments -##RESEND_ATTACHMENTS_S3_PREFIX=origin -## 0 for no presigned URL -##RESEND_ATTACHMENTS_S3_PRESIGNED_EXPIRE=3600 -##RESEND_ATTACHMENTS_S3_MULTIPART_THRESHOLD=1073741824 +RESEND_WEBHOOK_LOCK_EXPIRE=10 +RESEND_WEBHOOK_ATTACHMENTS_DOWNLOAD_TIMEOUT=10.0 + +# S3 +S3_ACCESS_KEY_ID= +S3_ACCESS_SECRET="" +S3_REGION="" +S3_BUCKET="resend_attachments" +S3_ENDPOINT_URL=https://s3.oss-cn-hangzhou.aliyuncs.com +S3_CONN_TIMEOUT=4.5 +S3_SIGNATURE_VERSION=s3 +S3_ADDRESSING_STYLE=virtual +S3_KEY_PREFIX="" +S3_MULTIPART_THRESHOLD=1073741824 +S3_PRESIGNED_EXPIRE=3600 # 0 for no presigned URL # AI API #AI_API_KEY=openai_api_key diff --git a/app/settings.py b/app/settings.py index 344f5d3..09bffc4 100644 --- a/app/settings.py +++ b/app/settings.py @@ -51,17 +51,19 @@ class Settings(BaseSettings): resend_webhook_queue_maxlen: int | None = 100 # Redis Streams max length resend_webhook_lock_expire: int = 10 # Webhook lock expire time resend_webhook_attachments_download_timeout: float = 10.0 - resend_attachments_s3_access_key_id: str | None = None - resend_attachments_s3_access_secret: SecretStr = SecretStr('') - resend_attachments_s3_region: str = '' - resend_attachments_s3_endpoint_url: HttpUrl | None = None # Aliyun OSS - resend_attachments_s3_conn_timeout: float = 4.5 - resend_attachments_s3_signature_version: str = 's3' - resend_attachments_s3_addressing_style: str = 'virtual' - resend_attachments_s3_bucket: str = 'resend-attachments' - resend_attachments_s3_prefix: str = '' - resend_attachments_s3_presigned_expire: int = 3600 - resend_attachments_s3_multipart_threshold: int = 1024**3 + + # S3 + s3_access_key_id: str | None = None + s3_access_secret: SecretStr = SecretStr('') + s3_region: str = '' + s3_endpoint_url: HttpUrl | None = None # Aliyun OSS + s3_conn_timeout: float = 4.5 + s3_signature_version: str = 's3' + s3_addressing_style: str = 'virtual' + s3_bucket: str = 'resend_attachments' + s3_key_prefix: str = '' + s3_multipart_threshold: int = 1024**3 + s3_presigned_expire: int = 3600 # AI API ai_api_key: SecretStr = SecretStr('') diff --git a/app/webhook/api.py b/app/webhook/api.py index ec8749f..3083b26 100644 --- a/app/webhook/api.py +++ b/app/webhook/api.py @@ -14,7 +14,7 @@ from app.dependencies import get_redis_session from app.settings import get_settings from app.utils import pid_str -from task.celery_worker import handle_resend_email_received +from task.celery_worker import handle_resend_email_attachments_to_s3 settings = get_settings() @@ -58,11 +58,11 @@ async def resend_webhook( # Check if the email is being processed message_id = json_data['data']['message_id'] - message_lock_ck = f'{ck_prefix}:message:{message_id}' - if await redis_session.exists(message_lock_ck): + ck_message_lock = f'{ck_prefix}:message:{message_id}' + if await redis_session.exists(ck_message_lock): logger.debug(f'Message [{message_id}] is being processed') return JSONResponse({'success': True, 'task_id': ''}) - await redis_session.set(message_lock_ck, '1', ex=settings.resend_webhook_lock_expire) + await redis_session.set(ck_message_lock, '1', ex=settings.resend_webhook_lock_expire) task = None match event_type: @@ -72,7 +72,8 @@ async def resend_webhook( logger.info(f'Email received [{email_id}] from {email_from}') # Process the email - task = handle_resend_email_received.delay(json_data, message_lock_ck) + # task = handle_resend_email_received.delay(json_data, ck_message_lock) + task = handle_resend_email_attachments_to_s3(json_data, ck_message_lock) logger.debug(f'Email received task {task.id} delayed') case _: pass diff --git a/task/celery_worker.py b/task/celery_worker.py index ced1814..ae18999 100644 --- a/task/celery_worker.py +++ b/task/celery_worker.py @@ -17,7 +17,6 @@ from redis import Redis from sqlmodel import Session as SQLSession from sqlmodel import create_engine -from types_boto3_s3.client import S3Client from app.db_models import EmailAttachment, EmailWebhookEnum, EmailWebhookEventTypeEnum from app.settings import get_settings @@ -64,6 +63,15 @@ ) celery_app.config_from_object('task.celeryconfig') + +# S3 +s3_session = boto3.Session( + aws_access_key_id=settings.s3_access_key_id, + aws_secret_access_key=settings.s3_access_secret.get_secret_value(), + region_name=settings.s3_region, +) + + ai_client = OpenAI( api_key=settings.ai_api_key.get_secret_value(), base_url=settings.ai_api_base_url.encoded_string() if settings.ai_api_base_url else None, @@ -71,8 +79,12 @@ ) +class TaskResult(TypedDict): + ok: bool + msg: str + + class HandleResendEmailReceivedResult(TypedDict): - save_to_s3: bool ai_file_ids: dict[str, str | None] @@ -86,46 +98,109 @@ def do_something() -> None: logger.debug('do_something') +@celery_app.task +def handle_resend_email_attachments_to_s3( + email_data: dict[str, Any], ck_message_lock: str +) -> TaskResult: + """Handle Resend email attachments to S3 (Save attachments to S3).""" + if settings.s3_access_key_id is None or settings.s3_access_secret.get_secret_value() is None: + return {'ok': False, 'msg': 'S3 access key ID/secret is not set'} + + bucket_name = settings.s3_bucket + s3_region = settings.s3_region + s3_endpoint_url = settings.s3_endpoint_url + s3_conn_timeout = settings.s3_conn_timeout + + email_id = email_data['data']['email_id'] + logger.debug(f'Saving attachments to S3 for email [{email_id}]: {email_data}') + + if s3_endpoint_url: + s3_client = s3_session.client( + 's3', + endpoint_url=s3_endpoint_url.encoded_string(), + config=S3Config( + signature_version=settings.s3_signature_version, + s3={'addressing_style': settings.s3_addressing_style}, # pyright: ignore[reportArgumentType] + connect_timeout=s3_conn_timeout, + ), + ) + else: + s3_client = s3_session.client( + 's3', + region_name=s3_region, + config=S3Config(connect_timeout=s3_conn_timeout), + ) + + attachment_list = email_data['data']['attachments'] + download_timeout_config = httpx.Timeout(settings.resend_webhook_attachments_download_timeout) + ck_file_digest = f'{settings.cache_prefix}:file_digest_sha256' + with ( + httpx.Client(timeout=download_timeout_config) as http_client, + SQLSession(sql_db_engine) as sql_session, + ): + for attachment in attachment_list: + attachment_id = attachment['id'] + content_type = attachment['content_type'] + file_ext = content_type.split('/')[-1] + file_name = f'resend_{email_id}_{attachment_id}.{file_ext}' + + # Fetch attachments from Resend + attachment_detail = resend.Emails.Receiving.Attachments.get(email_id, attachment_id) + attachment_response = http_client.get(attachment_detail['download_url']) + + # Calculate the SHA-256 hash of the attachment + sha256_hash = sha256(attachment_response.content) + file_digest = sha256_hash.hexdigest() + + s3_key = '/'.join([settings.s3_key_prefix, file_name]) + s3_client.upload_fileobj( + BytesIO(attachment_response.content), + bucket_name, + s3_key, + Config=S3TransferConfig(multipart_threshold=settings.s3_multipart_threshold), + ) + + sql_session.add( + EmailAttachment( + webhook=EmailWebhookEnum.RESEND, + webhook_event_type=EmailWebhookEventTypeEnum.EMAIL_RECEIVED, + message_id=email_data['data']['message_id'], + email_id=email_id, + attachment_id=attachment_id, + email_subject=email_data['data']['subject'], + email_from=email_data['data']['from'], + email_to=email_data['data']['to'], + filename=attachment['filename'], + content_type=content_type, + file_size=attachment_detail['size'], + created_at=email_data['data']['created_at'], + s3_region=s3_region, + s3_bucket=bucket_name, + s3_key=s3_key, + ) + ) + + redis_client.hset(ck_file_digest, file_name, file_digest) + + sql_session.commit() + + # Release the message lock + redis_client.delete(ck_message_lock) + + return {'ok': True, 'msg': 'Attachments saved to S3'} + + @celery_app.task def handle_resend_email_received( email_data: dict[str, Any], message_lock_ck: str ) -> HandleResendEmailReceivedResult: """Handle Resend email received event.""" - save_to_s3 = settings.resend_attachments_s3_access_key_id is not None email_id = email_data['data']['email_id'] logger.debug(f'Processing email [{email_id}]: {email_data}') - # Save attachment to S3 - s3_client: S3Client | None = None - bucket_name = settings.resend_attachments_s3_bucket - if save_to_s3: - boto3_session = boto3.Session( - aws_access_key_id=settings.resend_attachments_s3_access_key_id, - aws_secret_access_key=settings.resend_attachments_s3_access_secret.get_secret_value(), - region_name=settings.resend_attachments_s3_region, - ) - if settings.resend_attachments_s3_endpoint_url: - s3_client = boto3_session.client( - 's3', - endpoint_url=settings.resend_attachments_s3_endpoint_url.encoded_string(), - config=S3Config( - signature_version=settings.resend_attachments_s3_signature_version, - s3={'addressing_style': settings.resend_attachments_s3_addressing_style}, # pyright: ignore[reportArgumentType] - connect_timeout=settings.resend_attachments_s3_conn_timeout, - ), - ) - else: - s3_client = boto3_session.client( - 's3', - region_name=settings.resend_attachments_s3_region, - config=S3Config( - connect_timeout=settings.resend_attachments_s3_conn_timeout, - ), - ) - attachment_list = email_data['data']['attachments'] - ai_file_ids: dict[str, str | None] = {} download_timeout_config = httpx.Timeout(settings.resend_webhook_attachments_download_timeout) + ai_file_ids: dict[str, str | None] = {} ck_file_digest = f'{settings.cache_prefix}:file_digest' ck_ai_files = f'{settings.cache_prefix}:ai:files' with ( @@ -138,44 +213,13 @@ def handle_resend_email_received( file_ext = content_type.split('/')[-1] file_name = f'resend_{email_id}_{attachment_id}.{file_ext}' - # Save attachment to S3 - if s3_client: - attachment_detail = resend.Emails.Receiving.Attachments.get(email_id, attachment_id) - attachment_response = http_client.get(attachment_detail['download_url']) - - # Calculate the SHA-256 hash of the attachment - sha256_hash = sha256(attachment_response.content) - file_digest = sha256_hash.hexdigest() - - bucket_key = '/'.join([settings.resend_attachments_s3_prefix, file_name]) - s3_client.upload_fileobj( - BytesIO(attachment_response.content), - bucket_name, - bucket_key, - Config=S3TransferConfig( - multipart_threshold=settings.resend_attachments_s3_multipart_threshold - ), - ) + # Fetch attachments from Resend + attachment_detail = resend.Emails.Receiving.Attachments.get(email_id, attachment_id) + attachment_response = http_client.get(attachment_detail['download_url']) - sql_session.add( - EmailAttachment( - webhook=EmailWebhookEnum.RESEND, - webhook_event_type=EmailWebhookEventTypeEnum.EMAIL_RECEIVED, - message_id=email_data['data']['message_id'], - email_id=email_id, - attachment_id=attachment_id, - email_subject=email_data['data']['subject'], - email_from=email_data['data']['from'], - email_to=email_data['data']['to'], - filename=attachment['filename'], - content_type=content_type, - file_size=attachment_detail['size'], - created_at=email_data['data']['created_at'], - s3_region=settings.resend_attachments_s3_region, - s3_bucket=bucket_name, - s3_key=bucket_key, - ) - ) + # Calculate the SHA-256 hash of the attachment + sha256_hash = sha256(attachment_response.content) + file_digest = sha256_hash.hexdigest() # Upload attachments to AI ai_file_id = redis_client.hget(ck_ai_files, file_name) @@ -207,7 +251,6 @@ def handle_resend_email_received( ) ai_file_id = ai_fileobj.id redis_client.hset(ck_ai_files, file_name, ai_file_id) - redis_client.hset(ck_file_digest, file_name, file_digest) ai_file_ids[file_name] = ai_file_id @@ -217,6 +260,5 @@ def handle_resend_email_received( redis_client.delete(message_lock_ck) return { - 'save_to_s3': save_to_s3, 'ai_file_ids': ai_file_ids, }