[OE-core] [PATCH 07/30] oeqa/core/threaded: Add support to run into a thread at end of execution
Aníbal Limón
anibal.limon at linux.intel.com
Wed Jul 12 14:54:26 UTC 2017
On 07/12/2017 09:22 AM, Leonardo Sandoval wrote:
> On Tue, 2017-07-11 at 15:23 -0500, Aníbal Limón wrote:
>> Some test cases aren't allowed to run into a multi-thread environment so
>> add the posibility to run those tests at end of execution.
>>
>
> Hi Anibal,
>
> which is the reason these need to run at the end?
>
Some devtool tests made modifications to the meta directory causing
deterministic meta-data bitbake errors in other threads, see the patch
devtool_end in this series.
Anibal
>
>
>> Signed-off-by: Aníbal Limón <anibal.limon at linux.intel.com>
>> ---
>> meta/lib/oeqa/core/threaded.py | 102 +++++++++++++++++++++++++++++++----------
>> 1 file changed, 78 insertions(+), 24 deletions(-)
>>
>> diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
>> index 34217f1a8b8..a7dc0aed401 100644
>> --- a/meta/lib/oeqa/core/threaded.py
>> +++ b/meta/lib/oeqa/core/threaded.py
>> @@ -30,18 +30,35 @@ class OETestLoaderThreaded(OETestLoader):
>> suites = {}
>> suites['main'] = self.suiteClass()
>> suites['pool'] = []
>> + suites['end'] = self.suiteClass()
>> for _ in range(self.process_num - 1):
>> suites['pool'].append(self.suiteClass())
>>
>> + def _add_by_module_or_dep(suite, case, depends):
>> + """
>> + A test case that needs to run into the same thread
>> + because is on the same module or for dependency
>> + reasons.
>> + """
>> +
>> + for c in suite._tests:
>> + if case.__module__ == c.__module__:
>> + suite.addTest(case)
>> + return True
>> +
>> + if case.id() in depends:
>> + case_depends = depends[case.id()]
>> + for c in suite._tests:
>> + if c.id() in case_depends:
>> + suite.addTest(case)
>> + return True
>> +
>> + return False
>> +
>> def _add_to_main_thread(main_suite, case, depends):
>> """
>> Some test cases needs to be run into the main
>> - thread for several resons.
>> -
>> - A test case that needs to run in the main thread
>> - can be for specific set via test class _main_thread
>> - attr or because is on the same module or for a dependency
>> - reason.
>> + thread by request.
>> """
>>
>> if hasattr(case.__class__, '_main_thread') and \
>> @@ -50,19 +67,20 @@ class OETestLoaderThreaded(OETestLoader):
>> main_suite.addTest(case)
>> return True
>>
>> - for c in main_suite._tests:
>> - if case.__module__ == c.__module__:
>> - main_suite.addTest(case)
>> - return True
>> + return _add_by_module_or_dep(main_suite, case, depends)
>>
>> - if case.id() in depends:
>> - case_depends = depends[case.id()]
>> - for c in main_suite._tests:
>> - if c.id() in case_depends:
>> - main_suite.addTest(case)
>> - return True
>> + def _add_to_end_thread(end_suite, case, depends):
>> + """
>> + Some test cases needs to be run into at end of
>> + execution into the main by request.
>> + """
>> + if hasattr(case.__class__, '_end_thread') and \
>> + case.__class__._end_thread or \
>> + self.process_num == 1:
>> + end_suite.addTest(case)
>> + return True
>>
>> - return False
>> + return _add_by_module_or_dep(end_suite, case, depends)
>>
>> def _search_for_module_idx(suites, case):
>> """
>> @@ -112,6 +130,9 @@ class OETestLoaderThreaded(OETestLoader):
>> if 'depends' in self.tc._registry:
>> depends = self.tc._registry['depends']
>>
>> + if _add_to_end_thread(suites['end'], case, depends):
>> + continue
>> +
>> if _add_to_main_thread(suites['main'], case, depends):
>> continue
>>
>> @@ -135,7 +156,7 @@ class OETestLoaderThreaded(OETestLoader):
>>
>> # if the main suite doesn't have test cases
>> # use the first element of the suites pool
>> - if not len(suites['main']._tests):
>> + if not len(suites['main']._tests) and len(suites['pool']):
>> suites['main'] = suites['pool'].pop(0)
>>
>> return suites
>> @@ -268,6 +289,12 @@ class _ThreadedPool:
>> self.tasks = queue.Queue(num_tasks)
>> self.workers = []
>>
>> + self.stream = stream
>> + self.result = result
>> +
>> + self.end_task = None
>> + self.end_worker = None
>> +
>> for _ in range(num_workers):
>> worker = _Worker(self.tasks, result, stream)
>> self.workers.append(worker)
>> @@ -280,12 +307,25 @@ class _ThreadedPool:
>> """Add a task to the queue"""
>> self.tasks.put((func, args, kargs))
>>
>> + def add_end_task(self, func, *args, **kwargs):
>> + """Add a task to be executed at end"""
>> +
>> + self.end_task = queue.Queue(1)
>> + self.end_task.put((func, args, kwargs))
>> + self.end_worker = _Worker(self.end_task, self.result,
>> + self.stream)
>> +
>> def wait_completion(self):
>> """Wait for completion of all the tasks in the queue"""
>> self.tasks.join()
>> for worker in self.workers:
>> worker.join()
>>
>> + if self.end_task:
>> + self.end_worker.start()
>> + self.end_task.join()
>> + self.end_worker.join()
>> +
>> class OETestRunnerThreaded(OETestRunner):
>> streamLoggerClass = OEStreamLoggerThreaded
>>
>> @@ -293,32 +333,46 @@ class OETestRunnerThreaded(OETestRunner):
>> super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
>> self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
>>
>> + def _run_main_thread(self, suite, result):
>> + if len(suite._tests):
>> + run_start_time = time.time()
>> + rc = super(OETestRunnerThreaded, self).run(suite)
>> + run_end_time = time.time()
>> + result.addResult(rc, run_start_time, run_end_time)
>> + self.stream.finish()
>> +
>> def run(self, suites):
>> result = OETestResultThreaded(self.tc)
>>
>> pool = None
>> +
>> if suites['pool']:
>> thread_no = len(suites['pool'])
>> pool = _ThreadedPool(thread_no, thread_no, stream=self.stream,
>> result=result)
>> for s in suites['pool']:
>> pool.add_task(super(OETestRunnerThreaded, self).run, s)
>> - pool.start()
>>
>> - run_start_time = time.time()
>> - rc = super(OETestRunnerThreaded, self).run(suites['main'])
>> - run_end_time = time.time()
>> - result.addResult(rc, run_start_time, run_end_time)
>> - self.stream.finish()
>> + if len(suites['end']._tests):
>> + if not pool:
>> + pool = _ThreadedPool(0, 0, stream=self.stream,
>> + result=result)
>> + pool.add_end_task(super(OETestRunnerThreaded, self).run,
>> + suites['end'])
>>
>> if pool:
>> + pool.start()
>> + self._run_main_thread(suites['main'], result)
>> + if pool:
>> pool.wait_completion()
>> +
>> result._fill_tc_results()
>>
>> return result
>>
>> def list_tests(self, suite, display_type):
>> suite['pool'].insert(0, suite['main'])
>> + suite['pool'].append(suite['end'])
>>
>> return super(OETestRunnerThreaded, self).list_tests(
>> suite['pool'], display_type)
>
>
More information about the Openembedded-core
mailing list