Skip to content

Commit 860e53b

Browse files
committed
refactor: harden buffer flush state
1 parent 064450a commit 860e53b

2 files changed

Lines changed: 150 additions & 96 deletions

File tree

feapder/buffer/item_buffer.py

Lines changed: 97 additions & 96 deletions
Original file line number | Diff line number | Diff line change
@@ -299,133 +299,134 @@ def __export_to_db(self, table, datas, is_update=False, update_keys=(), used_pip
299299
def __add_item_to_db(
    self, items, update_items, requests, callbacks, items_fingerprints
):
    """
    Export one batch of items and settle the requests that produced them.

    On a fully successful export the callbacks are run, the finished
    requests are removed from ``self._table_request`` and, when
    ``setting.ITEM_FILTER_ENABLE`` is on, the fingerprints are added to the
    dedup store. On failure the batch is either recorded as failed (once
    ``export_retry_times`` exceeds ``setting.EXPORT_DATA_MAX_RETRY_TIMES``)
    or left for a retry, and the failure counters are updated.

    Args:
        items: items to insert; grouped per table by ``__pick_items``.
        update_items: items to update; grouped per table by ``__pick_items``.
        requests: requests tied to this batch; removed from the request
            table on success or when the retry limit has been exceeded.
        callbacks: zero-argument callables, consumed from the front of the
            list and invoked only when every export succeeded.
        items_fingerprints: fingerprints matching ``items``; written to the
            dedup store only after a successful export.

    Raises:
        Any exception raised by the export or redis calls propagates to the
        caller. ``self._is_adding_to_db`` is reset to ``False`` either way.
    """
    self._is_adding_to_db = True
    try:
        export_success = True

        # De-duplicate before exporting
        if setting.ITEM_FILTER_ENABLE:
            items, items_fingerprints = self.__dedup_items(items, items_fingerprints)

        # Group items per table (original note: the return value includes pipelines_dict)
        items_dict = self.__pick_items(items)
        update_items_dict = self.__pick_items(update_items, is_update_item=True)

        # Batch insert
        failed_items = {"add": [], "update": [], "requests": []}
        while items_dict:
            table, datas = items_dict.popitem()
            used_pipelines = self._item_pipelines.get(table)

            log.debug(
                """
                -------------- item 批量入库 --------------
                表名: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            if not self.__export_to_db(table, datas, used_pipelines=used_pipelines):
                export_success = False
                failed_items["add"].append({"table": table, "datas": datas})

        # Batch update
        while update_items_dict:
            table, datas = update_items_dict.popitem()
            used_pipelines = self._item_pipelines.get(table)

            log.debug(
                """
                -------------- item 批量更新 --------------
                表名: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            update_keys = self._item_update_keys.get(table)
            if not self.__export_to_db(
                table, datas, is_update=True, update_keys=update_keys, used_pipelines=used_pipelines
            ):
                export_success = False
                failed_items["update"].append(
                    {"table": table, "datas": datas, "update_keys": update_keys}
                )

        if export_success:
            # Run callbacks; one failing callback must not block the rest
            while callbacks:
                try:
                    callback = callbacks.pop(0)
                    callback()
                except Exception as e:
                    log.exception(e)

            # Remove the requests that have been completed
            if requests:
                self.redis_db.zrem(self._table_request, requests)

            # Record fingerprints only after the data is safely exported
            if setting.ITEM_FILTER_ENABLE:
                if items_fingerprints:
                    self.__class__.dedup.add(items_fingerprints, skip_check=True)
        else:
            failed_items["requests"] = requests

            if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                if self._redis_key != "air_spider":
                    # Record the failed items in redis
                    self.redis_db.sadd(self._table_failed_items, failed_items)

                    # Remove the requests that have been completed
                    if requests:
                        self.redis_db.zrem(self._table_request, requests)

                    log.error(
                        "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format(
                            tools.dumps_json(failed_items)
                        )
                    )
                self.export_retry_times = 0

            else:
                tip = ["入库不成功"]
                if callbacks:
                    tip.append("不执行回调")
                if requests:
                    tip.append("不删除任务")
                    exists = self.redis_db.zexists(self._table_request, requests)
                    # Re-add only the requests that are still in the table.
                    # Previously the whole ``requests`` list was re-added once
                    # per existing entry, which also restored requests that
                    # had already been removed.
                    still_queued = [
                        request for exist, request in zip(exists, requests) if exist
                    ]
                    if still_queued:
                        self.redis_db.zadd(self._table_request, still_queued, 300)

                if setting.ITEM_FILTER_ENABLE:
                    tip.append("数据不入去重库")

                if self._redis_key != "air_spider":
                    tip.append("将自动重试")

                tip.append("失败items:\n {}".format(tools.dumps_json(failed_items)))
                log.error(",".join(tip))

                self.export_falied_times += 1

                if self._redis_key != "air_spider":
                    self.export_retry_times += 1

            if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
                # Raise an alert
                msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format(
                    self._redis_key, self.export_falied_times
                )
                log.error(msg)
                tools.send_msg(
                    msg=msg,
                    level="error",
                    message_prefix="《%s》爬虫导出数据失败" % (self._redis_key),
                )
    finally:
        # Always clear the flag so a raised export cannot leave the buffer
        # permanently marked as busy.
        self._is_adding_to_db = False
429430

430431
def metric_datas(self, table, datas):
431432
"""

tests/test_runtime_status.py

Lines changed: 53 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
from feapder.core.collector import Collector
77
from feapder.core.scheduler import Scheduler
88
from feapder.core.runtime_state import RuntimeState
9+
from feapder.network.item import Item
910

1011

1112
class FakeRedis:
@@ -185,3 +186,55 @@ def popleft(self):
185186
assert buffer._requests_deque.busy_observations
186187
assert all(buffer._requests_deque.busy_observations)
187188
assert buffer.is_idle() is True
189+
190+
191+
def test_request_buffer_flush_resets_flag_when_zadd_raises():
    """After a flush whose ``zadd`` raises, ``is_adding_to_db()`` must be False."""

    class ExplodingDB:
        # Database double whose write always fails.
        def zadd(self, table, values, prioritys=0):
            raise RuntimeError("write failed")

    class FakeRequest:
        # Request double exposing plain class attributes only.
        priority = 300
        filter_repeat = False
        url = "https://example.com"
        to_dict = {"url": "https://example.com"}

    pending_request = FakeRequest()

    buffer = build_request_buffer(request_count=0, delete_count=0, flushing=False)
    buffer._db = ExplodingDB()
    buffer._table_request = "test:z_requests"
    buffer._table_failed_request = "test:z_failed_requests"
    buffer._requests_deque.append(pending_request)

    buffer.flush()

    assert buffer.is_adding_to_db() is False
215+
216+
217+
def test_item_buffer_flush_resets_flag_when_export_raises(monkeypatch):
    """After a flush whose export raises, ``is_adding_to_db()`` must be False."""
    monkeypatch.setattr("feapder.buffer.item_buffer.setting.ITEM_FILTER_ENABLE", False)

    def failing_export(table, datas, is_update=False, update_keys=(), used_pipelines=None):
        # Replacement for the name-mangled private export method.
        raise RuntimeError("export failed")

    boom_item = Item(title="boom")
    buffer = build_item_buffer(item_count=0, flushing=False)
    buffer._items_queue.put(boom_item)

    # Applied in insertion order, mirroring a sequence of plain assignments.
    overrides = {
        "_redis_key": "test",
        "_task_table": None,
        "_item_tables": {},
        "_item_update_keys": {},
        "_item_pipelines": {},
        "_pipelines": [],
        "_have_mysql_pipeline": True,
        "_mysql_pipeline": None,
        "export_retry_times": 0,
        "export_falied_times": 0,
        "_ItemBuffer__export_to_db": failing_export,
    }
    for attr_name, attr_value in overrides.items():
        setattr(buffer, attr_name, attr_value)

    buffer.flush()

    assert buffer.is_adding_to_db() is False

0 commit comments

Comments
 (0)