Scrapy将抓取结果写入MySQL数据库

Scrapy中pipelines.py是用来处理Item存储的,可以写入文件或数据库。
以代码示例Scrapy写入MySQL数据库。


利用MySQLdb驱动连接MySQL,示例代码如下:

class DoubanMovieTop250MySQLPipeline:
def __init__(self, mysql_config):
self.mysql_config = mysql_config
@classmethod
def from_crawler(cls, crawler):
mysql_config = {
'db' : crawler.settings.get('MYSQL_DB'),
'user' : crawler.settings.get('MYSQL_USER'),
'host' : crawler.settings.get('MYSQL_HOST'),
'passwd' : crawler.settings.get('MYSQL_PASSWD'),
'charset' : crawler.settings.get('MYSQL_CHARSET'),
}
return cls(mysql_config=mysql_config)
def process_item(self, item, spider):
con = MySQLdb.connect(**self.mysql_config)
cur = con.cursor()
logger.error('连接打开')
sql = """INSERT INTO movies (name, full_name, score, count, is_top250, top_num)
VALUES (%s, %s, %s, %s, %s, %s)"""
args = (
item["name"], item["full_name"], item["score"], item["count"], 1, item["top_num"]
)
try:
cur.execute(sql, args)
except Exception as e:
logger.error(traceback.format_exc())
con.rollback()
else:
con.commit()
finally:
cur.close()
con.close()
logger.error('连接关闭')

运行Scrapy:

scrapy crawl douban_movie_top_250 -L ERROR

示例正常跑通,数据正常写入MySQL数据库中。
但可以看出输出了很多日志,很明显,这是每处理一条Item就打开和关闭了一次数据库的连接。这是很低效的做法,改进方法自然是遍历完所有Item后一次性插入数据库中。主要是用executemany代替execute,另外利用了close_spider的特性。改进后代码如下:

class DoubanMovieTop250MySQLPipeline:
def __init__(self, mysql_config):
self.values_list = []
self.mysql_config = mysql_config
@classmethod
def from_crawler(cls, crawler):
mysql_config = {
'db' : crawler.settings.get('MYSQL_DB'),
'user' : crawler.settings.get('MYSQL_USER'),
'host' : crawler.settings.get('MYSQL_HOST'),
'passwd' : crawler.settings.get('MYSQL_PASSWD'),
'charset' : crawler.settings.get('MYSQL_CHARSET'),
}
return cls(mysql_config=mysql_config)
def process_item(self, item, spider):
values = (
item["name"], item["full_name"], item["score"], item["count"], 1, item["top_num"]
)
self.values_list.append(values)
def close_spider(self, spider):
con = MySQLdb.connect(**self.mysql_config)
cur = con.cursor()
logger.error('连接打开')
sql = """INSERT INTO movies (name, full_name, score, count, is_top250, top_num)
VALUES (%s, %s, %s, %s, %s, %s)"""
try:
cur.executemany(sql, self.values_list)
except Exception as e:
logger.error(traceback.format_exc())
con.rollback()
else:
con.commit()
finally:
cur.close()
con.close()
logger.error('连接关闭')

正常跑通并插入数据库,效率大大地提升了,只打开和关闭一次数据库连接并一次性插入。

数据截图:

TODO:有空可以研究一下怎么利用Twisted的驱动连接MySQL,通过twisted.enterprise.adbapi来实现,这是异步的方式,理解和处理起来自然会麻烦很多。一般以上的方式是够用了。