SQLAlchemy FastAPI middleware
Description
Provides SQLAlchemy middleware for FastAPI using AsyncSession and async engine.
Install
pip install fastapi-async-sqlalchemy它也适用于 ``sqlmodel
示例
请注意,由 db.session 提供的会话对象基于 Python3.7+ 的 ContextVar。这意味着
每个会话都与其创建时的单独请求上下文相关联。
python
from fastapi import FastAPI from fastapi_async_sqlalchemy import SQLAlchemyMiddleware from fastapi_async_sqlalchemy import db # provide access to a database session from sqlalchemy import column from sqlalchemy import table
app = FastAPI() app.add_middleware( SQLAlchemyMiddleware, db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db", engine_args={ # engine arguments example "echo": True, # print all SQL statements "pool_pre_ping": True, # feature will normally emit SQL equivalent to “SELECT 1” each time a connection is checked out from the pool "pool_size": 5, # number of connections to keep open at a time "max_overflow": 10, # number of connections to allow to be opened above pool_size }, )
once the middleware is applied, any route can then access the database session
from the global db
foo = table("ms_files", column("id"))
Usage inside of a route
@app.get("/") async def get_files(): result = await db.session.execute(foo.select()) return result.fetchall()async def get_db_fetch():
# It uses the same db object and use it as a context manager:
async with db():
result = await db.session.execute(foo.select())
return result.fetchall()
Usage inside of a route using a db context
@app.get("/db_context") async def db_context(): return await get_db_fetch()Usage outside of a route using a db context
@app.on_event("startup") async def on_startup(): # We are outside of a request context, therefore we cannot rely onSQLAlchemyMiddleware
# to create a database session for us.
result = await get_db_fetch()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8002)
#### 多数据库的使用databases.py
python
from fastapi import FastAPI
from fastapi_async_sqlalchemy import create_middleware_and_session_proxyFirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()
main.py
python
from fastapi import FastAPIfrom databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
from routes import router
app = FastAPI()
app.include_router(router)
app.add_middleware(
FirstSQLAlchemyMiddleware,
db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
engine_args={
"pool_size": 5,
"max_overflow": 10,
},
)
app.add_middleware(
SecondSQLAlchemyMiddleware,
db_url="mysql+aiomysql://user:user@192.168.88.200:5432/primary_db",
engine_args={
"pool_size": 5,
"max_overflow": 10,
},
)
routes.py
python
import asynciofrom fastapi import APIRouter
from sqlalchemy import column, table, text
from databases import first_db, second_db
router = APIRouter()
foo = table("ms_files", column("id"))
@router.get("/first-db-files")
async def get_files_from_first_db():
result = await first_db.session.execute(foo.select())
return result.fetchall()
@router.get("/second-db-files")
async def get_files_from_second_db():
result = await second_db.session.execute(foo.select())
return result.fetchall()
@router.get("/concurrent-queries")
async def parallel_select():
async with first_db(multi_sessions=True):
async def execute_query(query):
return await first_db.session.execute(text(query))
tasks = [
asyncio.create_task(execute_query("SELECT 1")),
asyncio.create_task(execute_query("SELECT 2")),
asyncio.create_task(execute_query("SELECT 3")),
asyncio.create_task(execute_query("SELECT 4")),
asyncio.create_task(execute_query("SELECT 5")),
asyncio.create_task(execute_query("SELECT 6")),
]
await asyncio.gather(*tasks)
``--- Tranlated By Open Ai Tx | Last indexed: 2026-03-20 ---