Skip to content

Commit f64fb85

Browse files
committed
feat: add webhook idempotency check
- Created 'processed_updates' table to record incoming Telegram update IDs. - Modified main.py to immediately return 200 OK for duplicate update IDs. - Added daily cleanup routine in cron.py to prune updates older than 48 hours. - Prevents redundant command execution when Cloud Run processes responses slowly and Telegram sends retries.
1 parent 7dff739 commit f64fb85

3 files changed

Lines changed: 31 additions & 0 deletions

File tree

cron.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import logging
44
import traceback
55
import resource
6+
import datetime
67
from telegram.ext import Application
78
from tg_ids import ROZEN_CHATID
89
from bot_logic import _update_groups, felizdia, actualizarRiver
10+
from handlers.db import get_session
11+
from models import ProcessedUpdate
912

1013
def main():
1114
"""Runs the update_groups command."""
@@ -17,6 +20,15 @@ def main():
1720
application = Application.builder().token(os.environ["TELEGRAM_BOT_TOKEN"]).build()
1821

1922
async def run_update():
23+
try:
24+
# Clean up old processed updates (older than 48h) to save DB space
25+
with get_session() as session:
26+
forty_eight_hours_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=48)
27+
deleted = session.query(ProcessedUpdate).filter(ProcessedUpdate.timestamp < forty_eight_hours_ago).delete()
28+
logging.info(f"Cleaned up {deleted} old processed updates from DB")
29+
except Exception as e:
30+
logging.error(f"Error cleaning up processed updates: {e}", exc_info=True)
31+
2032
try:
2133
await felizdia(application)
2234
except Exception as e:

main.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
from bot_logic import COMMANDS, button
1313
from tg_ids import ROZEN_CHATID
14+
from handlers.db import get_session
15+
from models import ProcessedUpdate
1416

1517
async def error_handler(update, context):
1618
"""Log the error and send a telegram message to notify the developer."""
@@ -117,6 +119,18 @@ async def telegram_webhook(token: str, request: Request):
117119
try:
118120
data = await request.json()
119121
update = telegram.Update.de_json(data=data, bot=application.bot)
122+
123+
# Idempotency check: Have we processed this update_id already?
124+
with get_session() as session:
125+
existing = session.query(ProcessedUpdate).filter_by(update_id=update.update_id).first()
126+
if existing:
127+
logging.getLogger("DCUBABOT").info(f"Skipping duplicate update_id {update.update_id}")
128+
return Response(status_code=200)
129+
130+
# If not, mark as processed and commit immediately
131+
session.add(ProcessedUpdate(update_id=update.update_id))
132+
session.commit()
133+
120134
# Await the processing SYNCHRONOUSLY before returning 200 OK
121135
# This prevents Cloud Run from throttling the CPU while the bot is doing work
122136
# TODO: If user traffic scales significantly and Telegram throws `RetryAfter` for

models.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ class Lock(Base):
3939
key = Column(String, primary_key=True)
4040
expires_at = Column(DateTime, nullable=False)
4141

42+
class ProcessedUpdate(Base):
43+
__tablename__ = 'processed_updates'
44+
update_id = Column(Integer, primary_key=True)
45+
timestamp = Column(DateTime, nullable=False, default=datetime.datetime.utcnow)
46+
4247
class SentMessage(Base):
4348
__tablename__ = 'sent_messages'
4449
id = Column(Integer, primary_key=True)

0 commit comments

Comments
 (0)