[OE-core] [PATCHv2 07/29] oeqa/core/threaded: Add support to run into a thread at end of execution

Aníbal Limón anibal.limon at linux.intel.com
Wed Jul 12 19:36:53 UTC 2017


Some test cases aren't allowed to run into a multi-thread environment so
add the posibility to run those tests at end of execution.

Signed-off-by: Aníbal Limón <anibal.limon at linux.intel.com>
---
 meta/lib/oeqa/core/threaded.py | 102 +++++++++++++++++++++++++++++++----------
 1 file changed, 78 insertions(+), 24 deletions(-)

diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
index 34217f1a8b8..a7dc0aed401 100644
--- a/meta/lib/oeqa/core/threaded.py
+++ b/meta/lib/oeqa/core/threaded.py
@@ -30,18 +30,35 @@ class OETestLoaderThreaded(OETestLoader):
         suites = {}
         suites['main'] = self.suiteClass()
         suites['pool'] = []
+        suites['end'] = self.suiteClass()
         for _ in range(self.process_num - 1):
             suites['pool'].append(self.suiteClass())
 
+        def _add_by_module_or_dep(suite, case, depends):
+            """
+                A test case that needs to run into the same thread
+                because is on the same module or for dependency
+                reasons.
+            """
+
+            for c in suite._tests:
+                if case.__module__ == c.__module__:
+                    suite.addTest(case)
+                    return True
+
+            if case.id() in depends:
+                case_depends = depends[case.id()]
+                for c in suite._tests:
+                    if c.id() in case_depends:
+                        suite.addTest(case)
+                        return True
+
+            return False
+
         def _add_to_main_thread(main_suite, case, depends):
             """
                 Some test cases needs to be run into the main
-                thread for several resons.
-
-                A test case that needs to run in the main thread
-                can be for specific set via test class _main_thread
-                attr or because is on the same module or for a dependency
-                reason.
+                thread by request.
             """
 
             if hasattr(case.__class__, '_main_thread') and \
@@ -50,19 +67,20 @@ class OETestLoaderThreaded(OETestLoader):
                 main_suite.addTest(case)
                 return True
 
-            for c in main_suite._tests:
-                if case.__module__ == c.__module__:
-                    main_suite.addTest(case)
-                    return True
+            return _add_by_module_or_dep(main_suite, case, depends)
 
-            if case.id() in depends:
-                case_depends = depends[case.id()]
-                for c in main_suite._tests:
-                    if c.id() in case_depends:
-                        main_suite.addTest(case)
-                        return True
+        def _add_to_end_thread(end_suite, case, depends):
+            """
+                Some test cases needs to be run into at end of
+                execution into the main by request.
+            """
+            if hasattr(case.__class__, '_end_thread') and \
+                    case.__class__._end_thread or \
+                    self.process_num == 1:
+                end_suite.addTest(case)
+                return True
 
-            return False
+            return _add_by_module_or_dep(end_suite, case, depends)
 
         def _search_for_module_idx(suites, case):
             """
@@ -112,6 +130,9 @@ class OETestLoaderThreaded(OETestLoader):
                     if 'depends' in self.tc._registry:
                         depends = self.tc._registry['depends']
 
+                    if _add_to_end_thread(suites['end'], case, depends):
+                        continue
+
                     if _add_to_main_thread(suites['main'], case, depends):
                         continue
 
@@ -135,7 +156,7 @@ class OETestLoaderThreaded(OETestLoader):
 
         # if the main suite doesn't have test cases
         # use the first element of the suites pool
-        if not len(suites['main']._tests):
+        if not len(suites['main']._tests) and len(suites['pool']):
             suites['main'] = suites['pool'].pop(0)
 
         return suites
@@ -268,6 +289,12 @@ class _ThreadedPool:
         self.tasks = queue.Queue(num_tasks)
         self.workers = []
 
+        self.stream = stream
+        self.result = result
+
+        self.end_task = None
+        self.end_worker = None
+
         for _ in range(num_workers):
             worker = _Worker(self.tasks, result, stream)
             self.workers.append(worker)
@@ -280,12 +307,25 @@ class _ThreadedPool:
         """Add a task to the queue"""
         self.tasks.put((func, args, kargs))
 
+    def add_end_task(self, func, *args, **kwargs):
+        """Add a task to be executed at end"""
+
+        self.end_task = queue.Queue(1)
+        self.end_task.put((func, args, kwargs))
+        self.end_worker = _Worker(self.end_task, self.result,
+                self.stream)
+
     def wait_completion(self):
         """Wait for completion of all the tasks in the queue"""
         self.tasks.join()
         for worker in self.workers:
             worker.join()
 
+        if self.end_task:
+            self.end_worker.start()
+            self.end_task.join()
+            self.end_worker.join()
+
 class OETestRunnerThreaded(OETestRunner):
     streamLoggerClass = OEStreamLoggerThreaded
 
@@ -293,32 +333,46 @@ class OETestRunnerThreaded(OETestRunner):
         super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
         self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
 
+    def _run_main_thread(self, suite, result):
+        if len(suite._tests):
+            run_start_time = time.time()
+            rc = super(OETestRunnerThreaded, self).run(suite)
+            run_end_time = time.time()
+            result.addResult(rc, run_start_time, run_end_time)
+            self.stream.finish()
+
     def run(self, suites):
         result = OETestResultThreaded(self.tc)
 
         pool = None
+
         if suites['pool']:
             thread_no = len(suites['pool'])
             pool = _ThreadedPool(thread_no, thread_no, stream=self.stream,
                     result=result)
             for s in suites['pool']:
                 pool.add_task(super(OETestRunnerThreaded, self).run, s)
-            pool.start()
 
-        run_start_time = time.time()
-        rc = super(OETestRunnerThreaded, self).run(suites['main'])
-        run_end_time = time.time()
-        result.addResult(rc, run_start_time, run_end_time)
-        self.stream.finish()
+        if len(suites['end']._tests):
+            if not pool:
+                pool = _ThreadedPool(0, 0, stream=self.stream,
+                            result=result)
+            pool.add_end_task(super(OETestRunnerThreaded, self).run,
+                    suites['end'])
 
         if pool:
+            pool.start()
+        self._run_main_thread(suites['main'], result)
+        if pool:
             pool.wait_completion()
+
         result._fill_tc_results()
 
         return result
 
     def list_tests(self, suite, display_type):
         suite['pool'].insert(0, suite['main'])
+        suite['pool'].append(suite['end'])
 
         return super(OETestRunnerThreaded, self).list_tests(
                 suite['pool'], display_type)
-- 
2.11.0




More information about the Openembedded-core mailing list