|
| 1 | +# CSV Pipeline 修复报告 |
| 2 | + |
| 3 | +## 修复日期 |
| 4 | +2025-11-07 |
| 5 | + |
| 6 | +## 问题概述 |
| 7 | + |
| 8 | +原始 `csv_pipeline.py` 存在以下两个关键问题: |
| 9 | + |
| 10 | +### 问题 1:数据列错位(重复存储表现) |
| 11 | + |
| 12 | +**根本原因**: |
| 13 | +- 每次 `save_items()` 调用都从 `items[0]` 重新提取字段名(`fieldnames`) |
| 14 | +- 当批次中的items字段顺序不一致时,会导致CSV列顺序变化 |
| 15 | +- 不同批次写入同一CSV时,前面批次的表头和后面批次的数据列顺序不匹配 |
| 16 | + |
| 17 | +**具体场景**: |
| 18 | +``` |
| 19 | +第一批items字段顺序: [name, age, city] |
| 20 | +第二批items字段顺序: [age, name, city] # 字段顺序变了 |
| 21 | +
|
| 22 | +结果: |
| 23 | +- 表头: name,age,city |
| 24 | +- 第一批数据: Alice,25,Beijing (正确) |
| 25 | +- 第二批数据: 26,Charlie,Shenzhen (字段值映射错了!) |
| 26 | +``` |
| 27 | + |
| 28 | +### 问题 2:批处理机制失效 |
| 29 | + |
| 30 | +**根本原因**: |
| 31 | +- ItemBuffer 会按 `ITEM_UPLOAD_BATCH_MAX_SIZE` 分批调用 pipeline |
| 32 | +- 每批数据调用一次 `save_items()` (通常一批100-1000条) |
| 33 | +- 但因为字段名提取逻辑错误,导致批处理的正常流程被破坏 |
| 34 | + |
| 35 | +--- |
| 36 | + |
| 37 | +## 修复方案 |
| 38 | + |
| 39 | +### 核心改动 |
| 40 | + |
| 41 | +#### 1. 添加表级别的字段名缓存(第37-39行) |
| 42 | + |
| 43 | +```python |
| 44 | +# 用于缓存每个表的字段名顺序(Per-Table Fieldnames Cache) |
| 45 | +# 确保跨批次、跨线程的字段顺序一致 |
| 46 | +_table_fieldnames = {} |
| 47 | +``` |
| 48 | + |
| 49 | +**设计思路**: |
| 50 | +- 使用静态变量 `_table_fieldnames`,跨实例和跨线程共享 |
| 51 | +- 每个表只缓存一次字段顺序,所有后续批次复用该顺序 |
| 52 | +- 这样设计既保证线程安全(通过Per-Table Lock),又避免重复提取 |
| 53 | + |
| 54 | +#### 2. 新增 `_get_and_cache_fieldnames()` 静态方法(第80-114行) |
| 55 | + |
| 56 | +```python |
| 57 | +@staticmethod |
| 58 | +def _get_and_cache_fieldnames(table, items): |
| 59 | + """获取并缓存表对应的字段名顺序""" |
| 60 | + |
| 61 | + # 如果该表已经缓存了字段名,直接返回缓存的 |
| 62 | + if table in CsvPipeline._table_fieldnames: |
| 63 | + return CsvPipeline._table_fieldnames[table] |
| 64 | + |
| 65 | + # 第一次调用,从items提取字段名并缓存 |
| 66 | + if not items: |
| 67 | + return [] |
| 68 | + |
| 69 | + first_item = items[0] |
| 70 | + fieldnames = list(first_item.keys()) if isinstance(first_item, dict) else [] |
| 71 | + |
| 72 | + if fieldnames: |
| 73 | + # 缓存字段名(使用静态变量,跨实例共享) |
| 74 | + CsvPipeline._table_fieldnames[table] = fieldnames |
| 75 | + log.info(f"表 {table} 的字段名已缓存: {fieldnames}") |
| 76 | + |
| 77 | + return fieldnames |
| 78 | +``` |
| 79 | + |
| 80 | +**工作流程**: |
| 81 | +- ✅ 第一批数据:检查缓存(无) → 从items[0]提取 → 缓存 → 返回 |
| 82 | +- ✅ 第二批数据:检查缓存(有) → 直接返回缓存的字段名 |
| 83 | +- ✅ 第三批及以后:都使用相同的缓存字段名 |
| 84 | + |
| 85 | +#### 3. 修改 `save_items()` 使用缓存的字段名(第163行) |
| 86 | + |
| 87 | +```python |
| 88 | +# 原来的代码 |
| 89 | +fieldnames = self._get_fieldnames(items) |
| 90 | + |
| 91 | +# 修复后的代码 |
| 92 | +fieldnames = self._get_and_cache_fieldnames(table, items) |
| 93 | +``` |
| 94 | + |
| 95 | +**改动的影响**: |
| 96 | +- 确保所有批次使用同一份字段顺序 |
| 97 | +- 避免字段顺序变化导致的列错位 |
| 98 | +- 性能提升:只提取一次字段名,后续批次直接返回缓存 |
| 99 | + |
| 100 | +--- |
| 101 | + |
| 102 | +## 修复效果对比 |
| 103 | + |
| 104 | +### 修复前 |
| 105 | +``` |
| 106 | +场景:爬取数据,分两批保存 |
| 107 | +
|
| 108 | +第一批(100条): {name, age, city} |
| 109 | +├─ 调用 save_items() |
| 110 | +├─ 提取 fieldnames: ['name', 'age', 'city'] |
| 111 | +└─ 写入CSV: 表头 + 100行数据 ✅ |
| 112 | +
|
| 113 | +第二批(100条): {age, name, city} # 字段顺序不同 |
| 114 | +├─ 调用 save_items() |
| 115 | +├─ 提取 fieldnames: ['age', 'name', 'city'] # 顺序变了! |
| 116 | +└─ 写入CSV: 100行数据(用新顺序) ❌ 列错位! |
| 117 | +
|
| 118 | +结果:前100行和后100行的列对应关系不一致 |
| 119 | +``` |
| 120 | + |
| 121 | +### 修复后 |
| 122 | +``` |
| 123 | +第一批(100条): {name, age, city} |
| 124 | +├─ 调用 save_items() |
| 125 | +├─ 调用 _get_and_cache_fieldnames() |
| 126 | +├─ 检查缓存 → 无 → 提取 ['name', 'age', 'city'] |
| 127 | +├─ 缓存到 _table_fieldnames['users'] = ['name', 'age', 'city'] |
| 128 | +└─ 写入CSV: 表头 + 100行数据 ✅ |
| 129 | +
|
| 130 | +第二批(100条): {age, name, city} |
| 131 | +├─ 调用 save_items() |
| 132 | +├─ 调用 _get_and_cache_fieldnames() |
| 133 | +├─ 检查缓存 → 有! → 返回 ['name', 'age', 'city'] |
| 134 | +└─ 写入CSV: 100行数据(强制使用缓存顺序) ✅ 列顺序一致! |
| 135 | +
|
| 136 | +结果:所有行的列顺序完全一致,数据准确 |
| 137 | +``` |
| 138 | + |
| 139 | +--- |
| 140 | + |
| 141 | +## 技术亮点 |
| 142 | + |
| 143 | +### 1. 设计模式 |
| 144 | + |
| 145 | +采用 **缓存策略 + Per-Table Lock** 的组合设计: |
| 146 | + |
| 147 | +| 组件 | 用途 | 特点 | |
| 148 | +|------|------|------| |
| 149 | +| `_table_fieldnames` | 字段名缓存 | 一次提取,多次复用 | |
| 150 | +| `_file_locks` | 文件锁 | 按表分粒度,支持多表并行 | |
| 151 | + |
| 152 | +### 2. 并发安全 |
| 153 | + |
| 154 | +- 字段名缓存在获取锁之前(避免持有锁时做复杂计算) |
| 155 | +- 每个表有独立的锁,不同表可并行写入 |
| 156 | +- 同一表的多批数据串行写入,保证一致性 |
| 157 | + |
| 158 | +### 3. 向后兼容 |
| 159 | + |
| 160 | +- 修复前的代码逻辑保持不变 |
| 161 | +- 仅改进了字段名提取的时机 |
| 162 | +- 不需要修改爬虫代码或调用方式 |
| 163 | + |
| 164 | +--- |
| 165 | + |
| 166 | +## 验证方法 |
| 167 | + |
| 168 | +### 测试场景 1:多批次相同表 |
| 169 | + |
| 170 | +```python |
| 171 | +# 第一批: 100条user数据,字段: name, age, city |
| 172 | +pipeline.save_items('users', batch1) # 缓存 fieldnames |
| 173 | + |
| 174 | +# 第二批: 100条user数据,字段顺序: age, name, city |
| 175 | +pipeline.save_items('users', batch2) # 使用缓存的 fieldnames |
| 176 | + |
| 177 | +# 验证:CSV中所有列的对应关系一致 |
| 178 | +# users.csv: |
| 179 | +# name,age,city |
| 180 | +# Alice,25,Beijing |
| 181 | +# 26,Charlie,Shenzhen # 注意:是缓存的顺序,不是第二批的顺序 |
| 182 | +``` |
| 183 | + |
| 184 | +### 测试场景 2:多表并行写入 |
| 185 | + |
| 186 | +```python |
| 187 | +# 线程1: 写入users表(10个批次) |
| 188 | +# 线程2: 同时写入products表(10个批次) |
| 189 | + |
| 190 | +# 预期:每个表的字段顺序单独缓存,不互相影响 |
| 191 | +# users.csv: 所有行字段顺序一致 |
| 192 | +# products.csv: 所有行字段顺序一致 |
| 193 | +``` |
| 194 | + |
| 195 | +### 测试场景 3:断点续爬 |
| 196 | + |
| 197 | +```python |
| 198 | +# 第一天: 爬取100条数据,保存到users.csv |
| 199 | +pipeline.save_items('users', batch1) |
| 200 | + |
| 201 | +# 第二天: 断点续爬,再爬取100条数据 |
| 202 | +pipeline.save_items('users', batch2) |
| 203 | + |
| 204 | +# 预期:新旧数据的列对应关系一致 |
| 205 | +``` |
| 206 | + |
| 207 | +--- |
| 208 | + |
| 209 | +## 代码改动总结 |
| 210 | + |
| 211 | +| 行号 | 改动 | 说明 | |
| 212 | +|------|------|------| |
| 213 | +| 31 | 更新文档 | 添加"表级别的字段名缓存"说明 | |
| 214 | +| 37-39 | 新增代码 | 添加 `_table_fieldnames` 静态变量 | |
| 215 | +| 80-114 | 新增方法 | 新增 `_get_and_cache_fieldnames()` 方法 | |
| 216 | +| 127-145 | 删除方法 | 删除旧的 `_get_fieldnames()` 方法 | |
| 217 | +| 163 | 修改代码 | `save_items()` 中调用新的缓存方法 | |
| 218 | + |
| 219 | +**总计**: |
| 220 | +- ✅ 新增 1 个静态变量 |
| 221 | +- ✅ 新增 1 个静态方法(35行代码) |
| 222 | +- ✅ 删除 1 个成员方法(14行代码) |
| 223 | +- ✅ 修改 1 处调用 |
| 224 | + |
| 225 | +--- |
| 226 | + |
| 227 | +## 后续建议 |
| 228 | + |
| 229 | +### 1. 可选优化:字段验证 |
| 230 | + |
| 231 | +如果需要更严格的数据质量保证,可在 `_get_and_cache_fieldnames()` 中添加验证: |
| 232 | + |
| 233 | +```python |
| 234 | +# 可选:验证后续批次是否有新增字段 |
| 235 | +actual_fields = set(items[0].keys()) |
| 236 | +cached_fields = set(cached_fieldnames) |
| 237 | +new_fields = actual_fields - cached_fields |
| 238 | + |
| 239 | +if new_fields: |
| 240 | + log.warning(f"检测到新增字段: {new_fields},将被忽略") |
| 241 | +``` |
| 242 | + |
| 243 | +### 2. 可选优化:缓存清理 |
| 244 | + |
| 245 | +长期运行的爬虫可能需要定期清理缓存(可选): |
| 246 | + |
| 247 | +```python |
| 248 | +@classmethod |
| 249 | +def clear_cache(cls): |
| 250 | + """清理字段名缓存(可选,用于清理长期运行的进程)""" |
| 251 | + cls._table_fieldnames.clear() |
| 252 | + log.info("已清理字段名缓存") |
| 253 | +``` |
| 254 | + |
| 255 | +### 3. 监控和日志 |
| 256 | + |
| 257 | +- ✅ 已添加日志记录字段名缓存时机 |
| 258 | +- ✅ 已添加错误处理和异常日志 |
| 259 | +- 可考虑添加缓存命中率的打点指标 |
| 260 | + |
| 261 | +--- |
| 262 | + |
| 263 | +## 相关文件 |
| 264 | + |
| 265 | +- 修复前:`csv_pipeline.py` (原始版本) |
| 266 | +- 修复后:`csv_pipeline.py` (当前版本) |
| 267 | +- 参考文件: |
| 268 | + - `feapder/pipelines/mysql_pipeline.py` (数据库Pipeline的设计参考) |
| 269 | + - `feapder/buffer/item_buffer.py` (ItemBuffer的批处理机制) |
| 270 | + |
| 271 | +--- |
| 272 | + |
| 273 | +## 修复者 |
| 274 | + |
| 275 | +修复日期:2025-11-07 |
| 276 | +修复内容:字段名缓存机制,确保跨批数据一致性 |
0 commit comments