| # coding: utf-8 |
| from __future__ import unicode_literals, division, absolute_import, print_function |
| |
| import cgi |
| import codecs |
| import coverage |
| import imp |
| import json |
| import os |
| import unittest |
| import re |
| import sys |
| import tempfile |
| import time |
| import platform as _plat |
| import subprocess |
| from fnmatch import fnmatch |
| |
| from . import package_name, package_root, other_packages |
| |
| if sys.version_info < (3,): |
| str_cls = unicode # noqa |
| from urllib2 import URLError |
| from urllib import urlencode |
| from io import open |
| else: |
| str_cls = str |
| from urllib.error import URLError |
| from urllib.parse import urlencode |
| |
| if sys.version_info < (3, 7): |
| Pattern = re._pattern_type |
| else: |
| Pattern = re.Pattern |
| |
| |
| def run(ci=False): |
| """ |
| Runs the tests while measuring coverage |
| |
| :param ci: |
| If coverage is being run in a CI environment - this triggers trying to |
| run the tests for the rest of modularcrypto and uploading coverage data |
| |
| :return: |
| A bool - if the tests ran successfully |
| """ |
| |
| xml_report_path = os.path.join(package_root, 'coverage.xml') |
| if os.path.exists(xml_report_path): |
| os.unlink(xml_report_path) |
| |
| cov = coverage.Coverage(include='%s/*.py' % package_name) |
| cov.start() |
| |
| from .tests import run as run_tests |
| result = run_tests(ci=ci) |
| print() |
| |
| if ci: |
| suite = unittest.TestSuite() |
| loader = unittest.TestLoader() |
| for other_package in other_packages: |
| for test_class in _load_package_tests(other_package): |
| suite.addTest(loader.loadTestsFromTestCase(test_class)) |
| |
| if suite.countTestCases() > 0: |
| print('Running tests from other modularcrypto packages') |
| sys.stdout.flush() |
| runner_result = unittest.TextTestRunner(stream=sys.stdout, verbosity=1).run(suite) |
| result = runner_result.wasSuccessful() and result |
| print() |
| sys.stdout.flush() |
| |
| cov.stop() |
| cov.save() |
| |
| cov.report(show_missing=False) |
| print() |
| sys.stdout.flush() |
| if ci: |
| cov.xml_report() |
| |
| if ci and result and os.path.exists(xml_report_path): |
| _codecov_submit() |
| print() |
| |
| return result |
| |
| |
| def _load_package_tests(name): |
| """ |
| Load the test classes from another modularcrypto package |
| |
| :param name: |
| A unicode string of the other package name |
| |
| :return: |
| A list of unittest.TestCase classes of the tests for the package |
| """ |
| |
| package_dir = os.path.join('..', name) |
| if not os.path.exists(package_dir): |
| return [] |
| |
| tests_module_info = imp.find_module('tests', [package_dir]) |
| tests_module = imp.load_module('%s.tests' % name, *tests_module_info) |
| return tests_module.test_classes() |
| |
| |
| def _env_info(): |
| """ |
| :return: |
| A two-element tuple of unicode strings. The first is the name of the |
| environment, the second the root of the repo. The environment name |
| will be one of: "ci-travis", "ci-circle", "ci-appveyor", |
| "ci-github-actions", "local" |
| """ |
| |
| if os.getenv('CI') == 'true' and os.getenv('TRAVIS') == 'true': |
| return ('ci-travis', os.getenv('TRAVIS_BUILD_DIR')) |
| |
| if os.getenv('CI') == 'True' and os.getenv('APPVEYOR') == 'True': |
| return ('ci-appveyor', os.getenv('APPVEYOR_BUILD_FOLDER')) |
| |
| if os.getenv('CI') == 'true' and os.getenv('CIRCLECI') == 'true': |
| return ('ci-circle', os.getcwdu() if sys.version_info < (3,) else os.getcwd()) |
| |
| if os.getenv('GITHUB_ACTIONS') == 'true': |
| return ('ci-github-actions', os.getenv('GITHUB_WORKSPACE')) |
| |
| return ('local', package_root) |
| |
| |
| def _codecov_submit(): |
| env_name, root = _env_info() |
| |
| try: |
| with open(os.path.join(root, 'codecov.json'), 'rb') as f: |
| json_data = json.loads(f.read().decode('utf-8')) |
| except (OSError, ValueError, UnicodeDecodeError, KeyError): |
| print('error reading codecov.json') |
| return |
| |
| if json_data.get('disabled'): |
| return |
| |
| if env_name == 'ci-travis': |
| # http://docs.travis-ci.com/user/environment-variables/#Default-Environment-Variables |
| build_url = 'https://travis-ci.org/%s/jobs/%s' % (os.getenv('TRAVIS_REPO_SLUG'), os.getenv('TRAVIS_JOB_ID')) |
| query = { |
| 'service': 'travis', |
| 'branch': os.getenv('TRAVIS_BRANCH'), |
| 'build': os.getenv('TRAVIS_JOB_NUMBER'), |
| 'pr': os.getenv('TRAVIS_PULL_REQUEST'), |
| 'job': os.getenv('TRAVIS_JOB_ID'), |
| 'tag': os.getenv('TRAVIS_TAG'), |
| 'slug': os.getenv('TRAVIS_REPO_SLUG'), |
| 'commit': os.getenv('TRAVIS_COMMIT'), |
| 'build_url': build_url, |
| } |
| |
| elif env_name == 'ci-appveyor': |
| # http://www.appveyor.com/docs/environment-variables |
| build_url = 'https://ci.appveyor.com/project/%s/build/%s' % ( |
| os.getenv('APPVEYOR_REPO_NAME'), |
| os.getenv('APPVEYOR_BUILD_VERSION') |
| ) |
| query = { |
| 'service': "appveyor", |
| 'branch': os.getenv('APPVEYOR_REPO_BRANCH'), |
| 'build': os.getenv('APPVEYOR_JOB_ID'), |
| 'pr': os.getenv('APPVEYOR_PULL_REQUEST_NUMBER'), |
| 'job': '/'.join(( |
| os.getenv('APPVEYOR_ACCOUNT_NAME'), |
| os.getenv('APPVEYOR_PROJECT_SLUG'), |
| os.getenv('APPVEYOR_BUILD_VERSION') |
| )), |
| 'tag': os.getenv('APPVEYOR_REPO_TAG_NAME'), |
| 'slug': os.getenv('APPVEYOR_REPO_NAME'), |
| 'commit': os.getenv('APPVEYOR_REPO_COMMIT'), |
| 'build_url': build_url, |
| } |
| |
| elif env_name == 'ci-circle': |
| # https://circleci.com/docs/environment-variables |
| query = { |
| 'service': 'circleci', |
| 'branch': os.getenv('CIRCLE_BRANCH'), |
| 'build': os.getenv('CIRCLE_BUILD_NUM'), |
| 'pr': os.getenv('CIRCLE_PR_NUMBER'), |
| 'job': os.getenv('CIRCLE_BUILD_NUM') + "." + os.getenv('CIRCLE_NODE_INDEX'), |
| 'tag': os.getenv('CIRCLE_TAG'), |
| 'slug': os.getenv('CIRCLE_PROJECT_USERNAME') + "/" + os.getenv('CIRCLE_PROJECT_REPONAME'), |
| 'commit': os.getenv('CIRCLE_SHA1'), |
| 'build_url': os.getenv('CIRCLE_BUILD_URL'), |
| } |
| |
| elif env_name == 'ci-github-actions': |
| branch = '' |
| tag = '' |
| ref = os.getenv('GITHUB_REF', '') |
| if ref.startswith('refs/tags/'): |
| tag = ref[10:] |
| elif ref.startswith('refs/heads/'): |
| branch = ref[11:] |
| |
| impl = _plat.python_implementation() |
| major, minor = _plat.python_version_tuple()[0:2] |
| build_name = '%s %s %s.%s' % (_platform_name(), impl, major, minor) |
| |
| query = { |
| 'service': 'custom', |
| 'token': json_data['token'], |
| 'branch': branch, |
| 'tag': tag, |
| 'slug': os.getenv('GITHUB_REPOSITORY'), |
| 'commit': os.getenv('GITHUB_SHA'), |
| 'build_url': 'https://github.com/wbond/oscrypto/commit/%s/checks' % os.getenv('GITHUB_SHA'), |
| 'name': 'GitHub Actions %s on %s' % (build_name, os.getenv('RUNNER_OS')) |
| } |
| |
| else: |
| if not os.path.exists(os.path.join(root, '.git')): |
| print('git repository not found, not submitting coverage data') |
| return |
| git_status = _git_command(['status', '--porcelain'], root) |
| if git_status != '': |
| print('git repository has uncommitted changes, not submitting coverage data') |
| return |
| |
| branch = _git_command(['rev-parse', '--abbrev-ref', 'HEAD'], root) |
| commit = _git_command(['rev-parse', '--verify', 'HEAD'], root) |
| tag = _git_command(['name-rev', '--tags', '--name-only', commit], root) |
| impl = _plat.python_implementation() |
| major, minor = _plat.python_version_tuple()[0:2] |
| build_name = '%s %s %s.%s' % (_platform_name(), impl, major, minor) |
| query = { |
| 'branch': branch, |
| 'commit': commit, |
| 'slug': json_data['slug'], |
| 'token': json_data['token'], |
| 'build': build_name, |
| } |
| if tag != 'undefined': |
| query['tag'] = tag |
| |
| payload = 'PLATFORM=%s\n' % _platform_name() |
| payload += 'PYTHON_VERSION=%s %s\n' % (_plat.python_version(), _plat.python_implementation()) |
| if 'oscrypto' in sys.modules: |
| payload += 'OSCRYPTO_BACKEND=%s\n' % sys.modules['oscrypto'].backend() |
| payload += '<<<<<< ENV\n' |
| |
| for path in _list_files(root): |
| payload += path + '\n' |
| payload += '<<<<<< network\n' |
| |
| payload += '# path=coverage.xml\n' |
| with open(os.path.join(root, 'coverage.xml'), 'r', encoding='utf-8') as f: |
| payload += f.read() + '\n' |
| payload += '<<<<<< EOF\n' |
| |
| url = 'https://codecov.io/upload/v4' |
| headers = { |
| 'Accept': 'text/plain' |
| } |
| filtered_query = {} |
| for key in query: |
| value = query[key] |
| if value == '' or value is None: |
| continue |
| filtered_query[key] = value |
| |
| print('Submitting coverage info to codecov.io') |
| info = _do_request( |
| 'POST', |
| url, |
| headers, |
| query_params=filtered_query |
| ) |
| |
| encoding = info[1] or 'utf-8' |
| text = info[2].decode(encoding).strip() |
| parts = text.split() |
| upload_url = parts[1] |
| |
| headers = { |
| 'Content-Type': 'text/plain', |
| 'x-amz-acl': 'public-read', |
| 'x-amz-storage-class': 'REDUCED_REDUNDANCY' |
| } |
| |
| print('Uploading coverage data to codecov.io S3 bucket') |
| _do_request( |
| 'PUT', |
| upload_url, |
| headers, |
| data=payload.encode('utf-8') |
| ) |
| |
| |
| def _git_command(params, cwd): |
| """ |
| Executes a git command, returning the output |
| |
| :param params: |
| A list of the parameters to pass to git |
| |
| :param cwd: |
| The working directory to execute git in |
| |
| :return: |
| A 2-element tuple of (stdout, stderr) |
| """ |
| |
| proc = subprocess.Popen( |
| ['git'] + params, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| cwd=cwd |
| ) |
| stdout, stderr = proc.communicate() |
| code = proc.wait() |
| if code != 0: |
| e = OSError('git exit code was non-zero') |
| e.stdout = stdout |
| raise e |
| return stdout.decode('utf-8').strip() |
| |
| |
| def _parse_env_var_file(data): |
| """ |
| Parses a basic VAR="value data" file contents into a dict |
| |
| :param data: |
| A unicode string of the file data |
| |
| :return: |
| A dict of parsed name/value data |
| """ |
| |
| output = {} |
| for line in data.splitlines(): |
| line = line.strip() |
| if not line or '=' not in line: |
| continue |
| parts = line.split('=') |
| if len(parts) != 2: |
| continue |
| name = parts[0] |
| value = parts[1] |
| if len(value) > 1: |
| if value[0] == '"' and value[-1] == '"': |
| value = value[1:-1] |
| output[name] = value |
| return output |
| |
| |
| def _platform_name(): |
| """ |
| Returns information about the current operating system and version |
| |
| :return: |
| A unicode string containing the OS name and version |
| """ |
| |
| if sys.platform == 'darwin': |
| version = _plat.mac_ver()[0] |
| _plat_ver_info = tuple(map(int, version.split('.'))) |
| if _plat_ver_info < (10, 12): |
| name = 'OS X' |
| else: |
| name = 'macOS' |
| return '%s %s' % (name, version) |
| |
| elif sys.platform == 'win32': |
| _win_ver = sys.getwindowsversion() |
| _plat_ver_info = (_win_ver[0], _win_ver[1]) |
| return 'Windows %s' % _plat.win32_ver()[0] |
| |
| elif sys.platform in ['linux', 'linux2']: |
| if os.path.exists('/etc/os-release'): |
| with open('/etc/os-release', 'r', encoding='utf-8') as f: |
| pairs = _parse_env_var_file(f.read()) |
| if 'NAME' in pairs and 'VERSION_ID' in pairs: |
| return '%s %s' % (pairs['NAME'], pairs['VERSION_ID']) |
| version = pairs['VERSION_ID'] |
| elif 'PRETTY_NAME' in pairs: |
| return pairs['PRETTY_NAME'] |
| elif 'NAME' in pairs: |
| return pairs['NAME'] |
| else: |
| raise ValueError('No suitable version info found in /etc/os-release') |
| elif os.path.exists('/etc/lsb-release'): |
| with open('/etc/lsb-release', 'r', encoding='utf-8') as f: |
| pairs = _parse_env_var_file(f.read()) |
| if 'DISTRIB_DESCRIPTION' in pairs: |
| return pairs['DISTRIB_DESCRIPTION'] |
| else: |
| raise ValueError('No suitable version info found in /etc/lsb-release') |
| else: |
| return 'Linux' |
| |
| else: |
| return '%s %s' % (_plat.system(), _plat.release()) |
| |
| |
| def _list_files(root): |
| """ |
| Lists all of the files in a directory, taking into account any .gitignore |
| file that is present |
| |
| :param root: |
| A unicode filesystem path |
| |
| :return: |
| A list of unicode strings, containing paths of all files not ignored |
| by .gitignore with root, using relative paths |
| """ |
| |
| dir_patterns, file_patterns = _gitignore(root) |
| paths = [] |
| prefix = os.path.abspath(root) + os.sep |
| for base, dirs, files in os.walk(root): |
| for d in dirs: |
| for dir_pattern in dir_patterns: |
| if fnmatch(d, dir_pattern): |
| dirs.remove(d) |
| break |
| for f in files: |
| skip = False |
| for file_pattern in file_patterns: |
| if fnmatch(f, file_pattern): |
| skip = True |
| break |
| if skip: |
| continue |
| full_path = os.path.join(base, f) |
| if full_path[:len(prefix)] == prefix: |
| full_path = full_path[len(prefix):] |
| paths.append(full_path) |
| return sorted(paths) |
| |
| |
| def _gitignore(root): |
| """ |
| Parses a .gitignore file and returns patterns to match dirs and files. |
| Only basic gitignore patterns are supported. Pattern negation, ** wildcards |
| and anchored patterns are not currently implemented. |
| |
| :param root: |
| A unicode string of the path to the git repository |
| |
| :return: |
| A 2-element tuple: |
| - 0: a list of unicode strings to match against dirs |
| - 1: a list of unicode strings to match against dirs and files |
| """ |
| |
| gitignore_path = os.path.join(root, '.gitignore') |
| |
| dir_patterns = ['.git'] |
| file_patterns = [] |
| |
| if not os.path.exists(gitignore_path): |
| return (dir_patterns, file_patterns) |
| |
| with open(gitignore_path, 'r', encoding='utf-8') as f: |
| for line in f.readlines(): |
| line = line.strip() |
| if not line: |
| continue |
| if line.startswith('#'): |
| continue |
| if '**' in line: |
| raise NotImplementedError('gitignore ** wildcards are not implemented') |
| if line.startswith('!'): |
| raise NotImplementedError('gitignore pattern negation is not implemented') |
| if line.startswith('/'): |
| raise NotImplementedError('gitignore anchored patterns are not implemented') |
| if line.startswith('\\#'): |
| line = '#' + line[2:] |
| if line.startswith('\\!'): |
| line = '!' + line[2:] |
| if line.endswith('/'): |
| dir_patterns.append(line[:-1]) |
| else: |
| file_patterns.append(line) |
| |
| return (dir_patterns, file_patterns) |
| |
| |
| def _do_request(method, url, headers, data=None, query_params=None, timeout=20): |
| """ |
| Performs an HTTP request |
| |
| :param method: |
| A unicode string of 'POST' or 'PUT' |
| |
| :param url; |
| A unicode string of the URL to request |
| |
| :param headers: |
| A dict of unicode strings, where keys are header names and values are |
| the header values. |
| |
| :param data: |
| A dict of unicode strings (to be encoded as |
| application/x-www-form-urlencoded), or a byte string of data. |
| |
| :param query_params: |
| A dict of unicode keys and values to pass as query params |
| |
| :param timeout: |
| An integer number of seconds to use as the timeout |
| |
| :return: |
| A 3-element tuple: |
| - 0: A unicode string of the response content-type |
| - 1: A unicode string of the response encoding, or None |
| - 2: A byte string of the response body |
| """ |
| |
| if query_params: |
| url += '?' + urlencode(query_params).replace('+', '%20') |
| |
| if isinstance(data, dict): |
| data_bytes = {} |
| for key in data: |
| data_bytes[key.encode('utf-8')] = data[key].encode('utf-8') |
| data = urlencode(data_bytes) |
| headers['Content-Type'] = 'application/x-www-form-urlencoded' |
| if isinstance(data, str_cls): |
| raise TypeError('data must be a byte string') |
| |
| try: |
| tempfd, tempf_path = tempfile.mkstemp('-coverage') |
| os.write(tempfd, data or b'') |
| os.close(tempfd) |
| |
| if sys.platform == 'win32': |
| powershell_exe = os.path.join('system32\\WindowsPowerShell\\v1.0\\powershell.exe') |
| code = "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls12;" |
| code += "$wc = New-Object Net.WebClient;" |
| for key in headers: |
| code += "$wc.Headers.add('%s','%s');" % (key, headers[key]) |
| code += "$out = $wc.UploadFile('%s', '%s', '%s');" % (url, method, tempf_path) |
| code += "[System.Text.Encoding]::GetEncoding('ISO-8859-1').GetString($wc.ResponseHeaders.ToByteArray())" |
| |
| # To properly obtain bytes, we use BitConverter to get hex dash |
| # encoding (e.g. AE-09-3F) and they decode in python |
| code += " + [System.BitConverter]::ToString($out);" |
| stdout, stderr = _execute( |
| [powershell_exe, '-Command', code], |
| os.getcwd(), |
| re.compile(r'Unable to connect to|TLS|Internal Server Error'), |
| 6 |
| ) |
| if stdout[-2:] == b'\r\n' and b'\r\n\r\n' in stdout: |
| # An extra trailing crlf is added at the end by powershell |
| stdout = stdout[0:-2] |
| parts = stdout.split(b'\r\n\r\n', 1) |
| if len(parts) == 2: |
| stdout = parts[0] + b'\r\n\r\n' + codecs.decode(parts[1].replace(b'-', b''), 'hex_codec') |
| |
| else: |
| args = [ |
| 'curl', |
| '--request', |
| method, |
| '--location', |
| '--silent', |
| '--show-error', |
| '--include', |
| # Prevent curl from asking for an HTTP "100 Continue" response |
| '--header', 'Expect:' |
| ] |
| for key in headers: |
| args.append('--header') |
| args.append("%s: %s" % (key, headers[key])) |
| args.append('--data-binary') |
| args.append('@%s' % tempf_path) |
| args.append(url) |
| stdout, stderr = _execute( |
| args, |
| os.getcwd(), |
| re.compile(r'Failed to connect to|TLS|SSLRead|outstanding|cleanly'), |
| 6 |
| ) |
| finally: |
| if tempf_path and os.path.exists(tempf_path): |
| os.remove(tempf_path) |
| |
| if len(stderr) > 0: |
| raise URLError("Error %sing %s:\n%s" % (method, url, stderr)) |
| |
| parts = stdout.split(b'\r\n\r\n', 1) |
| if len(parts) != 2: |
| raise URLError("Error %sing %s, response data malformed:\n%s" % (method, url, stdout)) |
| header_block, body = parts |
| |
| content_type_header = None |
| content_len_header = None |
| for hline in header_block.decode('iso-8859-1').splitlines(): |
| hline_parts = hline.split(':', 1) |
| if len(hline_parts) != 2: |
| continue |
| name, val = hline_parts |
| name = name.strip().lower() |
| val = val.strip() |
| if name == 'content-type': |
| content_type_header = val |
| if name == 'content-length': |
| content_len_header = val |
| |
| if content_type_header is None and content_len_header != '0': |
| raise URLError("Error %sing %s, no content-type header:\n%s" % (method, url, stdout)) |
| |
| if content_type_header is None: |
| content_type = 'text/plain' |
| encoding = 'utf-8' |
| else: |
| content_type, params = cgi.parse_header(content_type_header) |
| encoding = params.get('charset') |
| |
| return (content_type, encoding, body) |
| |
| |
| def _execute(params, cwd, retry=None, retries=0): |
| """ |
| Executes a subprocess |
| |
| :param params: |
| A list of the executable and arguments to pass to it |
| |
| :param cwd: |
| The working directory to execute the command in |
| |
| :param retry: |
| If this string is present in stderr, or regex pattern matches stderr, retry the operation |
| |
| :param retries: |
| An integer number of times to retry |
| |
| :return: |
| A 2-element tuple of (stdout, stderr) |
| """ |
| |
| proc = subprocess.Popen( |
| params, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| cwd=cwd |
| ) |
| stdout, stderr = proc.communicate() |
| code = proc.wait() |
| if code != 0: |
| if retry and retries > 0: |
| stderr_str = stderr.decode('utf-8') |
| if isinstance(retry, Pattern): |
| if retry.search(stderr_str) is not None: |
| time.sleep(5) |
| return _execute(params, cwd, retry, retries - 1) |
| elif retry in stderr_str: |
| time.sleep(5) |
| return _execute(params, cwd, retry, retries - 1) |
| e = OSError('subprocess exit code for "%s" was %d: %s' % (' '.join(params), code, stderr)) |
| e.stdout = stdout |
| e.stderr = stderr |
| raise e |
| return (stdout, stderr) |
| |
| |
| if __name__ == '__main__': |
| _codecov_submit() |