python实现sqlalchemy使用代码示例

作者:袖梨 2022-06-25

本篇文章小编给大家分享一下python实现sqlalchemy使用代码示例,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。

特点是操纵Python对象而不是SQL查询,也就是在代码层面考虑的是对象,而不是SQL,体现的是一种程序化思维,这样使得Python程序更加简洁易懂。

具体的实现方式是将数据库表转换为Python类,其中数据列作为属性,数据库操作作为方法。

abstract# 辅助sqlAlchemy实现类的继承,自动继承属性,省去super()

SQLAlchemy定义的ORM,在继承父级ORM时候,Foreign Key外键是不能继承的,它强制要求在子类中重新定义。

使用概述

在使用sqlalchemy访问数据库的时候,以类的形式表示表格,因此在使用之前,需要先定义类。

类的定义有三种:基类BASE、父类、子类

基类是sqlalchemy底层的;当需要一份数据切分为多个子表的时候,或多个表的字段一致时,可以使用一个父类定义字段的类型,多个子表继承父类的属性。

一、创建引擎和会话

通过创建引擎、绑定引擎来创建会话,实现数据库的访问。

from sqlalchemy import create_engine                          # 引擎
from sqlalchemy.orm import sessionmaker                       # 创建orm的会话池,orm和sql均可以管理对象关系型数据库,需要绑定引擎才可以使用会话,

# 创建连接
engine = create_engine("mysql+pymysql://root:1234;@127.0.0.1/test", # 需要安装mysql和pymysql的模块,用户名:密码@ip地址/某个数据库
                       #echo=True,         # 打印操作对应的SQL语句
                       pool_size=8,       # 连接个数
                       pool_recycle=60*30 # 不使用时断开
                       )

# 创建session
DbSession = sessionmaker(bind=engine)  # 会话工厂,与引擎绑定。
session = DbSession()                  # 实例化

session.close()  # 关闭会话

二、定义类来表示虚拟表格

在使用sqlalchemy访问数据库的时候,以类的形式表示表格,因此在使用之前,需要先定义类。使用类的名称而不是tablename实现之后的增删改查。

# 导入定义类需要的模块
from sqlalchemy.ext.declarative import declarative_base       # 调用sqlalchemy的基类
from sqlalchemy import Column, Index, distinct, update        # 指定字段属性,索引、唯一、DML
from sqlalchemy.types import *                                # 所有字段类型

1. 直接建立一个可调用的表格

需要先继承基类,在定义__init__函数,设置输入参数。

# 创建库表类型
Base = declarative_base()  # 调用sqlalchemy的基类

class Users(Base):
    '''继承基类'''
    __tablename__ = "users"                     # 数据表的名字
    __table_args__ = {'extend_existing': True}  # 当数据库中已经有该表时,或内存中已声明该表,可以用此语句重新覆盖声明。
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True)
    #email = Column(String(64))

    def __init__(self, name, email):
        self.name = name
        self.email = email                      # 声明需要调用的特征,可以只声明数据库中表格列的子集
        
Base.metadata.create_all(engine)                # 表生效:将所有定义的类,使用引擎创建,此时可以在数据库中看到这些表。

2. 创建多个相同列属性的表格 先建立一个表格的父类,指定列的属性,再通过继承父类

不同的表

# 创建库表类型
Base = declarative_base()  # 调用sqlalchemy的基类

class model_data(BASE):  
    '''创建数据库表类:模型所需的基本字段'''
    __abstract__ = True                         # 辅助sqlAlchemy实现类的继承,自动继承属性,省去super()
    __table_args__ = {'extend_existing': True}  # 若表的声明在内存中已存在,则重新声明表的名称,不然会报错
    ai_xdr_id = Column(BigInteger(), primary_key=True, unique=True, autoincrement= True)
    ai_sdk_id = Column(BigInteger())

class TrainData(model_data): # 训练集表
    '''继承model_data的属性,并将表的名字定义为:'xxx_train_data'存入数据库 '''
    __tablename__ = 'xxx_train_data'
    
class DevData(model_data):   # 开发集表
    '''表的名字定义为:'xxx_dev_data' '''
    __tablename__ = 'xxx_dev_data'

class TestData(model_data):  # 测试集表
    __tablename__ = 'xxx_test_data'

Base.metadata.create_all(engine)                # 表生效:将所有定义的类,使用引擎创建,此时可以在数据库中看到这些表。

三、增删改查

因为是会话操作,当某个语句,例如增加数据时,不成功的时候需要回滚。

增加数据

# 增加数据
add_user = Users("test3", "[email protected]")
session.add(add_user)
session.commit()

# add_users = Users(("test", "[email protected]"),('a','b')))
# session.add(add_users)
# session.commit()

# 当上述语句出现执行错误时,需要执行回滚语句,才能继续操作
session.rollback()

删除数据

delete_users = session.query(Users).filter(Users.name == "test").first()
if delete_users:
    session.delete(delete_users)
    session.commit()
    
session.query(Users).filter(Users.name == "test").delete()
session.commit()

更改数据

# 改
session.query(Users).filter_by(id=1).update({'name': "Jack"})

users = session.query(Users).filter_by(name="Jack").first()
users.name = "test"

查找数据

users = session.query(Users).filter_by(id=5).all()
for item in users:
    print(item.name)
    print(item.email)   # 若未在类中声明,则无法访问数据库中该表的属性。

四、进阶技能

将DataFrame格式的数据导入数据库

class DataAccessLayer:# 数据连接层、定义了连接和关闭。
    '''数据连接层、定义了连接和关闭。'''
    def __init__(self):
        self.ENGINE = None                 # 引擎
        self.SESSION = None                # 会话
        self.conn_string = "mysql+pymysql://root:1234;@127.0.0.1/test"  ## 需要安装mysql和pymysql的模块,用户名:密码@ip地址/某个数据库

    def connect(self):
        '''连接时建立引擎和会话。'''
        self.ENGINE = create_engine(self.conn_string, encoding='utf-8',isolation_level="AUTOCOMMIT", connect_args={'connect_timeout': 7200})
        # self.ENGINE = create_engine(self.conn_string, encoding='utf-8',connect_args={'connect_timeout': 7200})
        self.SESSION = sessionmaker(bind=self.ENGINE)()

    def disconnect(self):
        '''断开时,关闭引擎。'''
        self.ENGINE.close()

def df_save_db(df,tablename):
    '''将数据集DataFrame保存到数据库'''
    db_ac = DataAccessLayer()
    db_ac.connect()
    conn = db_ac.ENGINE.connect()
    df.to_sql(name=tablename, con=conn, if_exists='append', index=False)
    conn.close()
    print('%s updated.'%tablename)

df = pd.read_csv('traindata_jiangsu_donghai.csv')
df_save_db(df,'traindata_jiangsu_donghai')

相关文章

精彩推荐