Fastapi JWT 认证简单使用

Fastapi JWT 认证简单使用

Fastapi JWT 认证简单使用

参考

dev.to/deta/get-started

代码

auth.py

import os
import jwt # used for encoding and decoding jwt tokens
from fastapi import HTTPException # used to handle error handling
from passlib.context import CryptContext # used for hashing the password 
from datetime import datetime, timedelta # used to handle expiry time for tokens
class Auth():
    # hasher= CryptContext()
    hasher= CryptContext(schemes=["sha512_crypt"])
    # secret = os.getenv("APP_SECRET_STRING")
    secret = "test"
    def encode_password(self, password):
        return self.hasher.hash(password)
    def verify_password(self, password, encoded_password):
        return self.hasher.verify(password, encoded_password)
    def encode_token(self, username):
        payload = {
            'exp' : datetime.utcnow() + timedelta(days=0, minutes=30),
            'iat' : datetime.utcnow(),
        'scope': 'access_token',
            'sub' : username
        return jwt.encode(
            payload, 
            self.secret,
            algorithm='HS256'
    def decode_token(self, token):
            payload = jwt.decode(token, self.secret, algorithms=['HS256'])
            if (payload['scope'] == 'access_token'):
                return payload['sub']   
            raise HTTPException(status_code=401, detail='Scope for the token is invalid')
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail='Token expired')
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail='Invalid token')
    def encode_refresh_token(self, username):
        payload = {
            'exp' : datetime.utcnow() + timedelta(days=0, hours=10),
            'iat' : datetime.utcnow(),
        'scope': 'refresh_token',
            'sub' : username
        return jwt.encode(
            payload, 
            self.secret,
            algorithm='HS256'
    def refresh_token(self, refresh_token):
            payload = jwt.decode(refresh_token, self.secret, algorithms=['HS256'])
            if (payload['scope'] == 'refresh_token'):
                username = payload['sub']
                new_token = self.encode_token(username)
                return new_token
            raise HTTPException(status_code=401, detail='Invalid scope for token')
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail='Refresh token expired')
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail='Invalid refresh token')

access_token 用来访问api,有效期较短

refresh_token 用来生成 access_token,有效期较长,过期后需重新 登录

user_model.py

user_model.py
from pydantic import BaseModel
class AuthModel(BaseModel):
    username: str
    password: str

main.py

from auth import Auth
from user_model import AuthModel
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
app=FastAPI()
security = HTTPBearer()
auth_handler = Auth()
users_db = {}
@app.post('/signup')
def signup(user_details: AuthModel):
    if users_db.get(user_details.username) != None:
        return 'Account already exists'
        hashed_password = auth_handler.encode_password(user_details.password)
        user = {'key': user_details.username, 'password': hashed_password}
        # return users_db.put(user)
        users_db[user_details.username] = user
        return users_db
    except Exception as e:
        print(e)
        error_msg = 'Failed to signup user'
        return error_msg
@app.post('/login')
def login(user_details: AuthModel):
    user = users_db.get(user_details.username)
    if (user is None):
        return HTTPException(status_code=401, detail='Invalid username')
    if (not auth_handler.verify_password(user_details.password, user['password'])):
        return HTTPException(status_code=401, detail='Invalid password')
    access_token = auth_handler.encode_token(user['key'])
    refresh_token = auth_handler.encode_refresh_token(user['key'])
    return {'access_token': access_token, 'refresh_token': refresh_token}        
@app.post('/secret')
def secret_data(credentials: HTTPAuthorizationCredentials = Security(security)):
    token = credentials.credentials
    if(auth_handler.decode_token(token)):
        return 'Top Secret data only authorized users can access this info'
@app.get('/notsecret')
def not_secret_data():
    return 'Not secret data'
@app.get('/refresh_token')
def refresh_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    refresh_token = credentials.credentials