Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from src.challenges.challenge_event import ChallengeEvent
from src.challenges.challenge_event_bus import ChallengeEventBus
from src.models.core.core_indexed_blocks import CoreIndexedBlocks
from src.queries.get_trending_playlists import (
GetTrendingPlaylistsArgs,
_get_trending_playlists_with_session,
Expand All @@ -19,9 +20,11 @@
_get_underground_trending_with_session,
)
from src.tasks.aggregates import get_latest_blocknumber
from src.tasks.core.core_client import get_core_instance
from src.trending_strategies.trending_strategy_factory import TrendingStrategyFactory
from src.trending_strategies.trending_type_and_version import TrendingType
from src.utils import helpers
from src.utils.core import is_indexing_core_em
from src.utils.redis_constants import most_recent_indexed_block_redis_key

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -85,20 +88,43 @@ def enqueue_trending_challenges(
)
update_start = time.time()
with challenge_bus.use_scoped_dispatch_queue():
latest_blocknumber = get_latest_blocknumber_via_redis(session, redis)
# subtract final poa block because db is final_poa_block + latest_acdc_block
latest_blocknumber = get_latest_blocknumber(session)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use the db, indexes should be fast enough and it's easier than keeping redis in sync with db.

if latest_blocknumber is None:
logger.error(
"calculate_trending_challenges.py | Unable to get latest block number"
)
return

# subtract final poa block because db is final_poa_block + latest_acdc_block
latest_block_datetime = None
if is_indexing_core_em():
core = get_core_instance()
node_info = core.get_node_info()
core_chain_id = node_info.chainid

latest_indexed_block: Optional[CoreIndexedBlocks] = (
session.query(CoreIndexedBlocks)
.filter(CoreIndexedBlocks.chain_id == core_chain_id)
.order_by(CoreIndexedBlocks.height.desc())
.first()
)

latest_block_datetime = datetime.fromtimestamp(
web3.eth.get_block(latest_blocknumber - helpers.get_final_poa_block())[
"timestamp"
]
)
if latest_indexed_block:
block = core.get_block(int(latest_indexed_block.height))
if block:
latest_block_datetime = block.timestamp.ToDatetime()
else:
latest_block_datetime = datetime.fromtimestamp(
web3.eth.get_block(latest_blocknumber - helpers.get_final_poa_block())[
"timestamp"
]
)
Comment on lines +99 to +121

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like we just get the latest block for timestamp purposes


if latest_block_datetime is None:
logger.error(
"calculate_trending_challenges.py | Unable to get latest block time"
)
return

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe instead of just returning here, we should throw all the way out to the top level. stuff's wack if we get here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I was following the earlier pattern where if the block was not found then it also returned None. Should we raise an error there too? An indexed block should always return I'd think


trending_track_versions = trending_strategy_factory.get_versions_for_type(
TrendingType.TRACKS
Expand Down
43 changes: 36 additions & 7 deletions packages/discovery-provider/src/tasks/index_trending.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sqlalchemy.orm.session import Session
from web3 import Web3

from src.models.core.core_indexed_blocks import CoreIndexedBlocks
from src.models.indexing.block import Block
from src.models.notifications.notification import Notification
from src.models.tracks.track import Track
Expand All @@ -22,10 +23,12 @@
make_underground_trending_cache_key,
)
from src.tasks.celery_app import celery
from src.tasks.core.core_client import get_core_instance
from src.tasks.index_tastemaker_notifications import index_tastemaker_notifications
from src.trending_strategies.trending_strategy_factory import TrendingStrategyFactory
from src.trending_strategies.trending_type_and_version import TrendingType
from src.utils.config import shared_config
from src.utils.core import is_indexing_core_em
from src.utils.hardcoded_data import genre_allowlist
from src.utils.helpers import get_adjusted_block
from src.utils.prometheus_metric import (
Expand Down Expand Up @@ -465,13 +468,39 @@ def get_should_update_trending(
The function returns the an int, representing the timestamp, if the jobs should re-run, else None
"""
with db.scoped_session() as session:
current_db_block = (
session.query(Block.number).filter(Block.is_current == True).first()
)
current_db_block_number = current_db_block[0]
current_block = get_adjusted_block(web3, current_db_block_number)
current_timestamp = current_block["timestamp"]
current_datetime = datetime.fromtimestamp(current_timestamp)
current_datetime = None

if is_indexing_core_em():
core = get_core_instance()
node_info = core.get_node_info()
core_chain_id = node_info.chainid

latest_indexed_block: Optional[CoreIndexedBlocks] = (
session.query(CoreIndexedBlocks)
.filter(CoreIndexedBlocks.chain_id == core_chain_id)
.order_by(CoreIndexedBlocks.height.desc())
.first()
)

if latest_indexed_block:
block = core.get_block(int(latest_indexed_block.height))
if block:
current_datetime = block.timestamp.ToDatetime()

else:
current_db_block = (
session.query(Block.number).filter(Block.is_current == True).first()
)
current_db_block_number = current_db_block[0]

current_block = get_adjusted_block(web3, current_db_block_number)
current_timestamp = current_block["timestamp"]
current_datetime = datetime.fromtimestamp(current_timestamp)

if not current_datetime:
logger.error("no timestamp")
return None, None

min_block_datetime = floor_time(current_datetime, interval_seconds)

# Handle base case of not having run last trending
Expand Down