[OE-core] [PATCH 07/30] oeqa/core/threaded: Add support to run into a thread at end of execution
Leonardo Sandoval
leonardo.sandoval.gonzalez at linux.intel.com
Wed Jul 12 14:22:26 UTC 2017
On Tue, 2017-07-11 at 15:23 -0500, Aníbal Limón wrote:
> Some test cases aren't allowed to run in a multi-threaded environment, so
> add the possibility to run those tests at the end of execution.
>
Hi Anibal,
What is the reason these need to run at the end?
> Signed-off-by: Aníbal Limón <anibal.limon at linux.intel.com>
> ---
> meta/lib/oeqa/core/threaded.py | 102 +++++++++++++++++++++++++++++++----------
> 1 file changed, 78 insertions(+), 24 deletions(-)
>
> diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
> index 34217f1a8b8..a7dc0aed401 100644
> --- a/meta/lib/oeqa/core/threaded.py
> +++ b/meta/lib/oeqa/core/threaded.py
> @@ -30,18 +30,35 @@ class OETestLoaderThreaded(OETestLoader):
> suites = {}
> suites['main'] = self.suiteClass()
> suites['pool'] = []
> + suites['end'] = self.suiteClass()
> for _ in range(self.process_num - 1):
> suites['pool'].append(self.suiteClass())
>
> + def _add_by_module_or_dep(suite, case, depends):
> + """
> + A test case that needs to run into the same thread
> + because is on the same module or for dependency
> + reasons.
> + """
> +
> + for c in suite._tests:
> + if case.__module__ == c.__module__:
> + suite.addTest(case)
> + return True
> +
> + if case.id() in depends:
> + case_depends = depends[case.id()]
> + for c in suite._tests:
> + if c.id() in case_depends:
> + suite.addTest(case)
> + return True
> +
> + return False
> +
> def _add_to_main_thread(main_suite, case, depends):
> """
> Some test cases needs to be run into the main
> - thread for several resons.
> -
> - A test case that needs to run in the main thread
> - can be for specific set via test class _main_thread
> - attr or because is on the same module or for a dependency
> - reason.
> + thread by request.
> """
>
> if hasattr(case.__class__, '_main_thread') and \
> @@ -50,19 +67,20 @@ class OETestLoaderThreaded(OETestLoader):
> main_suite.addTest(case)
> return True
>
> - for c in main_suite._tests:
> - if case.__module__ == c.__module__:
> - main_suite.addTest(case)
> - return True
> + return _add_by_module_or_dep(main_suite, case, depends)
>
> - if case.id() in depends:
> - case_depends = depends[case.id()]
> - for c in main_suite._tests:
> - if c.id() in case_depends:
> - main_suite.addTest(case)
> - return True
> + def _add_to_end_thread(end_suite, case, depends):
> + """
> + Some test cases needs to be run into at end of
> + execution into the main by request.
> + """
> + if hasattr(case.__class__, '_end_thread') and \
> + case.__class__._end_thread or \
> + self.process_num == 1:
> + end_suite.addTest(case)
> + return True
>
> - return False
> + return _add_by_module_or_dep(end_suite, case, depends)
>
> def _search_for_module_idx(suites, case):
> """
> @@ -112,6 +130,9 @@ class OETestLoaderThreaded(OETestLoader):
> if 'depends' in self.tc._registry:
> depends = self.tc._registry['depends']
>
> + if _add_to_end_thread(suites['end'], case, depends):
> + continue
> +
> if _add_to_main_thread(suites['main'], case, depends):
> continue
>
> @@ -135,7 +156,7 @@ class OETestLoaderThreaded(OETestLoader):
>
> # if the main suite doesn't have test cases
> # use the first element of the suites pool
> - if not len(suites['main']._tests):
> + if not len(suites['main']._tests) and len(suites['pool']):
> suites['main'] = suites['pool'].pop(0)
>
> return suites
> @@ -268,6 +289,12 @@ class _ThreadedPool:
> self.tasks = queue.Queue(num_tasks)
> self.workers = []
>
> + self.stream = stream
> + self.result = result
> +
> + self.end_task = None
> + self.end_worker = None
> +
> for _ in range(num_workers):
> worker = _Worker(self.tasks, result, stream)
> self.workers.append(worker)
> @@ -280,12 +307,25 @@ class _ThreadedPool:
> """Add a task to the queue"""
> self.tasks.put((func, args, kargs))
>
> + def add_end_task(self, func, *args, **kwargs):
> + """Add a task to be executed at end"""
> +
> + self.end_task = queue.Queue(1)
> + self.end_task.put((func, args, kwargs))
> + self.end_worker = _Worker(self.end_task, self.result,
> + self.stream)
> +
> def wait_completion(self):
> """Wait for completion of all the tasks in the queue"""
> self.tasks.join()
> for worker in self.workers:
> worker.join()
>
> + if self.end_task:
> + self.end_worker.start()
> + self.end_task.join()
> + self.end_worker.join()
> +
> class OETestRunnerThreaded(OETestRunner):
> streamLoggerClass = OEStreamLoggerThreaded
>
> @@ -293,32 +333,46 @@ class OETestRunnerThreaded(OETestRunner):
> super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
> self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
>
> + def _run_main_thread(self, suite, result):
> + if len(suite._tests):
> + run_start_time = time.time()
> + rc = super(OETestRunnerThreaded, self).run(suite)
> + run_end_time = time.time()
> + result.addResult(rc, run_start_time, run_end_time)
> + self.stream.finish()
> +
> def run(self, suites):
> result = OETestResultThreaded(self.tc)
>
> pool = None
> +
> if suites['pool']:
> thread_no = len(suites['pool'])
> pool = _ThreadedPool(thread_no, thread_no, stream=self.stream,
> result=result)
> for s in suites['pool']:
> pool.add_task(super(OETestRunnerThreaded, self).run, s)
> - pool.start()
>
> - run_start_time = time.time()
> - rc = super(OETestRunnerThreaded, self).run(suites['main'])
> - run_end_time = time.time()
> - result.addResult(rc, run_start_time, run_end_time)
> - self.stream.finish()
> + if len(suites['end']._tests):
> + if not pool:
> + pool = _ThreadedPool(0, 0, stream=self.stream,
> + result=result)
> + pool.add_end_task(super(OETestRunnerThreaded, self).run,
> + suites['end'])
>
> if pool:
> + pool.start()
> + self._run_main_thread(suites['main'], result)
> + if pool:
> pool.wait_completion()
> +
> result._fill_tc_results()
>
> return result
>
> def list_tests(self, suite, display_type):
> suite['pool'].insert(0, suite['main'])
> + suite['pool'].append(suite['end'])
>
> return super(OETestRunnerThreaded, self).list_tests(
> suite['pool'], display_type)
More information about the Openembedded-core
mailing list