Skip to content

Commit 478e1bd

Browse files
committed
Merge branch 'ShellMonster-feature/csv-pipeline'
2 parents 0301b71 + 52dc67d commit 478e1bd

13 files changed

Lines changed: 1361 additions & 23 deletions

File tree

docs/source_code/Item.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,26 @@ class SpiderDataItem(Item):
102102
self.title = self.title.strip()
103103
```
104104

105+
## 指定入库使用的pipelines
106+
107+
```python
108+
109+
from feapder import Item
110+
from feapder.pipelines.csv_pipeline import CsvPipeline
111+
112+
113+
class SpiderDataItem(Item):
114+
115+
__pipelines__ = [CsvPipeline()]
116+
117+
def __init__(self, *args, **kwargs):
118+
# self.id = None
119+
self.title = None
120+
```
121+
122+
使用 `__pipelines__` 指定后,该 item 只会流经指定的 pipelines 处理,不再流经 `ITEM_PIPELINES` 中配置的其他 pipeline
123+
124+
105125
## 更新数据
106126

107127
采集过程中,往往会有些数据漏采或解析出错,如果我们想更新已入库的数据,可将Item转为UpdateItem

docs/source_code/pipeline.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,26 @@
22

33
Pipeline是数据入库时流经的管道,用户可自定义,以便对接其他数据库。
44

5-
框架已内置mysql及mongo管道,其他管道作为扩展方式提供,可从[feapder_pipelines](https://github.com/Boris-code/feapder_pipelines)项目中按需安装
5+
框架已内置 mysql、mongo、csv、console 管道,其他管道作为扩展方式提供,可从[feapder_pipelines](https://github.com/Boris-code/feapder_pipelines)项目中按需安装
66

77
项目地址:https://github.com/Boris-code/feapder_pipelines
88

9-
## 使用方式
9+
## 选择内置的pipeline
10+
11+
在配置文件 `setting.py` 的 `ITEM_PIPELINES` 中启用:
12+
13+
```python
14+
ITEM_PIPELINES = [
15+
"feapder.pipelines.mysql_pipeline.MysqlPipeline",
16+
# "feapder.pipelines.mongo_pipeline.MongoPipeline",
17+
# "feapder.pipelines.csv_pipeline.CsvPipeline",
18+
# "feapder.pipelines.console_pipeline.ConsolePipeline",
19+
]
20+
```
21+
22+
然后,爬虫中 `yield` 的 `item` 会流经所选的 pipeline 自动存储
23+
24+
## 自定义pipeline
1025

1126
注:item会被聚合成多条一起流经pipeline,方便批量入库
1227

feapder/buffer/item_buffer.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,18 @@ def __init__(self, redis_key, task_table=None):
5252
# 'table_name': ['id', 'name'...] # 缓存table_name与__update_key__的关系
5353
}
5454

55+
self._item_pipelines = {
56+
# 'table_name': ['pipeline1', 'pipeline2'] # 缓存table_name与pipelines的关系
57+
}
58+
5559
self._pipelines = self.load_pipelines()
5660

5761
self._have_mysql_pipeline = MYSQL_PIPELINE_PATH in setting.ITEM_PIPELINES
5862
self._mysql_pipeline = None
5963

6064
if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
6165
if setting.ITEM_FILTER_SETTING.get(
62-
"filter_type"
66+
"filter_type"
6367
) == Dedup.BloomFilter or setting.ITEM_FILTER_SETTING.get("name"):
6468
self.__class__.dedup = Dedup(
6569
to_md5=False, **setting.ITEM_FILTER_SETTING
@@ -217,7 +221,7 @@ def __pick_items(self, items, is_update_item=False):
217221
将每个表之间的数据分开 拆分后 原items为空
218222
@param items:
219223
@param is_update_item:
220-
@return:
224+
@return: 表名与数据的字典
221225
"""
222226
datas_dict = {
223227
# 'table_name': [{}, {}]
@@ -232,22 +236,24 @@ def __pick_items(self, items, is_update_item=False):
232236
if not table_name:
233237
table_name = item.table_name
234238
self._item_tables[item_name] = table_name
239+
self._item_pipelines[table_name] = item.pipelines
240+
241+
if is_update_item and table_name not in self._item_update_keys:
242+
self._item_update_keys[table_name] = item.update_key
235243

236244
if table_name not in datas_dict:
237245
datas_dict[table_name] = []
238246

239247
datas_dict[table_name].append(item.to_dict)
240248

241-
if is_update_item and table_name not in self._item_update_keys:
242-
self._item_update_keys[table_name] = item.update_key
243-
244249
return datas_dict
245250

246-
def __export_to_db(self, table, datas, is_update=False, update_keys=()):
247-
for pipeline in self._pipelines:
251+
def __export_to_db(self, table, datas, is_update=False, update_keys=(), used_pipelines=None):
252+
pipelines = used_pipelines or self._pipelines # 优先采用指定的pipelines
253+
for pipeline in pipelines:
248254
if is_update:
249255
if table == self._task_table and not isinstance(
250-
pipeline, MysqlPipeline
256+
pipeline, MysqlPipeline
251257
):
252258
continue
253259

@@ -267,7 +273,7 @@ def __export_to_db(self, table, datas, is_update=False, update_keys=()):
267273
# 若是任务表, 且上面的pipeline里没mysql,则需调用mysql更新任务
268274
if not self._have_mysql_pipeline and is_update and table == self._task_table:
269275
if not self.mysql_pipeline.update_items(
270-
table, datas, update_keys=update_keys
276+
table, datas, update_keys=update_keys
271277
):
272278
log.error(
273279
f"{self.mysql_pipeline.__class__.__name__} 更新数据失败. table: {table} items: {datas}"
@@ -278,7 +284,7 @@ def __export_to_db(self, table, datas, is_update=False, update_keys=()):
278284
return True
279285

280286
def __add_item_to_db(
281-
self, items, update_items, requests, callbacks, items_fingerprints
287+
self, items, update_items, requests, callbacks, items_fingerprints
282288
):
283289
export_success = True
284290
self._is_adding_to_db = True
@@ -287,14 +293,15 @@ def __add_item_to_db(
287293
if setting.ITEM_FILTER_ENABLE:
288294
items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
289295

290-
# 分捡
296+
# Sort items by table; the pipelines for each table are cached in self._item_pipelines, not returned
291297
items_dict = self.__pick_items(items)
292298
update_items_dict = self.__pick_items(update_items, is_update_item=True)
293299

294300
# item批量入库
295301
failed_items = {"add": [], "update": [], "requests": []}
296302
while items_dict:
297303
table, datas = items_dict.popitem()
304+
used_pipelines = self._item_pipelines.get(table)
298305

299306
log.debug(
300307
"""
@@ -305,13 +312,14 @@ def __add_item_to_db(
305312
% (table, tools.dumps_json(datas, indent=16))
306313
)
307314

308-
if not self.__export_to_db(table, datas):
315+
if not self.__export_to_db(table, datas, used_pipelines=used_pipelines):
309316
export_success = False
310317
failed_items["add"].append({"table": table, "datas": datas})
311318

312319
# 执行批量update
313320
while update_items_dict:
314321
table, datas = update_items_dict.popitem()
322+
used_pipelines = self._item_pipelines.get(table)
315323

316324
log.debug(
317325
"""
@@ -324,7 +332,7 @@ def __add_item_to_db(
324332

325333
update_keys = self._item_update_keys.get(table)
326334
if not self.__export_to_db(
327-
table, datas, is_update=True, update_keys=update_keys
335+
table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines
328336
):
329337
export_success = False
330338
failed_items["update"].append(

feapder/db/mysqldb.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def convert(col):
202202
if isinstance(col, (datetime.date, datetime.time)):
203203
return str(col)
204204
elif isinstance(col, str) and (
205-
col.startswith("{") or col.startswith("[")
205+
col.startswith("{") or col.startswith("[")
206206
):
207207
try:
208208
# col = self.unescape_string(col)

feapder/network/item.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"""
1010

1111
import re
12+
from typing import List
1213

1314
import feapder.utils.tools as tools
1415

@@ -20,12 +21,14 @@ def __new__(cls, name, bases, attrs):
2021
attrs.setdefault("__name_underline__", None)
2122
attrs.setdefault("__update_key__", None)
2223
attrs.setdefault("__unique_key__", None)
24+
attrs.setdefault("__pipelines__", None)
2325

2426
return type.__new__(cls, name, bases, attrs)
2527

2628

2729
class Item(metaclass=ItemMetaclass):
28-
__unique_key__ = []
30+
__unique_key__: List = []
31+
__pipelines__: List = None
2932

3033
def __init__(self, **kwargs):
3134
self.__dict__ = kwargs
@@ -64,11 +67,12 @@ def to_dict(self):
6467
propertys = {}
6568
for key, value in self.__dict__.items():
6669
if key not in (
67-
"__name__",
68-
"__table_name__",
69-
"__name_underline__",
70-
"__update_key__",
71-
"__unique_key__",
70+
"__name__",
71+
"__table_name__",
72+
"__name_underline__",
73+
"__update_key__",
74+
"__unique_key__",
75+
"__pipelines__",
7276
):
7377
if key.startswith(f"_{self.__class__.__name__}"):
7478
key = key.replace(f"_{self.__class__.__name__}", "")
@@ -123,6 +127,17 @@ def unique_key(self, keys):
123127
else:
124128
self.__unique_key__ = (keys,)
125129

130+
@property
131+
def pipelines(self):
132+
return self.__pipelines__ or self.__class__.__pipelines__
133+
134+
@pipelines.setter
135+
def pipelines(self, pipelines):
136+
if isinstance(pipelines, (tuple, list)):
137+
self.__pipelines__ = pipelines
138+
else:
139+
self.__pipelines__ = (pipelines,)
140+
126141
@property
127142
def fingerprint(self):
128143
args = []

0 commit comments

Comments
 (0)