本篇文章小编给大家分享一下Python中Scrapy+adbapi提高数据库写入效率实现代码,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
一:twisted中的adbapi
数据库pymysql的commit()和execute()在提交数据时,都是同步提交至数据库,由于scrapy框架数据的解析和异步多线程的,所以scrapy的数据解析速度,要远高于数据的写入数据库的速度。如果数据写入过慢,会造成数据库写入的阻塞,影响数据库写入的效率。
使用twisted异步IO框架,实现数据的异步写入,通过多线程异步的形式对数据进行写入,可以提高数据的写入速度。
1.1 两个主要方法
adbapi.ConnectionPool:
创建一个数据库连接池对象,其中包括多个连接对象,每个连接对象在独立的线程中工作。adbapi只是提供了异步访问数据库的编程框架,再其内部依然使MySQLdb这样的库访问数据库。
dbpool.runInteraction(do_insert,item):
异步调用do_insert函数,dbpool会选择连接池中的一个连接对象在独立线程中调用insert_db,其中参数item会被传给do_insert的第二个参数,传给do_insert的第一个参数是一个Transaction对象,其接口与Cursor对象类似,可以调用execute方法执行SQL语句,do_insert执行后,连接对象会自动调用commit方法
1.2 使用实例
from twisted.enterprise import adbapi
# 初始化数据库连接池(线程池) # 参数一:mysql的驱动 # 参数二:连接mysql的配置信息 dbpool = adbapi.ConnectionPool('pymysql', **params)
# 参数1:在异步任务中要执行的函数insert_db; # 参数2:给该函数insert_db传递的参数 query = self.dbpool.runInteraction(self.do_insert, item)
# 在execute()之后,不需要再进行commit(),连接池内部会进行提交的操作。 def do_insert(self, cursor, item): insert_sql = """ insert into qa_sample( need_id, need_question_uptime, need_title, need_title_describe, need_answer_uptime, need_answer) values (%s, %s, %s, %s, %s, %s) """ params = (item['need_id'], item['need_question_uptime'], item['need_title'], item['need_title_describe'], item['need_answer_uptime'], item['need_answer']) cursor.execute(insert_sql, params)
二:结合scrapy中的pipelines
# -*- coding: utf-8 -*- from twisted.enterprise import adbapi import pymysql # Define your item pipelines here # # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html class QaSpiderPipeline(object): def process_item(self, item, spider): return item class MysqlTwistedPipeline(object): def __init__(self, dbpool): self.dbpool = dbpool @classmethod def from_settings(cls, settings): dbparams = dict( host=settings['MYSQL_HOST'], db=settings['MYSQL_DBNAME'], user=settings['MYSQL_USER'], passwd=settings['MYSQL_PASSWORD'], charset='utf8', cursorclass=pymysql.cursors.DictCursor, use_unicode=True ) dbpool = adbapi.ConnectionPool('pymysql', **dbparams) return cls(dbpool) def process_item(self, item, spider): query = self.dbpool.runInteraction(self.do_insert, item) def do_insert(self, cursor, item): insert_sql = """ insert into qa_sample( need_id, need_question_uptime, need_title, need_title_describe, need_answer_uptime, need_answer) values (%s, %s, %s, %s, %s, %s) """ params = (item['need_id'], item['need_question_uptime'], item['need_title'], item['need_title_describe'], item['need_answer_uptime'], item['need_answer']) cursor.execute(insert_sql, params)