# 示例

检查器可以监听Lyrebird消息总线中的任意频道,对其中需要校验的目标数据进行检测。

目前,Lyrebird中提供了如下的示例脚本:

Filename Type Description
img_size.py (opens new window) 检查器 检查网络请求中图片大小是否超出限制
duplicate_requests.py (opens new window) 检查器 检查在某段时间内是否有重复的网络请求
add_request_param.py (opens new window) 修改器 修改请求参数
add_response_header.py (opens new window) 修改器 修改请求返回数据

# 大图检测

在网络请求中,图片是一种高消耗资源的数据,移动设备无需加载过大的图片,因此需要对这类请求的数据进行校验。

检查的思路为,监听flow频道,从server.response部分中的头部信息读取图片大小,当图片大小超过阈值500KB时发出报警。

# 忽略无关数据

仅关注server.response部分的数据,且Content-Type为image的数据。

from lyrebird import event

THRESHOLD_IMG_SIZE = 500

@event('flow')
def img_size(msg):

    # 1.ignore unexepcted object
    if ignore_check(msg):
        return

def ignore_check(msg):
    if msg['name'] != 'server.response':
        return True
    if 'response' not in msg['flow']:
        return True
    if 'image' not in msg['flow']['response']['headers']['Content-Type']:
        return True
    return False

# 获取目标数据

从获得的数据集中,获取检测所需的目标数据:

  • size: 图片大小
from decimal import Decimal
...

@event('flow')
def img_size(msg):

    # 1.ignore unexepcted object
    if ignore_check(msg):
        return

    # 2.prepare useful info
    img_size = int(msg['flow']['size'])
    img_size = Decimal(img_size / 1024).quantize(Decimal('0.0'))

# 得出校验结果

对目标数据进行校验,当图片大小超过500KB时,发出大图报警。

...

@event('flow')
def img_size(msg):

    # 1.ignore unexepcted object
    if ignore_check(msg):
        return

    # 2.prepare useful info
    img_size = int(msg['flow']['size'])
    img_size = Decimal(img_size / 1024).quantize(Decimal('0.0'))

    # 3.check data
    if img_size > THRESHOLD_IMG_SIZE:
        img_url = msg['flow']['request']['url']
        event.issue(f'Image size {img_size}KB is beyond expectations: {img_url}\n')

# 重复请求检测

在一组网络请求中,可能会出现重复请求同一个接口的情况。

检查的思路为,监听flow频道,从client.request部分中读取URL,当在200毫秒内发现重复的请求时,则发出重复请求报警。

# 忽略无关数据

仅关注client.request部分的数据,且过滤掉不关注的域名。

from lyrebird import event
from urllib.parse import urlparse

IGNORE_HOSTNAME = [
    'report.meituan.com',
    'frep.meituan.net'
]

@event('flow')
def duplicate_request(msg):

    # 1.ignore unexepcted object
    if ignore_check(msg):
        return

def ignore_check(msg):
    if msg.get('name') != 'client.request':
        return True
    if urlparse(msg['flow']['request']['url']).hostname in IGNORE_HOSTNAME:
        return True
    return False

# 获取目标数据

获取校验所需的数据:

  • url:请求的标示,判断是否有重复请求的重要依据。需要过滤掉请求中不关注的参数,并使参数有序排列
  • method: 请求方法
  • request_key:由处理后的url和method生成的串,作为请求的唯一标示
  • time:请求发出的时间
import hashlib
from urllib.parse import urlparse, urlencode, parse_qsl

IGNORE_PARAMETER = ['token',
                    'version_name',
                    'uuid'
                    ]

@event('flow')
def duplicate_request(msg):
    ...

    # 2.prepare useful info
    origin_url = msg['flow']['request']['url']
    sorted_url = sort_params(origin_url[origin_url.rfind('//')+2:])
    request_key_list = [
        sorted_url,
        msg['flow']['request']['method']
    ]
    request_key = get_md5_code(request_key_list)
    request_time = msg['time']

def sort_params(url):
    # serialize parameters and remove ignored parameters
    if len(urlparse(url).query):
        origin_parameters = parse_qsl(urlparse(url).query)
        origin_parameters = [param for param in origin_parameters if param[0] not in IGNORE_PARAMETER]
        origin_parameters.sort(key=lambda x:x[0])
        return url.split('?')[0] + '?' + urlencode(origin_parameters)
    else:
        return url

def get_md5_code(keys:list):
    md5_module = hashlib.sha224()
    for key in keys:
        md5_module.update(bytes(key, encoding = "utf8"))
    return md5_module.hexdigest()

# 得到校验结果

维护一个记录所有历史请求的存储器,若当前请求存在于历史请求中,且两次请求的时间间隔小于200ms,则界定为一次重复请求。

import time
from collections import OrderedDict

THRESHOLD_TIME = 0.2

HISTORY_URL = OrderedDict()

@event('flow')
def duplicate_request(msg):
    ...

    # 3.check data
    if request_key in HISTORY_URL:
        if request_time - HISTORY_URL[request_key]['time'] >= THRESHOLD_TIME:
            return

        url = HISTORY_URL[request_key]['url']
        event.issue(f'Duplicate request: {url}')

# 更新验证集合

维护历史请求数据,将新的请求数据存入历史请求,并删除其中的过期数据。

...

@event('flow')
def duplicate_request(msg):
    ...

    # 4.update storage data
    HISTORY_URL.update({
        request_key: {
            'time': request_time,
            'url': sorted_url
        }
    })

    overdue_urls = []
    for key, value in HISTORY_URL.items():
        if time.time() - value['time'] >= THRESHOLD_TIME:
            overdue_urls.append(key)
        else:
            break
    for overdue_url in overdue_urls:
        del HISTORY_URL[overdue_url]