基于SQLAlchemy的MySQL数据向金仓数据库迁移方案

作者:袖梨 2026-05-24

将SQLAlchemy项目迁移至金仓数据库时,方言包配置问题常让人困扰。本文详细记录从报错排查到成功连接的完整过程,特别针对方言包安装这一关键环节提供解决方案。

从一个报错开始

上周接了个小活,要把一个用SQLAlchemy写的数据分析脚本从MySQL迁到金仓。本来以为换个数据库驱动就行,结果跑起来直接报错:No module named 'sqlalchemy.dialects.kingbase'

SQLAlchemy编写的MySQL数据库迁移至金仓数据库

查了一圈才发现,SQLAlchemy不像MySQL、PostgreSQL那样自带驱动,金仓的方言包需要手动装。折腾了两天才跑通,今天把过程记下来,希望能帮到遇到同样问题的人。

一、方言包是什么,为什么非要手动装

SQLAlchemy本身只是个壳,它不知道自己该怎么跟数据库说话。每种数据库需要一个“翻译”,这个翻译就叫方言包(dialect)。

MySQL、PostgreSQL这种主流数据库,SQLAlchemy安装的时候就把它们的方言包带上了。但金仓不在这个列表里,所以得自己去官网下,然后手动放到指定位置。

方言包底层依赖ksycopg2驱动,所以这两个都得装。

二、安装步骤(踩坑记录)

2.1 先装SQLAlchemy

pip install sqlalchemy

装完看一眼装在哪了,后面放方言包要用:

pip show sqlalchemy

我的机器输出是这样的:

Name: SQLAlchemyVersion: 1.4.36Location: /usr/local/lib/python3.8/site-packages

记下这个Location路径。

2.2 装ksycopg2驱动

pip install ksycopg2

这个倒是顺利,没遇到什么问题。Windows用户可能需要装ksycopg2-win64

2.3 放方言包(这里卡了我半天)

找到SQLAlchemy的安装目录,进去找到dialects文件夹:

cd /usr/local/lib/python3.8/site-packages/sqlalchemy/dialects

把从官网下载的kingbase文件夹整个复制到这里。最终目录结构应该是:

sqlalchemy/dialects/├── kingbase/│   ├── __init__.py│   └── base.py├── mysql/├── postgresql/└── ...

我一开始放错了地方,放到了site-packages根目录下,结果死活加载不了。后来仔细看了文档才发现要放到dialects下面。

版本匹配问题:官方提供了1.3、1.4、2.0三个版本的方言包。我用的SQLAlchemy是1.4.36,选了1.4版本的方言包。版本不对会报一些奇怪的错,比如找不到某个模块。

三、连接数据库

3.1 连接字符串怎么写

折腾完安装,终于可以写代码了。连接字符串的格式是:

from sqlalchemy import create_engineengine = create_engine('kingbase+ksycopg2://SYSTEM:[email protected]:54321/TEST')

翻译一下:

  1. kingbase:方言名,告诉SQLAlchemy用金仓的方言包
  2. ksycopg2:驱动名,实际干活的是这个
  3. SYSTEM:数据库用户名
  4. 123456:密码
  5. 127.0.0.1:IP
  6. 54321:端口(金仓默认是这个)
  7. TEST:数据库名

+ksycopg2其实可以省略,写成kingbase://...就行,默认就是用它。

3.2 测试一下能不能连上

from sqlalchemy import create_engineconn_str = 'kingbase://SYSTEM:[email protected]:54321/TEST'engine = create_engine(conn_str)conn = engine.connect()result = conn.execute("SELECT version()")print(result.fetchone())conn.close()

如果看到版本信息输出,恭喜,连上了。

3.3 连接池参数(生产环境有用)

如果是写Web服务或者长期运行的脚本,建议配置一下连接池:

engine = create_engine(    'kingbase://SYSTEM:[email protected]:54321/TEST',    pool_size=10,          # 连接池里放多少个连接    max_overflow=20,       # 不够用时最多再创建多少个    pool_recycle=3600,     # 连接用多久回收(秒)    pool_pre_ping=True     # 用之前先ping一下,确认还活着)

pool_pre_ping=True这个参数挺实用的,能避免拿到一个已经断开的连接。

四、ORM建模和基本操作

连接搞定了,接下来看看怎么用ORM操作数据库。

4.1 定义模型

先建个基类,然后定义表对应的类:

from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, DateTimefrom datetime import datetimeBase = declarative_base()class User(Base):    __tablename__ = 'test_user'  # 表名        id = Column(Integer, primary_key=True)    name = Column(String(50))    email = Column(String(100))    created_at = Column(DateTime, default=datetime.now)        def __repr__(self):        return f"User(id={self.id}, name='{self.name}', email='{self.email}')"

__tablename__是必须的,不写SQLAlchemy不知道表叫什么。

4.2 建表

# 自动创建表(如果不存在的话)Base.metadata.create_all(engine)

这个操作是幂等的,执行多次也不会重复建表。

4.3 创建Session

from sqlalchemy.orm import sessionmakerSession = sessionmaker(bind=engine)session = Session()

4.4 增删改查

插入数据:

user1 = User(name='张三', email='[email protected]')user2 = User(name='李四', email='[email protected]')session.add(user1)session.add(user2)# 或者一次加多个:session.add_all([user1, user2])session.commit()

查询数据:

# 查所有users = session.query(User).all()for u in users:    print(u.name, u.email)# 条件查询user = session.query(User).filter(User.name == '张三').first()print(user.email)# 模糊查询users = session.query(User).filter(User.name.like('%张%')).all()

更新数据:

# 方式1:查出来改属性user = session.query(User).filter(User.name == '张三').first()user.email = '[email protected]'session.commit()# 方式2:批量更新(不查直接改)session.query(User).filter(User.name == '张三').update(    {"email": "[email protected]"})session.commit()

相关文章

精彩推荐