# Source blob: 4cbc5cd5573bb163aac3c9f37c3db68e9ce0df1e
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import os
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import chrome
from autotest_lib.client.cros import cryptohome
from autotest_lib.client.cros import enterprise_base
from autotest_lib.client.cros import httpd
# Extra Chrome command-line flags for each test environment, keyed by the
# |env| test argument.
# NOTE(review): the opening "NAME = {" and closing "}" lines of both tables
# were missing from this copy of the file. The table names are taken from the
# look-ups that appear later in the file (FLAGS_DICT[self.env] and
# DMS_URL_DICT[self.env]); confirm against the canonical source.
# NOTE(review): CROSDEV_FLAGS, CROSAUTO_FLAGS and TESTDMS_FLAGS are not
# defined anywhere in the visible part of the file; they must be defined
# above this point.
FLAGS_DICT = {
    'prod': [],
    'cr-dev': CROSDEV_FLAGS,
    'cr-auto': CROSAUTO_FLAGS,
    'dm-test': TESTDMS_FLAGS,
    'dm-fake': TESTDMS_FLAGS,
}

# Device-management server URL for each test environment.
# NOTE(review): every URL is empty in this copy. The 'dm-test' entry is later
# %-formatted with the DMS name, so it is expected to contain a '%s'
# placeholder -- TODO restore the real URLs before use.
DMS_URL_DICT = {
    'prod': '',
    'cr-dev': '',
    'cr-auto': '',
    'dm-test': '',
    'dm-fake': '',
}

# Template for the Chrome flag that points the browser at a DM server URL.
DMSERVER = '--device-management-url=%s'
class EnterprisePolicyTest(enterprise_base.EnterpriseTest):
"""Base class for Enterprise Policy Tests."""
# NOTE(review): no statements belonging to setup() are visible in this copy
# of the file; restore the method body from the canonical source.
def setup(self):
# Per-test initialization hook taking the test's command line |args|.
# NOTE(review): indentation and some statements appear to be missing from
# this copy: |args| is never used, self.env is read before anything visible
# in this method sets it, and it cannot be determined which statement the
# 'dm-fake' check is meant to guard. cleanup() reads self._web_server, so
# the reset to None is presumably unconditional -- confirm.
def initialize(self, args=()):
# Start AutoTest DM Server if using local fake server.
if self.env == 'dm-fake':
self._web_server = None
# Per-test teardown hook; delegates to the parent class cleanup().
# NOTE(review): the statements that should follow the 'dm-fake' check, the
# self._web_server check and the "Close Chrome instance" comment are not
# present in this copy, and the original nesting cannot be recovered from
# the flattened indentation. Restore from the canonical source.
def cleanup(self):
# Clean up AutoTest DM Server if using local fake server.
if self.env == 'dm-fake':
super(EnterprisePolicyTest, self).cleanup()
# Stop web server if it was started.
if self._web_server:
# Close Chrome instance if opened.
def start_webserver(self, port):
    """Set up an HTTP Server to serve pages from localhost.

    The listener is created with |self.bindir| as its document root and is
    stored in |self._web_server|. Nothing is created when the run mode is
    'list'.

    @param port: Port used by HTTP server.
    """
    # NOTE(review): nothing visible here starts the listener after it is
    # constructed; confirm whether httpd.HTTPListener begins serving on
    # construction or whether a start call was lost from this copy.
    if self.mode != 'list':
        self._web_server = httpd.HTTPListener(port, docroot=self.bindir)
# Reads the test's command line args into instance attributes (mode, value,
# env, username, password, dms_name), normalizes them, and raises
# error.TestError for invalid values or incompatible combinations.
# NOTE(review): this copy of the method is damaged: indentation is lost, the
# docstring is never closed, and several tokens/lines are missing. The gaps
# are flagged inline below; restore them from the canonical source rather
# than guessing.
def _initialize_test_context(self, args=()):
"""Initialize class-level test context parameters.
@raises error.TestError if an arg is given an invalid value or some
combination of args is given incompatible values.
# Extract local parameters from command line args.
args_dict = utils.args_to_dict(args)
# NOTE(review): two assignments are fused on the next line; the target of
# the second one (fed from args_dict.get('case')) is missing.
self.mode = args_dict.get('mode', 'all') = args_dict.get('case')
self.value = args_dict.get('value')
self.env = args_dict.get('env', 'dm-fake')
self.username = args_dict.get('username')
self.password = args_dict.get('password')
self.dms_name = args_dict.get('dms_name')
# If |mode| is 'list', set |env| to generic 'prod', and blank out
# the other key parameters: case, value.
if self.mode == 'list':
# NOTE(review): fused assignments again; the target of "= None" is missing
# (per the comment above it should blank out the case parameter).
self.env = 'prod' = None
self.value = None
# If |case| is given then set |mode| to 'single'.
# NOTE(review): the condition guarding the next assignment is missing.
self.mode = 'single'
# If |mode| is 'all', then |env| must be 'dm-fake', and
# the |case| and |value| args must not be given.
if self.mode == 'all':
if self.env != 'dm-fake':
raise error.TestError('env must be "dm-fake" '
'when mode=all.')
# NOTE(review): the condition guarding the next raise is missing.
raise error.TestError('case must not be given '
'when mode=all.')
if self.value:
raise error.TestError('value must not be given '
'when mode=all.')
# If |value| is given, set |is_value_given| flag to True. If it
# was given as 'none', 'null', or '', then set |value| to 'null'.
if self.value is not None:
self.is_value_given = True
if (self.value.lower() == 'none' or
self.value.lower() == 'null' or
self.value == ''):
self.value = 'null'
# NOTE(review): an "else:" presumably preceded the next line; as written the
# flag would always be reset to False -- confirm.
self.is_value_given = False
# Verify |env| is a valid environment.
if self.env is not None and self.env not in FLAGS_DICT:
raise error.TestError('env=%s is invalid.' % self.env)
# If |env| is 'dm-fake', ensure value and credentials are not given.
if self.env == 'dm-fake':
if self.is_value_given:
raise error.TestError('value must not be given when using '
'the fake DM Server.')
if self.username or self.password:
raise error.TestError('user credentials must not be given '
'when using the fake DM Server.')
# If either credential is not given, set both to default.
if self.username is None or self.password is None:
self.username = self.USERNAME
self.password = self.PASSWORD
# Verify |case| is given if |mode|==single.
# NOTE(review): the operand after "not" is missing on the next line.
if self.mode == 'single' and not
raise error.TestError('case must be given when mode is single.')
# Verify |case| is given if a |value| is given.
# NOTE(review): the operand before "is None" is missing on the next line.
if self.is_value_given and is None:
raise error.TestError('value must not be given without also '
'giving a test case.')
# Verify |dms_name| is given iff |env|==dm-test.
# NOTE(review): both error messages below are cut off after their first
# string fragment; the continuation and closing parenthesis are missing.
if self.env == 'dm-test' and not self.dms_name:
raise error.TestError('dms_name must be given when using '
if self.env != 'dm-test' and self.dms_name:
raise error.TestError('dms_name must not be given when not using '
# NOTE(review): the line below is the residue of several log calls whose
# call names were stripped and which were fused into the comment. It also
# appears to log the password -- consider not logging credentials.
# Log the test context parameters.'Test Context Parameters:')' Run Mode: %r', self.mode)' Test Case: %r',' Expected Value: %r', self.value)' Environment: %r', self.env)' Username: %r', self.username)' Password: %r', self.password)' Test DMS Name: %r', self.dms_name)
def _initialize_chrome_extra_flags(self):
    """Initialize flags used to create Chrome instance.

    Stores the result in |self.extra_flags|. For the 'prod' environment the
    list is empty. For every other environment it holds the DM server URL
    flag followed by that environment's entries from FLAGS_DICT.
    """
    # Construct DM Server URL flags.
    env_flag_list = []
    if self.env != 'prod':
        if self.env == 'dm-fake':
            # Use URL provided by AutoTest DM server.
            dmserver_str = DMSERVER % self.dm_server_url
        else:
            # Use URL defined in DMS URL dictionary.
            dmserver_str = DMSERVER % DMS_URL_DICT[self.env]
            if self.env == 'dm-test':
                # The dm-test URL carries a placeholder for the DMS name.
                dmserver_str = dmserver_str % self.dms_name

        # Merge with other flags needed by non-prod environment.
        env_flag_list = [dmserver_str] + FLAGS_DICT[self.env]

    # NOTE(review): this copy read "self.extra_flags = env_flag_list = None",
    # which would throw the computed flags away. The trailing "= None" looks
    # like the residue of a separate statement whose target was lost; restore
    # that statement from the canonical source if it is needed.
    self.extra_flags = env_flag_list
# Prepares one test case: opens chrome://policy, checks the user's vault via
# cryptohome.is_vault_mounted(), and fails the test when the value shown for
# |policy_name| does not match |policy_value|.
# NOTE(review): this copy is damaged: the docstring is never closed, no
# statement follows the 'dm-fake' check (so |policies_json| is never used),
# nothing visible launches the browser before navigating, and the
# is_vault_mounted(...) call is cut off after its first argument. Restore
# the missing lines from the canonical source.
def setup_case(self, policy_name, policy_value, policies_json):
"""Set up and confirm the preconditions of a test case.
If the AutoTest fake DM Server is initialized, make a policy blob
from |policies_json|, and upload it to the fake server.
Launch a chrome browser, and sign in to Chrome OS. Examine the user's
cryptohome vault, to confirm it signed in successfully.
Open the Policies page, and confirm that it shows the specified
|policy_name| and has the correct |policy_value|.
@param policy_name: Name of the policy under test.
@param policy_value: Expected value to appear on chrome://policy page.
@param policies_json: JSON string to set up the fake DMS policy value.
@raises error.TestError if cryptohome vault is not mounted for user.
@raises error.TestFail if |policy_name| and |policy_value| are not
shown on the Policies page.
# Set up policy on AutoTest DM Server only if initialized.
if self.env == 'dm-fake':
tab = self.navigate_to_url('chrome://policy')
if not cryptohome.is_vault_mounted(user=self.username,
raise error.TestError('Expected to find a mounted vault for %s.'
% self.username)
value_shown = self._get_policy_value_shown(tab, policy_name)
if not self._policy_value_matches_shown(policy_value, value_shown):
raise error.TestFail('Policy value shown is not correct: %s '
'(expected: %s)' %
(value_shown, policy_value))
# Creates a chrome.Chrome instance using |self.extra_flags| as the extra
# browser arguments.
# NOTE(review): the whole body is collapsed onto one line in this copy. The
# names of the log calls and the target of the chrome.Chrome(...) assignment
# were stripped, and the constructor call is cut off after its first
# argument. The residue also suggests the password is logged -- consider not
# logging credentials. Restore from the canonical source.
def _launch_chrome_browser(self):
"""Launch Chrome browser and sign in."""'Chrome Browser Arguments:')' extra_browser_args: %s', self.extra_flags)' username: %s', self.username)' password: %s', self.password) = chrome.Chrome(extra_browser_args=self.extra_flags,
# Loads |url| in |tab| with a 5 second timeout and returns the tab.
# NOTE(review): in this copy the log call after the docstring lost its call
# name, and the "tab =" assignment that should create a new tab when none is
# given has no right-hand side. Restore both from the canonical source.
def navigate_to_url(self, url, tab=None):
"""Navigate tab to the specified |url|. Create new tab if none given.
@param url: URL of web page to load.
@param tab: browser tab to load (if any).
@returns: browser tab loaded with web page.
"""'Navigating to URL: %r', url)
if not tab:
tab =
tab.Navigate(url, timeout=5)
return tab
def _policy_value_matches_shown(self, policy_value, value_shown):
    """Compare |policy_value| to |value_shown| with whitespace removed.

    Compare the expected policy value with the value actually shown on the
    chrome://policy page. Before comparing, a value of None or '' is
    replaced by the JSON string 'null', and all whitespace is removed from
    both values. Whitespace is removed because Chrome processes some policy
    values to show them in a more human readable format.

    @param policy_value: Expected value to appear on chrome://policy page
            (a string, or None).
    @param value_shown: Value as it appears on chrome://policy page
            (a string, or None).

    @returns: True if the strings match after removing all whitespace.
    """
    # Convert Python None or '' to JSON formatted 'null' string.
    if value_shown is None or value_shown == '':
        value_shown = 'null'
    if policy_value is None or policy_value == '':
        policy_value = 'null'

    # Remove whitespace.
    trimmed_value = ''.join(policy_value.split())
    trimmed_shown = ''.join(value_shown.split())
    # NOTE(review): the name of this log call was stripped in the copy under
    # review; logging.info is assumed -- confirm the intended level.
    logging.info('Trimmed policy value shown: %r (expected: %r)',
                 trimmed_shown, trimmed_value)
    return trimmed_value == trimmed_shown
def _make_json_blob(self, policies_json):
    """Create policy blob from policies JSON object.

    The policies are first grouped by mode with
    _move_modeless_to_mandatory() and stripped of unset entries with
    _remove_null_policies(), then embedded in the JSON document sent to the
    policy server, with |self.USERNAME| as the policy user.

    @param policies_json: Policies JSON object (name-value pairs).

    @returns: Policy blob to be used to setup the policy server.
    """
    policies_json = self._move_modeless_to_mandatory(policies_json)
    policies_json = self._remove_null_policies(policies_json)

    # Both substituted values go through json.dumps so that quotes or
    # backslashes in the user name cannot produce an invalid document.
    policy_blob = """{
        "google/chromeos/user": %s,
        "managed_users": ["*"],
        "policy_user": %s,
        "current_key_index": 0,
        "invalidation_source": 16,
        "invalidation_name": "test_policy"
    }""" % (json.dumps(policies_json), json.dumps(self.USERNAME))
    return policy_blob
def _move_modeless_to_mandatory(self, policies_json):
    """Add the 'mandatory' mode if a policy's mode was omitted.

    The AutoTest fake DM Server requires that every policy be contained
    within either a 'mandatory' or 'recommended' dictionary, to indicate
    the mode of the policy. This function moves modeless policies into
    the 'mandatory' dictionary.

    The input dictionary and its nested mode dictionaries are left
    unmodified; new dictionaries are returned. A mode key is only present
    in the result when it has at least one policy.

    @param policies_json: The policy JSON data (name-value pairs).

    @returns: dict of policies grouped by mode keys.
    """
    # Work on copies so the caller's data is not altered.
    modeless_policies = dict(policies_json)
    mandatory_policies = dict(modeless_policies.pop('mandatory', None) or {})
    recommended_policies = dict(
            modeless_policies.pop('recommended', None) or {})

    # Move any remaining modeless policies into mandatory dict.
    # NOTE(review): the statement doing this was missing from the copy under
    # review; it is reconstructed from the comment above. A modeless entry
    # overrides a same-named 'mandatory' entry -- confirm that precedence.
    mandatory_policies.update(modeless_policies)

    # Collate all policies into mandatory & recommended dicts.
    collated_json = {}
    if recommended_policies:
        collated_json['recommended'] = recommended_policies
    if mandatory_policies:
        collated_json['mandatory'] = mandatory_policies
    return collated_json
def _remove_null_policies(self, policies_json):
    """Remove policy dict data that is set to None or ''.

    For the status of a policy to be shown as "Not set" on the
    chrome://policy page, the policy blob must contain no dictionary entry
    for that policy. This function returns a copy of |policies_json| in
    which every policy whose value is None or '' has been dropped.

    New dictionaries are built for every mode, so neither |policies_json|
    nor its nested dictionaries are modified. Other falsy values (0, False,
    empty lists) are kept.

    @param policies_json: setup policy JSON data, as a dict that maps each
            mode key to a dict of policy name-value pairs.

    @returns: setup policy JSON data with all 'Not set' policies removed.
    """
    # NOTE(review): the copy under review lost the statement that removed
    # the entry. It also worked on a shallow copy, which would have altered
    # the caller's nested dicts while iterating over them.
    cleaned_json = {}
    for mode, policies in policies_json.items():
        cleaned_json[mode] = dict(
                (name, value) for name, value in policies.items()
                if value is not None and value != '')
    return cleaned_json
# Evaluates a JavaScript snippet on |policy_tab| that scans the rows of the
# first table in the first "policy-table-section" element for a row whose
# name equals |policy_name|, then returns the value shown for it, or None
# when the row's status text is 'Not set.'.
# NOTE(review): this copy is damaged: the docstring is never closed and the
# JavaScript snippet is missing the closing braces of its for/if blocks as
# well as the final expression that hands rowValues back to Python.
# NOTE(review): rowValues starts out as '' in the snippet, so when no row
# matches, row_values[1] presumably raises IndexError -- confirm and handle.
# NOTE(review): |policy_name| is %-interpolated into the script without
# escaping; a name containing a single quote would break the snippet.
def _get_policy_value_shown(self, policy_tab, policy_name):
"""Get the value shown for the named policy on the Policies page.
Takes |policy_name| as a parameter and returns the corresponding
policy value shown on the chrome://policy page.
@param policy_tab: Tab displaying the chrome://policy page.
@param policy_name: The name of the policy.
@returns: The value shown for the policy on the Policies page.
row_values = policy_tab.EvaluateJavaScript('''
var section = document.getElementsByClassName("policy-table-section")[0];
var table = section.getElementsByTagName('table')[0];
rowValues = '';
for (var i = 1, row; row = table.rows[i]; i++) {
if (row.className !== 'expanded-value-container') {
var name_div = row.getElementsByClassName('name elide')[0];
var name = name_div.textContent;
if (name === '%s') {
var value_span = row.getElementsByClassName('value')[0];
var value = value_span.textContent;
var status_div = row.getElementsByClassName('status elide')[0];
var status = status_div.textContent;
rowValues = [name, value, status];
''' % policy_name)
value_shown = row_values[1].encode('ascii', 'ignore')
status_shown = row_values[2].encode('ascii', 'ignore')
if status_shown == 'Not set.':
return None
return value_shown
def get_elements_from_page(self, tab, cmd):
    """Get collection of page elements that match the |cmd| filter.

    @param tab: tab containing the page to be scraped.
    @param cmd: JavaScript command to evaluate on the page.

    @returns object containing elements on page that match the cmd.
    @raises: TestFail if evaluating |cmd| on the page raises an exception.
    """
    try:
        elements = tab.EvaluateJavaScript(cmd)
    except Exception as err:
        # Report any evaluation error as a test failure that names the page
        # and carries the underlying error.
        raise error.TestFail('Unable to find matching elements on '
                             'the test page: %s\n %r' %(tab.url, err))
    return elements
def json_string(self, policy_value):
    """Convert policy value to a JSON formatted string.

    @param policy_value: object containing a policy value; it must be
            serializable by json.dumps.

    @returns: string in JSON format.
    """
    return json.dumps(policy_value)
# Raises error.TestError when |test_case| is not a key of self.TEST_CASES.
# NOTE(review): this copy is damaged: the docstring is never closed, the log
# call after the raise lost its call name and is fused onto the raise line,
# and no visible statement invokes |run_test| even though the docstring says
# the test runner is called. Restore from the canonical source.
def _validate_and_run_test_case(self, test_case, run_test):
"""Validate test case and call the test runner in the test class.
@param test_case: name of the test case to run.
@param run_test: method in test class that runs a test case.
if test_case not in self.TEST_CASES:
raise error.TestError('Test case is not valid: %s' % test_case)'Running test case: %s', test_case)
# Dispatches on self.mode: 'all' runs every key of self.TEST_CASES in sorted
# order, 'single' runs one test case, and 'list' iterates over the sorted
# test cases and their values.
# NOTE(review): this copy is damaged: the docstring is never closed, the
# first argument of the 'single' call is missing, the log calls in the
# 'list' branch lost their call names, and an "else:" presumably preceded
# the final raise (as written it cannot be told which branch it belongs to).
# Restore from the canonical source.
def run_once_impl(self, run_test):
"""Dispatch the common run modes for all child test classes.
@param run_test: method in test class that runs a test case.
if self.mode == 'all':
for test_case in sorted(self.TEST_CASES):
self._validate_and_run_test_case(test_case, run_test)
elif self.mode == 'single':
self._validate_and_run_test_case(, run_test)
elif self.mode == 'list':'List Test Cases:')
for test_case, value in sorted(self.TEST_CASES.items()):' case=%s, value="%s"', test_case, value)
raise error.TestError('Run mode is not valid: %s' % self.mode)
def run_once(self):
# The run_once() method is core to all autotest tests. We define it
# herein to support tests that do not define their own override.