diff --git a/tests/test_api.py b/tests/test_api.py index 00733132e9..7bdaf3ae3b 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -13,6 +13,7 @@ import shutil import tempfile import unittest +import copy from datetime import datetime, timedelta from dateutil.relativedelta import relativedelta @@ -42,6 +43,10 @@ def setUpModule(): import_ed25519_privatekey_from_file ) + from securesystemslib.keys import ( + format_keyval_to_metadata + ) + logger = logging.getLogger(__name__) @@ -215,12 +220,14 @@ def test_metadata_snapshot(self): snapshot = Metadata.from_json_file(snapshot_path) # Create a dict representing what we expect the updated data to be - fileinfo = snapshot.signed.meta + fileinfo = copy.deepcopy(snapshot.signed.meta) hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'} fileinfo['role1.json']['version'] = 2 fileinfo['role1.json']['hashes'] = hashes fileinfo['role1.json']['length'] = 123 + + self.assertNotEqual(snapshot.signed.meta, fileinfo) snapshot.signed.update('role1', 2, 123, hashes) self.assertEqual(snapshot.signed.meta, fileinfo) @@ -250,14 +257,73 @@ def test_metadata_timestamp(self): self.assertEqual(timestamp.signed.expires, datetime(2036, 1, 3, 0, 0)) hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'} - fileinfo = timestamp.signed.meta['snapshot.json'] + fileinfo = copy.deepcopy(timestamp.signed.meta['snapshot.json']) fileinfo['hashes'] = hashes fileinfo['version'] = 2 fileinfo['length'] = 520 + + self.assertNotEqual(timestamp.signed.meta['snapshot.json'], fileinfo) timestamp.signed.update(2, 520, hashes) self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo) + def test_metadata_root(self): + root_path = os.path.join( + self.repo_dir, 'metadata', 'root.json') + root = Metadata.from_json_file(root_path) + + # Add a second key to root role + root_key2 = import_ed25519_publickey_from_file( + os.path.join(self.keystore_dir, 'root_key2.pub')) + + keyid = root_key2['keyid'] + key_metadata = format_keyval_to_metadata( + root_key2['keytype'], root_key2['scheme'], root_key2['keyval']) + + # Assert that root does not contain the new key + self.assertNotIn(keyid, root.signed.roles['root']['keyids']) + self.assertNotIn(keyid, root.signed.keys) + + # Add new root key + root.signed.add_key('root', keyid, key_metadata) + + # Assert that key is added + self.assertIn(keyid, root.signed.roles['root']['keyids']) + self.assertIn(keyid, root.signed.keys) + + # Remove the key + root.signed.remove_key('root', keyid) + + # Assert that root does not contain the new key anymore + self.assertNotIn(keyid, root.signed.roles['root']['keyids']) + self.assertNotIn(keyid, root.signed.keys) + + + + def test_metadata_targets(self): + targets_path = os.path.join( + self.repo_dir, 'metadata', 'targets.json') + targets = Metadata.from_json_file(targets_path) + + # Create a fileinfo dict representing what we expect the updated data to be + filename = 'file2.txt' + hashes = { + "sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b", + "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0" + }, + + fileinfo = { + 'hashes': hashes, + 'length': 28 + } + + # Assert that data is not aleady equal + self.assertNotEqual(targets.signed.targets[filename], fileinfo) + # Update an already existing fileinfo + targets.signed.update(filename, fileinfo) + # Verify that data is updated + self.assertEqual(targets.signed.targets[filename], fileinfo) + # Run unit test. if __name__ == '__main__': utils.configure_test_logging(sys.argv) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 0b1ee08284..a747be6d13 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -88,8 +88,7 @@ class also that has a 'from_dict' factory method. (Currently this is elif _type == 'timestamp': inner_cls = Timestamp elif _type == 'root': - # TODO: implement Root class - raise NotImplementedError('Root not yet implemented') + inner_cls = Root else: raise ValueError(f'unrecognized metadata type "{_type}"') @@ -335,6 +334,89 @@ def bump_version(self) -> None: self.version += 1 +class Root(Signed): + """A container for the signed part of root metadata. + + Attributes: + consistent_snapshot: A boolean indicating whether the repository + supports consistent snapshots. + keys: A dictionary that contains a public key store used to verify + top level roles metadata signatures:: + { + '': { + 'keytype': '', + 'scheme': '', + 'keyid_hash_algorithms': [ + '', + '' + ... + ], + 'keyval': { + 'public': '' + } + }, + ... + }, + roles: A dictionary that contains a list of signing keyids and + a signature threshold for each top level role:: + { + '': { + 'keyids': ['', ...], + 'threshold': , + }, + ... + } + + """ + # TODO: determine an appropriate value for max-args and fix places where + # we violate that. This __init__ function takes 7 arguments, whereas the + # default max-args value for pylint is 5 + # pylint: disable=too-many-arguments + def __init__( + self, _type: str, version: int, spec_version: str, + expires: datetime, consistent_snapshot: bool, + keys: JsonDict, roles: JsonDict) -> None: + super().__init__(_type, version, spec_version, expires) + # TODO: Add classes for keys and roles + self.consistent_snapshot = consistent_snapshot + self.keys = keys + self.roles = roles + + + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + json_dict = super().to_dict() + json_dict.update({ + 'consistent_snapshot': self.consistent_snapshot, + 'keys': self.keys, + 'roles': self.roles + }) + return json_dict + + + # Update key for a role. + def add_key(self, role: str, keyid: str, key_metadata: JsonDict) -> None: + """Adds new key for 'role' and updates the key store. """ + if keyid not in self.roles[role]['keyids']: + self.roles[role]['keyids'].append(keyid) + self.keys[keyid] = key_metadata + + + # Remove key for a role. + def remove_key(self, role: str, keyid: str) -> None: + """Removes key for 'role' and updates the key store. """ + if keyid in self.roles[role]['keyids']: + self.roles[role]['keyids'].remove(keyid) + for keyinfo in self.roles.values(): + if keyid in keyinfo['keyids']: + return + + del self.keys[keyid] + + + + class Timestamp(Signed): """A container for the signed part of timestamp metadata.