本篇文章小编给大家分享一下python实现sqlalchemy使用代码示例,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
特点是操纵Python对象而不是SQL查询,也就是在代码层面考虑的是对象,而不是SQL,体现的是一种程序化思维,这样使得Python程序更加简洁易懂。
具体的实现方式是将数据库表转换为Python类,其中数据列作为属性,数据库操作作为方法。
abstract# 辅助sqlAlchemy实现类的继承,自动继承属性,省去super()
SQLAlchemy定义的ORM,在继承父级ORM时候,Foreign Key外键是不能继承的,它强制要求在子类中重新定义。
使用概述
在使用sqlalchemy访问数据库的时候,以类的形式表示表格,因此在使用之前,需要先定义类。
类的定义有三种:基类BASE、父类、子类
基类是sqlalchemy底层的;当需要一份数据切分为多个子表的时候,或多个表的字段一致时,可以使用一个父类定义字段的类型,多个子表继承父类的属性。
一、创建引擎和会话
通过创建引擎、绑定引擎来创建会话,实现数据库的访问。
from sqlalchemy import create_engine # 引擎 from sqlalchemy.orm import sessionmaker # 创建orm的会话池,orm和sql均可以管理对象关系型数据库,需要绑定引擎才可以使用会话, # 创建连接 engine = create_engine("mysql+pymysql://root:1234;@127.0.0.1/test", # 需要安装mysql和pymysql的模块,用户名:密码@ip地址/某个数据库 #echo=True, # 打印操作对应的SQL语句 pool_size=8, # 连接个数 pool_recycle=60*30 # 不使用时断开 ) # 创建session DbSession = sessionmaker(bind=engine) # 会话工厂,与引擎绑定。 session = DbSession() # 实例化 session.close() # 关闭会话
二、定义类来表示虚拟表格
在使用sqlalchemy访问数据库的时候,以类的形式表示表格,因此在使用之前,需要先定义类。使用类的名称而不是tablename实现之后的增删改查。
# 导入定义类需要的模块 from sqlalchemy.ext.declarative import declarative_base # 调用sqlalchemy的基类 from sqlalchemy import Column, Index, distinct, update # 指定字段属性,索引、唯一、DML from sqlalchemy.types import * # 所有字段类型
1. 直接建立一个可调用的表格
需要先继承基类,在定义__init__函数,设置输入参数。
# 创建库表类型 Base = declarative_base() # 调用sqlalchemy的基类 class Users(Base): '''继承基类''' __tablename__ = "users" # 数据表的名字 __table_args__ = {'extend_existing': True} # 当数据库中已经有该表时,或内存中已声明该表,可以用此语句重新覆盖声明。 id = Column(Integer, primary_key=True) name = Column(String(64), unique=True) #email = Column(String(64)) def __init__(self, name, email): self.name = name self.email = email # 声明需要调用的特征,可以只声明数据库中表格列的子集 Base.metadata.create_all(engine) # 表生效:将所有定义的类,使用引擎创建,此时可以在数据库中看到这些表。
2. 创建多个相同列属性的表格 先建立一个表格的父类,指定列的属性,再通过继承父类
不同的表
# 创建库表类型 Base = declarative_base() # 调用sqlalchemy的基类 class model_data(BASE): '''创建数据库表类:模型所需的基本字段''' __abstract__ = True # 辅助sqlAlchemy实现类的继承,自动继承属性,省去super() __table_args__ = {'extend_existing': True} # 若表的声明在内存中已存在,则重新声明表的名称,不然会报错 ai_xdr_id = Column(BigInteger(), primary_key=True, unique=True, autoincrement= True) ai_sdk_id = Column(BigInteger()) class TrainData(model_data): # 训练集表 '''继承model_data的属性,并将表的名字定义为:'xxx_train_data'存入数据库 ''' __tablename__ = 'xxx_train_data' class DevData(model_data): # 开发集表 '''表的名字定义为:'xxx_dev_data' ''' __tablename__ = 'xxx_dev_data' class TestData(model_data): # 测试集表 __tablename__ = 'xxx_test_data' Base.metadata.create_all(engine) # 表生效:将所有定义的类,使用引擎创建,此时可以在数据库中看到这些表。
三、增删改查
因为是会话操作,当某个语句,例如增加数据时,不成功的时候需要回滚。
增加数据
# 增加数据 add_user = Users("test3", "[email protected]") session.add(add_user) session.commit() # add_users = Users(("test", "[email protected]"),('a','b'))) # session.add(add_users) # session.commit() # 当上述语句出现执行错误时,需要执行回滚语句,才能继续操作 session.rollback()
删除数据
delete_users = session.query(Users).filter(Users.name == "test").first() if delete_users: session.delete(delete_users) session.commit() session.query(Users).filter(Users.name == "test").delete() session.commit()
更改数据
# 改 session.query(Users).filter_by(id=1).update({'name': "Jack"}) users = session.query(Users).filter_by(name="Jack").first() users.name = "test"
查找数据
users = session.query(Users).filter_by(id=5).all() for item in users: print(item.name) print(item.email) # 若未在类中声明,则无法访问数据库中该表的属性。
四、进阶技能
将DataFrame格式的数据导入数据库
class DataAccessLayer:# 数据连接层、定义了连接和关闭。 '''数据连接层、定义了连接和关闭。''' def __init__(self): self.ENGINE = None # 引擎 self.SESSION = None # 会话 self.conn_string = "mysql+pymysql://root:1234;@127.0.0.1/test" ## 需要安装mysql和pymysql的模块,用户名:密码@ip地址/某个数据库 def connect(self): '''连接时建立引擎和会话。''' self.ENGINE = create_engine(self.conn_string, encoding='utf-8',isolation_level="AUTOCOMMIT", connect_args={'connect_timeout': 7200}) # self.ENGINE = create_engine(self.conn_string, encoding='utf-8',connect_args={'connect_timeout': 7200}) self.SESSION = sessionmaker(bind=self.ENGINE)() def disconnect(self): '''断开时,关闭引擎。''' self.ENGINE.close() def df_save_db(df,tablename): '''将数据集DataFrame保存到数据库''' db_ac = DataAccessLayer() db_ac.connect() conn = db_ac.ENGINE.connect() df.to_sql(name=tablename, con=conn, if_exists='append', index=False) conn.close() print('%s updated.'%tablename) df = pd.read_csv('traindata_jiangsu_donghai.csv') df_save_db(df,'traindata_jiangsu_donghai')