| # Copyright (C) 2023 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """ |
| This module provides a function for validating starlark data against a schema. |
| See validate() for more information. |
| """ |
| |
| _schema_schema = { |
| "type": "dict", |
| "optional_keys": { |
| "or": { |
| "type": "list", |
| "length": ">=2", |
| }, |
| "noneable": {"type": "bool"}, |
| "type": { |
| "type": "string", |
| "choices": [ |
| "NoneType", |
| "bool", |
| "int", |
| "float", |
| "string", |
| "bytes", |
| "list", |
| "tuple", |
| "dict", |
| "struct", |
| ], |
| }, |
| "choices": { |
| "type": "list", |
| "of": { |
| "or": [ |
| {"type": "string"}, |
| {"type": "int"}, |
| {"type": "float"}, |
| ], |
| }, |
| }, |
| "value": { |
| "or": [ |
| {"type": "string"}, |
| {"type": "int"}, |
| {"type": "float"}, |
| ], |
| }, |
| "of": {}, # to be filled in later |
| "unique": {"type": "bool"}, |
| "length": {"or": [ |
| {"type": "string"}, |
| {"type": "int"}, |
| ]}, |
| "required_keys": { |
| "type": "dict", |
| "values": {}, # to be filled in later |
| }, |
| "optional_keys": { |
| "type": "dict", |
| "values": {}, # to be filled in later |
| }, |
| "keys": {}, # to be filled in later |
| "values": {}, # to be filled in later |
| "required_fields": { |
| "type": "dict", |
| "keys": {"type": "string"}, |
| "values": {}, # to be filled in later |
| }, |
| "optional_fields": { |
| "type": "dict", |
| "keys": {"type": "string"}, |
| "values": {}, # to be filled in later |
| }, |
| }, |
| } |
| |
| _schema_schema["optional_keys"]["of"] = _schema_schema |
| _schema_schema["optional_keys"]["required_keys"]["values"] = _schema_schema |
| _schema_schema["optional_keys"]["optional_keys"]["values"] = _schema_schema |
| _schema_schema["optional_keys"]["keys"] = _schema_schema |
| _schema_schema["optional_keys"]["values"] = _schema_schema |
| _schema_schema["optional_keys"]["required_fields"]["values"] = _schema_schema |
| _schema_schema["optional_keys"]["optional_fields"]["values"] = _schema_schema |
| |
| def _check_len(obj, length): |
| if type(length) == "int": |
| return len(obj) == length |
| if length.startswith("<="): |
| return len(obj) <= int(length[2:]) |
| if length.startswith(">="): |
| return len(obj) >= int(length[2:]) |
| ln = int(length[1:]) |
| if length[0] == "=": |
| return len(obj) == ln |
| if length[0] == "<": |
| return len(obj) < ln |
| if length[0] == ">": |
| return len(obj) > ln |
| fail("Unexpected length format") |
| |
| def _validate_impl(obj, schema): |
| stack = [] |
| |
| def newStackFrame(obj, schema): |
| stack.append({ |
| "obj": obj, |
| "schema": schema, |
| "state": "start", |
| }) |
| |
| newStackFrame(obj, schema) |
| ret = "" |
| |
| # Because bazel doesn't allow infinite loops/recursion, just make a loop |
| # with an arbitrarily large number of iterations. |
| for _ in range(100000): |
| if not stack: |
| break |
| frame = stack[-1] |
| obj = frame["obj"] |
| schema = frame["schema"] |
| state = frame["state"] |
| |
| if state == "start": |
| if len(schema) == 0: |
| ret = "" |
| stack.pop() |
| continue |
| if "or" in schema: |
| if len(schema) != 1: |
| fail("an 'or' schema must not be accompanied by any other keys") |
| frame["i"] = 0 |
| frame["state"] = "or_loop" |
| frame["failures"] = [] |
| newStackFrame(obj, schema["or"][0]) |
| continue |
| if "type" not in schema: |
| fail("a non-empty/non-or schema must have a 'type' key: " + str(schema)) |
| if schema.get("noneable", False): |
| if obj == None: |
| ret = "" |
| stack.pop() |
| continue |
| ty = schema["type"] |
| if type(obj) != ty: |
| ret = "Expected %s, got %s" % (ty, type(obj)) |
| stack.pop() |
| continue |
| if "length" in schema: |
| if ty not in ["string", "bytes", "list", "tuple"]: |
| fail("'len' is only valid for string, bytes, lists, or tuples, got: " + ty) |
| if not _check_len(obj, schema["length"]): |
| ret = "Expected length %s, got %d" % (schema["length"], len(obj)) |
| stack.pop() |
| continue |
| if "choices" in schema: |
| if ty not in ["string", "int", "float"]: |
| fail("'choices' is only valid for string, int, or float, got: " + ty) |
| if obj not in schema["choices"]: |
| ret = "Expected one of %s, got %s" % (schema["choices"], obj) |
| stack.pop() |
| continue |
| if "value" in schema: |
| if ty not in ["string", "int", "float"]: |
| fail("'value' is only valid for string, int, or float, got: " + ty) |
| if obj != schema["value"]: |
| ret = "Expected %s, got %s" % (schema["value"], obj) |
| stack.pop() |
| continue |
| if schema.get("unique", False): |
| if ty != "list" and ty != "tuple": |
| fail("'unique' is only valid for lists or tuples, got: " + ty) |
| sorted_list = sorted(obj) |
| done = False |
| for i in range(len(sorted_list) - 1): |
| if type(sorted_list[i]) not in ["string", "int", "float", "bool", "NoneType", "bytes"]: |
| ret = "'unique' only works on lists/tuples of scalar types, got: " + type(sorted_list[i]) |
| stack.pop() |
| done = True |
| break |
| if sorted_list[i] == sorted_list[i + 1]: |
| ret = "Expected all elements to be unique, but saw '%s' twice" % str(sorted_list[i]) |
| stack.pop() |
| done = True |
| break |
| if done: |
| continue |
| if "of" in schema: |
| if ty != "list" and ty != "tuple": |
| fail("'of' is only valid for lists or tuples, got: " + ty) |
| if obj: |
| frame["i"] = 0 |
| frame["state"] = "of_loop" |
| newStackFrame(obj[0], schema["of"]) |
| continue |
| if ty == "dict": |
| if "required_fields" in schema or "optional_fields" in schema: |
| fail("a dict schema can't contain required_fields/optional_fields") |
| schema_names_keys = bool(schema.get("required_keys", {})) or bool(schema.get("optional_keys", {})) |
| schema_enforces_generic_keys = bool(schema.get("keys", {})) or bool(schema.get("values", {})) |
| if schema_names_keys and schema_enforces_generic_keys: |
| fail("Only required_keys/optional_keys or keys/values may be used, but not both") |
| if schema_names_keys: |
| all_keys = {} |
| done = False |
| for key, subSchema in schema.get("required_keys", {}).items(): |
| if key not in obj: |
| ret = "required key '" + key + "' not found" |
| stack.pop() |
| done = True |
| break |
| all_keys[key] = subSchema |
| if done: |
| continue |
| for key, subSchema in schema.get("optional_keys", {}).items(): |
| if key in all_keys: |
| fail("A key cannot be both required and optional: " + key) |
| if key in obj: |
| all_keys[key] = subSchema |
| extra_keys = [ |
| key |
| for key in obj.keys() |
| if key not in all_keys |
| ] |
| if extra_keys: |
| ret = "keys " + str(extra_keys) + " not allowed, valid keys: " + str(all_keys.keys()) |
| stack.pop() |
| continue |
| if all_keys: |
| frame["all_keys"] = all_keys.items() |
| frame["i"] = 0 |
| frame["state"] = "dict_individual_keys_loop" |
| k, v = frame["all_keys"][0] |
| newStackFrame(obj[k], v) |
| continue |
| elif schema_enforces_generic_keys: |
| frame["items"] = obj.items() |
| if frame["items"]: |
| frame["i"] = 0 |
| frame["state"] = "dict_generic_keys_loop" |
| frame["checking_key"] = True |
| continue |
| if ty == "struct": |
| if "required_keys" in schema or "optional_keys" in schema or "keys" in schema or "values" in schema: |
| fail("a struct schema can't contain required_keys/optional_keys/keys/values") |
| all_fields = {} |
| original_fields = {f: True for f in dir(obj)} |
| done = False |
| for field, subSchema in schema.get("required_fields", {}).items(): |
| if field not in original_fields: |
| ret = "required field '" + field + "' not found" |
| stack.pop() |
| done = True |
| break |
| all_fields[field] = subSchema |
| if done: |
| continue |
| for field, subSchema in schema.get("optional_fields", {}).items(): |
| if field in all_fields: |
| fail("A field cannot be both required and optional: " + key) |
| if field in original_fields: |
| all_fields[field] = subSchema |
| for field in all_fields: |
| if field == "to_json" or field == "to_proto": |
| fail("don't use deprecated fields to_json or to_proto") |
| extra_fields = [ |
| field |
| for field in original_fields.keys() |
| if field not in all_fields and field != "to_json" and field != "to_proto" |
| ] |
| if extra_fields: |
| ret = "fields " + str(extra_fields) + " not allowed, valid keys: " + str(all_fields.keys()) |
| stack.pop() |
| continue |
| if all_fields: |
| frame["all_fields"] = all_fields.items() |
| frame["i"] = 0 |
| frame["state"] = "struct_individual_fields_loop" |
| k, v = frame["all_fields"][0] |
| newStackFrame(getattr(obj, k), v) |
| continue |
| elif state == "or_loop": |
| if ret != "": |
| frame["failures"].append(" " + ret) |
| frame["i"] += 1 |
| if frame["i"] >= len(schema["or"]): |
| ret = "did not match any schemas in 'or' list, errors:\n" + "\n".join(frame["failures"]) |
| stack.pop() |
| continue |
| else: |
| newStackFrame(obj, schema["or"][frame["i"]]) |
| continue |
| elif state == "of_loop": |
| frame["i"] += 1 |
| if ret != "" or frame["i"] >= len(obj): |
| stack.pop() |
| continue |
| newStackFrame(obj[frame["i"]], schema["of"]) |
| continue |
| elif state == "dict_individual_keys_loop": |
| frame["i"] += 1 |
| if ret != "" or frame["i"] >= len(frame["all_keys"]): |
| stack.pop() |
| continue |
| k, v = frame["all_keys"][frame["i"]] |
| newStackFrame(obj[k], v) |
| continue |
| elif state == "dict_generic_keys_loop": |
| if ret != "" or frame["i"] >= len(frame["items"]): |
| stack.pop() |
| continue |
| k, v = frame["items"][frame["i"]] |
| if frame["checking_key"]: |
| frame["checking_key"] = False |
| newStackFrame(k, schema.get("keys", {})) |
| continue |
| else: |
| frame["checking_key"] = True |
| frame["i"] += 1 |
| newStackFrame(v, schema.get("values", {})) |
| continue |
| elif state == "struct_individual_fields_loop": |
| frame["i"] += 1 |
| if ret != "" or frame["i"] >= len(frame["all_fields"]): |
| stack.pop() |
| continue |
| k, v = frame["all_fields"][frame["i"]] |
| newStackFrame(getattr(obj, k), v) |
| continue |
| |
| # by default return success |
| ret = "" |
| stack.pop() |
| if stack: |
| fail("Schema validation took too many iterations") |
| return ret |
| |
| def validate(obj, schema, *, validate_schema = True, fail_on_error = True): |
| """Validates the given starlark object against a schema. |
| |
| A schema is a dictionary that describes the format of obj. Currently, |
| recursive objects cannot be validated because there's no cycle detection. |
| |
| An empty dictionary describes "any object". |
| |
| A dictionary with an "or" key must not have any other keys, and its |
| value is a list of other schema objects. If any of those schema objects |
| match, the "or" schema is considered a success. |
| |
| Any schemas that are not empty or "or" schemas must have a "type" key. |
| This type must match the result of type(obj). |
| |
| The "noneable" key can be set to true to act as an alias for: |
| `{"or": [{"type": "NoneType"}, ...the rest of the schema...]}` |
| |
| The "value" key contains a value that must match the object exactly. |
| Only applies to strings, ints, and floats. |
| |
| The "choices" key is a list of values that the object could match. |
| If the object is equal to any one of them then validation succeeds. |
| |
| The "length" key applies to strings, bytes, lists, or tuples. |
| Its value can either be an integer length that the object must have, |
| or a string in that starts with <, >, <=, >=, or =, followed by a number. |
| |
| The "of" key is a schema to match against the elements of a list/tuple. |
| |
| Dictionaries and structs have "required_keys"/"required_fields" and |
| "optional_keys"/"optional_fields". (keys for dictionaries, fields for |
| structs). The value of each of these fields is a dictionary mapping from |
| the key/field value to a schema object to validate the value of the |
| key/field. Any keys/fields that are not listed in the schema will cause |
| the validation to fail. Any keys/fields in the required_ schemas must |
| be present in the input object. |
| |
| Dictionaries have two additional fields over structs, "keys" and "values". |
| These fields cannot be mixed with required_keys/optional_keys. They provide |
| a single schema object each to apply to all the keys/values in the dictionary. |
| |
| Args: |
| obj: The object to be validated against the schema |
| schema: The schema. (See above) |
| validate_schema: Also check that the schema itself is valid. This |
| can be disabled for performance. However, some of the checks |
| about the schema are hardcoded and cannot be disabled. |
| fail_on_error: If this function should fail() when the object doesn't |
| conform to the schema. Note that if the schema itself is invalid, |
| validate() fails regardless of the value of this argument. |
| Returns: |
| If fail_on_error is True, validate() doesn't return anything. |
| If fail_on_error is False, validate() returns a string that describes |
| the reason why the object doesn't match the schema, or an empty string |
| if it does match. |
| """ |
| if validate_schema: |
| schema_validation_results = _validate_impl(schema, _schema_schema) |
| if schema_validation_results: |
| fail("Schema is invalid: " + schema_validation_results) |
| result = _validate_impl(obj, schema) |
| if not fail_on_error: |
| return result |
| if result: |
| fail(result) |
| return None |