Fastapi JWT 认证简单使用
Fastapi JWT 认证简单使用
参考
https:// dev.to/deta/get-started -with-fastapi-jwt-authentication-part-2-18ok
代码
auth.py
import os
import jwt # used for encoding and decoding jwt tokens
from fastapi import HTTPException # used to handle error handling
from passlib.context import CryptContext # used for hashing the password
from datetime import datetime, timedelta # used to handle expiry time for tokens
class Auth():
# hasher= CryptContext()
hasher= CryptContext(schemes=["sha512_crypt"])
# secret = os.getenv("APP_SECRET_STRING")
secret = "test"
def encode_password(self, password):
return self.hasher.hash(password)
def verify_password(self, password, encoded_password):
return self.hasher.verify(password, encoded_password)
def encode_token(self, username):
payload = {
'exp' : datetime.utcnow() + timedelta(days=0, minutes=30),
'iat' : datetime.utcnow(),
'scope': 'access_token',
'sub' : username
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
def decode_token(self, token):
payload = jwt.decode(token, self.secret, algorithms=['HS256'])
if (payload['scope'] == 'access_token'):
return payload['sub']
raise HTTPException(status_code=401, detail='Scope for the token is invalid')
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail='Token expired')
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail='Invalid token')
def encode_refresh_token(self, username):
payload = {
'exp' : datetime.utcnow() + timedelta(days=0, hours=10),
'iat' : datetime.utcnow(),
'scope': 'refresh_token',
'sub' : username
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
def refresh_token(self, refresh_token):
payload = jwt.decode(refresh_token, self.secret, algorithms=['HS256'])
if (payload['scope'] == 'refresh_token'):
username = payload['sub']
new_token = self.encode_token(username)
return new_token
raise HTTPException(status_code=401, detail='Invalid scope for token')
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail='Refresh token expired')
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail='Invalid refresh token')
access_token 用来访问api,有效期较短
refresh_token 用来生成 access_token,有效期较长,过期后需重新 登录
user_model.py
user_model.py
from pydantic import BaseModel
class AuthModel(BaseModel):
username: str
password: str
main.py
from auth import Auth
from user_model import AuthModel
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
app=FastAPI()
security = HTTPBearer()
auth_handler = Auth()
users_db = {}
@app.post('/signup')
def signup(user_details: AuthModel):
if users_db.get(user_details.username) != None:
return 'Account already exists'
hashed_password = auth_handler.encode_password(user_details.password)
user = {'key': user_details.username, 'password': hashed_password}
# return users_db.put(user)
users_db[user_details.username] = user
return users_db
except Exception as e:
print(e)
error_msg = 'Failed to signup user'
return error_msg
@app.post('/login')
def login(user_details: AuthModel):
user = users_db.get(user_details.username)
if (user is None):
return HTTPException(status_code=401, detail='Invalid username')
if (not auth_handler.verify_password(user_details.password, user['password'])):
return HTTPException(status_code=401, detail='Invalid password')
access_token = auth_handler.encode_token(user['key'])
refresh_token = auth_handler.encode_refresh_token(user['key'])
return {'access_token': access_token, 'refresh_token': refresh_token}
@app.post('/secret')
def secret_data(credentials: HTTPAuthorizationCredentials = Security(security)):
token = credentials.credentials
if(auth_handler.decode_token(token)):
return 'Top Secret data only authorized users can access this info'
@app.get('/notsecret')
def not_secret_data():
return 'Not secret data'
@app.get('/refresh_token')
def refresh_token(credentials: HTTPAuthorizationCredentials = Security(security)):
refresh_token = credentials.credentials