[OE-core] [PATCH] scripts/test-result-log: Store test result & log and view summary report
Yeoh Ee Peng
ee.peng.yeoh at intel.com
Thu Jul 12 01:47:42 UTC 2018
These scripts were designed to store test results & logs in a GIT repository
after running OEQA automated tests such as runtime or oe-selftest. They
were designed to be triggered either by CI (Autobuilder) or manually by
a human calling the test-result-log command line.
After storing test results & logs in GIT, the user can view a text-based summary
report, track the history of tests, as well as use the GIT DIFF command
to perform regression comparison.
Signed-off-by: Yeoh Ee Peng <ee.peng.yeoh at intel.com>
---
scripts/lib/testresultlog/__init__.py | 0
.../template/test_report_full_text.txt | 33 ++
scripts/lib/testresultlog/testlogparser.py | 97 ++++++
scripts/lib/testresultlog/testresultgitstore.py | 384 +++++++++++++++++++++
scripts/lib/testresultlog/testresultupdator.py | 139 ++++++++
scripts/lib/testresultlog/testresultviewer.py | 146 ++++++++
scripts/test-result-log | 53 +++
7 files changed, 852 insertions(+)
create mode 100644 scripts/lib/testresultlog/__init__.py
create mode 100644 scripts/lib/testresultlog/template/test_report_full_text.txt
create mode 100644 scripts/lib/testresultlog/testlogparser.py
create mode 100644 scripts/lib/testresultlog/testresultgitstore.py
create mode 100644 scripts/lib/testresultlog/testresultupdator.py
create mode 100644 scripts/lib/testresultlog/testresultviewer.py
create mode 100755 scripts/test-result-log
diff --git a/scripts/lib/testresultlog/__init__.py b/scripts/lib/testresultlog/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/lib/testresultlog/template/test_report_full_text.txt b/scripts/lib/testresultlog/template/test_report_full_text.txt
new file mode 100644
index 0000000..78c49ea
--- /dev/null
+++ b/scripts/lib/testresultlog/template/test_report_full_text.txt
@@ -0,0 +1,33 @@
+==============================================================================================================
+Test Report (Count of passed, failed, skipped group by test_component, test_environment)
+==============================================================================================================
+--------------------------------------------------------------------------------------------------------------
+{{ 'test_component'.ljust(max_len_component) }} | {{ 'test_environment'.ljust(max_len_environment) }} | {{ 'passed'.ljust(10) }} | {{ 'failed'.ljust(10) }} | {{ 'skipped'.ljust(10) }}
+--------------------------------------------------------------------------------------------------------------
+{% for report in test_reports |sort(attribute='test_component_environment') %}
+{{ report.test_component.ljust(max_len_component) }} | {{ report.test_environment.ljust(max_len_environment) }} | {{ report.passed.ljust(10) }} | {{ report.failed.ljust(10) }} | {{ report.skipped.ljust(10) }}
+{% endfor %}
+--------------------------------------------------------------------------------------------------------------
+
+==============================================================================================================
+Test Report (Percent of passed, failed, skipped group by test_component, test_environment)
+==============================================================================================================
+--------------------------------------------------------------------------------------------------------------
+{{ 'test_component'.ljust(max_len_component) }} | {{ 'test_environment'.ljust(max_len_environment) }} | {{ 'passed_%'.ljust(10) }} | {{ 'failed_%'.ljust(10) }} | {{ 'skipped_%'.ljust(10) }}
+--------------------------------------------------------------------------------------------------------------
+{% for report in test_reports |sort(attribute='test_component_environment') %}
+{{ report.test_component.ljust(max_len_component) }} | {{ report.test_environment.ljust(max_len_environment) }} | {{ report.passed_percent.ljust(10) }} | {{ report.failed_percent.ljust(10) }} | {{ report.skipped_percent.ljust(10) }}
+{% endfor %}
+--------------------------------------------------------------------------------------------------------------
+
+==============================================================================================================
+Test Report (Failed test cases group by test_component, test_environment)
+==============================================================================================================
+--------------------------------------------------------------------------------------------------------------
+{% for report in test_reports |sort(attribute='test_component_environment') %}
+test_component | test_environment : {{ report.test_component }} | {{ report.test_environment }}
+{% for testcase in report.failed_testcases %}
+ {{ testcase }}
+{% endfor %}
+{% endfor %}
+--------------------------------------------------------------------------------------------------------------
\ No newline at end of file
diff --git a/scripts/lib/testresultlog/testlogparser.py b/scripts/lib/testresultlog/testlogparser.py
new file mode 100644
index 0000000..6cb9ff5
--- /dev/null
+++ b/scripts/lib/testresultlog/testlogparser.py
@@ -0,0 +1,97 @@
+import re
+
+class TestLogParser(object):
+    """Extract test statuses, environment names and failure logs from a test log file."""
+
+    # Machine names searched for on the runqemu launch command line.
+    QEMU_MACHINES = ('qemuarm', 'qemuarm64', 'qemumips', 'qemumips64',
+                     'qemuppc', 'qemux86', 'qemux86-64')
+
+    def _collect_results(self, log_file, statuses):
+        """Return {case_name: status} for every RESULTS line whose status is in statuses.
+
+        When the same case name appears on several lines, the last one wins.
+        """
+        pattern = re.compile(
+            r".*RESULTS - (?P<case_name>.*) - Testcase (?P<case_id>\d+): (?P<status>%s)$"
+            % "|".join(statuses))
+        results = {}
+        with open(log_file, "r") as f:
+            for line in f:
+                m = pattern.search(line.strip())
+                if m:
+                    results[m.group('case_name')] = m.group('status')
+        return results
+
+    def get_test_status(self, log_file):
+        """Return {case_name: status} for all PASSED/FAILED/SKIPPED/ERROR result lines."""
+        return self._collect_results(log_file, ('PASSED', 'FAILED', 'SKIPPED', 'ERROR'))
+
+    def get_failed_tests(self, log_file):
+        """Return {case_name: 'FAILED'} for the result lines reporting a failure."""
+        return self._collect_results(log_file, ('FAILED',))
+
+    def get_runtime_test_image_environment(self, log_file):
+        """Return the text preceding "(" on the first usable summary line, or ''.
+
+        A line is usable when it matches the summary pattern and contains a
+        "(" that is not its first character; lines without one are skipped
+        instead of being sliced into a truncated name.
+        """
+        # NOTE(review): "()" is an empty regex group, so it matches nothing in
+        # particular; literal parentheses were presumably intended. The
+        # pattern is kept as-is so the same lines are considered.
+        pattern = re.compile(r"core-image.*().*Ran.*tests in .*s")
+        with open(log_file, "r") as f:
+            for line in f:
+                line = line.strip()
+                if not pattern.search(line):
+                    continue
+                paren = line.find("(")
+                if paren < 1:
+                    continue
+                # Drop the character right before "(" (the separator), then trim.
+                return line[:paren - 1].strip()
+        return ''
+
+    def get_runtime_test_qemu_environment(self, log_file):
+        """Return the qemu machine named on the first runqemu launch line, or ''.
+
+        Several machine names are prefixes of others ('qemuarm' vs
+        'qemuarm64'), so the longest name found on the line is returned.
+        """
+        pattern = re.compile(r"DEBUG: launchcmd=runqemu*")
+        with open(log_file, "r") as f:
+            for line in f:
+                line = line.strip()
+                if not pattern.search(line):
+                    continue
+                candidates = [qemu for qemu in self.QEMU_MACHINES if qemu in line]
+                if candidates:
+                    return max(candidates, key=len)
+        return ''
+
+    def _search_log_to_capture(self, logs, line, state, regex_comp_start, regex_comp_end_fail_or, regex_comp_end_error_or, regex_comp_end):
+        """Advance the capture state machine by one line and return the new state.
+
+        'Searching': wait for the start line; once seen, record it and switch
+        to 'Found'. 'Found': record lines until one of the three end patterns
+        matches, then switch to 'End' (the end line itself is not recorded).
+        Any other state is returned unchanged.
+        """
+        if state == 'Searching':
+            if regex_comp_start.search(line):
+                logs.append(line)
+                return 'Found'
+            return 'Searching'
+        if state == 'Found':
+            end_patterns = (regex_comp_end_fail_or, regex_comp_end_error_or, regex_comp_end)
+            if any(pattern.search(line) for pattern in end_patterns):
+                return 'End'
+            logs.append(line)
+            return 'Found'
+        return state
+
+    def get_test_log(self, log_file, test_status, testcase_name, testsuite_name):
+        """Return the log lines of one failed or errored test as a list.
+
+        Capture starts at "<STATUS>: <testcase_name> (<testsuite_name>)" and
+        stops at the next "FAIL: test", "ERROR: test" or "Ran ... tests in"
+        line. A list is always returned: it is empty when the test is not
+        found, and holds everything captured so far when the file ends before
+        an end marker is seen.
+        """
+        if test_status == 'FAILED':
+            test_status = 'FAIL'
+        # Names are escaped so that "." in a suite name only matches a dot.
+        regex_comp_start = re.compile(r".*%s: %s \(%s\).*" % (
+            re.escape(test_status), re.escape(testcase_name), re.escape(testsuite_name)))
+        regex_comp_end_fail_or = re.compile(r".*FAIL: test.*")
+        regex_comp_end_error_or = re.compile(r".*ERROR: test.*")
+        regex_comp_end = re.compile(r".*Ran.*tests in .*s")
+        state = 'Searching'
+        logs = []
+        with open(log_file, "r") as f:
+            for line in f:
+                state = self._search_log_to_capture(
+                    logs, line.strip(), state, regex_comp_start,
+                    regex_comp_end_fail_or, regex_comp_end_error_or, regex_comp_end)
+                if state == 'End':
+                    break
+        return logs
+
diff --git a/scripts/lib/testresultlog/testresultgitstore.py b/scripts/lib/testresultlog/testresultgitstore.py
new file mode 100644
index 0000000..35b6c99
--- /dev/null
+++ b/scripts/lib/testresultlog/testresultgitstore.py
@@ -0,0 +1,384 @@
+import tempfile
+import os
+import pathlib
+import json
+import subprocess
+import shutil
+import scriptpath
+scriptpath.add_bitbake_lib_path()
+scriptpath.add_oe_lib_path()
+from oeqa.utils.git import GitRepo, GitError
+
+class TestResultGitStore(object):
+    """Write test result JSON files and failure logs, and archive them into git.
+
+    Files are laid out as <project>/<env>/.../<testmodule>.json with one
+    <testcase>.log beside them for each captured log, and are committed by
+    running the external ``oe-git-archive`` command.
+    """
+
+    def __init__(self):
+        self.script_path = os.path.dirname(os.path.realpath(__file__))
+        self.base_path = self.script_path + '/../../..'
+
+    # ------------------------------------------------------------------
+    # Workspace and path helpers
+    # ------------------------------------------------------------------
+
+    def _create_temporary_workspace_dir(self):
+        """Create and return a fresh temporary directory."""
+        return tempfile.mkdtemp(prefix='testresultlog.')
+
+    def _remove_temporary_workspace_dir(self, workspace_dir):
+        """Delete workspace_dir recursively; a missing directory is not an error."""
+        shutil.rmtree(workspace_dir, ignore_errors=True)
+
+    def _get_project_environment_directory_path(self, project_dir, test_environment_list):
+        """Return project_dir extended by one path component per environment."""
+        return os.path.join(project_dir, *test_environment_list)
+
+    def _check_if_git_dir_contain_project_and_environment_directory(self, git_dir, project, environment_list):
+        """Return True when <git_dir>/<project>/<env>/... exists."""
+        project_dir = os.path.join(git_dir, project)
+        project_env_dir = self._get_project_environment_directory_path(project_dir, environment_list)
+        return os.path.exists(project_env_dir)
+
+    # ------------------------------------------------------------------
+    # Building the JSON structure
+    # ------------------------------------------------------------------
+
+    def _get_testmodule_list(self, testmodule_testsuite_dict):
+        """Return the test module names in sorted order."""
+        return sorted(testmodule_testsuite_dict)
+
+    def _get_testcase_list(self, testsuite_list, testsuite_testcase_dict):
+        """Return the test cases of the given suites, suites taken in sorted order."""
+        testcase_list = []
+        for testsuite in sorted(testsuite_list):
+            testcase_list.extend(testsuite_testcase_dict.get(testsuite, []))
+        return testcase_list
+
+    def _get_testcase_status(self, testcase, testcase_status_dict):
+        """Return the recorded status of testcase, or "" when it has none."""
+        return testcase_status_dict.get(testcase, "")
+
+    def _create_testcase_dict(self, testcase_list, testcase_status_dict):
+        """Return {testcase: {"testresult": status, "bugs": ""}} for each test case."""
+        return {
+            testcase: {
+                "testresult": self._get_testcase_status(testcase, testcase_status_dict),
+                "bugs": "",
+            }
+            for testcase in testcase_list
+        }
+
+    def _create_testsuite_testcase_teststatus_json_object(self, testsuite_list, testsuite_testcase_dict, testcase_status_dict):
+        """Return {'testsuite': {suite: {'testcase': {...}}}} for the given suites.
+
+        A suite with no entry in testsuite_testcase_dict gets an empty
+        'testcase' mapping, consistent with _get_testcase_list.
+        """
+        testsuite_dict = {}
+        for testsuite in testsuite_list:
+            testcase_list = testsuite_testcase_dict.get(testsuite, [])
+            testsuite_dict[testsuite] = {
+                'testcase': self._create_testcase_dict(testcase_list, testcase_status_dict)
+            }
+        return {'testsuite': testsuite_dict}
+
+    def _create_testsuite_json_formatted_string(self, testsuite_list, testsuite_testcase_dict, testcase_status_dict):
+        """Return the suite/test case structure as key-sorted, indented JSON text."""
+        json_object = self._create_testsuite_testcase_teststatus_json_object(
+            testsuite_list, testsuite_testcase_dict, testcase_status_dict)
+        return json.dumps(json_object, sort_keys=True, indent=4)
+
+    # ------------------------------------------------------------------
+    # File writing
+    # ------------------------------------------------------------------
+
+    def _write_testsuite_testcase_json_formatted_string_to_file(self, file_path, file_content):
+        """Write file_content to file_path, replacing any previous content."""
+        with open(file_path, 'w') as the_file:
+            the_file.write(file_content)
+
+    def _write_log_file(self, file_path, logs):
+        """Write each entry of logs to file_path on its own line.
+
+        logs may be None, which is written as an empty file.
+        """
+        with open(file_path, 'w') as the_file:
+            for line in logs or []:
+                the_file.write(line + '\n')
+
+    def _write_test_log_files(self, file_dir, testcase_list, testcase_logs_dict):
+        """Write <testcase>.log into file_dir for every test case that has logs."""
+        for testcase in testcase_list:
+            if testcase in testcase_logs_dict:
+                file_path = os.path.join(file_dir, '%s.log' % testcase)
+                self._write_log_file(file_path, testcase_logs_dict[testcase])
+
+    def _remove_test_log_files(self, file_dir, testcase_log_remove_list):
+        """Delete <testcase>.log from file_dir for every listed test case, if present."""
+        for testcase_log_remove in testcase_log_remove_list:
+            file_remove_path = os.path.join(file_dir, '%s.log' % testcase_log_remove)
+            if os.path.exists(file_remove_path):
+                os.remove(file_remove_path)
+
+    def _copy_files_from_source_to_destination_dir(self, source_dir, destination_dir):
+        """Copy every entry of source_dir into destination_dir when both exist."""
+        if os.path.exists(source_dir) and os.path.exists(destination_dir):
+            for item in os.listdir(source_dir):
+                shutil.copy2(os.path.join(source_dir, item), os.path.join(destination_dir, item))
+
+    def _write_automated_test_result_files(self, project_env_dir, testmodule_testsuite_dict, testsuite_testcase_dict, testcase_status_dict, testcase_logs_dict):
+        """Create project_env_dir and write one JSON file plus logs per test module."""
+        pathlib.Path(project_env_dir).mkdir(parents=True, exist_ok=True)
+        for testmodule in self._get_testmodule_list(testmodule_testsuite_dict):
+            testsuite_list = testmodule_testsuite_dict[testmodule]
+            module_json = self._create_testsuite_json_formatted_string(
+                testsuite_list, testsuite_testcase_dict, testcase_status_dict)
+            file_path = os.path.join(project_env_dir, '%s.json' % testmodule)
+            self._write_testsuite_testcase_json_formatted_string_to_file(file_path, module_json)
+            testcase_list = self._get_testcase_list(testsuite_list, testsuite_testcase_dict)
+            self._write_test_log_files(project_env_dir, testcase_list, testcase_logs_dict)
+
+    # ------------------------------------------------------------------
+    # Creating results
+    # ------------------------------------------------------------------
+
+    def _create_automated_test_result_from_empty_git(self, git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, testcase_status_dict, testcase_logs_dict):
+        """Build the result tree in a temporary workspace and archive it into git_dir."""
+        workspace_dir = self._create_temporary_workspace_dir()
+        try:
+            project_dir = os.path.join(workspace_dir, project)
+            project_env_dir = self._get_project_environment_directory_path(project_dir, environment_list)
+            self._write_automated_test_result_files(
+                project_env_dir, testmodule_testsuite_dict, testsuite_testcase_dict,
+                testcase_status_dict, testcase_logs_dict)
+            self._push_testsuite_testcase_json_file_to_git_repo(workspace_dir, git_dir, git_branch)
+        finally:
+            # Remove the workspace even when writing or archiving fails.
+            self._remove_temporary_workspace_dir(workspace_dir)
+
+    def _create_automated_test_result_from_existing_git(self, git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, testcase_status_dict, testcase_logs_dict):
+        """Build the result tree directly inside git_dir and archive it."""
+        project_dir = os.path.join(git_dir, project)
+        project_env_dir = self._get_project_environment_directory_path(project_dir, environment_list)
+        self._write_automated_test_result_files(
+            project_env_dir, testmodule_testsuite_dict, testsuite_testcase_dict,
+            testcase_status_dict, testcase_logs_dict)
+        self._push_testsuite_testcase_json_file_to_git_repo(git_dir, git_dir, git_branch)
+
+    def _create_manual_test_result_from_empty_git(self, git_dir, git_branch, project, environment_list, manual_test_report_dir):
+        """Copy the manual report files into a temporary workspace and archive it."""
+        workspace_dir = self._create_temporary_workspace_dir()
+        try:
+            project_dir = os.path.join(workspace_dir, project)
+            project_env_dir = self._get_project_environment_directory_path(project_dir, environment_list)
+            pathlib.Path(project_env_dir).mkdir(parents=True, exist_ok=True)
+            self._copy_files_from_source_to_destination_dir(manual_test_report_dir, project_env_dir)
+            self._push_testsuite_testcase_json_file_to_git_repo(workspace_dir, git_dir, git_branch)
+        finally:
+            # Remove the workspace even when copying or archiving fails.
+            self._remove_temporary_workspace_dir(workspace_dir)
+
+    def _create_manual_test_result_from_exiting_git(self, git_dir, git_branch, project, environment_list, manual_test_report_dir):
+        """Copy the manual report files into git_dir, unless the target already exists."""
+        project_dir = os.path.join(git_dir, project)
+        project_env_dir = self._get_project_environment_directory_path(project_dir, environment_list)
+        if not self._check_if_git_dir_contain_project_and_environment_directory(git_dir, project, environment_list):
+            pathlib.Path(project_env_dir).mkdir(parents=True, exist_ok=True)
+            self._copy_files_from_source_to_destination_dir(manual_test_report_dir, project_env_dir)
+            self._push_testsuite_testcase_json_file_to_git_repo(git_dir, git_dir, git_branch)
+
+    # ------------------------------------------------------------------
+    # Updating results
+    # ------------------------------------------------------------------
+
+    def _load_test_module_file_with_json_into_dictionary(self, file):
+        """Return the parsed JSON content of file, or None when it does not exist."""
+        if os.path.exists(file):
+            with open(file, "r") as f:
+                return json.load(f)
+        print('Cannot find file (%s)' % file)
+        return None
+
+    def _get_testcase_log_need_removal_list(self, testcase, cur_testcase_status, next_testcase_status, testcase_log_remove_list):
+        """Append testcase to the removal list when it goes from FAILED/ERROR to PASSED/SKIPPED."""
+        if cur_testcase_status in ('FAILED', 'ERROR') and next_testcase_status in ('PASSED', 'SKIPPED'):
+            testcase_log_remove_list.append(testcase)
+
+    def _update_target_testresult_dictionary_with_status(self, target_testresult_dict, testsuite_list, testsuite_testcase_dict, testcase_status_dict, testcase_log_remove_list):
+        """Store the new statuses into target_testresult_dict in place.
+
+        Only test cases present in testcase_status_dict are touched. A suite
+        or test case missing from the stored structure is added rather than
+        raising KeyError. Test cases whose stale log must be deleted are
+        appended to testcase_log_remove_list.
+        """
+        for testsuite in testsuite_list:
+            for testcase in testsuite_testcase_dict.get(testsuite, []):
+                if testcase not in testcase_status_dict:
+                    continue
+                next_testcase_status = testcase_status_dict[testcase]
+                suites = target_testresult_dict.setdefault('testsuite', {})
+                testcases = suites.setdefault(testsuite, {}).setdefault('testcase', {})
+                entry = testcases.setdefault(testcase, {"testresult": "", "bugs": ""})
+                self._get_testcase_log_need_removal_list(
+                    testcase, entry.get('testresult', ''), next_testcase_status, testcase_log_remove_list)
+                entry['testresult'] = next_testcase_status
+
+    # ------------------------------------------------------------------
+    # Git helpers
+    # ------------------------------------------------------------------
+
+    def _git_init(self, git_dir):
+        """Return a GitRepo for git_dir.
+
+        Raises GitError, after printing an explanation, when the repository
+        cannot be opened.
+        """
+        try:
+            return GitRepo(git_dir, is_topdir=True)
+        except GitError:
+            print("Non-empty directory that is not a Git repository "
+                  "at {}\nPlease specify an existing Git repository, "
+                  "an empty directory or a non-existing directory "
+                  "path.".format(git_dir))
+            raise
+
+    def _git_check_if_git_dir_and_git_branch_exist(self, git_dir, git_branch):
+        """Return True when git_dir has a .git entry and git_branch can be checked out.
+
+        On success the branch is left checked out.
+        """
+        if not os.path.exists(os.path.join(git_dir, '.git')):
+            return False
+        try:
+            repo = self._git_init(git_dir)
+            repo.run_cmd('checkout %s' % git_branch)
+            return True
+        except GitError:
+            return False
+
+    def _git_checkout_git_repo(self, repo, git_branch):
+        """Check out git_branch in repo."""
+        repo.run_cmd('checkout %s' % git_branch)
+
+    def _git_check_if_local_repo_contain_remote_origin(self, repo):
+        """Return True when repo has a remote named origin."""
+        try:
+            repo.run_cmd('remote get-url origin')
+            return True
+        except GitError:
+            return False
+
+    def _git_check_if_local_repo_remote_origin_url_match(self, repo, git_remote):
+        """Return True when the URL of origin equals git_remote."""
+        try:
+            return repo.run_cmd('remote get-url origin') == git_remote
+        except GitError:
+            return False
+
+    def _git_fetch_remote_origin(self, repo):
+        """Fetch origin; return True on success, False on GitError."""
+        print('Fetching remote origin to local repo')
+        try:
+            repo.run_cmd('fetch origin')
+            return True
+        except GitError:
+            return False
+
+    def _git_check_if_remote_origin_has_branch(self, repo, git_branch):
+        """Return True when remotes/origin/<git_branch> exists; print its summary."""
+        try:
+            output = repo.run_cmd('show-branch remotes/origin/%s' % git_branch)
+            print(output)
+            return True
+        except GitError:
+            return False
+
+    def _git_add_local_repo_remote_origin(self, repo, git_remote):
+        """Add git_remote as origin; a failure is reported but not raised."""
+        print('Adding remote origin to local repo')
+        try:
+            repo.run_cmd('remote add origin %s' % git_remote)
+        except GitError:
+            print("The remote add origin failed inside the Git repository")
+
+    def _git_remove_local_repo_remote_origin(self, repo):
+        """Remove the origin remote; a failure is reported but not raised."""
+        print('Removing outdated remote origin from local repo')
+        try:
+            repo.run_cmd('remote remove origin')
+        except GitError:
+            print("The remote remove origin failed inside the Git repository")
+
+    def _git_fetch_remote_origin_branch(self, repo, git_branch):
+        """Fetch git_branch from origin; a failure is reported but not raised."""
+        print('Fetch remote origin %s' % git_branch)
+        try:
+            repo.run_cmd('fetch origin %s' % git_branch)
+        except GitError:
+            # "%s" is required: a bare "% f..." is parsed as a float conversion.
+            print("The fetch origin %s failed inside the Git repository" % git_branch)
+
+    def _git_rebase_remote_origin(self, repo, git_branch):
+        """Rebase onto origin/<git_branch>; a failure is reported but not raised."""
+        print('Rebasing origin/%s' % git_branch)
+        try:
+            repo.run_cmd('rebase origin/%s' % git_branch)
+        except GitError:
+            print("The rebase origin/%s failed inside the Git repository" % git_branch)
+
+    def _git_push_local_branch_to_remote_origin(self, repo, git_branch):
+        """Push git_branch to origin; a failure is reported but not raised."""
+        print('Pushing origin %s' % git_branch)
+        try:
+            repo.run_cmd('push origin %s' % git_branch)
+        except GitError:
+            print("The push origin %s failed inside the Git repository" % git_branch)
+
+    def _push_testsuite_testcase_json_file_to_git_repo(self, file_dir, git_repo, git_branch):
+        """Run oe-git-archive on file_dir and return its CompletedProcess."""
+        return subprocess.run(["oe-git-archive", file_dir, "-g", git_repo, "-b", git_branch])
+
+    # ------------------------------------------------------------------
+    # Public entry points
+    # ------------------------------------------------------------------
+
+    def create_automated_test_result(self, git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict):
+        """Create result files with empty statuses; refuse when they already exist."""
+        print('Creating test result for environment list: %s' % environment_list)
+        if self._git_check_if_git_dir_and_git_branch_exist(git_dir, git_branch):
+            repo = self._git_init(git_dir)
+            self._git_checkout_git_repo(repo, git_branch)
+            print('Found git_dir and git_branch: %s %s' % (git_dir, git_branch))
+            print('Entering git_dir: %s' % git_dir)
+            if self._check_if_git_dir_contain_project_and_environment_directory(git_dir, project, environment_list):
+                print('Found project and environment inside git_dir: %s' % git_dir)
+                print('Since project and environment already exist, could not proceed to create.')
+            else:
+                print('Could not find project and environment inside git_dir: %s' % git_dir)
+                print('Creating project and environment inside git_dir: %s' % git_dir)
+                self._create_automated_test_result_from_existing_git(git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, {}, {})
+        else:
+            print('Could not find git_dir or git_branch: %s %s' % (git_dir, git_branch))
+            print('Creating git_dir, git_branch, project, and environment: %s' % git_dir)
+            self._create_automated_test_result_from_empty_git(git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, {}, {})
+
+    def update_automated_test_result(self, git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, testcase_status_dict, testcase_logs_dict):
+        """Merge new statuses and logs into the stored result files and archive them.
+
+        A module whose JSON file does not exist yet is created from scratch
+        instead of failing on the missing file.
+        """
+        print('Updating test result for environment list: %s' % environment_list)
+        repo = self._git_init(git_dir)
+        self._git_checkout_git_repo(repo, git_branch)
+        project_dir = os.path.join(git_dir, project)
+        project_env_dir = self._get_project_environment_directory_path(project_dir, environment_list)
+        testcase_log_remove_list = []
+        for testmodule in self._get_testmodule_list(testmodule_testsuite_dict):
+            testmodule_file = os.path.join(project_env_dir, '%s.json' % testmodule)
+            testsuite_list = testmodule_testsuite_dict[testmodule]
+            target_testresult_dict = self._load_test_module_file_with_json_into_dictionary(testmodule_file)
+            if target_testresult_dict is None:
+                target_testresult_dict = self._create_testsuite_testcase_teststatus_json_object(
+                    testsuite_list, testsuite_testcase_dict, testcase_status_dict)
+            else:
+                self._update_target_testresult_dictionary_with_status(
+                    target_testresult_dict, testsuite_list, testsuite_testcase_dict,
+                    testcase_status_dict, testcase_log_remove_list)
+            self._write_testsuite_testcase_json_formatted_string_to_file(
+                testmodule_file, json.dumps(target_testresult_dict, sort_keys=True, indent=4))
+            testcase_list = self._get_testcase_list(testsuite_list, testsuite_testcase_dict)
+            self._write_test_log_files(project_env_dir, testcase_list, testcase_logs_dict)
+        self._remove_test_log_files(project_env_dir, testcase_log_remove_list)
+        self._push_testsuite_testcase_json_file_to_git_repo(git_dir, git_dir, git_branch)
+
+    def smart_update_automated_test_result(self, git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, testcase_status_dict, testcase_logs_dict):
+        """Update the stored results when they exist, otherwise create them."""
+        print('Creating/Updating test result for environment list: %s' % environment_list)
+        if self._git_check_if_git_dir_and_git_branch_exist(git_dir, git_branch):
+            repo = self._git_init(git_dir)
+            self._git_checkout_git_repo(repo, git_branch)
+            print('Found git_dir and git_branch: %s %s' % (git_dir, git_branch))
+            print('Entering git_dir: %s' % git_dir)
+            if self._check_if_git_dir_contain_project_and_environment_directory(git_dir, project, environment_list):
+                print('Found project and environment inside git_dir: %s' % git_dir)
+                print('Updating test result')
+                self.update_automated_test_result(git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, testcase_status_dict, testcase_logs_dict)
+            else:
+                print('Could not find project and environment inside git_dir: %s' % git_dir)
+                print('Creating project and environment inside git_dir: %s' % git_dir)
+                self._create_automated_test_result_from_existing_git(git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, testcase_status_dict, testcase_logs_dict)
+        else:
+            print('Could not find git_dir or git_branch: %s %s' % (git_dir, git_branch))
+            print('Creating git_dir, git_branch, project, and environment: %s' % git_dir)
+            self._create_automated_test_result_from_empty_git(git_dir, git_branch, project, environment_list, testmodule_testsuite_dict, testsuite_testcase_dict, testcase_status_dict, testcase_logs_dict)
+
+    def create_manual_test_result(self, git_dir, git_branch, project, environment_list, manual_test_report_dir):
+        """Store the files of manual_test_report_dir; refuse when results already exist."""
+        print('Creating test result for environment list: %s' % environment_list)
+        if self._git_check_if_git_dir_and_git_branch_exist(git_dir, git_branch):
+            repo = self._git_init(git_dir)
+            self._git_checkout_git_repo(repo, git_branch)
+            print('Found git_dir and git_branch: %s %s' % (git_dir, git_branch))
+            print('Entering git_dir: %s' % git_dir)
+            if self._check_if_git_dir_contain_project_and_environment_directory(git_dir, project, environment_list):
+                print('Found project and environment inside git_dir: %s' % git_dir)
+                print('Since project and environment already exist, could not proceed to create.')
+            else:
+                print('Could not find project and environment inside git_dir: %s' % git_dir)
+                print('Creating project and environment inside git_dir: %s' % git_dir)
+                self._create_manual_test_result_from_exiting_git(git_dir, git_branch, project, environment_list, manual_test_report_dir)
+        else:
+            print('Could not find git_dir or git_branch: %s %s' % (git_dir, git_branch))
+            print('Creating git_dir, git_branch, project, and environment: %s' % git_dir)
+            self._create_manual_test_result_from_empty_git(git_dir, git_branch, project, environment_list, manual_test_report_dir)
+
+    def git_remote_fetch_rebase_push(self, git_dir, git_branch, git_remote):
+        """Point origin at git_remote, then fetch, rebase and push git_branch."""
+        print('Pushing test result to remote git ...')
+        repo = self._git_init(git_dir)
+        print('Fetching, Rebasing, Pushing to remote')
+        if self._git_check_if_local_repo_contain_remote_origin(repo):
+            if not self._git_check_if_local_repo_remote_origin_url_match(repo, git_remote):
+                self._git_remove_local_repo_remote_origin(repo)
+                self._git_add_local_repo_remote_origin(repo, git_remote)
+        else:
+            self._git_add_local_repo_remote_origin(repo, git_remote)
+        if self._git_fetch_remote_origin(repo):
+            # Rebase only when the branch already exists on the remote.
+            if self._git_check_if_remote_origin_has_branch(repo, git_branch):
+                self._git_fetch_remote_origin_branch(repo, git_branch)
+                self._git_rebase_remote_origin(repo, git_branch)
+            self._git_push_local_branch_to_remote_origin(repo, git_branch)
+        else:
+            print('Git fetch origin failed. Stop proceeding to git push.')
+
+    def checkout_git_branch(self, git_dir, git_branch):
+        """Check out git_branch in git_dir; return True on success, False otherwise."""
+        print('Checkout git branch ...')
+        if self._git_check_if_git_dir_and_git_branch_exist(git_dir, git_branch):
+            repo = self._git_init(git_dir)
+            self._git_checkout_git_repo(repo, git_branch)
+            return True
+        print('Could not find git_dir or git_branch: %s %s' % (git_dir, git_branch))
+        return False
diff --git a/scripts/lib/testresultlog/testresultupdator.py b/scripts/lib/testresultlog/testresultupdator.py
new file mode 100644
index 0000000..17e8df3
--- /dev/null
+++ b/scripts/lib/testresultlog/testresultupdator.py
@@ -0,0 +1,139 @@
+import unittest
+from testresultlog.testresultgitstore import TestResultGitStore
+from testresultlog.testlogparser import TestLogParser
+
+class TestResultUpdator(object):
+
+ def _get_testsuite_from_testcase(self, testcase):
+ testsuite = testcase[0:testcase.rfind(".")]
+ return testsuite
+
+ def _get_testmodule_from_testsuite(self, testsuite):
+ testmodule = testsuite[0:testsuite.find(".")]
+ return testmodule
+
+ def _remove_testsuite_from_testcase(self, testcase, testsuite):
+ testsuite = testsuite + '.'
+ testcase_remove_testsuite = testcase.replace(testsuite, '')
+ return testcase_remove_testsuite
+
+ def _discover_unittest_testsuite_testcase(self, test_dir):
+ loader = unittest.TestLoader()
+ testsuite_testcase = loader.discover(start_dir=test_dir, pattern='*.py')
+ return testsuite_testcase
+
+ def _generate_flat_list_of_unittest_testcase(self, testsuite):
+ for test in testsuite:
+ if unittest.suite._isnotsuite(test):
+ yield test
+ else:
+ for subtest in self._generate_flat_list_of_unittest_testcase(test):
+ yield subtest
+
+ def _get_testsuite_from_unittest_testcase(self, unittest_testcase):
+ testsuite = unittest_testcase[unittest_testcase.find("(")+1:unittest_testcase.find(")")]
+ return testsuite
+
+ def _get_testcase_from_unittest_testcase(self, unittest_testcase):
+ testcase = unittest_testcase[0:unittest_testcase.find("(")-1]
+ testsuite = self._get_testsuite_from_unittest_testcase(unittest_testcase)
+ testcase = '%s.%s' % (testsuite, testcase)
+ return testcase
+
+ def _get_testmodule_from_testsuite(self, testsuite):
+ testmodule = testsuite[0:testsuite.find(".")]
+ return testmodule
+
+ def _add_new_environment_to_environment_list(self, environment_list, new_environment):
+ if len(new_environment) > 0 and new_environment not in environment_list:
+ if len(environment_list) == 0:
+ environment_list = new_environment
+ else:
+ environment_list = '%s,%s' % (environment_list, new_environment)
+ return environment_list
+
+ def get_environment_list_for_test_log(self, log_file, log_file_source, environment_list, testlogparser):
+ print('Getting test environment information from test log at %s' % log_file)
+ if log_file_source == 'runtime':
+ runtime_image_env = testlogparser.get_runtime_test_image_environment(log_file)
+ print('runtime image environment: %s' % runtime_image_env)
+ runtime_qemu_env = testlogparser.get_runtime_test_qemu_environment(log_file)
+ print('runtime qemu environment: %s' % runtime_qemu_env)
+ environment_list = self._add_new_environment_to_environment_list(environment_list, runtime_image_env)
+ environment_list = self._add_new_environment_to_environment_list(environment_list, runtime_qemu_env)
+ return environment_list.split(",")
+
+ def get_testsuite_testcase_dictionary(self, work_dir):
+ print('Getting testsuite testcase information from oeqa directory at %s' % work_dir)
+ unittest_testsuite_testcase = self._discover_unittest_testsuite_testcase(work_dir)
+ unittest_testcase_list = self._generate_flat_list_of_unittest_testcase(unittest_testsuite_testcase)
+ testsuite_testcase_dict = {}
+ for unittest_testcase in unittest_testcase_list:
+ testsuite = self._get_testsuite_from_unittest_testcase(str(unittest_testcase))
+ testcase = self._get_testcase_from_unittest_testcase(str(unittest_testcase))
+ if testsuite in testsuite_testcase_dict:
+ testsuite_testcase_dict[testsuite].append(testcase)
+ else:
+ testsuite_testcase_dict[testsuite] = [testcase]
+ return testsuite_testcase_dict
+
+    def get_testmodule_testsuite_dictionary(self, testsuite_testcase_dict):
+        """Group the testsuite names (keys of testsuite_testcase_dict) by test module.
+
+        Returns a dictionary mapping each test module name to the list of
+        testsuites that belong to it, in the iteration order of the input.
+        """
+        print('Getting testmodule testsuite information')
+        suites_by_module = {}
+        for suite_name in testsuite_testcase_dict:
+            module_name = self._get_testmodule_from_testsuite(suite_name)
+            suites_by_module.setdefault(module_name, []).append(suite_name)
+        return suites_by_module
+
+    def get_testcase_failed_or_error_logs_dictionary(self, log_file, testcase_status_dict):
+        """Collect the logs of every testcase whose status is FAILED or ERROR.
+
+        testcase_status_dict maps testcase names to status strings. For
+        each failing testcase, the log is fetched from log_file through a
+        TestLogParser. Returns a dictionary mapping those testcase names to
+        their logs.
+        """
+        print('Getting testcase failed or error log from %s' % log_file)
+        log_parser = TestLogParser()
+        logs_by_testcase = {}
+        for testcase, status in testcase_status_dict.items():
+            if status not in ('FAILED', 'ERROR'):
+                continue
+            suite_name = self._get_testsuite_from_testcase(testcase)
+            function_name = self._remove_testsuite_from_testcase(testcase, suite_name)
+            logs_by_testcase[testcase] = log_parser.get_test_log(log_file, status, function_name, suite_name)
+        return logs_by_testcase
+
+def main(args):
+    """Entry point of the 'update' subcommand.
+
+    Reads the testcase statuses from args.log_file, works out the test
+    environments, the testsuite/testcase layout and the failure logs, and
+    hands everything to TestResultGitStore for the given repository and
+    branch. When args.git_remote is non-empty, the store is also asked to
+    fetch, rebase and push against that remote.
+    """
+    log_parser = TestLogParser()
+    status_by_testcase = log_parser.get_test_status(args.log_file)
+
+    updator = TestResultUpdator()
+    environments = updator.get_environment_list_for_test_log(
+        args.log_file, args.source, args.environment_list, log_parser)
+    cases_by_suite = updator.get_testsuite_testcase_dictionary(args.oeqa_dir)
+    suites_by_module = updator.get_testmodule_testsuite_dictionary(cases_by_suite)
+    failure_logs = updator.get_testcase_failed_or_error_logs_dictionary(args.log_file, status_by_testcase)
+
+    store = TestResultGitStore()
+    store.smart_update_automated_test_result(
+        args.git_repo, args.git_branch, args.component, environments,
+        suites_by_module, cases_by_suite, status_by_testcase, failure_logs)
+    if args.git_remote:
+        store.git_remote_fetch_rebase_push(args.git_repo, args.git_branch, args.git_remote)
+
+def register_commands(subparsers):
+    """Register the 'update' subcommand and its options on subparsers.
+
+    The subcommand dispatches to main() of this module through the 'func'
+    default.
+    """
+    parser_build = subparsers.add_parser('update', help='Store test status & log into git repository',
+                                         description='Store test status & log into git repository')
+    parser_build.set_defaults(func=main)
+    parser_build.add_argument('-l', '--log_file', required=True, help='Full path to the test log file to be used for test result update')
+    # The help text names the directory the launcher script really falls back to.
+    parser_build.add_argument('-g', '--git_repo', required=False, default='default', help='(Optional) Git repository to be updated, default will be <top_dir>/test-result-log.git')
+    parser_build.add_argument('-b', '--git_branch', required=True, help='Git branch to be updated with test result')
+    parser_build.add_argument('-r', '--git_remote', required=False, default='', help='(Optional) Git remote repository to be updated')
+    SOURCE = ('runtime', 'selftest', 'sdk', 'sdkext')
+    parser_build.add_argument('-s', '--source', required=True, choices=SOURCE,
+                              help='Testcase source to be selected from the list (runtime, selftest, sdk or sdkext). '
+                                   '"runtime" will search testcase available in meta/lib/oeqa/runtime/cases. '
+                                   '"selftest" will search testcase available in meta/lib/oeqa/selftest/cases. '
+                                   '"sdk" will search testcase available in meta/lib/oeqa/sdk/cases. '
+                                   '"sdkext" will search testcase available in meta/lib/oeqa/sdkext/cases. ')
+    parser_build.add_argument('-d', '--poky_dir', required=False, default='default', help='(Optional) Poky directory to be used for oeqa testcase(s) discovery, default will use current poky directory')
+    parser_build.add_argument('-c', '--component', required=True, help='Component selected (as the top folder) to store the related test environments')
+    # The value is split on ',' by the updator, so say so in the help text.
+    parser_build.add_argument('-e', '--environment_list', required=False, default='', help='(Optional) Comma-separated list of environments to be used to perform update')
\ No newline at end of file
diff --git a/scripts/lib/testresultlog/testresultviewer.py b/scripts/lib/testresultlog/testresultviewer.py
new file mode 100644
index 0000000..d83df0e
--- /dev/null
+++ b/scripts/lib/testresultlog/testresultviewer.py
@@ -0,0 +1,146 @@
+import glob
+import os
+import json
+from jinja2 import Environment, FileSystemLoader
+from testresultlog.testresultgitstore import TestResultGitStore
+
+class TestResultViewer(object):
+    """Summarise the JSON test results stored in a test-result git checkout.
+
+    The checkout is walked for its deepest (leaf) directories. Every
+    '*.json' file in such a directory is loaded, its PASSED, FAILED/ERROR
+    and SKIPPED testcases are counted, and the totals are rendered through
+    the 'test_report_full_text.txt' Jinja2 template and printed.
+    """
+
+    def _is_parent_dir(self, parent_dir, child_dir):
+        """Return True when child_dir lies below parent_dir.
+
+        The comparison is made on whole path components, so '/r/ab' is not
+        treated as a parent of '/r/ab2/x'.
+        """
+        return child_dir.startswith(parent_dir.rstrip(os.sep) + os.sep)
+
+    def _check_if_existing_dir_list_contain_parent_for_new_dir(self, dir_list, new_dir):
+        """Return True when any entry of dir_list is a parent directory of new_dir."""
+        return any(self._is_parent_dir(existing_dir, new_dir) for existing_dir in dir_list)
+
+    def _replace_existing_parent_dir_with_new_dir(self, dir_list, new_dir):
+        """Return a copy of dir_list where every parent of new_dir is replaced by new_dir."""
+        return [new_dir if self._is_parent_dir(existing_dir, new_dir) else existing_dir
+                for existing_dir in dir_list]
+
+    def _get_test_report_directory_list(self, git_dir):
+        """Return the leaf directories below git_dir, ignoring '.git'."""
+        exclude = ['.git']
+        report_dir_list = []
+        for root, dirs, files in os.walk(git_dir, topdown=True):
+            # Prune in place so that os.walk does not descend into excluded directories.
+            dirs[:] = [d for d in dirs if d not in exclude]
+            for dir_name in dirs:
+                dirname = os.path.join(root, dir_name)
+                # A directory that has sub-directories is superseded by its children.
+                if self._check_if_existing_dir_list_contain_parent_for_new_dir(report_dir_list, dirname):
+                    report_dir_list = self._replace_existing_parent_dir_with_new_dir(report_dir_list, dirname)
+                else:
+                    report_dir_list.append(dirname)
+        return report_dir_list
+
+    def _get_list_of_test_result_files(self, report_dir):
+        """Return the '*.json' files directly inside report_dir, sorted for a stable report."""
+        path_pattern = os.path.join(report_dir, '*.json')
+        return sorted(glob.glob(path_pattern))
+
+    def _load_test_module_file_with_json_into_dictionary(self, file):
+        """Load the JSON document stored in file and return it."""
+        with open(file, "r") as f:
+            return json.load(f)
+
+    def _get_test_result_and_failed_error_testcase(self, test_result_dict):
+        """Count the testcase results of one test module dictionary.
+
+        test_result_dict must have the layout
+        {'testsuite': {<suite>: {'testcase': {<case>: {'testresult': <status>}}}}}.
+
+        Returns (passed, failed, skipped, failed_testcases) where 'failed'
+        counts both FAILED and ERROR, and failed_testcases lists those
+        testcase names across all suites.
+        """
+        count_passed = 0
+        count_failed = 0
+        count_skipped = 0
+        # Created once, outside the suite loop, so that failures of every
+        # suite are kept and the name exists even when there is no suite.
+        failed_error_test_case_list = []
+        for suite in test_result_dict['testsuite'].values():
+            for test_case, details in suite['testcase'].items():
+                test_status = details['testresult']
+                if test_status in ('FAILED', 'ERROR'):
+                    failed_error_test_case_list.append(test_case)
+                    count_failed += 1
+                elif test_status == 'PASSED':
+                    count_passed += 1
+                elif test_status == 'SKIPPED':
+                    count_skipped += 1
+        return count_passed, count_failed, count_skipped, failed_error_test_case_list
+
+    def _compute_test_result_percent_indicator(self, test_result):
+        """Add '<status>_percent' entries to test_result (modified in place).
+
+        Each percentage is a string with two decimals, or the integer 0
+        when no testcase was counted at all.
+        """
+        total_tested = test_result['passed'] + test_result['failed'] + test_result['skipped']
+        for status in ('passed', 'failed', 'skipped'):
+            if total_tested > 0:
+                test_result[status + '_percent'] = format(test_result[status] / total_tested * 100, '.2f')
+            else:
+                test_result[status + '_percent'] = 0
+
+    def _convert_test_result_value_to_string(self, test_result):
+        """Convert the counters and percentages of test_result to strings, in place."""
+        for key in ('passed_percent', 'failed_percent', 'skipped_percent',
+                    'passed', 'failed', 'skipped'):
+            test_result[key] = str(test_result[key])
+
+    def _get_max_string_len_from_test_result_list(self, test_result_list, key, default_max_len):
+        """Return the longest len(test_result[key]) in the list, but at least default_max_len."""
+        return max([default_max_len] + [len(test_result[key]) for test_result in test_result_list])
+
+    def _compile_test_result_for_test_report_directory(self, report_dir):
+        """Aggregate all JSON result files of report_dir into one result dictionary.
+
+        The returned dictionary holds the string counters 'passed',
+        'failed' and 'skipped', their '<status>_percent' values and the
+        list 'failed_testcases'.
+        """
+        test_result = {'passed': 0, 'failed': 0, 'skipped': 0, 'failed_testcases': []}
+        for file in self._get_list_of_test_result_files(report_dir):
+            test_result_dict = self._load_test_module_file_with_json_into_dictionary(file)
+            count_passed, count_failed, count_skipped, failed_error_test_case_list = \
+                self._get_test_result_and_failed_error_testcase(test_result_dict)
+            test_result['passed'] += count_passed
+            test_result['failed'] += count_failed
+            test_result['skipped'] += count_skipped
+            test_result['failed_testcases'] += failed_error_test_case_list
+        self._compute_test_result_percent_indicator(test_result)
+        self._convert_test_result_value_to_string(test_result)
+        return test_result
+
+    def _get_test_component_environment_from_test_report_dir(self, git_repo, report_dir):
+        """Split report_dir, relative to git_repo, into component and environment.
+
+        Returns (component, environment, component_environment): the first
+        path element below git_repo, the remaining path ('' when there is
+        none) and the whole relative path.
+        """
+        # relpath copes with a trailing separator or '..' in git_repo, which
+        # plain string replacement does not.
+        test_component_environment = os.path.relpath(report_dir, git_repo)
+        test_component, _, test_environment = test_component_environment.partition(os.sep)
+        return test_component, test_environment, test_component_environment
+
+    def _rendering_text_based_test_report(self, test_result_list, max_len_component, max_len_environment):
+        """Render test_result_list with the text template next to this module and print it."""
+        script_path = os.path.dirname(os.path.realpath(__file__))
+        file_loader = FileSystemLoader(script_path + '/template')
+        env = Environment(loader=file_loader, trim_blocks=True)
+        template = env.get_template('test_report_full_text.txt')
+        output = template.render(test_reports=test_result_list, max_len_component=max_len_component, max_len_environment=max_len_environment)
+        print('Printing text-based test report:')
+        print(output)
+
+    def create_text_based_test_report(self, git_repo):
+        """Compile the results of every report directory below git_repo and print the report."""
+        test_result_list = []
+        for report_dir in self._get_test_report_directory_list(git_repo):
+            print('Compiling test result for %s:' % report_dir)
+            test_result = self._compile_test_result_for_test_report_directory(report_dir)
+            test_component, test_environment, test_component_environment = \
+                self._get_test_component_environment_from_test_report_dir(git_repo, report_dir)
+            test_result['test_component'] = test_component
+            test_result['test_environment'] = test_environment
+            test_result['test_component_environment'] = test_component_environment
+            test_result_list.append(test_result)
+        # The column widths are never narrower than the column titles.
+        max_len_component = self._get_max_string_len_from_test_result_list(test_result_list, 'test_component', len('test_component'))
+        max_len_environment = self._get_max_string_len_from_test_result_list(test_result_list, 'test_environment', len('test_environment'))
+        self._rendering_text_based_test_report(test_result_list, max_len_component, max_len_environment)
+
+def main(args):
+    """Entry point of the 'view' subcommand.
+
+    Checks out args.git_branch in args.git_repo and, only when that
+    succeeds, prints the text-based test report for the repository.
+    """
+    store = TestResultGitStore()
+    if not store.checkout_git_branch(args.git_repo, args.git_branch):
+        return
+    viewer = TestResultViewer()
+    viewer.create_text_based_test_report(args.git_repo)
+
+def register_commands(subparsers):
+    """Register the 'view' subcommand and its options on subparsers.
+
+    The subcommand dispatches to main() of this module through the 'func'
+    default.
+    """
+    parser_build = subparsers.add_parser('view', help='View text-based summary test report',
+                                         description='View text-based summary test report')
+    parser_build.set_defaults(func=main)
+    parser_build.add_argument('-g', '--git_repo', required=False, default='default', help='(Optional) Git repository to be used for generating the summary test result, default will be <top_dir>/test-result-log.git')
+    # This command only reads the branch, so the help must not talk about updating it.
+    parser_build.add_argument('-b', '--git_branch', required=True, help='Git branch to be used for generating the summary test result')
diff --git a/scripts/test-result-log b/scripts/test-result-log
new file mode 100755
index 0000000..8c0c0eb
--- /dev/null
+++ b/scripts/test-result-log
@@ -0,0 +1,53 @@
+#!/usr/bin/python3
+#
+# Helper script for creating test result & log data and storing it in a git
+# repository. Also provides a text-based summary report of the stored test results.
+#
+import os
+import sys
+import argparse
+script_path = os.path.dirname(os.path.realpath(__file__))
+lib_path = script_path + '/lib'
+# Make the testresultlog package that lives in <script dir>/lib importable.
+sys.path = sys.path + [lib_path]
+import testresultlog.testresultupdator
+import testresultlog.testresultviewer
+
+def _get_default_git_dir(git_dir):
+    """Return git_dir, or <script dir>/../test-result-log.git when git_dir is 'default'."""
+    if git_dir != 'default':
+        return git_dir
+    parent_of_scripts = script_path + '/..'
+    return os.path.join(parent_of_scripts, 'test-result-log.git')
+
+def _get_oeqa_source_dir(poky_dir, source):
+    """Return the oeqa testcase directory below poky_dir for the given source.
+
+    'runtime', 'selftest' and 'sdk' map to their own cases directory; any
+    other value falls back to the sdkext cases directory.
+    """
+    cases_dir_by_source = {
+        'runtime': 'meta/lib/oeqa/runtime/cases',
+        'selftest': 'meta/lib/oeqa/selftest/cases',
+        'sdk': 'meta/lib/oeqa/sdk/cases',
+    }
+    relative_dir = cases_dir_by_source.get(source, 'meta/lib/oeqa/sdkext/cases')
+    return os.path.join(poky_dir, relative_dir)
+
+def _get_poky_dir(poky_dir):
+    """Return poky_dir, or the parent of the script directory when poky_dir is 'default'."""
+    fallback_dir = script_path + '/..'
+    return fallback_dir if poky_dir == 'default' else poky_dir
+
+def main():
+    """Parse the command line and run the selected subcommand.
+
+    Resolves the 'default' placeholders of --git_repo and, for subcommands
+    that define it, --poky_dir (from which args.oeqa_dir is derived)
+    before calling the subcommand handler stored in args.func.
+
+    Returns 1 after printing the help text when no subcommand was given,
+    otherwise None.
+    """
+    parser = argparse.ArgumentParser(description="test-result-log script to store test status and failure log.")
+    subparser = parser.add_subparsers()
+    testresultlog.testresultupdator.register_commands(subparser)
+    testresultlog.testresultviewer.register_commands(subparser)
+    args = parser.parse_args()
+    # Subcommands are optional in Python 3 argparse. Without one, neither
+    # 'func' nor 'git_repo' exists on args, so show the help instead of
+    # failing with an AttributeError.
+    if not hasattr(args, 'func'):
+        parser.print_help()
+        return 1
+    args.script_path = script_path
+    args.git_repo = _get_default_git_dir(args.git_repo)
+    # Only the 'update' subcommand defines --poky_dir and --source.
+    if getattr(args, "poky_dir", None):
+        args.poky_dir = _get_poky_dir(args.poky_dir)
+        args.oeqa_dir = _get_oeqa_source_dir(args.poky_dir, args.source)
+    args.func(args)
+
+# Run main() only when executed as a script; its return value becomes the exit status.
+if __name__ == "__main__":
+ sys.exit(main())
--
2.7.4
More information about the Openembedded-core
mailing list