Skip to content

Commit 52dc67d

Browse files
committed
支持csv pipelines, item 支持指定使用的pipelines
1 parent cf40a26 commit 52dc67d

14 files changed

Lines changed: 115 additions & 1243 deletions

File tree

docs/csv_pipeline.md

Lines changed: 0 additions & 544 deletions
This file was deleted.

docs/source_code/Item.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,26 @@ class SpiderDataItem(Item):
102102
self.title = self.title.strip()
103103
```
104104

105+
## 指定入库使用的pipelines
106+
107+
```python
108+
109+
from feapder import Item
110+
from feapder.pipelines.csv_pipeline import CsvPipeline
111+
112+
113+
class SpiderDataItem(Item):
114+
115+
__pipelines__ = [CsvPipeline()]
116+
117+
def __init__(self, *args, **kwargs):
118+
# self.id = None
119+
self.title = None
120+
```
121+
122+
使用__pipelines__指定后,该item只会流经指定的pipelines处理
123+
124+
105125
## 更新数据
106126

107127
采集过程中,往往会有些数据漏采或解析出错,如果我们想更新已入库的数据,可将Item转为UpdateItem

docs/source_code/pipeline.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,26 @@
22

33
Pipeline是数据入库时流经的管道,用户可自定义,以便对接其他数据库。
44

5-
框架已内置mysql及mongo管道,其他管道作为扩展方式提供,可从[feapder_pipelines](https://github.com/Boris-code/feapder_pipelines)项目中按需安装
5+
框架已内置mysql、mongo、csv管道,其他管道作为扩展方式提供,可从[feapder_pipelines](https://github.com/Boris-code/feapder_pipelines)项目中按需安装
66

77
项目地址:https://github.com/Boris-code/feapder_pipelines
88

9-
## 使用方式
9+
## 选择内置的pipeline
10+
11+
在配置文件 `setting.py` 中的 `ITEM_PIPELINES` 中启用:
12+
13+
```python
14+
ITEM_PIPELINES = [
15+
"feapder.pipelines.mysql_pipeline.MysqlPipeline",
16+
# "feapder.pipelines.mongo_pipeline.MongoPipeline",
17+
# "feapder.pipelines.csv_pipeline.CsvPipeline",
18+
# "feapder.pipelines.console_pipeline.ConsolePipeline",
19+
]
20+
```
21+
22+
然后 爬虫中`yield``item`会流经选择的pipeline自动存储
23+
24+
## 自定义pipeline
1025

1126
注:item会被聚合成多条一起流经pipeline,方便批量入库
1227

examples/csv_pipeline_example.py

Lines changed: 0 additions & 144 deletions
This file was deleted.

feapder/buffer/item_buffer.py

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,18 @@ def __init__(self, redis_key, task_table=None):
5252
# 'table_name': ['id', 'name'...] # 缓存table_name与__update_key__的关系
5353
}
5454

55+
self._item_pipelines = {
56+
# 'table_name': ['pipeline1', 'pipeline2'] # 缓存table_name与pipelines的关系
57+
}
58+
5559
self._pipelines = self.load_pipelines()
5660

5761
self._have_mysql_pipeline = MYSQL_PIPELINE_PATH in setting.ITEM_PIPELINES
5862
self._mysql_pipeline = None
5963

6064
if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
6165
if setting.ITEM_FILTER_SETTING.get(
62-
"filter_type"
66+
"filter_type"
6367
) == Dedup.BloomFilter or setting.ITEM_FILTER_SETTING.get("name"):
6468
self.__class__.dedup = Dedup(
6569
to_md5=False, **setting.ITEM_FILTER_SETTING
@@ -217,14 +221,11 @@ def __pick_items(self, items, is_update_item=False):
217221
将每个表之间的数据分开 拆分后 原items为空
218222
@param items:
219223
@param is_update_item:
220-
@return: (datas_dict, pipelines_dict)
224+
@return: 表名与数据的字典
221225
"""
222226
datas_dict = {
223227
# 'table_name': [{}, {}]
224228
}
225-
pipelines_dict = {
226-
# 'table_name': ['csv', 'mysql'] or None
227-
}
228229

229230
while items:
230231
item = items.pop(0)
@@ -235,32 +236,24 @@ def __pick_items(self, items, is_update_item=False):
235236
if not table_name:
236237
table_name = item.table_name
237238
self._item_tables[item_name] = table_name
239+
self._item_pipelines[table_name] = item.pipelines
240+
241+
if is_update_item and table_name not in self._item_update_keys:
242+
self._item_update_keys[table_name] = item.update_key
238243

239244
if table_name not in datas_dict:
240245
datas_dict[table_name] = []
241-
# 保存这个 table 的 pipelines 配置(只需保存一次)
242-
pipelines_dict[table_name] = getattr(item, '__pipelines__', None)
243246

244247
datas_dict[table_name].append(item.to_dict)
245248

246-
if is_update_item and table_name not in self._item_update_keys:
247-
self._item_update_keys[table_name] = item.update_key
248-
249-
return datas_dict, pipelines_dict
250-
251-
def __export_to_db(self, table, datas, is_update=False, update_keys=(), allowed_pipelines=None):
252-
for pipeline in self._pipelines:
253-
# 如果 item 指定了 pipelines,检查是否匹配(忽略大小写)
254-
if allowed_pipelines is not None:
255-
pipeline_name = pipeline.__class__.__name__.replace("Pipeline", "").lower()
256-
# 将用户指定的 pipeline 名称也转为小写进行比较
257-
allowed_pipelines_lower = [p.lower() for p in allowed_pipelines]
258-
if pipeline_name not in allowed_pipelines_lower:
259-
continue # 跳过不匹配的 pipeline
249+
return datas_dict
260250

251+
def __export_to_db(self, table, datas, is_update=False, update_keys=(), used_pipelines=None):
252+
pipelines = used_pipelines or self._pipelines # 优先采用指定的pipelines
253+
for pipeline in pipelines:
261254
if is_update:
262255
if table == self._task_table and not isinstance(
263-
pipeline, MysqlPipeline
256+
pipeline, MysqlPipeline
264257
):
265258
continue
266259

@@ -280,7 +273,7 @@ def __export_to_db(self, table, datas, is_update=False, update_keys=(), allowed_
280273
# 若是任务表, 且上面的pipeline里没mysql,则需调用mysql更新任务
281274
if not self._have_mysql_pipeline and is_update and table == self._task_table:
282275
if not self.mysql_pipeline.update_items(
283-
table, datas, update_keys=update_keys
276+
table, datas, update_keys=update_keys
284277
):
285278
log.error(
286279
f"{self.mysql_pipeline.__class__.__name__} 更新数据失败. table: {table} items: {datas}"
@@ -291,7 +284,7 @@ def __export_to_db(self, table, datas, is_update=False, update_keys=(), allowed_
291284
return True
292285

293286
def __add_item_to_db(
294-
self, items, update_items, requests, callbacks, items_fingerprints
287+
self, items, update_items, requests, callbacks, items_fingerprints
295288
):
296289
export_success = True
297290
self._is_adding_to_db = True
@@ -301,14 +294,14 @@ def __add_item_to_db(
301294
items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
302295

303296
# 分捡(返回值包含 pipelines_dict)
304-
items_dict, items_pipelines = self.__pick_items(items)
305-
update_items_dict, update_pipelines = self.__pick_items(update_items, is_update_item=True)
297+
items_dict = self.__pick_items(items)
298+
update_items_dict = self.__pick_items(update_items, is_update_item=True)
306299

307300
# item批量入库
308301
failed_items = {"add": [], "update": [], "requests": []}
309302
while items_dict:
310303
table, datas = items_dict.popitem()
311-
allowed_pipelines = items_pipelines.get(table)
304+
used_pipelines = self._item_pipelines.get(table)
312305

313306
log.debug(
314307
"""
@@ -319,14 +312,14 @@ def __add_item_to_db(
319312
% (table, tools.dumps_json(datas, indent=16))
320313
)
321314

322-
if not self.__export_to_db(table, datas, allowed_pipelines=allowed_pipelines):
315+
if not self.__export_to_db(table, datas, used_pipelines=used_pipelines):
323316
export_success = False
324317
failed_items["add"].append({"table": table, "datas": datas})
325318

326319
# 执行批量update
327320
while update_items_dict:
328321
table, datas = update_items_dict.popitem()
329-
allowed_pipelines = update_pipelines.get(table)
322+
used_pipelines = self._item_pipelines.get(table)
330323

331324
log.debug(
332325
"""
@@ -339,7 +332,7 @@ def __add_item_to_db(
339332

340333
update_keys = self._item_update_keys.get(table)
341334
if not self.__export_to_db(
342-
table, datas, is_update=True, update_keys=update_keys, allowed_pipelines=allowed_pipelines
335+
table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines
343336
):
344337
export_success = False
345338
failed_items["update"].append(

0 commit comments

Comments
 (0)