JWT token认证登陆

前一篇博客讲述了获取和验证请求参数, 这一篇就实践下,演示一个最基础的JWT认证,我公司是用了两个token方式验证,一个请求token,一个刷新token,请求token过期时间短,专门用于请求数据,刷新token专门用于刷新过期请求token用的。

jwt官网 https://jwt.io/

如果还有不懂JWT的,就需要好好看看JWT的知识了,JWT认证目前是前后端分离中非常流行的一种认证方式: 由三段组成 第一段通常是加密算法,第二段是你存储的自定义信息(未加密任何人可以去https://jwt.io/看到数据) 第三段是 第一段和第二段生成的签名参数确保token没有被修改

** 更多关于FastAPI的文章, 请关注 个人网站 https://www.charmcode.cn/**

生成Token

python 目前有好几个库实现jwt验证

  • python-jose
  • pyjwt
  • jwcrypto
  • authlib (ps:有幸在PyCon2019上海见过此库作者 github
  • 这里不做对比演示,就随便选一个

    pip install python-jose
    

    简单的演示

    from datetime import datetime, timedelta
    from jose import jwt
    # 加密密钥 这个很重要千万不能泄露了
    SECRET_KEY = "kkkkk"
    # 设置过期时间 现在时间 + 有效时间    示例5分钟
    expire = datetime.utcnow() + timedelta(minutes=5)
    # exp 是固定写法必须得传  sub和uid是自己存的值
    to_encode = {"exp": expire, "sub": str(123), "uid": "12345"}
    # 生成token 
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
    print(encoded_jwt) 
    # eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTU1MDg5MzQsInN1YiI6IjEyMyIsInVpZCI6IjEyMzQ1In0.lttAYe808lVQgGhL9NXei2bbC1LIGs-SS0l6qfU_QxU
    

    可以复制去 https://jwt.io/ 解出来看看

    解密token

    payload = jwt.decode(
                "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTU1MDg5MzQsInN1YiI6IjEyMyIsInVpZCI6IjEyMzQ1In0.lttAYe808lVQgGhL9NXei2bbC1LIGs-SS0l6qfU_QxU",
                SECRET_KEY, algorithms="HS256"
    print(payload)
    # {'exp': 1595508934, 'sub': '123', 'uid': '12345'}
    

    正确的解密方式

    上述方式是token什么都是正确的时候,而且还没有过期,就会正常解出来。现在加上常见的异常捕获。

    from jose.exceptions import ExpiredSignatureError, JWTError
        payload = jwt.decode(
                    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTU1MDk0ODQsInN1YiI6IjEyMyIsInVpZCI6IjEyMzQ1In0.deulPSOPfON-lfbXtvQfTfc-DwqvFoQqv7Y1BhMecBw",
                    SECRET_KEY, algorithms="HS256"
        print(payload)
    # 当然两个异常捕获也可以写在一起,不区分
    except ExpiredSignatureError as e:
        print("token过期")
    except JWTError as e:
        print("token验证失败")
    

    在FastAPI中实现JWT认证登陆

    上述的jwt加密解密的过程搞清楚了,这一步就很简单了
    首先创建一个security.py文件专门进行加密解密的

    from datetime import datetime, timedelta
    from typing import Any, Union, Optional
    from jose import jwt
    from fastapi import Header
    # 导入配置文件
    from setting import config
    ALGORITHM = "HS256"
    def create_access_token(
        subject: Union[str, Any], expires_delta: timedelta = None
    ) -> str:
        # 生成token
        :param subject: 保存到token的值
        :param expires_delta: 过期时间
        :return:
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(
                minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES
        to_encode = {"exp": expire, "sub": str(subject)}
        encoded_jwt = jwt.encode(to_encode, config.SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    def check_jwt_token(
         token: Optional[str] = Header(...)
    ) -> Union[str, Any]:
        解析验证 headers中为token的值 当然也可以用 Header(..., alias="Authentication") 或者 alias="X-token"
        :param token:
        :return:
            payload = jwt.decode(
                token,
                config.SECRET_KEY, algorithms=[ALGORITHM]
            return payload
        except (jwt.JWTError, jwt.ExpiredSignatureError, AttributeError):
            # 抛出自定义异常, 然后捕获统一响应
            raise custom_exc.TokenAuthError(err_desc="access token fail")
    

    上面一定定义好了, 加密和解密token的方式,这一步来登陆生成token

    # 从刚刚定义好jwt的文件导入生成方法
    from security import create_access_token
    from pydantic import BaseModel
    class UserInfo(BaseModel):
        username: str
        password: str
    @router.post("/login/access-token", summary="用户登录认证")
    async def login_access_token(
            db: Session = Depends(deps.get_db),
            user_info: UserInfo,
    ) -> Any:
        :param db:
        :param user_info:
        :return:
        # 验证用户账号密码是否正确
         user = curd_user.authenticate(db, email=user_info.username, password=user_info.password)
        if not user:
            logger.info(f"用户邮箱认证错误: email{user_info.username} password:{user_info.password}")
            return response_code.resp_500(message="用户名或者密码错误")
        elif not curd_user.is_active(user):
            return response_code.resp_500(message="用户邮箱未激活")
        # 如果用户正确通过 则生成token
        # 设置过期时间
        access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        # 登录token 只存放了user.id
        return response_code.resp_200(data={
            "token": create_access_token(user.id, expires_delta=access_token_expires),
    

    验证token

    这一步是使用 from fastapi import Depends 来验证 headers中的token
    在上面security.py文件中有定义取headers中的token参数

    from typing import Any, Union
    from fastapi import Depends
    # 从刚刚定义好jwt的文件导入解密方法
    from security import check_jwt_token
    @router.get("/user/info", summary="获取用户信息", response_model=user.UserInfo)
    async def get_user_info(
    	token_data: Union[str, Any] = Depends(check_jwt_token)  
    ) -> Any:
        获取用户信息
        :param token_data:
        :return:
        print(token_data)
        # 这个状态能响应说明token验证通过
        return response_code.resp_200(data={
            "username": "用户信息"
    

    所以正确的请求方式应该是这样的,在headers中携带token字段, 再次重述也可以再check_jwt_token方法中给token取别名,最常见的如Authentication
    为什么不在check_jwt_token参数中直接写Authentication 了?
    因为参数写成大写字母开头不符合python 编程pep8规范,还有就是X-Token的这种,变量不支持-符号,所以写成别名。

    import requests
    res = requests.get("http://127.0.0.1:8000/user/info", headers={
        "token": "xxxx",
        "content-type": "application/json"
    

    熟悉了前半部分的jwt token生成与解密的方式,就可以在任何Python框架(Django,Flask,Tornado,Sanic, Bottle等等)里面套用,很多封装的扩展,本质也是这个,一般不喜欢用扩展。

    jwt认证其实非常简单,搞清楚加密解密的过程,原理稍微懂点就可以了,
    后面结合redis, 可以完成单点登陆等操作。
    还有就是token续签的问题,比如你正写博客了,博客还没写完token失效了,结果提交的时候token认证失败了,就需要token心跳检测续签了。

    完整代码GitHub地址

    见个人网站 https://www.charmcode.cn/article/2020-07-23_fastapi_jwt