| #!/usr/bin/python2.7 |
| # Copyright 2019 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This implements a simple tool to create policy blobs signed with a given |
| key. It can create both device and user policies. The output will consist of |
| two files a policy file and the owner.key file which contains the policy |
| signature. |
| |
| The input file is JSON. The root dictionary contains a list under the |
| key "managed_users". Keys in the root dictionary identify request scopes. |
| The user-request scope is described by a dictionary that holds two |
| sub-dictionaries: "mandatory" and "recommended". Both these hold the policy |
| definitions as key/value stores, their format is identical to what the Linux |
| implementation reads from /etc. |
| The device-scope holds the policy-definition directly as key/value stores |
| in the protobuf-format. |
| |
| Example: |
| |
| { |
| "google/chromeos/device" : { |
| "guest_mode_enabled" : false |
| }, |
| "google/chromeos/user" : { |
| "mandatory" : { |
| "HomepageLocation" : "http://www.chromium.org", |
| "IncognitoEnabled" : false |
| }, |
| "recommended" : { |
| "JavascriptEnabled": false |
| } |
| } |
| } |
| |
| """ |
| |
| import optparse |
| import os |
| import re |
| import sys |
| import time |
| import tlslite |
| import tlslite.api |
| import tlslite.utils |
| |
| # The name and availability of the json module varies in python versions. |
| try: |
| import simplejson as json |
| except ImportError: |
| try: |
| import json |
| except ImportError: |
| json = None |
| |
| import asn1der |
| import device_management_backend_pb2 as dm |
| import cloud_policy_pb2 as cp |
| import chrome_device_policy_pb2 as dp |
| |
| # ASN.1 object identifier for PKCS#1/RSA. |
| PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01' |
| |
| def SetProtobufMessageField(group_message, field, field_value): |
| '''Sets a field in a protobuf message. |
| |
| Args: |
| group_message: The protobuf message. |
| field: The field of the message to set, it shuold be a member of |
| group_message.DESCRIPTOR.fields. |
| field_value: The value to set. |
| ''' |
| if field.label == field.LABEL_REPEATED: |
| assert type(field_value) == list |
| entries = group_message.__getattribute__(field.name) |
| for list_item in field_value: |
| entries.append(list_item) |
| return |
| elif field.type == field.TYPE_BOOL: |
| assert type(field_value) == bool |
| elif field.type == field.TYPE_STRING: |
| assert type(field_value) == str or type(field_value) == unicode |
| elif field.type == field.TYPE_INT64: |
| assert type(field_value) == int |
| elif (field.type == field.TYPE_MESSAGE and |
| field.message_type.name == 'StringList'): |
| assert type(field_value) == list |
| entries = group_message.__getattribute__(field.name).entries |
| for list_item in field_value: |
| entries.append(list_item) |
| return |
| else: |
| raise Exception('Unknown field type %s' % field.type) |
| group_message.__setattr__(field.name, field_value) |
| |
| def GatherDevicePolicySettings(settings, policies): |
| '''Copies all the policies from a dictionary into a protobuf of type |
| CloudDeviceSettingsProto. |
| |
| Args: |
| settings: The destination ChromeDeviceSettingsProto protobuf. |
| policies: The source dictionary containing policies in JSON format. |
| ''' |
| for group in settings.DESCRIPTOR.fields: |
| # Create protobuf message for group. |
| group_message = eval('dp.' + group.message_type.name + '()') |
| # Indicates if at least one field was set in |group_message|. |
| got_fields = False |
| # Iterate over fields of the message and feed them from the |
| # policy config file. |
| for field in group_message.DESCRIPTOR.fields: |
| field_value = None |
| if field.name in policies: |
| got_fields = True |
| field_value = policies[field.name] |
| SetProtobufMessageField(group_message, field, field_value) |
| if got_fields: |
| settings.__getattribute__(group.name).CopyFrom(group_message) |
| |
| def GatherUserPolicySettings(settings, policies): |
| '''Copies all the policies from a dictionary into a protobuf of type |
| CloudPolicySettings. |
| |
| Args: |
| settings: The destination: a CloudPolicySettings protobuf. |
| policies: The source: a dictionary containing policies under keys |
| 'recommended' and 'mandatory'. |
| ''' |
| for group in settings.DESCRIPTOR.fields: |
| # Create protobuf message for group. |
| group_message = eval('cp.' + group.message_type.name + '()') |
| # We assume that this policy group will be recommended, and only switch |
| # it to mandatory if at least one of its members is mandatory. |
| group_message.policy_options.mode = cp.PolicyOptions.RECOMMENDED |
| # Indicates if at least one field was set in |group_message|. |
| got_fields = False |
| # Iterate over fields of the message and feed them from the |
| # policy config file. |
| for field in group_message.DESCRIPTOR.fields: |
| field_value = None |
| if field.name in policies['mandatory']: |
| group_message.policy_options.mode = cp.PolicyOptions.MANDATORY |
| field_value = policies['mandatory'][field.name] |
| elif field.name in policies['recommended']: |
| field_value = policies['recommended'][field.name] |
| if field_value != None: |
| got_fields = True |
| SetProtobufMessageField(group_message, field, field_value) |
| if got_fields: |
| settings.__getattribute__(group.name).CopyFrom(group_message) |
| |
| def ProcessCloudPolicy(policy_type, |
| policy_def, policy_key, |
| username, |
| output_path): |
| """Creates a policy blob. |
| |
| Encodes the policy into protobuf representation, signs it and saves it. |
| |
| Args: |
| policy_type: can be 'google/chromeos/user' or 'google/chromeos/device'. |
| policy_def: The JSON file containing the policy definition. |
| policy_key: A private key to be used to sign the blob. |
| username: Username to be integrated in the policy blob. |
| output_path: A directory where to put the output files. |
| """ |
| policy = json.loads(open(policy_def).read()) |
| policy_value = '' |
| if (policy_type in policy): |
| if policy_type == 'google/chromeos/user': |
| settings = cp.CloudPolicySettings() |
| GatherUserPolicySettings(settings, policy[policy_type]) |
| policy_value = settings.SerializeToString() |
| elif policy_type == 'google/chromeos/device': |
| settings = dp.ChromeDeviceSettingsProto() |
| GatherDevicePolicySettings(settings, policy[policy_type]) |
| policy_value = settings.SerializeToString() |
| |
| key = tlslite.api.parsePEMKey(open(policy_key).read(), private=True) |
| |
| algorithm = asn1der.Sequence( |
| [ asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID), |
| asn1der.Data(asn1der.NULL, '') ]) |
| rsa_pubkey = asn1der.Sequence([ asn1der.Integer(key.n), |
| asn1der.Integer(key.e) ]) |
| pubkey = asn1der.Sequence([ algorithm, asn1der.Bitstring(rsa_pubkey) ]) |
| key_version = 1 |
| |
| # Fill the policy data protobuf. |
| policy_data = dm.PolicyData() |
| policy_data.policy_type = policy_type |
| policy_data.timestamp = int(time.time() * 1000) |
| policy_data.request_token = "DEV_TOKEN" |
| policy_data.policy_value = policy_value |
| policy_data.machine_name = "MEAN_MACHINE" |
| policy_data.public_key_version = 1 |
| policy_data.username = username |
| policy_data.device_id = "1337_1D" |
| signed_data = policy_data.SerializeToString() |
| |
| response = dm.DeviceManagementResponse() |
| fetch_response = response.policy_response.responses.add() |
| fetch_response.policy_data = signed_data |
| fetch_response.policy_data_signature = bytes( |
| key.hashAndSign(signed_data)) |
| fetch_response.new_public_key = pubkey |
| |
| open("%s/policy" % output_path,"wb"). |
| write(fetch_response.SerializeToString()); |
| open("%s/owner.key" % output_path,"wb").write(pubkey); |
| |
| def main(options): |
| ProcessCloudPolicy(options.policy_type, |
| options.policy_def, options.policy_key, |
| options.policy_user, |
| options.output_path); |
| |
| if __name__ == '__main__': |
| option_parser = optparse.OptionParser() |
| option_parser.add_option('-k', '--policy-key', default="mykey", |
| dest='policy_key', |
| help='Specify a path to a PEM-encoded private key ' |
| 'to use for policy signing.') |
| option_parser.add_option('-p', '--policy-def', default="device_management", |
| dest='policy_def', |
| help='Specify a path to a PEM-encoded private key ' |
| 'to use for policy signing.') |
| option_parser.add_option('-u', '--policy-user', default='[email protected]', |
| dest='policy_user', |
| help='Specify the user name the server should ' |
| 'report back to the client as the user owning the ' |
| 'token used for making the policy request.') |
| option_parser.add_option('-o', '--output-path', default='.', |
| dest='output_path', |
| help='Specifies the directory to output policy ' |
| 'files to.') |
| option_parser.add_option('-t', '--type', default='google/chromeos/device', |
| dest='policy_type', |
| help='Specifies the type of policy to create.') |
| options, args = option_parser.parse_args() |
| |
| sys.exit(main(options)) |