Expand Up @@ -153,14 +153,22 @@ def populate_tracks(db):
],
"users": [
{"user_id": 1287289, "handle": "some-test-user"},
{"user_id": 4, "handle": "some-other-user"},
{"user_id": 4, "wallet": "0xuser4wallet", "handle": "some-other-user"},
{
"user_id": 5,
"handle": "test-user-5",
"artist_pick_track_id": 12,
"allow_ai_attribution": True,
},
],
"grants": [
{
"user_id": 1287289,
"grantee_address": "0xuser4wallet",
"is_approved": True,
"is_revoked": False,
},
],
}

populate_mock_db(db, test_entities)
Expand Down Expand Up @@ -221,27 +229,49 @@ def test_get_tracks_by_date_authed(app):
with app.app_context():
db = get_db()

populate_tracks(db)
populate_tracks(db)

Contributor Author


Not sure why these tests mostly run outside the app context. But the check for managed users throws if we're not inside one.

Member


python weird

Contributor Author


I think python would say JS weird.


with db.scoped_session() as session:
tracks = _get_tracks(
session,
{
"user_id": 1287289,
"authed_user_id": 1287289,
"offset": 0,
"limit": 10,
"sort": "date",
},
)
with db.scoped_session() as session:
# test as authed user matching owner
tracks = _get_tracks(
session,
{
"user_id": 1287289,
"authed_user_id": 1287289,
"offset": 0,
"limit": 10,
"sort": "date",
},
)

assert len(tracks) == 8
assert tracks[0]["track_id"] == 1
assert tracks[1]["track_id"] == 11
assert tracks[2]["track_id"] == 3
assert tracks[3]["track_id"] == 5
assert tracks[4]["track_id"] == 4
assert tracks[5]["track_id"] == 2
assert len(tracks) == 8
assert tracks[0]["track_id"] == 1
assert tracks[1]["track_id"] == 11
assert tracks[2]["track_id"] == 3
assert tracks[3]["track_id"] == 5
assert tracks[4]["track_id"] == 4
assert tracks[5]["track_id"] == 2

# test as authed user managing owner
tracks = _get_tracks(
session,
{
"user_id": 1287289,
"current_user_id": 1287289,
"authed_user_id": 4,
"offset": 0,
"limit": 10,
"sort": "date",
},
)

assert len(tracks) == 8
assert tracks[0]["track_id"] == 1
assert tracks[1]["track_id"] == 11
assert tracks[2]["track_id"] == 3
assert tracks[3]["track_id"] == 5
assert tracks[4]["track_id"] == 4
assert tracks[5]["track_id"] == 2


def test_get_tracks_with_pinned_track(app):
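--- a/packages/discovery-provider/src/api/v1/access_helpers.py
+++ b/packages/discovery-provider/src/api/v1/access_helpers.py
@@ -2,40 +2,11 @@
 
 from flask_restx.errors import abort
 
-from src.queries.get_managed_users import (
-GetUserManagersArgs,
-get_user_managers_with_grants,
-)
+from src.queries.get_managed_users import is_active_manager
 
 logger = logging.getLogger(__name__)
 
 
-def is_active_manager(user_id: int, manager_id: int) -> bool:
-"""
-Check if a manager is active for a given user.
-
-Args:
-user_id (int): The ID of the user.
-manager_id (int): The ID of the manager.
-
-Returns:
-bool: True if the manager is active for the user, False otherwise.
-"""
-try:
-grants = get_user_managers_with_grants(
-GetUserManagersArgs(user_id=user_id, is_approved=True, is_revoked=False)
-)
-for grant in grants:
-manager = grant.get("manager")
-if manager and manager.get("user_id") == manager_id:
-return True
-except Exception as e:
-logger.error(
-f"access_helpers.py | Unexpected exception checking managers for user: {e}"
-)
-return False
-
-
 def check_authorized(user_id, authed_user_id):
 """
 Checks that the authenticated user matches or is a manager of the requested user.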
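--- a/packages/discovery-provider/src/queries/get_managed_users.py
+++ b/packages/discovery-provider/src/queries/get_managed_users.py
@@ -156,3 +156,29 @@ def get_managed_users_with_grants(args: GetManagedUsersArgs) -> List[Dict]:
 grants = query_result_to_list(grants)
 
 return make_managed_users_list(users, grants)
+
+
+def is_active_manager(user_id: int, manager_id: int) -> bool:
+"""
+Check if a manager is active for a given user.
+
+Args:
+user_id (int): The ID of the user.
+manager_id (int): The ID of the manager.
+
+Returns:
+bool: True if the manager is active for the user, False otherwise.
+"""
+try:
+grants = get_user_managers_with_grants(
+GetUserManagersArgs(user_id=user_id, is_approved=True, is_revoked=False)
+)
+for grant in grants:
+manager = grant.get("manager")
+if manager and manager.get("user_id") == manager_id:
+return True
+except Exception as e:
+logger.error(
+f"get_managed_users.py | Unexpected exception checking managers for user: {e}"
+)
+return False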
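Review bot: The `except Exception` handler in `is_active_manager` calls `logger.error`, but this hunk starts at line 156 and adds no `logger`; if get_managed_users.py does not already define a module-level logger (not visible here), the handler raises NameError instead of returning False, so confirm `logger = logging.getLogger(__name__)` exists in the module. This function now decides whether a manager sees unlisted tracks, and any lookup failure is reported as "not a manager" with only the exception message logged; `logger.exception("get_managed_users.py | Unexpected exception checking managers for user")` would keep the traceback. The docstring could also state that False is returned when the grants lookup fails, so callers know the check denies on error.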
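--- a/packages/discovery-provider/src/queries/get_tracks.py
+++ b/packages/discovery-provider/src/queries/get_tracks.py
@@ -9,6 +9,7 @@
 from src.models.tracks.track_route import TrackRoute
 from src.models.tracks.track_with_aggregates import TrackWithAggregates
 from src.models.users.user import User
+from src.queries.get_managed_users import is_active_manager
 from src.queries.query_helpers import (
 SortDirection,
 SortMethod,
@@ -89,15 +90,25 @@ def _get_tracks(session, args):
 elif args.get("skip_unlisted_filter", False):
 pass
 else:
+current_user_id = args.get("current_user_id")
+user_id = args.get("user_id")
+authed_user_id = args.get("authed_user_id")
+is_authed_user_owner = (
+user_id is not None
+and authed_user_id is not None
+and authed_user_id == user_id
+)
+is_authed_user_manager = (
+user_id is not None
+and current_user_id is not None
+and authed_user_id is not None
+and current_user_id == user_id
+and is_active_manager(user_id, authed_user_id)
+)
 # Only return unlisted tracks if either
 # - above case, routes are present (direct links to hidden tracks)
-# - the user is authenticated as the owner
-is_authed_user = (
-"user_id" in args
-and "authed_user_id" in args
-and args.get("user_id") == args.get("authed_user_id")
-)
-if not is_authed_user:
+# - the current user is authenticated as the owner or a manager of the owner
+if not (is_authed_user_owner or is_authed_user_manager):
 base_query = base_query.filter(TrackWithAggregates.is_unlisted == False)
 
 # Conditionally process an array of tracks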
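Review bot: `is_authed_user_manager` is computed before the `if`, so `is_active_manager(user_id, authed_user_id)` runs a grants lookup even when `is_authed_user_owner` is already true (an owner request that also passes `current_user_id == user_id`); making `not is_authed_user_owner` the first clause of that expression skips the lookup. This branch widens who can see unlisted tracks using only values read from `args`, so a comment such as `# authed_user_id must be the verified request signer, never a client-supplied parameter` would pin the assumption; whether callers honour it is not visible in this hunk. The accompanying test only exercises an approved, non-revoked grant, so a case with `"is_revoked": True` or with no grant, asserting that unlisted tracks are filtered out, would cover the deny path. `_get_tracks` starts and ends outside the hunk.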