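[GaussDB] Developing a SQLAlchemy dialect for gaussdb (psycopg3) to support Airflow
The previous post ([GaussDB] Installing and testing the psycopg3-based Python driver modified for GaussDB) worked out how to install gaussdb (psycopg3). This post starts on the SQLAlchemy dialect.
Straight to vibe coding
This work builds on the SQLAlchemy dialect that had already been adapted for psycopg2 and async-gaussdb. I first created a github folder inside that project, cloned the sqlalchemy and gaussdb-python projects into it, opened the dialect project's root directory in VS Code, and started vibe coding.
If the process does not interest you, feel free to skip this section.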
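User: This project is the SQLAlchemy dialect for the GaussDB database. The github path holds projects for reference: it contains the source of two Python packages, gaussdb and sqlalchemy, where gaussdb is a driver for connecting to GaussDB that was modified from psycopg3. SQLAlchemy ships with a psycopg3 connection method for postgresql. I want to modify this project so that it also supports connecting through gaussdb, rather than through the native psycopg3.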
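GitHub Copilot: Optimizing tool selection...
I'll first quickly go through the psycopg3/async implementations and registration entry points in the current dialect, then compare against the reference source to find the smallest set of changes, and then add the gaussdb driver integration and run targeted tests to verify it. I'll start by searching for the existing dialect registration and psycopg adapter code.
Created 5 todos
I'll read the dialect entry points and the psycopg-related files in parallel, to confirm the current URL driver names, DBAPI import paths, and async registration flow.
Searched for files matching **/gaussdb_sqlalchemy/*.py, 9 matches
Searched for regex psycopg|asyncpg|register|dialect (**/gaussdb_sqlalchemy/**/*.py), 20 results
I've confirmed that only psycopg2 and asyncpg are registered so far; there is no synchronous psycopg3/gaussdb dialect entry point yet. Next I'll compare against SQLAlchemy's built-in postgresql.psycopg implementation and reuse its structure to create a gaussdb version of the dialect and register it.
Searched for regex gaussdb\.psycopg|psycopg3|postgresql\+psycopg|gaussdb\+ (**/test/**/*.py), 20 results
I have the current psycopg2 dialect implementation. Next I'll read the postgresql.psycopg file in the reference sqlalchemy and this project's packaging entry points, to make sure the new gaussdb driver dialect stays compatible with the existing structure.
Searched for regex class PGDialect_psycopg|class PGDialectAsync_psycopg|get_async_dialect_cls|import_dbapi|dialect = (**/github/sqlalchemy/lib/sqlalchemy/dialects/postgresql/psycopg.py), 6 results
Searched for regex entry_points|gaussdb\.asyncpg|gaussdb\.psycopg2|gaussdb\.async_gaussdb (**/README*.md|README*.en.md|README.md|README.en.md|test/**/*.py), no results
I've located the implementation entry points. Next I'll check the connection-string conventions in the README and the tests, then add dialect registration and implementation for gaussdb+gaussdb (and optionally gaussdb+gaussdb_async) in the same style.
Searched for regex psycopg2|asyncpg|连接|URL|create_engine|gaussdb\+ (**/README.en.md), 13 results
Searched for regex psycopg2|asyncpg|连接|URL|create_engine|gaussdb\+ (**/README.md), 19 results
Searched for regex class IsolationLevel|from gaussdb\.pq import|types\.json|types\.hstore|types\.range|AsyncConnection|connect (**/github/gaussdb/gaussdb/**/*.py), 20 results
Searched for regex name\s*=\s*"gaussdb"|project.*gaussdb|entry_points (**/github/gaussdb/pyproject.toml), 1 result
Searched for regex dc_psycopg2|asyncpg|driver|create_engine\(|dialect (**/test/**/*.py), 20 results
I've confirmed that the gaussdb driver API is highly compatible with psycopg3. I'm about to make three kinds of changes: add a gaussdb dialect module, register the new SQLAlchemy driver names, and update the README along with a minimal regression test. Starting on the files now.
Starting: Implement gaussdb driver integration (3/5)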
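Generating patch (18 lines) in
I've finished the core dialect file and the runtime registration. Next I'll update setup.py/setup.cfg/README and add a minimal test, so that SQLAlchemy recognizes the new driver names.
Generating patch (15 lines) in
Generating patch (10 lines) in
Generating patch (20 lines) in
Generating patch (20 lines) in
Starting: Run tests and fix (5/5)
The code changes are done. Next I'll run the new test case and one related existing test case, to confirm that importing and registering the new dialect causes no regressions.
Ran terminal command: python -m pytest test/test_gaussdb_driver.py test/test_alembic_dialect.py -q
Completed with input: python -m pytest test/test_gaussdb_driver.py test/test_alembic_dialect.py -q
Ran terminal command: python -m compileall gaussdb_sqlalchemy test
Read changed files in the active git repository
Checked , , , , , , , , no problems found
I've finished the implementation and it passes the syntax and diagnostics checks. Next I'll put together the change list and ready-to-use connection strings. Because pytest is missing on this machine, I'll include verification commands you can run yourself.
Completed: Run tests and fix (5/5)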
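Done
- Added a dialect implementation based on gaussdb (the psycopg3 fork), with both a sync and an async dialect class: gaussdb.py.
- Added runtime registration for gaussdb.gaussdb and gaussdb.gaussdb_async: __init__.py.
- Added packaging entry points, so that after installation SQLAlchemy recognizes gaussdb+gaussdb and gaussdb+gaussdb_async: setup.py.
- Added the new connection-string aliases to the test configuration: setup.cfg.
- Added a minimal regression test (dialect name/driver and async mapping): test_gaussdb_driver.py.
- Adjusted the server-side cursor test check to cover the new gaussdb driver: test_suite.py.
- Added usage examples for the new driver to the README (Chinese and English): README.md, README.en.md.
Verification results
- python -m compileall gaussdb_sqlalchemy test passed (syntax check OK).
- pytest is missing on this machine (No module named pytest), so the unit tests were not run. You can run them in your environment: python -m pytest test_gaussdb_driver.py -q.
Would you like me to go on and add a dedicated test environment for the gaussdb driver to tox.ini (for example py38_gaussdb), plus an integration test case for create_engine('gaussdb+gaussdb://...')?
Made changes.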
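A side note on the registration step: SQLAlchemy looks up a dialect by taking the part of the URL before :// and replacing + with a dot, so gaussdb+gaussdb_async is resolved through an entry point named gaussdb.gaussdb_async in the sqlalchemy.dialects group. The sketch below illustrates that mapping with the standard library only. The DIALECT_ENTRY_POINTS table is an assumption: the module and class names are the ones that show up in the console output later in this session, but the actual setup.py is not reproduced in this post.

```python
# Illustrative table of "sqlalchemy.dialects" entry points. The targets use the
# module/class names seen in the session output; the real setup.py is not shown
# in the post, so treat the exact lines as an assumption.
DIALECT_ENTRY_POINTS = {
    "gaussdb.gaussdb": "gaussdb_sqlalchemy.gaussdb:GaussDBDialect_gaussdb",
    "gaussdb.gaussdb_async": "gaussdb_sqlalchemy.gaussdb:GaussDBDialectAsync_gaussdb",
}


def entry_point_name(url: str) -> str:
    """Map a SQLAlchemy URL to the entry-point name used to look up its dialect."""
    drivername = url.partition("://")[0]  # e.g. "gaussdb+gaussdb_async"
    return drivername.replace("+", ".")


def setup_py_lines(entry_points: dict) -> list:
    """Render the table in the 'name = module:attr' form used by setup.py."""
    return [f"{name} = {target}" for name, target in entry_points.items()]


if __name__ == "__main__":
    name = entry_point_name("gaussdb+gaussdb_async://user:pwd@host:5432/db")
    print(name, "->", DIALECT_ENTRY_POINTS[name])
    for line in setup_py_lines(DIALECT_ENTRY_POINTS):
        print(line)
```

If the name derived from the URL has no matching entry point in the installed package, SQLAlchemy cannot find the dialect, which is why reinstalling the package matters after every change to setup.py.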
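User: Give me the simplest possible test code that connects and runs a query; there is no need to generate a file.
GitHub Copilot: The simplest version is this (sync):
import sqlalchemy as sa

engine = sa.create_engine(
    "gaussdb+gaussdb://username:password@host:port/database_name"
)

with engine.connect() as conn:
    result = conn.execute(sa.text("SELECT 1"))
    print(result.scalar())  # expected output: 1
If the gaussdb driver is not installed in your current environment yet, you can change the URL to gaussdb+psycopg2://... for now. If you need it, I can also give you a minimal async version.
User: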
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in create_engine
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
    return fn(*args, **kwargs)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 630, in create_engine
    dialect = dialect_cls(**dialect_args)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 421, in __init__
    raise ImportError(
ImportError: psycopg version 3.0.2 or higher is required.
GitHub Copilot: The root cause of this error is now clear: the gaussdb dialect inherits from SQLAlchemy's PGDialect_psycopg, whose initializer enforces a "psycopg>=3.0.2" check, and gaussdb.__version__ is not 3.x, so the driver is wrongly rejected.
I'll fix the dialect's initialization logic directly, so that it skips this psycopg3-specific version gate for the gaussdb driver while leaving the rest of the behavior unchanged.
While I'm at it, I'll switch every internal import in the gaussdb dialect that still points at psycopg over to gaussdb. That not only fixes the version check but also avoids further trouble on the JSON/hstore/range paths.
Searched for regex class AdaptersMap|def set_json_loads|def set_json_dumps|class TypeInfo|def register_hstore|class TransactionStatus|class Range|class Multirange (**/github/gaussdb/gaussdb/**/*.py), 18 results
Searched for regex _psycopg_Json|_psycopg_Jsonb|_psycopg_TransactionStatus|_psycopg_Range|_psycopg_Multirange|_type_info_fetch|initialize\( (**/gaussdb_sqlalchemy/gaussdb.py), no results
I'll now fix gaussdb.py directly: override __init__/initialize/the related attribute imports and replace the psycopg-specific logic with the gaussdb equivalents, focusing first on eliminating the version-check exception you hit in create_engine.
Generating patch (113 lines) in
Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py
This error occurs because the dialect inherits from SQLAlchemy's PGDialect_psycopg, whose __init__ requires psycopg>=3.0.2. You are using the gaussdb driver, whose version number is not 3.x, so it was wrongly rejected.
I've fixed it in the project. The core changes are in:
- gaussdb.py
  - Overrode __init__ to remove the hard psycopg>=3.0.2 requirement
  - Switched the internal dependencies from psycopg.* to gaussdb.* (json/hstore/typeinfo/range, etc.)
It has also been verified locally:
- python -m compileall gaussdb_sqlalchemy\gaussdb.py passed
Next steps on your side:
- Reinstall the current repository version: python setup.py install (or pip install -e .)
- Test again:
import sqlalchemy as sa
engine = sa.create_engine("gaussdb+gaussdb://user:pwd@host:port/db")
If you like, I can also give you a one-off self-check script (it only creates the engine and does not connect to the database) to quickly verify that this error is gone.
Made changes.
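The failure mode is easy to reproduce outside SQLAlchemy. The sketch below uses stand-in classes, not SQLAlchemy's real ones: PsycopgLikeDialect mimics a parent whose __init__ enforces a minimum driver version, and the 1.0.0 version string is a made-up placeholder, since this post does not say what gaussdb.__version__ actually is. The subclass overrides __init__ and calls the grandparent's initializer directly, which is one way to drop the gate while keeping the rest of the setup.

```python
import re


class BaseDialect:
    """Stand-in for the generic dialect base class."""

    def __init__(self, dbapi=None):
        self.dbapi = dbapi


class PsycopgLikeDialect(BaseDialect):
    """Stand-in parent that rejects drivers older than 3.0.2."""

    def __init__(self, dbapi=None):
        super().__init__(dbapi)
        if dbapi is not None:
            m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", dbapi.__version__)
            version = tuple(int(x) for x in m.groups() if x is not None)
            if version < (3, 0, 2):
                raise ImportError("psycopg version 3.0.2 or higher is required.")


class GaussLikeDialect(PsycopgLikeDialect):
    """Stand-in subclass that skips the parent's version gate."""

    def __init__(self, dbapi=None):
        # Call the grandparent initializer directly, bypassing the gate.
        BaseDialect.__init__(self, dbapi)


class FakeDriver:
    __version__ = "1.0.0"  # placeholder, not the real gaussdb version


if __name__ == "__main__":
    try:
        PsycopgLikeDialect(FakeDriver)
    except ImportError as exc:
        print("parent rejects the driver:", exc)
    print("subclass accepts it:", GaussLikeDialect(FakeDriver).dbapi.__version__)
```

The trade-off is that anything else the parent's __init__ sets up has to be redone in the subclass, which matches Copilot's note that it also re-pointed the json/hstore/range imports.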
User:
>>> with engine.connect() as conn:
...     result = conn.execute(sa.text("SELECT 1"))
...     print(result.scalar())
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3285, in connect
    return self._connection_cls(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 143, in __init__
    self._dbapi_connection = engine.raw_connection()
                             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3309, in raw_connection
    return self.pool.connect()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 447, in connect
    return _ConnectionFairy._checkout(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 1264, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 711, in checkout
    rec = pool._do_get()
          ^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 177, in _do_get
    with util.safe_reraise():
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 224, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 175, in _do_get
    return self._create_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 388, in _create_connection
    return _ConnectionRecord(self)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 673, in __init__
    self.__connect()
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 913, in __connect
    )._exec_w_sync_on_first_run(self.dbapi_connection, self)
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/event/attr.py", line 501, in _exec_w_sync_on_first_run
    self(*args, **kw)
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/event/attr.py", line 515, in __call__
    fn(*args, **kw)
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 2000, in go
    return once_fn(*arg, **kw)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 767, in first_connect
    dialect.initialize(c)
  File "/home/aop/GaussDB-sqlalchemy/gaussdb_sqlalchemy/gaussdb.py", line 91, in initialize
    super().initialize(connection)
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 481, in initialize
    from psycopg.types.hstore import register_hstore
ModuleNotFoundError: No module named 'psycopg'
>>>
GitHub Copilot: The failure point is clear this time: although you overrode the logic in initialize(), super().initialize(connection) still ends up in PGDialect_psycopg.initialize, which hard-imports psycopg.types.hstore.
I'll change it to skip that parent-class initialization and call the more general initialization one level further up directly, while keeping your gaussdb version of the hstore registration logic.
Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py
The root cause of this new error has been fixed.
Cause
- GaussDBDialect_gaussdb.initialize() called super().initialize(connection).
- That goes into PGDialect_psycopg.initialize(), which contains a hard-coded from psycopg.types.hstore import register_hstore, hence ModuleNotFoundError: No module named 'psycopg'.
What I fixed
- Changed the initialization call to skip the PGDialect_psycopg.initialize() layer and go straight to the general initialization above it:
  - gaussdb.py
- The gaussdb.types.hstore.register_hstore logic later in your file still runs and is unaffected.
What you need to do now
- Reinstall the local package (this is essential):
  pip install -e .
  or python setup.py install
- Then retry this snippet:
  with engine.connect() as conn: ...
If another stack trace turns up, I'll keep fixing it the same way until it works.
Made changes.
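The fix described here, skipping one class in the inheritance chain, can be written with the two-argument form of super(). Here is a minimal sketch with stand-in classes (none of them are the real SQLAlchemy or gaussdb classes): super(PsycopgLikeDialect, self) starts the method lookup after PsycopgLikeDialect, so that class's initialize(), which would import psycopg, never runs. Whether the actual patch used this form or named the grandparent class explicitly is not shown in the transcript.

```python
class BaseDialect:
    """Stand-in for the generic PostgreSQL-style dialect."""

    def initialize(self, connection):
        self.steps = ["base"]


class PsycopgLikeDialect(BaseDialect):
    """Stand-in parent whose initialize() needs a module that is not installed."""

    def initialize(self, connection):
        super().initialize(connection)
        raise ModuleNotFoundError("No module named 'psycopg'")


class GaussLikeDialect(PsycopgLikeDialect):
    def initialize(self, connection):
        # Start the MRO lookup *after* PsycopgLikeDialect, so only
        # BaseDialect.initialize() runs.
        super(PsycopgLikeDialect, self).initialize(connection)
        # Driver-specific setup (hstore registration in the post) would go here.
        self.steps.append("gauss")


if __name__ == "__main__":
    d = GaussLikeDialect()
    d.initialize(connection=None)
    print(d.steps)
```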
User: This is how the connection is configured in my Airflow:
sql_alchemy_conn_async = gaussdb+gaussdb_async://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3
sql_alchemy_conn = gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3
But it fails on startup:
(.env311) [aop@kylinv10sp3-node1 airflow]$ airflow standalone
Traceback (most recent call last):
  File "/home/aop/.env311/bin/airflow", line 4, in <module>
    from airflow.__main__ import main
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
    settings.initialize()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
    configure_orm()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
    _configure_async_session()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
    async_engine = create_async_engine(
                   ^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
    sync_engine = _create_engine(url, **kw)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in create_engine
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
    return fn(*args, **kwargs)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 617, in create_engine
    dbapi = dbapi_meth(**dbapi_args)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 822, in import_dbapi
    import psycopg
ModuleNotFoundError: No module named 'psycopg'
GitHub Copilot: This error means that what Airflow actually loaded is not your modified GaussDBDialectAsync_gaussdb but SQLAlchemy's built-in PGDialectAsync_psycopg, so it tried to import psycopg and failed.
The most common cause is that the Airflow runtime environment still has an old gaussdb-sqlalchemy installed (one without the gaussdb_async entry point, or one where it was not loaded).
Start with these 3 steps to check and fix (recommended)
- Reinstall your current source in the same virtual environment as Airflow:
  pip uninstall -y gaussdb-sqlalchemy
  pip install -e /home/aop/GaussDB-sqlalchemy
- Verify that the dialect entry points exist:
  python -c "import importlib.metadata as m; print([e.name for e in m.entry_points(group='sqlalchemy.dialects') if e.name.startswith('gaussdb')])"
  - Expected to include at least: gaussdb.gaussdb, gaussdb.gaussdb_async
- Verify that the async dialect class is your own:
  python -c "from sqlalchemy.dialects import registry; cls=registry.load('gaussdb.gaussdb_async'); print(cls, cls.__module__)"
Airflow configuration suggestions
- Keeping your current two settings unchanged is fine for now:
  sql_alchemy_conn = GaussDB-sqlalchemy.
  sql_alchemy_conn_async = GaussDB-sqlalchemy.
Optional fallback
- If you want to rely less on entry points, you can also change the async setting to:
  sql_alchemy_conn_async = GaussDB-sqlalchemy.
- This works because create_async_engine switches to the async dialect automatically through get_async_dialect_cls (provided your dialect package is the one that gets loaded).
If you like, I can give you a one-shot self-check command that prints which dialect class the current Airflow environment actually loads.
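The entry-point check above can also be kept as a small script rather than a one-liner. This is a sketch using only the standard library; the function name dialect_entry_points is made up for illustration. It reports where each gaussdb entry point points without importing SQLAlchemy, so it can be run inside any virtual environment to see which copy of the package is installed there.

```python
from importlib import metadata


def dialect_entry_points(prefix: str = "gaussdb") -> dict:
    """Return {name: 'module:attr'} for installed SQLAlchemy dialect entry points."""
    eps = metadata.entry_points()
    if hasattr(eps, "select"):  # Python 3.10+
        group = eps.select(group="sqlalchemy.dialects")
    else:  # Python 3.8 / 3.9 return a plain dict of lists
        group = eps.get("sqlalchemy.dialects", [])
    return {ep.name: ep.value for ep in group if ep.name.startswith(prefix)}


if __name__ == "__main__":
    found = dialect_entry_points()
    for name in ("gaussdb.gaussdb", "gaussdb.gaussdb_async"):
        print(name, "->", found.get(name, "NOT REGISTERED"))
```

Run it with the same interpreter that launches Airflow; a result of NOT REGISTERED there would point at a stale install rather than at the dialect code.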
User:
(.env311) [aop@kylinv10sp3-node1 ~]$ python -c "import importlib.metadata as m; print([e.name for e in m.entry_points(group='sqlalchemy.dialects') if e.name.startswith('gaussdb')])"
['gaussdb', 'gaussdb.async_gaussdb', 'gaussdb.asyncpg', 'gaussdb.dc_psycopg2', 'gaussdb.gaussdb', 'gaussdb.gaussdb_async', 'gaussdb.psycopg2']
(.env311) [aop@kylinv10sp3-node1 ~]$ python -c "from sqlalchemy.dialects import registry; cls=registry.load('gaussdb.gaussdb_async'); print(cls, cls.__module__)"
<class 'gaussdb_sqlalchemy.gaussdb.GaussDBDialectAsync_gaussdb'> gaussdb_sqlalchemy.gaussdb
(.env311) [aop@kylinv10sp3-node1 ~]$ airflow standalone
Traceback (most recent call last):
  File "/home/aop/.env311/bin/airflow", line 4, in <module>
    from airflow.__main__ import main
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
    settings.initialize()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
    configure_orm()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
    _configure_async_session()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
    async_engine = create_async_engine(
                   ^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
    sync_engine = _create_engine(url, **kw)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in create_engine
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
    return fn(*args, **kwargs)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 617, in create_engine
    dbapi = dbapi_meth(**dbapi_args)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 822, in import_dbapi
    import psycopg
ModuleNotFoundError: No module named 'psycopg'
(.env311) [aop@kylinv10sp3-node1 ~]$
GitHub Copilot: This set of self-checks is important. It shows that the dialect entry points themselves are fine, so the problem becomes:
the URL Airflow actually uses at startup is not the gaussdb+... you expect; it has been overridden by configuration to postgresql+psycopg... (which is why execution ends up in psycopg.py).
Next, check the values Airflow actually uses
airflow config get-value database sql_alchemy_conn
airflow config get-value database sql_alchemy_conn_async
env | grep AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
If any one of them is postgresql+psycopg://..., it reproduces the error you are seeing.
Most common causes
- The environment variable AIRFLOW__DATABASE__SQL_ALCHEMY_CONN(_ASYNC) overrides airflow.cfg
- The wrong configuration file was edited (standalone does not use the one you edited)
- sql_alchemy_conn_async did not take effect, and Airflow fell back and automatically derived a psycopg async URL
Suggested way to pin it down first
- Export the values explicitly before starting (this forces an override):
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
airflow standalone
Set async to gaussdb+gaussdb as well at first, to avoid an extra variable; your dialect will switch to the async class through get_async_dialect_cls.
If you paste the output of the three query commands above, I can quickly help you pin down which layer overrides the configuration.
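When chasing this kind of override, it helps to print which dialect and driver each environment variable actually selects, without echoing the password. The following is a minimal standard-library sketch; describe_conn and airflow_db_env are hypothetical helper names, while the two variable names are the ones used in the transcript. It splits the URL by hand rather than with urllib, because an underscore (as in gaussdb_async) is not a valid scheme character for urllib.parse.

```python
import os


def describe_conn(url: str) -> dict:
    """Split a SQLAlchemy URL into dialect and driver, hiding credentials."""
    drivername, _, rest = url.partition("://")
    dialect, _, driver = drivername.partition("+")
    location = rest.rpartition("@")[2]  # drop "user:password@" if present
    return {"dialect": dialect, "driver": driver or None, "location": location}


def airflow_db_env(environ=os.environ) -> dict:
    """Describe the Airflow database URLs that are set through the environment."""
    names = (
        "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN",
        "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC",
    )
    return {n: describe_conn(environ[n]) for n in names if n in environ}


if __name__ == "__main__":
    for name, info in airflow_db_env().items():
        print(name, info)
```

An empty result means neither variable is set in the current shell, in which case the values come from airflow.cfg or from defaults.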
User:
(.env311) [aop@kylinv10sp3-node1 ~]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 ~]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 ~]$ airflow standalone
Traceback (most recent call last):
  File "/home/aop/.env311/bin/airflow", line 4, in <module>
    from airflow.__main__ import main
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
    settings.initialize()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
    configure_orm()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
    _configure_async_session()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
    async_engine = create_async_engine(
                   ^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
    sync_engine = _create_engine(url, **kw)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in create_engine
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
    return fn(*args, **kwargs)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 617, in create_engine
    dbapi = dbapi_meth(**dbapi_args)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb_sqlalchemy/gaussdb.py", line 177, in import_dbapi
    return PsycopgAdaptDBAPI(gaussdb, ExecStatus)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: PsycopgAdaptDBAPI.__init__() takes 2 positional arguments but 3 were given
(.env311) [aop@kylinv10sp3-node1 ~]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb_async://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 ~]$ airflow standalone
Traceback (most recent call last):
  File "/home/aop/.env311/bin/airflow", line 4, in <module>
    from airflow.__main__ import main
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
    settings.initialize()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
    configure_orm()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
    _configure_async_session()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
    async_engine = create_async_engine(
                   ^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
    sync_engine = _create_engine(url, **kw)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in create_engine
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
    return fn(*args, **kwargs)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 617, in create_engine
    dbapi = dbapi_meth(**dbapi_args)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 822, in import_dbapi
    import psycopg
ModuleNotFoundError: No module named 'psycopg'
(.env311) [aop@kylinv10sp3-node1 ~]$
GitHub Copilot: The new stack trace you pasted nails the problem down: in the current SQLAlchemy version, the constructor parameters of PsycopgAdaptDBAPI differ from what we assumed (it takes only the single psycopg argument).
I'll make a compatibility fix first: support both the 1-argument and the 2-argument form used by different SQLAlchemy versions, so that it runs in the Airflow environment whichever minor version is installed.
Generating patch (17 lines) in
Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py
Fixed. The root cause of this error is a difference between SQLAlchemy versions:
- In your environment, PsycopgAdaptDBAPI.__init__ accepts only one argument
- We previously passed two arguments (gaussdb, ExecStatus), following another version, hence: TypeError: PsycopgAdaptDBAPI.__init__() takes 2 positional arguments but 3 were given
I've changed it to support both signatures (try 2 arguments first, fall back to 1 argument on failure):
- gaussdb.py
Now run this in the Airflow environment:
- pip install -e /home/aop/GaussDB-sqlalchemy (or your actual source path)
- Then use this pair first (safer):
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://...'
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://...'
- Then run airflow standalone
As for the earlier case where gaussdb+gaussdb_async triggered the psycopg import, I suggest testing it again after this patch takes effect. If it still reproduces, my next step is to add one more safeguard to the async dialect that forces it not to take the psycopg fallback.
Made changes.
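The "try 2 arguments, fall back to 1" approach can be sketched independently of SQLAlchemy. OneArgAdapter and TwoArgAdapter below are stand-ins for the two constructor shapes of PsycopgAdaptDBAPI mentioned in the transcript, and build_adapter is a hypothetical helper name; the actual patch is not shown in this post.

```python
class OneArgAdapter:
    """Stand-in for a version whose adapter takes only the driver module."""

    def __init__(self, driver):
        self.driver, self.extra = driver, None


class TwoArgAdapter:
    """Stand-in for a version whose adapter also takes an ExecStatus-like object."""

    def __init__(self, driver, exec_status):
        self.driver, self.extra = driver, exec_status


def build_adapter(adapter_cls, driver, exec_status):
    """Try the two-argument constructor first, then fall back to one argument."""
    try:
        return adapter_cls(driver, exec_status)
    except TypeError:
        # Raised when the constructor does not accept the second argument.
        return adapter_cls(driver)


if __name__ == "__main__":
    print(build_adapter(OneArgAdapter, "driver", "status").extra)
    print(build_adapter(TwoArgAdapter, "driver", "status").extra)
```

One caveat with this pattern: a TypeError raised inside a working two-argument constructor would also trigger the fallback and hide the real error. Checking the parameter count with inspect.signature first is a stricter alternative.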
User:
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ airflow standalone
Traceback (most recent call last):
  File "/home/aop/.env311/bin/airflow", line 4, in <module>
    from airflow.__main__ import main
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
    settings.initialize()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
    configure_orm()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 416, in configure_orm
    _configure_async_session()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 359, in _configure_async_session
    async_engine = create_async_engine(
                   ^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 120, in create_async_engine
    sync_engine = _create_engine(url, **kw)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in create_engine
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
    return fn(*args, **kwargs)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 630, in create_engine
    dialect = dialect_cls(**dialect_args)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py", line 421, in __init__
    raise ImportError(
ImportError: psycopg version 3.0.2 or higher is required.
GitHub Copilot: This error is a newly located instance of the same class of problem: with gaussdb+gaussdb, Airflow's async initialization instantiates the async dialect class, and that class still goes through PGDialect_psycopg.__init__ (which triggers the psycopg version check).
I'll now complete the __init__/initialize/_type_info_fetch overrides on the async dialect, to bypass the psycopg dependency path entirely.
Generating patch (83 lines) in
Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py
The async dialect patch is done and passes the compile check. All you need to do next is reinstall the local package in Airflow's virtual environment and restart, then see whether it reaches the database connection stage. If there are still errors, they will most likely be driver API compatibility details, and I can keep following up until it starts.
This time the fix targets the actual cause: your error came from the async dialect still inheriting and running PGDialect_psycopg.__init__, which triggered the psycopg>=3.0.2 check.
I've just made the complete set of async overrides in gaussdb.py:
- GaussDBDialectAsync_gaussdb.__init__: skips the psycopg version gate and initializes with the gaussdb adapters instead
- GaussDBDialectAsync_gaussdb.initialize: skips psycopg's hstore import path
- GaussDBDialectAsync_gaussdb._type_info_fetch: changed to gaussdb.types.TypeInfo + await_
- Kept the PsycopgAdaptDBAPI constructor-argument compatibility (1/2 arguments) fixed earlier
It also passes locally:
- python -m compileall gaussdb_sqlalchemy\gaussdb.py
Now run this in the Airflow environment:
- pip install -e /home/aop/GaussDB-sqlalchemy
- Then run:
  export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://...'
  export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://...'
  airflow standalone
If there is another stack trace, paste it and I'll keep fixing. The next hop is most likely a gaussdb async connection detail rather than a psycopg dependency problem.
Made changes.
User:
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC='gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.1.131:7456/airflow_db3'
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ airflow standalone
Traceback (most recent call last):
  File "/home/aop/.env311/bin/airflow", line 4, in <module>
    from airflow.__main__ import main
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__init__.py", line 79, in <module>
    settings.initialize()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 714, in initialize
    configure_orm()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/settings.py", line 407, in configure_orm
    engine = create_engine(
             ^^^^^^^^^^^^^^
  File "<string>", line 2, in create_engine
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
    return fn(*args, **kwargs)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 568, in create_engine
    entrypoint = u._get_entrypoint()
                 ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/url.py", line 772, in _get_entrypoint
    cls = registry.load(name)
          ^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 451, in load
    return impl.load()
           ^^^^^^^^^^^
  File "/home/aop/.local/share/uv/python/cpython-3.11.12-linux-x86_64-gnu/lib/python3.11/importlib/metadata/__init__.py", line 202, in load
    module = import_module(match.group('module'))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.local/share/uv/python/cpython-3.11.12-linux-x86_64-gnu/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb_sqlalchemy/gaussdb.py", line 16, in <module>
    from sqlalchemy.util.concurrency import await_
ImportError: cannot import name 'await_' from 'sqlalchemy.util.concurrency' (/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/util/concurrency.py)
GitHub Copilot: Got it. This one is a SQLAlchemy version compatibility point: your environment has no await_, only other concurrency helper functions.
I'll change this to a multi-version-compatible import (prefer await_, otherwise fall back to await_only), to avoid an ImportError right at startup.
Generating patch (23 lines) in
Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py
This error is fixed: the SQLAlchemy in your environment has no await_, so it failed at import time.
I've made it compatible with multiple versions:
- In gaussdb.py:
  - Prefer from sqlalchemy.util.concurrency import await_ as _sa_await
  - If that fails, fall back to from sqlalchemy.util.concurrency import await_only as _sa_await
  - _type_info_fetch() consistently uses _sa_await(...)
It has also passed the compile check.
Now run:
- pip install -e /home/aop/GaussDB-sqlalchemy
- Then run airflow standalone
If there is another stack trace, keep pasting it and I'll continue converging on compatibility with your current SQLAlchemy version.
Made changes.
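The fallback Copilot describes is a plain try/except ImportError around two import statements. The same idea can be generalized into a small helper, sketched below; first_attr is a hypothetical name, and the demonstration runs against the standard library's math module so that it works without SQLAlchemy installed.

```python
import importlib


def first_attr(module_name: str, *names: str):
    """Return the first attribute in `names` that `module_name` provides."""
    module = importlib.import_module(module_name)
    for name in names:
        if hasattr(module, name):
            return getattr(module, name)
    raise ImportError(f"none of {names} found in {module_name}")


if __name__ == "__main__":
    # Demonstrated on the standard library so it runs anywhere:
    # math has no "tau_squared", so the lookup falls through to "tau".
    print(first_attr("math", "tau_squared", "tau"))
    # In the dialect the equivalent call would be (requires SQLAlchemy):
    # _sa_await = first_attr("sqlalchemy.util.concurrency", "await_", "await_only")
```

For just two candidate names the inline try/except is arguably clearer; a helper like this only pays off when several such version-dependent names accumulate.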
User:
2026-02-22T14:20:05.733267Z [info ] Creating metadata [airflow.utils.db] loc=db.py:695
Traceback (most recent call last):
  File "/home/aop/.env311/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 55, in entrypoint
    StandaloneCommand().run()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 71, in run
    self.initialize_database()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 217, in initialize_database
    db.initdb()
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 750, in initdb
    _create_db_from_orm(session=session)
  File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 696, in _create_db_from_orm
    Base.metadata.create_all(engine)
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/schema.py", line 5928, in create_all
    bind._run_ddl_visitor(
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3260, in _run_ddl_visitor
    conn._run_ddl_visitor(visitorcallable, element, **kwargs)
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2467, in _run_ddl_visitor
    ).traverse_single(element)
      ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/visitors.py", line 661, in traverse_single
    return meth(obj, **kw)
           ^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 984, in visit_metadata
    self.traverse_single(
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/visitors.py", line 661, in traverse_single
    return meth(obj, **kw)
           ^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 1022, in visit_table
    )._invoke_with(self.connection)
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 321, in _invoke_with
    return bind.execute(self)
           ^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
    return meth(
           ^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 187, in _execute_on_connection
    return connection._execute_ddl(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1527, in _execute_ddl
    compiled = ddl.compile(
               ^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 312, in compile
    return self._compiler(dialect, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 76, in _compiler
    return dialect.ddl_compiler(dialect, self, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py", line 888, in __init__
    self.string = self.process(self.statement, **compile_kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py", line 934, in process
    return obj._compiler_dispatch(self, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/visitors.py", line 138, in _compiler_dispatch
    return meth(self, **kw)  # type: ignore  # noqa: E501
           ^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py", line 6814, in visit_create_table
    text += "\n)%s\n\n" % self.post_create_table(table)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb_sqlalchemy/base.py", line 290, in post_create_table
    if self.dialect._supports_table_distribute_by:
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'GaussDBDialect_gaussdb' object has no attribute '_supports_table_distribute_by'
GitHub Copilot: This stack is the last class of compatibility problem, now at the business-logic level: GaussDBDDLCompiler.post_create_table() depends on the dialect attribute _supports_table_distribute_by, which the newly created gaussdb dialect classes do not define.
I'll make the minimal fix first: add the attribute to both the sync and the async dialect (False, consistent with the centralized psycopg2 dialect), to avoid the crash when Airflow creates its tables.
Generating patch (17 lines) in
Ran terminal command: python -m compileall gaussdb_sqlalchemy\gaussdb.py
This error has been matched to its cause and fixed.
Root cause
- post_create_table() in base.py accesses self.dialect._supports_table_distribute_by
- The new GaussDBDialect_gaussdb / GaussDBDialectAsync_gaussdb did not define that attribute, which caused the AttributeError
Changes made
- Added to both dialect classes: _supports_table_distribute_by = False
- File: gaussdb.py
What to run now
- pip install -e /home/aop/GaussDB-sqlalchemy
- Then run airflow standalone
This time it should get further. If there is another stack trace, paste it and I'll keep working through them one by one until it starts.
Made changes.
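The mechanics of this last fix are ordinary Python attribute lookup: the compiler reads a flag straight off the dialect, so every dialect class must define it, and a class-level default is the smallest way to do that. The sketch below uses stand-in classes; the clause returned by the fake compiler is purely illustrative, since the real post_create_table() in base.py is not shown in this post.

```python
class FakeDDLCompiler:
    """Stand-in for a DDL compiler that consults a dialect capability flag."""

    def __init__(self, dialect):
        self.dialect = dialect

    def post_create_table(self, table_name):
        # Direct attribute access: raises AttributeError if the flag is missing.
        if self.dialect._supports_table_distribute_by:
            return " DISTRIBUTE BY HASH(id)"  # illustrative clause only
        return ""


class DialectWithoutFlag:
    """Mimics the new dialect classes before the fix."""


class DialectWithFlag:
    """Mimics the fix: a class-level default shared by all instances."""

    _supports_table_distribute_by = False


if __name__ == "__main__":
    print(repr(FakeDDLCompiler(DialectWithFlag()).post_create_table("t")))
    try:
        FakeDDLCompiler(DialectWithoutFlag()).post_create_table("t")
    except AttributeError as exc:
        print("without the flag:", exc)
```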
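At this point the dialect is basically done and usable. Along the way there was actually one more problem inside Airflow itself, but since I already knew where it was, I did not tell the AI and simply fixed it myself.
Modifying Airflow
First, let's look at the error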
(.env311) [aop@kylinv10sp3-node1 GaussDB-sqlalchemy]$ airflow standalone
2026-02-22T14:15:31.248602Z [info ] setup plugin alembic.autogenerate.schemas [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.248762Z [info ] setup plugin alembic.autogenerate.tables [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.248894Z [info ] setup plugin alembic.autogenerate.types [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.249035Z [info ] setup plugin alembic.autogenerate.constraints [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.249145Z [info ] setup plugin alembic.autogenerate.defaults [alembic.runtime.plugins] loc=plugins.py:37
2026-02-22T14:15:31.249261Z [info ] setup plugin alembic.autogenerate.comments [alembic.runtime.plugins] loc=plugins.py:37
standalone | Starting Airflow Standalone
standalone | Password for the admin user has been previously generated in /home/aop/airflow/simple_auth_manager_passwords.json.generated. Not echoing it here.
standalone | Checking database is initialized
2026-02-22T14:15:31.838366Z [info ] Creating Airflow database tables from the ORM [airflow.utils.db] loc=db.py:684
2026-02-22T14:15:31.838507Z [info ] Creating context [airflow.utils.db] loc=db.py:689
Traceback (most recent call last):
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
self.dialect.do_execute(
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
cursor.execute(statement, parameters)
File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb/cursor.py", line 98, in execute
raise ex.with_traceback(None)
gaussdb.errors.SyntaxError: syntax error at or near "$1"
LINE 1: SET LOCKWAIT_TIMEOUT to $1
^
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 1361, in create_global_lock
conn.execute(text("SET LOCKWAIT_TIMEOUT to :timeout"), {"timeout": lock_timeout})
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
return meth(
^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 527, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1641, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
return self._exec_single_context(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
self._handle_dbapi_exception(
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2363, in _handle_dbapi_exception
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
self.dialect.do_execute(
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
cursor.execute(statement, parameters)
File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb/cursor.py", line 98, in execute
raise ex.with_traceback(None)
sqlalchemy.exc.ProgrammingError: (gaussdb.errors.SyntaxError) syntax error at or near "$1"
LINE 1: SET LOCKWAIT_TIMEOUT to $1
^
[SQL: SET LOCKWAIT_TIMEOUT to %(timeout)s]
[parameters: {'timeout': 1800}]
(Background on this error at: https://sqlalche.me/e/20/f405)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
self.dialect.do_execute(
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
cursor.execute(statement, parameters)
File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb/cursor.py", line 98, in execute
raise ex.with_traceback(None)
gaussdb.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block, firstChar[Q]
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/aop/.env311/bin/airflow", line 10, in <module>
sys.exit(main())
^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/__main__.py", line 55, in main
args.func(args)
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 55, in entrypoint
StandaloneCommand().run()
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 71, in run
self.initialize_database()
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/cli/commands/standalone_command.py", line 217, in initialize_database
db.initdb()
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 750, in initdb
_create_db_from_orm(session=session)
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 690, in _create_db_from_orm
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
File "/home/aop/.local/share/uv/python/cpython-3.11.12-linux-x86_64-gnu/lib/python3.11/contextlib.py", line 137, in __enter__
return next(self.gen)
^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/airflow/utils/db.py", line 1373, in create_global_lock
conn.execute(text("SET LOCKWAIT_TIMEOUT TO DEFAULT"))
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
return meth(
^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 527, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1641, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
return self._exec_single_context(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
self._handle_dbapi_exception(
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2363, in _handle_dbapi_exception
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
self.dialect.do_execute(
File "/home/aop/.env311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
cursor.execute(statement, parameters)
File "/home/aop/.env311/lib/python3.11/site-packages/gaussdb/cursor.py", line 98, in execute
raise ex.with_traceback(None)
sqlalchemy.exc.InternalError: (gaussdb.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block, firstChar[Q]
[SQL: SET LOCKWAIT_TIMEOUT TO DEFAULT]
(Background on this error at: https://sqlalche.me/e/20/2j85)
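This error is simply a syntax error raised when SET LOCKWAIT_TIMEOUT to $1 is executed with a bind variable. gaussdb (psycopg3) follows the standard and uses PBE (the Parse/Bind/Execute extended query protocol), but the database kernel does not support PBE for SET statements, hence the error. psycopg2 did not fail earlier because it does not use PBE as the standard describes; it simply substitutes the placeholder into the SQL string on the client side.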
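The difference between the two drivers can be illustrated with a small simulation. This is a sketch, not the drivers' real code: `render_client_side` and `render_server_side` are hypothetical helpers that only mimic what ends up being sent to the server for a `%(name)s` placeholder in each case.

```python
import re

PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def render_client_side(sql: str, params: dict) -> str:
    """psycopg2-like behaviour: merge literal values into the SQL text."""

    def substitute(match):
        value = params[match.group(1)]
        if isinstance(value, (int, float)):
            return str(value)
        # Very simplified string quoting, for illustration only.
        return "'" + str(value).replace("'", "''") + "'"

    return PLACEHOLDER.sub(substitute, sql)


def render_server_side(sql: str, params: dict):
    """psycopg3-like behaviour: rewrite to $n and ship the values separately."""
    order = []

    def substitute(match):
        name = match.group(1)
        if name not in order:
            order.append(name)
        return f"${order.index(name) + 1}"

    return PLACEHOLDER.sub(substitute, sql), [params[name] for name in order]


sql = "SET LOCKWAIT_TIMEOUT to %(timeout)s"
print(render_client_side(sql, {"timeout": 1800}))
print(render_server_side(sql, {"timeout": 1800}))
```

In the first case the server only ever sees a plain SET statement; in the second it has to parse `$1` inside SET, which is the statement shown in the traceback above.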
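Airflow had in fact already noticed this problem for the psycopg3 dialect that ships with sqlalchemy. Its source code contains the following pieces, in db.py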
USE_PSYCOPG3: bool
try:
    from importlib.util import find_spec
    import sqlalchemy
    from packaging.version import Version
    is_psycopg3 = find_spec("psycopg") is not None
    sqlalchemy_version = Version(sqlalchemy.__version__)
    is_sqla2 = (sqlalchemy_version.major, sqlalchemy_version.minor, sqlalchemy_version.micro) >= (2, 0, 0)
    USE_PSYCOPG3 = is_psycopg3 and is_sqla2
except (ImportError, ModuleNotFoundError):
    USE_PSYCOPG3 = False
@contextlib.contextmanager
def create_global_lock(
    session: Session,
    lock: DBLocks,
    lock_timeout: int = 1800,
) -> Generator[None, None, None]:
    """Contextmanager that will create and teardown a global db lock."""
    conn = session.get_bind().connect()
    dialect = conn.dialect
    try:
        if dialect.name == "postgresql":
            if USE_PSYCOPG3:
                # psycopg3 doesn't support parameters for `SET`. Use `set_config` instead.
                # The timeout value must be passed as a string of milliseconds.
                conn.execute(
                    text("SELECT set_config('lock_timeout', :timeout, false)"),
                    {"timeout": str(lock_timeout)},
                )
                conn.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock.value})
            else:
                conn.execute(text("SET LOCK_TIMEOUT to :timeout"), {"timeout": lock_timeout})
                conn.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock.value})
        elif dialect.name == "mysql" and dialect.server_version_info >= (5, 6):
            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), {"id": str(lock), "timeout": lock_timeout})
        yield
    finally:
        if dialect.name == "postgresql":
            if USE_PSYCOPG3:
                # Use set_config() to reset the timeout to its default (0 = off/wait forever).
                conn.execute(text("SELECT set_config('lock_timeout', '0', false)"))
            else:
                conn.execute(text("SET LOCK_TIMEOUT TO DEFAULT"))
            (unlocked,) = conn.execute(text("SELECT pg_advisory_unlock(:id)"), {"id": lock.value}).fetchone()
            if not unlocked:
                raise RuntimeError("Error releasing DB lock!")
        elif dialect.name == "mysql" and dialect.server_version_info >= (5, 6):
            conn.execute(text("select RELEASE_LOCK(:id)"), {"id": str(lock)})
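First it decides whether psycopg3 is in use by checking whether a package named psycopg exists, and assigns the result to USE_PSYCOPG3. Later, the statement that sets the parameter takes a different branch depending on USE_PSYCOPG3: if it is true, the function is called via SELECT set_config; otherwise a SET statement is used.
When I saw this, I simply made the following change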
is_psycopg3 = find_spec("psycopg") is not None
changed to
is_psycopg3 = find_spec("psycopg") is not None or find_spec("gaussdb") is not None
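The same check can be written as a small helper. This is only a sketch: `any_module_installed` is a hypothetical name, not something Airflow provides. Note that `find_spec` reports whether a package is installed, not which driver the engine actually uses, so a check like this can give the wrong answer in an environment where several drivers are installed side by side.

```python
from importlib.util import find_spec


def any_module_installed(*names: str) -> bool:
    """Return True if at least one of the top-level modules can be found."""
    return any(find_spec(name) is not None for name in names)


# Treat both psycopg (psycopg3) and its GaussDB fork as "uses server-side binding".
uses_server_side_binding = any_module_installed("psycopg", "gaussdb")
print(uses_server_side_binding)
```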
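But while writing up this article, it suddenly occurred to me: isn't the branching in Airflow's original code a bit silly? Wouldn't it be enough to always use the function call? Then there would be no need to check whether psycopg3 is in use at all (USE_PSYCOPG3 is also used in settings.py, but that has nothing to do with this code).
Never mind. Change as little of the original code as possible; as long as it runs, that's enough.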
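For reference, the "always call the function" idea could look roughly like the sketch below. `build_set_config` is a hypothetical helper, and it assumes that GaussDB's `set_config` accepts the `lockwait_timeout` parameter, which I have not verified here. It only builds the statement and its parameters; in Airflow they would be passed to `conn.execute(text(sql), params)`.

```python
def build_set_config(setting: str, value) -> tuple[str, dict]:
    """Build a set_config() call that works with bind parameters.

    A SELECT accepts bind parameters under both client-side substitution
    and server-side binding, so no per-driver branch is needed.
    """
    sql = "SELECT set_config(:setting, :value, false)"
    # set_config() takes text arguments, so the value is passed as a string.
    return sql, {"setting": setting, "value": str(value)}


sql, params = build_set_config("lockwait_timeout", 1800)
print(sql)
print(params)
```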
Complete deployment steps
- Install uv and initialize a new Python environment
cd ~
curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv .env311 --python 3.11
source .env311/bin/activate
- Set up the libpq environment
mkdir ~/gaussdb-libpq
cp GaussDB-Kernel_506.0.0.SPC0100_Kylin_64bit_Libpq_Static.tar.gz ~/gaussdb-libpq/
cd ~/gaussdb-libpq
tar -xf GaussDB-Kernel_506.0.0.SPC0100_Kylin_64bit_Libpq_Static.tar.gz
echo "export LD_LIBRARY_PATH=~/gaussdb-libpq/lib:\$LD_LIBRARY_PATH" >> ~/.bash_profile
source ~/.bash_profile
- Install gaussdb (psycopg3)
uv pip install gaussdb -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
- Install the GaussDB-sqlalchemy dialect package
cd ~
git clone https://gitcode.com/darkathena/GaussDB-sqlalchemy
cd GaussDB-sqlalchemy
python setup.py install
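- Install airflow 3.1.7
uv pip install apache-airflow==3.1.7 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
- Modify Airflow
Following the changes to Airflow described in the previous article and in this one, manually edit the three files settings.py, utils/db.py, and modules/assert.py, or download my already-modified files and replace them directly: https://gitcode.com/darkathena/airflow-patch-for-gaussdb
- Start it once to verify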
export AIRFLOW_HOME=~/airflow
airflow standalone
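By default this uses a sqlite database. The point is to confirm the environment is fine and to generate the initial configuration file. Once it has started successfully, press ctrl+c to exit.
- Configure the Airflow connection
vi airflow/airflow.cfg
Change the sql_alchemy_conn parameter (a separate async connection no longer needs to be configured, since psycopg3 supports async natively)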
sql_alchemy_conn = gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.163.1:7456/airflow_db3
- Create the database and user in GaussDB (the names must match the ones used in sql_alchemy_conn)
create database airflow_db;
\c airflow_db
create user airflow_user password 'Enmo_123';
- Start Airflow
export AIRFLOW_HOME=~/airflow
airflow standalone
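A note on the connection string used in the steps above: SQLAlchemy URLs have the form dialect+driver://user:password@host:port/database, so in gaussdb+gaussdb the first gaussdb is the dialect name and the second is the driver. The sketch below just takes the URL apart with the standard library as a quick sanity check before putting it in airflow.cfg; `describe_conn` is a hypothetical helper, not part of SQLAlchemy or Airflow.

```python
from urllib.parse import urlsplit


def describe_conn(url: str) -> dict:
    """Split a SQLAlchemy-style URL into its named parts."""
    parts = urlsplit(url)
    # The scheme carries both names, separated by "+".
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


info = describe_conn(
    "gaussdb+gaussdb://airflow_user3:Enmo_123@192.168.163.1:7456/airflow_db3"
)
print(info)
```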
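With this, there is no need to compile the connection driver by hand. However, even with the sqlalchemy dialect in place, Airflow itself still has to be modified before it can use GaussDB as its metadata database.
Also note that the libpq for GaussDB 506.0 is only provided for the following operating systems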
- Bclinux2.10_arm_64
- Bclinux2.10_X86_64
- Euler2.10_arm_64
- Euler2.10_X86_64
- Hce2.0_arm_64
- Hce2.0_X86_64
- Kylinv10_arm_64
- Kylinv10_X86_64
- Suse2.5_X86_64
- UnionTech20_arm_64
- UnionTech20_X86_64
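Summary
The task of adapting Airflow to GaussDB is now more or less wrapped up. Here is a summary of what was done
- Modified openGauss's psycopg2 driver so that it can use GaussDB's libpq and can be compiled in different Python version environments (GaussDB does not provide psycopg2 drivers for different Python versions)
https://gitcode.com/darkathena/GaussDB-connector-python-psycopg2
- Worked out how gaussdb (psycopg3) is implemented and how to install it from source
https://github.com/HuaweiCloudDeveloper/gaussdb-python
- Modified openGauss's sqlalchemy dialect to support three Python driver connection methods for GaussDB: psycopg2, async-gaussdb, and gaussdb (psycopg3)
https://gitcode.com/darkathena/GaussDB-sqlalchemy
- Modified the Airflow source code to adapt it to GaussDB
https://gitcode.com/darkathena/airflow-patch-for-gaussdb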
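Links to the four articles
- 【GaussDB】Manually compiling the psycopg2 driver for different Python versions to adapt to Airflow
https://www.darkathena.top/archives/gaussdb-build-psycopg2-with-different-python-versions-for-compat-airflow
- 【GaussDB】Adjusting the sqlalchemy dialect to support async_gaussdb to adapt to Airflow 3.1.7
https://www.darkathena.top/archives/Edit-SQLAlchemy-dialect-to-support-async_gaussdb-to-adapt-to-Airflow-3.1.7
- 【GaussDB】Installing and testing GaussDB's Python driver based on psycopg3
https://www.darkathena.top/archives/install-test-gaussdb-python-driver-based-on-psycopg3
- 【GaussDB】Developing a sqlalchemy dialect for gaussdb (psycopg3) to support Airflow
https://www.darkathena.top/archives/Edit-SQLAlchemy-dialect-to-support-gaussdb-based-on-psycopg3-to-adapt-to-Airflow-3.1.7
What remains is to see whether Huawei will open-source its psycopg2 code or build psycopg2 for different Python versions, and whether it will provide a sqlalchemy dialect. Ecosystem tooling like this is best backed by the original vendor; I can't guarantee that what I've put together personally will keep evolving across versions.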
