diff --git a/packages/discovery-provider/integration_tests/queries/test_get_tracks.py b/packages/discovery-provider/integration_tests/queries/test_get_tracks.py index 8e8bcdfd704..b7212ac0044 100644 --- a/packages/discovery-provider/integration_tests/queries/test_get_tracks.py +++ b/packages/discovery-provider/integration_tests/queries/test_get_tracks.py @@ -153,7 +153,7 @@ def populate_tracks(db): ], "users": [ {"user_id": 1287289, "handle": "some-test-user"}, - {"user_id": 4, "handle": "some-other-user"}, + {"user_id": 4, "wallet": "0xuser4wallet", "handle": "some-other-user"}, { "user_id": 5, "handle": "test-user-5", @@ -161,6 +161,14 @@ def populate_tracks(db): "allow_ai_attribution": True, }, ], + "grants": [ + { + "user_id": 1287289, + "grantee_address": "0xuser4wallet", + "is_approved": True, + "is_revoked": False, + }, + ], } populate_mock_db(db, test_entities) @@ -221,27 +229,49 @@ def test_get_tracks_by_date_authed(app): with app.app_context(): db = get_db() - populate_tracks(db) + populate_tracks(db) - with db.scoped_session() as session: - tracks = _get_tracks( - session, - { - "user_id": 1287289, - "authed_user_id": 1287289, - "offset": 0, - "limit": 10, - "sort": "date", - }, - ) + with db.scoped_session() as session: + # test as authed user matching owner + tracks = _get_tracks( + session, + { + "user_id": 1287289, + "authed_user_id": 1287289, + "offset": 0, + "limit": 10, + "sort": "date", + }, + ) - assert len(tracks) == 8 - assert tracks[0]["track_id"] == 1 - assert tracks[1]["track_id"] == 11 - assert tracks[2]["track_id"] == 3 - assert tracks[3]["track_id"] == 5 - assert tracks[4]["track_id"] == 4 - assert tracks[5]["track_id"] == 2 + assert len(tracks) == 8 + assert tracks[0]["track_id"] == 1 + assert tracks[1]["track_id"] == 11 + assert tracks[2]["track_id"] == 3 + assert tracks[3]["track_id"] == 5 + assert tracks[4]["track_id"] == 4 + assert tracks[5]["track_id"] == 2 + + # test as authed user managing owner + tracks = _get_tracks( + session, + { + "user_id": 1287289, + "current_user_id": 1287289, + "authed_user_id": 4, + "offset": 0, + "limit": 10, + "sort": "date", + }, + ) + + assert len(tracks) == 8 + assert tracks[0]["track_id"] == 1 + assert tracks[1]["track_id"] == 11 + assert tracks[2]["track_id"] == 3 + assert tracks[3]["track_id"] == 5 + assert tracks[4]["track_id"] == 4 + assert tracks[5]["track_id"] == 2 def test_get_tracks_with_pinned_track(app): diff --git a/packages/discovery-provider/src/api/v1/access_helpers.py b/packages/discovery-provider/src/api/v1/access_helpers.py index 34f46142bcb..1fd3995a593 100644 --- a/packages/discovery-provider/src/api/v1/access_helpers.py +++ b/packages/discovery-provider/src/api/v1/access_helpers.py @@ -2,40 +2,11 @@ from flask_restx.errors import abort -from src.queries.get_managed_users import ( - GetUserManagersArgs, - get_user_managers_with_grants, -) +from src.queries.get_managed_users import is_active_manager logger = logging.getLogger(__name__) -def is_active_manager(user_id: int, manager_id: int) -> bool: - """ - Check if a manager is active for a given user. - - Args: - user_id (int): The ID of the user. - manager_id (int): The ID of the manager. - - Returns: - bool: True if the manager is active for the user, False otherwise. - """ - try: - grants = get_user_managers_with_grants( - GetUserManagersArgs(user_id=user_id, is_approved=True, is_revoked=False) - ) - for grant in grants: - manager = grant.get("manager") - if manager and manager.get("user_id") == manager_id: - return True - except Exception as e: - logger.error( - f"access_helpers.py | Unexpected exception checking managers for user: {e}" - ) - return False - - def check_authorized(user_id, authed_user_id): """ Checks that the authenticated user matches or is a manager of the requested user. diff --git a/packages/discovery-provider/src/queries/get_managed_users.py b/packages/discovery-provider/src/queries/get_managed_users.py index 5bba0f70189..922fd57a712 100644 --- a/packages/discovery-provider/src/queries/get_managed_users.py +++ b/packages/discovery-provider/src/queries/get_managed_users.py @@ -156,3 +156,29 @@ def get_managed_users_with_grants(args: GetManagedUsersArgs) -> List[Dict]: grants = query_result_to_list(grants) return make_managed_users_list(users, grants) + + +def is_active_manager(user_id: int, manager_id: int) -> bool: + """ + Check if a manager is active for a given user. + + Args: + user_id (int): The ID of the user. + manager_id (int): The ID of the manager. + + Returns: + bool: True if the manager is active for the user, False otherwise. + """ + try: + grants = get_user_managers_with_grants( + GetUserManagersArgs(user_id=user_id, is_approved=True, is_revoked=False) + ) + for grant in grants: + manager = grant.get("manager") + if manager and manager.get("user_id") == manager_id: + return True + except Exception as e: + logger.error( + f"get_managed_users.py | Unexpected exception checking managers for user: {e}" + ) + return False diff --git a/packages/discovery-provider/src/queries/get_tracks.py b/packages/discovery-provider/src/queries/get_tracks.py index 2ece85f0d2a..ae6cdb4afe5 100644 --- a/packages/discovery-provider/src/queries/get_tracks.py +++ b/packages/discovery-provider/src/queries/get_tracks.py @@ -9,6 +9,7 @@ from src.models.tracks.track_route import TrackRoute from src.models.tracks.track_with_aggregates import TrackWithAggregates from src.models.users.user import User +from src.queries.get_managed_users import is_active_manager from src.queries.query_helpers import ( SortDirection, SortMethod, @@ -89,15 +90,25 @@ def _get_tracks(session, args): elif args.get("skip_unlisted_filter", False): pass else: + current_user_id = args.get("current_user_id") + user_id = args.get("user_id") + authed_user_id = args.get("authed_user_id") + is_authed_user_owner = ( + user_id is not None + and authed_user_id is not None + and authed_user_id == user_id + ) + is_authed_user_manager = ( + user_id is not None + and current_user_id is not None + and authed_user_id is not None + and current_user_id == user_id + and is_active_manager(user_id, authed_user_id) + ) # Only return unlisted tracks if either # - above case, routes are present (direct links to hidden tracks) - # - the user is authenticated as the owner - is_authed_user = ( - "user_id" in args - and "authed_user_id" in args - and args.get("user_id") == args.get("authed_user_id") - ) - if not is_authed_user: + # - the current user is authenticated as the owner or a manager of the owner + if not (is_authed_user_owner or is_authed_user_manager): base_query = base_query.filter(TrackWithAggregates.is_unlisted == False) # Conditionally process an array of tracks