Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import shutil
import tempfile
import unittest
import copy

from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -42,6 +43,10 @@ def setUpModule():
import_ed25519_privatekey_from_file
)

from securesystemslib.keys import (
format_keyval_to_metadata
)

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -215,12 +220,14 @@ def test_metadata_snapshot(self):
snapshot = Metadata.from_json_file(snapshot_path)

# Create a dict representing what we expect the updated data to be
fileinfo = snapshot.signed.meta
fileinfo = copy.deepcopy(snapshot.signed.meta)
hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
fileinfo['role1.json']['version'] = 2
fileinfo['role1.json']['hashes'] = hashes
fileinfo['role1.json']['length'] = 123


self.assertNotEqual(snapshot.signed.meta, fileinfo)
snapshot.signed.update('role1', 2, 123, hashes)
self.assertEqual(snapshot.signed.meta, fileinfo)

Expand Down Expand Up @@ -250,14 +257,73 @@ def test_metadata_timestamp(self):
self.assertEqual(timestamp.signed.expires, datetime(2036, 1, 3, 0, 0))

hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'}
fileinfo = timestamp.signed.meta['snapshot.json']
fileinfo = copy.deepcopy(timestamp.signed.meta['snapshot.json'])
fileinfo['hashes'] = hashes
fileinfo['version'] = 2
fileinfo['length'] = 520

self.assertNotEqual(timestamp.signed.meta['snapshot.json'], fileinfo)
timestamp.signed.update(2, 520, hashes)
self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)


def test_metadata_root(self):
    # Load the example root metadata from the test repository.
    root = Metadata.from_json_file(
        os.path.join(self.repo_dir, 'metadata', 'root.json'))

    # Prepare a second root key and its metadata representation.
    root_key2 = import_ed25519_publickey_from_file(
        os.path.join(self.keystore_dir, 'root_key2.pub'))
    keyid = root_key2['keyid']
    key_metadata = format_keyval_to_metadata(
        root_key2['keytype'], root_key2['scheme'], root_key2['keyval'])

    # The new key must not yet be known to the root role or key store.
    self.assertNotIn(keyid, root.signed.roles['root']['keyids'])
    self.assertNotIn(keyid, root.signed.keys)

    # Adding the key registers it with both the role and the key store.
    root.signed.add_key('root', keyid, key_metadata)
    self.assertIn(keyid, root.signed.roles['root']['keyids'])
    self.assertIn(keyid, root.signed.keys)

    # Removing the key de-registers it from both again.
    root.signed.remove_key('root', keyid)
    self.assertNotIn(keyid, root.signed.roles['root']['keyids'])
    self.assertNotIn(keyid, root.signed.keys)



def test_metadata_targets(self):
    targets_path = os.path.join(
        self.repo_dir, 'metadata', 'targets.json')
    targets = Metadata.from_json_file(targets_path)

    # Create a fileinfo dict representing what we expect the updated
    # data to be.
    filename = 'file2.txt'
    # NOTE: a stray trailing comma after this closing brace previously
    # turned 'hashes' into a 1-tuple containing the dict, so the expected
    # fileinfo carried a tuple instead of a hash dict.
    hashes = {
        "sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b",
        "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0"
    }

    fileinfo = {
        'hashes': hashes,
        'length': 28
    }

    # Assert that data is not already equal
    self.assertNotEqual(targets.signed.targets[filename], fileinfo)
    # Update an already existing fileinfo
    targets.signed.update(filename, fileinfo)
    # Verify that data is updated
    self.assertEqual(targets.signed.targets[filename], fileinfo)

# Run unit test.
if __name__ == '__main__':
utils.configure_test_logging(sys.argv)
Expand Down
86 changes: 84 additions & 2 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ class also that has a 'from_dict' factory method. (Currently this is
elif _type == 'timestamp':
inner_cls = Timestamp
elif _type == 'root':
# TODO: implement Root class
raise NotImplementedError('Root not yet implemented')
inner_cls = Root
else:
raise ValueError(f'unrecognized metadata type "{_type}"')

Expand Down Expand Up @@ -335,6 +334,89 @@ def bump_version(self) -> None:
self.version += 1


class Root(Signed):
    """A container for the signed part of root metadata.

    Attributes:
        consistent_snapshot: A boolean indicating whether the repository
            supports consistent snapshots.
        keys: A dictionary mapping keyids to public key metadata, used to
            verify top-level role metadata signatures::

                {
                    '<KEYID>': {
                        'keytype': '<KEY TYPE>',
                        'scheme': '<KEY SCHEME>',
                        'keyid_hash_algorithms': [
                            '<HASH ALGO 1>',
                            '<HASH ALGO 2>'
                            ...
                        ],
                        'keyval': {
                            'public': '<PUBLIC KEY HEX REPRESENTATION>'
                        }
                    },
                    ...
                }

        roles: A dictionary mapping each top-level role name to its list of
            authorized signing keyids and its signature threshold::

                {
                    '<ROLE>': {
                        'keyids': ['<SIGNING KEY KEYID>', ...],
                        'threshold': <SIGNATURE THRESHOLD>,
                    },
                    ...
                }

    """
    # TODO: determine an appropriate value for max-args and fix places where
    # we violate that. This __init__ function takes 7 arguments, whereas the
    # default max-args value for pylint is 5
    # pylint: disable=too-many-arguments
    def __init__(
            self, _type: str, version: int, spec_version: str,
            expires: datetime, consistent_snapshot: bool,
            keys: JsonDict, roles: JsonDict) -> None:
        super().__init__(_type, version, spec_version, expires)
        # TODO: Add classes for keys and roles
        self.consistent_snapshot = consistent_snapshot
        self.keys = keys
        self.roles = roles

    # Serialization.
    def to_dict(self) -> JsonDict:
        """Returns the JSON-serializable dictionary representation of self. """
        json_dict = super().to_dict()
        json_dict['consistent_snapshot'] = self.consistent_snapshot
        json_dict['keys'] = self.keys
        json_dict['roles'] = self.roles
        return json_dict

    # Update key for a role.
    def add_key(self, role: str, keyid: str, key_metadata: JsonDict) -> None:
        """Adds new key for 'role' and updates the key store. """
        role_keyids = self.roles[role]['keyids']
        if keyid not in role_keyids:
            role_keyids.append(keyid)
            self.keys[keyid] = key_metadata

    # Remove key for a role.
    def remove_key(self, role: str, keyid: str) -> None:
        """Removes key for 'role' and updates the key store. """
        role_keyids = self.roles[role]['keyids']
        if keyid in role_keyids:
            role_keyids.remove(keyid)
            # Drop the key from the store only once no role references it.
            if not any(keyid in info['keyids']
                       for info in self.roles.values()):
                del self.keys[keyid]




class Timestamp(Signed):
"""A container for the signed part of timestamp metadata.

Expand Down