diff --git a/packages/discovery-provider/ddl/migrations/0093_add_comment_moderation.sql b/packages/discovery-provider/ddl/migrations/0093_add_comment_moderation.sql new file mode 100644 index 00000000000..6dc91954c93 --- /dev/null +++ b/packages/discovery-provider/ddl/migrations/0093_add_comment_moderation.sql @@ -0,0 +1,28 @@ +begin; + +CREATE TABLE muted_users ( + muted_user_id integer NOT NULL, + user_id integer NOT NULL, + created_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + is_delete boolean DEFAULT false, + txhash text NOT NULL, + blockhash text NOT NULL, + blocknumber integer REFERENCES blocks(number) ON DELETE CASCADE, + PRIMARY KEY (muted_user_id, user_id) + +); + + +CREATE TABLE reported_comments ( + reported_comment_id integer NOT NULL, + user_id integer NOT NULL, + created_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + txhash text NOT NULL, + blockhash text NOT NULL, + blocknumber integer REFERENCES blocks(number) ON DELETE CASCADE, + PRIMARY KEY (reported_comment_id, user_id) +); + +commit; \ No newline at end of file diff --git a/packages/discovery-provider/integration_tests/tasks/entity_manager/test_comment_moderation.py b/packages/discovery-provider/integration_tests/tasks/entity_manager/test_comment_moderation.py new file mode 100644 index 00000000000..28727d62d26 --- /dev/null +++ b/packages/discovery-provider/integration_tests/tasks/entity_manager/test_comment_moderation.py @@ -0,0 +1,234 @@ +import logging # pylint: disable=C0302 +from typing import List + +from web3 import Web3 +from web3.datastructures import AttributeDict + +from integration_tests.challenges.index_helpers import UpdateTask +from integration_tests.utils import populate_mock_db +from src.challenges.challenge_event_bus import ChallengeEventBus, setup_challenge_bus +from src.models.moderation.muted_user import MutedUser +from src.models.moderation.reported_comment import ReportedComment +from src.tasks.entity_manager.entity_manager import entity_manager_update +from src.utils.db_session import get_db + +logger = logging.getLogger(__name__) + + +def test_artist_mute_user(app, mocker): + "Tests artist can mute a user" + + # setup db and mocked txs + with app.app_context(): + db = get_db() + web3 = Web3() + challenge_event_bus: ChallengeEventBus = setup_challenge_bus() + update_task = UpdateTask(web3, challenge_event_bus) + + tx_receipts = { + "MuteUser": [ + { + "args": AttributeDict( + { + "_entityId": 2, + "_entityType": "User", + "_userId": 1, + "_action": "Mute", + "_metadata": "", + "_signer": "user1wallet", + } + ) + }, + ], + } + + entity_manager_txs = [ + AttributeDict({"transactionHash": update_task.web3.to_bytes(text=tx_receipt)}) + for tx_receipt in tx_receipts + ] + + def get_events_side_effect(_, tx_receipt): + return tx_receipts[tx_receipt["transactionHash"].decode("utf-8")] + + mocker.patch( + "src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx", + side_effect=get_events_side_effect, + autospec=True, + ) + + entities = { + "users": [ + {"user_id": 1, "handle": "artist", "wallet": "user1wallet"}, + {"user_id": 2, "handle": "punisher", "wallet": "user2wallet"}, + ], + } + populate_mock_db(db, entities) + + with db.scoped_session() as session: + # index transactions + entity_manager_update( + update_task, + session, + entity_manager_txs, + block_number=0, + block_timestamp=1585336422, + block_hash=hex(0), + ) + + # validate db records + all_muted_users: List[MutedUser] = session.query(MutedUser).all() + assert len(all_muted_users) == 1 + assert all_muted_users[0].muted_user_id == 2 + assert all_muted_users[0].user_id == 1 + assert all_muted_users[0].is_delete == False + + +def test_artist_unmute_user(app, mocker): + "Tests artist can unmute a user" + + # setup db and mocked txs + with app.app_context(): + db = get_db() + web3 = Web3() + challenge_event_bus: ChallengeEventBus = setup_challenge_bus() + update_task = UpdateTask(web3, challenge_event_bus) + + tx_receipts = { + "MuteUser": [ + { + "args": AttributeDict( + { + "_entityId": 2, + "_entityType": "User", + "_userId": 1, + "_action": "Mute", + "_metadata": "", + "_signer": "user1wallet", + } + ) + }, + ], + "UnmuteUser": [ + { + "args": AttributeDict( + { + "_entityId": 2, + "_entityType": "User", + "_userId": 1, + "_action": "Unmute", + "_metadata": "", + "_signer": "user1wallet", + } + ) + }, + ], + } + + entity_manager_txs = [ + AttributeDict({"transactionHash": update_task.web3.to_bytes(text=tx_receipt)}) + for tx_receipt in tx_receipts + ] + + def get_events_side_effect(_, tx_receipt): + return tx_receipts[tx_receipt["transactionHash"].decode("utf-8")] + + mocker.patch( + "src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx", + side_effect=get_events_side_effect, + autospec=True, + ) + + entities = { + "users": [ + {"user_id": 1, "handle": "artist", "wallet": "user1wallet"}, + {"user_id": 2, "handle": "punisher", "wallet": "user2wallet"}, + ], + } + populate_mock_db(db, entities) + + with db.scoped_session() as session: + # index transactions + entity_manager_update( + update_task, + session, + entity_manager_txs, + block_number=0, + block_timestamp=1585336422, + block_hash=hex(0), + ) + + # validate db records + all_muted_users: List[MutedUser] = session.query(MutedUser).all() + assert len(all_muted_users) == 1 + assert all_muted_users[0].muted_user_id == 2 + assert all_muted_users[0].user_id == 1 + assert all_muted_users[0].is_delete == True + + +def test_report_comment(app, mocker): + "Tests users can report a comment" + + # setup db and mocked txs + with app.app_context(): + db = get_db() + web3 = Web3() + challenge_event_bus: ChallengeEventBus = setup_challenge_bus() + update_task = UpdateTask(web3, challenge_event_bus) + + tx_receipts = { + "ReportComment": [ + { + "args": AttributeDict( + { + "_entityId": 1, + "_entityType": "Comment", + "_userId": 1, + "_action": "Report", + "_metadata": "", + "_signer": "user1wallet", + } + ) + }, + ], + } + + entity_manager_txs = [ + AttributeDict({"transactionHash": update_task.web3.to_bytes(text=tx_receipt)}) + for tx_receipt in tx_receipts + ] + + def get_events_side_effect(_, tx_receipt): + return tx_receipts[tx_receipt["transactionHash"].decode("utf-8")] + + mocker.patch( + "src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx", + side_effect=get_events_side_effect, + autospec=True, + ) + + entities = { + "users": [ + {"user_id": 1, "handle": "artist", "wallet": "user1wallet"}, + ], + "comments": [{"comment_id": 1}], + } + populate_mock_db(db, entities) + + with db.scoped_session() as session: + # index transactions + entity_manager_update( + update_task, + session, + entity_manager_txs, + block_number=0, + block_timestamp=1585336422, + block_hash=hex(0), + ) + + # validate db records + all_reported_comments: List[ReportedComment] = session.query( + ReportedComment + ).all() + assert len(all_reported_comments) == 1 + assert all_reported_comments[0].reported_comment_id == 1 + assert all_reported_comments[0].user_id == 1 diff --git a/packages/discovery-provider/integration_tests/utils.py b/packages/discovery-provider/integration_tests/utils.py index fa7a82eb63c..f5466191a6b 100644 --- a/packages/discovery-provider/integration_tests/utils.py +++ b/packages/discovery-provider/integration_tests/utils.py @@ -791,8 +791,8 @@ def populate_mock_db(db, entities, block_offset=None): text=comment_meta.get("text", ""), created_at=comment_meta.get("created_at", datetime.now()), updated_at=comment_meta.get("updated_at", datetime.now()), - txhash=track_meta.get("txhash", str(i + block_offset)), - blockhash=track_meta.get("blockhash", str(i + block_offset)), + txhash=comment_meta.get("txhash", str(i + block_offset)), + blockhash=comment_meta.get("blockhash", str(i + block_offset)), ) session.add(comment_record) for i, comment_threads_meta in enumerate(comment_threads): diff --git a/packages/discovery-provider/src/models/moderation/muted_user.py b/packages/discovery-provider/src/models/moderation/muted_user.py new file mode 100644 index 00000000000..6c7e28a876e --- /dev/null +++ b/packages/discovery-provider/src/models/moderation/muted_user.py @@ -0,0 +1,30 @@ +from sqlalchemy import ( + Boolean, + Column, + DateTime, + ForeignKey, + Integer, + PrimaryKeyConstraint, + Text, +) + +from src.models.base import Base +from src.models.model_utils import RepresentableMixin + + +class MutedUser(Base, RepresentableMixin): + __tablename__ = "muted_users" + + muted_user_id = Column(Integer, nullable=False) + user_id = Column(Integer, nullable=False) + created_at = Column(DateTime, nullable=False) + updated_at = Column(DateTime, nullable=False) + is_delete = Column(Boolean, default=False) + txhash = Column(Text, nullable=False) + blockhash = Column(Text, nullable=False) + blocknumber = Column(Integer, ForeignKey("blocks.number"), nullable=False) + + PrimaryKeyConstraint(muted_user_id, user_id) + + def get_attributes_dict(self): + return {col.name: getattr(self, col.name) for col in self.__table__.columns} diff --git a/packages/discovery-provider/src/models/moderation/reported_comment.py b/packages/discovery-provider/src/models/moderation/reported_comment.py new file mode 100644 index 00000000000..fa1ccfa3ea4 --- /dev/null +++ b/packages/discovery-provider/src/models/moderation/reported_comment.py @@ -0,0 +1,21 @@ +from sqlalchemy import Column, DateTime, ForeignKey, Integer, PrimaryKeyConstraint, Text + +from src.models.base import Base +from src.models.model_utils import RepresentableMixin + + +class ReportedComment(Base, RepresentableMixin): + __tablename__ = "reported_comments" + + reported_comment_id = Column(Integer, nullable=False) + user_id = Column(Integer, nullable=False) + created_at = Column(DateTime, nullable=False) + updated_at = Column(DateTime, nullable=False) + txhash = Column(Text, nullable=False) + blockhash = Column(Text, nullable=False) + blocknumber = Column(Integer, ForeignKey("blocks.number"), nullable=False) + + PrimaryKeyConstraint(reported_comment_id, user_id) + + def get_attributes_dict(self): + return {col.name: getattr(self, col.name) for col in self.__table__.columns} diff --git a/packages/discovery-provider/src/tasks/entity_manager/entities/comment.py b/packages/discovery-provider/src/tasks/entity_manager/entities/comment.py index 0070ae4416e..da016330684 100644 --- a/packages/discovery-provider/src/tasks/entity_manager/entities/comment.py +++ b/packages/discovery-provider/src/tasks/entity_manager/entities/comment.py @@ -98,7 +98,6 @@ def validate_comment_reaction_tx(params: ManageEntityParameters): validate_signer(params) comment_id = params.entity_id user_id = params.user_id - logger.info(f"asdf params.existing_records {params.existing_records}") if ( params.action == Action.REACT and (user_id, comment_id) diff --git a/packages/discovery-provider/src/tasks/entity_manager/entities/muted_user.py b/packages/discovery-provider/src/tasks/entity_manager/entities/muted_user.py new file mode 100644 index 00000000000..414325af9e2 --- /dev/null +++ b/packages/discovery-provider/src/tasks/entity_manager/entities/muted_user.py @@ -0,0 +1,77 @@ +from src.exceptions import IndexingValidationError +from src.models.moderation.muted_user import MutedUser +from src.tasks.entity_manager.utils import ( + Action, + EntityType, + ManageEntityParameters, + copy_record, + validate_signer, +) +from src.utils.structured_logger import StructuredLogger + +logger = StructuredLogger(__name__) + + +def validate_mute_user_tx(params: ManageEntityParameters): + muted_user_id = params.entity_id + validate_signer(params) + if ( + params.action == Action.MUTE + and (params.user_id, muted_user_id) + in params.existing_records[EntityType.MUTED_USER.value] + ): + raise IndexingValidationError( + f"User {params.user_id} already muted user {muted_user_id}" + ) + if (params.entity_id) not in params.existing_records[EntityType.USER.value]: + raise IndexingValidationError( + f"User {params.entity_id} does not exist and cannot be muted" + ) + + +def mute_user(params: ManageEntityParameters): + validate_mute_user_tx(params) + + muted_user_id = params.entity_id + user_id = params.user_id + + muted_user_record = MutedUser( + muted_user_id=muted_user_id, + user_id=user_id, + txhash=params.txhash, + blockhash=params.event_blockhash, + blocknumber=params.block_number, + created_at=params.block_datetime, + updated_at=params.block_datetime, + is_delete=False, + ) + + params.add_record( + (user_id, muted_user_id), + muted_user_record, + EntityType.MUTED_USER, + ) + + +def unmute_user(params: ManageEntityParameters): + validate_signer(params) + muted_user_id = params.entity_id + user_id = params.user_id + + existing_muted_user_reaction = params.existing_records[EntityType.MUTED_USER.value][ + (user_id, muted_user_id) + ] + deleted_muted_user_reaction = copy_record( + existing_muted_user_reaction, + params.block_number, + params.event_blockhash, + params.txhash, + params.block_datetime, + ) + deleted_muted_user_reaction.is_delete = True + + params.add_record( + (user_id, muted_user_id), + deleted_muted_user_reaction, + EntityType.MUTED_USER, + ) diff --git a/packages/discovery-provider/src/tasks/entity_manager/entities/reported_comment.py b/packages/discovery-provider/src/tasks/entity_manager/entities/reported_comment.py new file mode 100644 index 00000000000..e88f50f800e --- /dev/null +++ b/packages/discovery-provider/src/tasks/entity_manager/entities/reported_comment.py @@ -0,0 +1,47 @@ +from src.exceptions import IndexingValidationError +from src.models.moderation.reported_comment import ReportedComment +from src.tasks.entity_manager.utils import ( + Action, + EntityType, + ManageEntityParameters, + validate_signer, +) +from src.utils.structured_logger import StructuredLogger + +logger = StructuredLogger(__name__) + + +def validate_report_comment_tx(params: ManageEntityParameters): + reported_comment_id = params.entity_id + validate_signer(params) + if ( + params.action == Action.CREATE + and (params.user_id, reported_comment_id) + in params.existing_records[EntityType.REPORTED_COMMENT.value] + ): + raise IndexingValidationError( + f"Comment {reported_comment_id} already reported by user {params.user_id}" + ) + + +def report_comment(params: ManageEntityParameters): + validate_report_comment_tx(params) + + reported_comment_id = params.entity_id + user_id = params.user_id + + muted_user_record = ReportedComment( + reported_comment_id=reported_comment_id, + user_id=user_id, + txhash=params.txhash, + blockhash=params.event_blockhash, + blocknumber=params.block_number, + created_at=params.block_datetime, + updated_at=params.block_datetime, + ) + + params.add_record( + (user_id, reported_comment_id), + muted_user_record, + EntityType.MUTED_USER, + ) diff --git a/packages/discovery-provider/src/tasks/entity_manager/entity_manager.py b/packages/discovery-provider/src/tasks/entity_manager/entity_manager.py index d09b58b6fd2..735100a2682 100644 --- a/packages/discovery-provider/src/tasks/entity_manager/entity_manager.py +++ b/packages/discovery-provider/src/tasks/entity_manager/entity_manager.py @@ -17,6 +17,8 @@ from src.models.grants.developer_app import DeveloperApp from src.models.grants.grant import Grant from src.models.indexing.revert_block import RevertBlock +from src.models.moderation.muted_user import MutedUser +from src.models.moderation.reported_comment import ReportedComment from src.models.notifications.notification import NotificationSeen, PlaylistSeen from src.models.playlists.playlist import Playlist from src.models.playlists.playlist_route import PlaylistRoute @@ -59,6 +61,7 @@ reject_grant, revoke_grant, ) +from src.tasks.entity_manager.entities.muted_user import mute_user, unmute_user from src.tasks.entity_manager.entities.notification import ( create_notification, view_notification, @@ -69,6 +72,7 @@ delete_playlist, update_playlist, ) +from src.tasks.entity_manager.entities.reported_comment import report_comment from src.tasks.entity_manager.entities.social_features import ( action_to_record_types, create_social_action_types, @@ -219,7 +223,6 @@ def entity_manager_update( txhash, logger, ) - # update logger context with this tx event reset_entity_manager_event_tx_context(logger, event["args"]) @@ -377,6 +380,21 @@ def entity_manager_update( and params.entity_type == EntityType.COMMENT ): unreact_comment(params) + elif ( + params.action == Action.MUTE + and params.entity_type == EntityType.USER + ): + mute_user(params) + elif ( + params.action == Action.UNMUTE + and params.entity_type == EntityType.USER + ): + unmute_user(params) + elif ( + params.action == Action.REPORT + and params.entity_type == EntityType.COMMENT + ): + report_comment(params) logger.debug("process transaction") # log event context except IndexingValidationError as e: @@ -534,6 +552,10 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): if entity_type == EntityType.USER: entities_to_fetch[EntityType.USER_EVENT].add(user_id) entities_to_fetch[EntityType.ASSOCIATED_WALLET].add(user_id) + if action == Action.MUTE or action == Action.UNMUTE: + entities_to_fetch[EntityType.MUTED_USER].add((user_id, entity_id)) + entities_to_fetch[EntityType.USER].add(entity_id) + if entity_type == EntityType.TRACK: entities_to_fetch[EntityType.TRACK_ROUTE].add(entity_id) if entity_type == EntityType.PLAYLIST: @@ -544,6 +566,11 @@ def collect_entities_to_fetch(update_task, entity_manager_txs): entities_to_fetch[EntityType.COMMENT_REACTION].add( (user_id, entity_id) ) + elif action == Action.REPORT: + entities_to_fetch[EntityType.REPORTED_COMMENT].add( + (user_id, entity_id) + ) + if ( entity_type == EntityType.NOTIFICATION and action == Action.VIEW_PLAYLIST @@ -1132,6 +1159,75 @@ def fetch_existing_entities(session: Session, entities_to_fetch: EntitiesToFetch (comment_json["user_id"], comment_json["comment_id"]): comment_json for _, comment_json in comment_reactions } + if entities_to_fetch[EntityType.MUTED_USER.value]: + muted_users_to_fetch: Set[Tuple] = entities_to_fetch[ + EntityType.MUTED_USER.value + ] + or_queries = [] + for muted_user in muted_users_to_fetch: + user_id, muted_user_id = muted_user + or_queries.append( + or_( + MutedUser.user_id == user_id, + MutedUser.muted_user_id == muted_user_id, + ) + ) + + muted_users: List[Tuple[MutedUser, dict]] = ( + session.query( + MutedUser, + literal_column(f"row_to_json({MutedUser.__tablename__})"), + ) + .filter(or_(*or_queries)) + .all() + ) + existing_entities[EntityType.MUTED_USER] = { + (muted_user.user_id, muted_user.muted_user_id): muted_user + for muted_user, _ in muted_users + } + existing_entities_in_json[EntityType.MUTED_USER] = { + ( + muted_user_json["user_id"], + muted_user_json["muted_user_id"], + ): muted_user_json + for _, muted_user_json in muted_users + } + if entities_to_fetch[EntityType.REPORTED_COMMENT.value]: + reported_comments_to_fetch: Set[Tuple] = entities_to_fetch[ + EntityType.REPORTED_COMMENT.value + ] + or_queries = [] + for reported_comment in reported_comments_to_fetch: + user_id, reported_comment_id = reported_comment + or_queries.append( + or_( + ReportedComment.user_id == user_id, + ReportedComment.reported_comment_id == reported_comment_id, + ) + ) + + reported_comments: List[Tuple[ReportedComment, dict]] = ( + session.query( + ReportedComment, + literal_column(f"row_to_json({ReportedComment.__tablename__})"), + ) + .filter(or_(*or_queries)) + .all() + ) + existing_entities[EntityType.REPORTED_COMMENT] = { + ( + reported_comment.user_id, + reported_comment.reported_comment_id, + ): reported_comment + for reported_comment, _ in reported_comments + } + existing_entities_in_json[EntityType.REPORTED_COMMENT] = { + ( + reported_comment_json["user_id"], + reported_comment_json["reported_comment_id"], + ): reported_comment_json + for _, reported_comment_json in reported_comments + } return existing_entities, existing_entities_in_json diff --git a/packages/discovery-provider/src/tasks/entity_manager/utils.py b/packages/discovery-provider/src/tasks/entity_manager/utils.py index b5b2f1f0634..302bd0c1dbf 100644 --- a/packages/discovery-provider/src/tasks/entity_manager/utils.py +++ b/packages/discovery-provider/src/tasks/entity_manager/utils.py @@ -18,6 +18,8 @@ from src.models.grants.developer_app import DeveloperApp from src.models.grants.grant import Grant from src.models.indexing.cid_data import CIDData +from src.models.moderation.muted_user import MutedUser +from src.models.moderation.reported_comment import ReportedComment from src.models.notifications.notification import ( Notification, NotificationSeen, @@ -75,6 +77,9 @@ class Action(str, Enum): DOWNLOAD = "Download" REACT = "React" UNREACT = "Unreact" + MUTE = "Mute" + UNMUTE = "Unmute" + REPORT = "Report" def __str__(self) -> str: return str.__str__(self) @@ -104,6 +109,8 @@ class EntityType(str, Enum): TIP = "Tip" COMMENT = "Comment" COMMENT_REACTION = "CommentReaction" + MUTED_USER = "MutedUser" + REPORTED_COMMENT = "ReportedComment" def __str__(self) -> str: return str.__str__(self) @@ -160,6 +167,8 @@ class ExistingRecordDict(TypedDict): PlaylistRoute: Dict[int, PlaylistRoute] Comment: Dict[int, Comment] CommentReaction: Dict[Tuple, CommentReaction] + MutedUser: Dict[Tuple, MutedUser] + ReportedComment: Dict[Tuple, ReportedComment] class EntitiesToFetchDict(TypedDict): @@ -181,6 +190,8 @@ class EntitiesToFetchDict(TypedDict): UserWallet: Set[str] Comment: Set[int] CommentReaction: Set[Tuple] + MutedUser: Set[Tuple] + ReportedComment: Set[Tuple] MANAGE_ENTITY_EVENT_TYPE = "ManageEntity" @@ -413,6 +424,7 @@ def copy_record( DashboardWalletUser, Comment, CommentReaction, + MutedUser, ], block_number: int, event_blockhash: str,