diff --git a/tests/test_api.py b/tests/test_api.py index ff52952dad..8a5dc32fab 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -39,7 +39,7 @@ Targets, Timestamp, ) -from tuf.api.serialization import DeserializationError +from tuf.api.serialization import DeserializationError, SerializationError from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer logger = logging.getLogger(__name__) @@ -157,6 +157,13 @@ def test_read_write_read_compare(self) -> None: os.remove(path_2) + def test_serialize_with_validate(self) -> None: + # Assert that by changing one required attribute validation will fail. + root = Metadata.from_file(f"{self.repo_dir}/metadata/root.json") + root.signed.version = 0 + with self.assertRaises(SerializationError): + root.to_bytes(JSONSerializer(validate=True)) + def test_to_from_bytes(self) -> None: for metadata in TOP_LEVEL_ROLE_NAMES: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") diff --git a/tests/test_metadata_eq_.py b/tests/test_metadata_eq_.py new file mode 100644 index 0000000000..d15c8741ad --- /dev/null +++ b/tests/test_metadata_eq_.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python + +# Copyright New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test __eq__ implementations of classes inside tuf/api/metadata.py.""" + + +import copy +import os +import sys +import unittest +from typing import Any, ClassVar, Dict + +from securesystemslib.signer import Signature + +from tests import utils +from tuf.api.metadata import ( + TOP_LEVEL_ROLE_NAMES, + DelegatedRole, + Delegations, + Key, + Metadata, + MetaFile, + Role, + Root, + Snapshot, + TargetFile, + Targets, + Timestamp, +) + + +class TestMetadataComparisions(unittest.TestCase): + """Test __eq__ for all classes inside tuf/api/metadata.py.""" + + metadata: ClassVar[Dict[str, bytes]] + + @classmethod + def setUpClass(cls) -> None: + cls.repo_dir = os.path.join( + utils.TESTS_DIR, "repository_data", "repository", "metadata" + ) + cls.metadata = {} + for md in TOP_LEVEL_ROLE_NAMES: + with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f: + cls.metadata[md] = f.read() + + def copy_and_simple_assert(self, obj: Any) -> Any: + # Assert that obj is not equal to an object from another type + self.assertNotEqual(obj, "") + result_obj = copy.deepcopy(obj) + # Assert that __eq__ works for equal objects. + self.assertEqual(obj, result_obj) + return result_obj + + def test_metadata_eq_(self) -> None: + md = Metadata.from_bytes(self.metadata["snapshot"]) + md_2: Metadata = self.copy_and_simple_assert(md) + + for attr, value in [("signed", None), ("signatures", None)]: + setattr(md_2, attr, value) + self.assertNotEqual(md, md_2, f"Failed case: {attr}") + + def test_md_eq_signatures_reversed_order(self) -> None: + # Test comparing objects with same signatures but different order. + + # Remove all signatures and create new ones. + md = Metadata.from_bytes(self.metadata["snapshot"]) + md.signatures = {"a": Signature("a", "a"), "b": Signature("b", "b")} + md_2 = copy.deepcopy(md) + # Reverse signatures order in md_2. + # In python3.7 we need to cast to a list and then reverse. + md_2.signatures = dict(reversed(list(md_2.signatures.items()))) + # Assert that both objects are not the same because of signatures order. + self.assertNotEqual(md, md_2) + + # but if we fix the signatures order they will be equal + md_2.signatures = {"a": Signature("a", "a"), "b": Signature("b", "b")} + self.assertEqual(md, md_2) + + def test_md_eq_special_signatures_tests(self) -> None: + # Test that metadata objects with different signatures are not equal. + md = Metadata.from_bytes(self.metadata["snapshot"]) + md_2 = copy.deepcopy(md) + md_2.signatures = {} + self.assertNotEqual(md, md_2) + + # Test that metadata objects with empty signatures are equal + md.signatures = {} + self.assertEqual(md, md_2) + + # Metadata objects with different signatures types are not equal. + md_2.signatures = "" # type: ignore + self.assertNotEqual(md, md_2) + + def test_signed_eq_(self) -> None: + md = Metadata.from_bytes(self.metadata["snapshot"]) + md_2: Metadata = self.copy_and_simple_assert(md) + + # We don't need to make "signed" = None as that was done when testing + # metadata attribute modifications. + for attr, value in [("version", -1), ("spec_version", "0.0.0")]: + setattr(md_2.signed, attr, value) + self.assertNotEqual(md.signed, md_2.signed, f"Failed case: {attr}") + + def test_key_eq_(self) -> None: + key_dict = { + "keytype": "rsa", + "scheme": "rsassa-pss-sha256", + "keyval": {"public": "foo"}, + } + key = Key.from_dict("12sa12", key_dict) + key_2: Key = self.copy_and_simple_assert(key) + for attr, value in [ + ("keyid", "a"), + ("keytype", "foo"), + ("scheme", "b"), + ("keytype", "b"), + ]: + setattr(key_2, attr, value) + self.assertNotEqual(key, key_2, f"Failed case: {attr}") + + def test_role_eq_(self) -> None: + role_dict = { + "keyids": ["keyid1", "keyid2"], + "threshold": 3, + } + role = Role.from_dict(role_dict) + role_2: Role = self.copy_and_simple_assert(role) + + for attr, value in [("keyids", []), ("threshold", 10)]: + setattr(role_2, attr, value) + self.assertNotEqual(role, role_2, f"Failed case: {attr}") + + def test_root_eq_(self) -> None: + md = Metadata.from_bytes(self.metadata["root"]) + signed_copy: Root = self.copy_and_simple_assert(md.signed) + + # Common attributes between Signed and Root doesn't need testing. + # Ignore mypy request for type annotations on attr and value + for attr, value in [ # type: ignore + ("consistent_snapshot", None), + ("keys", {}), + ("roles", {}), + ]: + + setattr(signed_copy, attr, value) + self.assertNotEqual(md.signed, signed_copy, f"Failed case: {attr}") + + def test_metafile_eq_(self) -> None: + metafile_dict = { + "version": 1, + "length": 12, + "hashes": {"sha256": "abc"}, + } + metafile = MetaFile.from_dict(metafile_dict) + metafile_2: MetaFile = self.copy_and_simple_assert(metafile) + + # Ignore mypy request for type annotations on attr and value + for attr, value in [ # type: ignore + ("version", None), + ("length", None), + ("hashes", {}), + ]: + setattr(metafile_2, attr, value) + self.assertNotEqual(metafile, metafile_2, f"Failed case: {attr}") + + def test_timestamp_eq_(self) -> None: + md = Metadata.from_bytes(self.metadata["timestamp"]) + signed_copy: Timestamp = self.copy_and_simple_assert(md.signed) + + # Common attributes between Signed and Timestamp doesn't need testing. + setattr(signed_copy, "snapshot_meta", None) + self.assertNotEqual(md.signed, signed_copy) + + def test_snapshot_eq_(self) -> None: + md = Metadata.from_bytes(self.metadata["snapshot"]) + signed_copy: Snapshot = self.copy_and_simple_assert(md.signed) + + # Common attributes between Signed and Snapshot doesn't need testing. + setattr(signed_copy, "meta", None) + self.assertNotEqual(md.signed, signed_copy) + + def test_delegated_role_eq_(self) -> None: + delegated_role_dict = { + "keyids": ["keyid"], + "name": "a", + "terminating": False, + "threshold": 1, + "paths": ["fn1", "fn2"], + } + delegated_role = DelegatedRole.from_dict(delegated_role_dict) + delegated_role_2: DelegatedRole = self.copy_and_simple_assert( + delegated_role + ) + + # Common attributes between DelegatedRole and Role doesn't need testing. + for attr, value in [ + ("name", ""), + ("terminating", None), + ("paths", [""]), + ("path_hash_prefixes", [""]), + ]: + setattr(delegated_role_2, attr, value) + msg = f"Failed case: {attr}" + self.assertNotEqual(delegated_role, delegated_role_2, msg) + + def test_delegations_eq_(self) -> None: + delegations_dict = { + "keys": { + "keyid2": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyval": {"public": "bar"}, + } + }, + "roles": [ + { + "keyids": ["keyid2"], + "name": "b", + "terminating": True, + "paths": ["fn2"], + "threshold": 4, + } + ], + } + delegations = Delegations.from_dict(delegations_dict) + delegations_2: Delegations = self.copy_and_simple_assert(delegations) + # Ignore mypy request for type annotations on attr and value + for attr, value in [("keys", {}), ("roles", {})]: # type: ignore + setattr(delegations_2, attr, value) + msg = f"Failed case: {attr}" + self.assertNotEqual(delegations, delegations_2, msg) + + def test_targetfile_eq_(self) -> None: + targetfile_dict = { + "length": 12, + "hashes": {"sha256": "abc"}, + } + targetfile = TargetFile.from_dict(targetfile_dict, "file1.txt") + targetfile_2: TargetFile = self.copy_and_simple_assert(targetfile) + + # Common attr between TargetFile and MetaFile doesn't need testing. + setattr(targetfile_2, "path", "") + self.assertNotEqual(targetfile, targetfile_2) + + def test_delegations_eq_roles_reversed_order(self) -> None: + # Test comparing objects with same delegated roles but different order. + role_one_dict = { + "keyids": ["keyid1"], + "name": "a", + "terminating": False, + "paths": ["fn1"], + "threshold": 1, + } + role_two_dict = { + "keyids": ["keyid2"], + "name": "b", + "terminating": True, + "paths": ["fn2"], + "threshold": 4, + } + + delegations_dict = { + "keys": { + "keyid2": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyval": {"public": "bar"}, + } + }, + "roles": [role_one_dict, role_two_dict], + } + delegations = Delegations.from_dict(copy.deepcopy(delegations_dict)) + + # Create a second delegations obj with reversed roles order + delegations_2 = copy.deepcopy(delegations) + # In python3.7 we need to cast to a list and then reverse. + delegations_2.roles = dict(reversed(list(delegations.roles.items()))) + + # Both objects are not the equal because of delegated roles order. + self.assertNotEqual(delegations, delegations_2) + + # but if we fix the delegated roles order they will be equal + delegations_2.roles = delegations.roles + + self.assertEqual(delegations, delegations_2) + + def test_targets_eq_(self) -> None: + md = Metadata.from_bytes(self.metadata["targets"]) + signed_copy: Targets = self.copy_and_simple_assert(md.signed) + + # Common attributes between Targets and Signed doesn't need testing. + # Ignore mypy request for type annotations on attr and value + for attr, value in [("targets", {}), ("delegations", [])]: # type: ignore + setattr(signed_copy, attr, value) + self.assertNotEqual(md.signed, signed_copy, f"Failed case: {attr}") + + +# Run unit test. +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 2d9a81b251..f961ee0712 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -22,6 +22,7 @@ Targets, Timestamp, ) +from tuf.api.serialization.json import JSONSerializer from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet logger = logging.getLogger(__name__) @@ -49,7 +50,7 @@ def modify_metadata( metadata = Metadata.from_bytes(cls.metadata[rolename]) modification_func(metadata.signed) metadata.sign(cls.keystore[rolename]) - return metadata.to_bytes() + return metadata.to_bytes(JSONSerializer(validate=True)) @classmethod def setUpClass(cls) -> None: diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index f836147122..992a526363 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -127,6 +127,18 @@ def __init__( self.signatures = signatures self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Metadata): + return False + + return ( + self.signatures == other.signatures + # Order of the signatures matters (see issue #1788). + and list(self.signatures.items()) == list(other.signatures.items()) + and self.signed == other.signed + and self.unrecognized_fields == other.unrecognized_fields + ) + @classmethod def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": """Creates ``Metadata`` object from its dict representation. @@ -490,6 +502,18 @@ def __init__( self.version = version self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Signed): + return False + + return ( + self.type == other.type + and self.version == other.version + and self.spec_version == other.spec_version + and self.expires == other.expires + and self.unrecognized_fields == other.unrecognized_fields + ) + @abc.abstractmethod def to_dict(self) -> Dict[str, Any]: """Serialization helper that returns dict representation of self""" @@ -598,6 +622,18 @@ def __init__( self.keyval = keyval self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Key): + return False + + return ( + self.keyid == other.keyid + and self.keytype == other.keytype + and self.scheme == other.scheme + and self.keyval == other.keyval + and self.unrecognized_fields == other.unrecognized_fields + ) + @classmethod def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": """Creates ``Key`` object from its dict representation. @@ -742,6 +778,16 @@ def __init__( self.threshold = threshold self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Role): + return False + + return ( + self.keyids == other.keyids + and self.threshold == other.threshold + and self.unrecognized_fields == other.unrecognized_fields + ) + @classmethod def from_dict(cls, role_dict: Dict[str, Any]) -> "Role": """Creates ``Role`` object from its dict representation. @@ -804,6 +850,17 @@ def __init__( self.roles = roles + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Root): + return False + + return ( + super().__eq__(other) + and self.keys == other.keys + and self.roles == other.roles + and self.consistent_snapshot == other.consistent_snapshot + ) + @classmethod def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root": """Creates ``Root`` object from its dict representation. @@ -986,6 +1043,17 @@ def __init__( self.hashes = hashes self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} + def __eq__(self, other: Any) -> bool: + if not isinstance(other, MetaFile): + return False + + return ( + self.version == other.version + and self.length == other.length + and self.hashes == other.hashes + and self.unrecognized_fields == other.unrecognized_fields + ) + @classmethod def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile": """Creates ``MetaFile`` object from its dict representation. @@ -1066,6 +1134,14 @@ def __init__( super().__init__(version, spec_version, expires, unrecognized_fields) self.snapshot_meta = snapshot_meta + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Timestamp): + return False + + return ( + super().__eq__(other) and self.snapshot_meta == other.snapshot_meta + ) + @classmethod def from_dict(cls, signed_dict: Dict[str, Any]) -> "Timestamp": """Creates ``Timestamp`` object from its dict representation. @@ -1119,6 +1195,12 @@ def __init__( super().__init__(version, spec_version, expires, unrecognized_fields) self.meta = meta + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Snapshot): + return False + + return super().__eq__(other) and self.meta == other.meta + @classmethod def from_dict(cls, signed_dict: Dict[str, Any]) -> "Snapshot": """Creates ``Snapshot`` object from its dict representation. @@ -1203,6 +1285,18 @@ def __init__( self.paths = paths self.path_hash_prefixes = path_hash_prefixes + def __eq__(self, other: Any) -> bool: + if not isinstance(other, DelegatedRole): + return False + + return ( + super().__eq__(other) + and self.name == other.name + and self.terminating == other.terminating + and self.paths == other.paths + and self.path_hash_prefixes == other.path_hash_prefixes + ) + @classmethod def from_dict(cls, role_dict: Dict[str, Any]) -> "DelegatedRole": """Creates ``DelegatedRole`` object from its dict representation. @@ -1331,6 +1425,18 @@ def __init__( self.roles = roles self.unrecognized_fields = unrecognized_fields or {} + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Delegations): + return False + + return ( + self.keys == other.keys + # Order of the delegated roles matters (see issue #1788). + and list(self.roles.items()) == list(other.roles.items()) + and self.roles == other.roles + and self.unrecognized_fields == other.unrecognized_fields + ) + @classmethod def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations": """Creates ``Delegations`` object from its dict representation. @@ -1403,6 +1509,17 @@ def custom(self) -> Any: target. python-tuf does not use or validate this data.""" return self.unrecognized_fields.get("custom") + def __eq__(self, other: Any) -> bool: + if not isinstance(other, TargetFile): + return False + + return ( + self.length == other.length + and self.hashes == other.hashes + and self.path == other.path + and self.unrecognized_fields == other.unrecognized_fields + ) + @classmethod def from_dict(cls, target_dict: Dict[str, Any], path: str) -> "TargetFile": """Creates ``TargetFile`` object from its dict representation. @@ -1550,6 +1667,16 @@ def __init__( self.targets = targets self.delegations = delegations + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Targets): + return False + + return ( + super().__eq__(other) + and self.targets == other.targets + and self.delegations == other.delegations + ) + @classmethod def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets": """Creates ``Targets`` object from its dict representation. diff --git a/tuf/api/serialization/json.py b/tuf/api/serialization/json.py index b043310fd1..65d183bad5 100644 --- a/tuf/api/serialization/json.py +++ b/tuf/api/serialization/json.py @@ -9,6 +9,7 @@ """ import json +from typing import Optional from securesystemslib.formats import encode_canonical @@ -44,17 +45,22 @@ def deserialize(self, raw_data: bytes) -> Metadata: class JSONSerializer(MetadataSerializer): """Provides Metadata to JSON serialize method. - Attributes: + Args: compact: A boolean indicating if the JSON bytes generated in - 'serialize' should be compact by excluding whitespace. + 'serialize' should be compact by excluding whitespace. + validate: Check that the metadata object can be deserialized again + without change of contents and thus find common mistakes. + This validation might slow down serialization significantly. """ - def __init__(self, compact: bool = False): + def __init__(self, compact: bool = False, validate: Optional[bool] = False): self.compact = compact + self.validate = validate def serialize(self, metadata_obj: Metadata) -> bytes: """Serialize Metadata object into utf-8 encoded JSON bytes.""" + try: indent = None if self.compact else 1 separators = (",", ":") if self.compact else (",", ": ") @@ -65,6 +71,16 @@ def serialize(self, metadata_obj: Metadata) -> bytes: sort_keys=True, ).encode("utf-8") + if self.validate: + try: + new_md_obj = JSONDeserializer().deserialize(json_bytes) + if metadata_obj != new_md_obj: + raise ValueError( + "Metadata changes if you serialize and deserialize." + ) + except Exception as e: + raise ValueError("Metadata cannot be validated!") from e + except Exception as e: raise SerializationError from e