从日志中可以分析导致异常的原因,主要集中在以下几个问题点:

异常概述

psycopg2.ProgrammingError: execute cannot be used while an asynchronous query is underway 是核心异常。这表明:一个异步数据库查询正在运行,而另一个查询尝试在同一连接或事务中被执行,从而导致冲突。这种情况通常发生在使用 SQLAlchemypsycopg2 驱动的异步操作场景下。


根因分析

从栈追踪和代码结构分析,可以识别几个潜在问题:

  1. 异步任务冲突
  • 异常提示是 execute cannot be used while an asynchronous query is underway。这通常是因为多个任务或线程共享同一个数据库连接,而其中有异步查询未完成时,另一个查询试图使用这个连接。
  • 日志中的线程,特别是 [Thread-892][Thread-890] 看起来彼此是并发运行的,它们之间可能共享了同一个数据库会话(Session)或连接池。
  • 在异步环境中,每个任务都需要独立的数据库连接,而不是跨任务共享。
  1. SQLAlchemy 和 psycopg2 的使用方式
  • 日志显示使用的是 SQLAlchemy ORM 和 Psycopg2:
File "/app/api/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1963, in _exec_single_context
  • Psycopg2 支持两种模式:同步和异步模式。如果异步模式没有正确配置,例如:同时执行 async 和普通 execute 查询,会导致此类冲突。
  1. 连接池的配置问题
  • 日志中显示异常是在执行一个 SQL 查询:
SQL: SELECT end_users.id AS end_users_id ...
  • SQLAlchemy 在默认情况下通过连接池维护数据库连接。如果连接池存在不足(例如池大小过小),导致多个任务或线程竞争同一连接,可能发生类似的问题。
  • 同时,如果连接池未处理好异步上下文的分配,比如未启用适配 asyncpg,也可能导致连接竞争。
  1. 当前代码生命周期问题
  • 异常涉及访问用户对象 current_usertenant_id 属性:
tenant_id = extract_tenant_id(current_user)
return user.tenant_id
  • 很可能在获取 tenant_id 属性时,触发 ORM 查询加载属性,该查询被发往数据库时又出现了连接冲突。
  • 如果 current_user 在不同线程或异步任务中被共享,而未正确绑定独立事务或连接,会导致此类问题。

解决方案

短期解决

  1. 检查连接池的配置
  • 增大 SQLAlchemy 的连接池大小。
  • 确保 Session 在任务间隔离:
from sqlalchemy.orm import sessionmaker

SessionLocal = sessionmaker(bind=engine)

每个线程或异步任务应确保使用独立的会话实例。

  1. 改用异步驱动
  • 如果你的应用需要异步查询,使用 asyncpg 驱动替换 psycopg2
  • 启用 SQLAlchemy 的异步支持:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/dbname")
async_session = sessionmaker(engine, class_=AsyncSession)
  1. 上下文管理
  • 使用 contextvars 或类似机制,在异步任务中确保独立的用户上下文:
from contextvars import ContextVar

current_user = ContextVar("current_user")

长期优化

  1. 任务隔离
    检查对共享资源的访问规则,例如:
  • 不同异步任务应独立使用数据库连接。
  • 为异步任务显式创建独立的事务。
  1. 代码审查
  • 在栈追踪暴露的路径中,检查 Sessionengine 的生命周期管理。
  • 确保数据库对象(如 tenant_id 属性的加载)在正确的上下文中加载。
  1. 连接模式优化
    考虑将所有查询转换为异步模式,特别是长时间查询场景:
async with async_session() as session:
    async with session.begin():
        result = await session.execute(statement)
  1. 监控连接池状态
  • 使用 SQLAlchemy 的调试工具检查连接池状态。
  • 在高并发场景下增强池性能,例如适当调整 max_overflowpool_size

总结

异常的关键在于多任务间对数据库连接的竞争。一些任务可能执行了长时间挂起的异步查询,而其他任务尝试使用同一连接导致冲突。通过隔离任务间的数据库上下文、优化连接池配置或改为纯异步操作,可以有效解决这些问题。



psycopg2.ProgrammingError: execute cannot be used while an asynchronous query is underway插图

关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台

除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接

本文链接:http://www.choupangxia.com/2025/08/02/psycopg2-programmingerror-execute-cannot-be-used-while-an-asynchronous-query-is-underway/