# blob: b1952a8c367603121bf562cdd594fcd4bd32156d [file] [log] [blame]
# Copyright (C) 2023 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module provides a function for validating starlark data against a schema.
See validate() for more information.
"""
# The schema that schemas themselves must conform to. A schema is a dict in
# which every key is optional; the entries below list each recognised key
# together with the schema its value has to match.
#
# Several entries need to refer back to _schema_schema itself (a schema can
# contain other schemas). A dict literal cannot reference itself, so those
# entries start out as empty placeholders and are patched right after the
# literal.
_schema_schema = {
    "type": "dict",
    "optional_keys": {
        "or": {
            "type": "list",
            "length": ">=2",
        },
        "noneable": {"type": "bool"},
        "type": {
            "type": "string",
            "choices": [
                "NoneType",
                "bool",
                "int",
                "float",
                "string",
                "bytes",
                "list",
                "tuple",
                "dict",
                "struct",
            ],
        },
        "choices": {
            "type": "list",
            "of": {
                "or": [
                    {"type": "string"},
                    {"type": "int"},
                    {"type": "float"},
                ],
            },
        },
        "value": {
            "or": [
                {"type": "string"},
                {"type": "int"},
                {"type": "float"},
            ],
        },
        # Placeholder, replaced by _schema_schema below.
        "of": {},
        "unique": {"type": "bool"},
        "length": {
            "or": [
                {"type": "string"},
                {"type": "int"},
            ],
        },
        "required_keys": {
            "type": "dict",
            # Placeholder, replaced by _schema_schema below.
            "values": {},
        },
        "optional_keys": {
            "type": "dict",
            # Placeholder, replaced by _schema_schema below.
            "values": {},
        },
        # Placeholders, replaced by _schema_schema below.
        "keys": {},
        "values": {},
        "required_fields": {
            "type": "dict",
            "keys": {"type": "string"},
            # Placeholder, replaced by _schema_schema below.
            "values": {},
        },
        "optional_fields": {
            "type": "dict",
            "keys": {"type": "string"},
            # Placeholder, replaced by _schema_schema below.
            "values": {},
        },
    },
}

# Close the recursion: every position that holds a nested schema is validated
# against _schema_schema itself.
_schema_schema_properties = _schema_schema["optional_keys"]
_schema_schema_properties["of"] = _schema_schema
_schema_schema_properties["required_keys"]["values"] = _schema_schema
_schema_schema_properties["optional_keys"]["values"] = _schema_schema
_schema_schema_properties["keys"] = _schema_schema
_schema_schema_properties["values"] = _schema_schema
_schema_schema_properties["required_fields"]["values"] = _schema_schema
_schema_schema_properties["optional_fields"]["values"] = _schema_schema
def _check_len(obj, length):
    """Checks the length of obj against a length constraint.

    Args:
        obj: Any value that supports len().
        length: Either an int, meaning the exact length required, or a string
            made of one of the operators "<=", ">=", "=", "<" or ">"
            followed by an integer, for example ">=2".

    Returns:
        True if len(obj) satisfies the constraint, False otherwise. Calls
        fail() if a string constraint doesn't start with a known operator.
    """
    actual = len(obj)
    if type(length) == "int":
        return actual == length

    # Identify the operator before parsing the number, so that a constraint
    # with a missing or unknown operator (e.g. "5" or "") reaches the fail()
    # below instead of aborting inside int(). The two-character operators are
    # listed first because "<" and ">" are prefixes of them.
    for op in ["<=", ">=", "=", "<", ">"]:
        if not length.startswith(op):
            continue
        bound = int(length[len(op):])
        if op == "<=":
            return actual <= bound
        if op == ">=":
            return actual >= bound
        if op == "=":
            return actual == bound
        if op == "<":
            return actual < bound
        return actual > bound

    fail("Unexpected length format: " + repr(length))
def _validate_impl(obj, schema):
    """Validates obj against schema without using recursion.

    Nested schemas are walked with an explicit stack of frames. Each frame
    holds the object being checked, the schema it is checked against, and a
    "state" naming the point at which the frame resumes after a child frame
    has finished. A finished frame reports its result to its parent through
    the `ret` variable: an empty string for success, otherwise an error
    message.

    Args:
        obj: The object to validate.
        schema: The schema to validate obj against. Some kinds of malformed
            schema are detected here and reported with fail().

    Returns:
        An empty string if obj matches schema, otherwise a string describing
        the first mismatch that was found.
    """
    stack = []

    def newStackFrame(obj, schema):
        stack.append({
            "obj": obj,
            "schema": schema,
            "state": "start",
        })

    newStackFrame(obj, schema)
    ret = ""

    # Because bazel doesn't allow infinite loops/recursion, just make a loop
    # with an arbitrarily large number of iterations.
    for _ in range(100000):
        if not stack:
            break
        frame = stack[-1]
        obj = frame["obj"]
        schema = frame["schema"]
        state = frame["state"]

        if state == "start":
            # The empty schema matches anything.
            if len(schema) == 0:
                ret = ""
                stack.pop()
                continue

            if "or" in schema:
                if len(schema) != 1:
                    fail("an 'or' schema must not be accompanied by any other keys")
                frame["i"] = 0
                frame["state"] = "or_loop"
                frame["failures"] = []
                newStackFrame(obj, schema["or"][0])
                continue

            if "type" not in schema:
                fail("a non-empty/non-or schema must have a 'type' key: " + str(schema))

            if schema.get("noneable", False):
                if obj == None:
                    ret = ""
                    stack.pop()
                    continue

            ty = schema["type"]
            if type(obj) != ty:
                ret = "Expected %s, got %s" % (ty, type(obj))
                stack.pop()
                continue

            if "length" in schema:
                if ty not in ["string", "bytes", "list", "tuple"]:
                    fail("'length' is only valid for string, bytes, lists, or tuples, got: " + ty)
                if not _check_len(obj, schema["length"]):
                    ret = "Expected length %s, got %d" % (schema["length"], len(obj))
                    stack.pop()
                    continue

            if "choices" in schema:
                if ty not in ["string", "int", "float"]:
                    fail("'choices' is only valid for string, int, or float, got: " + ty)
                if obj not in schema["choices"]:
                    ret = "Expected one of %s, got %s" % (schema["choices"], obj)
                    stack.pop()
                    continue

            if "value" in schema:
                if ty not in ["string", "int", "float"]:
                    fail("'value' is only valid for string, int, or float, got: " + ty)
                if obj != schema["value"]:
                    ret = "Expected %s, got %s" % (schema["value"], obj)
                    stack.pop()
                    continue

            if schema.get("unique", False):
                if ty != "list" and ty != "tuple":
                    fail("'unique' is only valid for lists or tuples, got: " + ty)

                # After sorting, equal elements are adjacent, so comparing
                # neighbours is enough to find a duplicate.
                sorted_list = sorted(obj)

                # `done` records that the inner loop already produced a result
                # and popped the frame, so the outer loop must move on.
                done = False
                for i in range(len(sorted_list) - 1):
                    if type(sorted_list[i]) not in ["string", "int", "float", "bool", "NoneType", "bytes"]:
                        ret = "'unique' only works on lists/tuples of scalar types, got: " + type(sorted_list[i])
                        stack.pop()
                        done = True
                        break
                    if sorted_list[i] == sorted_list[i + 1]:
                        ret = "Expected all elements to be unique, but saw '%s' twice" % str(sorted_list[i])
                        stack.pop()
                        done = True
                        break
                if done:
                    continue

            if "of" in schema:
                if ty != "list" and ty != "tuple":
                    fail("'of' is only valid for lists or tuples, got: " + ty)
                if obj:
                    frame["i"] = 0
                    frame["state"] = "of_loop"
                    newStackFrame(obj[0], schema["of"])
                    continue

            if ty == "dict":
                if "required_fields" in schema or "optional_fields" in schema:
                    fail("a dict schema can't contain required_fields/optional_fields")
                schema_names_keys = bool(schema.get("required_keys", {})) or bool(schema.get("optional_keys", {}))
                schema_enforces_generic_keys = bool(schema.get("keys", {})) or bool(schema.get("values", {}))
                if schema_names_keys and schema_enforces_generic_keys:
                    fail("Only required_keys/optional_keys or keys/values may be used, but not both")

                if schema_names_keys:
                    # Maps each key that the schema allows AND that is present
                    # in obj to the schema for that key's value.
                    all_keys = {}
                    done = False
                    for key, subSchema in schema.get("required_keys", {}).items():
                        if key not in obj:
                            ret = "required key '" + key + "' not found"
                            stack.pop()
                            done = True
                            break
                        all_keys[key] = subSchema
                    if done:
                        continue
                    for key, subSchema in schema.get("optional_keys", {}).items():
                        if key in all_keys:
                            fail("A key cannot be both required and optional: " + key)
                        if key in obj:
                            all_keys[key] = subSchema
                    extra_keys = [
                        key
                        for key in obj.keys()
                        if key not in all_keys
                    ]
                    if extra_keys:
                        ret = "keys " + str(extra_keys) + " not allowed, valid keys: " + str(all_keys.keys())
                        stack.pop()
                        continue
                    if all_keys:
                        frame["all_keys"] = all_keys.items()
                        frame["i"] = 0
                        frame["state"] = "dict_individual_keys_loop"
                        k, v = frame["all_keys"][0]
                        newStackFrame(obj[k], v)
                        continue
                elif schema_enforces_generic_keys:
                    frame["items"] = obj.items()
                    if frame["items"]:
                        frame["i"] = 0
                        frame["state"] = "dict_generic_keys_loop"
                        frame["checking_key"] = True

                        # This frame enters its loop state without pushing a
                        # child first, and that state begins by reading `ret`.
                        # Clear it so that a failure left behind by a previous
                        # alternative of an enclosing 'or' is not mistaken for
                        # a failure of this dict.
                        ret = ""
                        continue

            if ty == "struct":
                if "required_keys" in schema or "optional_keys" in schema or "keys" in schema or "values" in schema:
                    fail("a struct schema can't contain required_keys/optional_keys/keys/values")

                # Maps each field that the schema allows AND that is present
                # on obj to the schema for that field's value.
                all_fields = {}
                original_fields = {f: True for f in dir(obj)}
                done = False
                for field, subSchema in schema.get("required_fields", {}).items():
                    if field not in original_fields:
                        ret = "required field '" + field + "' not found"
                        stack.pop()
                        done = True
                        break
                    all_fields[field] = subSchema
                if done:
                    continue
                for field, subSchema in schema.get("optional_fields", {}).items():
                    if field in all_fields:
                        fail("A field cannot be both required and optional: " + field)
                    if field in original_fields:
                        all_fields[field] = subSchema
                for field in all_fields:
                    if field == "to_json" or field == "to_proto":
                        fail("don't use deprecated fields to_json or to_proto")

                # to_json/to_proto are never reported as unexpected fields,
                # even though the schema is not allowed to list them.
                extra_fields = [
                    field
                    for field in original_fields.keys()
                    if field not in all_fields and field != "to_json" and field != "to_proto"
                ]
                if extra_fields:
                    ret = "fields " + str(extra_fields) + " not allowed, valid keys: " + str(all_fields.keys())
                    stack.pop()
                    continue
                if all_fields:
                    frame["all_fields"] = all_fields.items()
                    frame["i"] = 0
                    frame["state"] = "struct_individual_fields_loop"
                    k, v = frame["all_fields"][0]
                    newStackFrame(getattr(obj, k), v)
                    continue
        elif state == "or_loop":
            # An alternative just finished. On failure, record it and try the
            # next one; on success, fall through to the success path below.
            if ret != "":
                frame["failures"].append(" " + ret)
                frame["i"] += 1
                if frame["i"] >= len(schema["or"]):
                    ret = "did not match any schemas in 'or' list, errors:\n" + "\n".join(frame["failures"])
                    stack.pop()
                    continue
                else:
                    newStackFrame(obj, schema["or"][frame["i"]])
                    continue
        elif state == "of_loop":
            # Stop at the first failing element (ret is passed up unchanged)
            # or once every element has been checked.
            frame["i"] += 1
            if ret != "" or frame["i"] >= len(obj):
                stack.pop()
                continue
            newStackFrame(obj[frame["i"]], schema["of"])
            continue
        elif state == "dict_individual_keys_loop":
            frame["i"] += 1
            if ret != "" or frame["i"] >= len(frame["all_keys"]):
                stack.pop()
                continue
            k, v = frame["all_keys"][frame["i"]]
            newStackFrame(obj[k], v)
            continue
        elif state == "dict_generic_keys_loop":
            if ret != "" or frame["i"] >= len(frame["items"]):
                stack.pop()
                continue
            k, v = frame["items"][frame["i"]]

            # Each item takes two visits: first its key is checked, then its
            # value. The index only advances after the value was scheduled.
            if frame["checking_key"]:
                frame["checking_key"] = False
                newStackFrame(k, schema.get("keys", {}))
                continue
            else:
                frame["checking_key"] = True
                frame["i"] += 1
                newStackFrame(v, schema.get("values", {}))
                continue
        elif state == "struct_individual_fields_loop":
            frame["i"] += 1
            if ret != "" or frame["i"] >= len(frame["all_fields"]):
                stack.pop()
                continue
            k, v = frame["all_fields"][frame["i"]]
            newStackFrame(getattr(obj, k), v)
            continue

        # by default return success
        ret = ""
        stack.pop()

    if stack:
        fail("Schema validation took too many iterations")
    return ret
def validate(obj, schema, *, validate_schema = True, fail_on_error = True):
    """Checks that a starlark object has the shape described by a schema.

    A schema is a dictionary describing the expected format of obj. There is
    no cycle detection, so recursive objects cannot currently be validated.

    Kinds of schema:

    * The empty dictionary matches any object.
    * A dictionary with an "or" key must have no other keys. Its value is a
      list of schemas, and the "or" schema matches as soon as any one of
      them matches.
    * Every other schema must have a "type" key, whose value must be equal
      to type(obj).

    Additional keys for schemas that have a "type":

    * "noneable": when true, None is accepted as well. It is an alias for
      `{"or": [{"type": "NoneType"}, ...the rest of the schema...]}`.
    * "value": a value the object must be exactly equal to. Only applies to
      strings, ints, and floats.
    * "choices": a list of values; validation succeeds if the object is
      equal to any one of them.
    * "length": applies to strings, bytes, lists, or tuples. Either an
      integer that the length must be equal to, or a string that starts
      with <, >, <=, >=, or =, followed by a number.
    * "of": a schema that each element of a list/tuple must match.
    * "required_keys"/"optional_keys" (dictionaries) and
      "required_fields"/"optional_fields" (structs): each is a dictionary
      mapping a key/field name to the schema for that key's/field's value.
      Keys/fields listed under required_ must be present in the object, and
      any key/field that isn't listed in the schema makes validation fail.
    * "keys" and "values" (dictionaries only): a single schema each, applied
      to all of the keys/values of the dictionary. These can't be combined
      with required_keys/optional_keys.

    Args:
        obj: The object to be validated against the schema.
        schema: The schema. (See above)
        validate_schema: Also check that the schema itself is valid. This
            can be disabled for performance. However, some of the checks
            about the schema are hardcoded and cannot be disabled.
        fail_on_error: If this function should fail() when the object
            doesn't conform to the schema. Note that if the schema itself is
            invalid, validate() fails regardless of the value of this
            argument.

    Returns:
        If fail_on_error is True, validate() doesn't return anything.
        If fail_on_error is False, validate() returns a string that
        describes the reason why the object doesn't match the schema, or an
        empty string if it does match.
    """
    if validate_schema:
        # The schema is just another starlark object, so it can be checked
        # with the same machinery against the schema for schemas.
        schema_error = _validate_impl(schema, _schema_schema)
        if schema_error:
            fail("Schema is invalid: " + schema_error)

    error = _validate_impl(obj, schema)
    if fail_on_error:
        if error:
            fail(error)
        return None
    return error