跳转至

28 认证机制:Flask认证机制设计与实现

你好,我是Barry。

上节课,我们初步了解了Flask认证机制,也完成了使用Token进行认证的前置工作。在我们的视频直播平台中,也需要通过认证机制来实现用户的平台认证和安全保障。

这节课,我们就进入项目实战环节,巩固一下你对Flask认证机制的应用能力。整体流程包括生成Token、Token验证、登录认证和用户鉴权这四个环节。

认证的第一步,我们就从生成Token开始说起。

生成Token

上节课,我们学习过Token结构,它有三个部分,分别是header,playload和signature。

在项目中我们借助Flask的扩展Flask-JWT来生成Token,具体就是使用JWT.encode函数将JSON对象编码为JWT Token。因此,我们有必要了解一下JWT.encode函数的参数,你可以参考后面我画的思维导图。

你或许注意到了,在JWT.encode函数中只传入了payload部分。这是因为在使用JWT.encode函数时,会自动根据默认算法生成Header部分,并将Header和Payload部分进行签名生成最终的Token字符串。我们需要手动指定Payload部分。

具体生成Token的实现代码是后面这样,你可以参考一下。

import time
import datetime
import jwt
from flask import current_app
from api import redis_store
from api.models.user import UserLogin
from api.utils import constants
from api.utils.response_utils import error, HttpCode, success
from config.config import Config
class Auth(object):
    @staticmethod
    # 声明为静态方法
    def encode_auth_token(user_id, login_time):
        """
        生成认证Token
        :param user_id: int
        :param login_time: int(timestamp)
        :return: string
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1),
                'iat': datetime.datetime.utcnow(),
                'iss': 'Barry',
                'data': {
                    'id': user_id,
                    'login_time': login_time
                }
            }
            return jwt.encode(
                payload,
                Config.SECRET_KEY,
                algorithm='HS256'
            )
        except Exception as e:
            print(e)
            return error(code=HttpCode.auth_error, msg='没有生成对应的token')

接下来,我们一起来解读一下这段代码。函数前的@staticmethod装饰器,我们将该方法声明为静态方法,也就是类的方法可以直接调用,而不需要再创建该类的实例。

紧接着我们在encode_auth_token函数中传入两个参数,分别是用户的user_id和用户登录时间login_time,用户登录时间用于检查Token是否过期,保证时效性。然后是Token的有效负载payload,其中主要包括Token的过期时间、签发时间、发行人和自定义数据。在自定义数据中两个参数是用户ID和登录时间。

其中的payload为字典类型,以便作为参数传入encode函数中。这里使用Config.SECRET_KEY作为加密用的密钥,采用HS256算法对JWT进行加密。HS256算法是一种基于哈希函数的对称加密算法。如果生成过程出现异常,则返回一个错误消息。这里的auth_error是我们上节课自定义的HTTP状态函数。

验证Token

生成Token的下一步就是Token的验证。方法就是借助JWT扩展的decode函数,将客户端发送的Token进行解码。我们还是结合代码来理解。

@staticmethod
def decode_auth_token(auth_token):
    """
    验证Token
    :param auth_token:
    :return: integer|string
    """
    try:
        # payload = jwt.decode(auth_token, Config.SECRET_KEY, leeway=datetime.timedelta(days=1))
        # 取消过期时间验证
        payload = jwt.decode(auth_token, Config.SECRET_KEY, options={'verify_exp': False})
        # options,不要执行过期时间验证
        if 'data' in payload and 'id' in payload['data']:
            return dict(code=HttpCode.ok, payload=payload)
        else:
            raise dict(code=HttpCode.auth_error, msg=jwt.InvalidTokenError)
    except jwt.ExpiredSignatureError:
        return dict(code=HttpCode.auth_error, msg='Token过期')
    except jwt.InvalidTokenError:
        return dict(code=HttpCode.auth_error, msg='无效Token')

上面代码同样是一个静态方法,主要用于验证JWT token的有效性。首先从传入的auth_token参数中解码token,使用保存在配置文件中的SECRET_KEY来解码,options选项表示在验证token的过期时间时,不要执行过期时间验证。

随后要验证auth_token 中是否包含有效的数据,这里要分三种情况考虑。

  • 如果包含有效数据,则返回一个字典,其中 code 为 HttpCode.ok,表示请求成功,payload 为解码后的数据。
  • 如果不包含有效数据或者解码失败,则抛出 InvalidTokenError,表示 Token 验证失败,并返回相应的错误信息。
  • 如果 auth_token 中包含有效数据但是 Token 已经过期,则抛出 ExpiredSignatureError,表示 Token 已经失效,并返回相应的错误信息。

虽然代码中取消了过期时间验证,但是在后面依旧会抛出 ExpiredSignatureError,提示Token过期,所以我们需要把异常处理情况涵盖得更全。

登录认证

搞定了生成Token和对Token认证的代码后,下一步,我们就需要对用户登录进行认证。登录认证成功即可给客户端返回Token,下次向服务端请求资源的时候,必须带着服务端签发的 Token,才能实现对用户信息的认证。

实现用户登录的代码是后面这样。

def authenticate(self, mobile, password):
    """
    用户登录,登录成功返回token,写将登录时间写入数据库;登录失败返回失败原因
    :param password:
    :return: json
    """
    user = UserLogin.query.filter_by(mobile=mobile).first()
    if not user:
        return error(code=HttpCode.auth_error, msg='请求的用户不存在')
    else:
        if user.check_password(password):
            login_time = int(time.time())
            try:
                user.last_login_stamp = login_time
                user.last_login = datetime.datetime.now()
                user.update()
            except Exception as e:
                current_app.logger.error(e)
                return error(code=HttpCode.db_error, msg='登录时间查询失败')
            token = self.encode_auth_token(user.user_id, login_time)  # bytes
            token = str(token, encoding="utf-8")
            user_id = user.user_id
            # 存储到redis中
            try:
                redis_store.set("jwt_token:%s" % user_id, token, constants.JWT_TOKEN_REDIS_EXPIRES)
            # 设置过期时间为常量JWT_TOKEN_REDIS_EXPIRES(86400秒,即24小时)
            except Exception as e:
                current_app.logger.error(e)
                return error(code=HttpCode.db_error, msg="token保存redis失败")
            from api.modules.video.views import user_action_log
            user_action_log.warning({
                'user_id': user_id,
                'url': '/passport/login',
                'method': 'post',
                'msg': 'login',
                'event': 'login',
            })
            return success(msg='用户登录成功', data={"token": token, "user_id": user_id})
        else:
            return error(code=HttpCode.parmas_error, msg='用户登录密码输入错误')

上面代码整体实现流程是,首先要做的就是接收用户输入的手机号码和密码,然后利用手机号码查询数据库中是,否存在该用户。如果不存在,则返回错误信息。如果存在,使用在数据库表中定义的函数check_password来,检查密码是否正确。

如果密码错误,则返回错误信息。如果密码正确则记录用户的登录时间和日期,使用当前用户的user_id和登录时间戳作为参数,调用encode_auth_token()方法生成一个token,再使用redis_store.set()方法将生成的token存储在redis中,并设置过期时间。

如果存储失败,则将该错误信息存入应用日志中,以便于后续的调试和问题排查。如果所有条件都满足,最后返回成功信息和token以及用户ID。

这里还调用了video模块中的user_action_log。user_action_log用来记录出现的异常等信息。具体代码是后面这样。

from api.utils.log_utils import json_log
json_log('user_action', 'logs/user_action.log')
user_action_log = logging.getLogger('user_action')

这里调用了log_utils中的json_log函数,使用 json_log 函数来创建一个名为 user_action_log 的日志记录器对象,并将其指向 logs/user_action.log 路径的文件,这样记录用户操作的相关信息会更方便。

用户鉴权

接下来的环节就是在请求时获取用户的登录信息,并进行鉴权。如果用户没有相应的权限,则返回相应的错误信息。具体实现代码是后面这样。

def identify(self, request):
    """
    用户鉴权
    :return: list
    """
    auth_header = request.headers.get('Authorization', None)
    if auth_header:
        auth_token_arr = auth_header.split(" ")
        # 分成列表,含有两个元素
        if not auth_token_arr or auth_token_arr[0] != 'JWT' or len(auth_token_arr) != 2:
            return dict(code=HttpCode.auth_error, msg='请求未携带认证信息,认证失败')
        else:
            auth_token = auth_token_arr[1]
            # 将JWT令牌的字符串值给auth_token
            payload_dict = self.decode_auth_token(auth_token)
            if 'payload' in payload_dict and payload_dict.get('code') == 200:
                payload = payload_dict.get('payload')
                user_id = payload.get('data').get('id')
                login_time = payload.get('data').get('login_time')
                # print('👉👉   解析出的时间戳', login_time)
                user = UserLogin.query.filter_by(user_id=user_id).first()
                if not user:  # 未在请求中找到对应的用户
                    return dict(code=HttpCode.auth_error, msg='用户不存在,查无此用户')
                else:
                    # 通过user取出redis中的token
                    try:
                        # print(user_id)
                        redis_jwt_token = redis_store.get("jwt_token:%s" % user_id)
                        # print('👈redis', redis_jwt_token)
                    except Exception as e:
                        current_app.logger.error(e)
                        return dict(code=HttpCode.db_error, msg="redis查询token失败")
                    if not redis_jwt_token or redis_jwt_token != auth_token:
                        # print('👉👉   解析出来的token', auth_token)
                        return dict(code=HttpCode.auth_error, msg="jwt-token失效")
                    # print(type(user.last_login_stamp), type(login_time))
                    # print(user.last_login_stamp, login_time)
                    if user.last_login_stamp == login_time:

                        return dict(code=HttpCode.ok, msg='用户认证成功', data={"user_id": user.user_id})
                    else:
                        return dict(code=HttpCode.auth_error, msg='用户认证失败,需要再次登录')
            else:
                return dict(code=HttpCode.auth_error, msg=payload_dict.get('msg') or '用户认证失败,携带认证参数不合法')
    else:
        return dic在代码中t(code主要=HttpCode.auth_error, msg='用户认证失败,请求未携带对应认证信息')

用户鉴权函数主要用于验证用户的身份是否合法。首先通过request.headers获取请求头中的Authorization字段,如果不存在,说明用户未携带对应认证信息,返回包含错误信息的字典。

如果存在该字段,就按照空格将其分割成一个列表,列表中包含两个元素,第一个元素为JWT,第二个元素为JWT令牌的字符串值。如果auth_token_arr为空,那么auth_token_arr第一个元素不包含 “JWT” 字符串,或者分割后的auth_token_arr长度不为2,这就证明JWT令牌格式不正确,需要返回认证失败的信息。

这一步如果通过的话,我们再将auth_token_arr列表中的第二个值,也就是JWT令牌的字符串值赋给auth_token,并将解码结果赋值payload_dict。

下一步就是判断payload_dict中是否有payload字段,且code字段的值是否为200。不符合判断条件同样要返回错误信息,说明携带认证参数不合法。如果符合条件,就从payload中把用户ID、登录时间和payload信息取出来,并根据用户ID在用户登录表中完成查询。

如果不存在该用户同样要返回错误。如果用户存在,则从 Redis内存中,获取以 user_id 为键的jwt_token,赋给redis_jwt_token。如果内存中取不出来该值,这时候就返回错误。

紧接着会再次做条件判断,如果请求中解析出的JWT令牌的字符串值,跟之前存储在内存中的不相符合,同样要返回错误。最后,验证该token对应的登录时间戳是否与数据库中最近一次登录时间戳一致。如果一致,则表示认证通过,否则表示需要重新登录。

在实操环节我们知道Token的认证流程是当用户在进行首次登录,服务器会使用密钥和加密算法,生成Token,发送给客户端,由客户端自己进行存储。等再次登录时,客户端携带Token请求资源,服务器会进行Token的认证,完成一系列验证(如Token是否过期,JWT令牌的格式是否正确等),通过异常处理的把控来保证Token认证的安全和稳定性。

总结

又到了课程的尾声,我们来回顾总结一下。

这节课,我们主要是通过项目实战来强化对认证机制的应用。在项目中应用也是一样的认证流程,我们先要生成Token,借助Flask的扩展Flask-JWT来生成Token。你需要掌握生成Token的代码,理解它的生成过程。

之后就是Token验证和认证阶段,Token的验证就是借助JWT扩展的decode函数,将客户端发送的Token进行解码。我们重点要关注登录认证成功的前提下,客户端接收Token以后,下次向服务端请求资源的时候,必须带着服务端签发的 Token,这样才能实现对用户信息的认证。

用户鉴权函数主要用于验证用户的身份是否合法。鉴定方法就是通过request.headers请求头中的Authorization字段来判断:如果该字段不存在,说明用户未携带对应认证信息;如果存在则需要我们验证内部参数来判定。

通过这节课的实操练习,相信你会对认证机制的应用得更加熟练。课程里有很多的代码,一定在课后自己多实践。下节课我们即将开启功能接口的实战,不见不散。

思考题

前面的课程里,我们讲到了current_app,session,request,你知道他们有什么区别么?

欢迎你在留言区和我交流互动,如果这节课对你有启发,也推荐你把这节课分享给更多朋友。

精选留言(3)
  • 胡歌衡阳分歌 👍(1) 💬(3)

    这个教程是相当的不完整啊,看的心烦

    2023-09-29

  • 石佛慈悲 👍(0) 💬(1)

    token存在redis里的意义是啥呢,为啥还要校验redis的token,不是解码比对都已经校验了吗?为了控制token失效?

    2023-12-05

  • peter 👍(0) 💬(1)

    请教老师两个问题: Q1:Config.SECRET_KEY是系统自带的吗? Q2:token放在http的header中的Authorization字段,Authorization字段是http固有的字段吗?记不清楚了,好像应该是自定义字段?

    2023-06-27