• 周日. 10 月 6th, 2024

5G编程聚合网

5G时代下一个聚合的编程学习网

热门标签

HttpRunner3源码阅读:8. 用例文件生成并格式化make

admin

11 月 28, 2021

make

这个文件(make.py)主要实现了用例文件和目录的生成;按阅读顺序,其实它应该放在 client.py 之前看。

可用资料

jinja2[模板,常用于Python Web 前后端不分离]: https://jinja.palletsprojects.com/en/3.0.x/intro/
black[代码格式化]: https://black.readthedocs.io/en/stable/
from string import Template 也可实现字符串模板替换
string 包下的 Template: https://docs.python.org/3/library/string.html#template-strings
argparse[命令行选项]:https://docs.python.org/zh-cn/3/library/argparse.html

导包

import os
import string
import subprocess
import sys
from typing import Text, List, Tuple, Dict, Set, NoReturn

# 模板库,写过flask 的应该很熟悉,其灵感来自django
import jinja2
from loguru import logger
from sentry_sdk import capture_exception

from httprunner import exceptions, __version__
# v2 v3 版本兼容
from httprunner.compat import (
    ensure_testcase_v3_api,
    ensure_testcase_v3,
    convert_variables,
    ensure_path_sep,
)
# 导文件处理的
from httprunner.loader import (
    load_folder_files,
    load_test_file,
    load_testcase,
    load_testsuite,
    load_project_meta,
    convert_relative_project_root_dir,
)
# 验证器
from httprunner.response import uniform_validator
# 变量合并,多进程处理
from httprunner.utils import merge_variables, is_support_multiprocessing

源码附注释


# Cache of converted pytest files, used to avoid making the same file twice.
# make_testcase stores entries as: generated pytest file path -> class name.
pytest_files_made_cache_mapping: Dict[Text, Text] = {}

# Generated pytest files that should be run; testcases that are only
# referenced by other testcases are not added here.
pytest_files_run_set: Set = set()
# Jinja2 template for the generated HttpRunner v3 pytest file (xxx_test.py).
# Variables consumed by the template:
#   version               - written into the "Generated By" header line
#   testcase_path         - written into the "FROM:" header line
#   imports_list          - import statements emitted after the httprunner import
#   diff_levels           - number of ".parent" hops appended to Path(__file__)
#                           for the sys.path entry (only emitted when
#                           imports_list is non-empty and diff_levels > 0)
#   parameters            - when truthy, a pytest-parametrized test_start is emitted
#   class_name            - name of the generated HttpRunner subclass
#   config_chain_style    - source text assigned to the class attribute "config"
#   teststeps_chain_style - source text of each entry of the "teststeps" list
__TEMPLATE__ = jinja2.Template(
    """# NOTE: Generated By HttpRunner v{{ version }}
# FROM: {{ testcase_path }}

{% if imports_list and diff_levels > 0 %}
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__){% for _ in range(diff_levels) %}.parent{% endfor %}))
{% endif %}

{% if parameters %}
import pytest
from httprunner import Parameters
{% endif %}

from httprunner import HttpRunner, Config, Step, RunRequest, RunTestCase
{% for import_str in imports_list %}
{{ import_str }}
{% endfor %}

class {{ class_name }}(HttpRunner):

    {% if parameters %}
    @pytest.mark.parametrize("param", Parameters({{parameters}}))
    def test_start(self, param):
        super().test_start(param)
    {% endif %}

    config = {{ config_chain_style }}

    teststeps = [
        {% for step_chain_style in teststeps_chain_style %}
            {{ step_chain_style }},
        {% endfor %}
    ]

if __name__ == "__main__":
    {{ class_name }}().test_start()

"""
)

def __ensure_absolute(path: Text) -> Text:
    """Resolve a testcase path to the absolute path of an existing file.

    A leading current-directory prefix (``./`` on Linux/Darwin, or the
    backslash form used on Windows) is stripped, path separators are
    normalized with ``ensure_path_sep``, and a relative path is joined onto
    the project root directory reported by ``load_project_meta``.

    Args:
        path: testcase file path, absolute or relative to the project root.

    Returns:
        The absolute file path.

    Exits the process with status 1 (after logging an error) when the
    resolved path is not an existing file.
    """
    # e.g. `hrun ./test.yml` (Linux/Darwin) or `hrun .\test.yml` (Windows);
    # the backslash must be escaped inside the string literal.
    for current_dir_prefix in ("./", ".\\"):
        if path.startswith(current_dir_prefix):
            path = path[len(current_dir_prefix) :]
            break

    path = ensure_path_sep(path)
    project_meta = load_project_meta(path)

    if os.path.isabs(path):
        absolute_path = path
    else:
        absolute_path = os.path.join(project_meta.RootDir, path)

    if not os.path.isfile(absolute_path):
        logger.error(f"Invalid testcase file path: {absolute_path}")
        sys.exit(1)

    return absolute_path


def ensure_file_abs_path_valid(file_abs_path: Text) -> Text:
    """Rewrite an absolute file path so every path component is pytest-friendly.

    Each component of the path relative to the project root is adjusted:
    a leading digit gets a ``T`` prefix (19 => T19, 2C => T2C), and spaces,
    dots and hyphens are replaced by underscores unless the component starts
    with a dot (so ".csv" is not turned into "_csv"). The file suffix is
    lower-cased and re-attached.

    Args:
        file_abs_path: absolute file path

    Returns:
        The adjusted absolute file path, or ``file_abs_path`` unchanged when
        its path relative to the project root is empty.

    """
    project_meta = load_project_meta(file_abs_path)
    stem_abs_path, suffix = os.path.splitext(file_abs_path)
    suffix = suffix.lower()

    relative_stem = convert_relative_project_root_dir(stem_abs_path)
    if relative_stem == "":
        return file_abs_path

    # space, dot and hyphen all map to underscore
    to_underscore = str.maketrans({" ": "_", ".": "_", "-": "_"})

    def sanitize(component: Text) -> Text:
        # string.digits is "0123456789": names must not start with a digit
        if component[0] in string.digits:
            component = f"T{component}"
        # dot-leading names (e.g. ".csv") are kept as they are
        if not component.startswith("."):
            component = component.translate(to_underscore)
        return component

    components = [
        sanitize(component)
        for component in relative_stem.rstrip(os.sep).split(os.sep)
    ]
    return os.path.join(project_meta.RootDir, os.sep.join(components) + suffix)

def __ensure_testcase_module(path: Text) -> None:
    """Make sure the directory holding a pytest file is a Python package.

    Creates an ``__init__.py`` next to ``path`` on demand; an existing
    ``__init__.py`` is left untouched.

    Args:
        path: path of a generated pytest file.
    """
    init_file = os.path.join(os.path.dirname(path), "__init__.py")
    if os.path.isfile(init_file):
        return

    with open(init_file, "w", encoding="utf-8") as f:
        # single-line notice terminated by an escaped newline
        f.write("# NOTICE: Generated By HttpRunner. DO NOT EDIT!\n")


def convert_testcase_path(testcase_abs_path: Text) -> Tuple[Text, Text]:
    """Map a YAML/JSON testcase path to its pytest file path and class name.

    Returns:
        A tuple of (absolute path of the ``<name>_test.py`` file, file name
        in title case without underscores), e.g. request_with_variables
        => RequestWithVariables.
    """
    valid_path = ensure_file_abs_path_valid(testcase_abs_path)

    parent_dir, base_name = os.path.split(valid_path)
    stem = os.path.splitext(base_name)[0]

    title_case_name = stem.title().replace("_", "")
    pytest_file_path = os.path.join(parent_dir, f"{stem}_test.py")

    return pytest_file_path, title_case_name


def format_pytest_with_black(*python_paths: Text) -> None:
    """Format the given Python files in place with the ``black`` CLI.

    All paths are passed to a single ``black`` invocation when
    ``is_support_multiprocessing()`` is true or at most one path is given;
    otherwise the files are formatted one by one.

    Args:
        *python_paths: paths of the Python files to format.

    Exits the process with status 1 when ``black`` returns a non-zero exit
    status (the error is also reported via ``capture_exception``) or when the
    ``black`` executable cannot be found.
    """
    logger.info("format pytest cases with black ...")
    try:
        # check=True is required for subprocess.run to raise
        # CalledProcessError; without it the handler below is unreachable.
        if is_support_multiprocessing() or len(python_paths) <= 1:
            subprocess.run(["black", *python_paths], check=True)
        else:
            logger.warning(
                "this system does not support multiprocessing well, format files one by one ..."
            )
            for path in python_paths:
                subprocess.run(["black", path], check=True)
    except subprocess.CalledProcessError as ex:
        capture_exception(ex)
        logger.error(ex)
        sys.exit(1)
    except FileNotFoundError:
        err_msg = """
missing dependency tool: black
install black manually and try again:
$ pip install black
"""
        logger.error(err_msg)
        sys.exit(1)


def make_config_chain_style(config: Dict) -> Text:
    """Render a testcase config dict as ``Config(...)`` chain-call source text.

    Args:
        config: testcase config; "name" is required, while "variables",
            "base_url", "verify", "export" and "weight" are optional.

    Returns:
        Source text such as ``Config("demo").base_url("http://host")``.

    Raises:
        KeyError: if "name" is missing.
    """
    config_chain_style = f'Config("{config["name"]}")'

    # "variables" is optional like the other settings; a missing key or an
    # empty mapping emits no .variables() call.
    variables = config.get("variables")
    if variables:
        config_chain_style += f".variables(**{variables})"

    if "base_url" in config:
        config_chain_style += f'.base_url("{config["base_url"]}")'

    if "verify" in config:
        config_chain_style += f'.verify({config["verify"]})'

    if "export" in config:
        config_chain_style += f'.export(*{config["export"]})'

    if "weight" in config:
        config_chain_style += f'.locust_weight({config["weight"]})'

    return config_chain_style


def make_request_chain_style(request: Dict) -> Text:
    """Render a request dict as the chain-call source text of a RunRequest.

    The chain starts with ``.<method>("<url>")`` (method lower-cased) and is
    followed, in this order and only for keys present in ``request``, by:
    params, headers, cookies, data, json, timeout, verify, allow_redirects,
    upload.

    Args:
        request: request dict; "method" and "url" are required.

    Returns:
        The chain-call source text.
    """
    http_method = request["method"].lower()
    target_url = request["url"]
    calls = [f'.{http_method}("{target_url}")']

    # mappings unpacked as keyword arguments
    for key in ("params", "headers", "cookies"):
        if key in request:
            calls.append(f".with_{key}(**{request[key]})")

    # payloads: a string payload is emitted as a double-quoted literal
    for key in ("data", "json"):
        if key in request:
            payload = request[key]
            if isinstance(payload, Text):
                payload = f'"{payload}"'
            calls.append(f".with_{key}({payload})")

    # plain settings passed through as-is
    for key in ("timeout", "verify", "allow_redirects"):
        if key in request:
            calls.append(f".set_{key}({request[key]})")

    if "upload" in request:
        upload_spec = request["upload"]
        calls.append(f".upload(**{upload_spec})")

    return "".join(calls)


def make_teststep_chain_style(teststep: Dict) -> Text:
    """Render a teststep dict as ``Step(...)`` chain-call source text.

    A step with a truthy "request" starts with ``RunRequest``; otherwise a
    step with a truthy "testcase" starts with ``RunTestCase``. The chain then
    gets, in this order and only for keys present: variables, setup hooks,
    the request chain or ``.call(<testcase>)``, teardown hooks, extract,
    export, validate.

    Raises:
        exceptions.TestCaseFormatError: if the step has neither a request nor
            a testcase, or if a hook is neither a string nor a one-item dict.
    """

    def hook_calls(kind: Text) -> Text:
        # kind is "setup" or "teardown"; renders every hook of that kind
        rendered = ""
        for hook in teststep[f"{kind}_hooks"]:
            if isinstance(hook, Text):
                rendered += f'.{kind}_hook("{hook}")'
            elif isinstance(hook, Dict) and len(hook) == 1:
                # {assign_var_name: hook_content}
                assign_var_name, hook_content = next(iter(hook.items()))
                rendered += f'.{kind}_hook("{hook_content}", "{assign_var_name}")'
            else:
                raise exceptions.TestCaseFormatError(f"Invalid {kind} hook: {hook}")
        return rendered

    is_request_step = bool(teststep.get("request"))
    if is_request_step:  # brand-new request
        chain = [f'RunRequest("{teststep["name"]}")']
    elif teststep.get("testcase"):  # call another testcase
        chain = [f'RunTestCase("{teststep["name"]}")']
    else:
        raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

    if "variables" in teststep:
        step_variables = teststep["variables"]
        chain.append(f".with_variables(**{step_variables})")

    if "setup_hooks" in teststep:
        chain.append(hook_calls("setup"))

    if is_request_step:
        chain.append(make_request_chain_style(teststep["request"]))
    else:
        # a truthy "testcase" is guaranteed here by the check above
        referenced = teststep["testcase"]
        chain.append(f".call({referenced})")

    if "teardown_hooks" in teststep:
        chain.append(hook_calls("teardown"))

    if "extract" in teststep:
        # request step
        chain.append(".extract()")
        chain.extend(
            f""".with_jmespath('{jmespath_expr}', '{var_name}')"""
            for var_name, jmespath_expr in teststep["extract"].items()
        )

    if "export" in teststep:
        # reference testcase step
        exported = teststep["export"]
        chain.append(f".export(*{exported})")

    if "validate" in teststep:
        chain.append(".validate()")

        for raw_validator in teststep["validate"]:
            validator = uniform_validator(raw_validator)
            assert_method = validator["assert"]

            check = validator["check"]
            # e.g. body."user-agent" => 'body."user-agent"'
            check = f"'{check}'" if '"' in check else f'"{check}"'

            expect = validator["expect"]
            if isinstance(expect, Text):
                expect = f'"{expect}"'

            message = validator["message"]
            if message:
                chain.append(f".assert_{assert_method}({check}, {expect}, '{message}')")
            else:
                chain.append(f".assert_{assert_method}({check}, {expect})")

    step_info = "".join(chain)
    return f"Step({step_info})"

# Generate the pytest file for a single testcase.
def make_testcase(testcase: Dict, dir_path: Text = None) -> Text:
    """Convert a valid testcase dict to a pytest file and return its path.

    Testcases referenced by teststeps (``teststep["testcase"]``) are loaded
    and made first through a recursive call, and an import statement for each
    of them is added to the generated file.

    Side effects visible in this function: ``testcase["config"]`` gets its
    "path" and "variables" rewritten, teststeps that reference a testcase get
    "testcase" replaced by the referenced class name (and "export" extended),
    the rendered file is written to disk, and the generated path is recorded
    in ``pytest_files_made_cache_mapping``.

    Args:
        testcase: testcase dict; ``testcase["config"]["path"]`` must hold the
            path of the source testcase file.
        dir_path: when given, the generated pytest file is placed in this
            directory instead of next to the source file.

    Returns:
        Absolute path of the generated pytest file. If that path is already
        in ``pytest_files_made_cache_mapping`` it is returned without
        generating the file again.

    Raises:
        exceptions.TestCaseFormatError: if the content of a referenced
            testcase file is not a dict.
    """
    # ensure compatibility with testcase format v2
    testcase = ensure_testcase_v3(testcase)

    # validate testcase format
    load_testcase(testcase)

    testcase_abs_path = __ensure_absolute(testcase["config"]["path"])
    logger.info(f"start to make testcase: {testcase_abs_path}")

    testcase_python_abs_path, testcase_cls_name = convert_testcase_path(
        testcase_abs_path
    )
    if dir_path:
        # keep the generated file name, but place it under dir_path
        testcase_python_abs_path = os.path.join(
            dir_path, os.path.basename(testcase_python_abs_path)
        )

    global pytest_files_made_cache_mapping
    # already generated: skip making it again
    if testcase_python_abs_path in pytest_files_made_cache_mapping:
        return testcase_python_abs_path

    config = testcase["config"]
    config["path"] = convert_relative_project_root_dir(testcase_python_abs_path)
    config["variables"] = convert_variables(
        config.get("variables", {}), testcase_abs_path
    )

    # prepare reference testcase
    imports_list = []
    teststeps = testcase["teststeps"]
    for teststep in teststeps:
        # only steps that reference another testcase need extra handling
        if not teststep.get("testcase"):
            continue

        # make ref testcase pytest file
        ref_testcase_path = __ensure_absolute(teststep["testcase"])
        test_content = load_test_file(ref_testcase_path)

        if not isinstance(test_content, Dict):
            raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

        # api in v2 format, convert to v3 testcase
        if "request" in test_content and "name" in test_content:
            test_content = ensure_testcase_v3_api(test_content)

        test_content.setdefault("config", {})["path"] = ref_testcase_path
        ref_testcase_python_abs_path = make_testcase(test_content)

        # override testcase export
        ref_testcase_export: List = test_content["config"].get("export", [])
        if ref_testcase_export:
            step_export: List = teststep.setdefault("export", [])
            step_export.extend(ref_testcase_export)
            # de-duplicate; going through set() does not keep the original order
            teststep["export"] = list(set(step_export))

        # prepare ref testcase class name
        ref_testcase_cls_name = pytest_files_made_cache_mapping[
            ref_testcase_python_abs_path
        ]
        teststep["testcase"] = ref_testcase_cls_name

        # prepare import ref testcase
        ref_testcase_python_relative_path = convert_relative_project_root_dir(
            ref_testcase_python_abs_path
        )
        ref_module_name, _ = os.path.splitext(ref_testcase_python_relative_path)
        # path separators become dots to form the module name used in the import
        ref_module_name = ref_module_name.replace(os.sep, ".")
        import_expr = f"from {ref_module_name} import TestCase{ref_testcase_cls_name} as {ref_testcase_cls_name}"
        if import_expr not in imports_list:
            imports_list.append(import_expr)

    testcase_path = convert_relative_project_root_dir(testcase_abs_path)
    # current file compared to ProjectRootDir
    diff_levels = len(testcase_path.split(os.sep))

    data = {
        "version": __version__,
        "testcase_path": testcase_path,
        "diff_levels": diff_levels,
        "class_name": f"TestCase{testcase_cls_name}",
        "imports_list": imports_list,
        "config_chain_style": make_config_chain_style(config),
        "parameters": config.get("parameters"),
        "teststeps_chain_style": [
            make_teststep_chain_style(step) for step in teststeps
        ],
    }
    # render the template with the collected data
    content = __TEMPLATE__.render(data)

    # ensure new file's directory exists
    dir_path = os.path.dirname(testcase_python_abs_path)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    # write the rendered source to the pytest file
    with open(testcase_python_abs_path, "w", encoding="utf-8") as f:
        f.write(content)

    # record the generated file (path -> class name) before ensuring __init__.py
    pytest_files_made_cache_mapping[testcase_python_abs_path] = testcase_cls_name
    __ensure_testcase_module(testcase_python_abs_path)

    logger.info(f"generated testcase: {testcase_python_abs_path}")

    return testcase_python_abs_path

def make_testsuite(testsuite: Dict) -> NoReturn:
    """Convert a valid testsuite dict to a pytest folder holding its testcases.

    The folder is named after the testsuite file (demo_testsuite.yml =>
    demo_testsuite_yml). Every referenced testcase is loaded, its config is
    overridden with the testsuite settings, and the resulting pytest file
    path is added to ``pytest_files_run_set``.
    """
    # validate testsuite format
    load_testsuite(testsuite)

    suite_config = testsuite["config"]
    suite_path = suite_config["path"]
    suite_variables = convert_variables(
        suite_config.get("variables", {}), suite_path
    )

    logger.info(f"start to make testsuite: {suite_path}")

    # directory named after the testsuite file; its testcases go under it
    suite_path = ensure_file_abs_path_valid(suite_path)
    suite_stem, suite_suffix = os.path.splitext(suite_path)
    # demo_testsuite.yml => demo_testsuite_yml
    suite_dir = f"{suite_stem}_{suite_suffix.lstrip('.')}"

    for entry in testsuite["testcases"]:
        # load the referenced testcase content
        case_path = __ensure_absolute(entry["testcase"])
        case_dict = load_test_file(case_path)
        case_config = case_dict.setdefault("config", {})
        case_config["path"] = case_path

        # the name given in the testsuite entry wins
        case_config["name"] = entry["name"]

        # base_url: testsuite config first, then the testsuite entry
        base_url = suite_config.get("base_url") or entry.get("base_url")
        if base_url:
            case_config["base_url"] = base_url

        # verify: taken from the testsuite config when present
        if "verify" in suite_config:
            case_config["verify"] = suite_config["verify"]

        # variables priority:
        # testsuite testcase variables > testsuite config variables
        override_variables = merge_variables(
            convert_variables(entry.get("variables", {}), case_path),
            suite_variables,
        )
        # testsuite testcase variables > testcase config variables
        config_variables = convert_variables(
            case_config.get("variables", {}), case_path
        )
        case_config["variables"] = config_variables
        config_variables.update(override_variables)

        # weight: taken from the testsuite entry when present
        if "weight" in entry:
            case_config["weight"] = entry["weight"]

        # make the testcase under the testsuite directory and schedule it
        pytest_files_run_set.add(make_testcase(case_dict, suite_dir))

def __make(tests_path: Text) -> None:
    """Make pytest file(s) from a testcase/testsuite file or a folder.

    Files ending with ``_test.py`` are added to ``pytest_files_run_set``
    as-is. Other files are loaded and, depending on their content, handed to
    ``make_testcase`` ("teststeps" present) or ``make_testsuite`` ("testcases"
    present). Files that cannot be loaded or have an invalid layout are
    skipped with a warning.

    Args:
        tests_path: should be in absolute path

    Raises:
        exceptions.TestcaseNotFound: if ``tests_path`` is neither a directory
            nor a file.
    """
    logger.info(f"make path: {tests_path}")
    test_files = []
    if os.path.isdir(tests_path):
        test_files.extend(load_folder_files(tests_path))
    elif os.path.isfile(tests_path):
        test_files.append(tests_path)
    else:
        raise exceptions.TestcaseNotFound(f"Invalid tests path: {tests_path}")

    for test_file in test_files:
        # already a pytest file: nothing to convert, just schedule it
        if test_file.lower().endswith("_test.py"):
            pytest_files_run_set.add(test_file)
            continue

        try:
            test_content = load_test_file(test_file)
        except (exceptions.FileNotFound, exceptions.FileFormatError) as ex:
            logger.warning(
                f"Invalid test file: {test_file}\n{type(ex).__name__}: {ex}"
            )
            continue

        if not isinstance(test_content, Dict):
            logger.warning(
                f"Invalid test file: {test_file}\n"
                f"reason: test content not in dict format."
            )
            continue

        # api in v2 format, convert to v3 testcase
        if "request" in test_content and "name" in test_content:
            test_content = ensure_testcase_v3_api(test_content)

        if "config" not in test_content:
            logger.warning(
                f"Invalid testcase/testsuite file: {test_file}\n"
                f"reason: missing config part."
            )
            continue
        elif not isinstance(test_content["config"], Dict):
            logger.warning(
                f"Invalid testcase/testsuite file: {test_file}\n"
                f"reason: config should be dict type, got {test_content['config']}"
            )
            continue

        # ensure path absolute ("config" is known to be a dict at this point)
        test_content["config"]["path"] = test_file

        # testcase
        if "teststeps" in test_content:
            try:
                testcase_pytest_path = make_testcase(test_content)
                pytest_files_run_set.add(testcase_pytest_path)
            except exceptions.TestCaseFormatError as ex:
                logger.warning(
                    f"Invalid testcase file: {test_file}\n{type(ex).__name__}: {ex}"
                )
                continue

        # testsuite
        elif "testcases" in test_content:
            try:
                make_testsuite(test_content)
            except exceptions.TestSuiteFormatError as ex:
                logger.warning(
                    f"Invalid testsuite file: {test_file}\n{type(ex).__name__}: {ex}"
                )
                continue

        # invalid format
        else:
            logger.warning(
                f"Invalid test file: {test_file}\n"
                f"reason: file content is neither testcase nor testsuite"
            )


def main_make(tests_paths: List[Text]) -> List[Text]:
    """Make pytest files for the given paths and format them with black.

    Args:
        tests_paths: testcase/testsuite file or folder paths; relative paths
            are resolved against the current working directory.

    Returns:
        The pytest file paths collected in ``pytest_files_run_set``, or an
        empty list when ``tests_paths`` is empty.

    Exits the process with status 1 when making a path raises
    ``exceptions.MyBaseError``.
    """
    if not tests_paths:
        return []

    for raw_path in tests_paths:
        normalized = ensure_path_sep(raw_path)
        # os.path.isabs is True for an absolute path; only relative paths
        # are joined onto the current working directory
        if os.path.isabs(normalized):
            target = normalized
        else:
            target = os.path.join(os.getcwd(), normalized)

        try:
            __make(target)
        except exceptions.MyBaseError as ex:
            logger.error(ex)
            sys.exit(1)

    # format every pytest file generated so far
    format_pytest_with_black(*pytest_files_made_cache_mapping.keys())

    return list(pytest_files_run_set)


def init_make_parser(subparsers):
    """Register the ``make`` sub-command and return its parser.

    Args:
        subparsers: the action object returned by
            ``ArgumentParser.add_subparsers()``.

    Returns:
        The parser of the ``make`` sub-command, which accepts zero or more
        ``testcase_path`` positional arguments.
    """
    make_parser = subparsers.add_parser(
        "make", help="Convert YAML/JSON testcases to pytest cases."
    )

    path_argument_options = {
        "nargs": "*",
        "help": "Specify YAML/JSON testcase file/folder path",
    }
    make_parser.add_argument("testcase_path", **path_argument_options)

    return make_parser
作者:zy7y
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文链接,否则保留追究法律责任的权利。

发表回复