Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions alembic/versions/2026-06-01_channel_lifecycle_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""add channel lifecycle event table

Revision ID: c1e2f3a4b5d6
Revises: a9f0d6c2b1e3
Create Date: 2026-06-01 07:00:00.000000

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "c1e2f3a4b5d6"
down_revision = "a9f0d6c2b1e3"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.create_table(
"channel_lifecycle_event",
sa.Column("id", sa.Integer(), sa.Identity(always=False), nullable=False),
sa.Column("channel", sa.String(), nullable=False),
sa.Column("telegram_event_id", sa.BigInteger(), nullable=False),
sa.Column("telegram_user_id", sa.BigInteger(), nullable=True),
sa.Column("event_type", sa.String(), nullable=False),
sa.Column("event_at", sa.DateTime(), nullable=False),
sa.Column("data", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column(
"created_at",
sa.DateTime(),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("channel_lifecycle_event_pkey")),
sa.UniqueConstraint(
"channel",
"telegram_event_id",
name=op.f("uq_channel_lifecycle_event_channel_event"),
),
)
op.create_index(
"ix_channel_lifecycle_event_channel_time",
"channel_lifecycle_event",
["channel", "event_at"],
unique=False,
)
op.create_index(
"ix_channel_lifecycle_event_user_time",
"channel_lifecycle_event",
["telegram_user_id", "event_at"],
unique=False,
)


def downgrade() -> None:
op.drop_index(
"ix_channel_lifecycle_event_user_time",
table_name="channel_lifecycle_event",
)
op.drop_index(
"ix_channel_lifecycle_event_channel_time",
table_name="channel_lifecycle_event",
)
op.drop_table("channel_lifecycle_event")
48 changes: 48 additions & 0 deletions docs/analyst/crossposting.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-- (legacy rows may still use 's_%_%')
-- - Channel deep links: user_deep_link_log.deep_link LIKE 'sc_%_%'
-- - Channel forwards/views: crossposting + crossposting_snapshots
-- - Passive lifecycle: channel_lifecycle_event from Telethon admin logs
--
-- Reference:
-- - specs/crossposting-share-optimization-2026-05-18.md
Expand All @@ -31,6 +32,53 @@ GROUP BY channel
ORDER BY channel;


-- =============================================
-- SECTION: PASSIVE CHANNEL LIFECYCLE DAILY READOUT
-- =============================================
-- Joins/leaves are collected from Telegram admin logs and deduped by
-- (channel, telegram_event_id). Telegram user IDs match user_tg.id when that
-- channel member has also started the bot.

WITH daily AS (
SELECT
cle.channel,
cle.event_at::date AS date,
count(*) FILTER (WHERE cle.event_type = 'join') AS joins,
count(*) FILTER (WHERE cle.event_type = 'leave') AS leaves,
count(DISTINCT cle.telegram_user_id) FILTER (
WHERE cle.event_type = 'join'
AND utg.id IS NOT NULL
) AS known_joined_bot_users,
count(DISTINCT cle.telegram_user_id) FILTER (
WHERE cle.event_type = 'leave'
AND utg.id IS NOT NULL
) AS known_left_bot_users,
count(DISTINCT cle.telegram_user_id) FILTER (
WHERE cle.event_type = 'join'
AND u.created_at >= cle.event_at
AND u.created_at < cle.event_at + interval '1 day'
) AS new_bot_sessions_within_24h_after_join
FROM channel_lifecycle_event cle
LEFT JOIN user_tg utg
ON utg.id = cle.telegram_user_id
LEFT JOIN "user" u
ON u.id = utg.id
WHERE cle.event_at > now() - interval '30 days'
GROUP BY cle.channel, cle.event_at::date
)
SELECT
channel,
date,
joins,
leaves,
joins - leaves AS net_change,
known_joined_bot_users,
known_left_bot_users,
new_bot_sessions_within_24h_after_join
FROM daily
ORDER BY date DESC, channel;


-- =============================================
-- SECTION: 24H CHANNEL POST LABELS
-- =============================================
Expand Down
16 changes: 16 additions & 0 deletions src/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,22 @@ def _is_deadlock_error(exc: BaseException) -> bool:
UniqueConstraint("channel", "date"),
)

channel_lifecycle_event = Table(
"channel_lifecycle_event",
metadata,
Column("id", Integer, Identity(), primary_key=True),
Column("channel", String, nullable=False),
Column("telegram_event_id", BigInteger, nullable=False),
Column("telegram_user_id", BigInteger),
Column("event_type", String, nullable=False),
Column("event_at", DateTime, nullable=False),
Column("data", JSONB),
Column("created_at", DateTime, server_default=func.now(), nullable=False),
UniqueConstraint("channel", "telegram_event_id"),
Index("ix_channel_lifecycle_event_channel_time", "channel", "event_at"),
Index("ix_channel_lifecycle_event_user_time", "telegram_user_id", "event_at"),
)

editorial_posts = Table(
"editorial_posts",
metadata,
Expand Down
160 changes: 159 additions & 1 deletion src/flows/crossposting/stats_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import json
import logging
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

from prefect import flow, get_run_logger
from sqlalchemy import text
Expand All @@ -29,10 +29,12 @@
from src.config import settings
from src.database import (
channel_daily_stats,
channel_lifecycle_event,
crossposting_snapshots,
editorial_post_snapshots,
execute,
fetch_all,
fetch_one,
)
from src.flows.hooks import notify_telegram_on_failure

Expand Down Expand Up @@ -78,6 +80,161 @@ def _extract_metrics(msg) -> tuple[int, int, int, dict[str, int], int]:
return views, forwards, reaction_count, reactions_detail, comments


def _participant_user_id(participant) -> int | None:
if participant is None:
return None
return getattr(participant, "user_id", None)


def _normalize_channel_lifecycle_event(channel_key: str, event) -> dict | None:
event_type = None
join_source = None
telegram_user_id = None
action_name = type(event.action).__name__

if getattr(event, "joined", False):
event_type = "join"
join_source = "public"
telegram_user_id = event.user_id
elif getattr(event, "joined_invite", False):
event_type = "join"
join_source = "invite"
telegram_user_id = _participant_user_id(getattr(event, "new", None)) or event.user_id
elif action_name == "ChannelAdminLogEventActionParticipantJoinByInvite":
event_type = "join"
join_source = "invite_link"
telegram_user_id = event.user_id
elif action_name == "ChannelAdminLogEventActionParticipantJoinByRequest":
event_type = "join"
join_source = "join_request"
telegram_user_id = event.user_id
elif getattr(event, "left", False):
event_type = "leave"
telegram_user_id = event.user_id

if event_type is None:
return None

event_at = event.date
if event_at is not None and event_at.tzinfo is not None:
event_at = event_at.astimezone(timezone.utc).replace(tzinfo=None)

return {
"channel": channel_key,
"telegram_event_id": event.id,
"telegram_user_id": telegram_user_id,
"event_type": event_type,
"event_at": event_at,
"data": {
"join_source": join_source,
"actor_user_id": event.user_id,
"action": action_name,
},
}


async def _collect_channel_lifecycle_events(
client: TelegramClient,
channel_key: str,
channel_username: str,
limit: int | None = 500,
) -> int:
"""Collect join/leave admin-log events with database-level dedupe."""
log = get_run_logger()

try:
entity = await client.get_entity(channel_username)
except ChannelPrivateError:
log.error(f"Cannot access @{channel_username} admin log — private or no access")
return 0

last_event = await fetch_one(
text(
"""
SELECT max(telegram_event_id) AS max_event_id
FROM channel_lifecycle_event
WHERE channel = :channel
"""
),
{"channel": channel_key},
)
min_id = last_event["max_event_id"] if last_event and last_event["max_event_id"] else 0

inserted = 0
async for event in client.iter_admin_log(
entity,
limit=None if min_id else limit,
min_id=min_id,
join=True,
leave=True,
invite=True,
):
row = _normalize_channel_lifecycle_event(channel_key, event)
if row is None:
continue
result = await execute(
insert(channel_lifecycle_event)
.values(row)
.on_conflict_do_nothing(
index_elements=[
channel_lifecycle_event.c.channel,
channel_lifecycle_event.c.telegram_event_id,
]
)
)
inserted += result.rowcount or 0

log.info(f"@{channel_username}: {inserted} new lifecycle events")
return inserted


async def get_channel_lifecycle_readout(days: int = 7) -> list[dict]:
"""Analyst hook: daily joins/leaves, net change, and known bot-user overlap."""
return await fetch_all(
text(
"""
WITH daily AS (
SELECT
cle.channel,
cle.event_at::date AS date,
COUNT(*) FILTER (WHERE cle.event_type = 'join') AS joins,
COUNT(*) FILTER (WHERE cle.event_type = 'leave') AS leaves,
COUNT(DISTINCT cle.telegram_user_id) FILTER (
WHERE cle.event_type = 'join' AND utg.id IS NOT NULL
) AS known_joined_bot_users,
COUNT(DISTINCT cle.telegram_user_id) FILTER (
WHERE cle.event_type = 'leave' AND utg.id IS NOT NULL
) AS known_left_bot_users,
COUNT(DISTINCT cle.telegram_user_id) FILTER (
WHERE cle.event_type = 'join'
AND u.created_at >= cle.event_at
AND u.created_at < cle.event_at + interval '1 day'
) AS new_bot_sessions_within_24h_after_join
FROM channel_lifecycle_event cle
LEFT JOIN user_tg utg
ON utg.id = cle.telegram_user_id
LEFT JOIN "user" u
ON u.id = utg.id
WHERE cle.event_at >= now() - (:days * interval '1 day')
GROUP BY cle.channel, cle.event_at::date
)
SELECT
channel,
date,
joins,
leaves,
joins - leaves AS net_change,
known_joined_bot_users,
known_left_bot_users,
new_bot_sessions_within_24h_after_join
FROM daily
ORDER BY date DESC, channel
"""
),
{"days": days},
)


async def _collect_post_stats(client: TelegramClient, channel_key: str, channel_username: str):
"""Collect views/forwards/reactions for recent posts in a channel.

Expand Down Expand Up @@ -275,6 +432,7 @@ async def collect_channel_stats():
try:
await _collect_post_stats(client, channel_key, channel_username)
await _collect_subscriber_count(client, channel_key, channel_username)
await _collect_channel_lifecycle_events(client, channel_key, channel_username)
except FloodWaitError as e:
log.warning(f"Telethon flood wait: {e.seconds}s — skipping @{channel_username}")
except SessionExpiredError:
Expand Down
Loading
Loading