scrapy pipelines过滤重复数据
- 方法 1:基于内存的简单去重(适合小规模数据)
- 方法 2:基于持久化存储去重(适合大规模数据/重启恢复)
- 方法 3:使用 Scrapy 内置的 dupefilter(针对请求去重)
- 方法 4:布隆过滤器(超大数据集优化)
- 方法 5:分布式去重(Redis)
- 关键点总结
方法 1:基于内存的简单去重(适合小规模数据)
使用 Python 的 set 或 dict 存储已抓取数据的唯一标识(如 URL、ID),在 Pipeline 中检查是否重复。
python"># pipelines.py
from scrapy.exceptions import DropItem
class DuplicatesPipeline:
def __init__(self):
self.seen_ids = set() # 存储已处理的唯一标识
def process_item(self, item, spider):
# 假设 item 中有唯一标识字段 'id'
unique_id = item['id']
if unique_id in self.seen_ids:
raise DropItem(f"Duplicate item found: {item}")
self.seen_ids.add(unique_id)
return item
配置启用 Pipeline:
python"># settings.py
ITEM_PIPELINES = {
'your_project.pipelines.DuplicatesPipeline': 300,
}
方法 2:基于持久化存储去重(适合大规模数据/重启恢复)
当数据量较大或需要持久化时,可以使用数据库(如 SQLite、Redis)或文件存储唯一标识。
示例:使用 SQLite
python"># pipelines.py
import sqlite3
from scrapy.exceptions import DropItem
class SQLiteDuplicatesPipeline:
def __init__(self):
self.conn = sqlite3.connect('scrapy_data.db')
self.cursor = self.conn.cursor()
self.cursor.execute('CREATE TABLE IF NOT EXISTS seen_ids (id TEXT PRIMARY KEY)')
def process_item(self, item, spider):
unique_id = item['id']
self.cursor.execute('SELECT id FROM seen_ids WHERE id=?', (unique_id,))
if self.cursor.fetchone():
raise DropItem(f"Duplicate item found: {item}")
else:
self.cursor.execute('INSERT INTO seen_ids VALUES (?)', (unique_id,))
self.conn.commit()
return item
def close_spider(self, spider):
self.conn.close()
方法 3:使用 Scrapy 内置的 dupefilter(针对请求去重)
Scrapy 默认通过 DUPEFILTER_CLASS 过滤重复请求(基于 URL),但如果你需要更细粒度的 Item 去重,仍需自定义 Pipeline。
方法 4:布隆过滤器(超大数据集优化)
使用布隆过滤器(Bloom Filter)降低内存占用,适合海量数据去重,但有一定误判率。
python"># 安装:pip install pybloom-live
from pybloom_live import ScalableBloomFilter
from scrapy.exceptions import DropItem
class BloomDuplicatesPipeline:
def __init__(self):
self.bf = ScalableBloomFilter(initial_capacity=1000, mode=ScalableBloomFilter.SMALL_SET_GROWTH)
def process_item(self, item, spider):
unique_id = item['id']
if unique_id in self.bf:
raise DropItem(f"Duplicate item found: {item}")
self.bf.add(unique_id)
return item
配置启用 Pipeline:
python"># settings.py
ITEM_PIPELINES = {
'your_project.pipelines.BloomDuplicatesPipeline': 200,
}
方法 5:分布式去重(Redis)
分布式爬虫中,使用 Redis 存储全局唯一标识,支持多爬虫实例共享去重数据。
python"># pipelines.py
import redis
from scrapy.exceptions import DropItem
class RedisDuplicatesPipeline:
def __init__(self, redis_host, redis_port):
self.redis = redis.StrictRedis(host=redis_host, port=redis_port, db=0)
@classmethod
def from_crawler(cls, crawler):
return cls(
redis_host=crawler.settings.get('REDIS_HOST'),
redis_port=crawler.settings.get('REDIS_PORT')
)
def process_item(self, item, spider):
unique_id = item['id']
if self.redis.sismember('seen_ids', unique_id):
raise DropItem(f"Duplicate item found: {item}")
self.redis.sadd('seen_ids', unique_id)
return item
关键点总结
- 唯一标识选择:根据业务选择唯一字段(如 URL、商品 ID、哈希值)。
- 内存 vs 持久化:小数据用内存结构(set),大数据用数据库或布隆过滤器。
- 分布式需求:使用 Redis 或类似工具实现全局去重。
- 异常处理:发现重复时抛出 DropItem 终止后续 Pipeline 处理。
根据实际场景选择最适合的方案!