Skip to content

Commit 12a1b3d

Browse files
committed
feat(groups): distributed optimistic lock for updates via database
- Replaced in-memory global _update_in_progress with a new 'locks' database table. - This prevents duplicate parallel group updates across multiple Cloud Run instances. - Lock automatically expires (TTL) after 15 minutes to prevent deadlocks if an instance dies.
1 parent 3930e8d commit 12a1b3d

2 files changed

Lines changed: 36 additions & 8 deletions

File tree

handlers/groups.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -203,32 +203,56 @@ async def _update_groups(context: ContextTypes.DEFAULT_TYPE):
203203
logger.info("Finished update_groups job")
204204

205205
from handlers.admin import admin_ids
206+
from models import Lock
207+
import datetime
206208

207-
_update_in_progress = False
209+
async def acquire_lock(session, key: str, ttl_minutes: int = 15) -> bool:
210+
now = datetime.datetime.utcnow()
211+
# Try to clean up expired locks first
212+
session.query(Lock).filter(Lock.key == key, Lock.expires_at < now).delete()
213+
session.commit()
214+
215+
existing = session.query(Lock).filter_by(key=key).first()
216+
if existing:
217+
return False
218+
219+
new_lock = Lock(key=key, expires_at=now + datetime.timedelta(minutes=ttl_minutes))
220+
session.add(new_lock)
221+
try:
222+
session.commit()
223+
return True
224+
except Exception:
225+
session.rollback()
226+
return False
227+
228+
async def release_lock(session, key: str):
229+
session.query(Lock).filter_by(key=key).delete()
230+
session.commit()
208231

209232
async def _background_update(update: Update, context: ContextTypes.DEFAULT_TYPE):
210-
global _update_in_progress
211233
try:
212234
await _update_groups(context)
213235
await context.bot.send_message(chat_id=update.effective_chat.id, text="¡Grupos actualizados!")
214236
except Exception as e:
215237
logger.error(f"Background update failed: {e}", exc_info=True)
216238
await context.bot.send_message(chat_id=update.effective_chat.id, text=f"Error actualizando grupos: {e}")
217239
finally:
218-
_update_in_progress = False
240+
with get_session() as session:
241+
await release_lock(session, "update_groups")
219242

220243
async def actualizar_grupos(update: Update, context: ContextTypes.DEFAULT_TYPE):
221-
global _update_in_progress
222244
user_id = update.effective_user.id
223245
if user_id not in admin_ids and str(user_id) not in admin_ids:
224246
logger.warning(f"Unauthorized user {user_id} tried to access /actualizar_grupos")
225247
return
226248

227-
if _update_in_progress:
228-
await update.message.reply_text("Ya hay una actualización de grupos en progreso. Por favor, esperá a que termine.")
249+
with get_session() as session:
250+
lock_acquired = await acquire_lock(session, "update_groups", ttl_minutes=15)
251+
252+
if not lock_acquired:
253+
await update.message.reply_text("Ya hay una actualización de grupos en progreso (u ocurrió un error reciente). Por favor, esperá unos minutos a que termine o expire.")
229254
return
230255

231-
_update_in_progress = True
232256
logger.info(f"Manual update of groups triggered by {user_id}")
233257
await update.message.reply_text("Actualizando grupos en segundo plano (esto puede demorar varios minutos)...")
234258
asyncio.create_task(_background_update(update, context))

models.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,12 @@ class Command(Base):
3434
description = Column(String)
3535
enabled = Column(Boolean, nullable=False, default=True)
3636

37+
class Lock(Base):
38+
__tablename__ = 'locks'
39+
key = Column(String, primary_key=True)
40+
expires_at = Column(DateTime, nullable=False)
41+
3742
class SentMessage(Base):
38-
__tablename__ = 'sent_messages'
3943
id = Column(Integer, primary_key=True)
4044
command = Column(String, nullable=False)
4145
chat_id = Column(String, nullable=False)

0 commit comments

Comments
 (0)