From 45d22bf8906b93b4e845138130a55b29c6601ffe Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Wed, 14 Apr 2021 17:01:30 +0300 Subject: [PATCH 1/3] Add validation functions for Signed attributes Those functions are created to be used during model initialization. They are the stepping stone for other validation solutions (like python descriptors, decorators, and ValidationMixin) which will reuse part of them. For more context read ADR 0007. Signed-off-by: Martin Vrachev --- tests/test_api.py | 42 +++++++++++++++++++++++++++++ tuf/api/validators.py | 61 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 tuf/api/validators.py diff --git a/tests/test_api.py b/tests/test_api.py index dac4a008ed..e1b404df10 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -38,6 +38,8 @@ CanonicalJSONSerializer ) +import tuf.api.validators as validators + from securesystemslib.interface import ( import_ed25519_publickey_from_file, import_ed25519_privatekey_from_file @@ -380,6 +382,46 @@ def test_support_for_unrecognized_fields(self): ) + def test_validate_signed_attr(self): + # spec_version validation + for val in [None, True, 111, 1.1]: + with self.assertRaises(TypeError): + validators.validate_spec_version(val) + for val in ["", "1.11", "2", "0.1.1"]: + with self.assertRaises(ValueError): + validators.validate_spec_version(val) + validators.validate_spec_version("1.0.0") + + # _type validation + for val in [None, True, 111, 1.1]: + with self.assertRaises(TypeError): + validators.validate_type(val) + for val in ["wrong", "", "ROOT", "timestamp1"]: + with self.assertRaises(ValueError): + validators.validate_type(val) + for val in ["root", "snapshot", "targets", "timestamp"]: + validators.validate_type(val) + + # version validation + for val in [None, True, "1", "", 1.0]: + with self.assertRaises(TypeError): + validators.validate_version(val) + with self.assertRaises(ValueError): + validators.validate_version(-1) + validators.validate_version(0) + validators.validate_version(1) + + # expiry validation + for val in [None, True, "1", "", 1.0]: + with self.assertRaises(TypeError): + validators.validate_expires(val) + past_time = datetime(1990, 1, 1) + with self.assertRaises(ValueError): + validators.validate_expires(past_time) + future_time = datetime(2050, 1, 1) + validators.validate_expires(future_time) + + # Run unit test. if __name__ == '__main__': utils.configure_test_logging(sys.argv) diff --git a/tuf/api/validators.py b/tuf/api/validators.py new file mode 100644 index 0000000000..72f00063db --- /dev/null +++ b/tuf/api/validators.py @@ -0,0 +1,61 @@ +""" +Provides validation functionality for tuf/api modules. +""" +from datetime import datetime + +import tuf + +METADATA_TYPES = ["root", "snapshot", "targets", "timestamp"] + + +def validate_spec_version(spec_version: str) -> None: + """Validate that the SPEC_VERSION is a string in semantic versioning + format and that its spec version is not higher than the current + official tuf spec version.""" + if not isinstance(spec_version, str): + raise TypeError(f"Expected {spec_version} to be an str") + spec_version_split = spec_version.split(".") + if len(spec_version_split) != 3: + raise ValueError( + "spec_version should be in a semantic versioning format." + ) + + spec_major_version = int(spec_version_split[0]) + code_spec_version_split = tuf.SPECIFICATION_VERSION.split(".") + code_spec_major_version = int(code_spec_version_split[0]) + + if spec_major_version != code_spec_major_version: + raise ValueError( + f"version major version must be ," + f"{code_spec_major_version} got {spec_major_version}" + ) + + +def validate_type(_type: str) -> None: + """Validate the _TYPE Signed attribute.""" + if not isinstance(_type, str): + raise TypeError("Expected _type to be an str") + if _type not in METADATA_TYPES: + raise ValueError(f"_type must be one of {METADATA_TYPES} got {_type}") + + +def validate_version(version: int) -> None: + """Validate the VERSION Signed attribute.""" + if not isinstance(version, int): + raise TypeError("Expected version to be an integer") + if isinstance(version, (float, bool)): + raise TypeError("Expected version to be an integer, not float or bool!") + if version <= 0: + raise ValueError(f"version must be > 0, got {version}") + + +def validate_expires(expires: datetime) -> None: + """Validate the EXPIRES Signed attribute.""" + if not isinstance(expires, datetime): + raise TypeError("Expected expires to be a datetime.datetime object!") + now = datetime.utcnow() + if now > expires: + raise ValueError( + f"Expected expires to reference time in the future," + f" instead got {expires}!" + ) From 863fed36d1e6f6a4e65e16ef4540f814bde4ec8b Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Tue, 27 Apr 2021 15:55:56 +0300 Subject: [PATCH 2/3] Validate Root dictionary key and value attributes Add validation functions for all Root dictionary key and values as described in the spec. Signed-off-by: Martin Vrachev --- tests/test_api.py | 60 ++++++++++++++++++++++++++++++++++++++ tuf/api/validators.py | 67 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/tests/test_api.py b/tests/test_api.py index e1b404df10..e11ad18eac 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -382,6 +382,8 @@ def test_support_for_unrecognized_fields(self): ) +class TestValidation(unittest.TestCase): + def test_validate_signed_attr(self): # spec_version validation for val in [None, True, 111, 1.1]: @@ -422,6 +424,64 @@ def test_validate_signed_attr(self): validators.validate_expires(future_time) + def test_root_specific_attr(self): + # consistent_snapshot Root attribute + for val in [None, "1", "", "False", 1]: + with self.assertRaises(TypeError): + validators.validate_consistent_snapshot(val) + validators.validate_consistent_snapshot(True) + + # keyid Key attribute + for val in [None, False, 1]: + with self.assertRaises(TypeError): + validators.validate_keyid(val) + for val in ["", "12345"]: + with self.assertRaises(ValueError): + validators.validate_keyid(val) + validators.validate_keyid("4e777de0d275f9d28588dd9a1606cc748e548f9e22b6795b7cb3f63f98035fcb") + + # keytype Key attribute + for val in [None, False, 1]: + with self.assertRaises(TypeError): + validators.validate_keytype(val) + validators.validate_keytype("rsa") + + # scheme Key attribute + for val in [None, False, 1]: + with self.assertRaises(TypeError): + validators.validate_scheme(val) + validators.validate_scheme("rsassa-pss-sha256") + + # keyval Key attribute + for val in [None, False, 1, ""]: + with self.assertRaises(TypeError): + validators.validate_keyval(val) + for val in [{}, {"a": 3}, {"public": "123456"}]: + with self.assertRaises(ValueError): + validators.validate_keyval(val) + validators.validate_keyval({ + "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" + }) + + # role Root attribute + for val in [None, False, 1]: + with self.assertRaises(TypeError): + validators.validate_role(val) + for val in ["", "123456"]: + with self.assertRaises(ValueError): + validators.validate_role(val) + validators.validate_role("timestamp") + + # threshold Root attribute + for val in [None, False, "0", "1.0", 1.0]: + with self.assertRaises(TypeError): + validators.validate_threshold(val) + for val in [-1, 0]: + with self.assertRaises(ValueError): + validators.validate_threshold(val) + validators.validate_threshold(1) + + # Run unit test. if __name__ == '__main__': utils.configure_test_logging(sys.argv) diff --git a/tuf/api/validators.py b/tuf/api/validators.py index 72f00063db..f9dd1ae2b3 100644 --- a/tuf/api/validators.py +++ b/tuf/api/validators.py @@ -2,10 +2,11 @@ Provides validation functionality for tuf/api modules. """ from datetime import datetime +from typing import Any, Mapping import tuf -METADATA_TYPES = ["root", "snapshot", "targets", "timestamp"] +METADATA_TYPES = {"root", "snapshot", "targets", "timestamp"} def validate_spec_version(spec_version: str) -> None: @@ -59,3 +60,67 @@ def validate_expires(expires: datetime) -> None: f"Expected expires to reference time in the future," f" instead got {expires}!" ) + + +def validate_consistent_snapshot(consistent_snapshot: bool) -> None: + """Validate the "CONSISTENT_SNAPSHOT" Root attribute.""" + if not isinstance(consistent_snapshot, bool): + raise TypeError("Expected consistent_snapshot to be bool!") + + +def validate_keyid(keyid: str) -> None: + """Validate the KEYID Root attribute.""" + if not isinstance(keyid, str): + raise TypeError("Expected keyid to be a string!") + if len(keyid) != 64: + raise ValueError( + f"Expected a 64 character long hexdigest string," + f" instead got: {keyid}!" + ) + + +def validate_keytype(keytype: str) -> None: + """Validate the KEYTYPE Key attribute.""" + if not isinstance(keytype, str): + raise TypeError("Expected keytype to be a string!") + + +def validate_scheme(scheme: str) -> None: + """Validate the SCHEME Key attribute.""" + if not isinstance(scheme, str): + raise TypeError("Expected scheme to be a string!") + + +def validate_keyval(keyval: Mapping[str, Any]) -> None: + """Validate the KEYVAL Key attribute.""" + if not isinstance(keyval, Mapping): + raise TypeError("Expected keyval to be a mapping!") + if not keyval.get("public"): + raise ValueError("keyval doesn't follow the specification format!") + if len(keyval["public"]) < 64: + raise ValueError( + f"The public portion of keyval should be at least 64 character long" + f"hexdigest string, instead got: {keyval}" + ) + + +def validate_role(role: str) -> None: + """Validate the ROLE Root attribute.""" + if not isinstance(role, str): + raise TypeError("Expected role to be a string!") + if role not in METADATA_TYPES: + raise ValueError( + f"Role should one of the metadata, instead got: {role}!" + ) + + +def validate_threshold(threshold: int) -> None: + """Validate the THRESHOLD Root attribute.""" + if not isinstance(threshold, int): + raise TypeError("Expected threshold to be an integer!") + if isinstance(threshold, (float, bool)): + raise TypeError( + "Expected threshold to be an integer, not float or bool!" + ) + if threshold <= 0: + raise ValueError("Expected threshold to be > 0!") From 804a1f487a3d2057b8793eefe98c748fa2adc7c3 Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Tue, 27 Apr 2021 18:10:05 +0300 Subject: [PATCH 3/3] Prototype descriptors usage in Root and Signed This experiment tries to help us envision how validation will look like if we decide to use validation functions + descriptors. What I found with this experiment is that python descriptors can't handle dictionary keys validation as well as dictionary values. More precisely, we can't validate that each of the "roles" in Root is one of the 4 metadata types or that Root role keyids are unique. The reason is that for example if we are to run this: "root.roles = 3" this will invoke validation for root.roles, but if we invoke: "root.roles["dwad"] = 3" then we are getting an element of the dictionary roles and then assigning it a new value. Signed-off-by: Martin Vrachev --- tests/test_api.py | 67 +++++++++++++++ tuf/api/metadata.py | 24 ++++++ tuf/api/validators.py | 191 ++++++++++++++++++++++++++++++++++++++---- 3 files changed, 267 insertions(+), 15 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index e11ad18eac..f4e6edf914 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -23,6 +23,7 @@ import tuf.exceptions from tuf.api.metadata import ( Metadata, + Root, Snapshot, Timestamp, Targets @@ -482,6 +483,72 @@ def test_root_specific_attr(self): validators.validate_threshold(1) + def test_descriptors_example_usage(self): + data = { + '_type': 'snapshot', + 'spec_version': '1.2.3', + 'expires': datetime.now() + timedelta(hours=1), + 'version': 3, + "consistent_snapshot": False, + "keys": { + "59a4df8af818e9ed7abe0764c0b47b4240952aa0d179b5b78346c470ac30278d": { + "keytype": "ed25519", + "keyval": { + "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" + }, + "scheme": "ed25519" + }, + }, + "roles": { + "root": { + "keyids": [ + "4e777de0d275f9d28588dd9a1606cc748e548f9e22b6795b7cb3f63f98035fcb" + ], + "threshold": 1 + }, + } + } + # Validation happens on initialization and during assigment + root = Root(**data) + + # _type validation checks that the type is one of our expected roles + data['_type'] = 'wrong' + with self.assertRaises(ValueError): + root = Root(**data) + data['_type'] = 'snapshot' + + # spec_version should be a specific string in sem format + # and be the same major version as the current tuf version + for val in ['', '1.11', '2', '0.1.1']: + with self.assertRaises(ValueError): + root.spec_version = val + + # Version should be an int above 0. + with self.assertRaises(ValueError): + root.version = -1 + + # consistent_snapshot should be boolean + with self.assertRaises(TypeError): + root.consistent_snapshot = -1 + + # If we want to change root with a totally different role dict + # Without the neceserry format + new_roles = { + "roles" : + { + "DDS": { + "keyids": [ + "4e777de0d275f9d28588dd9a1606cc748e548f9e22b6795b7cb3f63f98035fcb" + ], + "threshold": 1 + }, + } + } + with self.assertRaises(ValueError): + root.roles = new_roles + + + # Run unit test. if __name__ == '__main__': utils.configure_test_logging(sys.argv) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index e28310ae56..45b2c536b2 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -30,6 +30,18 @@ MetadataSerializer, SignedSerializer, ) +from tuf.api.validators import ( + METADATA_TYPES, + Bool, + DateTime, + Dictionary, + Integer, + OneOf, + String, + _check_dict_elements_uniqueness, + _check_semantic_versioning, + _check_str_one_of_metadata_types, +) class Metadata: @@ -324,6 +336,13 @@ class Signed: """ + _type = OneOf(METADATA_TYPES) + version = Integer(minvalue=1) + # The minimum length is 5 because "X.Y.Z" has 5 characters. + spec_version = String(minsize=5, predicate=_check_semantic_versioning) + expires = DateTime() + unrecognized_fields = Dictionary(keys_type=str) + # NOTE: Signed is a stupid name, because this might not be signed yet, but # we keep it to match spec terminology (I often refer to this as "payload", # or "inner metadata") @@ -440,6 +459,11 @@ class Root(Signed): """ + consistent_snapshot = Bool() + keys = Dictionary(keys_type=str) + roles = Dictionary( + keys_type=str, predicate_keys=_check_str_one_of_metadata_types + ) # TODO: determine an appropriate value for max-args and fix places where # we violate that. This __init__ function takes 7 arguments, whereas the # default max-args value for pylint is 5 diff --git a/tuf/api/validators.py b/tuf/api/validators.py index f9dd1ae2b3..a8b2dfd9da 100644 --- a/tuf/api/validators.py +++ b/tuf/api/validators.py @@ -1,7 +1,8 @@ """ Provides validation functionality for tuf/api modules. """ -from datetime import datetime +from abc import ABC, abstractmethod +from datetime import datetime, timedelta from typing import Any, Mapping import tuf @@ -9,12 +10,7 @@ METADATA_TYPES = {"root", "snapshot", "targets", "timestamp"} -def validate_spec_version(spec_version: str) -> None: - """Validate that the SPEC_VERSION is a string in semantic versioning - format and that its spec version is not higher than the current - official tuf spec version.""" - if not isinstance(spec_version, str): - raise TypeError(f"Expected {spec_version} to be an str") +def _check_semantic_versioning(spec_version: str) -> None: spec_version_split = spec_version.split(".") if len(spec_version_split) != 3: raise ValueError( @@ -26,10 +22,34 @@ def validate_spec_version(spec_version: str) -> None: code_spec_major_version = int(code_spec_version_split[0]) if spec_major_version != code_spec_major_version: - raise ValueError( - f"version major version must be ," - f"{code_spec_major_version} got {spec_major_version}" - ) + return False + + return True + + +def _check_str_one_of_metadata_types(s): + if not s in METADATA_TYPES: + return False + return True + + +def _check_dict_elements_uniqueness(d: Mapping[str, Any]): + keys_list = d.keys() + keys_set = set(keys_list) + if len(keys_set) != len(keys_list): + return False + return True + + +def validate_spec_version(spec_version: str) -> None: + """Validate that the SPEC_VERSION is a string in semantic versioning + format and that its spec version is not higher than the current + official tuf spec version.""" + if not isinstance(spec_version, str): + raise TypeError(f"Expected {spec_version} to be an str") + + if not _check_semantic_versioning(spec_version): + raise ValueError(f"spec version must be in semating versioning!") def validate_type(_type: str) -> None: @@ -108,10 +128,8 @@ def validate_role(role: str) -> None: """Validate the ROLE Root attribute.""" if not isinstance(role, str): raise TypeError("Expected role to be a string!") - if role not in METADATA_TYPES: - raise ValueError( - f"Role should one of the metadata, instead got: {role}!" - ) + if not _check_str_one_of_metadata_types(role): + raise ValueError(f"Expected role to be one of {METADATA_TYPES}") def validate_threshold(threshold: int) -> None: @@ -124,3 +142,146 @@ def validate_threshold(threshold: int) -> None: ) if threshold <= 0: raise ValueError("Expected threshold to be > 0!") + + +class Validator(ABC): + def __set_name__(self, owner, name): + self.private_name = "_" + "NO_VALIDATION_" + name + + def __get__(self, obj, objtype=None): + return getattr(obj, self.private_name) + + def __set__(self, obj, value): + self.validate(value) + setattr(obj, self.private_name, value) + + @abstractmethod + def validate(self, value): + pass + + +class OneOf(Validator): + def __init__(self, options): + self.options = set(options) + + def validate(self, value): + if value not in self.options: + raise ValueError( + f"Expected {value!r} to be one of {self.options!r}" + ) + + +class Integer(Validator): + def __init__(self, minvalue=None, maxvalue=None): + self.minvalue = minvalue + self.maxvalue = maxvalue + + def validate(self, value): + if not isinstance(value, int): + raise TypeError(f"Expected {value!r} to be an int!") + if isinstance(value, (float, bool)): + raise TypeError( + "Expected {value!r} to be an integer, not float or bool! " + ) + if self.minvalue is not None and value < self.minvalue: + raise ValueError( + f"Expected {value!r} to be at least {self.minvalue!r}!" + ) + if self.maxvalue is not None and value > self.maxvalue: + raise ValueError( + f"Expected {value!r} to be no more than {self.maxvalue!r}!" + ) + + +class String(Validator): + def __init__(self, minsize=None, maxsize=None, predicate=None): + self.minsize = minsize + self.maxsize = maxsize + # The predicate function must return a boolean value. + self.predicate = predicate + + def validate(self, value): + if not isinstance(value, str): + raise TypeError(f"Expected {value!r} to be a str!") + if self.minsize is not None and len(value) < self.minsize: + raise ValueError( + f"Expected {value!r} to be no smaller than {self.minsize!r}!" + ) + if self.maxsize is not None and len(value) > self.maxsize: + raise ValueError( + f"Expected {value!r} to be no bigger than {self.maxsize!r}!" + ) + if self.predicate is not None and not self.predicate(value): + raise ValueError( + f"Expected {self.predicate} to be true for {value!r}!" + ) + + +class Dictionary(Validator): + def __init__( + self, + keys_type=None, + values_type=None, + predicate=None, + predicate_keys=None, + predicate_values=None, + ): + self.keys_type = keys_type + self.values_type = values_type + # The predicate functions must return a boolean value. + self.predicate = predicate + self.predicate_keys = predicate_keys + self.predicate_values = predicate_values + + def validate(self, value: Mapping): + if not isinstance(value, Mapping): + raise TypeError(f"Expected {value!r} to be a Mapping object!") + if self.keys_type: + for key in value.keys(): + if not isinstance(key, self.keys_type): + raise TypeError( + f"Expected {key!r} to be a {self.keys_type} type!" + ) + if self.values_type: + for val in value.values(): + if not isinstance(val, self.values_type): + raise TypeError( + f"Expected {val!r} to be a {self.values_type} type!" + ) + if self.predicate and not self.predicate(value): + raise ValueError( + f"Expected {self.predicate} to be true for {value!r}!" + ) + if self.predicate_keys: + for key in value.keys(): + if not self.predicate_keys(key): + raise ValueError( + f"Expected {self.predicate_keys} to be true for {key!r}!" + ) + if self.predicate_values: + for val in value.values(): + if not self.predicate_values(val): + raise ValueError( + f"Expected {self.predicate_values} to be true for {val!r}!" + ) + + +class Bool(Validator): + def validate(self, value): + if not isinstance(value, bool): + raise TypeError(f"Expected {value!r} to be a bool object!") + + +class DateTime(Validator): + def validate(self, value): + if not isinstance(value, datetime): + raise TypeError( + f"Expected {value!r} to be a datetime.datetime object!" + ) + now = datetime.utcnow() + # Add additional 10 minutes gratis between the call and this check. + value += timedelta(minutes=10) + if now > value: + raise ValueError( + f"Expected {value!r} to reference time in the future," + )