-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathscript.py
More file actions
203 lines (171 loc) · 8.59 KB
/
script.py
File metadata and controls
203 lines (171 loc) · 8.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import ElementNotVisibleException, TimeoutException
from webdriver_manager.chrome import ChromeDriverManager
from time import sleep, time
from urllib.parse import quote
import openpyxl
import pandas as pd
import atexit
import pyqrcode
import pickle
import json
import ast
# Desktop Chrome user-agent string; the bot hands it to Chrome through the
# --user-agent command-line switch.
LATEST_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/129.0.0.0 Safari/537.36"
)
class WhatsAppBot:
    """Send WhatsApp messages by driving WhatsApp Web in Chrome via Selenium.

    Creating an instance starts Chrome, opens https://web.whatsapp.com/,
    restores cookies saved by a previous run (if any) and, when no logged-in
    session is detected, prints a QR code in the terminal so the user can
    link the device.  The browser is shut down at interpreter exit.
    """

    # The two login checks used to wait on two different selectors for the
    # chat search box; either one is now accepted in both places.
    _SEARCH_BOX_SELECTOR = (
        '[placeholder="Search or start new chat"], '
        'div[aria-label="Search input textbox"]'
    )
    _QR_CANVAS_SELECTOR = 'canvas[aria-label="Scan this QR code to link a device!"]'
    _COMPOSE_BOX_SELECTOR = 'div[aria-placeholder="Type a message"]'
    # Separator characters that must not appear in the phone number sent to
    # WhatsApp (whitespace is removed separately).
    _NUMBER_PUNCTUATION = str.maketrans("", "", "+-()")

    def __init__(self, headless: bool = True):
        """Start Chrome, open WhatsApp Web and make sure a session is active.

        Args:
            headless: Run Chrome without a visible window when true.

        Raises:
            Exception: Propagated from ``handle_login`` when the QR code
                cannot be found or the login does not complete in time.
        """
        # constants
        self.DEFAULT_EXT = "91"
        self.SCREENSHOT_FOLDER = "screenshots"
        self.COOKIE_PATH = "whatsapp_cookies.pkl"
        self.headless = headless

        options = webdriver.ChromeOptions()
        # NOTE(review): the original indentation was lost, so which switches
        # were nested under this `if` is a reconstruction; only the two
        # headless-specific ones are kept conditional - confirm.
        if self.headless:
            options.add_argument("--headless=new")
            options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")
        options.add_argument("--start-maximized")
        options.add_argument(f"--user-agent={LATEST_USER_AGENT}")
        options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
        options.add_experimental_option('useAutomationExtension', False)
        options.add_argument("--disable-blink-features=AutomationControlled")

        self.driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        # closing: register right away so Chrome is also shut down when the
        # login steps below raise.
        atexit.register(self.__close)

        self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        self.driver.get("https://web.whatsapp.com/")

        # Load cookies if exist
        if os.path.exists(self.COOKIE_PATH):
            # SECURITY: unpickling runs arbitrary code if the file has been
            # tampered with; COOKIE_PATH must only ever point at a file
            # written by save_cookies().
            with open(self.COOKIE_PATH, "rb") as cookie_file:
                cookies = pickle.load(cookie_file)
            for cookie in cookies:
                self.driver.add_cookie(cookie)
            self.driver.refresh()
            sleep(5)  # Give time for refresh to take effect

        # fluent wait
        self.wait = WebDriverWait(self.driver, timeout=20, poll_frequency=1, ignored_exceptions=[ElementNotVisibleException])

        # Check if logged in by looking for the search bar
        if not self.is_logged_in():
            self.handle_login()

    def is_logged_in(self):
        """Return True if the chat search box appears before the wait times out."""
        try:
            self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, self._SEARCH_BOX_SELECTOR)))
        except TimeoutException:
            return False
        return True

    def handle_login(self):
        """Show the login QR code in the terminal and wait for the user to scan it.

        After a successful login the session cookies are written to
        ``COOKIE_PATH``.

        Raises:
            Exception: If the QR code is not found, carries no payload, or
                the search box does not appear after scanning.
        """
        try:
            # Wait for QR code canvas
            qr_canvas = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, self._QR_CANVAS_SELECTOR)))
            # Get data-ref from parent div
            qr_info = qr_canvas.find_element(By.XPATH, "..").get_attribute("data-ref")
            if not qr_info:
                # Without the payload there is nothing to render; fail with
                # the same message instead of an opaque pyqrcode error.
                raise Exception("Failed to find QR code or complete login")
            qr_code = pyqrcode.create(qr_info)
            print(qr_code.terminal(module_color="black", background="white"))
            print("Scan the QR code above with your WhatsApp app.")
            input("Press enter after scanning QR code")
            # Wait for login to complete
            self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, self._SEARCH_BOX_SELECTOR)))
        except TimeoutException as err:
            raise Exception("Failed to find QR code or complete login") from err
        self.save_cookies()
        print("Login successful.")

    def format_number(self, phone_number):
        """Normalise a phone number to the digit string used in the send URL.

        Whitespace and the separators ``+ - ( )`` are removed.  A value that
        is exactly 10 characters long after cleaning is treated as a local
        number and prefixed with ``DEFAULT_EXT`` (Indian "91" by default).

        Args:
            phone_number: Number as text or as a numeric value; integer-valued
                floats such as ``9876543210.0`` are accepted.

        Returns:
            The cleaned number as a string.
        """
        # Integer-valued floats would otherwise be rendered as
        # "9876543210.0" and never match the 10-digit rule.
        if isinstance(phone_number, float) and phone_number.is_integer():
            phone_number = int(phone_number)
        cleaned = "".join(str(phone_number).split())
        cleaned = cleaned.translate(self._NUMBER_PUNCTUATION)
        if len(cleaned) == 10:
            cleaned = f"{self.DEFAULT_EXT}{cleaned}"
        return cleaned

    def sendMessage(self, number: str, message: str, delayAfterMessage: int = 3):
        """Send ``message`` to ``number`` through the WhatsApp Web send URL.

        On timeout the failure is printed, appended to ``error.log`` and a
        screenshot named after the current timestamp is saved in
        ``SCREENSHOT_FOLDER``; no exception is raised.

        Args:
            number: Recipient phone number; normalised with ``format_number``.
            message: Plain text to send; URL-quoted before use.
            delayAfterMessage: Seconds to sleep after pressing ENTER.
        """
        message = quote(message)
        number = self.format_number(number)
        url = f"https://web.whatsapp.com/send?phone={number}&text={message}"
        self.driver.get(url)
        try:
            # Wait for the compose box to be ready
            compose_box = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, self._COMPOSE_BOX_SELECTOR)))
            sleep(1)  # Small delay
            # Since message is pre-filled via URL, just send
            compose_box.send_keys(Keys.ENTER)
            sleep(delayAfterMessage)
        except TimeoutException:
            t = str(time())
            error_string = f"Error in sending message to {number} (timeout)"
            print(error_string)
            with open("error.log", "a") as f:
                f.write(f"\n{t} : {error_string}")
            # exist_ok avoids the check-then-create race of os.path.exists().
            os.makedirs(self.SCREENSHOT_FOLDER, exist_ok=True)
            self.driver.save_screenshot(os.path.join(self.SCREENSHOT_FOLDER, f"{t}.png"))

    def sendBulkMessage(self, df: pd.DataFrame, message: str = "Hello from WhatsApp Bot!", column=1, delayAfterMessage: int = 4, mappings: dict = None):
        """Send a (optionally personalised) message to every row of ``df``.

        Args:
            df: Table with one recipient per row.
            message: Message text; when ``mappings`` is given it is used as a
                ``str.format`` template.
            column: Column holding the phone numbers, as a positional index
                (int) or a column label.
            delayAfterMessage: Seconds to wait after each message.
            mappings: Optional ``{placeholder: column}`` dict; each column may
                be a positional index (int) or a column label.

        Rows whose phone-number cell is empty are skipped with a notice.
        """
        num_col = df.columns[column] if isinstance(column, int) else column

        # Resolve positional column references once instead of on every row.
        placeholder_columns = {}
        for placeholder, col_name in (mappings or {}).items():
            if isinstance(col_name, int):
                col_name = df.columns[col_name]
            placeholder_columns[placeholder] = col_name

        for index, row in df.iterrows():
            num = row[num_col]
            if pd.isna(num):
                # An empty cell would otherwise be sent as the number "nan"
                # and only fail after the full wait timeout.
                print(f"Skipping row {index}: no phone number")
                continue
            if placeholder_columns:
                format_dict = {
                    placeholder: row[col_name]
                    for placeholder, col_name in placeholder_columns.items()
                }
                personalized_message = message.format(**format_dict)
            else:
                personalized_message = message
            self.sendMessage(num, personalized_message, delayAfterMessage)

    def changeDefaultExt(self, newExt: str):
        """Change the default phone extension (spaces and "+" are removed)."""
        self.DEFAULT_EXT = str(newExt).replace(" ", "").replace("+", "").strip()

    def changeDefaultScreenshotLocation(self, newLocation: str):
        """Change the folder where error screenshots are saved."""
        self.SCREENSHOT_FOLDER = newLocation

    def save_cookies(self):
        """Pickle the browser's current cookies to ``COOKIE_PATH``."""
        with open(self.COOKIE_PATH, "wb") as cookie_file:
            pickle.dump(self.driver.get_cookies(), cookie_file)

    def __close(self):
        """Quit the browser; registered with ``atexit`` by ``__init__``."""
        self.driver.quit()
def parse_mappings(raw):
    """Parse the ``--mappings`` command-line value into a dict.

    Args:
        raw: Python dict literal as text, optionally wrapped in one extra
            pair of matching quotes, or ``None``/empty for "no mappings".

    Returns:
        The parsed dict, or ``None`` when ``raw`` is empty.

    Raises:
        ValueError: If the text is not a valid literal or is not a dict.
    """
    if not raw:
        return None
    text = raw
    # Some shells pass the surrounding quotes through; drop one layer.
    if len(text) >= 2 and text.startswith(("'", '"')) and text.endswith(text[0]):
        text = text[1:-1]
    try:
        parsed = ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError) as err:
        raise ValueError(f"--mappings is not a valid Python dict literal: {raw!r}") from err
    if not isinstance(parsed, dict):
        raise ValueError(f"--mappings must be a dict, got {type(parsed).__name__}")
    return parsed


def resolve_column(column, columns):
    """Turn a numeric command-line column argument into a positional index.

    argparse delivers ``-c 2`` as the string ``"2"``.  Unless a column with
    exactly that label exists, such a value is converted to ``int`` so it is
    treated as a position.

    Args:
        column: Column label or position as received from the command line.
        columns: The available column labels.

    Returns:
        ``column`` unchanged, or its integer value.
    """
    if isinstance(column, str) and column not in columns and column.strip().isdecimal():
        return int(column)
    return column


def main():
    """Command-line entry point: send a message to every number in an Excel file."""
    import argparse
    import sys

    default_message = "Hello from WhatsApp Bot!"

    parser = argparse.ArgumentParser()
    parser.add_argument("file", help="Path to Excel file containing numbers to send message to")
    parser.add_argument("-c", "--column", help="Column name or number where numbers are located", default=1)
    # default=None (rather than the greeting text) so that an omitted
    # argument is not mistaken for a file path below.
    parser.add_argument("message", nargs='?', help="Text file containing message to send", default=None)
    parser.add_argument("-d", "--delay", help="Time (in seconds) to wait after sending the message. Default = 4", default=4, type=int)
    parser.add_argument("-s", "--string", help="Treat message as string input", action='store_true')
    parser.add_argument("-e", "--extension", help="Change default phone extension. Default is Indian: 91")
    parser.add_argument("--screenshot", help="Defines error screenshot folder")
    parser.add_argument("-z", "--head", help="Runs without headless mode", action="store_false", default=True)
    parser.add_argument("--mappings", type=str, help="Python dict string mapping of placeholders to columns, e.g. \"{'name': 'Name'}\"", default=None)
    args = parser.parse_args()

    # Validate every input before launching the browser, so a typo in a path
    # does not cost a Chrome start-up and a QR login.
    if args.message is None:
        message = default_message
    elif args.string:
        message = args.message
    else:
        try:
            with open(args.message, encoding="utf-8") as f:
                message = f.read()
        except FileNotFoundError:
            print("File not found:", args.message)
            sys.exit(1)

    try:
        df = pd.read_excel(args.file, dtype=str)
    except FileNotFoundError:
        print("File not found:", args.file)
        sys.exit(1)

    try:
        mappings = parse_mappings(args.mappings)
    except ValueError as err:
        parser.error(str(err))

    column = resolve_column(args.column, df.columns)

    bot = WhatsAppBot(args.head)
    if args.extension is not None:
        bot.changeDefaultExt(args.extension)
    if args.screenshot is not None:
        bot.changeDefaultScreenshotLocation(args.screenshot)

    bot.sendBulkMessage(df=df, message=message, column=column, delayAfterMessage=args.delay, mappings=mappings)


if __name__ == "__main__":
    main()