blob: 5ed3c794e6374756a647d2a44e7aee3cc7aad65c [file] [log] [blame]
# Copyright 2022 The ANGLE Project Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import contextlib
import datetime
import fnmatch
import json
import importlib
import io
import logging
import os
import signal
import subprocess
import sys
import threading
import time
import android_helper
import angle_path_util
angle_path_util.AddDepsDirToPath('testing/scripts')
import common
if sys.platform.startswith('linux'):
# vpython3 can handle this on Windows but not python3
import xvfb
ANGLE_TRACE_TEST_SUITE = 'angle_trace_tests'
def Initialize(suite_name):
android_helper.Initialize(suite_name)
# Requires .Initialize() to be called first
def IsAndroid():
return android_helper.IsAndroid()
class LogFormatter(logging.Formatter):
def __init__(self):
logging.Formatter.__init__(self, fmt='%(levelname).1s%(asctime)s %(message)s')
def formatTime(self, record, datefmt=None):
# Drop date as these scripts are short lived
return datetime.datetime.fromtimestamp(record.created).strftime('%H:%M:%S.%fZ')
def SetupLogging(level):
# Reload to reset if it was already setup by a library
importlib.reload(logging)
logger = logging.getLogger()
logger.setLevel(level)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(LogFormatter())
logger.addHandler(handler)
def IsWindows():
return sys.platform == 'cygwin' or sys.platform.startswith('win')
def ExecutablePathInCurrentDir(binary):
if IsWindows():
return '.\\%s.exe' % binary
else:
return './%s' % binary
def HasGtestShardsAndIndex(env):
if 'GTEST_TOTAL_SHARDS' in env and int(env['GTEST_TOTAL_SHARDS']) != 1:
if 'GTEST_SHARD_INDEX' not in env:
logging.error('Sharding params must be specified together.')
sys.exit(1)
return True
return False
def PopGtestShardsAndIndex(env):
return int(env.pop('GTEST_TOTAL_SHARDS')), int(env.pop('GTEST_SHARD_INDEX'))
# Adapted from testing/test_env.py: also notifies current process and restores original handlers.
@contextlib.contextmanager
def forward_signals(procs):
assert all(isinstance(p, subprocess.Popen) for p in procs)
interrupted_event = threading.Event()
def _sig_handler(sig, _):
interrupted_event.set()
for p in procs:
if p.poll() is not None:
continue
# SIGBREAK is defined only for win32.
# pylint: disable=no-member
if sys.platform == 'win32' and sig == signal.SIGBREAK:
p.send_signal(signal.CTRL_BREAK_EVENT)
else:
print("Forwarding signal(%d) to process %d" % (sig, p.pid))
p.send_signal(sig)
# pylint: enable=no-member
if sys.platform == 'win32':
signals = [signal.SIGBREAK] # pylint: disable=no-member
else:
signals = [signal.SIGINT, signal.SIGTERM]
original_handlers = {}
for sig in signals:
original_handlers[sig] = signal.signal(sig, _sig_handler)
yield
for sig, handler in original_handlers.items():
signal.signal(sig, handler)
if interrupted_event.is_set():
raise KeyboardInterrupt()
# From testing/test_env.py, see run_command_with_output below
def _popen(*args, **kwargs):
assert 'creationflags' not in kwargs
if sys.platform == 'win32':
# Necessary for signal handling. See crbug.com/733612#c6.
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
return subprocess.Popen(*args, **kwargs)
# Forked from testing/test_env.py to add ability to suppress logging with log=False
def run_command_with_output(argv, stdoutfile, env=None, cwd=None, log=True):
assert stdoutfile
with io.open(stdoutfile, 'wb') as writer, \
io.open(stdoutfile, 'rb') as reader:
process = _popen(argv, env=env, cwd=cwd, stdout=writer, stderr=subprocess.STDOUT)
with forward_signals([process]):
while process.poll() is None:
if log:
sys.stdout.write(reader.read().decode('utf-8'))
# This sleep is needed for signal propagation. See the
# wait_with_signals() docstring.
time.sleep(0.1)
if log:
sys.stdout.write(reader.read().decode('utf-8'))
return process.returncode
def RunTestSuite(test_suite,
cmd_args,
env,
show_test_stdout=True,
use_xvfb=False):
if android_helper.IsAndroid():
result, output, json_results = android_helper.RunTests(
test_suite, cmd_args, log_output=show_test_stdout)
return result, output, json_results
cmd = ExecutablePathInCurrentDir(test_suite) if os.path.exists(
os.path.basename(test_suite)) else test_suite
runner_cmd = [cmd] + cmd_args
logging.debug(' '.join(runner_cmd))
with contextlib.ExitStack() as stack:
stdout_path = stack.enter_context(common.temporary_file())
flag_matches = [a for a in cmd_args if a.startswith('--isolated-script-test-output=')]
if flag_matches:
results_path = flag_matches[0].split('=')[1]
else:
results_path = stack.enter_context(common.temporary_file())
runner_cmd += ['--isolated-script-test-output=%s' % results_path]
if use_xvfb:
# Default changed to '--use-xorg', which fails per http://crbug.com/40257169#comment34
# '--use-xvfb' forces the old working behavior
runner_cmd += ['--use-xvfb']
xvfb_whd = '3120x3120x24' # Max screen dimensions from traces, as per:
# % egrep 'Width|Height' src/tests/restricted_traces/*/*.json | awk '{print $3 $2}' | sort -n
exit_code = xvfb.run_executable(
runner_cmd, env, stdoutfile=stdout_path, xvfb_whd=xvfb_whd)
else:
exit_code = run_command_with_output(
runner_cmd, env=env, stdoutfile=stdout_path, log=show_test_stdout)
with open(stdout_path) as f:
output = f.read()
with open(results_path) as f:
data = f.read()
json_results = json.loads(data) if data else None # --list-tests => empty file
return exit_code, output, json_results
def GetTestsFromOutput(output):
out_lines = output.split('\n')
try:
start = out_lines.index('Tests list:')
end = out_lines.index('End tests list.')
except ValueError as e:
logging.exception(e)
return None
return out_lines[start + 1:end]
def FilterTests(tests, test_filter):
matches = set()
for single_filter in test_filter.split(':'):
matches.update(fnmatch.filter(tests, single_filter))
return sorted(matches)