-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcrawler.py
More file actions
407 lines (346 loc) · 16.2 KB
/
crawler.py
File metadata and controls
407 lines (346 loc) · 16.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# -*- coding: utf-8 -*-
"""
天气爬虫核心模块
使用 crawl4ai 进行网页爬取和数据提取
"""
import asyncio
import logging
import random
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from config import (
WEATHER_URL_TEMPLATE,
HISTORY_URL_TEMPLATE,
CITY_CODES,
CITY_PINYIN,
DATA_FIELDS,
REQUEST_DELAY,
RANDOM_DELAY_MIN,
RANDOM_DELAY_MAX,
MAX_RETRIES,
REQUEST_TIMEOUT,
HEADLESS,
BROWSER_TYPE,
USE_SYSTEM_CHROME,
)
logger = logging.getLogger(__name__)
class WeatherCrawler:
    """Crawl daily and monthly historical weather data for a list of cities.

    Pages are fetched through crawl4ai's ``AsyncWebCrawler``; the returned
    HTML is parsed with BeautifulSoup, which is imported lazily inside the
    parse methods.

    NOTE(review): ``custom_extraction_schema`` is stored and returned by
    ``_get_extraction_schema`` but none of the crawl methods in this class
    apply it; pages are always parsed by ``_parse_html_fallback`` /
    ``_parse_monthly_stats``. Confirm whether that is intended.
    """

    # JavaScript run on every page: click the ``.lishidesc2`` element (the
    # "view more" button) when it exists, then wait 2 s for the extra rows.
    _JS_CLICK_MORE = """
        (async () => {
            const moreBtn = document.querySelector('.lishidesc2');
            if (moreBtn) {
                moreBtn.click();
                await new Promise(r => setTimeout(r, 2000));
            }
        })();
    """

    # (label keyword, stats key for the value, stats key for the date or None).
    # Checked in order; the first keyword contained in a label wins.
    _MONTHLY_STAT_LABELS = (
        ("平均高温", "avg_temp_high", None),
        ("平均低温", "avg_temp_low", None),
        ("极端高温", "extreme_temp_high", None),
        ("极端低温", "extreme_temp_low", None),
        ("平均空气质量", "avg_aqi", None),
        ("空气最好", "best_aqi", "best_aqi_date"),
        ("空气最差", "worst_aqi", "worst_aqi_date"),
    )

    def __init__(
        self,
        cities: List[str],
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        fields: Optional[Dict[str, bool]] = None,
        custom_url_template: Optional[str] = None,
        custom_extraction_schema: Optional[dict] = None,
    ):
        """Initialise the crawler.

        Args:
            cities: Cities to crawl.
            start_date: First day of the range, formatted ``YYYY-MM-DD``.
            end_date: Last day of the range, formatted ``YYYY-MM-DD``.
            fields: Field configuration; a copy of ``DATA_FIELDS`` is used
                when this is ``None`` or empty.
            custom_url_template: URL template formatted with ``city`` and
                ``date``; when set it replaces the built-in templates.
            custom_extraction_schema: Extraction schema returned by
                ``_get_extraction_schema`` in place of the built-in one.
        """
        self.cities = cities
        self.start_date = start_date
        self.end_date = end_date
        self.fields = fields or DATA_FIELDS.copy()
        self.custom_url_template = custom_url_template
        self.custom_extraction_schema = custom_extraction_schema
        self.results: List[Dict[str, Any]] = []

        # Browser settings: present a regular desktop Chrome user agent and
        # matching request headers.
        browser_kwargs = {
            "headless": HEADLESS,
            "browser_type": BROWSER_TYPE,
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "headers": {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
            },
        }
        # Optionally drive the locally installed Chrome via its channel name.
        if USE_SYSTEM_CHROME:
            browser_kwargs["chrome_channel"] = "chrome"
        self.browser_config = BrowserConfig(**browser_kwargs)

    def _get_date_range(self) -> List[str]:
        """Return every month between the start and end dates as ``YYYYMM``.

        Months are listed in chronological order and both end points are
        included. Without a complete range the current month is returned.
        An inverted range (start after end) yields an empty list.

        Raises:
            ValueError: If a date does not match ``YYYY-MM-DD``.
        """
        if not self.start_date or not self.end_date:
            return [datetime.now().strftime("%Y%m")]

        start = datetime.strptime(self.start_date, "%Y-%m-%d")
        end = datetime.strptime(self.end_date, "%Y-%m-%d")

        # Step on (year, month) only. Carrying the day along would fail for
        # days 29-31 (no such day next month) and would skip the last month
        # whenever the end day is earlier than the start day.
        months: List[str] = []
        year, month = start.year, start.month
        last = (end.year, end.month)
        while (year, month) <= last:
            months.append(f"{year:04d}{month:02d}")
            if month == 12:
                year, month = year + 1, 1
            else:
                month += 1
        return months

    def _build_url(self, city: str, date: str, is_history: bool = True) -> str:
        """Build the URL to crawl for ``city`` and ``date``.

        A custom template takes precedence. Otherwise history URLs use the
        ``CITY_PINYIN`` entry (falling back to the lower-cased city) and
        live-weather URLs use the ``CITY_CODES`` entry (falling back to the
        city itself).
        """
        if self.custom_url_template:
            return self.custom_url_template.format(city=city, date=date)
        if is_history:
            city_pinyin = CITY_PINYIN.get(city, city.lower())
            return HISTORY_URL_TEMPLATE.format(city=city_pinyin, date=date)
        city_code = CITY_CODES.get(city, city)
        return WEATHER_URL_TEMPLATE.format(city=city_code)

    def _get_extraction_schema(self) -> dict:
        """Return the CSS extraction schema (custom one if configured).

        The built-in schema targets history rows laid out as
        date (``th200``) | high | low | weather | wind (all ``th140``).
        """
        if self.custom_extraction_schema:
            return self.custom_extraction_schema
        # ``:nth-of-type`` counts every sibling <div>, including the leading
        # ``div.th200`` date cell, so the four ``th140`` cells are 2..5.
        return {
            "name": "weather_data",
            "baseSelector": "ul.thrui li",
            "fields": [
                {"name": "date", "selector": "div.th200", "type": "text"},
                {"name": "temp_high", "selector": "div.th140:nth-of-type(2)", "type": "text"},
                {"name": "temp_low", "selector": "div.th140:nth-of-type(3)", "type": "text"},
                {"name": "weather", "selector": "div.th140:nth-of-type(4)", "type": "text"},
                {"name": "wind", "selector": "div.th140:nth-of-type(5)", "type": "text"},
            ],
        }

    def _build_run_config(self) -> CrawlerRunConfig:
        """Build the per-request crawl4ai configuration shared by all crawls."""
        return CrawlerRunConfig(
            wait_until="networkidle",
            page_timeout=REQUEST_TIMEOUT * 1000,
            js_code=self._JS_CLICK_MORE,
            # Leave time for the rows revealed by the click to load.
            delay_before_return_html=3.0,
        )

    @staticmethod
    def _retry_delay(blocked: bool = False) -> float:
        """Return a randomised delay in seconds; jitter doubles when blocked."""
        factor = 2 if blocked else 1
        return REQUEST_DELAY + random.uniform(
            RANDOM_DELAY_MIN * factor, RANDOM_DELAY_MAX * factor
        )

    @staticmethod
    def _looks_blocked(result: Any) -> bool:
        """Heuristically decide whether a response is an anti-bot block.

        True when the result reports HTTP 403, or when its HTML mentions
        ``403`` / ``Forbidden``. The text match is loose, so callers should
        only consult it after normal parsing produced nothing.
        """
        if getattr(result, "status_code", None) == 403:
            return True
        html = getattr(result, "html", None) or ""
        return "403" in html or "Forbidden" in html

    async def _crawl_single_page(
        self, crawler: AsyncWebCrawler, url: str, city: str
    ) -> List[Dict[str, Any]]:
        """Fetch one history page and return its daily records.

        Retries up to ``MAX_RETRIES`` times, sleeping between attempts
        (longer when the response looks like an anti-bot block). Returns an
        empty list when every attempt fails.
        """
        logger.info(f"正在爬取: {url}")
        run_config = self._build_run_config()

        for attempt in range(MAX_RETRIES):
            wait_time = self._retry_delay()
            try:
                result = await crawler.arun(url=url, config=run_config)
                if result.success and result.html:
                    data = self._parse_html_fallback(result.html, city)
                    if data:
                        logger.info(f"成功提取 {len(data)} 条数据")
                        return data
                # Nothing usable was extracted: work out why.
                if self._looks_blocked(result):
                    wait_time = self._retry_delay(blocked=True)
                    logger.warning(f"被反爬拦截,等待 {wait_time:.1f}s 后重试: {url}, 尝试 {attempt + 1}/{MAX_RETRIES}")
                else:
                    logger.warning(f"页面爬取失败: {url}, 尝试 {attempt + 1}/{MAX_RETRIES}")
            except Exception as e:
                # Retry boundary: any crawler failure is logged and retried.
                logger.error(f"爬取异常: {e}, 尝试 {attempt + 1}/{MAX_RETRIES}")
            await asyncio.sleep(wait_time)
        return []

    def _parse_html_fallback(self, html: str, city: str) -> List[Dict[str, Any]]:
        """Parse daily weather rows out of a history page.

        Each ``ul.thrui li`` row with at least five direct ``div`` children
        is read as date, high, low, weather and wind. Rows whose first cell
        is empty or starts with the header text are skipped. Returns an
        empty list when BeautifulSoup is missing or parsing fails.
        """
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            logger.warning("BeautifulSoup未安装,无法使用备用解析")
            return []

        try:
            soup = BeautifulSoup(html, 'html.parser')
            records: List[Dict[str, Any]] = []
            for row in soup.select('ul.thrui li'):
                cells = row.find_all('div', recursive=False)
                if len(cells) < 5:
                    continue
                date_text = self._clean_value(cells[0].get_text())
                if not date_text or date_text.startswith("日期"):
                    continue  # header row or empty date cell
                # Direction and force share one cell, e.g. "西北风 3级".
                wind_text = self._clean_value(cells[4].get_text())
                wind_parts = wind_text.split()
                records.append({
                    "city": city,
                    "date": date_text,
                    "temp_high": self._clean_value(cells[1].get_text()),
                    "temp_low": self._clean_value(cells[2].get_text()),
                    "weather": self._clean_value(cells[3].get_text()),
                    "wind_direction": wind_parts[0] if wind_parts else wind_text,
                    "wind_level": wind_parts[1] if len(wind_parts) > 1 else "",
                })
            return records
        except Exception as e:
            logger.error(f"HTML解析失败: {e}")
            return []

    def _parse_monthly_stats(self, html: str, city: str, year_month: str) -> Dict[str, Any]:
        """Parse the monthly summary block of a history page.

        Reads the ``.tian_twoa`` (value) / ``.tian_twob`` (label) pairs in
        each ``ul.tian_two li`` and maps them onto fixed keys. Fields that
        are not found stay as empty strings. Returns an empty dict when
        parsing fails, including when BeautifulSoup is not installed.
        """
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(html, 'html.parser')
            stats = {
                "city": city,
                "year_month": year_month,
                "avg_temp_high": "",      # average high temperature
                "avg_temp_low": "",       # average low temperature
                "extreme_temp_high": "",  # extreme high temperature
                "extreme_temp_low": "",   # extreme low temperature
                "avg_aqi": "",            # average air quality index
                "best_aqi": "",           # best air quality value
                "best_aqi_date": "",      # date of best air quality
                "worst_aqi": "",          # worst air quality value
                "worst_aqi_date": "",     # date of worst air quality
            }
            for item in soup.select('ul.tian_two li'):
                values = item.select('.tian_twoa')
                labels = item.select('.tian_twob')
                # One <li> may hold several value/label pairs.
                for val_elem, lab_elem in zip(values, labels):
                    value = val_elem.get_text().strip()
                    label_text = lab_elem.get_text().strip()
                    # A nested <span> carries the date, e.g. "(01-05)".
                    date_text = ""
                    date_span = lab_elem.select_one('span')
                    if date_span:
                        date_text = date_span.get_text().strip().strip('()')
                        label_text = label_text.replace(date_span.get_text(), '').strip()
                    for keyword, value_key, date_key in self._MONTHLY_STAT_LABELS:
                        if keyword in label_text:
                            stats[value_key] = value
                            if date_key:
                                stats[date_key] = date_text
                            break
            return stats
        except Exception as e:
            logger.error(f"月度统计解析失败: {e}")
            return {}

    def _clean_value(self, value: str) -> str:
        """Strip ``value`` and collapse whitespace runs to a single space."""
        if not value:
            return ""
        return re.sub(r'\s+', ' ', value.strip())

    async def crawl(self) -> List[Dict[str, Any]]:
        """Crawl daily records for every city and month in range.

        Resets ``self.results``, fills it with the parsed records and
        returns it. A randomised delay follows every page.
        """
        self.results = []
        date_range = self._get_date_range()
        logger.info(f"开始爬取任务: 城市={self.cities}, 日期范围={date_range}")
        async with AsyncWebCrawler(config=self.browser_config) as crawler:
            for city in self.cities:
                for date in date_range:
                    url = self._build_url(city, date, is_history=True)
                    data = await self._crawl_single_page(crawler, url, city)
                    self.results.extend(data)
                    # Space out requests between pages.
                    await asyncio.sleep(self._retry_delay())
        logger.info(f"爬取完成,共获取 {len(self.results)} 条数据")
        return self.results

    async def crawl_monthly_stats(self) -> List[Dict[str, Any]]:
        """Crawl the monthly summary for every city and month in range.

        A page counts as successful only when its parsed stats contain a
        non-empty ``avg_temp_high``. Each page is retried up to
        ``MAX_RETRIES`` times. Returns the list of collected stats dicts.
        """
        monthly_stats: List[Dict[str, Any]] = []
        date_range = self._get_date_range()
        logger.info(f"开始爬取月度统计: 城市={self.cities}, 月份={date_range}")
        run_config = self._build_run_config()

        async with AsyncWebCrawler(config=self.browser_config) as crawler:
            for city in self.cities:
                for year_month in date_range:
                    url = self._build_url(city, year_month, is_history=True)
                    logger.info(f"正在爬取月度统计: {url}")
                    for attempt in range(MAX_RETRIES):
                        wait_time = self._retry_delay()
                        try:
                            result = await crawler.arun(url=url, config=run_config)
                            if result.success and result.html:
                                stats = self._parse_monthly_stats(result.html, city, year_month)
                                if stats and stats.get("avg_temp_high"):
                                    monthly_stats.append(stats)
                                    logger.info(f"成功提取 {city} {year_month} 月度统计")
                                    break
                            # Parse first, test for a block second: a valid
                            # page can contain "403" somewhere in its markup.
                            if self._looks_blocked(result):
                                wait_time = self._retry_delay(blocked=True)
                                logger.warning(f"被反爬拦截,等待 {wait_time:.1f}s 后重试")
                            else:
                                logger.warning(f"月度统计提取失败: {url}, 尝试 {attempt + 1}/{MAX_RETRIES}")
                        except Exception as e:
                            # Retry boundary: log and try again.
                            logger.error(f"爬取月度统计失败: {e}, 尝试 {attempt + 1}/{MAX_RETRIES}")
                        await asyncio.sleep(wait_time)
                    # Space out requests between months.
                    await asyncio.sleep(self._retry_delay())
        logger.info(f"月度统计爬取完成,共获取 {len(monthly_stats)} 条记录")
        return monthly_stats

    def get_results(self) -> List[Dict[str, Any]]:
        """Return the records collected by the most recent ``crawl`` call."""
        return self.results
class CustomWeatherCrawler(WeatherCrawler):
    """Weather crawler configured with a caller-supplied URL template and schema.

    NOTE(review): the schema is only forwarded to the base class here;
    whether the base crawl methods actually apply it should be confirmed.
    """

    def __init__(
        self,
        cities: List[str],
        url_template: str,
        extraction_schema: dict,
        **kwargs
    ):
        """Forward the custom settings to ``WeatherCrawler``.

        Args:
            cities: Cities to crawl.
            url_template: Passed on as ``custom_url_template``.
            extraction_schema: Passed on as ``custom_extraction_schema``.
            **kwargs: Remaining ``WeatherCrawler`` keyword arguments.
        """
        super().__init__(
            cities,
            custom_url_template=url_template,
            custom_extraction_schema=extraction_schema,
            **kwargs
        )