-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscraper.py
More file actions
193 lines (136 loc) · 8.74 KB
/
scraper.py
File metadata and controls
193 lines (136 loc) · 8.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import asyncio
from requests_html import AsyncHTMLSession
from bs4 import BeautifulSoup
import json
# Client of the Mongo Database.
from db.client import db_client
class Scraper:
    """Scrape Mercado Libre (mercadolibre.com.mx) listings for one product.

    The workflow has two stages, each persisting into its own MongoDB
    collection of the ``monguito`` database exposed by ``db_client``:

    1. :meth:`get_urls` walks the search-result pages and stores every
       product URL it finds.
    2. :meth:`get_data` visits each stored URL, extracts the product details
       and stores one document per product.

    Attributes:
        session: ``AsyncHTMLSession`` used for every HTTP request.
        product: Search term with spaces and underscores replaced by ``-``.
        urls_collection_name: Collection that receives the product URLs.
        data_collection_name: Collection that receives the scraped details.
    """

    # Offset step between two consecutive result pages in the listing URL.
    PAGE_STEP = 48

    def __init__(self, product: str):
        """Prepare the HTTP session and pick collection names for this run.

        Args:
            product: Product search term, e.g. ``"gaming laptop"``.
        """
        # Create an AsyncHTMLSession object for making asynchronous HTTP requests.
        self.session = AsyncHTMLSession()
        # The listing URL uses dashes as word separators.
        self.product = product.replace(' ', '-').replace('_', '-')
        self.urls_collection_name, self.data_collection_name = self.get_unique_collection_names()

    def get_unique_collection_names(self):
        """Generate collection names that do not overwrite earlier runs.

        Starts from ``urls_<product>`` / ``data_<product>`` and, while either
        name is already present in the database, appends ``_2``, ``_3``, ...
        to both so the pair always shares the same suffix.

        Returns:
            A ``(urls_name, data_name)`` tuple of collection names.
        """
        base_urls = f"urls_{self.product}"
        base_data = f"data_{self.product}"

        # Snapshot the existing names once; a set makes the lookups O(1).
        existing_collections = set(db_client.monguito.list_collection_names())

        # NOTE(review): MongoDB normally creates a collection on first write,
        # so a name chosen here is presumably not reserved until something is
        # inserted - confirm if several scrapers can be created back to back.
        count = 1
        urls_name, data_name = base_urls, base_data
        while urls_name in existing_collections or data_name in existing_collections:
            count += 1
            urls_name = f"{base_urls}_{count}"
            data_name = f"{base_data}_{count}"

        return urls_name, data_name

    async def get_urls(self, max_offset: int = 47):
        """Collect product URLs from the search-result pages.

        Result pages are addressed by item offset: ``1``, ``1 + PAGE_STEP``,
        and so on, stopping before ``max_offset``.

        Args:
            max_offset: Exclusive upper bound of the item offset. The default
                of 47 fetches only the first page, which keeps test runs
                small; the original author used 1921 for a full scrape.
        """
        # Outer loop: one iteration per result page of the search.
        for offset in range(1, max_offset, self.PAGE_STEP):
            main_url = f'https://listado.mercadolibre.com.mx/{self.product}_Desde_{offset}_NoIndex_True'
            try:
                response = await self.session.get(main_url)
                soup = BeautifulSoup(response.html.html, 'html.parser')

                # Inner loop: every product title anchor on the page.
                for anchor in soup.find_all('a', class_="poly-component__title"):
                    url = anchor.get('href', '')
                    # Anchors without an href carry nothing worth storing.
                    if url:
                        db_client.monguito[self.urls_collection_name].insert_one({"url": url})
                        print(f"URL stored: {url}")
            # Deliberate best-effort: a failing page must not kill the API
            # process, so the error is reported and the next page is tried.
            except Exception as e:
                print(f"Error fetching URLs: {e}")

        print("Url scraping process successfully finished! ")

    @staticmethod
    def _text_of(tag):
        """Return ``tag.text``, or ``None`` when the lookup found no tag."""
        return tag.text if tag is not None else None

    @staticmethod
    def _nested_span_text(soup, container_class, span_class):
        """Return the text of a ``span`` nested inside a ``div`` container.

        Yields ``None`` when either the container or the span is missing,
        instead of raising ``AttributeError`` on a ``None`` container.
        """
        container = soup.find('div', {'class': container_class})
        if container is None:
            return None
        return Scraper._text_of(container.find('span', {'class': span_class}))

    def _parse_product(self, soup) -> dict:
        """Extract the product fields from a parsed product page.

        Every field is ``None`` when its element is absent; ``Images`` is an
        empty list when no zoomable gallery image is found.
        """
        text_of = self._text_of
        image_tags = soup.select('div.ui-pdp-gallery__column .ui-pdp-gallery__wrapper figure.ui-pdp-gallery__figure img[data-zoom]')

        return {
            "Name": text_of(soup.find('h1', {'class': 'ui-pdp-title'})),
            "Price": self._nested_span_text(
                soup, 'ui-pdp-price__second-line', 'andes-money-amount__fraction'
            ),
            "Category": text_of(soup.find("a", {'class': 'andes-breadcrumb__link'})),
            "Available Units": text_of(
                soup.find('span', {'class': 'ui-pdp-buybox__quantity__available'})
            ),
            "Sold Quantity": text_of(soup.find('span', {'class': 'ui-pdp-subtitle'})),
            "Rating": text_of(soup.find('span', {'class': 'ui-pdp-review__rating'})),
            "Number of Ratings": text_of(
                soup.find('span', {'class': 'ui-pdp-review__amount'})
            ),
            "Product Code": text_of(
                soup.find('span', {'class': 'ui-pdp-color--BLUE ui-pdp-family--REGULAR'})
            ),
            "Product Number": text_of(
                soup.find('span', {'class': 'ui-pdp-color--BLACK ui-pdp-family--SEMIBOLD'})
            ),
            "Discount": self._nested_span_text(
                soup, 'ui-pdp-price__main-container', 'andes-money-amount__discount'
            ),
            # First span on the page whose class attribute contains the
            # fraction class, wherever it sits in the document.
            "Original Price": text_of(
                soup.find(
                    'span',
                    class_=lambda x: x is not None and 'andes-money-amount__fraction' in x,
                )
            ),
            "Images": [img['data-zoom'] for img in image_tags],
        }

    async def get_data(self):
        """Scrape every stored product URL and persist the extracted details.

        Each data document reuses the ``_id`` of the URL document it came
        from, and is also printed to the console as indented JSON.
        """
        url_documents = db_client.monguito[self.urls_collection_name].find({})

        for document in url_documents:
            url = document.get("url")
            if not url:
                print("No URL found in document.")
                continue

            try:
                page = await self.session.get(url)
                soup = BeautifulSoup(page.html.html, 'html.parser')

                # Using the same id as the respective url document.
                data_dict = {"_id": document["_id"], **self._parse_product(soup)}
                data_insert = db_client.monguito[self.data_collection_name].insert_one(data_dict)

                # The stored id is not JSON serialisable; stringify it for printing.
                data_dict["_id"] = str(data_insert.inserted_id)
                print(json.dumps(data_dict, indent=2))
            # Deliberate best-effort: one broken product page must not stop
            # the scraping of the remaining URLs.
            except Exception as e:
                print(f"Error fetching data: {e}")

        print("Data scraping process successfully finished! ")
# Example of usage and testing of the scraper.
async def main():
    """Demo run: gather the listing URLs for "laptop", then scrape each one."""
    scraper = Scraper("laptop")

    # Stage 1 fills the URL collection; stage 2 reads it back, so the order
    # of these two awaits matters.
    await scraper.get_urls()
    await scraper.get_data()
if __name__ == "__main__":
    # Executed as a script: drive the main() coroutine to completion.
    asyncio.run(main())