From e718f63cc5619ce33f68a82a89f41aa4d46ae7a7 Mon Sep 17 00:00:00 2001 From: Hareesh Nagaraj Date: Mon, 16 Sep 2019 18:02:30 -0700 Subject: [PATCH 1/3] Reconnect for track indexing --- discovery-provider/src/tasks/index.py | 4 +++- discovery-provider/src/tasks/tracks.py | 28 ++++++++++++++++++++-- discovery-provider/src/tasks/users.py | 30 +---------------------- discovery-provider/src/utils/helpers.py | 32 +++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 32 deletions(-) diff --git a/discovery-provider/src/tasks/index.py b/discovery-provider/src/tasks/index.py index 9b07ab1c6a4..d049b949736 100644 --- a/discovery-provider/src/tasks/index.py +++ b/discovery-provider/src/tasks/index.py @@ -3,10 +3,11 @@ from src.models import Block, User, Track, Repost, Follow, Playlist, Save from src.tasks.celery_app import celery from src.tasks.tracks import track_state_update -from src.tasks.users import user_state_update, get_ipfs_info_from_cnode_endpoint # pylint: disable=E0611,E0001 +from src.tasks.users import user_state_update # pylint: disable=E0611,E0001 from src.tasks.social_features import social_feature_state_update from src.tasks.playlists import playlist_state_update from src.tasks.user_library import user_library_state_update +from src.utils.helpers import get_ipfs_info_from_cnode_endpoint logger = logging.getLogger(__name__) @@ -423,6 +424,7 @@ def refresh_peer_connections(task_context): continue if cnode_url == '': continue + logger.warning(cnode_url) try: logger.warning('index.py | Retrieving connection info for %s', cnode_url) diff --git a/discovery-provider/src/tasks/tracks.py b/discovery-provider/src/tasks/tracks.py index bb0e9fe7fdb..a525fd9eafa 100644 --- a/discovery-provider/src/tasks/tracks.py +++ b/discovery-provider/src/tasks/tracks.py @@ -2,8 +2,8 @@ from datetime import datetime from sqlalchemy.orm.session import make_transient from src import contract_addresses -from src.utils import multihash -from src.models import Track, BlacklistedIPLD +from src.utils import multihash, helpers +from src.models import Track, User, BlacklistedIPLD from src.tasks.metadata import track_metadata_format logger = logging.getLogger(__name__) @@ -143,6 +143,10 @@ def parse_track_event( return track_record track_record.owner_id = event_args._trackOwnerId + + # Reconnect to creator nodes for this user + refresh_track_owner_ipfs_conn(track_record.owner_id, session, update_task) + track_record.is_delete = False track_metadata = update_task.ipfs_client.get_metadata( track_metadata_multihash, @@ -185,6 +189,10 @@ def parse_track_event( track_record.owner_id = event_args._trackOwnerId track_record.is_delete = False + + # Reconnect to creator nodes for this user + refresh_track_owner_ipfs_conn(track_record.owner_id, session, update_task) + track_metadata = update_task.ipfs_client.get_metadata( upd_track_metadata_multihash, track_metadata_format @@ -243,3 +251,19 @@ def populate_track_record_metadata(track_record, track_metadata): track_record.iswc = track_metadata["iswc"] track_record.track_segments = track_metadata["track_segments"] return track_record + +def refresh_track_owner_ipfs_conn(owner_id, session, update_task): + owner_record = ( + session.query(User.creator_node_endpoint) + .filter( + User.is_current == True, + User.is_ready == True, + User.user_id == owner_id) + .all() + ) + if len(owner_record) >= 1: + parsed_endpoint_list = owner_record[0][0] + helpers.update_ipfs_peers_from_user_endpoint( + update_task, + parsed_endpoint_list + ) diff --git a/discovery-provider/src/tasks/users.py b/discovery-provider/src/tasks/users.py index e8b81db62e6..90ecbe37cef 100644 --- a/discovery-provider/src/tasks/users.py +++ b/discovery-provider/src/tasks/users.py @@ -246,7 +246,7 @@ def get_metadata_overrides_from_ipfs(session, update_task, user_record): return None # Manually peer with user creator nodes - update_ipfs_peers_from_user_endpoint( + helpers.update_ipfs_peers_from_user_endpoint( update_task, user_record.creator_node_endpoint ) @@ -257,31 +257,3 @@ def get_metadata_overrides_from_ipfs(session, update_task, user_record): ) return user_metadata - - -def get_ipfs_info_from_cnode_endpoint(url): - id_url = urljoin(url, 'ipfs_peer_info') - resp = requests.get(id_url, timeout=5) - json_resp = resp.json() - if 'addresses' in json_resp and isinstance(json_resp['addresses'], list): - for multiaddr in json_resp['addresses']: - if ('127.0.0.1' not in multiaddr) and ('ip6' not in multiaddr): - return multiaddr - raise Exception('Failed to find valid multiaddr') - - -def update_ipfs_peers_from_user_endpoint(update_task, cnode_url_list): - if cnode_url_list is None: - return - redis = update_task.redis - cnode_entries = cnode_url_list.split(',') - interval = int(update_task.shared_config["discprov"]["peer_refresh_interval"]) - for cnode_url in cnode_entries: - if cnode_url == '': - continue - try: - multiaddr = get_ipfs_info_from_cnode_endpoint(cnode_url) - update_task.ipfs_client.connect_peer(multiaddr) - redis.set(cnode_url, multiaddr, interval) - except Exception as e: # pylint: disable=broad-except - logger.warning(f"Error retrieving info for {cnode_url}, {e}") diff --git a/discovery-provider/src/utils/helpers.py b/discovery-provider/src/utils/helpers.py index 80f8c471823..b92cfb92204 100644 --- a/discovery-provider/src/utils/helpers.py +++ b/discovery-provider/src/utils/helpers.py @@ -1,9 +1,13 @@ +import logging import os import json import logging import contextlib +from urllib.parse import urljoin +import requests from . import multihash +logger = logging.getLogger(__name__) @contextlib.contextmanager def cd(path): @@ -120,3 +124,31 @@ def get_discovery_provider_version(): with open(versionFilePath) as f: data = json.load(f) return data + +def get_ipfs_info_from_cnode_endpoint(url): + id_url = urljoin(url, 'ipfs_peer_info') + resp = requests.get(id_url, timeout=5) + json_resp = resp.json() + if 'addresses' in json_resp and isinstance(json_resp['addresses'], list): + for multiaddr in json_resp['addresses']: + if ('127.0.0.1' not in multiaddr) and ('ip6' not in multiaddr): + return multiaddr + raise Exception('Failed to find valid multiaddr') + +def update_ipfs_peers_from_user_endpoint(update_task, cnode_url_list): + logger.warning('from users:') + logger.warning(cnode_url_list) + if cnode_url_list is None: + return + redis = update_task.redis + cnode_entries = cnode_url_list.split(',') + interval = int(update_task.shared_config["discprov"]["peer_refresh_interval"]) + for cnode_url in cnode_entries: + if cnode_url == '': + continue + try: + multiaddr = get_ipfs_info_from_cnode_endpoint(cnode_url) + update_task.ipfs_client.connect_peer(multiaddr) + redis.set(cnode_url, multiaddr, interval) + except Exception as e: # pylint: disable=broad-except + logger.warning(f"Error connecting to {cnode_url}, {e}") From 4221456cdaa8c1a2d767613d34e4f8ad354a4636 Mon Sep 17 00:00:00 2001 From: Hareesh Nagaraj Date: Mon, 16 Sep 2019 18:06:20 -0700 Subject: [PATCH 2/3] missed --- discovery-provider/src/tasks/index.py | 1 - 1 file changed, 1 deletion(-) diff --git a/discovery-provider/src/tasks/index.py b/discovery-provider/src/tasks/index.py index d049b949736..5a055862587 100644 --- a/discovery-provider/src/tasks/index.py +++ b/discovery-provider/src/tasks/index.py @@ -424,7 +424,6 @@ def refresh_peer_connections(task_context): continue if cnode_url == '': continue - logger.warning(cnode_url) try: logger.warning('index.py | Retrieving connection info for %s', cnode_url) From 38ab6f2465435e852c6bff454037344aadf3e9a5 Mon Sep 17 00:00:00 2001 From: Hareesh Nagaraj Date: Mon, 16 Sep 2019 18:18:16 -0700 Subject: [PATCH 3/3] lints --- discovery-provider/src/tasks/users.py | 2 -- discovery-provider/src/utils/helpers.py | 6 +----- discovery-provider/src/utils/ipfs_lib.py | 2 +- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/discovery-provider/src/tasks/users.py b/discovery-provider/src/tasks/users.py index 90ecbe37cef..c731944c8b1 100644 --- a/discovery-provider/src/tasks/users.py +++ b/discovery-provider/src/tasks/users.py @@ -1,8 +1,6 @@ import logging -from urllib.parse import urljoin from datetime import datetime from sqlalchemy.orm.session import make_transient -import requests from src import contract_addresses from src.utils import helpers from src.models import User, BlacklistedIPLD diff --git a/discovery-provider/src/utils/helpers.py b/discovery-provider/src/utils/helpers.py index b92cfb92204..135239dd28a 100644 --- a/discovery-provider/src/utils/helpers.py +++ b/discovery-provider/src/utils/helpers.py @@ -1,14 +1,11 @@ import logging import os import json -import logging import contextlib from urllib.parse import urljoin import requests from . import multihash -logger = logging.getLogger(__name__) - @contextlib.contextmanager def cd(path): """Context manager that changes to directory `path` and return to CWD @@ -136,8 +133,7 @@ def get_ipfs_info_from_cnode_endpoint(url): raise Exception('Failed to find valid multiaddr') def update_ipfs_peers_from_user_endpoint(update_task, cnode_url_list): - logger.warning('from users:') - logger.warning(cnode_url_list) + logger = logging.getLogger(__name__) if cnode_url_list is None: return redis = update_task.redis diff --git a/discovery-provider/src/utils/ipfs_lib.py b/discovery-provider/src/utils/ipfs_lib.py index db6f2f9b915..2fc270e7e9a 100644 --- a/discovery-provider/src/utils/ipfs_lib.py +++ b/discovery-provider/src/utils/ipfs_lib.py @@ -2,9 +2,9 @@ import json import time from urllib.parse import urlparse -import ipfshttpclient import requests from requests.exceptions import ReadTimeout +import ipfshttpclient logger = logging.getLogger(__name__)