diff --git a/packages/discovery-provider/src/tasks/calculate_trending_challenges.py b/packages/discovery-provider/src/tasks/calculate_trending_challenges.py index d618105a393..6841172035b 100644 --- a/packages/discovery-provider/src/tasks/calculate_trending_challenges.py +++ b/packages/discovery-provider/src/tasks/calculate_trending_challenges.py @@ -9,6 +9,7 @@ from src.challenges.challenge_event import ChallengeEvent from src.challenges.challenge_event_bus import ChallengeEventBus +from src.models.core.core_indexed_blocks import CoreIndexedBlocks from src.queries.get_trending_playlists import ( GetTrendingPlaylistsArgs, _get_trending_playlists_with_session, @@ -19,9 +20,11 @@ _get_underground_trending_with_session, ) from src.tasks.aggregates import get_latest_blocknumber +from src.tasks.core.core_client import get_core_instance from src.trending_strategies.trending_strategy_factory import TrendingStrategyFactory from src.trending_strategies.trending_type_and_version import TrendingType from src.utils import helpers +from src.utils.core import is_indexing_core_em from src.utils.redis_constants import most_recent_indexed_block_redis_key logger = logging.getLogger(__name__) @@ -85,20 +88,43 @@ def enqueue_trending_challenges( ) update_start = time.time() with challenge_bus.use_scoped_dispatch_queue(): - latest_blocknumber = get_latest_blocknumber_via_redis(session, redis) + # subtract final poa block because db is final_poa_block + latest_acdc_block + latest_blocknumber = get_latest_blocknumber(session) if latest_blocknumber is None: logger.error( "calculate_trending_challenges.py | Unable to get latest block number" ) return - # subtract final poa block because db is final_poa_block + latest_acdc_block + latest_block_datetime = None + if is_indexing_core_em(): + core = get_core_instance() + node_info = core.get_node_info() + core_chain_id = node_info.chainid + + latest_indexed_block: Optional[CoreIndexedBlocks] = ( + session.query(CoreIndexedBlocks) + .filter(CoreIndexedBlocks.chain_id == core_chain_id) + .order_by(CoreIndexedBlocks.height.desc()) + .first() + ) - latest_block_datetime = datetime.fromtimestamp( - web3.eth.get_block(latest_blocknumber - helpers.get_final_poa_block())[ - "timestamp" - ] - ) + if latest_indexed_block: + block = core.get_block(int(latest_indexed_block.height)) + if block: + latest_block_datetime = block.timestamp.ToDatetime() + else: + latest_block_datetime = datetime.fromtimestamp( + web3.eth.get_block(latest_blocknumber - helpers.get_final_poa_block())[ + "timestamp" + ] + ) + + if latest_block_datetime is None: + logger.error( + "calculate_trending_challenges.py | Unable to get latest block time" + ) + return trending_track_versions = trending_strategy_factory.get_versions_for_type( TrendingType.TRACKS diff --git a/packages/discovery-provider/src/tasks/index_trending.py b/packages/discovery-provider/src/tasks/index_trending.py index 5ff1292553a..87281773723 100644 --- a/packages/discovery-provider/src/tasks/index_trending.py +++ b/packages/discovery-provider/src/tasks/index_trending.py @@ -8,6 +8,7 @@ from sqlalchemy.orm.session import Session from web3 import Web3 +from src.models.core.core_indexed_blocks import CoreIndexedBlocks from src.models.indexing.block import Block from src.models.notifications.notification import Notification from src.models.tracks.track import Track @@ -22,10 +23,12 @@ make_underground_trending_cache_key, ) from src.tasks.celery_app import celery +from src.tasks.core.core_client import get_core_instance from src.tasks.index_tastemaker_notifications import index_tastemaker_notifications from src.trending_strategies.trending_strategy_factory import TrendingStrategyFactory from src.trending_strategies.trending_type_and_version import TrendingType from src.utils.config import shared_config +from src.utils.core import is_indexing_core_em from src.utils.hardcoded_data import genre_allowlist from src.utils.helpers import get_adjusted_block from src.utils.prometheus_metric import ( @@ -465,13 +468,39 @@ def get_should_update_trending( The function returns the an int, representing the timestamp, if the jobs should re-run, else None """ with db.scoped_session() as session: - current_db_block = ( - session.query(Block.number).filter(Block.is_current == True).first() - ) - current_db_block_number = current_db_block[0] - current_block = get_adjusted_block(web3, current_db_block_number) - current_timestamp = current_block["timestamp"] - current_datetime = datetime.fromtimestamp(current_timestamp) + current_datetime = None + + if is_indexing_core_em(): + core = get_core_instance() + node_info = core.get_node_info() + core_chain_id = node_info.chainid + + latest_indexed_block: Optional[CoreIndexedBlocks] = ( + session.query(CoreIndexedBlocks) + .filter(CoreIndexedBlocks.chain_id == core_chain_id) + .order_by(CoreIndexedBlocks.height.desc()) + .first() + ) + + if latest_indexed_block: + block = core.get_block(int(latest_indexed_block.height)) + if block: + current_datetime = block.timestamp.ToDatetime() + + else: + current_db_block = ( + session.query(Block.number).filter(Block.is_current == True).first() + ) + current_db_block_number = current_db_block[0] + + current_block = get_adjusted_block(web3, current_db_block_number) + current_timestamp = current_block["timestamp"] + current_datetime = datetime.fromtimestamp(current_timestamp) + + if not current_datetime: + logger.error("no timestamp") + return None, None + min_block_datetime = floor_time(current_datetime, interval_seconds) # Handle base case of not having run last trending