From 27509040bd91661b2d6f65111f04bc427c988b35 Mon Sep 17 00:00:00 2001 From: Daniil Okhlopkov <5613295+ohld@users.noreply.github.com> Date: Mon, 1 Jun 2026 06:48:30 +0000 Subject: [PATCH] feat: track channel lifecycle events --- .../2026-06-01_channel_lifecycle_event.py | 66 ++++++ docs/analyst/crossposting.sql | 48 ++++ src/database.py | 16 ++ src/flows/crossposting/stats_collector.py | 160 +++++++++++++- .../crossposting/test_channel_lifecycle.py | 208 ++++++++++++++++++ 5 files changed, 497 insertions(+), 1 deletion(-) create mode 100644 alembic/versions/2026-06-01_channel_lifecycle_event.py create mode 100644 tests/flows/crossposting/test_channel_lifecycle.py diff --git a/alembic/versions/2026-06-01_channel_lifecycle_event.py b/alembic/versions/2026-06-01_channel_lifecycle_event.py new file mode 100644 index 00000000..1ed2319d --- /dev/null +++ b/alembic/versions/2026-06-01_channel_lifecycle_event.py @@ -0,0 +1,66 @@ +"""add channel lifecycle event table + +Revision ID: c1e2f3a4b5d6 +Revises: a9f0d6c2b1e3 +Create Date: 2026-06-01 07:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "c1e2f3a4b5d6" +down_revision = "a9f0d6c2b1e3" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "channel_lifecycle_event", + sa.Column("id", sa.Integer(), sa.Identity(always=False), nullable=False), + sa.Column("channel", sa.String(), nullable=False), + sa.Column("telegram_event_id", sa.BigInteger(), nullable=False), + sa.Column("telegram_user_id", sa.BigInteger(), nullable=True), + sa.Column("event_type", sa.String(), nullable=False), + sa.Column("event_at", sa.DateTime(), nullable=False), + sa.Column("data", postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column( + "created_at", + sa.DateTime(), + server_default=sa.text("now()"), + nullable=False, + ), + sa.PrimaryKeyConstraint("id", name=op.f("channel_lifecycle_event_pkey")), + sa.UniqueConstraint( + "channel", + "telegram_event_id", + name=op.f("uq_channel_lifecycle_event_channel_event"), + ), + ) + op.create_index( + "ix_channel_lifecycle_event_channel_time", + "channel_lifecycle_event", + ["channel", "event_at"], + unique=False, + ) + op.create_index( + "ix_channel_lifecycle_event_user_time", + "channel_lifecycle_event", + ["telegram_user_id", "event_at"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index( + "ix_channel_lifecycle_event_user_time", + table_name="channel_lifecycle_event", + ) + op.drop_index( + "ix_channel_lifecycle_event_channel_time", + table_name="channel_lifecycle_event", + ) + op.drop_table("channel_lifecycle_event") diff --git a/docs/analyst/crossposting.sql b/docs/analyst/crossposting.sql index 26d029ae..51a4021d 100644 --- a/docs/analyst/crossposting.sql +++ b/docs/analyst/crossposting.sql @@ -9,6 +9,7 @@ -- (legacy rows may still use 's_%_%') -- - Channel deep links: user_deep_link_log.deep_link LIKE 'sc_%_%' -- - Channel forwards/views: crossposting + crossposting_snapshots +-- - Passive lifecycle: channel_lifecycle_event from Telethon admin logs -- -- Reference: -- - specs/crossposting-share-optimization-2026-05-18.md @@ -31,6 +32,53 @@ GROUP BY channel ORDER BY channel; +-- ============================================= +-- SECTION: PASSIVE CHANNEL LIFECYCLE DAILY READOUT +-- ============================================= +-- Joins/leaves are collected from Telegram admin logs and deduped by +-- (channel, telegram_event_id). Telegram user IDs match user_tg.id when that +-- channel member has also started the bot. + +WITH daily AS ( + SELECT + cle.channel, + cle.event_at::date AS date, + count(*) FILTER (WHERE cle.event_type = 'join') AS joins, + count(*) FILTER (WHERE cle.event_type = 'leave') AS leaves, + count(DISTINCT cle.telegram_user_id) FILTER ( + WHERE cle.event_type = 'join' + AND utg.id IS NOT NULL + ) AS known_joined_bot_users, + count(DISTINCT cle.telegram_user_id) FILTER ( + WHERE cle.event_type = 'leave' + AND utg.id IS NOT NULL + ) AS known_left_bot_users, + count(DISTINCT cle.telegram_user_id) FILTER ( + WHERE cle.event_type = 'join' + AND u.created_at >= cle.event_at + AND u.created_at < cle.event_at + interval '1 day' + ) AS new_bot_sessions_within_24h_after_join + FROM channel_lifecycle_event cle + LEFT JOIN user_tg utg + ON utg.id = cle.telegram_user_id + LEFT JOIN "user" u + ON u.id = utg.id + WHERE cle.event_at > now() - interval '30 days' + GROUP BY cle.channel, cle.event_at::date +) +SELECT + channel, + date, + joins, + leaves, + joins - leaves AS net_change, + known_joined_bot_users, + known_left_bot_users, + new_bot_sessions_within_24h_after_join +FROM daily +ORDER BY date DESC, channel; + + -- ============================================= -- SECTION: 24H CHANNEL POST LABELS -- ============================================= diff --git a/src/database.py b/src/database.py index c59fc79c..9d334238 100644 --- a/src/database.py +++ b/src/database.py @@ -617,6 +617,22 @@ def _is_deadlock_error(exc: BaseException) -> bool: UniqueConstraint("channel", "date"), ) +channel_lifecycle_event = Table( + "channel_lifecycle_event", + metadata, + Column("id", Integer, Identity(), primary_key=True), + Column("channel", String, nullable=False), + Column("telegram_event_id", BigInteger, nullable=False), + Column("telegram_user_id", BigInteger), + Column("event_type", String, nullable=False), + Column("event_at", DateTime, nullable=False), + Column("data", JSONB), + Column("created_at", DateTime, server_default=func.now(), nullable=False), + UniqueConstraint("channel", "telegram_event_id"), + Index("ix_channel_lifecycle_event_channel_time", "channel", "event_at"), + Index("ix_channel_lifecycle_event_user_time", "telegram_user_id", "event_at"), +) + editorial_posts = Table( "editorial_posts", metadata, diff --git a/src/flows/crossposting/stats_collector.py b/src/flows/crossposting/stats_collector.py index 05d8f537..76340805 100644 --- a/src/flows/crossposting/stats_collector.py +++ b/src/flows/crossposting/stats_collector.py @@ -12,7 +12,7 @@ import json import logging -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from prefect import flow, get_run_logger from sqlalchemy import text @@ -29,10 +29,12 @@ from src.config import settings from src.database import ( channel_daily_stats, + channel_lifecycle_event, crossposting_snapshots, editorial_post_snapshots, execute, fetch_all, + fetch_one, ) from src.flows.hooks import notify_telegram_on_failure @@ -78,6 +80,161 @@ def _extract_metrics(msg) -> tuple[int, int, int, dict[str, int], int]: return views, forwards, reaction_count, reactions_detail, comments +def _participant_user_id(participant) -> int | None: + if participant is None: + return None + return getattr(participant, "user_id", None) + + +def _normalize_channel_lifecycle_event(channel_key: str, event) -> dict | None: + event_type = None + join_source = None + telegram_user_id = None + action_name = type(event.action).__name__ + + if getattr(event, "joined", False): + event_type = "join" + join_source = "public" + telegram_user_id = event.user_id + elif getattr(event, "joined_invite", False): + event_type = "join" + join_source = "invite" + telegram_user_id = _participant_user_id(getattr(event, "new", None)) or event.user_id + elif action_name == "ChannelAdminLogEventActionParticipantJoinByInvite": + event_type = "join" + join_source = "invite_link" + telegram_user_id = event.user_id + elif action_name == "ChannelAdminLogEventActionParticipantJoinByRequest": + event_type = "join" + join_source = "join_request" + telegram_user_id = event.user_id + elif getattr(event, "left", False): + event_type = "leave" + telegram_user_id = event.user_id + + if event_type is None: + return None + + event_at = event.date + if event_at is not None and event_at.tzinfo is not None: + event_at = event_at.astimezone(timezone.utc).replace(tzinfo=None) + + return { + "channel": channel_key, + "telegram_event_id": event.id, + "telegram_user_id": telegram_user_id, + "event_type": event_type, + "event_at": event_at, + "data": { + "join_source": join_source, + "actor_user_id": event.user_id, + "action": action_name, + }, + } + + +async def _collect_channel_lifecycle_events( + client: TelegramClient, + channel_key: str, + channel_username: str, + limit: int | None = 500, +) -> int: + """Collect join/leave admin-log events with database-level dedupe.""" + log = get_run_logger() + + try: + entity = await client.get_entity(channel_username) + except ChannelPrivateError: + log.error(f"Cannot access @{channel_username} admin log — private or no access") + return 0 + + last_event = await fetch_one( + text( + """ + SELECT max(telegram_event_id) AS max_event_id + FROM channel_lifecycle_event + WHERE channel = :channel + """ + ), + {"channel": channel_key}, + ) + min_id = last_event["max_event_id"] if last_event and last_event["max_event_id"] else 0 + + inserted = 0 + async for event in client.iter_admin_log( + entity, + limit=None if min_id else limit, + min_id=min_id, + join=True, + leave=True, + invite=True, + ): + row = _normalize_channel_lifecycle_event(channel_key, event) + if row is None: + continue + result = await execute( + insert(channel_lifecycle_event) + .values(row) + .on_conflict_do_nothing( + index_elements=[ + channel_lifecycle_event.c.channel, + channel_lifecycle_event.c.telegram_event_id, + ] + ) + ) + inserted += result.rowcount or 0 + + log.info(f"@{channel_username}: {inserted} new lifecycle events") + return inserted + + +async def get_channel_lifecycle_readout(days: int = 7) -> list[dict]: + """Analyst hook: daily joins/leaves, net change, and known bot-user overlap.""" + return await fetch_all( + text( + """ + WITH daily AS ( + SELECT + cle.channel, + cle.event_at::date AS date, + COUNT(*) FILTER (WHERE cle.event_type = 'join') AS joins, + COUNT(*) FILTER (WHERE cle.event_type = 'leave') AS leaves, + COUNT(DISTINCT cle.telegram_user_id) FILTER ( + WHERE cle.event_type = 'join' AND utg.id IS NOT NULL + ) AS known_joined_bot_users, + COUNT(DISTINCT cle.telegram_user_id) FILTER ( + WHERE cle.event_type = 'leave' AND utg.id IS NOT NULL + ) AS known_left_bot_users, + COUNT(DISTINCT cle.telegram_user_id) FILTER ( + WHERE cle.event_type = 'join' + AND u.created_at >= cle.event_at + AND u.created_at < cle.event_at + interval '1 day' + ) AS new_bot_sessions_within_24h_after_join + FROM channel_lifecycle_event cle + LEFT JOIN user_tg utg + ON utg.id = cle.telegram_user_id + LEFT JOIN "user" u + ON u.id = utg.id + WHERE cle.event_at >= now() - (:days * interval '1 day') + GROUP BY cle.channel, cle.event_at::date + ) + SELECT + channel, + date, + joins, + leaves, + joins - leaves AS net_change, + known_joined_bot_users, + known_left_bot_users, + new_bot_sessions_within_24h_after_join + FROM daily + ORDER BY date DESC, channel + """ + ), + {"days": days}, + ) + + async def _collect_post_stats(client: TelegramClient, channel_key: str, channel_username: str): """Collect views/forwards/reactions for recent posts in a channel. @@ -275,6 +432,7 @@ async def collect_channel_stats(): try: await _collect_post_stats(client, channel_key, channel_username) await _collect_subscriber_count(client, channel_key, channel_username) + await _collect_channel_lifecycle_events(client, channel_key, channel_username) except FloodWaitError as e: log.warning(f"Telethon flood wait: {e.seconds}s — skipping @{channel_username}") except SessionExpiredError: diff --git a/tests/flows/crossposting/test_channel_lifecycle.py b/tests/flows/crossposting/test_channel_lifecycle.py new file mode 100644 index 00000000..b0530d0a --- /dev/null +++ b/tests/flows/crossposting/test_channel_lifecycle.py @@ -0,0 +1,208 @@ +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace + +import pytest + +from src.flows.crossposting import stats_collector + + +class _FakeLog: + def info(self, *_args, **_kwargs): + pass + + def error(self, *_args, **_kwargs): + pass + + +class _FakeAdminLog: + def __init__(self, events): + self._events = events + + def __aiter__(self): + self._iter = iter(self._events) + return self + + async def __anext__(self): + try: + return next(self._iter) + except StopIteration as exc: + raise StopAsyncIteration from exc + + +class _FakeClient: + def __init__(self, events): + self.events = events + self.iter_kwargs = None + + async def get_entity(self, channel_username): + return SimpleNamespace(username=channel_username) + + def iter_admin_log(self, entity, **kwargs): + self.iter_kwargs = kwargs + return _FakeAdminLog(self.events) + + +def _event( + event_id: int, + *, + user_id: int, + joined: bool = False, + joined_invite: bool = False, + left: bool = False, + participant_user_id: int | None = None, + action_name: str = "FakeAction", +): + participant = ( + SimpleNamespace(user_id=participant_user_id) if participant_user_id is not None else None + ) + action = type(action_name, (), {})() + return SimpleNamespace( + id=event_id, + user_id=user_id, + joined=joined, + joined_invite=joined_invite, + left=left, + date=datetime(2026, 6, 1, 9, 30, tzinfo=timezone(timedelta(hours=3))), + new=participant, + action=action, + ) + + +def test_normalize_public_join_converts_timestamp_to_naive_utc() -> None: + row = stats_collector._normalize_channel_lifecycle_event( + "tgchannelru", + _event(101, user_id=20001, joined=True), + ) + + assert row["channel"] == "tgchannelru" + assert row["telegram_event_id"] == 101 + assert row["telegram_user_id"] == 20001 + assert row["event_type"] == "join" + assert row["event_at"] == datetime(2026, 6, 1, 6, 30) + assert row["data"]["join_source"] == "public" + + +def test_normalize_invite_join_prefers_participant_user_id() -> None: + row = stats_collector._normalize_channel_lifecycle_event( + "tgchannelen", + _event(102, user_id=30001, joined_invite=True, participant_user_id=40001), + ) + + assert row["telegram_user_id"] == 40001 + assert row["event_type"] == "join" + assert row["data"]["actor_user_id"] == 30001 + assert row["data"]["join_source"] == "invite" + + +def test_normalize_invite_link_join_from_action_name() -> None: + row = stats_collector._normalize_channel_lifecycle_event( + "tgchannelen", + _event( + 104, + user_id=40002, + action_name="ChannelAdminLogEventActionParticipantJoinByInvite", + ), + ) + + assert row["telegram_user_id"] == 40002 + assert row["event_type"] == "join" + assert row["data"]["join_source"] == "invite_link" + + +def test_normalize_leave_records_leaving_user() -> None: + row = stats_collector._normalize_channel_lifecycle_event( + "tgchannelru", + _event(103, user_id=20002, left=True), + ) + + assert row["telegram_user_id"] == 20002 + assert row["event_type"] == "leave" + assert row["data"]["join_source"] is None + + +@pytest.mark.asyncio +async def test_collect_channel_lifecycle_events_dedupes_at_insert(monkeypatch) -> None: + executed = [] + + async def fake_execute(statement): + executed.append(statement) + return SimpleNamespace(rowcount=1 if len(executed) == 1 else 0) + + async def fake_fetch_one(statement, params): + assert "max(telegram_event_id)" in str(statement) + assert params == {"channel": "tgchannelru"} + return None + + monkeypatch.setattr(stats_collector, "execute", fake_execute) + monkeypatch.setattr(stats_collector, "fetch_one", fake_fetch_one) + monkeypatch.setattr(stats_collector, "get_run_logger", lambda: _FakeLog()) + + client = _FakeClient( + [ + _event(101, user_id=20001, joined=True), + _event(101, user_id=20001, joined=True), + ] + ) + + inserted = await stats_collector._collect_channel_lifecycle_events( + client, + "tgchannelru", + "fastfoodmemes", + limit=25, + ) + + assert inserted == 1 + assert len(executed) == 2 + assert client.iter_kwargs == { + "limit": 25, + "min_id": 0, + "join": True, + "leave": True, + "invite": True, + } + assert "ON CONFLICT (channel, telegram_event_id) DO NOTHING" in str(executed[0]) + + +@pytest.mark.asyncio +async def test_collect_channel_lifecycle_events_uses_high_water_mark(monkeypatch) -> None: + async def fake_execute(statement): + return SimpleNamespace(rowcount=1) + + async def fake_fetch_one(statement, params): + return {"max_event_id": 900} + + monkeypatch.setattr(stats_collector, "execute", fake_execute) + monkeypatch.setattr(stats_collector, "fetch_one", fake_fetch_one) + monkeypatch.setattr(stats_collector, "get_run_logger", lambda: _FakeLog()) + + client = _FakeClient([_event(901, user_id=20001, joined=True)]) + + inserted = await stats_collector._collect_channel_lifecycle_events( + client, + "tgchannelru", + "fastfoodmemes", + limit=25, + ) + + assert inserted == 1 + assert client.iter_kwargs["min_id"] == 900 + assert client.iter_kwargs["limit"] is None + + +@pytest.mark.asyncio +async def test_channel_lifecycle_readout_uses_days_window(monkeypatch) -> None: + calls = [] + + async def fake_fetch_all(statement, params): + calls.append((str(statement), params)) + return [{"channel": "tgchannelru", "joins": 3, "leaves": 1}] + + monkeypatch.setattr(stats_collector, "fetch_all", fake_fetch_all) + + rows = await stats_collector.get_channel_lifecycle_readout(days=14) + + assert rows == [{"channel": "tgchannelru", "joins": 3, "leaves": 1}] + sql, params = calls[0] + assert "channel_lifecycle_event" in sql + assert "new_bot_sessions_within_24h_after_join" in sql + assert params == {"days": 14}