用 JWT token 认证加强 FastAPI 接口安全
https://testdriven.io/blog/fastapi-jwt-auth/
token 生成
# Placeholder signing secret. In production, replace it with a randomly
# generated value, e.g. the output of binascii.hexlify(os.urandom(24)).
JWT_SECRET = 'please_please_update_me_please'
def signJWT(user_id: str) -> Dict[str, str]:
    """Create a signed JWT for ``user_id``.

    The payload carries the user id and an ``expires`` Unix timestamp set
    600 seconds after the moment of signing.

    Args:
        user_id: Identifier stored in the token's ``user_id`` field.

    Returns:
        The value ``token_response`` builds from the encoded token.
    """
    payload = {
        "user_id": user_id,
        # Absolute expiry time (seconds since the epoch), 10 minutes from now.
        "expires": time.time() + 600,
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
    return token_response(token)
def decodeJWT(token: str) -> dict:
    """Decode a JWT signed with ``JWT_SECRET`` and check its expiry.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload when the token decodes successfully and its
        ``expires`` timestamp has not passed; ``None`` when the token has
        expired; an empty dict when the token cannot be decoded or carries
        no usable ``expires`` field. Every failure result is falsy.
    """
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        # KeyError: no "expires" field; TypeError: field is not comparable
        # with a timestamp. Both mean the token is unusable.
        still_valid = decoded_token["expires"] >= time.time()
    except (jwt.PyJWTError, KeyError, TypeError):
        return {}
    return decoded_token if still_valid else None
利用 jwt 模块生成 token。JWT_SECRET 为签名密钥;为了更安全,可用 binascii.hexlify(os.urandom(24)) 生成随机密钥。默认签名算法为 JWT_ALGORITHM = 'HS256'。
利用 "expires": time.time() + 600 设置 token 有效期(600 秒,即 10 分钟)。
class JWTBearer(HTTPBearer):
    """``HTTPBearer`` dependency that additionally validates the token as a JWT.

    Calling an instance with a request returns the raw token string when the
    request carries Bearer credentials that ``decodeJWT`` accepts, and raises
    ``HTTPException`` with status 403 otherwise.
    """

    def __init__(self, auto_error: bool = True):
        """Forward ``auto_error`` unchanged to ``HTTPBearer``."""
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """Extract the bearer token from ``request`` and validate it.

        Returns:
            The token string taken from the request's credentials.

        Raises:
            HTTPException: status 403 when no credentials were obtained, the
                scheme is not ``"Bearer"``, or the token fails ``verify_jwt``.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        if credentials.scheme != "Bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
        if not self.verify_jwt(credentials.credentials):
            raise HTTPException(status_code=403, detail="Invalid token or expired token.")
        return credentials.credentials

    def verify_jwt(self, jwtoken: str) -> bool:
        """Return ``True`` when ``decodeJWT`` yields a non-empty payload.

        Args:
            jwtoken: The encoded JWT string to check.
        """
        try:
            payload = decodeJWT(jwtoken)
        except Exception:
            # A token that cannot be decoded is simply invalid; report it as
            # such so the caller answers 403 instead of failing the request.
            payload = None
        return bool(payload)
FastAPI 通过HTTPBearer 类提供了一个认证方式。我们可以通过此类获取认证的token信息。
本样例中,只要是用 JWT_SECRET 签名且尚未过期的 token 都可以通过验证。更严格的校验可在 verify_jwt 中实现。
api 中使用
@app.post("/posts", dependencies=[Depends(JWTBearer())], tags=["posts"])
async def add_post(post: PostSchema) -> dict:
    """Handle ``POST /posts``, guarded by the ``JWTBearer`` dependency.

    Args:
        post: The request body parsed as ``PostSchema``; not used in the
            response built here.

    Returns:
        A dict with a fixed confirmation message under ``"data"``.
    """
    return {
        "data": "post added."
    }
