-
Notifications
You must be signed in to change notification settings - Fork 945
Expand file tree
/
Copy pathshrink.py
More file actions
147 lines (113 loc) · 5.56 KB
/
shrink.py
File metadata and controls
147 lines (113 loc) · 5.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# ---------------------------------------------------
# File Name: shrink.py
# Description: A Pyrogram bot for downloading files from Telegram channels or groups
# and uploading them back to Telegram.
# Author: Gagan
# GitHub: https://github.com/devgaganin/
# Telegram: https://t.me/team_spy_pro
# YouTube: https://youtube.com/@dev_gagan
# Created: 2025-01-11
# Last Modified: 2025-01-11
# Version: 2.0.5
# License: MIT License
# ---------------------------------------------------
from pyrogram import Client, filters
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
import random
import requests
import string
import aiohttp
from devgagan import app
from devgagan.core.func import *
from datetime import datetime, timedelta
from motor.motor_asyncio import AsyncIOMotorClient
from config import MONGO_DB, WEBSITE_URL, AD_API, LOG_GROUP
tclient = AsyncIOMotorClient(MONGO_DB)
tdb = tclient["telegram_bot"]
token = tdb["tokens"]
async def create_ttl_index():
await token.create_index("expires_at", expireAfterSeconds=0)
Param = {}
async def generate_random_param(length=8):
"""Generate a random parameter."""
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
async def get_shortened_url(deep_link):
api_url = f"https://{WEBSITE_URL}/api?api={AD_API}&url={deep_link}"
async with aiohttp.ClientSession() as session:
async with session.get(api_url) as response:
if response.status == 200:
data = await response.json()
if data.get("status") == "success":
return data.get("shortenedUrl")
return None
async def is_user_verified(user_id):
"""Check if a user has an active session."""
session = await token.find_one({"user_id": user_id})
return session is not None
@app.on_message(filters.command("start"))
async def token_handler(client, message):
"""Handle the /token command."""
join = await subscribe(client, message)
if join == 1:
return
chat_id = "save_restricted_content_bots"
msg = await app.get_messages(chat_id, 796)
user_id = message.chat.id
if len(message.command) <= 1:
image_url = "https://graph.org/file/94bf9089d43b817bb76d2-c0ccb0de00b10adf9e.png"
join_button = InlineKeyboardButton("Updates Channel", url="https://t.me/smartkitbots")
premium = InlineKeyboardButton("How To Use Me", url="https://t.me/smartkitbots/28")
keyboard = InlineKeyboardMarkup([
[join_button],
[premium]
])
await message.reply_photo(
msg.photo.file_id,
caption=(
"Hi 👋 Welcome, Wanna intro...?\n\n"
"👉 What I Can Do:\n✨ Save posts from channels and groups where forwarding is off.\n✨ Easily fetch messages from public channels by sending their post links.\n✨ For private channels, use /login to access content securely.\n✨ Need assistance? Just type /help and I'll guide you!\n"
"\n👉 Premium Features:\n🔹 Use /token to get 3 hours of free premium access.\n🔹 Want unlimited access? Use /Plan to unlock premium features.\n🔹 Premium users enjoy faster processing, unlimited saves, and priority support.\n\n📌 Getting Started:\n✅ Send a post link from a public channel to save it instantly.\n✅ If the channel is private, use /login before sending the link.\n✅ For additional commands, check /help anytime! \n\nHappy saving!🚀"
),
reply_markup=keyboard
)
return
param = message.command[1] if len(message.command) > 1 else None
freecheck = await chk_user(message, user_id)
if freecheck != 1:
await message.reply("You are a premium user no need of token 😉")
return
if param:
if user_id in Param and Param[user_id] == param:
await token.insert_one({
"user_id": user_id,
"param": param,
"created_at": datetime.utcnow(),
"expires_at": datetime.utcnow() + timedelta(hours=3),
})
del Param[user_id]
await message.reply("✅ You have been verified successfully! Enjoy your session for next 3 hours.")
return
else:
await message.reply("❌ Invalid or expired verification link. Please generate a new token.")
return
@app.on_message(filters.command("token"))
async def smart_handler(client, message):
user_id = message.chat.id
freecheck = await chk_user(message, user_id)
if freecheck != 1:
await message.reply("You are a premium user no need of token 😉")
return
if await is_user_verified(user_id):
await message.reply("✅ Your free session is already active enjoy!")
else:
param = await generate_random_param()
Param[user_id] = param
deep_link = f"https://t.me/{client.me.username}?start={param}"
shortened_url = await get_shortened_url(deep_link)
if not shortened_url:
await message.reply("❌ Failed to generate the token link. Please try again.")
return
button = InlineKeyboardMarkup(
[[InlineKeyboardButton("Verify the token now...", url=shortened_url)]]
)
await message.reply("Click the button below to verify your free access token: \n\n> What will you get ? \n1. No time bound upto 3 hours \n2. Batch command limit will be FreeLimit + 20 \n3. All functions unlocked", reply_markup=button)