[OE-core] [PATCHv2 07/29] oeqa/core/threaded: Add support to run into a thread at end of execution
Aníbal Limón
anibal.limon at linux.intel.com
Wed Jul 12 19:36:53 UTC 2017
Some test cases aren't allowed to run in a multi-threaded environment, so
add the possibility to run those tests at the end of execution.
Signed-off-by: Aníbal Limón <anibal.limon at linux.intel.com>
---
meta/lib/oeqa/core/threaded.py | 102 +++++++++++++++++++++++++++++++----------
1 file changed, 78 insertions(+), 24 deletions(-)
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
index 34217f1a8b8..a7dc0aed401 100644
--- a/meta/lib/oeqa/core/threaded.py
+++ b/meta/lib/oeqa/core/threaded.py
@@ -30,18 +30,35 @@ class OETestLoaderThreaded(OETestLoader):
suites = {}
suites['main'] = self.suiteClass()
suites['pool'] = []
+ suites['end'] = self.suiteClass()
for _ in range(self.process_num - 1):
suites['pool'].append(self.suiteClass())
+ def _add_by_module_or_dep(suite, case, depends):
+ """
+ A test case that needs to run in the same thread
+ because it is in the same module or for dependency
+ reasons.
+ """
+
+ for c in suite._tests:
+ if case.__module__ == c.__module__:
+ suite.addTest(case)
+ return True
+
+ if case.id() in depends:
+ case_depends = depends[case.id()]
+ for c in suite._tests:
+ if c.id() in case_depends:
+ suite.addTest(case)
+ return True
+
+ return False
+
def _add_to_main_thread(main_suite, case, depends):
"""
Some test cases needs to be run into the main
- thread for several resons.
-
- A test case that needs to run in the main thread
- can be for specific set via test class _main_thread
- attr or because is on the same module or for a dependency
- reason.
+ thread by request.
"""
if hasattr(case.__class__, '_main_thread') and \
@@ -50,19 +67,20 @@ class OETestLoaderThreaded(OETestLoader):
main_suite.addTest(case)
return True
- for c in main_suite._tests:
- if case.__module__ == c.__module__:
- main_suite.addTest(case)
- return True
+ return _add_by_module_or_dep(main_suite, case, depends)
- if case.id() in depends:
- case_depends = depends[case.id()]
- for c in main_suite._tests:
- if c.id() in case_depends:
- main_suite.addTest(case)
- return True
+ def _add_to_end_thread(end_suite, case, depends):
+ """
+ Some test cases need to be run at the end of
+ execution into the main by request.
+ """
+ if hasattr(case.__class__, '_end_thread') and \
+ case.__class__._end_thread or \
+ self.process_num == 1:
+ end_suite.addTest(case)
+ return True
- return False
+ return _add_by_module_or_dep(end_suite, case, depends)
def _search_for_module_idx(suites, case):
"""
@@ -112,6 +130,9 @@ class OETestLoaderThreaded(OETestLoader):
if 'depends' in self.tc._registry:
depends = self.tc._registry['depends']
+ if _add_to_end_thread(suites['end'], case, depends):
+ continue
+
if _add_to_main_thread(suites['main'], case, depends):
continue
@@ -135,7 +156,7 @@ class OETestLoaderThreaded(OETestLoader):
# if the main suite doesn't have test cases
# use the first element of the suites pool
- if not len(suites['main']._tests):
+ if not len(suites['main']._tests) and len(suites['pool']):
suites['main'] = suites['pool'].pop(0)
return suites
@@ -268,6 +289,12 @@ class _ThreadedPool:
self.tasks = queue.Queue(num_tasks)
self.workers = []
+ self.stream = stream
+ self.result = result
+
+ self.end_task = None
+ self.end_worker = None
+
for _ in range(num_workers):
worker = _Worker(self.tasks, result, stream)
self.workers.append(worker)
@@ -280,12 +307,25 @@ class _ThreadedPool:
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
+ def add_end_task(self, func, *args, **kwargs):
+ """Add a task to be executed at end"""
+
+ self.end_task = queue.Queue(1)
+ self.end_task.put((func, args, kwargs))
+ self.end_worker = _Worker(self.end_task, self.result,
+ self.stream)
+
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
for worker in self.workers:
worker.join()
+ if self.end_task:
+ self.end_worker.start()
+ self.end_task.join()
+ self.end_worker.join()
+
class OETestRunnerThreaded(OETestRunner):
streamLoggerClass = OEStreamLoggerThreaded
@@ -293,32 +333,46 @@ class OETestRunnerThreaded(OETestRunner):
super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
+ def _run_main_thread(self, suite, result):
+ if len(suite._tests):
+ run_start_time = time.time()
+ rc = super(OETestRunnerThreaded, self).run(suite)
+ run_end_time = time.time()
+ result.addResult(rc, run_start_time, run_end_time)
+ self.stream.finish()
+
def run(self, suites):
result = OETestResultThreaded(self.tc)
pool = None
+
if suites['pool']:
thread_no = len(suites['pool'])
pool = _ThreadedPool(thread_no, thread_no, stream=self.stream,
result=result)
for s in suites['pool']:
pool.add_task(super(OETestRunnerThreaded, self).run, s)
- pool.start()
- run_start_time = time.time()
- rc = super(OETestRunnerThreaded, self).run(suites['main'])
- run_end_time = time.time()
- result.addResult(rc, run_start_time, run_end_time)
- self.stream.finish()
+ if len(suites['end']._tests):
+ if not pool:
+ pool = _ThreadedPool(0, 0, stream=self.stream,
+ result=result)
+ pool.add_end_task(super(OETestRunnerThreaded, self).run,
+ suites['end'])
if pool:
+ pool.start()
+ self._run_main_thread(suites['main'], result)
+ if pool:
pool.wait_completion()
+
result._fill_tc_results()
return result
def list_tests(self, suite, display_type):
suite['pool'].insert(0, suite['main'])
+ suite['pool'].append(suite['end'])
return super(OETestRunnerThreaded, self).list_tests(
suite['pool'], display_type)
--
2.11.0
More information about the Openembedded-core
mailing list