[OE-core] [PATCH 1/6] oeqa: Add selftest parallelisation support
Robert Yang
liezhi.yang at windriver.com
Thu Jul 26 06:00:05 UTC 2018
On 07/26/2018 11:03 AM, Robert Yang wrote:
> Hi RP,
>
> On 07/17/2018 12:33 AM, Richard Purdie wrote:
>> This allows oe-selftest to take a -j option which specifies how much test
>> parallelisation to use. Currently this is "module" based with each module
>> being split and run in a separate build directory. Further splitting could
>> be done but this seems a good compromise between test setup and parallelism.
>>
>> You need python-testtools and python-subunit installed to use this but only
>> when the -j option is specified.
>
> Should we add python-testtools-native and python-subunit-native, please ?
>
> And add them to TESTIMAGEDEPENDS ?
After talked with Qi, this won't work since we use host's python3. So we
need install them on host, or use buildtools-tarball.
// Robert
>
> // Robert
>
>>
>> See notes posted to the openedmbedded-architecture list for more details
>> about the design choices here.
>>
>> Some of this functionality may make more sense in the oeqa core ultimately.
>>
>> Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
>> ---
>> meta/lib/oeqa/core/context.py | 10 +-
>> meta/lib/oeqa/core/runner.py | 24 +-
>> meta/lib/oeqa/core/utils/concurrencytest.py | 254 ++++++++++++++++++++
>> meta/lib/oeqa/selftest/context.py | 8 +-
>> 4 files changed, 288 insertions(+), 8 deletions(-)
>> create mode 100644 meta/lib/oeqa/core/utils/concurrencytest.py
>>
>> diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py
>> index 10481b44b61..8cdfbf834f3 100644
>> --- a/meta/lib/oeqa/core/context.py
>> +++ b/meta/lib/oeqa/core/context.py
>> @@ -58,14 +58,20 @@ class OETestContext(object):
>> modules_required, filters)
>> self.suites = self.loader.discover()
>> - def runTests(self, skips=[]):
>> + def runTests(self, processes=None, skips=[]):
>> self.runner = self.runnerClass(self, descriptions=False,
>> verbosity=2, buffer=True)
>> # Dinamically skip those tests specified though arguments
>> self.skipTests(skips)
>> self._run_start_time = time.time()
>> - result = self.runner.run(self.suites)
>> + if processes:
>> + from oeqa.core.utils.concurrencytest import ConcurrentTestSuite
>> +
>> + concurrent_suite = ConcurrentTestSuite(self.suites, processes)
>> + result = self.runner.run(concurrent_suite)
>> + else:
>> + result = self.runner.run(self.suites)
>> self._run_end_time = time.time()
>> return result
>> diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
>> index 219102c6b0f..6adbe3827b4 100644
>> --- a/meta/lib/oeqa/core/runner.py
>> +++ b/meta/lib/oeqa/core/runner.py
>> @@ -43,11 +43,17 @@ class OETestResult(_TestResult):
>> super(OETestResult, self).__init__(*args, **kwargs)
>> self.successes = []
>> + self.starttime = {}
>> + self.endtime = {}
>> + self.progressinfo = {}
>> self.tc = tc
>> self._tc_map_results()
>> def startTest(self, test):
>> + # May have been set by concurrencytest
>> + if test.id() not in self.starttime:
>> + self.starttime[test.id()] = time.time()
>> super(OETestResult, self).startTest(test)
>> def _tc_map_results(self):
>> @@ -57,6 +63,12 @@ class OETestResult(_TestResult):
>> self.tc._results['expectedFailures'] = self.expectedFailures
>> self.tc._results['successes'] = self.successes
>> + def stopTest(self, test):
>> + self.endtime[test.id()] = time.time()
>> + super(OETestResult, self).stopTest(test)
>> + if test.id() in self.progressinfo:
>> + print(self.progressinfo[test.id()])
>> +
>> def logSummary(self, component, context_msg=''):
>> elapsed_time = self.tc._run_end_time - self.tc._run_start_time
>> self.tc.logger.info("SUMMARY:")
>> @@ -141,12 +153,16 @@ class OETestResult(_TestResult):
>> if hasattr(d, 'oeid'):
>> oeid = d.oeid
>> + t = ""
>> + if case.id() in self.starttime and case.id() in self.endtime:
>> + t = " (" + "{0:.2f}".format(self.endtime[case.id()] -
>> self.starttime[case.id()]) + "s)"
>> +
>> if fail:
>> - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" %
>> (case.id(),
>> - oeid, desc))
>> + self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" %
>> (case.id(),
>> + oeid, desc, t))
>> else:
>> - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" %
>> (case.id(),
>> - oeid, 'UNKNOWN'))
>> + self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" %
>> (case.id(),
>> + oeid, 'UNKNOWN', t))
>> class OEListTestsResult(object):
>> def wasSuccessful(self):
>> diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py
>> b/meta/lib/oeqa/core/utils/concurrencytest.py
>> new file mode 100644
>> index 00000000000..850586516a4
>> --- /dev/null
>> +++ b/meta/lib/oeqa/core/utils/concurrencytest.py
>> @@ -0,0 +1,254 @@
>> +#!/usr/bin/env python3
>> +#
>> +# Modified for use in OE by Richard Purdie, 2018
>> +#
>> +# Modified by: Corey Goldberg, 2013
>> +# License: GPLv2+
>> +#
>> +# Original code from:
>> +# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
>> +# Copyright (C) 2005-2011 Canonical Ltd
>> +# License: GPLv2+
>> +
>> +import os
>> +import sys
>> +import traceback
>> +import unittest
>> +import subprocess
>> +import testtools
>> +import threading
>> +import time
>> +import io
>> +
>> +from queue import Queue
>> +from itertools import cycle
>> +from subunit import ProtocolTestCase, TestProtocolClient
>> +from subunit.test_results import AutoTimingTestResultDecorator
>> +from testtools import ThreadsafeForwardingResult, iterate_tests
>> +
>> +import bb.utils
>> +import oe.path
>> +
>> +_all__ = [
>> + 'ConcurrentTestSuite',
>> + 'fork_for_tests',
>> + 'partition_tests',
>> +]
>> +
>> +#
>> +# Patch the version from testtools to allow access to _test_start and allow
>> +# computation of timing information and threading progress
>> +#
>> +class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
>> +
>> + def __init__(self, target, semaphore, threadnum, totalinprocess,
>> totaltests):
>> + super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
>> + self.threadnum = threadnum
>> + self.totalinprocess = totalinprocess
>> + self.totaltests = totaltests
>> +
>> + def _add_result_with_semaphore(self, method, test, *args, **kwargs):
>> + self.semaphore.acquire()
>> + try:
>> + self.result.starttime[test.id()] = self._test_start.timestamp()
>> + self.result.threadprogress[self.threadnum].append(test.id())
>> + totalprogress = sum(len(x) for x in
>> self.result.threadprogress.values())
>> + self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss)
>> (%s)" % (
>> + self.threadnum,
>> + len(self.result.threadprogress[self.threadnum]),
>> + self.totalinprocess,
>> + totalprogress,
>> + self.totaltests,
>> + "{0:.2f}".format(time.time()-self._test_start.timestamp()),
>> + test.id())
>> + finally:
>> + self.semaphore.release()
>> + super(BBThreadsafeForwardingResult,
>> self)._add_result_with_semaphore(method, test, *args, **kwargs)
>> +
>> +#
>> +# A dummy structure to add to io.StringIO so that the .buffer object
>> +# is available and accepts writes. This allows unittest with buffer=True
>> +# to interact ok with subunit which wants to access sys.stdout.buffer.
>> +#
>> +class dummybuf(object):
>> + def __init__(self, parent):
>> + self.p = parent
>> + def write(self, data):
>> + self.p.write(data.decode("utf-8"))
>> +
>> +#
>> +# Taken from testtools.ConncurrencyTestSuite but modified for OE use
>> +#
>> +class ConcurrentTestSuite(unittest.TestSuite):
>> +
>> + def __init__(self, suite, processes):
>> + super(ConcurrentTestSuite, self).__init__([suite])
>> + self.processes = processes
>> +
>> + def run(self, result):
>> + tests, totaltests = fork_for_tests(self.processes, self)
>> + try:
>> + threads = {}
>> + queue = Queue()
>> + semaphore = threading.Semaphore(1)
>> + result.threadprogress = {}
>> + for i, (test, testnum) in enumerate(tests):
>> + result.threadprogress[i] = []
>> + process_result = BBThreadsafeForwardingResult(result,
>> semaphore, i, testnum, totaltests)
>> + # Force buffering of stdout/stderr so the console doesn't get
>> corrupted by test output
>> + # as per default in parent code
>> + process_result.buffer = True
>> + # We have to add a buffer object to stdout to keep subunit happy
>> + process_result._stderr_buffer = io.StringIO()
>> + process_result._stderr_buffer.buffer =
>> dummybuf(process_result._stderr_buffer)
>> + process_result._stdout_buffer = io.StringIO()
>> + process_result._stdout_buffer.buffer =
>> dummybuf(process_result._stdout_buffer)
>> + reader_thread = threading.Thread(
>> + target=self._run_test, args=(test, process_result, queue))
>> + threads[test] = reader_thread, process_result
>> + reader_thread.start()
>> + while threads:
>> + finished_test = queue.get()
>> + threads[finished_test][0].join()
>> + del threads[finished_test]
>> + except:
>> + for thread, process_result in threads.values():
>> + process_result.stop()
>> + raise
>> +
>> + def _run_test(self, test, process_result, queue):
>> + try:
>> + try:
>> + test.run(process_result)
>> + except Exception:
>> + # The run logic itself failed
>> + case = testtools.ErrorHolder(
>> + "broken-runner",
>> + error=sys.exc_info())
>> + case.run(process_result)
>> + finally:
>> + queue.put(test)
>> +
>> +def removebuilddir(d):
>> + delay = 5
>> + while delay and os.path.exists(d + "/bitbake.lock"):
>> + time.sleep(1)
>> + delay = delay - 1
>> + bb.utils.prunedir(d)
>> +
>> +def fork_for_tests(concurrency_num, suite):
>> + result = []
>> + test_blocks = partition_tests(suite, concurrency_num)
>> + # Clear the tests from the original suite so it doesn't keep them alive
>> + suite._tests[:] = []
>> + totaltests = sum(len(x) for x in test_blocks)
>> + for process_tests in test_blocks:
>> + numtests = len(process_tests)
>> + process_suite = unittest.TestSuite(process_tests)
>> + # Also clear each split list so new suite has only reference
>> + process_tests[:] = []
>> + c2pread, c2pwrite = os.pipe()
>> + # Clear buffers before fork to avoid duplicate output
>> + sys.stdout.flush()
>> + sys.stderr.flush()
>> + pid = os.fork()
>> + if pid == 0:
>> + ourpid = os.getpid()
>> + try:
>> + newbuilddir = None
>> + stream = os.fdopen(c2pwrite, 'wb', 1)
>> + os.close(c2pread)
>> +
>> + # Create a new separate BUILDDIR for each group of tests
>> + if 'BUILDDIR' in os.environ:
>> + builddir = os.environ['BUILDDIR']
>> + newbuilddir = builddir + "-st-" + str(ourpid)
>> + selftestdir = os.path.abspath(builddir +
>> "/../meta-selftest")
>> + newselftestdir = newbuilddir + "/meta-selftest"
>> +
>> + bb.utils.mkdirhier(newbuilddir)
>> + oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
>> + oe.path.copytree(builddir + "/cache", newbuilddir +
>> "/cache")
>> + oe.path.copytree(selftestdir, newselftestdir)
>> +
>> + for e in os.environ:
>> + if builddir in os.environ[e]:
>> + os.environ[e] = os.environ[e].replace(builddir,
>> newbuilddir)
>> +
>> + subprocess.check_output("git init; git add *; git commit
>> -a -m 'initial'", cwd=newselftestdir, shell=True)
>> +
>> + # Tried to used bitbake-layers add/remove but it requires
>> recipe parsing and hence is too slow
>> + subprocess.check_output("sed %s/conf/bblayers.conf -i -e
>> 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir,
>> shell=True)
>> +
>> + os.chdir(newbuilddir)
>> +
>> + for t in process_suite:
>> + if not hasattr(t, "tc"):
>> + continue
>> + cp = t.tc.config_paths
>> + for p in cp:
>> + if selftestdir in cp[p] and newselftestdir not in
>> cp[p]:
>> + cp[p] = cp[p].replace(selftestdir,
>> newselftestdir)
>> + if builddir in cp[p] and newbuilddir not in cp[p]:
>> + cp[p] = cp[p].replace(builddir, newbuilddir)
>> +
>> + # Leave stderr and stdout open so we can see test noise
>> + # Close stdin so that the child goes away if it decides to
>> + # read from stdin (otherwise its a roulette to see what
>> + # child actually gets keystrokes for pdb etc).
>> + newsi = os.open(os.devnull, os.O_RDWR)
>> + os.dup2(newsi, sys.stdin.fileno())
>> +
>> + subunit_client = TestProtocolClient(stream)
>> + # Force buffering of stdout/stderr so the console doesn't get
>> corrupted by test output
>> + # as per default in parent code
>> + subunit_client.buffer = True
>> + subunit_result = AutoTimingTestResultDecorator(subunit_client)
>> + process_suite.run(subunit_result)
>> + if ourpid != os.getpid():
>> + os._exit(0)
>> + if newbuilddir:
>> + removebuilddir(newbuilddir)
>> + except:
>> + # Don't do anything with process children
>> + if ourpid != os.getpid():
>> + os._exit(1)
>> + # Try and report traceback on stream, but exit with error
>> + # even if stream couldn't be created or something else
>> + # goes wrong. The traceback is formatted to a string and
>> + # written in one go to avoid interleaving lines from
>> + # multiple failing children.
>> + try:
>> + stream.write(traceback.format_exc().encode('utf-8'))
>> + except:
>> + sys.stderr.write(traceback.format_exc())
>> + finally:
>> + if newbuilddir:
>> + removebuilddir(newbuilddir)
>> + os._exit(1)
>> + os._exit(0)
>> + else:
>> + os.close(c2pwrite)
>> + stream = os.fdopen(c2pread, 'rb', 1)
>> + test = ProtocolTestCase(stream)
>> + result.append((test, numtests))
>> + return result, totaltests
>> +
>> +def partition_tests(suite, count):
>> + # Keep tests from the same class together but allow tests from modules
>> + # to go to different processes to aid parallelisation.
>> + modules = {}
>> + for test in iterate_tests(suite):
>> + m = test.__module__ + "." + test.__class__.__name__
>> + if m not in modules:
>> + modules[m] = []
>> + modules[m].append(test)
>> +
>> + # Simply divide the test blocks between the available processes
>> + partitions = [list() for _ in range(count)]
>> + for partition, m in zip(cycle(partitions), modules):
>> + partition.extend(modules[m])
>> +
>> + # No point in empty threads so drop them
>> + return [p for p in partitions if p]
>> +
>> diff --git a/meta/lib/oeqa/selftest/context.py
>> b/meta/lib/oeqa/selftest/context.py
>> index 9e90d3c2565..c937b8171c9 100644
>> --- a/meta/lib/oeqa/selftest/context.py
>> +++ b/meta/lib/oeqa/selftest/context.py
>> @@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext):
>> self.custommachine = None
>> self.config_paths = config_paths
>> - def runTests(self, machine=None, skips=[]):
>> + def runTests(self, processes=None, machine=None, skips=[]):
>> if machine:
>> self.custommachine = machine
>> if machine == 'random':
>> self.custommachine = choice(self.machines)
>> self.logger.info('Run tests with custom MACHINE set to: %s' % \
>> self.custommachine)
>> - return super(OESelftestTestContext, self).runTests(skips)
>> + return super(OESelftestTestContext, self).runTests(processes, skips)
>> def listTests(self, display_type, machine=None):
>> return super(OESelftestTestContext, self).listTests(display_type)
>> @@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
>> action="store_true", default=False,
>> help='List all available tests.')
>> + parser.add_argument('-j', '--num-processes', dest='processes',
>> action='store',
>> + type=int, help="number of processes to execute in parallel
>> with")
>> +
>> parser.add_argument('--machine', required=False, choices=['random',
>> 'all'],
>> help='Run tests on different machines
>> (random/all).')
>> @@ -137,6 +140,7 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
>> self.tc_kwargs['init']['config_paths']['bblayers_backup'])
>> self.tc_kwargs['run']['skips'] = args.skips
>> + self.tc_kwargs['run']['processes'] = args.processes
>> def _pre_run(self):
>> def _check_required_env_variables(vars):
>>
More information about the Openembedded-core
mailing list