make
这个文件中主要实现了 相关的文件生成,目录生成
make.py
,其实这个文件应该在client.py
前看的
可用资料
jinja2[模板,常用于Python Web 前后端不分离]: https://jinja.palletsprojects.com/en/3.0.x/intro/
black[代码格式化]: https://black.readthedocs.io/en/stable/
from string import Template 也可实现字符串模板替换
string包下的Template: https://docs.python.org/3/library/string.html#format-string-syntax
argparse[命令行选项]:https://docs.python.org/zh-cn/3/library/argparse.html
导包
import os
import string
import subprocess
import sys
from typing import Text, List, Tuple, Dict, Set, NoReturn
# 模板库,写过flask 的应该很熟悉,其灵感来自django
import jinja2
from loguru import logger
from sentry_sdk import capture_exception
from httprunner import exceptions, __version__
# v2 v3 版本兼容
from httprunner.compat import (
ensure_testcase_v3_api,
ensure_testcase_v3,
convert_variables,
ensure_path_sep,
)
# 导文件处理的
from httprunner.loader import (
load_folder_files,
load_test_file,
load_testcase,
load_testsuite,
load_project_meta,
convert_relative_project_root_dir,
)
# 验证器
from httprunner.response import uniform_validator
# 变量合并,多进程处理
from httprunner.utils import merge_variables, is_support_multiprocessing
源码附注释
""" cache converted pytest files, avoid duplicate making
pytest测试文件,缓存字典
"""
pytest_files_made_cache_mapping: Dict[Text, Text] = {}
""" save generated pytest files to run, except referenced testcase
运行集合,排除引入的测试用例
"""
pytest_files_run_set: Set = set()
# httprunner3, test_xx.py 模板
__TEMPLATE__ = jinja2.Template(
"""# NOTE: Generated By HttpRunner v{{ version }}
# FROM: {{ testcase_path }}
{% if imports_list and diff_levels > 0 %}
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__){% for _ in range(diff_levels) %}.parent{% endfor %}))
{% endif %}
{% if parameters %}
import pytest
from httprunner import Parameters
{% endif %}
from httprunner import HttpRunner, Config, Step, RunRequest, RunTestCase
{% for import_str in imports_list %}
{{ import_str }}
{% endfor %}
class {{ class_name }}(HttpRunner):
{% if parameters %}
@pytest.mark.parametrize("param", Parameters({{parameters}}))
def test_start(self, param):
super().test_start(param)
{% endif %}
config = {{ config_chain_style }}
teststeps = [
{% for step_chain_style in teststeps_chain_style %}
{{ step_chain_style }},
{% endfor %}
]
if __name__ == "__main__":
{{ class_name }}().test_start()
"""
)
# 不同操作系统地址符 兼容处理, 返回绝对地址
def __ensure_absolute(path: Text) -> Text:
if path.startswith("./"):
# Linux/Darwin, hrun ./test.yml
path = path[len("./") :]
elif path.startswith(".\"):
# Windows, hrun .\test.yml
path = path[len(".\") :]
path = ensure_path_sep(path)
project_meta = load_project_meta(path)
if os.path.isabs(path):
absolute_path = path
else:
absolute_path = os.path.join(project_meta.RootDir, path)
if not os.path.isfile(absolute_path):
logger.error(f"Invalid testcase file path: {absolute_path}")
sys.exit(1)
return absolute_path
def ensure_file_abs_path_valid(file_abs_path: Text) -> Text:
""" ensure file path valid for pytest, handle cases when directory name includes dot/hyphen/space
确保地址有效
Args:
file_abs_path: absolute file path
Returns:
ensured valid absolute file path
"""
project_meta = load_project_meta(file_abs_path)
raw_abs_file_name, file_suffix = os.path.splitext(file_abs_path)
file_suffix = file_suffix.lower()
raw_file_relative_name = convert_relative_project_root_dir(raw_abs_file_name)
if raw_file_relative_name == "":
return file_abs_path
path_names = []
for name in raw_file_relative_name.rstrip(os.sep).split(os.sep):
# 包含数字0~9的字符串 string.digits
if name[0] in string.digits:
# ensure file name not startswith digit
# 19 => T19, 2C => T2C
name = f"T{name}"
if name.startswith("."):
# avoid ".csv" been converted to "_csv"
pass
else:
# handle cases when directory name includes dot/hyphen/space
name = name.replace(" ", "_").replace(".", "_").replace("-", "_")
path_names.append(name)
new_file_path = os.path.join(
project_meta.RootDir, f"{os.sep.join(path_names)}{file_suffix}"
)
return new_file_path
# 按需生成测试模块 加让 __init__.py
def __ensure_testcase_module(path: Text) -> NoReturn:
""" ensure pytest files are in python module, generate __init__.py on demand
"""
init_file = os.path.join(os.path.dirname(path), "__init__.py")
if os.path.isfile(init_file):
return
with open(init_file, "w", encoding="utf-8") as f:
f.write("# NOTICE: Generated By HttpRunner. DO NOT EDIT!
")
def convert_testcase_path(testcase_abs_path: Text) -> Tuple[Text, Text]:
"""convert single YAML/JSON testcase path to python file"""
testcase_new_path = ensure_file_abs_path_valid(testcase_abs_path)
dir_path = os.path.dirname(testcase_new_path)
file_name, _ = os.path.splitext(os.path.basename(testcase_new_path))
testcase_python_abs_path = os.path.join(dir_path, f"{file_name}_test.py")
# convert title case, e.g. request_with_variables => RequestWithVariables
name_in_title_case = file_name.title().replace("_", "")
return testcase_python_abs_path, name_in_title_case
def format_pytest_with_black(*python_paths: Text) -> NoReturn:
# 格式化Python文件 Black
logger.info("format pytest cases with black ...")
try:
if is_support_multiprocessing() or len(python_paths) <= 1:
subprocess.run(["black", *python_paths])
else:
logger.warning(
f"this system does not support multiprocessing well, format files one by one ..."
)
# 运行被 arg 描述的指令。等待指令完成,然后返回一个 CompletedProcess 实例。subprocess.run
[subprocess.run(["black", path]) for path in python_paths]
except subprocess.CalledProcessError as ex:
capture_exception(ex)
logger.error(ex)
sys.exit(1)
except FileNotFoundError:
err_msg = """
missing dependency tool: black
install black manually and try again:
$ pip install black
"""
logger.error(err_msg)
sys.exit(1)
def make_config_chain_style(config: Dict) -> Text:
# 模板文件中Config 格式化
config_chain_style = f'Config("{config["name"]}")'
if config["variables"]:
variables = config["variables"]
config_chain_style += f".variables(**{variables})"
if "base_url" in config:
config_chain_style += f'.base_url("{config["base_url"]}")'
if "verify" in config:
config_chain_style += f'.verify({config["verify"]})'
if "export" in config:
config_chain_style += f'.export(*{config["export"]})'
if "weight" in config:
config_chain_style += f'.locust_weight({config["weight"]})'
return config_chain_style
def make_request_chain_style(request: Dict) -> Text:
# Reuests部分格式化
method = request["method"].lower()
url = request["url"]
request_chain_style = f'.{method}("{url}")'
if "params" in request:
params = request["params"]
request_chain_style += f".with_params(**{params})"
if "headers" in request:
headers = request["headers"]
request_chain_style += f".with_headers(**{headers})"
if "cookies" in request:
cookies = request["cookies"]
request_chain_style += f".with_cookies(**{cookies})"
if "data" in request:
data = request["data"]
if isinstance(data, Text):
data = f'"{data}"'
request_chain_style += f".with_data({data})"
if "json" in request:
req_json = request["json"]
if isinstance(req_json, Text):
req_json = f'"{req_json}"'
request_chain_style += f".with_json({req_json})"
if "timeout" in request:
timeout = request["timeout"]
request_chain_style += f".set_timeout({timeout})"
if "verify" in request:
verify = request["verify"]
request_chain_style += f".set_verify({verify})"
if "allow_redirects" in request:
allow_redirects = request["allow_redirects"]
request_chain_style += f".set_allow_redirects({allow_redirects})"
if "upload" in request:
upload = request["upload"]
request_chain_style += f".upload(**{upload})"
return request_chain_style
def make_teststep_chain_style(teststep: Dict) -> Text:
# Step 部分格式化
if teststep.get("request"): # 全新请求
step_info = f'RunRequest("{teststep["name"]}")'
elif teststep.get("testcase"): # 调用其他测试用例
step_info = f'RunTestCase("{teststep["name"]}")'
else:
raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")
if "variables" in teststep:
variables = teststep["variables"]
step_info += f".with_variables(**{variables})"
if "setup_hooks" in teststep:
setup_hooks = teststep["setup_hooks"]
for hook in setup_hooks:
if isinstance(hook, Text):
step_info += f'.setup_hook("{hook}")'
elif isinstance(hook, Dict) and len(hook) == 1:
assign_var_name, hook_content = list(hook.items())[0]
step_info += f'.setup_hook("{hook_content}", "{assign_var_name}")'
else:
raise exceptions.TestCaseFormatError(f"Invalid setup hook: {hook}")
if teststep.get("request"):
step_info += make_request_chain_style(teststep["request"])
elif teststep.get("testcase"):
testcase = teststep["testcase"]
call_ref_testcase = f".call({testcase})"
step_info += call_ref_testcase
if "teardown_hooks" in teststep:
teardown_hooks = teststep["teardown_hooks"]
for hook in teardown_hooks:
if isinstance(hook, Text):
step_info += f'.teardown_hook("{hook}")'
elif isinstance(hook, Dict) and len(hook) == 1:
assign_var_name, hook_content = list(hook.items())[0]
step_info += f'.teardown_hook("{hook_content}", "{assign_var_name}")'
else:
raise exceptions.TestCaseFormatError(f"Invalid teardown hook: {hook}")
if "extract" in teststep:
# request step
step_info += ".extract()"
for extract_name, extract_path in teststep["extract"].items():
step_info += f""".with_jmespath('{extract_path}', '{extract_name}')"""
if "export" in teststep:
# reference testcase step
export: List[Text] = teststep["export"]
step_info += f".export(*{export})"
if "validate" in teststep:
step_info += ".validate()"
for v in teststep["validate"]:
validator = uniform_validator(v)
assert_method = validator["assert"]
check = validator["check"]
if '"' in check:
# e.g. body."user-agent" => 'body."user-agent"'
check = f"'{check}'"
else:
check = f'"{check}"'
expect = validator["expect"]
if isinstance(expect, Text):
expect = f'"{expect}"'
message = validator["message"]
if message:
step_info += f".assert_{assert_method}({check}, {expect}, '{message}')"
else:
step_info += f".assert_{assert_method}({check}, {expect})"
return f"Step({step_info})"
# 生成测试用例文件
def make_testcase(testcase: Dict, dir_path: Text = None) -> Text:
"""convert valid testcase dict to pytest file path"""
# ensure compatibility with testcase format v2
testcase = ensure_testcase_v3(testcase)
# validate testcase format
load_testcase(testcase)
testcase_abs_path = __ensure_absolute(testcase["config"]["path"])
logger.info(f"start to make testcase: {testcase_abs_path}")
testcase_python_abs_path, testcase_cls_name = convert_testcase_path(
testcase_abs_path
)
if dir_path:
testcase_python_abs_path = os.path.join(
dir_path, os.path.basename(testcase_python_abs_path)
)
global pytest_files_made_cache_mapping
if testcase_python_abs_path in pytest_files_made_cache_mapping:
return testcase_python_abs_path
config = testcase["config"]
config["path"] = convert_relative_project_root_dir(testcase_python_abs_path)
config["variables"] = convert_variables(
config.get("variables", {}), testcase_abs_path
)
# prepare reference testcase
imports_list = []
teststeps = testcase["teststeps"]
for teststep in teststeps:
if not teststep.get("testcase"):
continue
# make ref testcase pytest file
ref_testcase_path = __ensure_absolute(teststep["testcase"])
test_content = load_test_file(ref_testcase_path)
if not isinstance(test_content, Dict):
raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")
# api in v2 format, convert to v3 testcase
if "request" in test_content and "name" in test_content:
test_content = ensure_testcase_v3_api(test_content)
test_content.setdefault("config", {})["path"] = ref_testcase_path
ref_testcase_python_abs_path = make_testcase(test_content)
# override testcase export
ref_testcase_export: List = test_content["config"].get("export", [])
if ref_testcase_export:
step_export: List = teststep.setdefault("export", [])
step_export.extend(ref_testcase_export)
teststep["export"] = list(set(step_export))
# prepare ref testcase class name
ref_testcase_cls_name = pytest_files_made_cache_mapping[
ref_testcase_python_abs_path
]
teststep["testcase"] = ref_testcase_cls_name
# prepare import ref testcase
ref_testcase_python_relative_path = convert_relative_project_root_dir(
ref_testcase_python_abs_path
)
ref_module_name, _ = os.path.splitext(ref_testcase_python_relative_path)
ref_module_name = ref_module_name.replace(os.sep, ".")
import_expr = f"from {ref_module_name} import TestCase{ref_testcase_cls_name} as {ref_testcase_cls_name}"
if import_expr not in imports_list:
imports_list.append(import_expr)
testcase_path = convert_relative_project_root_dir(testcase_abs_path)
# current file compared to ProjectRootDir
diff_levels = len(testcase_path.split(os.sep))
data = {
"version": __version__,
"testcase_path": testcase_path,
"diff_levels": diff_levels,
"class_name": f"TestCase{testcase_cls_name}",
"imports_list": imports_list,
"config_chain_style": make_config_chain_style(config),
"parameters": config.get("parameters"),
"teststeps_chain_style": [
make_teststep_chain_style(step) for step in teststeps
],
}
# 套入数据,替换模板中变量
content = __TEMPLATE__.render(data)
# ensure new file's directory exists
dir_path = os.path.dirname(testcase_python_abs_path)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
# 写入文件
with open(testcase_python_abs_path, "w", encoding="utf-8") as f:
f.write(content)
pytest_files_made_cache_mapping[testcase_python_abs_path] = testcase_cls_name
__ensure_testcase_module(testcase_python_abs_path)
logger.info(f"generated testcase: {testcase_python_abs_path}")
return testcase_python_abs_path
# 生成测试套件目录
def make_testsuite(testsuite: Dict) -> NoReturn:
"""convert valid testsuite dict to pytest folder with testcases"""
# validate testsuite format
load_testsuite(testsuite)
testsuite_config = testsuite["config"]
testsuite_path = testsuite_config["path"]
testsuite_variables = convert_variables(
testsuite_config.get("variables", {}), testsuite_path
)
logger.info(f"start to make testsuite: {testsuite_path}")
# create directory with testsuite file name, put its testcases under this directory
testsuite_path = ensure_file_abs_path_valid(testsuite_path)
testsuite_dir, file_suffix = os.path.splitext(testsuite_path)
# demo_testsuite.yml => demo_testsuite_yml
testsuite_dir = f"{testsuite_dir}_{file_suffix.lstrip('.')}"
for testcase in testsuite["testcases"]:
# get referenced testcase content
testcase_file = testcase["testcase"]
testcase_path = __ensure_absolute(testcase_file)
testcase_dict = load_test_file(testcase_path)
testcase_dict.setdefault("config", {})
testcase_dict["config"]["path"] = testcase_path
# override testcase name
testcase_dict["config"]["name"] = testcase["name"]
# override base_url
base_url = testsuite_config.get("base_url") or testcase.get("base_url")
if base_url:
testcase_dict["config"]["base_url"] = base_url
# override verify
if "verify" in testsuite_config:
testcase_dict["config"]["verify"] = testsuite_config["verify"]
# override variables
# testsuite testcase variables > testsuite config variables
testcase_variables = convert_variables(
testcase.get("variables", {}), testcase_path
)
testcase_variables = merge_variables(testcase_variables, testsuite_variables)
# testsuite testcase variables > testcase config variables
testcase_dict["config"]["variables"] = convert_variables(
testcase_dict["config"].get("variables", {}), testcase_path
)
testcase_dict["config"]["variables"].update(testcase_variables)
# override weight
if "weight" in testcase:
testcase_dict["config"]["weight"] = testcase["weight"]
# make testcase
testcase_pytest_path = make_testcase(testcase_dict, testsuite_dir)
pytest_files_run_set.add(testcase_pytest_path)
# 生成的测试文件 缓存进入字典
def __make(tests_path: Text) -> NoReturn:
""" make testcase(s) with testcase/testsuite/folder absolute path
generated pytest file path will be cached in pytest_files_made_cache_mapping
Args:
tests_path: should be in absolute path
"""
logger.info(f"make path: {tests_path}")
test_files = []
if os.path.isdir(tests_path):
files_list = load_folder_files(tests_path)
test_files.extend(files_list)
elif os.path.isfile(tests_path):
test_files.append(tests_path)
else:
raise exceptions.TestcaseNotFound(f"Invalid tests path: {tests_path}")
for test_file in test_files:
if test_file.lower().endswith("_test.py"):
pytest_files_run_set.add(test_file)
continue
try:
test_content = load_test_file(test_file)
except (exceptions.FileNotFound, exceptions.FileFormatError) as ex:
logger.warning(f"Invalid test file: {test_file}
{type(ex).__name__}: {ex}")
continue
if not isinstance(test_content, Dict):
logger.warning(
f"Invalid test file: {test_file}
"
f"reason: test content not in dict format."
)
continue
# api in v2 format, convert to v3 testcase
if "request" in test_content and "name" in test_content:
test_content = ensure_testcase_v3_api(test_content)
if "config" not in test_content:
logger.warning(
f"Invalid testcase/testsuite file: {test_file}
"
f"reason: missing config part."
)
continue
elif not isinstance(test_content["config"], Dict):
logger.warning(
f"Invalid testcase/testsuite file: {test_file}
"
f"reason: config should be dict type, got {test_content['config']}"
)
continue
# ensure path absolute
test_content.setdefault("config", {})["path"] = test_file
# testcase
if "teststeps" in test_content:
try:
testcase_pytest_path = make_testcase(test_content)
pytest_files_run_set.add(testcase_pytest_path)
except exceptions.TestCaseFormatError as ex:
logger.warning(
f"Invalid testcase file: {test_file}
{type(ex).__name__}: {ex}"
)
continue
# testsuite
elif "testcases" in test_content:
try:
make_testsuite(test_content)
except exceptions.TestSuiteFormatError as ex:
logger.warning(
f"Invalid testsuite file: {test_file}
{type(ex).__name__}: {ex}"
)
continue
# invalid format
else:
logger.warning(
f"Invalid test file: {test_file}
"
f"reason: file content is neither testcase nor testsuite"
)
def main_make(tests_paths: List[Text]) -> List[Text]:
if not tests_paths:
return []
for tests_path in tests_paths:
tests_path = ensure_path_sep(tests_path)
if not os.path.isabs(tests_path): # 如果是个绝对路径 就返回True
tests_path = os.path.join(os.getcwd(), tests_path)
try:
__make(tests_path)
except exceptions.MyBaseError as ex:
logger.error(ex)
sys.exit(1)
# format pytest files
pytest_files_format_list = pytest_files_made_cache_mapping.keys()
# 可视化所有生成的测试py文件
format_pytest_with_black(*pytest_files_format_list)
return list(pytest_files_run_set)
def init_make_parser(subparsers):
""" make testcases: parse command line options and run commands.
add_subparsers() 方法通常不带参数地调用并返回一个特殊的动作对象
"""
parser = subparsers.add_parser(
"make", help="Convert YAML/JSON testcases to pytest cases.",
)
parser.add_argument(
"testcase_path", nargs="*", help="Specify YAML/JSON testcase file/folder path"
)
return parser