用 JWT Token 认证加强 FastAPI 接口安全
用 JWT Token 认证加强 FastAPI 接口安全
参考
https://testdriven.io/blog/fastapi-jwt-auth/
说明
token 生成
# Prints a freshly generated random hex string on every import; production
# deployments should use a value produced this way as JWT_SECRET.
print(binascii.hexlify(os.urandom(24)))
# Take the signing secret from the environment when it is provided, so the real
# key does not have to live in source control. The placeholder default keeps
# the example runnable for local development.
JWT_SECRET = os.environ.get('JWT_SECRET', 'please_please_update_me_please')
# Algorithm name passed to jwt.encode / jwt.decode.
JWT_ALGORITHM = 'HS256'
def signJWT(user_id: str) -> Dict[str, str]:
    """Create a signed JWT for ``user_id`` and wrap it with ``token_response``.

    The payload carries the user id and an ``expires`` timestamp set
    600 seconds after the moment of signing.
    """
    payload = {
        "user_id": user_id,
        "expires": time.time() + 600,
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
    return token_response(token)
def decodeJWT(token: str) -> dict:
    """Decode ``token`` and return its payload, or ``{}`` when it is unusable.

    An empty dict is returned when the signature or format is invalid, when
    the ``expires`` claim is missing or not numeric, or when ``expires`` lies
    in the past. Callers can therefore rely on plain truthiness.
    """
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.PyJWTError:
        # Bad signature, malformed token, unsupported algorithm, ...
        return {}
    expires = decoded_token.get("expires")
    # Reject payloads without a usable expiry instead of raising on comparison.
    if not isinstance(expires, (int, float)) or expires < time.time():
        return {}
    return decoded_token
利用 jwt 模块生成 token。JWT_SECRET 为签名密钥；为了更安全，可用 binascii.hexlify(os.urandom(24)) 的方式生成密钥。签名算法默认为 JWT_ALGORITHM = 'HS256'。
利用"expires": time.time() + 600 设置token 有效期。
凭证信息解析
class JWTBearer(HTTPBearer):
    """HTTP Bearer dependency that only admits requests carrying a valid JWT.

    Intended to be used as a FastAPI dependency, e.g. ``Depends(JWTBearer())``.
    """

    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """Extract the bearer credentials and return the raw token string.

        Raises:
            HTTPException: status 403 when no credentials were obtained, the
                scheme is not ``Bearer``, or the token fails ``verify_jwt``.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        if credentials.scheme != "Bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
        if not self.verify_jwt(credentials.credentials):
            raise HTTPException(status_code=403, detail="Invalid token or expired token.")
        return credentials.credentials

    def verify_jwt(self, jwtoken: str) -> bool:
        """Return True when ``decodeJWT`` yields a non-empty payload."""
        try:
            payload = decodeJWT(jwtoken)
        except Exception:
            # Any decoding failure means the token is rejected rather than
            # letting the error escape into the request handler.
            return False
        return bool(payload)
FastAPI 通过HTTPBearer 类提供了一个认证方式。我们可以通过此类获取认证的token信息。
本样例中，只要是用 JWT_SECRET 签名生成且尚未过期的 token 都可以通过验证。更严格的筛选可在 verify_jwt 中配置。
api 中使用
@app.post("/posts", dependencies=[Depends(JWTBearer())], tags=["posts"])
async def add_post(post: PostSchema) -> dict:
    # The JWTBearer dependency runs first, so this body executes only for
    # requests whose token was accepted.
    response = {"data": "post added."}
    return response
全部源码
import time
from typing import Dict
import uvicorn
import jwt
import os
import binascii
from pydantic import BaseModel,Field,EmailStr
from fastapi import FastAPI, Body,Depends
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# Prints a freshly generated random hex string on every import; production
# deployments should use a value produced this way as JWT_SECRET.
print(binascii.hexlify(os.urandom(24)))
# Take the signing secret from the environment when it is provided, so the real
# key does not have to live in source control. The placeholder default keeps
# the example runnable for local development.
JWT_SECRET = os.environ.get('JWT_SECRET', 'please_please_update_me_please')
# Algorithm name passed to jwt.encode / jwt.decode.
JWT_ALGORITHM = 'HS256'
def token_response(token: str):
    """Wrap ``token`` in the response shape returned to clients.

    Returns:
        A dict with the single key ``"access_token"`` mapped to ``token``.
    """
    return {
        "access_token": token
    }
def signJWT(user_id: str) -> Dict[str, str]:
    """Create a signed JWT for ``user_id`` and wrap it with ``token_response``.

    The payload carries the user id and an ``expires`` timestamp set
    600 seconds after the moment of signing.
    """
    payload = {
        "user_id": user_id,
        "expires": time.time() + 600,
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
    return token_response(token)
def decodeJWT(token: str) -> dict:
    """Decode ``token`` and return its payload, or ``{}`` when it is unusable.

    An empty dict is returned when the signature or format is invalid, when
    the ``expires`` claim is missing or not numeric, or when ``expires`` lies
    in the past. Callers can therefore rely on plain truthiness.
    """
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.PyJWTError:
        # Bad signature, malformed token, unsupported algorithm, ...
        return {}
    expires = decoded_token.get("expires")
    # Reject payloads without a usable expiry instead of raising on comparison.
    if not isinstance(expires, (int, float)) or expires < time.time():
        return {}
    return decoded_token
class PostSchema(BaseModel):
    # Request body model used by the POST /posts endpoint; both fields are required.
    name: str = Field(...)
    # NOTE(review): `title` is validated as an e-mail address (EmailStr) —
    # confirm this is intended rather than a plain `str`.
    title: EmailStr = Field(...)
class UserSchema(BaseModel):
    # Request body model for /user/signup; all three fields are required.
    fullname: str = Field(...)
    email: EmailStr = Field(...)
    password: str = Field(...)

    class Config:
        # Sample payload attached to the model's schema.
        schema_extra = {
            "example": {
                "fullname": "Abdulazeez Abdulazeez Adeshina",
                "email": "abdulazeez@x.com",
                "password": "weakpassword"
            }
        }
class UserLoginSchema(BaseModel):
    # Request body model for /user/login; both fields are required.
    email: EmailStr = Field(...)
    password: str = Field(...)

    class Config:
        # Sample payload attached to the model's schema.
        schema_extra = {
            "example": {
                "email": "abdulazeez@x.com",
                "password": "weakpassword"
            }
        }
class JWTBearer(HTTPBearer):
    """HTTP Bearer dependency that only admits requests carrying a valid JWT.

    Intended to be used as a FastAPI dependency, e.g. ``Depends(JWTBearer())``.
    """

    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """Extract the bearer credentials and return the raw token string.

        Raises:
            HTTPException: status 403 when no credentials were obtained, the
                scheme is not ``Bearer``, or the token fails ``verify_jwt``.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        if credentials.scheme != "Bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
        if not self.verify_jwt(credentials.credentials):
            raise HTTPException(status_code=403, detail="Invalid token or expired token.")
        return credentials.credentials

    def verify_jwt(self, jwtoken: str) -> bool:
        """Return True when ``decodeJWT`` yields a non-empty payload."""
        try:
            payload = decodeJWT(jwtoken)
        except Exception:
            # Any decoding failure means the token is rejected rather than
            # letting the error escape into the request handler.
            return False
        return bool(payload)
def check_user(data: UserLoginSchema):
    """Return True when ``data`` matches a stored user's email and password.

    Passwords are compared with ``hmac.compare_digest`` so that the time taken
    does not depend on how many leading characters match. Both sides are
    encoded to bytes first because ``compare_digest`` rejects non-ASCII ``str``.
    """
    import hmac  # function-scope import keeps this block self-contained

    supplied = data.password.encode("utf-8")
    for user in users:
        if user.email == data.email and hmac.compare_digest(
            user.password.encode("utf-8"), supplied
        ):
            return True
    return False
# In-memory list of signed-up users; create_user appends to it and check_user scans it.
users = []
# FastAPI application object on which the route decorators register endpoints.
app = FastAPI()
@app.post("/user/signup", tags=["user"])
async def create_user(user: UserSchema = Body(...)):
    # Remember the new user in the in-memory list, then issue a token for them.
    # TODO: replace with a db call, making sure to hash the password first.
    users.append(user)
    issued = signJWT(user.email)
    return issued
@app.post("/user/login", tags=["user"])
async def user_login(user: UserLoginSchema = Body(...)):
    # Issue a token when check_user accepts the credentials; otherwise report
    # the failure in the response body.
    if check_user(user):
        return signJWT(user.email)
    return {
        "error": "Wrong login details!"
    }
@app.post("/posts", dependencies=[Depends(JWTBearer())], tags=["posts"])
async def add_post(post: PostSchema) -> dict: