0%

【fastapi+sqlalchemy+pymysql】一些总结和沉淀

最近在用fastapi+sqlalchemy+pymysql构建API服务器,总结一下自己遇到的问题和解决方法

FastAPI

Depends - 依赖注入

https://fastapi.tiangolo.com/zh/tutorial/dependencies/

fastapi对接口提供了依赖注入的功能,方便进行代码复用和统一的身份验证等

官方解释的依赖注入的作用:

编程中的「依赖注入」是声明代码(本文中为路径操作函数 )运行所需的,或要使用的「依赖」的一种方式。

然后,由系统(本文中为 FastAPI)负责执行任意需要的逻辑,为代码提供这些依赖(「注入」依赖项)。

依赖注入常用于以下场景:

  • 共享业务逻辑(复用相同的代码逻辑)
  • 共享数据库连接
  • 实现安全、验证、角色权限
  • 等……

上述场景均可以使用依赖注入,将代码重复最小化。

官方文档写了很多具体的用法,这里仅介绍一下我自己用的一些特性。

对子路由整体进行鉴权

fastapi提供了APIRouter类来进行子路由的配置,同时,支持在配置子路由时配置依赖注入,使得所有该子路由下的接口可以统一配置鉴权,这样省去了每个接口单独鉴权的麻烦。

1
2
3
4
5
6
7
async def admin_verify_ticket(request: Request):
if request.session.get("user", None):
return request.session.get("user")
else:
raise HTTPException(detail="用户未认证,需要跳转登陆", status_code=401)

router = APIRouter(prefix='/test', dependencies=[Depends(admin_verify_ticket)])

需要注意的是,配置在路径操作和子路由上的依赖注入函数并不会把返回值传递给API接口,但是可以正常的抛出异常

对通用参数输入进行处理

通过使用类作为依赖注入,可以方便的对通用的一些参数进行处理,比如列表页的排序、分页等参数

1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import FastAPI
from fastapi import Depends, Query

app = FastAPI()

class CommonQueryParams:
def __init__(self, q: int = Query(..., ge=1, le=4)):
self.q = q

@app.get('/')
async def index(params: CommonQueryParams = Depends()):
return {"q": params.q}

内存文件Response

开发过程中遇到了在内存中生成了文件,需要返回给前端,但是fastapi本身没有直接返回内存中的文件的Response类。

于是参考了FileResponse的实现,继承SteamingResponse开发了一个自己的Response方法。

需要注意的是,没有经过严格的单元测试和性能测试,谨慎使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from mimetypes import guess_type as mimetypes_guess_type
from urllib.parse import quote

class MemoryFileResponse(StreamingResponse):
def __init__(self, file_content: bytes, filename: str):
self.file_content = file_content
self.filename = filename
super().__init__(content=self.read_from_bytes(), media_type=self.guess_type(), headers={"Content-Disposition": self.gen_content_disposition()})

def gen_content_disposition(self):
content_disposition_filename = quote(self.filename)
if content_disposition_filename != self.filename:
content_disposition = "attachment; filename*=utf-8''{}".format(
content_disposition_filename
)
else:
content_disposition = f'attachment; filename="{self.filename}"'
return content_disposition

def guess_type(self):
return mimetypes_guess_type(self.filename, strict=True)[0]

def read_from_bytes(self):
"""
从字节数组里按段读取,返回生成器
"""
chunk_size = 4096
i = 0
more_body = True
while more_body:
if i + chunk_size >= len(self.file_content):
more_body = False
chunk = self.file_content[i:]
else:
chunk = self.file_content[i:i + chunk_size]
i += chunk_size
yield chunk


app = FastAPI()


@app.get(path='/')
async def index():
with open("test.jpg", "rb") as f:
content = f.read()
return MemoryFileResponse(content, "test.jpg")

通用的返回值处理

通过依赖注入可以方便的进行参数处理,同样,fastapi提供了通用的返回值处理机制,能够把无法json序列化的字段做自定义处理

1
2
3
4
5
6
7
8
9
from fastapi.encoders import jsonable_encoder

message = jsonable_encoder(
message,
custom_encoder={
datetime.datetime: lambda x: x.strftime("%Y-%m-%d %H:%M:%S"),
bson.ObjectId: lambda x: str(x)
}
)

上述代码就将自动把datetime对象转化为指定的字符串形式,把mongo的ObjectId对象转化为字符串形式,通过装饰器可以通用在所有的API的返回值上,做到通用的返回值处理

自定义参数错误的返回

fastapi默认的参数验证器会在校验参数失败时返回默认的信息可读性较差,或者我们需要屏蔽错误详细信息,此时可以重新注册错误处理函数,自定义错误返回信息

1
2
3
4
5
6
7
8
9
10
11
from fastapi.exceptions import RequestValidationError

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc: RequestValidationError):
return JSONResponse(
status_code=200,
content={
"success": 0,
"message": f"参数错误"
}
)

sqlalchemy事务和锁

注意:这里写的使用事务和锁的方法前提是在初始化数据库引擎的时候不要设置AUTOCOMMIT

通过session上下文来方便的管理事务

1
2
3
4
5
6
7
8
9
session = await get_mysql_session()
async with session.begin():
r = await session.execute(update_stmt)
if r.rowcount == 0:
return False

r = await session.execute(update_stmt2)
if r.rowcount == 0:
return False

通过上下文的方式可以很方便的使用事务:当update_stmt2操作出错或结果不符合预期时,直接return即可以自动回滚事务,无需考虑session的注销以及update_stmt的操作回滚的问题。

使用with_for_update()设置查询锁

非常深奥的mysql锁的内容这里不涉及,只是简单记录一下在sqlalchemy使用FOR UPDATE语句的方法。

同样是在一个session的上下文中,在query中加上with_for_update()会对查询的行(或者表,取决于查询的where条件中是否有主键)加锁,只有同一session的修改和查询可以执行,其他修改和查询操作会delay直到锁被释放。

一个小例子:

1
2
3
4
5
6
session = await get_mysql_session()
async with session.begin():
query = query.with_for_update()
r = await session.execute(query)

r = await session.execute(update_stmt)

用这种方式可以缓解因为高并发带来的条件竞争问题

PyMySQL字典游标

有需求从mysql查询时以字典返回,创建游标对象时加入参数pymysql.cursors.DictCursor来决定获取的每条数据的数据类型为dict

1
2
3
4
5
6
7
import pymysql

conn = pymysql.connect(host='localhost', user='root', password='******', database='test')
cur = conn.cursor(pymysql.cursors.DictCursor)
cur.execute("select * from test;")
data = cur.fetchall()
print(data)

执行结果:[{'id': 1, 'status': 1, 'sum': 1}]