目 录CONTENT

文章目录

【GaussDB】调整sqlalchemy方言支持async_gaussdb以适配aiflow3.1.7

DarkAthena
2026-03-02 / 0 评论 / 0 点赞 / 6 阅读 / 0 字

【GaussDB】调整sqlalchemy方言支持async_gaussdb以适配aiflow3.1.7

上一篇通过修改openGauss的psycopg2驱动并编译,适配了airflow2.9.2版本,【GaussDB】手动编译不同python版本的psycopg2驱动以适配airflow ,但客户用了一段时间后,要升级到airflow 3.1.7版本,之前的方案就不行了,前面也提到过,airflow 3以上的版本,不能仅使用psycopg2,需要同时支持同步和异步两种连接,即psycopg2和asyncpg两种驱动同时支持,本文就来记录一下适配过程。

airflow3.1.7仅使用psycopg2的报错信息

(.env310) [aop@kylinv10sp3-node1 ~]$ airflow standalone
2026-02-15T08:31:02.373594Z [info     ] setup plugin alembic.autogenerate.schemas [alembic.runtime.plugins] loc=plugins.py:37
2026-02-15T08:31:02.373760Z [info     ] setup plugin alembic.autogenerate.tables [alembic.runtime.plugins] loc=plugins.py:37
2026-02-15T08:31:02.373868Z [info     ] setup plugin alembic.autogenerate.types [alembic.runtime.plugins] loc=plugins.py:37
2026-02-15T08:31:02.373965Z [info     ] setup plugin alembic.autogenerate.constraints [alembic.runtime.plugins] loc=plugins.py:37
2026-02-15T08:31:02.374059Z [info     ] setup plugin alembic.autogenerate.defaults [alembic.runtime.plugins] loc=plugins.py:37
2026-02-15T08:31:02.374152Z [info     ] setup plugin alembic.autogenerate.comments [alembic.runtime.plugins] loc=plugins.py:37
Traceback (most recent call last):
  File "/home/aop/.env310/bin/airflow", line 4, in <module>
    from airflow.__main__ import main
  File "/home/aop/.env310/lib/python3.10/site-packages/airflow/__init__.py", line 79, in <module>
    settings.initialize()
  File "/home/aop/.env310/lib/python3.10/site-packages/airflow/settings.py", line 714, in initialize
    configure_orm()
  File "/home/aop/.env310/lib/python3.10/site-packages/airflow/settings.py", line 416, in configure_orm
    _configure_async_session()
  File "/home/aop/.env310/lib/python3.10/site-packages/airflow/settings.py", line 359, in _configure_async_session
    async_engine = create_async_engine(
  File "/home/aop/.env310/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/engine.py", line 121, in create_async_engine
    return AsyncEngine(sync_engine)
  File "/home/aop/.env310/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/engine.py", line 1035, in __init__
    raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: The asyncio extension requires an async driver to be used. The loaded 'psycopg2' is not async.

可以很明显的看到,airflow打算创建一个异步的连接,但是sqlalchemy报错psycopg2不是异步驱动,所以我们需要引入一个异步的驱动。

async-gaussdb

上一篇也说过,GaussDB官方其实已经对异步驱动asyncpg进行了适配修改:
https://github.com/HuaweiCloudDeveloper/gaussdb-python-async

正式发布驱动名叫async-gaussdb, oython 3.8以上都支持 ,win/linux/mac上都发布了
https://pypi.org/project/async-gaussdb/#files

使用pip即可在线安装

uv pip install async-gaussdb

修改sqlalchemy方言

上一篇也提供了能使用psycopg2的sqlalchemy GaussDB方言,
https://gitcode.com/darkathena/GaussDB-sqlalchemy
虽然我后面合入了openGauss社区支持异步驱动的commit,但是openGauss社区支持的是asyncpg这个驱动,而不是async-gaussdb,所以还是不能直接用。

过程中遇到的一堆坑我也懒得把流水账都摆出来了,直接贴下commit信息吧,AI整理出了修改点
https://gitcode.com/darkathena/GaussDB-sqlalchemy/commit/76077ae51659e10aa3f220c7873b4f0035e2e161?ref=for_gaussdb

Add support for the async_gaussdb async driver and a schema-less pg_catalog shim tailored for GaussDB. Changes include:

  • New module gaussdb_sqlalchemy/gaussdb_pg_catalog.py: a schema-less copy of SQLAlchemy's pg_catalog to avoid three-part (schema.table.column) column qualification which GaussDB disallows.
  • Wire the schema-less pg_catalog into both psycopg2 and asyncpg dialect modules by assigning sqlalchemy.dialects.postgresql.base.pg_catalog = gaussdb_pg_catalog (noting this global replacement is intentional for GaussDB behavior).
  • Add AsyncAdapt_async_gaussdb_dbapi adapter and register GaussDBDialect_asyncpg to support async_gaussdb (driver name and import adapter). Map async_gaussdb exceptions to SQLAlchemy errors.
  • Update GaussDBDialect implementations to expose proper isolation level values and maintain server version info.
  • Extend GaussDBCompiler.for_update_clause to normalize FOR UPDATE semantics for GaussDB (avoid unsupported FOR NO KEY UPDATE / FOR KEY SHARE while preserving OF/NOWAIT/SKIP LOCKED).
  • Update README with async driver installation and local install notes.
  • Update setup.py: add support for newer Python classifiers, remove strict sqlalchemy version pin, and add entry point for gaussdb.async_gaussdb.

These changes enable async-gaussdb usage with SQLAlchemy and adjust SQL generation/metadata lookup to match GaussDB constraints.

为 async_gaussdb 异步驱动和针对 GaussDB 定制的无模式 pg_catalog Shim 添加支持。更改包括:

  • 新模块 gaussdb_sqlalchemy/gaussdb_pg_catalog.py:SQLAlchemy pg_catalog 的无模式副本,以避免 GaussDB 不允许的三部分(schema.table.column)列限定。
  • 将无模式 pg_catalog 接入 psycopg2 和 asyncpg 方言模块,通过赋值 sqlalchemy.dialects.postgresql.base.pg_catalog = gaussdb_pg_catalog(注意此全局替换是为了 GaussDB 行为刻意设置的)。
  • 添加 AsyncAdapt_async_gaussdb_dbapi 适配器,并注册 GaussDBDialect_asyncpg 以支持 async_gaussdb(驱动名称和导入适配器)。将 async_gaussdb 异常映射到 SQLAlchemy 错误。
  • 更新 GaussDBDialect 实现以暴露适当的隔离级别值并维护服务器版本信息。
  • 扩展 GaussDBCompiler.for_update_clause 以规范 GaussDB 的 FOR UPDATE 语义(避免不支持的 FOR NO KEY UPDATE / FOR KEY SHARE ,同时保留 OF/NOWAIT/SKIP LOCKED)。
  • 更新 README,包括异步驱动安装和本地安装说明。
  • 更新 setup.py:添加对较新 Python 分类的支持,移除严格的 sqlalchemy 版本限制,并为 gaussdb.async_gaussdb 添加入口点。

这些更改使 SQLAlchemy 能够使用 async-gaussdb,并调整 SQL 生成/元数据查找以符合 GaussDB 限制。

这个工作量比前面弄psycopg2的要多不少,主要是openGauss还支持schema.table.column三段式的字段引用,但GaussDB的A兼容模式已经禁止了这种用法。在sqlalchemy内置的postgresql方言里有大量的元数据拼接逻辑,AI本来打算根据报错一个一个去写拼接好的SQL去override内置的,但这样太不优雅了,我换了几个模型并结合自己对源码的观察,对比ORACLE方言为什么不会拼出schema名称(pg_catalog_meta = MetaData()pg_catalog_meta = MetaData(schema="pg_catalog") 的区别),终于成功引导AI自建了gaussdb_pg_catalog.py,也就是说元数据的定义以新建的这个文件为准,替代了sqlalchemy内置的,但元数据查询SQL拼接的逻辑仍然使用内置的。

for no key update/for key share在GaussDB的ustore是不支持的,不弄这个我还真没注意到。修改方式就是把不支持的分支都变成for update。

这些问题点都是在启动airflow过程中发现的。

改airflow

问题列举

很遗憾,这次适配无法仅靠方言插件来支持,还是得手动修改airflow的源码。我本来是想着airflow能支持postgresql,那么把GaussDB也当成postgresql去跑就行,但是airflow中写了很多硬编码,会以连接串来识别方言名,然后针对方言名为postgresql的做了一些操作,比如

  1. json类型变成jsonb类型
    airflow中会执行group by json类型的SQL,但gaussdb中,json类型不能group by,jsonb类型可以。比如dag_run表的context_carrier和conf字段

  2. 使用mysql函数
    这个timestampdiff(MICROSECOND ,GaussDB只在B模式支持。airflow会判断如果是postgresql,就不会用这个函数。

UPDATE task_instance SET end_date=%(end_date)s,duration=(timestampdiff(MICROSECOND, task_instance.start_date, %(timestampdiff_1)s) / CAST(%(timestampdiff_2)s AS NUMERIC)) ...
  1. 设置锁超时时间 set lock_timeout
    原生postgresql中有lock_timeout这个锁超时的参数,但是gaussdb里不是这个名称,是lockwait_timeout

  2. 另外,在GaussDB的A模式中,插入''会变成null,但是airflow有些表的字段设置是不允许为空,但airflow自己会插入'',比如asset_alias的group字段。

源码修改

在airflow源码里搜了下'postgresl',发现还是很多的,所以我先让AI分析能不能找到方言名赋值的位置,这样就能把gaussdb认为是postgresql,执行原生PG的逻辑。
然后AI改了setting.py这两处地方

    async_engine = create_async_engine(
        SQL_ALCHEMY_CONN_ASYNC,
        connect_args=_get_connect_args("async"),
        future=True,
    )
    if async_engine.dialect.name == "gaussdb":
        async_engine.dialect.name = "postgresql"
    engine = create_engine(
        SQL_ALCHEMY_CONN,
        connect_args=connect_args,
        **engine_args,
        future=True,
    )
    if engine.dialect.name == "gaussdb":
        engine.dialect.name = "postgresql"

对于锁超时参数名的问题,要改utils/db.py ,顾不上再兼容postgresql了,直接改参数名。如果要兼容,需要引入其他全局变量,懒得弄了

def create_global_lock(
    session: Session,
    lock: DBLocks,
    lock_timeout: int = 1800,
) -> Generator[None, None, None]:
    """Contextmanager that will create and teardown a global db lock."""
    conn = session.get_bind().connect()
    dialect = conn.dialect
    try:
        if dialect.name == "postgresql":
            if USE_PSYCOPG3:
                # psycopg3 doesn't support parameters for `SET`. Use `set_config` instead.
                # The timeout value must be passed as a string of milliseconds.
                conn.execute(
                    text("SELECT set_config('lockwait_timeout', :timeout, false)"),
                    {"timeout": str(lock_timeout)},
                )
                conn.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock.value})
            else:
                conn.execute(text("SET LOCKWAIT_TIMEOUT to :timeout"), {"timeout": lock_timeout})
                conn.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock.value})
        elif dialect.name == "mysql" and dialect.server_version_info >= (5, 6):
            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), {"id": str(lock), "timeout": lock_timeout})

        yield
    finally:
        if dialect.name == "postgresql":
            if USE_PSYCOPG3:
                # Use set_config() to reset the timeout to its default (0 = off/wait forever).
                conn.execute(text("SELECT set_config('lockwait_timeout', '0', false)"))
            else:
                conn.execute(text("SET LOCKWAIT_TIMEOUT TO DEFAULT"))
            (unlocked,) = conn.execute(text("SELECT pg_advisory_unlock(:id)"), {"id": lock.value}).fetchone()
            if not unlocked:
                raise RuntimeError("Error releasing DB lock!")
        elif dialect.name == "mysql" and dialect.server_version_info >= (5, 6):
            conn.execute(text("select RELEASE_LOCK(:id)"), {"id": str(lock)})

对于字段不允许为空的,修改 modules/asset.py

    group = Column(
        String(length=1500).with_variant(
            String(
                length=1500,
                # latin1 allows for more indexed length in mysql
                # and this field should only be ascii chars
                collation="latin1_general_cs",
            ),
            "mysql",
        ),
        default="",
        nullable=True,
    )

暂时总体改动量还算可控。

其他坑点

  1. python 3.10.0 版本有bug,airflow能启动,但是执行dag会报错
api-server | |   File "/home/aop/.env310/lib/python3.10/site-packages/cadwyn/schema_generation.py", line 456, in _get_defined_annotations_through_mro
api-server | |     for parent in reversed(self._get_parents(schemas)):
api-server | |   File "/home/aop/.env310/lib/python3.10/site-packages/cadwyn/schema_generation.py", line 434, in _get_parents
api-server | |     if self._parents is not None:
api-server | | AttributeError: '_PydanticModelWrapper' object has no attribute '_parents'

找这个问题折腾了挺久,GPT-5.1-Codex-Max一直认为是我环境中安装的cadwyn版本不对,哪怕我反复强调版本是对的,然后换成Gemini 3 pro,它开始上网各种搜,最后识别到python 3.10.0有BUG会出现这个问题,然后我换成python 3.11后就好了。

  1. airflow 2.9.2版本的配置文件不能直接用于启动airflow 3.1.7
    具体原因没细查,估计是有些设置项缺了。

  2. 在独立出GaussDB的元数据表模型前,启动会有警告说没找到log_template这个表,然后后续执行dag时报错不能是 Nonetype,但实际上这个表是存在的
    这个问题我也没细查,切了几个AI没找到原因,AI总认为是search_path的问题。表都是airflow首次启动一次性初始化进去的,怎么可能会有search_path的问题。AI建议手动往这个表插入两行数据,的确DAG执行不报错了。
    不过在把GaussDB的元数据表模型独立出来后,这个问题直接就不存在了,不需要手动插入数据,我猜是之前sqlalchemy拼接的检查表是否存在的SQL有问题,执行报错,导致airflow以为这个表不存在,就没往里面插入数据,所以把元数据独立出来后,这个问题就同时被修正了。

  3. 在airflow 3.1.7里,如果使用psycopg2,那么必须同时配置两个连接串,一个同步一个异步

sql_alchemy_conn = gaussdb+psycopg2://airflow_user3:Enmo_123@192.168.1.131:8000/airflow_db3
sql_alchemy_conn_async = gaussdb+async_gaussdb://airflow_user3:Enmo_123@192.1.163.131:8000/airflow_db3

最终完全部署步骤

  1. 安装uv,并初始化新的python环境
cd ~
curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv .env311 --python 3.11
source .env311/bin/activate
  1. 编译psycopg2
# 安装setuptools
uv pip install setuptools -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

# 克隆psycopg2源码
git clone https://gitcode.com/darkathena/GaussDB-connector-python-psycopg2
cd GaussDB-connector-python-psycopg2/

# 上传GaussDB 的libpq驱动包到depend目录并解压
cp GaussDB-Kernel_506.0.0.SPC0100_Kylin_64bit_Libpq_Static.tar.gz depend/
cd depend
tar -xf GaussDB-Kernel_506.0.0.SPC0100_Kylin_64bit_Libpq_Static.tar.gz
cd ..
chmod +x depend/bin/pg_config
chmod +x build.sh

# 编译命令,-v指定打包文件名里的版本号
sh build.sh -bd $PWD/depend -v 506.0.0.SPC0100
  1. 安装psycopg2
cd output
tar -xf GaussDB-Python-506.0.0.SPC0100*.tar.gz
cp -r psycopg2 $(python3 -c 'import site; print(site.getsitepackages()[0])')/ && chmod 755 $(python3 -c 'import site; print(site.getsitepackages()[0])')/psycopg2
cp -r psycopg2.libs $(python3 -c 'import site; print(site.getsitepackages()[0])')/ && chmod 755 $(python3 -c 'import site; print(site.getsitepackages()[0])')/psycopg2.libs
  1. 安装async-gaussdb
uv pip install async-gaussdb -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
  1. 安装GaussDB-sqlalchemy方言包
cd ~
git clone https://gitcode.com/darkathena/GaussDB-sqlalchemy
cd GaussDB-sqlalchemy
python setup.py install
  1. 安装airflow 3.1.7
uv pip install apache-airflow==3.1.7 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
  1. 修改airflow
    见本文 改airflow 章节的源码修改 ,手动修改 setting.py 、utils/db.py、modules/assert.py 三个文件 ,或者直接下载我修改好的文件进行替换 https://gitcode.com/darkathena/airflow-patch-for-gaussdb

  2. 启动验证

export AIRFLOW_HOME=~/airflow
airflow standalone

这里默认是使用的sqlite数据库,主要是确认环境没有问题,并生成初始配置文件,启动成功后ctrl+c退出即可

  1. 配置airflow连接信息
vi airflow/airflow.cfg

修改sql_alchemy_conn参数

sql_alchemy_conn = gaussdb+psycopg2://airflow_user3:Enmo_123@192.168.1.131:8000/airflow_db3
sql_alchemy_conn_async = gaussdb+async_gaussdb://airflow_user3:Enmo_123@192.1.163.131:8000/airflow_db3
  1. 在GaussDB里建库和用户
create database airflow_db;
\c airflow_db
create user airflow_user password 'Enmo_123';
  1. 启动airflow
export AIRFLOW_HOME=~/airflow
airflow standalone

这事还没完

虽然最后还是成功完成了适配,但本次psycopg2属实是把我恶心到了。没想到这个驱动还有不支持异步的问题,要同时使用两个连接驱动才能完成数据库操作,真没法想象用postgresql的这些python开发者是怎么过来的。好在后面又出了psycopg3,把psycopg2的各种不足都补全了,sqlalchemy是支持同步异步都使用psycopg3这个驱动的。而华为云的确也基于psycopg3改了个适配GaussDB的版本,驱动名叫gaussdb。但是现在传到pypi上的这个版本,没写平台环境,至少在我的python 3.10和 3.11上,调用进去就直接coredump。而且当我发现驱动有问题之前,我已经花了很长时间让AI来基于gaussdb这个驱动写sqlalchemy的方言,直到最后无论怎么改都报错,我才直接检查了下,才发现gaussdb这个驱动有coredump的问题。
https://pypi.org/project/gaussdb/
https://github.com/HuaweiCloudDeveloper/gaussdb-python
这个可能是环境的问题,后续我可能会用这个gaussdb驱动源码安装看能不能用,再看看能不能写出sqlalchemy的方言。

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin
博主关闭了所有页面的评论