[GaussDB] Developing a SQLAlchemy dialect for gaussdb (psycopg3) to support Airflow

DarkAthena
2026-03-02

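The previous post ([GaussDB] Installing and testing the psycopg3-based Python driver modified for GaussDB) worked out how to install gaussdb (psycopg3). This post moves on to the SQLAlchemy dialect.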

Straight to vibe coding

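This work builds on a SQLAlchemy dialect that had already been adapted for psycopg2 and async-gaussdb. I first created a folder named github inside that project, cloned the sqlalchemy and gaussdb-python projects into it, opened the dialect project's root directory in VS Code, and started vibe coding.

If you don't care about the process, feel free to skip this section.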

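User: The current project is a SQLAlchemy dialect for the GaussDB database. The github path holds projects for reference; it contains the source of two Python packages, gaussdb and sqlalchemy, where gaussdb is a driver for connecting to GaussDB, modified from psycopg3. SQLAlchemy ships with a psycopg3 connection method for postgresql. I want to modify the current project so that it additionally supports connecting through gaussdb instead of the native psycopg3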

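GitHub Copilot: I'll first quickly survey the psycopg3/async-related implementations and registration entry points in the current dialect, then compare against the reference source to locate the minimal change points, and then add the gaussdb driver integration directly and run targeted tests to verify it. I'll start by searching for the existing dialect registration and psycopg adaptation code.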

Created 5 todos

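I'll read the dialect entry point and the psycopg-related files in parallel to confirm the current URL driver names, the DBAPI import path, and the async registration flow.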

Searched for files matching **/gaussdb_sqlalchemy/*.py, 9 matches

Searched for regex psycopg|asyncpg|register|dialect (**/gaussdb_sqlalchemy/**/*.py), 20 results

Read

Read

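I've confirmed that only psycopg2 and asyncpg are currently registered; there is no sync psycopg3/gaussdb dialect entry yet. Next I'll compare against the postgresql.psycopg implementation bundled with SQLAlchemy and reuse its structure to create a gaussdb version of the dialect and wire it into registration.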

Read

Read

Searched for regex gaussdb\.psycopg|psycopg3|postgresql\+psycopg|gaussdb\+ (**/test/**/*.py), 20 results

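I have the current psycopg2 dialect implementation. Next I'll read the postgresql.psycopg file in the reference sqlalchemy and this project's packaging entry point, to make sure the new gaussdb driver dialect stays compatible with the existing structure.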

Read , lines 1 to 420

Read

Read

Read

Searched for regex class PGDialect_psycopg|class PGDialectAsync_psycopg|get_async_dialect_cls|import_dbapi|dialect = (**/github/sqlalchemy/lib/sqlalchemy/dialects/postgresql/psycopg.py), 6 results

Read , lines 440 to 816

Read

Read , lines 1 to 300

Searched for regex entry_points|gaussdb\.asyncpg|gaussdb\.psycopg2|gaussdb\.async_gaussdb (**/README*.md|README*.en.md|README.md|README.en.md|test/**/*.py), no results

Read

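I've located the implementation entry point. Next I'll check the connection string conventions in the README and tests, then add the gaussdb+gaussdb (and optionally gaussdb+gaussdb_async) dialect registration and implementation in the same style.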

Searched for regex psycopg2|asyncpg|连接|URL|create_engine|gaussdb\+ (**/README.en.md), 13 results

Searched for regex psycopg2|asyncpg|连接|URL|create_engine|gaussdb\+ (**/README.md), 19 results

Read

Read

Read

Searched for regex class IsolationLevel|from gaussdb\.pq import|types\.json|types\.hstore|types\.range|AsyncConnection|connect (**/github/gaussdb/gaussdb/**/*.py), 20 results

Searched for regex name\s*=\s*"gaussdb"|project.*gaussdb|entry_points (**/github/gaussdb/pyproject.toml), 1 result

Read

Read

Read , lines 340 to 460

Read

Searched for regex dc_psycopg2|asyncpg|driver|create_engine\(|dialect (**/test/**/*.py), 20 results

Read , lines 930 to 980

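I've confirmed that the gaussdb driver API is highly compatible with psycopg3. I'm about to land three kinds of changes: add a gaussdb dialect module, register the new SQLAlchemy driver names, and add README updates plus a minimal regression test. Starting to modify files now.

Starting: Implement gaussdb driver integration (3/5)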

Created

Generating patch (18 lines) in

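I've finished the core dialect file and the runtime registration. Next I'll update setup.py/setup.cfg/README and add a minimal test to make sure SQLAlchemy recognizes the new driver names.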

Generating patch (15 lines) in

Generating patch (10 lines) in

Created

Generating patch (7 lines) in

Generating patch (20 lines) in

Generating patch (20 lines) in

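Starting: Run tests and fix (5/5)

The code changes are done. Next I'll run the new test case and one related existing test case to confirm that importing and registering the new dialect causes no regressions.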

Ran terminal command: python -m pytest test/test_gaussdb_driver.py test/test_alembic_dialect.py -q
Completed with input: python -m pytest test/test_gaussdb_driver.py test/test_alembic_dialect.py -q

Ran terminal command: python -m compileall gaussdb_sqlalchemy test

Read changed files in the active git repository

Checked , , , , , , , , no problems found

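I've completed the implementation on my side and it passes the syntax and diagnostics checks. Next I'll put together the change list and a ready-to-use connection string. Since pytest is missing on this machine, I'll include the verification commands you can run.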

Read

Read

Read

Read

Read , lines 70 to 100

Read , lines 30 to 60

Read

Read , lines 946 to 966

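Completed: Run tests and fix (5/5)

Done

  • Added a dialect implementation based on gaussdb (the psycopg3 fork), with both sync and async dialect classes: gaussdb.py.
  • Added runtime registration for gaussdb.gaussdb and gaussdb.gaussdb_async: __init__.py.
  • Added packaging entry points so that SQLAlchemy recognizes gaussdb+gaussdb and gaussdb+gaussdb_async after installation: setup.py.
  • Added the new connection string aliases to the test configuration: setup.cfg.
  • Added a minimal regression test (dialect name/driver and async mapping): test_gaussdb_driver.py.
  • Adjusted the server-side cursor test condition to cover the new gaussdb driver: test_suite.py.
  • Added usage examples for the new driver to the README (Chinese and English): README.md, README.en.md.

Verification results

  • python -m compileall gaussdb_sqlalchemy test passed (syntax check OK).
  • pytest is missing on this machine (No module named pytest), so the unit tests were not run. You can run them in your environment: python -m pytest test_gaussdb_driver.py -q

Would you like me to go on and add a gaussdb-driver-specific test environment to tox.ini (for example py38_gaussdb), plus an integration test for create_engine('gaussdb+gaussdb://...')?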

Made changes.
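
A note for readers skimming the log: the "runtime registration" and "packaging entry points" items both come down to the same small table of names. The sketch below is illustrative and is not the project's actual code; the module path and class names are the ones printed later in this log, while DIALECTS, register_all and entry_point_lines are hypothetical helper names.

```python
# Sketch only: DIALECTS, register_all and entry_point_lines are hypothetical
# helper names; the module path and class names come from the log output.
DIALECTS = [
    # (SQLAlchemy registry name, module path, dialect class name)
    ("gaussdb.gaussdb", "gaussdb_sqlalchemy.gaussdb", "GaussDBDialect_gaussdb"),
    ("gaussdb.gaussdb_async", "gaussdb_sqlalchemy.gaussdb", "GaussDBDialectAsync_gaussdb"),
]


def register_all(registry):
    """Register every dialect at import time.

    `registry` is expected to be sqlalchemy.dialects.registry, whose
    register(name, modulepath, objname) method takes these three arguments.
    """
    for name, module_path, class_name in DIALECTS:
        registry.register(name, module_path, class_name)


def entry_point_lines():
    """Build the "sqlalchemy.dialects" entry point strings for setup.py."""
    return [f"{name} = {module}:{cls}" for name, module, cls in DIALECTS]
```

In the package's __init__.py this would be called as register_all(registry), with registry imported from sqlalchemy.dialects, and setup.py would pass entry_points={"sqlalchemy.dialects": entry_point_lines()}.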

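User: Give me the simplest possible test code that connects and runs a query; no need to generate a file

GitHub Copilot: The simplest version is this (sync):

import sqlalchemy as sa

engine = sa.create_engine(
    "gaussdb+gaussdb://username:password@host:port/database_name"
)

with engine.connect() as conn:
    result = conn.execute(sa.text("SELECT 1"))
    print(result.scalar())  # expected output: 1

If the gaussdb driver isn't installed in your current environment yet, you can also change the URL to gaussdb+psycopg2://... for now. If needed, I can give you a minimal async example as well.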

User:

Traceback (most recent call last):
 File "<stdin>", line 1, in <module>
 File "<string>", line 2, in create_engine
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
   return fn(*args, **kwargs)  # type: ignore[no-any-return]
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 630, in create_engine
   dialect = dialect_cls(**dialect_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 421, in __init__
   raise ImportError(
ImportError: psycopg version 3.0.2 or higher is required.

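GitHub Copilot: The root cause of this error is now pinned down: the gaussdb dialect inherits from SQLAlchemy's PGDialect_psycopg, whose initialization enforces a "psycopg>=3.0.2" check, and gaussdb.__version__ is not 3.x, so it gets misjudged.
I'll fix the dialect initialization logic directly so that it skips this psycopg3-specific version gate for the gaussdb driver while keeping all other behavior unchanged.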

Read , lines 360 to 560

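While I'm at it, I'll switch every internal import in the gaussdb dialect that still points to psycopg over to gaussdb. That not only fixes the version check but also avoids hitting further problems later on the JSON/hstore/range paths.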

Searched for regex class AdaptersMap|def set_json_loads|def set_json_dumps|class TypeInfo|def register_hstore|class TransactionStatus|class Range|class Multirange (**/github/gaussdb/gaussdb/**/*.py), 18 results

Searched for regex _psycopg_Json|_psycopg_Jsonb|_psycopg_TransactionStatus|_psycopg_Range|_psycopg_Multirange|_type_info_fetch|initialize\( (**/gaussdb_sqlalchemy/gaussdb.py), no results

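I'll now fix gaussdb.py directly: override __init__/initialize and the related attribute imports, replacing the psycopg-specific logic with the gaussdb equivalents, with the first priority being to eliminate the version check exception you hit in create_engine.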

Generating patch (113 lines) in

Checked , no problems found

Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py

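This error occurs because the dialect inherits from SQLAlchemy's PGDialect_psycopg, whose __init__ strictly requires psycopg>=3.0.2; you are using the gaussdb driver, whose version number is not 3.x, so it was misjudged.

I've already fixed it in the project. The core changes are in:

  • gaussdb.py
  • Overrode __init__ to remove the hard psycopg>=3.0.2 gate
  • Switched the internal dependencies from psycopg.* to gaussdb.* (json/hstore/typeinfo/range, etc.)

Verified locally:

  • python -m compileall gaussdb_sqlalchemy\gaussdb.py passed

Your next steps:

  • Reinstall the current repository version: python setup.py install (or pip install -e .)
  • Then test again:
  • import sqlalchemy as sa
  • engine = sa.create_engine("gaussdb+gaussdb://user:pwd@host:port/db")

If you like, I can also give you a "one-shot self-check script" (creates the engine only, without connecting to the database) to quickly verify that this error is gone.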

Made changes.
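
The failure mode here is easy to reproduce without SQLAlchemy. The toy classes below are stand-ins and not SQLAlchemy's real code: ParentDialect mimics a base class whose __init__ enforces a driver version, FakeDriver and its "1.0.0" version string are made up, and ForkDialect shows the kind of override the log describes.

```python
class FakeDriver:
    """Hypothetical driver module stand-in; the version string is made up."""

    __version__ = "1.0.0"


class ParentDialect:
    """Toy base dialect that gates on the driver's version in __init__."""

    def __init__(self, dbapi=None):
        self.dbapi = dbapi
        self.adapters_ready = False
        if dbapi is not None:
            version = tuple(int(part) for part in dbapi.__version__.split("."))
            if version < (3, 0, 2):
                raise ImportError("psycopg version 3.0.2 or higher is required.")
            self.adapters_ready = True


class ForkDialect(ParentDialect):
    """Toy dialect for a forked driver whose version numbers start below 3."""

    def __init__(self, dbapi=None):
        # Do not call ParentDialect.__init__ (it would raise); repeat only
        # the setup that is still needed, without the version gate.
        self.dbapi = dbapi
        self.adapters_ready = dbapi is not None
```

The cost of this approach is that anything else the parent constructor did has to be redone by hand in the subclass, which is why the fix in the log also had to swap the psycopg.* imports for gaussdb.* ones.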

User:

>>> with engine.connect() as conn:
...     result = conn.execute(sa.text("SELECT 1"))
...     print(result.scalar())
...
Traceback (most recent call last):
 File "<stdin>", line 1, in <module>
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3285, in connect
   return self._connection_cls(self)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 143, in __init__
   self._dbapi_connection = engine.raw_connection()
                            ^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3309, in raw_connection
   return self.pool.connect()
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 447, in connect
   return _ConnectionFairy._checkout(self)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 1264, in _checkout
   fairy = _ConnectionRecord.checkout(pool)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 711, in checkout
   rec = pool._do_get()
         ^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 177, in _do_get
   with util.safe_reraise():
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 224, in __exit__
   raise exc_value.with_traceback(exc_tb)
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 175, in _do_get
   return self._create_connection()
          ^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 388, in _create_connection
   return _ConnectionRecord(self)
          ^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 673, in __init__
   self.__connect()
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 913, in __connect
   )._exec_w_sync_on_first_run(self.dbapi_connection, self)
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/event/attr.py", line 501, in _exec_w_sync_on_first_run
   self(*args, **kw)
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/event/attr.py", line 515, in __call__
   fn(*args, **kw)
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 2000, in go
   return once_fn(*arg, **kw)
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 767, in first_connect
   dialect.initialize(c)
 File "/home/aop/GaussDB-sqlalchemy/gaussdb_sqlalchemy/gaussdb.py", line 91, in initialize
   super().initialize(connection)
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 481, in initialize
   from psycopg.types.hstore import register_hstore
ModuleNotFoundError: No module named 'psycopg'
>>>

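GitHub Copilot: The failure point is clear this time: although you overrode the logic in initialize(), super().initialize(connection) still ends up in PGDialect_psycopg.initialize, which hard-imports psycopg.types.hstore.
I'll change it to skip that parent-class initialization layer and call the more general initialization one level up directly, while keeping your gaussdb version of the hstore registration logic.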

Generating patch (7 lines) in

Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py

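The root cause of this new error is fixed.

Cause

  • GaussDBDialect_gaussdb.initialize() called super().initialize(connection)
  • That enters PGDialect_psycopg.initialize(), which unconditionally runs from psycopg.types.hstore import register_hstore, triggering ModuleNotFoundError: No module named 'psycopg'

What I fixed

  • Changed the initialization call to skip the PGDialect_psycopg.initialize() layer and go straight to the general initialization above it:
  • gaussdb.py
  • The gaussdb.types.hstore.register_hstore logic later in your file still runs and is unaffected.

What you need to do now

  • Reinstall the local package (very important):
  • pip install -e .
  • python setup.py install
  • Then retry this snippet:
  • with engine.connect() as conn: ...

If another stack trace comes up, I'll keep fixing it the same way until it works.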

Made changes.
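
Skipping exactly one layer of a class hierarchy is done with the two-argument form of super(). The classes below are toy stand-ins (Base, PsycopgLike, GaussLike and the missing module name are all hypothetical), meant only to show the call pattern and not the dialect's real initialization.

```python
import importlib


class Base:
    def initialize(self, log):
        log.append("Base.initialize")


class PsycopgLike(Base):
    def initialize(self, log):
        super().initialize(log)
        # Stand-in for a hard import of a driver that is not installed.
        importlib.import_module("hypothetical_missing_driver")


class GaussLike(PsycopgLike):
    def initialize(self, log):
        # super(PsycopgLike, self) starts the method lookup after PsycopgLike
        # in the MRO, so Base.initialize runs and PsycopgLike.initialize
        # is skipped entirely.
        super(PsycopgLike, self).initialize(log)
        log.append("GaussLike driver-specific setup")
```

Calling GaussLike().initialize(log) records the base step and the driver-specific step, while PsycopgLike().initialize(log) fails with ModuleNotFoundError, which mirrors the traceback above.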

User: My connection in Airflow is configured like this

sql_alchemy_conn_async = gaussdb+gaussdb_async://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3
sql_alchemy_conn = gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3

But it fails on startup

(.env311) [aop@kylinv10sp3-node1 airflow]$ airflow standalone
Traceback (most recent call last):
 File "/home/aop/.env311/bin/airflow", line 4, in <module>
   from airflow.__main__ import main
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
   settings.initialize()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
   configure_orm()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
   _configure_async_session()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
   async_engine = create_async_engine(
                  ^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
   sync_engine = _create_engine(url, **kw)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
 File "<string>", line 2, in create_engine
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
   return fn(*args, **kwargs)  # type: ignore[no-any-return]
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 617, in create_engine
   dbapi = dbapi_meth(**dbapi_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 822, in import_dbapi
   import psycopg
ModuleNotFoundError: No module named 'psycopg'

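GitHub Copilot: This error shows that what Airflow actually loaded is not your modified GaussDBDialectAsync_gaussdb but SQLAlchemy's built-in PGDialectAsync_psycopg, so it tried to import psycopg and failed.

The most common cause: the Airflow runtime environment still has an old gaussdb-sqlalchemy installed (without the gaussdb_async entry point, or the entry point was not loaded).

Check/fix with these 3 steps first (recommended)

    1. Reinstall your current source in the same virtual environment as Airflow:
  • pip uninstall -y gaussdb-sqlalchemy
  • pip install -e /home/aop/GaussDB-sqlalchemy
    2. Verify that the dialect entry points exist:
  • python -c "import importlib.metadata as m; print([e.name for e in m.entry_points(group='sqlalchemy.dialects') if e.name.startswith('gaussdb')])"
  • Expected to include at least: gaussdb.gaussdb and gaussdb.gaussdb_async
    3. Verify that the async dialect class is your own:
  • python -c "from sqlalchemy.dialects import registry; cls=registry.load('gaussdb.gaussdb_async'); print(cls, cls.__module__)"

Airflow configuration advice

  • Keeping your current two lines unchanged is fine for now:
  • sql_alchemy_conn = gaussdb+gaussdb://...
  • sql_alchemy_conn_async = gaussdb+gaussdb_async://...

Optional fallback

  • If you want to rely less on entry points, you can also change the async one to:
  • sql_alchemy_conn_async = gaussdb+gaussdb://...
  • because create_async_engine will automatically switch to the async dialect via get_async_dialect_cls (provided your dialect package is loaded).

If you like, I can give you a "one-shot self-check command" that prints exactly which dialect class the current Airflow environment loads.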

User:

(.env311) [aop@kylinv10sp3-node1 ~]$ python -c "import importlib.metadata as m; print([e.name for e in m.entry_points(group='sqlalchemy.dialects') if e.name.startswith('gaussdb')])"
['gaussdb', 'gaussdb.async_gaussdb', 'gaussdb.asyncpg', 'gaussdb.dc_psycopg2', 'gaussdb.gaussdb', 'gaussdb.gaussdb_async', 'gaussdb.psycopg2']
(.env311) [aop@kylinv10sp3-node1 ~]$ python -c "from sqlalchemy.dialects import registry; cls=registry.load('gaussdb.gaussdb_async'); print(cls, cls.__module__)"
<class 'gaussdb_sqlalchemy.gaussdb.GaussDBDialectAsync_gaussdb'> gaussdb_sqlalchemy.gaussdb
(.env311) [aop@kylinv10sp3-node1 ~]$ airflow standalone
Traceback (most recent call last):
 File "/home/aop/.env311/bin/airflow", line 4, in <module>
   from airflow.__main__ import main
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
   settings.initialize()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
   configure_orm()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
   _configure_async_session()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
   async_engine = create_async_engine(
                  ^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
   sync_engine = _create_engine(url, **kw)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
 File "<string>", line 2, in create_engine
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
   return fn(*args, **kwargs)  # type: ignore[no-any-return]
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 617, in create_engine
   dbapi = dbapi_meth(**dbapi_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 822, in import_dbapi
   import psycopg
ModuleNotFoundError: No module named 'psycopg'
(.env311) [aop@kylinv10sp3-node1 ~]$

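GitHub Copilot: This set of self-checks is very helpful. It shows that the dialect entry points themselves are fine, so the problem becomes:
The URL Airflow actually uses at startup is not the gaussdb+... you expect; it has been overridden by configuration to postgresql+psycopg... (which is why it ends up in psycopg.py).

Next, check the values Airflow actually uses

  • airflow config get-value database sql_alchemy_conn
  • airflow config get-value database sql_alchemy_conn_async
  • env | grep AIRFLOW__DATABASE__SQL_ALCHEMY_CONN

If any one of them is postgresql+psycopg://..., your current error will reproduce.

Most common causes

  • The environment variable AIRFLOW__DATABASE__SQL_ALCHEMY_CONN(_ASYNC) overrides airflow.cfg
  • The wrong configuration file was edited (standalone is not using the one you edited)
  • sql_alchemy_conn_async did not take effect, so Airflow fell back and automatically derived a psycopg async URL

I suggest pinning it down like this first

  • Export explicitly before starting (forced override):
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
airflow standalone

Set the async one to gaussdb+gaussdb as well for now, to avoid an extra variable; your dialect will switch to the async class via get_async_dialect_cls.

If you paste the output of the three query commands above, I can immediately help you pinpoint which layer is overriding the configuration.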
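
Airflow reads an option from the environment variable AIRFLOW__{SECTION}__{KEY} before it falls back to airflow.cfg. The sketch below only illustrates that precedence with the standard library; it is not Airflow's implementation, it ignores the _CMD and _SECRET variants, and effective_value is a hypothetical helper name.

```python
import configparser
import os


def effective_value(cfg_text, section, key, environ=None):
    """Return (value, source) for one option, environment variable first.

    cfg_text is the content of an airflow.cfg-style INI file. Interpolation
    is disabled so that "%" characters in passwords are left alone.
    """
    environ = os.environ if environ is None else environ
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name], f"env:{env_name}"

    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if parser.has_option(section, key):
        return parser.get(section, key), "airflow.cfg"
    return None, "unset"
```

With a cfg that sets sql_alchemy_conn to a gaussdb+gaussdb URL and an exported AIRFLOW__DATABASE__SQL_ALCHEMY_CONN pointing elsewhere, the function returns the exported value, which is the override scenario described above.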

User:

(.env311) [aop@kylinv10sp3-node1 ~]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 ~]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 ~]$ airflow standalone
Traceback (most recent call last):
 File "/home/aop/.env311/bin/airflow", line 4, in <module>
   from airflow.__main__ import main
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
   settings.initialize()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
   configure_orm()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
   _configure_async_session()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
   async_engine = create_async_engine(
                  ^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
   sync_engine = _create_engine(url, **kw)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
 File "<string>", line 2, in create_engine
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
   return fn(*args, **kwargs)  # type: ignore[no-any-return]
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 617, in create_engine
   dbapi = dbapi_meth(**dbapi_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb_sqlalchemy/gaussdb.py", line 177, in import_dbapi
   return PsycopgAdaptDBAPI(gaussdb, ExecStatus)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: PsycopgAdaptDBAPI.__init__() takes 2 positional arguments but 3 were given
(.env311) [aop@kylinv10sp3-node1 ~]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb_async://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 ~]$ airflow standalone
Traceback (most recent call last):
 File "/home/aop/.env311/bin/airflow", line 4, in <module>
   from airflow.__main__ import main
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
   settings.initialize()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
   configure_orm()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
   _configure_async_session()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
   async_engine = create_async_engine(
                  ^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
   sync_engine = _create_engine(url, **kw)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
 File "<string>", line 2, in create_engine
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
   return fn(*args, **kwargs)  # type: ignore[no-any-return]
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 617, in create_engine
   dbapi = dbapi_meth(**dbapi_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 822, in import_dbapi
   import psycopg
ModuleNotFoundError: No module named 'psycopg'
(.env311) [aop@kylinv10sp3-node1 ~]$

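GitHub Copilot: The new stack trace you pasted nails the problem: in the current SQLAlchemy version, the PsycopgAdaptDBAPI constructor parameters differ from what we assumed (it only accepts the single psycopg argument).
I'll do a compatibility fix first: support both the 1-argument and 2-argument SQLAlchemy versions, so the Airflow environment works whichever minor version it has.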

Generating patch (17 lines) in

Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py

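Fixed. The root cause of this error is a SQLAlchemy version difference

  • In your environment, PsycopgAdaptDBAPI.__init__ accepts only one argument
  • We previously passed two arguments (gaussdb, ExecStatus), as another version expects, hence:
  • TypeError: PsycopgAdaptDBAPI.__init__() takes 2 positional arguments but 3 were given

I've changed it to be compatible with both signatures (try 2 arguments first, fall back to 1 argument on failure):

  • gaussdb.py

Now run this in the Airflow environment:

  • pip install -e /home/aop/GaussDB-sqlalchemy (or your actual source path)
  • Then start with this set (more stable):
  • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://...'
  • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://...'
  • airflow standalone

As for the psycopg import that gaussdb+gaussdb_async triggered earlier, I suggest retesting it after this patch takes effect. If it still reproduces, my next step is to add one more safeguard to the async dialect that "forces it never to take the psycopg fallback".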

Made changes.
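
The "try 2 arguments, fall back to 1" idea can be sketched in a few lines. This is an illustration of the pattern only, not the patch Copilot generated; build_adapter and the two adapter classes are hypothetical stand-ins for the two PsycopgAdaptDBAPI signatures.

```python
def build_adapter(adapter_cls, driver, exec_status):
    """Construct adapter_cls with whichever signature it supports."""
    try:
        # Newer-style signature: (driver, exec_status)
        return adapter_cls(driver, exec_status)
    except TypeError:
        # Older-style signature: (driver) only
        return adapter_cls(driver)


class OneArgAdapter:
    """Toy adapter whose constructor takes only the driver."""

    def __init__(self, driver):
        self.args = (driver,)


class TwoArgAdapter:
    """Toy adapter whose constructor takes the driver and a status enum."""

    def __init__(self, driver, exec_status):
        self.args = (driver, exec_status)
```

One caveat with this pattern: a TypeError raised inside a two-argument constructor for an unrelated reason would also trigger the fallback, so inspecting the signature with inspect.signature is a stricter alternative.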

User:

(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ airflow standalone
Traceback (most recent call last):
 File "/home/aop/.env311/bin/airflow", line 4, in <module>
   from airflow.__main__ import main
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
   settings.initialize()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
   configure_orm()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
   _configure_async_session()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
   async_engine = create_async_engine(
                  ^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
   sync_engine = _create_engine(url, **kw)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
 File "<string>", line 2, in create_engine
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
   return fn(*args, **kwargs)  # type: ignore[no-any-return]
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 630, in create_engine
   dialect = dialect_cls(**dialect_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 421, in __init__
   raise ImportError(
ImportError: psycopg version 3.0.2 or higher is required.

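GitHub Copilot: This error is a newly located instance of the same class of problem: during Airflow's async initialization, gaussdb+gaussdb instantiates the async dialect class, which still goes through PGDialect_psycopg.__init__ (triggering the psycopg version check).
I'll now fill in the __init__/initialize/_type_info_fetch overrides for the async dialect to bypass the psycopg dependency path completely.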

Generating patch (83 lines) in

Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py

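The async dialect patch is complete and passes the compile check. Next you only need to reinstall the local package in Airflow's virtual environment and restart, then see whether it reaches the database connection stage. If there are further errors, they will most likely be compatibility issues in driver API details, and I can keep following up until it starts.

This fix targets the actual cause: your error came from the async dialect still inheriting and executing PGDialect_psycopg.__init__, which triggered the psycopg>=3.0.2 check.

I've just made the full set of async overrides in gaussdb.py:

  • GaussDBDialectAsync_gaussdb.__init__: skips the psycopg version gate and initializes with the gaussdb adapters instead
  • GaussDBDialectAsync_gaussdb.initialize: skips psycopg's hstore import path
  • GaussDBDialectAsync_gaussdb._type_info_fetch: switched to gaussdb.types.TypeInfo + await_
  • Kept the PsycopgAdaptDBAPI constructor argument compatibility fixed earlier (1/2 arguments)

Passed locally:

  • python -m compileall gaussdb_sqlalchemy\gaussdb.py

Now run this in the Airflow environment:

  • pip install -e /home/aop/GaussDB-sqlalchemy
  • Then run:
  • export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://...'
  • export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://...'
  • airflow standalone

If there's a new stack trace, paste it and I'll keep fixing. The next stop is most likely gaussdb async connection details rather than psycopg dependency issues.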

Made changes.

User:

(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ airflow standalone
Traceback (most recent call last):
 File "/home/aop/.env311/bin/airflow", line 4, in <module>
   from airflow.__main__ import main
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
   settings.initialize()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
   configure_orm()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 407, in configure_orm
   engine = create_engine(
            ^^^^^^^^^^^^^^
 File "<string>", line 2, in create_engine
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
   return fn(*args, **kwargs)  # type: ignore[no-any-return]
          ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 568, in create_engine
   entrypoint = u._get_entrypoint()
                ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/url.py", line 772, in _get_entrypoint
   cls = registry.load(name)
         ^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 451, in load
   return impl.load()
          ^^^^^^^^^^^
 File "/home/aop/.local/share/uv/python/cpython-3.11.12-linux-x86_64-gnu/lib/python3.11/importlib/metadata/__init__.py", line 202, in load
   module = import_module(match.group('module'))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.local/share/uv/python/cpython-3.11.12-linux-x86_64-gnu/lib/python3.11/importlib/__init__.py", line 126, in import_module
   return _bootstrap._gcd_import(name[level:], package, level)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb_sqlalchemy/gaussdb.py", line 16, in <module>
   from sqlalchemy.util.concurrency import await_
ImportError: cannot import name 'await_' from 'sqlalchemy.util.concurrency' (/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/concurrency.py)

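GitHub Copilot: Got it. This one is a SQLAlchemy version compatibility point: your environment has no await_, only other concurrency helper functions.
I'll change this to a "multi-version compatible import" (prefer await_, otherwise fall back to await_only) to avoid an ImportError right at startup.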

Generating patch (23 lines) in

Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py

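This error is fixed: the SQLAlchemy in your environment has no await_, so it failed at import time.

I've made it compatible across versions:

  • In gaussdb.py:
  • Prefer from sqlalchemy.util.concurrency import await_ as _sa_await
  • On failure, fall back to from sqlalchemy.util.concurrency import await_only as _sa_await
  • _type_info_fetch() now uniformly uses _sa_await(...)

It also passes the compile check.

Now run:

  • pip install -e /home/aop/GaussDB-sqlalchemy
  • Then run airflow standalone

If there's another stack trace, keep pasting it and I'll continue converging on compatibility with your current SQLAlchemy version.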

Made changes.
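
The fallback import can also be written as a small lookup over candidate names. The helper below is a hypothetical sketch (import_first is not part of SQLAlchemy or of the patch); it returns the first attribute that exists in a module.

```python
import importlib


def import_first(module_name, candidates):
    """Return the first attribute of module_name found among candidates.

    Raises ImportError if the module exposes none of the candidate names.
    """
    module = importlib.import_module(module_name)
    for name in candidates:
        if hasattr(module, name):
            return getattr(module, name)
    raise ImportError(f"none of {list(candidates)!r} found in {module_name}")
```

For the case in this log the call would be import_first("sqlalchemy.util.concurrency", ["await_", "await_only"]), with both names taken from the messages above; the same helper works on any module, for example import_first("math", ["no_such_name", "sqrt"]) returns math.sqrt.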

User:

2026-02-22T14:20:05.733267Z [info     ] Creating metadata              [airflow.utils.db] loc=db.py:695
Traceback (most recent call last):
 File "/home/aop/.env311/bin/airflow", line 10, in <module>
   sys.exit(main())
            ^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__main__.py", line 55, in main
   args.func(args)
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
   return func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 55, in entrypoint
   StandaloneCommand().run()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
   return func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 71, in run
   self.initialize_database()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 217, in initialize_database
   db.initdb()
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
   return func(*args, session=session, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 750, in initdb
   _create_db_from_orm(session=session)
 File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 696, in _create_db_from_orm
   Base.metadata.create_all(engine)
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/schema.py", line 5928, in create_all
   bind._run_ddl_visitor(
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3260, in _run_ddl_visitor
   conn._run_ddl_visitor(visitorcallable, element, **kwargs)
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2467, in _run_ddl_visitor
   ).traverse_single(element)
     ^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/visitors.py", line 661, in traverse_single
   return meth(obj, **kw)
          ^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 984, in visit_metadata
   self.traverse_single(
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/visitors.py", line 661, in traverse_single
   return meth(obj, **kw)
          ^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 1022, in visit_table
   )._invoke_with(self.connection)
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 321, in _invoke_with
   return bind.execute(self)
          ^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
   return meth(
          ^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 187, in _execute_on_connection
   return connection._execute_ddl(
          ^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1527, in _execute_ddl
   compiled = ddl.compile(
              ^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 312, in compile
   return self._compiler(dialect, **kw)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 76, in _compiler
   return dialect.ddl_compiler(dialect, self, **kw)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py", line 888, in __init__
   self.string = self.process(self.statement, **compile_kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py", line 934, in process
   return obj._compiler_dispatch(self, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/visitors.py", line 138, in _compiler_dispatch
   return meth(self, **kw)  # type: ignore  # noqa: E501
          ^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py", line 6814, in visit_create_table
   text += "\n)%s\n\n" % self.post_create_table(table)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb_sqlalchemy/base.py", line 290, in post_create_table
   if self.dialect._supports_table_distribute_by:
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'GaussDBDialect_gaussdb' object has no attribute '_supports_table_distribute_by'

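GitHub Copilot: This stack trace is the last kind of application-level compatibility issue: GaussDBDDLCompiler.post_create_table() depends on the dialect attribute _supports_table_distribute_by, which the newly created gaussdb dialect classes do not define.
I'll apply the minimal fix first: add the attribute to both the sync and async dialects (set to False, consistent with the centralized psycopg2 dialect) so that Airflow no longer crashes at the table-creation stage.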

Generating patch (17 lines) in

Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py

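This error matches the diagnosis and has been fixed.

Root cause

  • post_create_table() in base.py accesses self.dialect._supports_table_distribute_by
  • The new GaussDBDialect_gaussdb / GaussDBDialectAsync_gaussdb classes did not define that attribute, which caused the AttributeError

Changes made

  • Added to both dialect classes:
  • _supports_table_distribute_by = False
  • File: gaussdb.py

What to run now

  • pip install -e /home/aop/GaussDB-sqlalchemy
  • Then run airflow standalone again

This time it should get further. If another stack trace appears, paste it and I'll keep narrowing things down one by one until it starts.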

Made changes.
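
The failure and the fix can be reproduced in isolation. The sketch below uses stand-in classes rather than the real gaussdb_sqlalchemy code: the class names and the DISTRIBUTE BY clause text are illustrative assumptions, and only the attribute name _supports_table_distribute_by comes from the traceback.

```python
class DialectMissingFlag:
    """Hypothetical stand-in for the new dialect before the fix."""
    name = "gaussdb"


class DialectWithFlag:
    """Hypothetical stand-in for the new dialect after the fix."""
    name = "gaussdb"
    # False, matching the centralized psycopg2 dialect mentioned above.
    _supports_table_distribute_by = False


def post_create_table(dialect):
    """Mimics the attribute access that the DDL compiler performs."""
    if dialect._supports_table_distribute_by:
        # Illustrative clause only; the real compiler derives it from table options.
        return " DISTRIBUTE BY HASH(id)"
    return ""


def try_compile(dialect):
    """Return the table suffix, or the error text if the flag is missing."""
    try:
        return post_create_table(dialect)
    except AttributeError as exc:
        return "AttributeError: %s" % exc
```

Calling try_compile(DialectMissingFlag()) returns the same kind of AttributeError text as the traceback, while try_compile(DialectWithFlag()) returns an empty suffix.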

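At this point the dialect was basically done and usable. Airflow itself still had one remaining problem along the way, though. Since I already knew where it was, I didn't mention it to the AI and just fixed it myself.

Modifying Airflow

First, let's look at the error: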

(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ airflow standalone
2026-02-22T14:15:31.248602Z [info     ] setup plugin alembic.autogenerate.schemas [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.248762Z [info     ] setup plugin alembic.autogenerate.tables [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.248894Z [info     ] setup plugin alembic.autogenerate.types [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.249035Z [info     ] setup plugin alembic.autogenerate.constraints [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.249145Z [info     ] setup plugin alembic.autogenerate.defaults [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.249261Z [info     ] setup plugin alembic.autogenerate.comments [alembic.runtime.plugins] loc=plugins.py:37
standalone | Starting Airflow Standalone
standalone | Password for the admin user has been previously generated in /home/aop/airflow/simple_auth_manager_passwords.json.generated. Not echoing it here.
standalone | Checking database is initialized
2026-02-22T14:15:31.838366Z [info     ] Creating Airflow database tables from the ORM [airflow.utils.db] loc=db.py:684
2026-02-22T14:15:31.838507Z [info     ] Creating context               [airflow.utils.db] loc=db.py:689
Traceback (most recent call last):
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
    cursor.execute(statement, parameters)
  File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb/cursor.py", line 98, in execute
    raise ex.with_traceback(None)
gaussdb.errors.SyntaxError: syntax error at or near "$1"
LINE 1: SET LOCKWAIT_TIMEOUT to $1
                                ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 1361, in create_global_lock
    conn.execute(text("SET LOCKWAIT_TIMEOUT to :timeout"), {"timeout": lock_timeout})
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
    return meth(
           ^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 527, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1641, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2363, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
    cursor.execute(statement, parameters)
  File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb/cursor.py", line 98, in execute
    raise ex.with_traceback(None)
sqlalchemy.exc.ProgrammingError: (gaussdb.errors.SyntaxError) syntax error at or near "$1"
LINE 1: SET LOCKWAIT_TIMEOUT to $1
                                ^
[SQL: SET LOCKWAIT_TIMEOUT to %(timeout)s]
[parameters: {'timeout': 1800}]
(Background on this error at: https://sqlalche.me/e/20/f405)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
    cursor.execute(statement, parameters)
  File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb/cursor.py", line 98, in execute
    raise ex.with_traceback(None)
gaussdb.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block, firstChar[Q]

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/aop/.env311/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 55, in entrypoint
    StandaloneCommand().run()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 71, in run
    self.initialize_database()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 217, in initialize_database
    db.initdb()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 750, in initdb
    _create_db_from_orm(session=session)
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 690, in _create_db_from_orm
    with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
  File "/home/aop/.local/share/uv/python/cpython-3.11.12-linux-x86_64-gnu/lib/python3.11/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 1373, in create_global_lock
    conn.execute(text("SET LOCKWAIT_TIMEOUT TO DEFAULT"))
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
    return meth(
           ^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 527, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1641, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2363, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
    cursor.execute(statement, parameters)
  File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb/cursor.py", line 98, in execute
    raise ex.with_traceback(None)
sqlalchemy.exc.InternalError: (gaussdb.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block, firstChar[Q]
[SQL: SET LOCKWAIT_TIMEOUT TO DEFAULT]
(Background on this error at: https://sqlalche.me/e/20/2j85)

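The error boils down to this: executing SET LOCKWAIT_TIMEOUT to $1 with a bind variable raises a syntax error. gaussdb (psycopg3) follows the standard and uses PBE (Parse/Bind/Execute, the extended query protocol), but the database kernel does not support PBE for SET statements, hence the error. psycopg2 did not fail earlier because it does not use PBE; it simply substitutes the parameter values into the SQL string on the client side.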
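
The difference between the two binding styles can be sketched as follows. This is a simplified model for illustration, not the actual code of either driver; the function names are made up, and only %(name)s placeholders are handled.

```python
import re

# Matches pyformat placeholders such as %(timeout)s.
_NAMED = re.compile(r"%\((\w+)\)s")


def client_side_bind(sql, params):
    """psycopg2-style: values are merged into the SQL text on the client."""
    def literal(value):
        if isinstance(value, str):
            return "'" + value.replace("'", "''") + "'"
        return str(value)

    return _NAMED.sub(lambda m: literal(params[m.group(1)]), sql)


def server_side_bind(sql, params):
    """psycopg3-style: placeholders become $1, $2, ... and values are sent separately."""
    names = []

    def numbered(match):
        name = match.group(1)
        if name not in names:
            names.append(name)
        return "$%d" % (names.index(name) + 1)

    return _NAMED.sub(numbered, sql), [params[n] for n in names]
```

For the statement in the traceback, client_side_bind produces plain SQL that the server can parse as an ordinary SET, while server_side_bind produces the `$1` form that the kernel rejected.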

Airflow's own code already accounts for this problem when the native psycopg3 driver is used. The relevant pieces of its source, in utils/db.py, are:

USE_PSYCOPG3: bool
try:
    from importlib.util import find_spec

    import sqlalchemy
    from packaging.version import Version

    is_psycopg3 = find_spec("psycopg") is not None
    sqlalchemy_version = Version(sqlalchemy.__version__)
    is_sqla2 = (sqlalchemy_version.major, sqlalchemy_version.minor, sqlalchemy_version.micro) >= (2, 0, 0)

    USE_PSYCOPG3 = is_psycopg3 and is_sqla2
except (ImportError, ModuleNotFoundError):
    USE_PSYCOPG3 = False

@contextlib.contextmanager
def create_global_lock(
    session: Session,
    lock: DBLocks,
    lock_timeout: int = 1800,
) -> Generator[None, None, None]:
    """Contextmanager that will create and teardown a global db lock."""
    conn = session.get_bind().connect()
    dialect = conn.dialect
    try:
        if dialect.name == "postgresql":
            if USE_PSYCOPG3:
                # psycopg3 doesn't support parameters for `SET`. Use `set_config` instead.
                # The timeout value must be passed as a string of milliseconds.
                conn.execute(
                    text("SELECT set_config('lock_timeout', :timeout, false)"),
                    {"timeout": str(lock_timeout)},
                )
                conn.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock.value})
            else:
                conn.execute(text("SET LOCK_TIMEOUT to :timeout"), {"timeout": lock_timeout})
                conn.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock.value})
        elif dialect.name == "mysql" and dialect.server_version_info >= (5, 6):
            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), {"id": str(lock), "timeout": lock_timeout})

        yield
    finally:
        if dialect.name == "postgresql":
            if USE_PSYCOPG3:
                # Use set_config() to reset the timeout to its default (0 = off/wait forever).
                conn.execute(text("SELECT set_config('lock_timeout', '0', false)"))
            else:
                conn.execute(text("SET LOCK_TIMEOUT TO DEFAULT"))
            (unlocked,) = conn.execute(text("SELECT pg_advisory_unlock(:id)"), {"id": lock.value}).fetchone()
            if not unlocked:
                raise RuntimeError("Error releasing DB lock!")
        elif dialect.name == "mysql" and dialect.server_version_info >= (5, 6):
            conn.execute(text("select RELEASE_LOCK(:id)"), {"id": str(lock)})

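It first checks whether a package named psycopg is installed to decide whether psycopg3 is in use (together with a SQLAlchemy 2.x check) and stores the result in USE_PSYCOPG3. The statements that set the timeout then branch on USE_PSYCOPG3: if it is true, they call the function via SELECT set_config(...); otherwise they use a SET statement.
When I saw this, I simply changed

is_psycopg3 = find_spec("psycopg") is not None

to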

is_psycopg3 = find_spec("psycopg") is not None or find_spec("gaussdb") is not None
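
As a standalone illustration of that check, here is a minimal sketch; the helper names are hypothetical, and only the find_spec calls on "psycopg" and "gaussdb" come from the change above.

```python
from importlib.util import find_spec


def any_module_installed(names):
    """Return True if at least one of the top-level module names can be found."""
    return any(find_spec(name) is not None for name in names)


def detect_pbe_driver():
    """Hypothetical helper: True when psycopg (v3) or gaussdb is importable."""
    return any_module_installed(("psycopg", "gaussdb"))
```

Note that find_spec only reports whether a module is installed, not whether the configured connection actually uses it, so this remains a coarse heuristic.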

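But while writing up this post it suddenly occurred to me: isn't the branching in Airflow's original code rather pointless? Calling the function in both cases would work just as well, with no need to detect psycopg3 here (USE_PSYCOPG3 is also used in setting.py, but that has nothing to do with this code).
Never mind. Better to change as little of the original code as possible; as long as it runs, that's enough.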
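
For reference, the branch-free variant could look roughly like the sketch below. It is not part of the patch I applied. The helper name is hypothetical, and it assumes that set_config() accepts the relevant setting on the target database (lock_timeout on PostgreSQL, presumably lockwait_timeout on GaussDB, which I have not verified) and that '0' is an acceptable reset value there.

```python
import re

# Only plain identifiers are allowed, because the name is placed into the SQL text.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def timeout_statements(setting, timeout):
    """Build (sql, params) pairs that set and reset a timeout via set_config()."""
    if not _IDENT.match(setting):
        raise ValueError("invalid setting name: %r" % (setting,))
    set_sql = "SELECT set_config('%s', :timeout, false)" % setting
    reset_sql = "SELECT set_config('%s', '0', false)" % setting
    # set_config() takes the value as text, so the number is passed as a string.
    return (set_sql, {"timeout": str(timeout)}), (reset_sql, {})
```

With SQLAlchemy, each pair would be executed as conn.execute(text(sql), params), the same way for every driver.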

Complete deployment steps

  1. Install uv and initialize a new Python environment
cd ~
curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv .env311 --python 3.11
source .env311/bin/activate
  2. Configure the libpq environment
mkdir ~/gaussdb-libpq
cp GaussDB-Kernel_506.0.0.SPC0100_Kylin_64bit_Libpq_Static.tar.gz ~/gaussdb-libpq/
cd ~/gaussdb-libpq
tar -xf GaussDB-Kernel_506.0.0.SPC0100_Kylin_64bit_Libpq_Static.tar.gz
echo "export LD_LIBRARY_PATH=~/gaussdb-libpq/lib:\$LD_LIBRARY_PATH" >> ~/.bash_profile
source ~/.bash_profile
  3. Install gaussdb (psycopg3)
uv pip install gaussdb -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
  4. Install the GaussDB-sqlalchemy dialect package
cd ~
git clone https://gitcode.com/darkathena/GaussDB-sqlalchemy
cd GaussDB-sqlalchemy
python setup.py install
  5. Install Airflow 3.1.7
uv pip install apache-airflow==3.1.7 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
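  6. Patch Airflow
    Following the Airflow changes described in the previous post and in this one, manually edit the three files setting.py, utils/db.py and modules/assert.py, or download my already-modified files and use them as replacements: https://gitcode.com/darkathena/airflow-patch-for-gaussdb

  7. Start Airflow once to verify the environment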

export AIRFLOW_HOME=~/airflow
airflow standalone

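This run uses the default SQLite database. The point is to confirm that the environment works and to generate the initial configuration file. Once it has started successfully, press Ctrl+C to exit.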

  8. Configure the Airflow connection settings
vi ~/airflow/airflow.cfg

Set the sql_alchemy_conn parameter (a separate async connection no longer needs to be configured, since psycopg3 supports async natively):

sql_alchemy_conn = gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.163.1:7456/airflow_db3
  9. Create the database and user in GaussDB (use the same names as in sql_alchemy_conn)
create database airflow_db;
\c airflow_db
create user airflow_user password 'Enmo_123';
  10. Start Airflow
export AIRFLOW_HOME=~/airflow
airflow standalone
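
Before starting Airflow against GaussDB, it can help to confirm that the connection string itself works. The sketch below is one possible check, not part of the original steps. The helper names are hypothetical, and check_connection assumes that SQLAlchemy, the gaussdb driver and the GaussDB-sqlalchemy dialect are installed and that the database is reachable.

```python
from urllib.parse import quote


def build_conn_url(user, password, host, port, database, driver="gaussdb+gaussdb"):
    """Assemble a SQLAlchemy URL, percent-encoding the user name and password."""
    return "%s://%s:%s@%s:%d/%s" % (
        driver,
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        database,
    )


def check_connection(url):
    """Open one connection and run SELECT 1; needs SQLAlchemy and the dialect."""
    from sqlalchemy import create_engine, text

    engine = create_engine(url)
    try:
        with engine.connect() as conn:
            return conn.execute(text("SELECT 1")).scalar()
    finally:
        engine.dispose()
```

The value returned by build_conn_url is what goes into sql_alchemy_conn. Percent-encoding matters when the password contains characters such as @ or /.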

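This way there is no need to compile the connection driver by hand. Even with the SQLAlchemy dialect in place, however, Airflow itself still has to be modified before it can use GaussDB as its metadata database.
Also note that libpq for GaussDB 506.0 is only provided for the following operating systems: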

  • Bclinux2.10_arm_64
  • Bclinux2.10_X86_64
  • Euler2.10_arm_64
  • Euler2.10_X86_64
  • Hce2.0_arm_64
  • Hce2.0_X86_64
  • Kylinv10_arm_64
  • Kylinv10_X86_64
  • Suse2.5_X86_64
  • UnionTech20_arm_64
  • UnionTech20_X86_64

Summary

The task of adapting Airflow to GaussDB is now more or less complete. To sum up what was done:

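  1. Starting from openGauss's psycopg2 driver, modified it to work with GaussDB's libpq, so that it can be compiled by the user for different Python versions (GaussDB does not provide psycopg2 builds for different Python versions)
    https://gitcode.com/darkathena/GaussDB-connector-python-psycopg2
  2. Worked out how gaussdb (psycopg3) is implemented and how to install it from source
    https://github.com/HuaweiCloudDeveloper/gaussdb-python
  3. Starting from openGauss's SQLAlchemy dialect, modified it to support three Python drivers for connecting to GaussDB: psycopg2, async-gaussdb and gaussdb (psycopg3)
    https://gitcode.com/darkathena/GaussDB-sqlalchemy
  4. Modified the Airflow source code to work with GaussDB
    https://gitcode.com/darkathena/airflow-patch-for-gaussdb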

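Links to the four articles

What remains to be seen is whether Huawei will open-source its psycopg2 code or build psycopg2 for different Python versions, and whether it will provide an official SQLAlchemy dialect. Ecosystem tooling like this is best backed by the vendor; I cannot guarantee that what I have put together on my own will keep up with future versions.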
