diff --git a/scratchattach/cloud/_base.py b/scratchattach/cloud/_base.py index 1bf4a98e..19928a5a 100644 --- a/scratchattach/cloud/_base.py +++ b/scratchattach/cloud/_base.py @@ -1,60 +1,62 @@ -from abc import ABC, abstractmethod +from __future__ import annotations -import websocket import json +import ssl import time -from ..utils import exceptions -import warnings +from abc import ABC + +import websocket + from ..eventhandlers import cloud_recorder -import ssl +from ..utils import exceptions -class BaseCloud(ABC): +class BaseCloud(ABC): """ Base class for a project's cloud variables. Represents a cloud. - When inheriting from this class, the __init__ function of the inherited class ... - + When inheriting from this class, the __init__ function of the inherited class: - must first call the constructor of the super class: super().__init__() - - must then set some attributes Attributes that must be specified in the __init__ function a class inheriting from this one: + project_id: Project id of the cloud variables - :self.project_id: Project id of the cloud variables - - :self.cloud_host: URL of the websocket server ("wss://..." or "ws://...") + cloud_host: URL of the websocket server ("wss://..." or "ws://...") Attributes that can, but don't have to be specified in the __init__ function: - :self._session: Either None or a site.session.Session object. Defaults to None. + _session: Either None or a site.session.Session object. Defaults to None. - :self.ws_shortterm_ratelimit: The wait time between cloud variable sets. Defaults to 0.1 + ws_shortterm_ratelimit: The wait time between cloud variable sets. Defaults to 0.1 - :self.ws_longterm_ratelimit: The amount of cloud variable set that can be performed long-term without ever getting ratelimited + ws_longterm_ratelimit: The amount of cloud variable set that can be performed long-term without ever getting ratelimited - :self.allow_non_numeric: Whether non-numeric cloud variable values are allowed. Defaults to False + allow_non_numeric: Whether non-numeric cloud variable values are allowed. Defaults to False - :self.length_limit: Length limit for cloud variable values. Defaults to 100000 + length_limit: Length limit for cloud variable values. Defaults to 100000 - :self.username: The username to send during handshake. Defaults to "scratchattach" + username: The username to send during handshake. Defaults to "scratchattach" - :self.header: The header to send. Defaults to None + header: The header to send. Defaults to None - :self.cookie: The cookie to send. Defaults to None + cookie: The cookie to send. Defaults to None - :self.origin: The origin to send. Defaults to None + origin: The origin to send. Defaults to None - :self.print_connect_messages: Whether to print a message on every connect to the cloud server. Defaults to False. + print_connect_messages: Whether to print a message on every connect to the cloud server. Defaults to False. """ + def __init__(self, *, _session=None): # Required internal attributes that every object representing a cloud needs to have (no matter what cloud is represented): self._session = _session self.active_connection = False #whether a connection to a cloud variable server is currently established + self.websocket = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE}) - self.recorder = None # A CloudRecorder object that records cloud activity for the values to be retrieved later will be saved in this attribute as soon as .get_var is called + self.recorder = None # A CloudRecorder object that records cloud activity for the values to be retrieved later, + # which will be saved in this attribute as soon as .get_var is called self.first_var_set = 0 self.last_var_set = 0 self.var_stets_since_first = 0 @@ -63,7 +65,8 @@ def __init__(self, *, _session=None): # (These attributes can be specifically in the constructors of classes inheriting from this base class) self.ws_shortterm_ratelimit = 0.06667 self.ws_longterm_ratelimit = 0.1 - self.ws_timeout = 3 # Timeout for send operations (after the timeout, the connection will be renewed and the operation will be retried 3 times) + self.ws_timeout = 3 # Timeout for send operations (after the timeout, + # the connection will be renewed and the operation will be retried 3 times) self.allow_non_numeric = False self.length_limit = 100000 self.username = "scratchattach" @@ -100,7 +103,7 @@ def _send_packet(self, packet): self.websocket.send(json.dumps(packet) + "\n") except Exception: self.active_connection = False - raise exceptions.ConnectionError(f"Sending packet failed three times in a row: {packet}") + raise exceptions.CloudConnectionError(f"Sending packet failed three times in a row: {packet}") def _send_packet_list(self, packet_list): packet_string = "".join([json.dumps(packet) + "\n" for packet in packet_list]) @@ -126,7 +129,8 @@ def _send_packet_list(self, packet_list): self.websocket.send(packet_string) except Exception: self.active_connection = False - raise exceptions.ConnectionError(f"Sending packet list failed four times in a row: {packet_list}") + raise exceptions.CloudConnectionError( + f"Sending packet list failed four times in a row: {packet_list}") def _handshake(self): packet = {"method": "handshake", "user": self.username, "project_id": self.project_id} @@ -139,8 +143,8 @@ def connect(self): cookie=self.cookie, origin=self.origin, enable_multithread=True, - timeout = self.ws_timeout, - header = self.header + timeout=self.ws_timeout, + header=self.header ) self._handshake() self.active_connection = True @@ -166,29 +170,29 @@ def _assert_valid_value(self, value): if not (value in [True, False, float('inf'), -float('inf')]): value = str(value) if len(value) > self.length_limit: - raise(exceptions.InvalidCloudValue( + raise (exceptions.InvalidCloudValue( f"Value exceeds length limit: {str(value)}" )) if not self.allow_non_numeric: x = value.replace(".", "") x = x.replace("-", "") if not (x.isnumeric() or x == ""): - raise(exceptions.InvalidCloudValue( + raise (exceptions.InvalidCloudValue( "Value not numeric" )) def _enforce_ratelimit(self, *, n): # n is the amount of variables being set - if (time.time() - self.first_var_set) / (self.var_stets_since_first+1) > self.ws_longterm_ratelimit: # if the average delay between cloud variable sets has been bigger than the long-term rate-limit, cloud variables can be set fast (wait time smaller than long-term rate limit) again + if (time.time() - self.first_var_set) / ( + self.var_stets_since_first + 1) > self.ws_longterm_ratelimit: # if the average delay between cloud variable sets has been bigger than the long-term rate-limit, cloud variables can be set fast (wait time smaller than long-term rate limit) again self.var_stets_since_first = 0 self.first_var_set = time.time() wait_time = self.ws_shortterm_ratelimit * n - if time.time() - self.first_var_set > 25: # if cloud variables have been continously set fast (wait time smaller than long-term rate limit) for 25 seconds, they should be set slow now (wait time = long-term rate limit) to avoid getting rate-limited + if time.time() - self.first_var_set > 25: # if cloud variables have been continously set fast (wait time smaller than long-term rate limit) for 25 seconds, they should be set slow now (wait time = long-term rate limit) to avoid getting rate-limited wait_time = self.ws_longterm_ratelimit * n while self.last_var_set + wait_time >= time.time(): time.sleep(0.001) - def set_var(self, variable, value): """ @@ -231,7 +235,7 @@ def set_vars(self, var_value_dict, *, intelligent_waits=True): self.connect() if intelligent_waits: self._enforce_ratelimit(n=len(list(var_value_dict.keys()))) - + self.var_stets_since_first += len(list(var_value_dict.keys())) packet_list = [] @@ -256,7 +260,7 @@ def get_var(self, var, *, recorder_initial_values={}): self.recorder = cloud_recorder.CloudRecorder(self, initial_values=recorder_initial_values) self.recorder.start() start_time = time.time() - while not (self.recorder.cloud_values != {} or start_time < time.time() -5): + while not (self.recorder.cloud_values != {} or start_time < time.time() - 5): time.sleep(0.01) return self.recorder.get_var(var) @@ -265,7 +269,7 @@ def get_all_vars(self, *, recorder_initial_values={}): self.recorder = cloud_recorder.CloudRecorder(self, initial_values=recorder_initial_values) self.recorder.start() start_time = time.time() - while not (self.recorder.cloud_values != {} or start_time < time.time() -5): + while not (self.recorder.cloud_values != {} or start_time < time.time() - 5): time.sleep(0.01) return self.recorder.get_all_vars() @@ -273,9 +277,11 @@ def events(self): from ..eventhandlers.cloud_events import CloudEvents return CloudEvents(self) - def requests(self, *, no_packet_loss=False, used_cloud_vars=["1", "2", "3", "4", "5", "6", "7", "8", "9"], respond_order="receive"): + def requests(self, *, no_packet_loss=False, used_cloud_vars=["1", "2", "3", "4", "5", "6", "7", "8", "9"], + respond_order="receive"): from ..eventhandlers.cloud_requests import CloudRequests - return CloudRequests(self, used_cloud_vars=used_cloud_vars, no_packet_loss=no_packet_loss, respond_order=respond_order) + return CloudRequests(self, used_cloud_vars=used_cloud_vars, no_packet_loss=no_packet_loss, + respond_order=respond_order) def storage(self, *, no_packet_loss=False, used_cloud_vars=["1", "2", "3", "4", "5", "6", "7", "8", "9"]): from ..eventhandlers.cloud_storage import CloudStorage diff --git a/scratchattach/cloud/cloud.py b/scratchattach/cloud/cloud.py index c0378c8b..a622b92b 100644 --- a/scratchattach/cloud/cloud.py +++ b/scratchattach/cloud/cloud.py @@ -1,13 +1,15 @@ """v2 ready: ScratchCloud, TwCloud and CustomCloud classes""" +from __future__ import annotations + from ._base import BaseCloud from typing import Type from ..utils.requests import Requests as requests from ..utils import exceptions, commons from ..site import cloud_activity -class ScratchCloud(BaseCloud): +class ScratchCloud(BaseCloud): def __init__(self, *, project_id, _session=None): super().__init__() @@ -91,9 +93,10 @@ def events(self, *, use_logs=False): else: return super().events() -class TwCloud(BaseCloud): - def __init__(self, *, project_id, cloud_host="wss://clouddata.turbowarp.org", purpose="", contact=""): +class TwCloud(BaseCloud): + def __init__(self, *, project_id, cloud_host="wss://clouddata.turbowarp.org", purpose="", contact="", + _session=None): super().__init__() self.project_id = project_id diff --git a/scratchattach/eventhandlers/_base.py b/scratchattach/eventhandlers/_base.py index b94b6886..5b906b44 100644 --- a/scratchattach/eventhandlers/_base.py +++ b/scratchattach/eventhandlers/_base.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from abc import ABC, abstractmethod from ..utils.requests import Requests as requests from threading import Thread diff --git a/scratchattach/eventhandlers/cloud_events.py b/scratchattach/eventhandlers/cloud_events.py index 87b7586d..6f2c27fd 100644 --- a/scratchattach/eventhandlers/cloud_events.py +++ b/scratchattach/eventhandlers/cloud_events.py @@ -1,4 +1,5 @@ """CloudEvents class""" +from __future__ import annotations from ..cloud import cloud from ._base import BaseEventHandler diff --git a/scratchattach/eventhandlers/cloud_recorder.py b/scratchattach/eventhandlers/cloud_recorder.py index 6a2474a7..14eb1dcc 100644 --- a/scratchattach/eventhandlers/cloud_recorder.py +++ b/scratchattach/eventhandlers/cloud_recorder.py @@ -1,21 +1,25 @@ """CloudRecorder class (used by ScratchCloud, TwCloud and other classes inheriting from BaseCloud to deliver cloud var values)""" +from __future__ import annotations from .cloud_events import CloudEvents + class CloudRecorder(CloudEvents): + def __init__(self, cloud, *, initial_values: dict = None): + if initial_values is None: + initial_values = {} - def __init__(self, cloud, *, initial_values={}): super().__init__(cloud) self.cloud_values = initial_values self.event(self.on_set) def get_var(self, var): - if not var in self.cloud_values: + if var not in self.cloud_values: return None return self.cloud_values[var] - + def get_all_vars(self): return self.cloud_values - + def on_set(self, activity): self.cloud_values[activity.var] = activity.value diff --git a/scratchattach/eventhandlers/cloud_requests.py b/scratchattach/eventhandlers/cloud_requests.py index d3fcfa9b..ff0888e7 100644 --- a/scratchattach/eventhandlers/cloud_requests.py +++ b/scratchattach/eventhandlers/cloud_requests.py @@ -1,4 +1,5 @@ """CloudRequests class (threading.Event version)""" +from __future__ import annotations from .cloud_events import CloudEvents from ..site import project @@ -167,8 +168,8 @@ def _parse_output(self, request_name, output, request_id): def _set_FROM_HOST_var(self, value): try: self.cloud.set_var(f"FROM_HOST_{self.used_cloud_vars[self.current_var]}", value) - except exceptions.ConnectionError: - self.call_even("on_disconnect") + except exceptions.CloudConnectionError: + self.call_event("on_disconnect") except Exception as e: print("scratchattach: internal error while responding (please submit a bug report on GitHub):", e) self.current_var += 1 diff --git a/scratchattach/eventhandlers/cloud_server.py b/scratchattach/eventhandlers/cloud_server.py index 1b545c40..cf0af26c 100644 --- a/scratchattach/eventhandlers/cloud_server.py +++ b/scratchattach/eventhandlers/cloud_server.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket from threading import Thread from ..utils import exceptions diff --git a/scratchattach/eventhandlers/cloud_storage.py b/scratchattach/eventhandlers/cloud_storage.py index 199bb179..3a52bedb 100644 --- a/scratchattach/eventhandlers/cloud_storage.py +++ b/scratchattach/eventhandlers/cloud_storage.py @@ -1,4 +1,5 @@ """CloudStorage class""" +from __future__ import annotations from .cloud_requests import CloudRequests import json diff --git a/scratchattach/eventhandlers/combine.py b/scratchattach/eventhandlers/combine.py index 2ac3fc3b..3abd86d9 100644 --- a/scratchattach/eventhandlers/combine.py +++ b/scratchattach/eventhandlers/combine.py @@ -1,3 +1,5 @@ +from __future__ import annotations + class MultiEventHandler: def __init__(self, *handlers): diff --git a/scratchattach/eventhandlers/filterbot.py b/scratchattach/eventhandlers/filterbot.py index 829b5382..73498dd6 100644 --- a/scratchattach/eventhandlers/filterbot.py +++ b/scratchattach/eventhandlers/filterbot.py @@ -1,4 +1,5 @@ """FilterBot class""" +from __future__ import annotations from .message_events import MessageEvents import time diff --git a/scratchattach/eventhandlers/message_events.py b/scratchattach/eventhandlers/message_events.py index ec548b54..574f2360 100644 --- a/scratchattach/eventhandlers/message_events.py +++ b/scratchattach/eventhandlers/message_events.py @@ -1,4 +1,5 @@ """MessageEvents class""" +from __future__ import annotations from ..site import user from ._base import BaseEventHandler diff --git a/scratchattach/other/other_apis.py b/scratchattach/other/other_apis.py index c9519b87..2bc9ed03 100644 --- a/scratchattach/other/other_apis.py +++ b/scratchattach/other/other_apis.py @@ -1,11 +1,13 @@ """Other Scratch API-related functions""" +from __future__ import annotations import json +from dataclasses import dataclass, field from ..utils import commons +from ..utils.enums import Languages, Language, TTSVoices, TTSVoice from ..utils.exceptions import BadRequest, InvalidLanguage, InvalidTTSGender from ..utils.requests import Requests as requests -from ..utils.enums import Languages, Language, TTSVoices, TTSVoice # --- Front page --- @@ -136,6 +138,68 @@ def get_resource_urls(): return requests.get("https://resources.scratch.mit.edu/localized-urls.json").json() +# --- ScratchTools endpoints --- +def scratchtools_online_status(username: str) -> bool | None: + """ + Get the online status of an account. + :return: Boolean whether the account is online; if they do not use scratchtools, return None. + """ + data = requests.get(f"https://data.scratchtools.app/isonline/{username}").json() + + if data["scratchtools"]: + return data["online"] + else: + return None + + +def scratchtools_beta_user(username: str) -> bool: + """ + Get whether a user is a scratchtools beta tester (I think that's what it means) + """ + return requests.get(f"https://data.scratchtools.app/isbeta/{username}").json()["beta"] + + +def scratchtools_display_name(username: str) -> str | None: + """ + Get the display name of a user for scratchtools. Returns none if there is no display name or the username is invalid + """ + return requests.get(f"https://data.scratchtools.app/name/{username}").json().get("displayName") + + +@dataclass(init=True, repr=True) +class ScratchToolsTutorial: + title: str + description: str = field(repr=False) + id: str + + @classmethod + def from_json(cls, data: dict[str, str]) -> ScratchToolsTutorial: + return cls(**data) + + @property + def yt_link(self): + return f"https://www.youtube.com/watch?v={self.id}" + + +def scratchtools_tutorials() -> list[ScratchToolsTutorial]: + """ + Returns a list of scratchtools tutorials (just yt videos) + """ + data_list = requests.get("https://data.scratchtools.app/tutorials/").json() + return [ScratchToolsTutorial.from_json(data) for data in data_list] + + +def scratchtools_emoji_status(username: str) -> str | None: + return requests.get(f"https://data.scratchtools.app/status/{username}").json().get("status", + '🍪') # Cookie is the default status, even if the user does not use ScratchTools + + +def scratchtools_pinned_comment(project_id: int) -> dict[str, str | int]: + data = requests.get(f"https://data.scratchtools.app/pinned/{project_id}/").json() + # Maybe use this info to instantiate a partial comment object? + return data + + # --- Misc --- # I'm not sure what to label this as def scratch_team_members() -> dict: @@ -147,6 +211,13 @@ def scratch_team_members() -> dict: return json.loads(text) +def send_password_reset_email(username: str = None, email: str = None): + requests.post("https://scratch.mit.edu/accounts/password_reset/", data={ + "username": username, + "email": email, + }, headers=commons.headers, cookies={"scratchcsrftoken": 'a'}) + + def translate(language: str | Languages, text: str = "hello"): if isinstance(language, str): lang = Languages.find_by_attrs(language.lower(), ["code", "tts_locale", "name"], str.lower) diff --git a/scratchattach/other/project_json_capabilities.py b/scratchattach/other/project_json_capabilities.py index 06d65c29..5c022463 100644 --- a/scratchattach/other/project_json_capabilities.py +++ b/scratchattach/other/project_json_capabilities.py @@ -1,34 +1,38 @@ """Project JSON reading and editing capabilities. This code is still in BETA, there are still bugs and potential consistency issues to be fixed. New features will be added.""" +from __future__ import annotations + +import hashlib +import json import random -import zipfile import string +import zipfile from abc import ABC, abstractmethod + from ..utils import exceptions -from ..utils.requests import Requests as requests from ..utils.commons import empty_project_json -import json -import hashlib +from ..utils.requests import Requests as requests -def load_components(json_data:list, ComponentClass, target_list): + +# noinspection PyPep8Naming +def load_components(json_data: list, ComponentClass: type, target_list: list): for element in json_data: component = ComponentClass() component.from_json(element) target_list.append(component) -class ProjectBody: +class ProjectBody: class BaseProjectBodyComponent(ABC): - def __init__(self, **entries): # Attributes every object needs to have: self.id = None # Update attributes from entries dict: self.__dict__.update(entries) - + @abstractmethod - def from_json(self, data:dict): + def from_json(self, data: dict): pass @abstractmethod @@ -44,25 +48,26 @@ def _generate_new_id(self): """ self.id = ''.join(random.choices(string.ascii_letters + string.digits, k=20)) - class Block(BaseProjectBodyComponent): - + # Thanks to @MonkeyBean2 for some scripts def from_json(self, data: dict): - self.opcode = data["opcode"] # The name of the block - self.next_id = data.get("next", None) # The id of the block attached below this block - self.parent_id = data.get("parent", None) # The id of the block that this block is attached to - self.input_data = data.get("inputs", None) # The blocks inside of the block (if the block is a loop or an if clause for example) - self.fields = data.get("fields", None) # The values inside the block's inputs - self.shadow = data.get("shadow", False) # Whether the block is displayed with a shadow - self.topLevel = data.get("topLevel", False) # Whether the block has no parent - self.mutation = data.get("mutation", None) # For custom blocks - self.x = data.get("x", None) # x position if topLevel - self.y = data.get("y", None) # y position if topLevel - + self.opcode = data["opcode"] # The name of the block + self.next_id = data.get("next", None) # The id of the block attached below this block + self.parent_id = data.get("parent", None) # The id of the block that this block is attached to + self.input_data = data.get("inputs", None) # The blocks inside of the block (if the block is a loop or an if clause for example) + self.fields = data.get("fields", None) # The values inside the block's inputs + self.shadow = data.get("shadow", False) # Whether the block is displayed with a shadow + self.topLevel = data.get("topLevel", False) # Whether the block has no parent + self.mutation = data.get("mutation", None) # For custom blocks + self.x = data.get("x", None) # x position if topLevel + self.y = data.get("y", None) # y position if topLevel + def to_json(self): - output = {"opcode":self.opcode,"next":self.next_id,"parent":self.parent_id,"inputs":self.input_data,"fields":self.fields,"shadow":self.shadow,"topLevel":self.topLevel,"mutation":self.mutation,"x":self.x,"y":self.y} + output = {"opcode": self.opcode, "next": self.next_id, "parent": self.parent_id, "inputs": self.input_data, + "fields": self.fields, "shadow": self.shadow, "topLevel": self.topLevel, + "mutation": self.mutation, "x": self.x, "y": self.y} return {k: v for k, v in output.items() if v} def attached_block(self): @@ -81,7 +86,7 @@ def previous_chain(self): block = self while block.parent_id is not None: block = block.previous_block() - chain.insert(0,block) + chain.insert(0, block) return chain def attached_chain(self): @@ -94,7 +99,7 @@ def attached_chain(self): def complete_chain(self): return self.previous_chain() + [self] + self.attached_chain() - + def duplicate_single_block(self): new_block = ProjectBody.Block(**self.__dict__) new_block.parent_id = None @@ -102,7 +107,7 @@ def duplicate_single_block(self): new_block._generate_new_id() self.sprite.blocks.append(new_block) return new_block - + def duplicate_chain(self): blocks_to_dupe = [self] + self.attached_chain() duped = [] @@ -112,8 +117,8 @@ def duplicate_chain(self): new_block.next_id = None new_block._generate_new_id() if i != 0: - new_block.parent_id = duped[i-1].id - duped[i-1].next_id = new_block.id + new_block.parent_id = duped[i - 1].id + duped[i - 1].next_id = new_block.id duped.append(new_block) self.sprite.blocks += duped return duped @@ -126,7 +131,7 @@ def _reattach(self, new_parent_id, new_next_id_of_old_parent): self.sprite.blocks.append(old_parent_block) self.parent_id = new_parent_id - + if self.parent_id is not None: new_parent_block = self.sprite.block_by_id(self.parent_id) self.sprite.blocks.remove(new_parent_block) @@ -159,7 +164,7 @@ def delete_chain(self): self.sprite.blocks.remove(self) self.reattach_chain(None) - + to_delete = self.attached_chain() for block in to_delete: self.sprite.blocks.remove(block) @@ -171,36 +176,36 @@ def inputs_as_blocks(self): for input in self.input_data: inputs.append(self.sprite.block_by_id(self.input_data[input][1])) - class Sprite(BaseProjectBodyComponent): - def from_json(self, data:dict): + def from_json(self, data: dict): self.isStage = data["isStage"] self.name = data["name"] - self.id = self.name # Sprites are uniquely identifiable through their name + self.id = self.name # Sprites are uniquely identifiable through their name self.variables = [] - for variable_id in data["variables"]: #self.lists is a dict with the list_id as key and info as value + for variable_id in data["variables"]: # self.lists is a dict with the list_id as key and info as value pvar = ProjectBody.Variable(id=variable_id) pvar.from_json(data["variables"][variable_id]) self.variables.append(pvar) self.lists = [] - for list_id in data["lists"]: #self.lists is a dict with the list_id as key and info as value + for list_id in data["lists"]: # self.lists is a dict with the list_id as key and info as value plist = ProjectBody.List(id=list_id) plist.from_json(data["lists"][list_id]) self.lists.append(plist) self.broadcasts = data["broadcasts"] self.blocks = [] - for block_id in data["blocks"]: #self.blocks is a dict with the block_id as key and block content as value - if isinstance(data["blocks"][block_id], dict): # Sometimes there is a weird list at the end of the blocks list. This list is ignored + for block_id in data["blocks"]: # self.blocks is a dict with the block_id as key and block content as value + if isinstance(data["blocks"][block_id], + dict): # Sometimes there is a weird list at the end of the blocks list. This list is ignored block = ProjectBody.Block(id=block_id, sprite=self) block.from_json(data["blocks"][block_id]) self.blocks.append(block) self.comments = data["comments"] self.currentCostume = data["currentCostume"] self.costumes = [] - load_components(data["costumes"], ProjectBody.Asset, self.costumes) # load lists + load_components(data["costumes"], ProjectBody.Asset, self.costumes) # load lists self.sounds = [] - load_components(data["sounds"], ProjectBody.Asset, self.sounds) # load lists + load_components(data["sounds"], ProjectBody.Asset, self.sounds) # load lists self.volume = data["volume"] self.layerOrder = data["layerOrder"] if self.isStage: @@ -236,61 +241,71 @@ def to_json(self): return return_data def variable_by_id(self, variable_id): - matching = list(filter(lambda x : x.id == variable_id, self.variables)) + matching = list(filter(lambda x: x.id == variable_id, self.variables)) if matching == []: return None return matching[0] def list_by_id(self, list_id): - matching = list(filter(lambda x : x.id == list_id, self.lists)) + matching = list(filter(lambda x: x.id == list_id, self.lists)) if matching == []: return None return matching[0] def variable_by_name(self, variable_name): - matching = list(filter(lambda x : x.name == variable_name, self.variables)) + matching = list(filter(lambda x: x.name == variable_name, self.variables)) if matching == []: return None return matching[0] def list_by_name(self, list_name): - matching = list(filter(lambda x : x.name == list_name, self.lists)) + matching = list(filter(lambda x: x.name == list_name, self.lists)) if matching == []: return None return matching[0] def block_by_id(self, block_id): - matching = list(filter(lambda x : x.id == block_id, self.blocks)) + matching = list(filter(lambda x: x.id == block_id, self.blocks)) if matching == []: return None return matching[0] - + # -- Functions to modify project contents -- def create_sound(self, asset_content, *, name="new sound", dataFormat="mp3", rate=4800, sampleCount=4800): data = asset_content if isinstance(asset_content, bytes) else open(asset_content, "rb").read() new_asset_id = hashlib.md5(data).hexdigest() - new_asset = ProjectBody.Asset(assetId=new_asset_id, name=name, id=new_asset_id, dataFormat=dataFormat, rate=rate, sampleCound=sampleCount, md5ext=new_asset_id+"."+dataFormat, filename=new_asset_id+"."+dataFormat) + new_asset = ProjectBody.Asset(assetId=new_asset_id, name=name, id=new_asset_id, dataFormat=dataFormat, + rate=rate, sampleCound=sampleCount, md5ext=new_asset_id + "." + dataFormat, + filename=new_asset_id + "." + dataFormat) self.sounds.append(new_asset) if not hasattr(self, "projectBody"): - print("Warning: Since there's no project body connected to this object, the new sound asset won't be uploaded to Scratch") + print( + "Warning: Since there's no project body connected to this object, the new sound asset won't be uploaded to Scratch") elif self.projectBody._session is None: - print("Warning: Since there's no login connected to this object, the new sound asset won't be uploaded to Scratch") + print( + "Warning: Since there's no login connected to this object, the new sound asset won't be uploaded to Scratch") else: self._session.upload_asset(data, asset_id=new_asset_id, file_ext=dataFormat) return new_asset - def create_costume(self, asset_content, *, name="new costume", dataFormat="svg", rotationCenterX=0, rotationCenterY=0): + def create_costume(self, asset_content, *, name="new costume", dataFormat="svg", rotationCenterX=0, + rotationCenterY=0): data = asset_content if isinstance(asset_content, bytes) else open(asset_content, "rb").read() new_asset_id = hashlib.md5(data).hexdigest() - new_asset = ProjectBody.Asset(assetId=new_asset_id, name=name, id=new_asset_id, dataFormat=dataFormat, rotationCenterX=rotationCenterX, rotationCenterY=rotationCenterY, md5ext=new_asset_id+"."+dataFormat, filename=new_asset_id+"."+dataFormat) + new_asset = ProjectBody.Asset(assetId=new_asset_id, name=name, id=new_asset_id, dataFormat=dataFormat, + rotationCenterX=rotationCenterX, rotationCenterY=rotationCenterY, + md5ext=new_asset_id + "." + dataFormat, + filename=new_asset_id + "." + dataFormat) self.costumes.append(new_asset) if not hasattr(self, "projectBody"): - print("Warning: Since there's no project body connected to this object, the new costume asset won't be uploaded to Scratch") + print( + "Warning: Since there's no project body connected to this object, the new costume asset won't be uploaded to Scratch") elif self.projectBody._session is None: - print("Warning: Since there's no login connected to this object, the new costume asset won't be uploaded to Scratch") + print( + "Warning: Since there's no login connected to this object, the new costume asset won't be uploaded to Scratch") else: self._session.upload_asset(data, asset_id=new_asset_id, file_ext=dataFormat) return new_asset @@ -304,7 +319,7 @@ def create_list(self, name, *, value=[]): new_list = ProjectBody.List(name=name, value=value) self.lists.append(new_list) return new_list - + def add_block(self, block, *, parent_id=None): block.parent_id = None block.next_id = None @@ -317,15 +332,15 @@ def add_block_chain(self, block_chain, *, parent_id=None): for block in block_chain: self.add_block(block, parent_id=parent) parent = str(block.id) - + class Variable(BaseProjectBodyComponent): - + def __init__(self, **entries): super().__init__(**entries) if self.id is None: self._generate_new_id() - def from_json(self, data:list): + def from_json(self, data: list): self.name = data[0] self.saved_value = data[1] self.is_cloud = len(data) == 3 @@ -335,7 +350,7 @@ def to_json(self): return [self.name, self.saved_value, True] else: return [self.name, self.saved_value] - + def make_cloud_variable(self): self.is_cloud = True @@ -346,16 +361,16 @@ def __init__(self, **entries): if self.id is None: self._generate_new_id() - def from_json(self, data:list): + def from_json(self, data: list): self.name = data[0] self.saved_content = data[1] - + def to_json(self): return [self.name, self.saved_content] - + class Monitor(BaseProjectBodyComponent): - def from_json(self, data:dict): + def from_json(self, data: dict): self.__dict__.update(data) def to_json(self): @@ -375,12 +390,12 @@ def target(self): class Asset(BaseProjectBodyComponent): - def from_json(self, data:dict): + def from_json(self, data: dict): self.__dict__.update(data) self.id = self.assetId self.filename = self.md5ext self.download_url = f"https://assets.scratch.mit.edu/internalapi/asset/{self.filename}" - + def to_json(self): return_data = dict(self.__dict__) return_data.pop("filename") @@ -390,7 +405,7 @@ def to_json(self): def download(self, *, filename=None, dir=""): if not (dir.endswith("/") or dir.endswith("\\")): - dir = dir+"/" + dir = dir + "/" try: if filename is None: filename = str(self.filename) @@ -406,7 +421,7 @@ def download(self, *, filename=None, dir=""): ) ) - def __init__(self, *, sprites=[], monitors=[], extensions=[], meta=[{"agent":None}], _session=None): + def __init__(self, *, sprites=[], monitors=[], extensions=[], meta=[{"agent": None}], _session=None): # sprites are called "targets" in the initial API response self.sprites = sprites self.monitors = monitors @@ -414,7 +429,7 @@ def __init__(self, *, sprites=[], monitors=[], extensions=[], meta=[{"agent":Non self.meta = meta self._session = _session - def from_json(self, data:dict): + def from_json(self, data: dict): """ Imports the project data from a dict that contains the raw project json """ @@ -423,8 +438,8 @@ def from_json(self, data:dict): load_components(data["targets"], ProjectBody.Sprite, self.sprites) # Save origin of sprite in Sprite object: for sprite in self.sprites: - sprite.projectBody = self - # Load monitors: + sprite.projectBody = self + # Load monitors: self.monitors = [] load_components(data["monitors"], ProjectBody.Monitor, self.monitors) # Save origin of monitor in Monitor object: @@ -449,16 +464,17 @@ def to_json(self): def blocks(self): return [block for sprite in self.sprites for block in sprite.blocks] - + def block_count(self): return len(self.blocks()) - + def assets(self): - return [sound for sprite in self.sprites for sound in sprite.sounds] + [costume for sprite in self.sprites for costume in sprite.costumes] + return [sound for sprite in self.sprites for sound in sprite.sounds] + [costume for sprite in self.sprites for + costume in sprite.costumes] def asset_count(self): return len(self.assets()) - + def variable_by_id(self, variable_id): for sprite in self.sprites: r = sprite.variable_by_id(variable_id) @@ -470,16 +486,16 @@ def list_by_id(self, list_id): r = sprite.list_by_id(list_id) if r is not None: return r - + def sprite_by_name(self, sprite_name): - matching = list(filter(lambda x : x.name == sprite_name, self.sprites)) + matching = list(filter(lambda x: x.name == sprite_name, self.sprites)) if matching == []: return None return matching[0] - + def user_agent(self): return self.meta["agent"] - + def save(self, *, filename=None, dir=""): """ Saves the project body to the given directory. @@ -496,16 +512,19 @@ def save(self, *, filename=None, dir=""): with open(f"{dir}{filename}.sb3", "w") as d: json.dump(self.to_json(), d, indent=4) + def get_empty_project_pb(): pb = ProjectBody() pb.from_json(empty_project_json) return pb -def get_pb_from_dict(project_json:dict): + +def get_pb_from_dict(project_json: dict): pb = ProjectBody() pb.from_json(project_json) return pb + def _load_sb3_file(path_to_file): try: with open(path_to_file, "r") as r: @@ -520,19 +539,21 @@ def _load_sb3_file(path_to_file): else: raise ValueError("specified sb3 archive doesn't contain project.json") + def read_sb3_file(path_to_file): pb = ProjectBody() pb.from_json(_load_sb3_file(path_to_file)) return pb + def download_asset(asset_id_with_file_ext, *, filename=None, dir=""): if not (dir.endswith("/") or dir.endswith("\\")): - dir = dir+"/" + dir = dir + "/" try: if filename is None: filename = str(asset_id_with_file_ext) response = requests.get( - "https://assets.scratch.mit.edu/"+str(asset_id_with_file_ext), + "https://assets.scratch.mit.edu/" + str(asset_id_with_file_ext), timeout=10, ) open(f"{dir}{filename}", "wb").write(response.content) @@ -543,4 +564,4 @@ def download_asset(asset_id_with_file_ext, *, filename=None, dir=""): ) ) -# The method for uploading an asset by id requires authentication and can be found in the site.session.Session class \ No newline at end of file +# The method for uploading an asset by id requires authentication and can be found in the site.session.Session class diff --git a/scratchattach/site/_base.py b/scratchattach/site/_base.py index 29514653..86c542e4 100644 --- a/scratchattach/site/_base.py +++ b/scratchattach/site/_base.py @@ -1,9 +1,19 @@ +from __future__ import annotations + from abc import ABC, abstractmethod + import requests -from threading import Thread from ..utils import exceptions, commons +from types import FunctionType + class BaseSiteComponent(ABC): + @abstractmethod + def __init__(self): + self._session = None + self._cookies = None + self._headers = None + self.update_API = None def update(self): """ @@ -11,18 +21,22 @@ def update(self): """ response = self.update_function( self.update_API, - headers = self._headers, - cookies = self._cookies, timeout=10 + headers=self._headers, + cookies=self._cookies, timeout=10 ) # Check for 429 error: + # Note, this is a bit naïve if "429" in str(response): return "429" + if response.text == '{\n "response": "Too many requests"\n}': return "429" + # If no error: Parse JSON: response = response.json() if "code" in response: return False + return self._update_from_dict(response) @abstractmethod @@ -37,10 +51,14 @@ def _assert_auth(self): raise exceptions.Unauthenticated( "You need to use session.connect_xyz (NOT get_xyz) in order to perform this operation.") - def _make_linked_object(self, identificator_id, identificator, Class, NotFoundException): + def _make_linked_object(self, identificator_id, identificator, Class, NotFoundException) -> BaseSiteComponent: """ Internal function for making a linked object (authentication kept) based on an identificator (like a project id or username) Class must inherit from BaseSiteComponent """ return commons._get_object(identificator_id, identificator, Class, NotFoundException, self._session) + update_function: FunctionType = requests.get + """ + Internal function run on update. Function is a method of the 'requests' module/class + """ diff --git a/scratchattach/site/activity.py b/scratchattach/site/activity.py index 6fcc26d4..6d90e7c3 100644 --- a/scratchattach/site/activity.py +++ b/scratchattach/site/activity.py @@ -1,26 +1,20 @@ """Activity and CloudActivity class""" +from __future__ import annotations -import json -import re -import time +from bs4 import PageElement -from . import user -from . import session -from . import project -from . import studio -from . import forum, comment -from ..utils import exceptions +from . import user, project, studio from ._base import BaseSiteComponent -from ..utils.commons import headers -from bs4 import BeautifulSoup +from ..utils import exceptions -from ..utils.requests import Requests as requests class Activity(BaseSiteComponent): - - ''' + """ Represents a Scratch activity (message or other user page activity) - ''' + """ + + def __repr__(self): + return repr(self.raw) def str(self): return str(self.raw) @@ -31,44 +25,339 @@ def __init__(self, **entries): self._session = None self.raw = None + # Possible attributes + self.project_id = None + self.gallery_id = None + + self.username = None + self.followed_username = None + self.recipient_username = None + + self.comment_type = None + self.comment_obj_id = None + self.comment_obj_title = None + self.comment_id = None + + self.datetime_created = None + self.time = None + self.type = None + # Update attributes from entries dict: self.__dict__.update(entries) - def update(): + def update(self): print("Warning: Activity objects can't be updated") - return False # Objects of this type cannot be updated + return False # Objects of this type cannot be updated def _update_from_dict(self, data): self.raw = data self.__dict__.update(data) return True - def _update_from_html(self, data): + def _update_from_json(self, data: dict): + """ + Update using JSON, used in the classroom API. + """ + activity_type = data["type"] + + _time = data["datetime_created"] if "datetime_created" in data else None + + if "actor" in data: + username = data["actor"]["username"] + elif "actor_username" in data: + username = data["actor_username"] + else: + username = None + + if data.get("recipient") is not None: + recipient_username = data["recipient"]["username"] + + elif data.get("recipient_username") is not None: + recipient_username = data["recipient_username"] + + elif data.get("project_creator") is not None: + recipient_username = data["project_creator"]["username"] + else: + recipient_username = None + + default_case = False + """Whether this is 'blank'; it will default to 'user performed an action'""" + if activity_type == 0: + # follow + followed_username = data["followed_username"] + + self.raw = f"{username} followed user {followed_username}" + + self.datetime_created = _time + self.type = "followuser" + self.username = username + self.followed_username = followed_username + + elif activity_type == 1: + # follow studio + studio_id = data["gallery"] + + raw = f"{username} followed studio https://scratch.mit.edu/studios/{studio_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "followstudio" + + self.username = username + self.gallery_id = studio_id + + elif activity_type == 2: + # love project + project_id = data["project"] + + raw = f"{username} loved project https://scratch.mit.edu/projects/{project_id}" + + self.raw = raw + self.datetime_created = _time, + self.type = "loveproject" + + self.username = username + self.project_id = project_id + self.recipient_username = recipient_username + + elif activity_type == 3: + # Favorite project + project_id = data["project"] + + raw = f"{username} favorited project https://scratch.mit.edu/projects/{project_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "favoriteproject" + + self.username = username + self.project_id = project_id + self.recipient_username = recipient_username + + elif activity_type == 7: + # Add project to studio + + project_id = data["project"] + studio_id = data["gallery"] + + raw = f"{username} added the project https://scratch.mit.edu/projects/{project_id} to studio https://scratch.mit.edu/studios/{studio_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "addprojecttostudio" + + self.username = username + self.project_id = project_id + self.recipient_username = recipient_username + + elif activity_type == 8: + default_case = True + + elif activity_type == 9: + default_case = True + + elif activity_type == 10: + # Share/Reshare project + project_id = data["project"] + is_reshare = data["is_reshare"] + + raw_reshare = "reshared" if is_reshare else "shared" + + raw = f"{username} {raw_reshare} the project https://scratch.mit.edu/projects/{project_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "shareproject" + + self.username = username + self.project_id = project_id + self.recipient_username = recipient_username + + elif activity_type == 11: + # Remix + parent_id = data["parent"] + + raw = f"{username} remixed the project https://scratch.mit.edu/projects/{parent_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "remixproject" + + self.username = username + self.project_id = parent_id + self.recipient_username = recipient_username + + elif activity_type == 12: + default_case = True + + elif activity_type == 13: + # Create ('add') studio + studio_id = data["gallery"] + + raw = f"{username} created the studio https://scratch.mit.edu/studios/{studio_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "createstudio" + + self.username = username + self.gallery_id = studio_id + + elif activity_type == 15: + # Update studio + studio_id = data["gallery"] + + raw = f"{username} updated the studio https://scratch.mit.edu/studios/{studio_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "updatestudio" + + self.username = username + self.gallery_id = studio_id + + elif activity_type == 16: + default_case = True + + elif activity_type == 17: + default_case = True + + elif activity_type == 18: + default_case = True + + elif activity_type == 19: + # Remove project from studio + + project_id = data["project"] + studio_id = data["gallery"] + + raw = f"{username} removed the project https://scratch.mit.edu/projects/{project_id} from studio https://scratch.mit.edu/studios/{studio_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "removeprojectfromstudio" + + self.username = username + self.project_id = project_id + + elif activity_type == 20: + default_case = True + + elif activity_type == 21: + default_case = True + + elif activity_type == 22: + # Was promoted to manager for studio + studio_id = data["gallery"] + + raw = f"{recipient_username} was promoted to manager by {username} for studio https://scratch.mit.edu/studios/{studio_id}" + + self.raw = raw + self.datetime_created = _time + self.type = "promotetomanager" + + self.username = username + self.recipient_username = recipient_username + self.gallery_id = studio_id + + elif activity_type == 23: + default_case = True + + elif activity_type == 24: + default_case = True + + elif activity_type == 25: + # Update profile + raw = f"{username} made a profile update" + + self.raw = raw + self.datetime_created = _time + self.type = "updateprofile" + + self.username = username + + elif activity_type == 26: + default_case = True + + elif activity_type == 27: + # Comment (quite complicated) + comment_type: int = data["comment_type"] + fragment = data["comment_fragment"] + comment_id = data["comment_id"] + comment_obj_id = data["comment_obj_id"] + comment_obj_title = data["comment_obj_title"] + + if comment_type == 0: + # Project comment + raw = f"{username} commented on project https://scratch.mit.edu/projects/{comment_obj_id}/#comments-{comment_id} {fragment!r}" + + elif comment_type == 1: + # Profile comment + raw = f"{username} commented on user https://scratch.mit.edu/users/{comment_obj_title}/#comments-{comment_id} {fragment!r}" + + elif comment_type == 2: + # Studio comment + # Scratch actually provides an incorrect link, but it is fixed here: + raw = f"{username} commented on studio https://scratch.mit.edu/studios/{comment_obj_id}/comments/#comments-{comment_id} {fragment!r}" + + else: + raw = f"{username} commented {fragment!r}" # This should never happen + + self.raw = raw + self.datetime_created = _time + self.type = "addcomment" + + self.username = username + + self.comment_type = comment_type + self.comment_obj_id = comment_obj_id + self.comment_obj_title = comment_obj_title + self.comment_id = comment_id + + else: + default_case = True + + if default_case: + # This is coded in the scratch HTML, haven't found an example of it though + raw = f"{username} performed an action" + + self.raw = raw + self.datetime_created = _time + self.type = "performaction" + + self.username = username + + def _update_from_html(self, data: PageElement): self.raw = data - time=data.find('div').find('span').findNext().findNext().text.strip() + _time = data.find('div').find('span').findNext().findNext().text.strip() - if '\xa0' in time: - while '\xa0' in time: time=time.replace('\xa0', ' ') + if '\xa0' in _time: + while '\xa0' in _time: + _time = _time.replace('\xa0', ' ') - self.time = time - self.actor_username=(data.find('div').find('span').text) + self.datetime_created = _time + self.actor_username = data.find('div').find('span').text - self.target_name=(data.find('div').find('span').findNext().text) - self.target_link=(data.find('div').find('span').findNext()["href"]) - self.target_id=(data.find('div').find('span').findNext()["href"].split("/")[-2]) + self.target_name = data.find('div').find('span').findNext().text + self.target_link = data.find('div').find('span').findNext()["href"] + self.target_id = data.find('div').find('span').findNext()["href"].split("/")[-2] - self.type=data.find('div').find_all('span')[0].next_sibling.strip() + self.type = data.find('div').find_all('span')[0].next_sibling.strip() if self.type == "loved": self.type = "loveproject" - if self.type == "favorited": + + elif self.type == "favorited": self.type = "favoriteproject" - if "curator" in self.type: + + elif "curator" in self.type: self.type = "becomecurator" - if "shared" in self.type: + + elif "shared" in self.type: self.type = "shareproject" - if "is now following" in self.type: + + elif "is now following" in self.type: if "users" in self.target_link: self.type = "followuser" else: @@ -87,14 +376,14 @@ def target(self): Returns the activity's target (depending on the activity, this is either a User, Project, Studio or Comment object). May also return None if the activity type is unknown. """ - - if "project" in self.type: # target is a project + + if "project" in self.type: # target is a project if "target_id" in self.__dict__: return self._make_linked_object("id", self.target_id, project.Project, exceptions.ProjectNotFound) if "project_id" in self.__dict__: return self._make_linked_object("id", self.project_id, project.Project, exceptions.ProjectNotFound) - - if self.type == "becomecurator" or self.type == "followstudio": # target is a studio + + if self.type == "becomecurator" or self.type == "followstudio": # target is a studio if "target_id" in self.__dict__: return self._make_linked_object("id", self.target_id, studio.Studio, exceptions.StudioNotFound) if "gallery_id" in self.__dict__: @@ -102,21 +391,24 @@ def target(self): # NOTE: the "becomecurator" type is ambigous - if it is inside the studio activity tab, the target is the user who joined if "username" in self.__dict__: return self._make_linked_object("username", self.username, user.User, exceptions.UserNotFound) - - if self.type == "followuser" or "curator" in self.type: # target is a user + + if self.type == "followuser" or "curator" in self.type: # target is a user if "target_name" in self.__dict__: return self._make_linked_object("username", self.target_name, user.User, exceptions.UserNotFound) if "followed_username" in self.__dict__: return self._make_linked_object("username", self.followed_username, user.User, exceptions.UserNotFound) - if "recipient_username" in self.__dict__: # the recipient_username field always indicates the target is a user + if "recipient_username" in self.__dict__: # the recipient_username field always indicates the target is a user return self._make_linked_object("username", self.recipient_username, user.User, exceptions.UserNotFound) - - if self.type == "addcomment": # target is a comment + + if self.type == "addcomment": # target is a comment if self.comment_type == 0: - _c = project.Project(id=self.comment_obj_id, author_name=self._session.username, _session=self._session).comment_by_id(self.comment_id) + _c = project.Project(id=self.comment_obj_id, author_name=self._session.username, + _session=self._session).comment_by_id(self.comment_id) if self.comment_type == 1: _c = user.User(username=self.comment_obj_title, _session=self._session).comment_by_id(self.comment_id) if self.comment_type == 2: - _c = user.User(id=self.comment_obj_id, _session=self._session).comment_by_id(self.comment_id) + _c = user.User(id=self.comment_obj_id, _session=self._session).comment_by_id(self.comment_id) + else: + raise ValueError(f"{self.comment_type} is an invalid comment type") + return _c - \ No newline at end of file diff --git a/scratchattach/site/backpack_asset.py b/scratchattach/site/backpack_asset.py index fae2768b..f0b42692 100644 --- a/scratchattach/site/backpack_asset.py +++ b/scratchattach/site/backpack_asset.py @@ -1,8 +1,13 @@ +from __future__ import annotations + import time +import logging + from ._base import BaseSiteComponent from ..utils.requests import Requests as requests from ..utils import exceptions + class BackpackAsset(BaseSiteComponent): """ Represents an asset from the backpack. @@ -32,8 +37,8 @@ def __init__(self, **entries): self.__dict__.update(entries) def update(self): - print("Warning: BackpackAsset objects can't be updated") - return False # Objects of this type cannot be updated + logging.warning("Warning: BackpackAsset objects can't be updated") + return False # Objects of this type cannot be updated def _update_from_dict(self, data) -> bool: try: self.id = data["id"] @@ -52,21 +57,21 @@ def _update_from_dict(self, data) -> bool: except Exception: pass return True - def download(self, *, dir=""): + def download(self, *, fp: str = ''): """ Downloads the asset content to the given directory. The given filename is equal to the value saved in the .filename attribute. Args: - dir (str): The path of the directory the file will be saved in. + fp (str): The path of the directory the file will be saved in. """ - if not (dir.endswith("/") or dir.endswith("\\")): - dir = dir+"/" + if not (fp.endswith("/") or fp.endswith("\\")): + fp = fp + "/" try: response = requests.get( self.download_url, timeout=10, ) - open(f"{dir}{self.filename}", "wb").write(response.content) + open(f"{fp}{self.filename}", "wb").write(response.content) except Exception as e: raise ( exceptions.FetchError( @@ -79,6 +84,6 @@ def delete(self): return requests.delete( f"https://backpack.scratch.mit.edu/{self._session.username}/{self.id}", - headers = self._session._headers, - timeout = 10, + headers=self._session._headers, + timeout=10, ).json() diff --git a/scratchattach/site/classroom.py b/scratchattach/site/classroom.py index 0744d30a..6e7fb2b6 100644 --- a/scratchattach/site/classroom.py +++ b/scratchattach/site/classroom.py @@ -1,23 +1,37 @@ +from __future__ import annotations + import datetime -import requests -from . import user, session -from ..utils.commons import api_iterative, headers -from ..utils import exceptions, commons +import warnings +from typing import TYPE_CHECKING, Any + +import bs4 + +if TYPE_CHECKING: + from ..site.session import Session + +from ..utils.commons import requests +from . import user, activity from ._base import BaseSiteComponent +from ..utils import exceptions, commons +from ..utils.commons import headers + +from bs4 import BeautifulSoup + class Classroom(BaseSiteComponent): def __init__(self, **entries): # Info on how the .update method has to fetch the data: + # NOTE: THIS DOESN'T WORK WITH CLOSED CLASSES! self.update_function = requests.get if "id" in entries: self.update_API = f"https://api.scratch.mit.edu/classrooms/{entries['id']}" elif "classtoken" in entries: self.update_API = f"https://api.scratch.mit.edu/classtoken/{entries['classtoken']}" else: - raise KeyError + raise KeyError(f"No class id or token provided! Entries: {entries}") # Set attributes every Project object needs to have: - self._session = None + self._session: Session = None self.id = None self.classtoken = None @@ -35,33 +49,86 @@ def __init__(self, **entries): self._json_headers = dict(self._headers) self._json_headers["accept"] = "application/json" self._json_headers["Content-Type"] = "application/json" + self.is_closed = False + + def __repr__(self) -> str: + return f"classroom called {self.title!r}" + + def update(self): + try: + success = super().update() + except exceptions.ClassroomNotFound: + success = False + + if not success: + response = requests.get(f"https://scratch.mit.edu/classes/{self.id}/") + soup = BeautifulSoup(response.text, "html.parser") + + # id, title, description, status, date_start (iso format), educator/username + title = soup.find("title").contents[0][:-len(" on Scratch")] + + overviews = soup.find_all("p", {"class": "overview"}) + description, status = overviews[0].text, overviews[1].text + + educator_username = None + pfx = "Scratch.INIT_DATA.PROFILE = {\n model: {\n id: '" + sfx = "',\n userId: " + for script in soup.find_all("script"): + if pfx in script.text: + educator_username = commons.webscrape_count(script.text, pfx, sfx, str) + + ret = {"id": self.id, + "title": title, + "description": description, + "status": status, + "educator": {"username": educator_username}, + "is_closed": True + } + + return self._update_from_dict(ret) + return success def _update_from_dict(self, classrooms): - try: self.id = int(classrooms["id"]) - except Exception: pass - try: self.title = classrooms["title"] - except Exception: pass - try: self.about_class = classrooms["description"] - except Exception: pass - try: self.working_on = classrooms["status"] - except Exception: pass - try: self.datetime = datetime.datetime.fromisoformat(classrooms["date_start"]) - except Exception: pass - try: self.author = user.User(username=classrooms["educator"]["username"],_session=self._session) - except Exception: pass - try: self.author._update_from_dict(classrooms["educator"]) - except Exception: pass + try: + self.id = int(classrooms["id"]) + except Exception: + pass + try: + self.title = classrooms["title"] + except Exception: + pass + try: + self.about_class = classrooms["description"] + except Exception: + pass + try: + self.working_on = classrooms["status"] + except Exception: + pass + try: + self.datetime = datetime.datetime.fromisoformat(classrooms["date_start"]) + except Exception: + pass + try: + self.author = user.User(username=classrooms["educator"]["username"], _session=self._session) + except Exception: + pass + try: + self.author._update_from_dict(classrooms["educator"]) + except Exception: + pass + self.is_closed = classrooms.get("is_closed", False) return True - - def student_count(self): + + def student_count(self) -> int: # student count text = requests.get( f"https://scratch.mit.edu/classes/{self.id}/", - headers = self._headers + headers=self._headers ).text return commons.webscrape_count(text, "Students (", ")") - - def student_names(self, *, page=1): + + def student_names(self, *, page=1) -> list[str]: """ Returns the student on the class. @@ -71,22 +138,37 @@ def student_names(self, *, page=1): Returns: list: The usernames of the class students """ + if self.is_closed: + ret = [] + response = requests.get(f"https://scratch.mit.edu/classes/{self.id}/") + soup = BeautifulSoup(response.text, "html.parser") + + for scrollable in soup.find_all("ul", {"class": "scroll-content"}): + for item in scrollable.contents: + if not isinstance(item, bs4.NavigableString): + if "user" in item.attrs["class"]: + anchors = item.find_all("a") + if len(anchors) == 2: + ret.append(anchors[1].text.strip()) + + return ret + text = requests.get( f"https://scratch.mit.edu/classes/{self.id}/students/?page={page}", - headers = self._headers + headers=self._headers ).text textlist = [i.split('/">')[0] for i in text.split(' list[int]: """ Returns the class studio on the class. @@ -94,18 +176,194 @@ def class_studio_ids(self, *, page=1): page: The page of the students that should be returned. Returns: - list: The id of the class studios + list: The id of the class studios """ + if self.is_closed: + ret = [] + response = requests.get(f"https://scratch.mit.edu/classes/{self.id}/") + soup = BeautifulSoup(response.text, "html.parser") + + for scrollable in soup.find_all("ul", {"class": "scroll-content"}): + for item in scrollable.contents: + if not isinstance(item, bs4.NavigableString): + if "gallery" in item.attrs["class"]: + anchor = item.find("a") + if "href" in anchor.attrs: + ret.append(commons.webscrape_count(anchor.attrs["href"], "/studios/", "/")) + return ret + text = requests.get( f"https://scratch.mit.edu/classes/{self.id}/studios/?page={page}", - headers = self._headers + headers=self._headers ).text textlist = [int(i.split('/">')[0]) for i in text.split('\n None: + self._check_session() + requests.post(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, cookies=self._cookies, + files={"file": thumbnail}) + + def set_description(self, desc: str) -> None: + self._check_session() + response = requests.put(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, cookies=self._cookies, + json={"description": desc}) + + try: + data = response.json() + if data["description"] == desc: + # Success! + return + else: + warnings.warn(f"{self._session} may not be authenticated to edit {self}") + + except Exception as e: + warnings.warn(f"{self._session} may not be authenticated to edit {self}") + raise e + + def set_working_on(self, status: str) -> None: + self._check_session() + response = requests.put(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, cookies=self._cookies, + json={"status": status}) + + try: + data = response.json() + if data["status"] == status: + # Success! + return + else: + warnings.warn(f"{self._session} may not be authenticated to edit {self}") + + except Exception as e: + warnings.warn(f"{self._session} may not be authenticated to edit {self}") + raise e + + def set_title(self, title: str) -> None: + self._check_session() + response = requests.put(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, cookies=self._cookies, + json={"title": title}) + + try: + data = response.json() + if data["title"] == title: + # Success! + return + else: + warnings.warn(f"{self._session} may not be authenticated to edit {self}") + except Exception as e: + warnings.warn(f"{self._session} may not be authenticated to edit {self}") + raise e -def get_classroom(class_id) -> Classroom: + def add_studio(self, name: str, description: str = '') -> None: + self._check_session() + requests.post("https://scratch.mit.edu/classes/create_classroom_gallery/", + json={ + "classroom_id": str(self.id), + "classroom_token": self.classtoken, + "title": name, + "description": description}, + headers=self._headers, cookies=self._cookies) + + def reopen(self) -> None: + self._check_session() + response = requests.put(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, cookies=self._cookies, + json={"visibility": "visible"}) + + try: + response.json() + + except Exception as e: + warnings.warn(f"{self._session} may not be authenticated to edit {self}") + raise e + + def close(self) -> None: + self._check_session() + response = requests.post(f"https://scratch.mit.edu/site-api/classrooms/close_classroom/{self.id}/", + headers=self._headers, cookies=self._cookies) + + try: + response.json() + + except Exception as e: + warnings.warn(f"{self._session} may not be authenticated to edit {self}") + raise e + + def register_student(self, username: str, password: str = '', birth_month: int = None, birth_year: int = None, + gender: str = None, country: str = None, is_robot: bool = False) -> None: + return register_by_token(self.id, self.classtoken, username, password, birth_month, birth_year, gender, country, + is_robot) + + def generate_signup_link(self): + if self.classtoken is not None: + return f"https://scratch.mit.edu/signup/{self.classtoken}" + + self._check_session() + + response = requests.get(f"https://scratch.mit.edu/site-api/classrooms/generate_registration_link/{self.id}/", + headers=self._headers, cookies=self._cookies) + # Should really check for '404' page + data = response.json() + if "reg_link" in data: + return data["reg_link"] + else: + raise exceptions.Unauthorized(f"{self._session} is not authorised to generate a signup link of {self}") + + def public_activity(self, *, limit=20): + """ + Returns: + list: The user's activity data as parsed list of scratchattach.activity.Activity objects + """ + if limit > 20: + warnings.warn("The limit is set to more than 20. There may be an error") + soup = BeautifulSoup( + requests.get(f"https://scratch.mit.edu/site-api/classrooms/activity/public/{self.id}/?limit={limit}").text, + 'html.parser') + + activities = [] + source = soup.find_all("li") + + for data in source: + _activity = activity.Activity(_session=self._session, raw=data) + _activity._update_from_html(data) + activities.append(_activity) + + return activities + + def activity(self, student: str = "all", mode: str = "Last created", page: int = None) -> list[dict[str, Any]]: + """ + Get a list of private activity, only available to the class owner. + Returns: + list The private activity of users in the class + """ + + self._check_session() + + ascsort, descsort = commons.get_class_sort_mode(mode) + + data = requests.get(f"https://scratch.mit.edu/site-api/classrooms/activity/{self.id}/{student}/", + params={"page": page, "ascsort": ascsort, "descsort": descsort}, + headers=self._headers, cookies=self._cookies).json() + + _activity = [] + for activity_json in data: + _activity.append(activity.Activity(_session=self._session)) + _activity[-1]._update_from_json(activity_json) + + return _activity + + +def get_classroom(class_id: str) -> Classroom: """ Gets a class without logging in. @@ -120,9 +378,10 @@ def get_classroom(class_id) -> Classroom: If you want to use these, get the user with :meth:`scratchattach.session.Session.connect_classroom` instead. """ - print("Warning: For methods that require authentication, use session.connect_classroom instead of get_classroom") + warnings.warn("For methods that require authentication, use session.connect_classroom instead of get_classroom") return commons._get_object("id", class_id, Classroom, exceptions.ClassroomNotFound) + def get_classroom_from_token(class_token) -> Classroom: """ Gets a class without logging in. @@ -138,5 +397,28 @@ def get_classroom_from_token(class_token) -> Classroom: If you want to use these, get the user with :meth:`scratchattach.session.Session.connect_classroom` instead. """ - print("Warning: For methods that require authentication, use session.connect_classroom instead of get_classroom") - return commons._get_object("classtoken", class_token, Classroom, exceptions.ClassroomNotFound) \ No newline at end of file + warnings.warn("For methods that require authentication, use session.connect_classroom instead of get_classroom") + return commons._get_object("classtoken", class_token, Classroom, exceptions.ClassroomNotFound) + + +def register_by_token(class_id: int, class_token: str, username: str, password: str, birth_month: int, birth_year: int, + gender: str, country: str, is_robot: bool = False) -> None: + data = {"classroom_id": class_id, + "classroom_token": class_token, + + "username": username, + "password": password, + "birth_month": birth_month, + "birth_year": birth_year, + "gender": gender, + "country": country, + "is_robot": is_robot} + + response = requests.post("https://scratch.mit.edu/classes/register_new_student/", + data=data, headers=headers, cookies={"scratchcsrftoken": 'a'}) + ret = response.json()[0] + + if "username" in ret: + return + else: + raise exceptions.Unauthorized(f"Can't create account: {response.text}") diff --git a/scratchattach/site/cloud_activity.py b/scratchattach/site/cloud_activity.py index 296a14c2..63b64e0f 100644 --- a/scratchattach/site/cloud_activity.py +++ b/scratchattach/site/cloud_activity.py @@ -1,6 +1,10 @@ +from __future__ import annotations + import time from ._base import BaseSiteComponent + + class CloudActivity(BaseSiteComponent): """ Represents a cloud activity (a cloud variable set / creation / deletion). diff --git a/scratchattach/site/comment.py b/scratchattach/site/comment.py index 6fa456a9..3c66ece8 100644 --- a/scratchattach/site/comment.py +++ b/scratchattach/site/comment.py @@ -1,27 +1,15 @@ """Comment class""" +from __future__ import annotations -import json -import re - -from ..utils import commons - -from . import user -from . import session -from . import project -from . import studio -from . import forum -from ..utils import exceptions +from . import user, project, studio from ._base import BaseSiteComponent -from ..utils.commons import headers -from bs4 import BeautifulSoup +from ..utils import exceptions -from ..utils.requests import Requests as requests class Comment(BaseSiteComponent): - - ''' + """ Represents a Scratch comment (on a profile, studio or project) - ''' + """ def str(self): return str(self.content) @@ -31,52 +19,75 @@ def __init__(self, **entries): # Set attributes every Comment object needs to have: self.id = None self._session = None - self.source=None + self.source = None self.source_id = None self.cached_replies = None self.parent_id = None self.cached_parent_comment = None - if not "source" in entries: - "source" == "Unknown" # Update attributes from entries dict: self.__dict__.update(entries) + if "source" not in entries: + self.source = "Unknown" + def update(self): print("Warning: Comment objects can't be updated") - return False # Objects of this type cannot be updated + return False # Objects of this type cannot be updated def _update_from_dict(self, data): - try: self.id = data["id"] - except Exception: pass - try: self.parent_id = data["parent_id"] - except Exception: pass - try: self.commentee_id = data["commentee_id"] - except Exception: pass - try: self.content = data["content"] - except Exception: pass - try: self.datetime_created = data["datetime_created"] - except Exception: pass - try: self.author_name = data["author"]["username"] - except Exception: pass - try: self.author_id = data["author"]["id"] - except Exception: pass - try: self.written_by_scratchteam = data["author"]["scratchteam"] - except Exception: pass - try: self.reply_count = data["reply_count"] - except Exception: pass - try: self.source = data["source"] - except Exception: pass - try: self.source_id = data["source_id"] - except Exception: pass + try: + self.id = data["id"] + except Exception: + pass + try: + self.parent_id = data["parent_id"] + except Exception: + pass + try: + self.commentee_id = data["commentee_id"] + except Exception: + pass + try: + self.content = data["content"] + except Exception: + pass + try: + self.datetime_created = data["datetime_created"] + except Exception: + pass + try: + self.author_name = data["author"]["username"] + except Exception: + pass + try: + self.author_id = data["author"]["id"] + except Exception: + pass + try: + self.written_by_scratchteam = data["author"]["scratchteam"] + except Exception: + pass + try: + self.reply_count = data["reply_count"] + except Exception: + pass + try: + self.source = data["source"] + except Exception: + pass + try: + self.source_id = data["source_id"] + except Exception: + pass return True # Methods for getting related entities - def author(self): + def author(self) -> user.User: return self._make_linked_object("username", self.author_name, user.User, exceptions.UserNotFound) - def place(self): + def place(self) -> user.User | studio.Studio | project.Project: """ Returns the place (the project, profile or studio) where the comment was posted as Project, User, or Studio object. @@ -89,37 +100,49 @@ def place(self): if self.source == "project": return self._make_linked_object("id", self.source_id, project.Project, exceptions.UserNotFound) - def parent_comment(self): + def parent_comment(self) -> Comment | None: if self.parent_id is None: return None + if self.cached_parent_comment is not None: return self.cached_parent_comment + if self.source == "profile": - self.cached_parent_comment = user.User(username=self.source_id, _session=self._session).comment_by_id(self.parent_id) - if self.source == "project": + self.cached_parent_comment = user.User(username=self.source_id, _session=self._session).comment_by_id( + self.parent_id) + + elif self.source == "project": p = project.Project(id=self.source_id, _session=self._session) p.update() self.cached_parent_comment = p.comment_by_id(self.parent_id) - if self.source == "studio": - self.cached_parent_comment = studio.Studio(id=self.source_id, _session=self._session).comment_by_id(self.parent_id) + + elif self.source == "studio": + self.cached_parent_comment = studio.Studio(id=self.source_id, _session=self._session).comment_by_id( + self.parent_id) + return self.cached_parent_comment - - def replies(self, *, use_cache=True, limit=40, offset=0): + + def replies(self, *, use_cache: bool = True, limit=40, offset=0): """ Keyword Arguments: use_cache (bool): Returns the replies cached on the first reply fetch. This makes it SIGNIFICANTLY faster for profile comments. Warning: For profile comments, the replies are retrieved and cached on object creation. """ - if (self.cached_replies is None) or (use_cache is False): + if (self.cached_replies is None) or (not use_cache): if self.source == "profile": - self.cached_replies = user.User(username=self.source_id, _session=self._session).comment_by_id(self.id).cached_replies[offset:offset+limit] - if self.source == "project": + self.cached_replies = user.User(username=self.source_id, _session=self._session).comment_by_id( + self.id).cached_replies[offset:offset + limit] + + elif self.source == "project": p = project.Project(id=self.source_id, _session=self._session) p.update() self.cached_replies = p.comment_replies(comment_id=self.id, limit=limit, offset=offset) - if self.source == "studio": - self.cached_replies = studio.Studio(id=self.source_id, _session=self._session).comment_replies(comment_id=self.id, limit=limit, offset=offset) + + elif self.source == "studio": + self.cached_replies = studio.Studio(id=self.source_id, _session=self._session).comment_replies( + comment_id=self.id, limit=limit, offset=offset) + return self.cached_replies - + # Methods for dealing with the comment def reply(self, content, *, commentee_id=None): @@ -152,14 +175,17 @@ def reply(self, content, *, commentee_id=None): else: commentee_id = "" if self.source == "profile": - return user.User(username=self.source_id, _session=self._session).reply_comment(content, parent_id=str(parent_id), commentee_id=commentee_id) + return user.User(username=self.source_id, _session=self._session).reply_comment(content, + parent_id=str(parent_id), + commentee_id=commentee_id) if self.source == "project": p = project.Project(id=self.source_id, _session=self._session) p.update() return p.reply_comment(content, parent_id=str(parent_id), commentee_id=commentee_id) if self.source == "studio": - return studio.Studio(id=self.source_id, _session=self._session).reply_comment(content, parent_id=str(parent_id), commentee_id=commentee_id) - + return studio.Studio(id=self.source_id, _session=self._session).reply_comment(content, + parent_id=str(parent_id), + commentee_id=commentee_id) def delete(self): """ @@ -168,11 +194,13 @@ def delete(self): self._assert_auth() if self.source == "profile": user.User(username=self.source_id, _session=self._session).delete_comment(comment_id=self.id) - if self.source == "project": + + elif self.source == "project": p = project.Project(id=self.source_id, _session=self._session) p.update() p.delete_comment(comment_id=self.id) - if self.source == "studio": + + elif self.source == "studio": studio.Studio(id=self.source_id, _session=self._session).delete_comment(comment_id=self.id) def report(self): @@ -182,9 +210,11 @@ def report(self): self._assert_auth() if self.source == "profile": user.User(username=self.source_id, _session=self._session).report_comment(comment_id=self.id) - if self.source == "project": + + elif self.source == "project": p = project.Project(id=self.source_id, _session=self._session) p.update() p.report_comment(comment_id=self.id) - if self.source == "studio": - studio.Studio(id=self.source_id, _session=self._session).report_comment(comment_id=self.id) \ No newline at end of file + + elif self.source == "studio": + studio.Studio(id=self.source_id, _session=self._session).report_comment(comment_id=self.id) diff --git a/scratchattach/site/forum.py b/scratchattach/site/forum.py index 510b10a8..5605b92e 100644 --- a/scratchattach/site/forum.py +++ b/scratchattach/site/forum.py @@ -1,4 +1,5 @@ """ForumTopic and ForumPost classes""" +from __future__ import annotations from . import user from ..utils.commons import headers @@ -32,8 +33,8 @@ class ForumTopic(BaseSiteComponent): :.update(): Updates the attributes ''' - def __init__(self, **entries): + def __init__(self, **entries): # Info on how the .update method has to fetch the data: self.update_function = requests.get self.update_API = f"https://scratch.mit.edu/discuss/feeds/topic/{entries['id']}/" diff --git a/scratchattach/site/project.py b/scratchattach/site/project.py index 47e8b465..0cc33e7b 100644 --- a/scratchattach/site/project.py +++ b/scratchattach/site/project.py @@ -1,4 +1,5 @@ """Project and PartialProject classes""" +from __future__ import annotations import json import random @@ -324,7 +325,7 @@ def studios(self, *, limit=40, offset=0): f"https://api.scratch.mit.edu/users/{self.author_name}/projects/{self.id}/studios", limit=limit, offset=offset, add_params=f"&cachebust={random.randint(0,9999)}") return commons.parse_object_list(response, studio.Studio, self._session) - def comments(self, *, limit=40, offset=0): + def comments(self, *, limit=40, offset=0) -> list['comment.Comment']: """ Returns the comments posted on the project (except for replies. To get replies use :meth:`scratchattach.project.Project.comment_replies`). @@ -343,7 +344,6 @@ def comments(self, *, limit=40, offset=0): i["source_id"] = self.id return commons.parse_object_list(response, comment.Comment, self._session) - def comment_replies(self, *, comment_id, limit=40, offset=0): response = commons.api_iterative( f"https://api.scratch.mit.edu/users/{self.author_name}/projects/{self.id}/comments/{comment_id}/replies/", limit=limit, offset=offset, add_params=f"&cachebust={random.randint(0,9999)}") diff --git a/scratchattach/site/session.py b/scratchattach/site/session.py index 5b278f86..d513a6c3 100644 --- a/scratchattach/site/session.py +++ b/scratchattach/site/session.py @@ -1,63 +1,55 @@ """Session class and login function""" +from __future__ import annotations +import base64 +import datetime +import hashlib import json -import re -import warnings import pathlib -import hashlib -import time import random -import base64 -import secrets +import re +import time +import warnings +# import secrets +# import zipfile from typing import Type -import zipfile -from . import forum - -from ..utils import commons +from bs4 import BeautifulSoup +from . import activity, classroom, forum, studio, user, project, backpack_asset +# noinspection PyProtectedMember +from ._base import BaseSiteComponent from ..cloud import cloud, _base -from . import user, project, backpack_asset, classroom -from ..utils import exceptions -from . import studio -from . import classroom from ..eventhandlers import message_events, filterbot -from . import activity -from ._base import BaseSiteComponent -from ..utils.commons import headers, empty_project_json -from bs4 import BeautifulSoup from ..other import project_json_capabilities +from ..utils import commons +from ..utils import exceptions +from ..utils.commons import headers, empty_project_json, webscrape_count, get_class_sort_mode from ..utils.requests import Requests as requests CREATE_PROJECT_USES = [] +CREATE_STUDIO_USES = [] +CREATE_CLASS_USES = [] -class Session(BaseSiteComponent): - ''' +class Session(BaseSiteComponent): + """ Represents a Scratch log in / session. Stores authentication data (session id and xtoken). Attributes: + id: The session id associated with the login + username: The username associated with the login + xtoken: The xtoken associated with the login + email: The email address associated with the logged in account + new_scratcher: True if the associated account is a new Scratcher + mute_status: Information about commenting restrictions of the associated account + banned: Returns True if the associated account is banned + """ - :.id: The session id associated with the login - - :.username: The username associated with the login - - :.xtoken: The xtoken associated with the login - - :.email: The email address associated with the logged in account - - :.new_scratcher: Returns True if the associated account is a new Scratcher - - :.mute_status: Information about commenting restrictions of the associated account - - :.banned: Returns True if the associated account is banned - ''' - - def __str__(self): - return f"Login for account: {self.username}" + def __str__(self) -> str: + return f"Login for account {self.username!r}" def __init__(self, **entries): - # Info on how the .update method has to fetch the data: self.update_function = requests.post self.update_API = "https://scratch.mit.edu/session" @@ -68,23 +60,26 @@ def __init__(self, **entries): self.xtoken = None self.new_scratcher = None + # Set attributes that Session object may get + self._user: user.User = None + # Update attributes from entries dict: self.__dict__.update(entries) # Set alternative attributes: - self._username = self.username # backwards compatibility with v1 + self._username = self.username # backwards compatibility with v1 # Base headers and cookies of every session: self._headers = dict(headers) self._cookies = { - "scratchsessionsid" : self.id, - "scratchcsrftoken" : "a", - "scratchlanguage" : "en", + "scratchsessionsid": self.id, + "scratchcsrftoken": "a", + "scratchlanguage": "en", "accept": "application/json", "Content-Type": "application/json", } - def _update_from_dict(self, data): + def _update_from_dict(self, data: dict): # Note: there are a lot more things you can get from this data dict. # Maybe it would be a good idea to also store the dict itself? # self.data = data @@ -97,6 +92,8 @@ def _update_from_dict(self, data): self.email = data["user"]["email"] self.new_scratcher = data["permissions"]["new_scratcher"] + self.is_teacher = data["permissions"]["educator"] + self.mute_status = data["permissions"]["mute_status"] self.username = data["user"]["username"] @@ -104,30 +101,44 @@ def _update_from_dict(self, data): self.banned = data["user"]["banned"] if self.banned: - warnings.warn(f"Warning: The account {self._username} you logged in to is BANNED. Some features may not work properly.") + warnings.warn(f"Warning: The account {self._username} you logged in to is BANNED. " + f"Some features may not work properly.") if self.has_outstanding_email_confirmation: - warnings.warn(f"Warning: The account {self._username} you logged is not email confirmed. Some features may not work properly.") + warnings.warn(f"Warning: The account {self._username} you logged is not email confirmed. " + f"Some features may not work properly.") return True - def connect_linked_user(self) -> 'user.User': - ''' - Gets the user associated with the log in / session. + def connect_linked_user(self) -> user.User: + """ + Gets the user associated with the login / session. Warning: The returned User object is cached. To ensure its attribute are up to date, you need to run .update() on it. Returns: - scratchattach.user.User: Object representing the user associated with the log in / session. - ''' - if not hasattr(self, "_user"): + scratchattach.user.User: Object representing the user associated with the session. + """ + cached = hasattr(self, "_user") + if cached: + cached = self._user is not None + + if not cached: self._user = self.connect_user(self._username) return self._user - def get_linked_user(self): + def get_linked_user(self) -> 'user.User': # backwards compatibility with v1 - return self.connect_linked_user() # To avoid inconsistencies with "connect" and "get", this function was renamed - def set_country(self, country: str="Antarctica"): + # To avoid inconsistencies with "connect" and "get", this function was renamed + return self.connect_linked_user() + + def set_country(self, country: str = "Antarctica"): + """ + Sets the profile country of the session's associated user + + Arguments: + country (str): The country to relocate to + """ requests.post("https://scratch.mit.edu/accounts/settings/", data={"country": country}, headers=self._headers, cookies=self._cookies) @@ -143,10 +154,12 @@ def resend_email(self, password: str): data={"email_address": self.new_email_address, "password": password}, headers=self._headers, cookies=self._cookies) + @property - def new_email_address(self) -> str | None: + def new_email_address(self) -> str: """ - Gets the (unconfirmed) email address that this session has requested to transfer to, if any, otherwise the current address. + Gets the (unconfirmed) email address that this session has requested to transfer to, if any, + otherwise the current address. Returns: str: The email that this session wants to switch to @@ -160,20 +173,21 @@ def new_email_address(self) -> str | None: for label_span in soup.find_all("span", {"class": "label"}): if label_span.contents[0] == "New Email Address": return label_span.parent.contents[-1].text.strip("\n ") + elif label_span.contents[0] == "Current Email Address": email = label_span.parent.contents[-1].text.strip("\n ") return email - + def logout(self): """ - Sends a logout request to scratch. Might not do anything, might log out this account on other ips/sessions? I am not sure + Sends a logout request to scratch. (Might not do anything, might log out this account on other ips/sessions.) """ requests.post("https://scratch.mit.edu/accounts/logout/", headers=self._headers, cookies=self._cookies) - def messages(self, *, limit=40, offset=0, date_limit=None, filter_by=None): - ''' + def messages(self, *, limit: int = 40, offset: int = 0, date_limit=None, filter_by=None) -> list[activity.Activity]: + """ Returns the messages. Keyword arguments: @@ -182,98 +196,118 @@ def messages(self, *, limit=40, offset=0, date_limit=None, filter_by=None): Returns: list: List that contains all messages as Activity objects. - ''' + """ add_params = "" if date_limit is not None: add_params += f"&dateLimit={date_limit}" if filter_by is not None: add_params += f"&filter={filter_by}" + data = commons.api_iterative( f"https://api.scratch.mit.edu/users/{self._username}/messages", - limit = limit, offset = offset, headers = self._headers, cookies = self._cookies, add_params=add_params + limit=limit, offset=offset, _headers=self._headers, cookies=self._cookies, add_params=add_params ) return commons.parse_object_list(data, activity.Activity, self) - def admin_messages(self, *, limit=40, offset=0): + def admin_messages(self, *, limit=40, offset=0) -> list[dict]: """ Returns your messages sent by the Scratch team (alerts). """ return commons.api_iterative( f"https://api.scratch.mit.edu/users/{self._username}/messages/admin", - limit = limit, offset = offset, headers = self._headers, cookies = self._cookies + limit=limit, offset=offset, _headers=self._headers, cookies=self._cookies ) + def classroom_alerts(self, _classroom: classroom.Classroom | int = None, mode: str = "Last created", + page: int = None): + if isinstance(_classroom, classroom.Classroom): + _classroom = _classroom.id + + if _classroom is None: + _classroom = '' + else: + _classroom = f"{_classroom}/" + + ascsort, descsort = get_class_sort_mode(mode) + + data = requests.get(f"https://scratch.mit.edu/site-api/classrooms/alerts/{_classroom}", + params={"page": page, "ascsort": ascsort, "descsort": descsort}, + headers=self._headers, cookies=self._cookies).json() + + return data def clear_messages(self): - ''' + """ Clears all messages. - ''' + """ return requests.post( "https://scratch.mit.edu/site-api/messages/messages-clear/", - headers = self._headers, - cookies = self._cookies, - timeout = 10, + headers=self._headers, + cookies=self._cookies, + timeout=10, ).text - def message_count(self): - ''' + def message_count(self) -> int: + """ Returns the message count. Returns: int: message count - ''' + """ return json.loads(requests.get( f"https://scratch.mit.edu/messages/ajax/get-message-count/", - headers = self._headers, - cookies = self._cookies, - timeout = 10, + headers=self._headers, + cookies=self._cookies, + timeout=10, ).text)["msg_count"] # Front-page-related stuff: - def feed(self, *, limit=20, offset=0, date_limit=None): - ''' + def feed(self, *, limit=20, offset=0, date_limit=None) -> list[activity.Activity]: + """ Returns the "What's happening" section (frontpage). Returns: list: List that contains all "What's happening" entries as Activity objects - ''' + """ add_params = "" if date_limit is not None: add_params = f"&dateLimit={date_limit}" data = commons.api_iterative( f"https://api.scratch.mit.edu/users/{self._username}/following/users/activity", - limit = limit, offset = offset, headers = self._headers, cookies = self._cookies, add_params=add_params + limit=limit, offset=offset, _headers=self._headers, cookies=self._cookies, add_params=add_params ) return commons.parse_object_list(data, activity.Activity, self) def get_feed(self, *, limit=20, offset=0, date_limit=None): # for more consistent names, this method was renamed - return self.feed(limit=limit, offset=offset, date_limit=date_limit) # for backwards compatibility with v1 + return self.feed(limit=limit, offset=offset, date_limit=date_limit) # for backwards compatibility with v1 - def loved_by_followed_users(self, *, limit=40, offset=0): - ''' + def loved_by_followed_users(self, *, limit=40, offset=0) -> list[project.Project]: + """ Returns the "Projects loved by Scratchers I'm following" section (frontpage). Returns: - list: List that contains all "Projects loved by Scratchers I'm following" entries as Project objects - ''' + list: List that contains all "Projects loved by Scratchers I'm following" + entries as Project objects + """ data = commons.api_iterative( f"https://api.scratch.mit.edu/users/{self._username}/following/users/loves", - limit = limit, offset = offset, headers = self._headers, cookies = self._cookies + limit=limit, offset=offset, _headers=self._headers, cookies=self._cookies ) return commons.parse_object_list(data, project.Project, self) """ These methods are disabled because it is unclear if there is any case in which the response is not empty. - def shared_by_followed_users(self, *, limit=40, offset=0): + def shared_by_followed_users(self, *, limit=40, offset=0) -> list[project.Project]: ''' Returns the "Projects by Scratchers I'm following" section (frontpage). This section is only visible to old accounts (according to the Scratch wiki). For newer users, this method will always return an empty list. Returns: - list: List that contains all "Projects loved by Scratchers I'm following" entries as Project objects + list: List that contains all "Projects loved by Scratchers I'm following" + entries as Project objects ''' data = commons.api_iterative( f"https://api.scratch.mit.edu/users/{self._username}/following/users/projects", @@ -281,14 +315,15 @@ def shared_by_followed_users(self, *, limit=40, offset=0): ) return commons.parse_object_list(data, project.Project, self) - def in_followed_studios(self, *, limit=40, offset=0): + def in_followed_studios(self, *, limit=40, offset=0) -> list['project.Project']: ''' Returns the "Projects in studios I'm following" section (frontpage). This section is only visible to old accounts (according to the Scratch wiki). For newer users, this method will always return an empty list. Returns: - list: List that contains all "Projects loved by Scratchers I'm following" entries as Project objects + list: List that contains all "Projects loved by Scratchers I'm following" + entries as Project objects ''' data = commons.api_iterative( f"https://api.scratch.mit.edu/users/{self._username}/following/studios/projects", @@ -297,33 +332,39 @@ def in_followed_studios(self, *, limit=40, offset=0): return commons.parse_object_list(data, project.Project, self)""" # -- Project JSON editing capabilities --- - - def connect_empty_project_pb() -> 'project_json_capabilities.ProjectBody': + # These are set to staticmethods right now, but they probably should not be + @staticmethod + def connect_empty_project_pb() -> project_json_capabilities.ProjectBody: pb = project_json_capabilities.ProjectBody() pb.from_json(empty_project_json) return pb - def connect_pb_from_dict(project_json:dict) -> 'project_json_capabilities.ProjectBody': + @staticmethod + def connect_pb_from_dict(project_json: dict) -> project_json_capabilities.ProjectBody: pb = project_json_capabilities.ProjectBody() pb.from_json(project_json) return pb - def connect_pb_from_file(path_to_file) -> 'project_json_capabilities.ProjectBody': + @staticmethod + def connect_pb_from_file(path_to_file) -> project_json_capabilities.ProjectBody: pb = project_json_capabilities.ProjectBody() + # noinspection PyProtectedMember + # _load_sb3_file starts with an underscore pb.from_json(project_json_capabilities._load_sb3_file(path_to_file)) return pb - def download_asset(asset_id_with_file_ext, *, filename=None, dir=""): - if not (dir.endswith("/") or dir.endswith("\\")): - dir = dir+"/" + @staticmethod + def download_asset(asset_id_with_file_ext, *, filename: str = None, fp=""): + if not (fp.endswith("/") or fp.endswith("\\")): + fp = fp + "/" try: if filename is None: filename = str(asset_id_with_file_ext) response = requests.get( - "https://assets.scratch.mit.edu/"+str(asset_id_with_file_ext), + "https://assets.scratch.mit.edu/" + str(asset_id_with_file_ext), timeout=10, ) - open(f"{dir}{filename}", "wb").write(response.content) + open(f"{fp}{filename}", "wb").write(response.content) except Exception: raise ( exceptions.FetchError( @@ -344,68 +385,79 @@ def upload_asset(self, asset_content, *, asset_id=None, file_ext=None): requests.post( f"https://assets.scratch.mit.edu/{asset_id}.{file_ext}", headers=self._headers, - cookies = self._cookies, + cookies=self._cookies, data=data, timeout=10, ) # --- Search --- - def search_projects(self, *, query="", mode="trending", language="en", limit=40, offset=0): - ''' + def search_projects(self, *, query: str = "", mode: str = "trending", language: str = "en", limit: int = 40, + offset: int = 0) -> list[project.Project]: + """ Uses the Scratch search to search projects. Keyword arguments: query (str): The query that will be searched. mode (str): Has to be one of these values: "trending", "popular" or "recent". Defaults to "trending". - language (str): A language abbreviation, defaults to "en". (Depending on the language used on the Scratch website, Scratch displays you different results.) + language (str): A language abbreviation, defaults to "en". + (Depending on the language used on the Scratch website, Scratch displays you different results.) limit (int): Max. amount of returned projects. offset (int): Offset of the first returned project. Returns: list: List that contains the search results. - ''' + """ response = commons.api_iterative( - f"https://api.scratch.mit.edu/search/projects", limit=limit, offset=offset, add_params=f"&language={language}&mode={mode}&q={query}") + f"https://api.scratch.mit.edu/search/projects", limit=limit, offset=offset, + add_params=f"&language={language}&mode={mode}&q={query}") return commons.parse_object_list(response, project.Project, self) - def explore_projects(self, *, query="*", mode="trending", language="en", limit=40, offset=0): - ''' + def explore_projects(self, *, query: str = "*", mode: str = "trending", language: str = "en", limit: int = 40, + offset: int = 0) -> list[project.Project]: + """ Gets projects from the explore page. Keyword arguments: - query (str): Specifies the tag of the explore page. To get the projects from the "All" tag, set this argument to "*". - mode (str): Has to be one of these values: "trending", "popular" or "recent". Defaults to "trending". - language (str): A language abbreviation, defaults to "en". (Depending on the language used on the Scratch website, Scratch displays you different explore pages.) + query (str): Specifies the tag of the explore page. + To get the projects from the "All" tag, set this argument to "*". + mode (str): Has to be one of these values: "trending", "popular" or "recent". + Defaults to "trending". + language (str): A language abbreviation, defaults to "en". + (Depending on the language used on the Scratch website, Scratch displays you different explore pages.) limit (int): Max. amount of returned projects. offset (int): Offset of the first returned project. Returns: list: List that contains the explore page projects. - ''' + """ response = commons.api_iterative( - f"https://api.scratch.mit.edu/explore/projects", limit=limit, offset=offset, add_params=f"&language={language}&mode={mode}&q={query}") + f"https://api.scratch.mit.edu/explore/projects", limit=limit, offset=offset, + add_params=f"&language={language}&mode={mode}&q={query}") return commons.parse_object_list(response, project.Project, self) - def search_studios(self, *, query="", mode="trending", language="en", limit=40, offset=0): + def search_studios(self, *, query: str = "", mode: str = "trending", language: str = "en", limit: int = 40, + offset: int = 0) -> list[studio.Studio]: if not query: raise ValueError("The query can't be empty for search") response = commons.api_iterative( - f"https://api.scratch.mit.edu/search/studios", limit=limit, offset=offset, add_params=f"&language={language}&mode={mode}&q={query}") + f"https://api.scratch.mit.edu/search/studios", limit=limit, offset=offset, + add_params=f"&language={language}&mode={mode}&q={query}") return commons.parse_object_list(response, studio.Studio, self) - - def explore_studios(self, *, query="", mode="trending", language="en", limit=40, offset=0): + def explore_studios(self, *, query: str = "", mode: str = "trending", language: str = "en", limit: int = 40, + offset: int = 0) -> list[studio.Studio]: if not query: raise ValueError("The query can't be empty for explore") response = commons.api_iterative( - f"https://api.scratch.mit.edu/explore/studios", limit=limit, offset=offset, add_params=f"&language={language}&mode={mode}&q={query}") + f"https://api.scratch.mit.edu/explore/studios", limit=limit, offset=offset, + add_params=f"&language={language}&mode={mode}&q={query}") return commons.parse_object_list(response, studio.Studio, self) - # --- Create project API --- - def create_project(self, *, title=None, project_json=empty_project_json, parent_id=None): # not working + def create_project(self, *, title: str = None, project_json: dict = empty_project_json, + parent_id=None) -> project.Project: # not working """ Creates a project on the Scratch website. @@ -420,8 +472,11 @@ def create_project(self, *, title=None, project_json=empty_project_json, parent_ if CREATE_PROJECT_USES[-1] < time.time() - 300: CREATE_PROJECT_USES.pop() else: - raise exceptions.BadRequest("Rate limit for creating Scratch projects exceeded.\nThis rate limit is enforced by scratchattach, not by the Scratch API.\nFor security reasons, it cannot be turned off.\n\nDon't spam-create projects, it WILL get you banned.") - return + raise exceptions.BadRequest( + "Rate limit for creating Scratch projects exceeded.\n" + "This rate limit is enforced by scratchattach, not by the Scratch API.\n" + "For security reasons, it cannot be turned off.\n\n" + "Don't spam-create projects, it WILL get you banned.") CREATE_PROJECT_USES.insert(0, time.time()) if title is None: @@ -433,13 +488,85 @@ def create_project(self, *, title=None, project_json=empty_project_json, parent_ 'title': title, } - response = requests.post('https://projects.scratch.mit.edu/', params=params, cookies=self._cookies, headers=self._headers, json=project_json).json() + response = requests.post('https://projects.scratch.mit.edu/', params=params, cookies=self._cookies, + headers=self._headers, json=project_json).json() return self.connect_project(response["content-name"]) + def create_studio(self, *, title: str = None, description: str = None) -> studio.Studio: + """ + Create a studio on the scratch website + + Warning: + Don't spam this method - it WILL get you banned from Scratch. + To prevent accidental spam, a rate limit (5 studios per minute) is implemented for this function. + """ + global CREATE_STUDIO_USES + if len(CREATE_STUDIO_USES) < 5: + CREATE_STUDIO_USES.insert(0, time.time()) + else: + if CREATE_STUDIO_USES[-1] < time.time() - 300: + CREATE_STUDIO_USES.pop() + else: + raise exceptions.BadRequest( + "Rate limit for creating Scratch studios exceeded.\n" + "This rate limit is enforced by scratchattach, not by the Scratch API.\n" + "For security reasons, it cannot be turned off.\n\n" + "Don't spam-create studios, it WILL get you banned.") + CREATE_STUDIO_USES.insert(0, time.time()) + + if self.new_scratcher: + raise exceptions.Unauthorized(f"\nNew scratchers (like {self.username}) cannot create studios.") + + response = requests.post("https://scratch.mit.edu/studios/create/", + cookies=self._cookies, headers=self._headers) + + studio_id = webscrape_count(response.json()["redirect"], "/studios/", "/") + new_studio = self.connect_studio(studio_id) + + if title is not None: + new_studio.set_title(title) + if description is not None: + new_studio.set_description(description) + + return new_studio + + def create_class(self, title: str, desc: str = '') -> classroom.Classroom: + """ + Create a class on the scratch website + + Warning: + Don't spam this method - it WILL get you banned from Scratch. + To prevent accidental spam, a rate limit (5 classes per minute) is implemented for this function. + """ + global CREATE_CLASS_USES + if len(CREATE_CLASS_USES) < 5: + CREATE_CLASS_USES.insert(0, time.time()) + else: + if CREATE_CLASS_USES[-1] < time.time() - 300: + CREATE_CLASS_USES.pop() + else: + raise exceptions.BadRequest( + "Rate limit for creating Scratch classes exceeded.\n" + "This rate limit is enforced by scratchattach, not by the Scratch API.\n" + "For security reasons, it cannot be turned off.\n\n" + "Don't spam-create classes, it WILL get you banned.") + CREATE_CLASS_USES.insert(0, time.time()) + + if not self.is_teacher: + raise exceptions.Unauthorized(f"{self.username} is not a teacher; can't create class") + + data = requests.post("https://scratch.mit.edu/classes/create_classroom/", + json={"title": title, "description": desc}, + headers=self._headers, cookies=self._cookies).json() + + class_id = data[0]["id"] + return self.connect_classroom(class_id) + # --- My stuff page --- - def mystuff_projects(self, filter_arg="all", *, page=1, sort_by="", descending=True): - ''' + def mystuff_projects(self, filter_arg: str = "all", *, page: int = 1, sort_by: str = '', descending: bool = True) \ + -> list[project.Project]: + """ Gets the projects from the "My stuff" page. Args: @@ -452,7 +579,7 @@ def mystuff_projects(self, filter_arg="all", *, page=1, sort_by="", descending=T Returns: list: A list with the projects from the "My Stuff" page, each project is represented by a Project object. - ''' + """ if descending: ascsort = "" descsort = sort_by @@ -462,19 +589,19 @@ def mystuff_projects(self, filter_arg="all", *, page=1, sort_by="", descending=T try: targets = requests.get( f"https://scratch.mit.edu/site-api/projects/{filter_arg}/?page={page}&ascsort={ascsort}&descsort={descsort}", - headers = headers, - cookies = self._cookies, - timeout = 10, + headers=headers, + cookies=self._cookies, + timeout=10, ).json() projects = [] for target in targets: projects.append(project.Project( - id = target["pk"], _session=self, author_name=self._username, + id=target["pk"], _session=self, author_name=self._username, comments_allowed=None, instructions=None, notes=None, created=target["fields"]["datetime_created"], last_modified=target["fields"]["datetime_modified"], share_date=target["fields"]["datetime_shared"], - thumbnail_url="https:"+target["fields"]["thumbnail_url"], + thumbnail_url="https:" + target["fields"]["thumbnail_url"], favorites=target["fields"]["favorite_count"], loves=target["fields"]["love_count"], remixes=target["fields"]["remixers_count"], @@ -484,9 +611,10 @@ def mystuff_projects(self, filter_arg="all", *, page=1, sort_by="", descending=T )) return projects except Exception: - raise(exceptions.FetchError) + raise exceptions.FetchError() - def mystuff_studios(self, filter_arg="all", *, page=1, sort_by="", descending=True): + def mystuff_studios(self, filter_arg: str = "all", *, page: int = 1, sort_by: str = "", descending: bool = True) \ + -> list[studio.Studio]: if descending: ascsort = "" descsort = sort_by @@ -495,107 +623,160 @@ def mystuff_studios(self, filter_arg="all", *, page=1, sort_by="", descending=Tr descsort = "" try: targets = requests.get( - f"https://scratch.mit.edu/site-api/galleries/{filter_arg}/?page={page}&ascsort={ascsort}&descsort={descsort}", - headers = headers, - cookies = self._cookies, - timeout = 10, + f"https://scratch.mit.edu/site-api/galleries/{filter_arg}/", + params={"page": page, "ascsort": ascsort, "descsort": descsort}, + headers=headers, + cookies=self._cookies, + timeout=10 ).json() studios = [] for target in targets: studios.append(studio.Studio( - id = target["pk"], _session=self, - title = target["fields"]["title"], - description = None, - host_id = target["fields"]["owner"]["pk"], - host_name = target["fields"]["owner"]["username"], - open_to_all = None, comments_allowed=None, - image_url = "https:"+target["fields"]["thumbnail_url"], - created = target["fields"]["datetime_created"], - modified = target["fields"]["datetime_modified"], - follower_count = None, manager_count = None, - curator_count = target["fields"]["curators_count"], - project_count = target["fields"]["projecters_count"] + id=target["pk"], _session=self, + title=target["fields"]["title"], + description=None, + host_id=target["fields"]["owner"]["pk"], + host_name=target["fields"]["owner"]["username"], + open_to_all=None, comments_allowed=None, + image_url="https:" + target["fields"]["thumbnail_url"], + created=target["fields"]["datetime_created"], + modified=target["fields"]["datetime_modified"], + follower_count=None, manager_count=None, + curator_count=target["fields"]["curators_count"], + project_count=target["fields"]["projecters_count"] )) return studios except Exception: - raise(exceptions.FetchError) - - - def backpack(self,limit=20, offset=0): - ''' + raise exceptions.FetchError() + + def mystuff_classes(self, mode: str = "Last created", page: int = None) -> list[classroom.Classroom]: + if not self.is_teacher: + raise exceptions.Unauthorized(f"{self.username} is not a teacher; can't have classes") + ascsort, descsort = get_class_sort_mode(mode) + + classes_data = requests.get("https://scratch.mit.edu/site-api/classrooms/all/", + params={"page": page, "ascsort": ascsort, "descsort": descsort}, + headers=self._headers, cookies=self._cookies).json() + classes = [] + for data in classes_data: + fields = data["fields"] + educator_pf = fields["educator_profile"] + classes.append(classroom.Classroom( + id=data["pk"], + title=fields["title"], + classtoken=fields["token"], + datetime=datetime.datetime.fromisoformat(fields["datetime_created"]), + author=user.User( + username=educator_pf["user"]["username"], id=educator_pf["user"]["pk"], _session=self), + _session=self)) + return classes + + def mystuff_ended_classes(self, mode: str = "Last created", page: int = None) -> list[classroom.Classroom]: + if not self.is_teacher: + raise exceptions.Unauthorized(f"{self.username} is not a teacher; can't have (deleted) classes") + ascsort, descsort = get_class_sort_mode(mode) + + classes_data = requests.get("https://scratch.mit.edu/site-api/classrooms/closed/", + params={"page": page, "ascsort": ascsort, "descsort": descsort}, + headers=self._headers, cookies=self._cookies).json() + classes = [] + for data in classes_data: + fields = data["fields"] + educator_pf = fields["educator_profile"] + classes.append(classroom.Classroom( + id=data["pk"], + title=fields["title"], + classtoken=fields["token"], + datetime=datetime.datetime.fromisoformat(fields["datetime_created"]), + author=user.User( + username=educator_pf["user"]["username"], id=educator_pf["user"]["pk"], _session=self), + _session=self)) + return classes + + def backpack(self, limit: int = 20, offset: int = 0) -> list[backpack_asset.BackpackAsset]: + """ Lists the assets that are in the backpack of the user associated with the session. Returns: - list: List that contains the backpack items as dicts - ''' + list: List that contains the backpack items + """ data = commons.api_iterative( f"https://backpack.scratch.mit.edu/{self._username}", - limit = limit, offset = offset, headers = self._headers + limit=limit, offset=offset, _headers=self._headers ) return commons.parse_object_list(data, backpack_asset.BackpackAsset, self) - def delete_from_backpack(self, backpack_asset_id): - ''' + def delete_from_backpack(self, backpack_asset_id) -> backpack_asset.BackpackAsset: + """ Deletes an asset from the backpack. Args: backpack_asset_id: ID of the backpack asset that will be deleted - ''' + """ return backpack_asset.BackpackAsset(id=backpack_asset_id, _session=self).delete() - - def become_scratcher_invite(self): + def become_scratcher_invite(self) -> dict: """ - If you are a new Scratcher and have been invited for becoming a Scratcher, this API endpoint will provide more info on the invite. + If you are a new Scratcher and have been invited for becoming a Scratcher, this API endpoint will provide + more info on the invite. """ - return requests.get(f"https://api.scratch.mit.edu/users/{self.username}/invites", headers=self._headers, cookies=self._cookies).json() + return requests.get(f"https://api.scratch.mit.edu/users/{self.username}/invites", headers=self._headers, + cookies=self._cookies).json() # --- Connect classes inheriting from BaseCloud --- - def connect_cloud(self, project_id, *, CloudClass:Type[_base.BaseCloud]=cloud.ScratchCloud) -> Type[_base.BaseCloud]: + # noinspection PyPep8Naming + def connect_cloud(self, project_id, *, CloudClass: Type[_base.BaseCloud] = cloud.ScratchCloud) \ + -> Type[_base.BaseCloud]: """ - Connects to a cloud (by default Scratch's cloud) as logged in user. + Connects to a cloud (by default Scratch's cloud) as logged-in user. Args: project_id: - Keyword arguments: - CloudClass: The class that the returned object should be of. By default this class is scratchattach.cloud.ScratchCloud. + Keyword arguments: CloudClass: The class that the returned object should be of. By default, this class is + scratchattach.cloud.ScratchCloud. - Returns: - Type[scratchattach._base.BaseCloud]: An object representing the cloud of a project. Can be of any class inheriting from BaseCloud. + Returns: Type[scratchattach._base.BaseCloud]: An object representing the cloud of a project. Can be of any + class inheriting from BaseCloud. """ return CloudClass(project_id=project_id, _session=self) - def connect_scratch_cloud(self, project_id) -> 'cloud.ScratchCloud': + def connect_scratch_cloud(self, project_id) -> cloud.ScratchCloud: """ Returns: scratchattach.cloud.ScratchCloud: An object representing the Scratch cloud of a project. """ return cloud.ScratchCloud(project_id=project_id, _session=self) - def connect_tw_cloud(self, project_id, *, purpose="", contact="", cloud_host="wss://clouddata.turbowarp.org") -> 'cloud.TwCloud': + def connect_tw_cloud(self, project_id, *, purpose="", contact="", + cloud_host="wss://clouddata.turbowarp.org") -> cloud.TwCloud: """ Returns: scratchattach.cloud.TwCloud: An object representing the TurboWarp cloud of a project. """ - return cloud.TwCloud(project_id=project_id, purpose=purpose, contact=contact, cloud_host=cloud_host, _session=self) + return cloud.TwCloud(project_id=project_id, purpose=purpose, contact=contact, cloud_host=cloud_host, + _session=self) # --- Connect classes inheriting from BaseSiteComponent --- - def _make_linked_object(self, identificator_name, identificator, Class, NotFoundException): + # noinspection PyPep8Naming + # Class is camelcase here + def _make_linked_object(self, identificator_name, identificator, Class: BaseSiteComponent, + NotFoundException: Exception) -> BaseSiteComponent: """ The Session class doesn't save the login in a ._session attribute, but IS the login ITSELF. - Therefore the _make_linked_object method has to be adjusted + Therefore, the _make_linked_object method has to be adjusted to get it to work for in the Session class. Class must inherit from BaseSiteComponent """ + # noinspection PyProtectedMember + # _get_object is protected return commons._get_object(identificator_name, identificator, Class, NotFoundException, self) - - def connect_user(self, username) -> 'user.User': + def connect_user(self, username: str) -> user.User: """ Gets a user using this session, connects the session to the User object to allow authenticated actions @@ -607,7 +788,7 @@ def connect_user(self, username) -> 'user.User': """ return self._make_linked_object("username", username, user.User, exceptions.UserNotFound) - def find_username_from_id(self, user_id:int): + def find_username_from_id(self, user_id: int) -> str: """ Warning: Every time this functions is run, a comment on your profile is posted and deleted. Therefore you shouldn't run this too often. @@ -619,7 +800,8 @@ def find_username_from_id(self, user_id:int): try: comment = you.post_comment("scratchattach", commentee_id=int(user_id)) except exceptions.CommentPostFailure: - raise exceptions.BadRequest("After posting a comment, you need to wait 10 seconds before you can connect users by id again.") + raise exceptions.BadRequest( + "After posting a comment, you need to wait 10 seconds before you can connect users by id again.") except exceptions.BadRequest: raise exceptions.UserNotFound("Invalid user id") except Exception as e: @@ -632,8 +814,7 @@ def find_username_from_id(self, user_id:int): raise exceptions.UserNotFound() return username - - def connect_user_by_id(self, user_id:int) -> 'user.User': + def connect_user_by_id(self, user_id: int) -> user.User: """ Gets a user using this session, connects the session to the User object to allow authenticated actions @@ -643,7 +824,7 @@ def connect_user_by_id(self, user_id:int) -> 'user.User': 3) fetches other information about the user using Scratch's api.scratch.mit.edu/users/username API. Warning: - Every time this functions is run, a comment on your profile is posted and deleted. Therefore you shouldn't run this too often. + Every time this functions is run, a comment on your profile is posted and deleted. Therefore, you shouldn't run this too often. Args: user_id (int): User ID of the requested user @@ -651,9 +832,10 @@ def connect_user_by_id(self, user_id:int) -> 'user.User': Returns: scratchattach.user.User: An object that represents the requested user and allows you to perform actions on the user (like user.follow) """ - return self._make_linked_object("username", self.find_username_from_id(user_id), user.User, exceptions.UserNotFound) + return self._make_linked_object("username", self.find_username_from_id(user_id), user.User, + exceptions.UserNotFound) - def connect_project(self, project_id) -> 'project.Project': + def connect_project(self, project_id) -> project.Project: """ Gets a project using this session, connects the session to the Project object to allow authenticated actions sess @@ -665,7 +847,7 @@ def connect_project(self, project_id) -> 'project.Project': """ return self._make_linked_object("id", int(project_id), project.Project, exceptions.ProjectNotFound) - def connect_studio(self, studio_id) -> 'studio.Studio': + def connect_studio(self, studio_id) -> studio.Studio: """ Gets a studio using this session, connects the session to the Studio object to allow authenticated actions @@ -677,7 +859,7 @@ def connect_studio(self, studio_id) -> 'studio.Studio': """ return self._make_linked_object("id", int(studio_id), studio.Studio, exceptions.StudioNotFound) - def connect_classroom(self, class_id) -> 'classroom.Classroom': + def connect_classroom(self, class_id) -> classroom.Classroom: """ Gets a class using this session. @@ -689,7 +871,7 @@ def connect_classroom(self, class_id) -> 'classroom.Classroom': """ return self._make_linked_object("id", int(class_id), classroom.Classroom, exceptions.ClassroomNotFound) - def connect_classroom_from_token(self, class_token) -> 'classroom.Classroom': + def connect_classroom_from_token(self, class_token) -> classroom.Classroom: """ Gets a class using this session. @@ -699,9 +881,10 @@ def connect_classroom_from_token(self, class_token) -> 'classroom.Classroom': Returns: scratchattach.classroom.Classroom: An object representing the requested classroom """ - return self._make_linked_object("classtoken", int(class_token), classroom.Classroom, exceptions.ClassroomNotFound) + return self._make_linked_object("classtoken", int(class_token), classroom.Classroom, + exceptions.ClassroomNotFound) - def connect_topic(self, topic_id) -> 'forum.ForumTopic': + def connect_topic(self, topic_id) -> forum.ForumTopic: """ Gets a forum topic using this session, connects the session to the ForumTopic object to allow authenticated actions Data is up-to-date. Data received from Scratch's RSS feed XML API. @@ -714,7 +897,6 @@ def connect_topic(self, topic_id) -> 'forum.ForumTopic': """ return self._make_linked_object("id", int(topic_id), forum.ForumTopic, exceptions.ForumContentNotFound) - def connect_topic_list(self, category_id, *, page=1): """ @@ -732,14 +914,15 @@ def connect_topic_list(self, category_id, *, page=1): """ try: - response = requests.get(f"https://scratch.mit.edu/discuss/{category_id}/?page={page}", headers=self._headers, cookies=self._cookies) + response = requests.get(f"https://scratch.mit.edu/discuss/{category_id}/?page={page}", + headers=self._headers, cookies=self._cookies) soup = BeautifulSoup(response.content, 'html.parser') except Exception as e: raise exceptions.FetchError(str(e)) try: category_name = soup.find('h4').find("span").get_text() - except Exception as e: + except Exception: raise exceptions.BadRequest("Invalid category id") try: @@ -760,37 +943,48 @@ def connect_topic_list(self, category_id, *, page=1): last_updated = columns[3].split(" ")[0] + " " + columns[3].split(" ")[1] - return_topics.append(forum.ForumTopic(_session=self, id=int(topic_id), title=title, category_name=category_name, last_updated=last_updated, reply_count=int(columns[1]), view_count=int(columns[2]))) + return_topics.append( + forum.ForumTopic(_session=self, id=int(topic_id), title=title, category_name=category_name, + last_updated=last_updated, reply_count=int(columns[1]), + view_count=int(columns[2]))) return return_topics except Exception as e: raise exceptions.ScrapeError(str(e)) # --- Connect classes inheriting from BaseEventHandler --- - def connect_message_events(self, *, update_interval=2) -> 'message_events.MessageEvents': + def connect_message_events(self, *, update_interval=2) -> message_events.MessageEvents: # shortcut for connect_linked_user().message_events() - return message_events.MessageEvents(user.User(username=self.username, _session=self), update_interval=update_interval) + return message_events.MessageEvents(user.User(username=self.username, _session=self), + update_interval=update_interval) - def connect_filterbot(self, *, log_deletions=True) -> 'filterbot.Filterbot': + def connect_filterbot(self, *, log_deletions=True) -> filterbot.Filterbot: return filterbot.Filterbot(user.User(username=self.username, _session=self), log_deletions=log_deletions) + # ------ # -def login_by_id(session_id, *, username=None, password=None, xtoken=None) -> Session: +def login_by_id(session_id: str, *, username: str = None, password: str = None, xtoken=None) -> Session: """ Creates a session / log in to the Scratch website with the specified session id. Structured similarly to Session._connect_object method. Args: session_id (str) - password (str) Keyword arguments: - timeout (int): Optional, but recommended. Specify this when the Python environment's IP address is blocked by Scratch's API, but you still want to use cloud variables. + username (str) + password (str) + xtoken (str) Returns: - scratchattach.session.Session: An object that represents the created log in / session + scratchattach.session.Session: An object that represents the created login / session """ + # Removed this from docstring since it doesn't exist: + # timeout (int): Optional, but recommended. + # Specify this when the Python environment's IP address is blocked by Scratch's API, + # but you still want to use cloud variables. + # Generate session_string (a scratchattach-specific authentication method) if password is not None: session_data = dict(session_id=session_id, username=username, password=password) @@ -798,21 +992,29 @@ def login_by_id(session_id, *, username=None, password=None, xtoken=None) -> Ses else: session_string = None _session = Session(id=session_id, username=username, session_string=session_string, xtoken=xtoken) + try: status = _session.update() except Exception as e: status = False - print(f"Key error at key "+str(e)+" when reading scratch.mit.edu/session API response") + warnings.warn(f"Key error at key {e} when reading scratch.mit.edu/session API response") + if status is not True: if _session.xtoken is None: if _session.username is None: - print(f"Warning: Logged in by id, but couldn't fetch XToken. Make sure the provided session id is valid. Setting cloud variables can still work if you provide a `username='username'` keyword argument to the sa.login_by_id function") + warnings.warn("Warning: Logged in by id, but couldn't fetch XToken. " + "Make sure the provided session id is valid. " + "Setting cloud variables can still work if you provide a " + "`username='username'` keyword argument to the sa.login_by_id function") else: - print(f"Warning: Logged in by id, but couldn't fetch XToken. Make sure the provided session id is valid.") + warnings.warn("Warning: Logged in by id, but couldn't fetch XToken. " + "Make sure the provided session id is valid.") else: - print(f"Warning: Logged in by id, but couldn't fetch session info. This won't affect any other features.") + warnings.warn("Warning: Logged in by id, but couldn't fetch session info. " + "This won't affect any other features.") return _session + def login(username, password, *, timeout=10) -> Session: """ Creates a session / log in to the Scratch website with the specified username and password. @@ -829,17 +1031,15 @@ def login(username, password, *, timeout=10) -> Session: timeout (int): Timeout for the request to Scratch's login API (in seconds). Defaults to 10. Returns: - scratchattach.session.Session: An object that represents the created log in / session + scratchattach.session.Session: An object that represents the created login / session """ # Post request to login API: - data = json.dumps({"username": username, "password": password}) - _headers = dict(headers) + _headers = headers.copy() _headers["Cookie"] = "scratchcsrftoken=a;scratchlanguage=en;" request = requests.post( - "https://scratch.mit.edu/login/", data=data, headers=_headers, - timeout = timeout, - errorhandling = False + "https://scratch.mit.edu/login/", json={"username": username, "password": password}, headers=_headers, + timeout=timeout, errorhandling = False ) try: session_id = str(re.search('"(.*)"', request.headers["Set-Cookie"]).group()) @@ -851,12 +1051,13 @@ def login(username, password, *, timeout=10) -> Session: return login_by_id(session_id, username=username, password=password) -def login_by_session_string(session_string) -> Session: - session_string = base64.b64decode(session_string).decode() # unobfuscate +def login_by_session_string(session_string: str) -> Session: + session_string = base64.b64decode(session_string).decode() # unobfuscate session_data = json.loads(session_string) try: assert session_data.get("session_id") - return login_by_id(session_data["session_id"], username=session_data.get("username"), password=session_data.get("password")) + return login_by_id(session_data["session_id"], username=session_data.get("username"), + password=session_data.get("password")) except Exception: pass try: diff --git a/scratchattach/site/studio.py b/scratchattach/site/studio.py index 9c9e2861..80875b2d 100644 --- a/scratchattach/site/studio.py +++ b/scratchattach/site/studio.py @@ -1,4 +1,5 @@ """Studio class""" +from __future__ import annotations import json import random @@ -138,7 +139,7 @@ def comments(self, *, limit=40, offset=0): i["source_id"] = self.id return commons.parse_object_list(response, comment.Comment, self._session) - def comment_replies(self, *, comment_id, limit=40, offset=0): + def comment_replies(self, *, comment_id, limit=40, offset=0) -> list[comment.Comment]: response = commons.api_iterative( f"https://api.scratch.mit.edu/studios/{self.id}/comments/{comment_id}/replies", limit=limit, offset=offset, add_params=f"&cachebust={random.randint(0,9999)}") for x in response: diff --git a/scratchattach/site/user.py b/scratchattach/site/user.py index 15b4849d..e05b792f 100644 --- a/scratchattach/site/user.py +++ b/scratchattach/site/user.py @@ -1,4 +1,5 @@ -"""Session class and login function""" +"""User class""" +from __future__ import annotations import json import random @@ -71,10 +72,10 @@ def __init__(self, **entries): # Headers and cookies: if self._session is None: - self._headers = headers + self._headers :dict = headers self._cookies = {} else: - self._headers = self._session._headers + self._headers :dict = self._session._headers self._cookies = self._session._cookies # Headers for operations that require accept and Content-Type fields: @@ -107,7 +108,6 @@ def _assert_permission(self): raise exceptions.Unauthorized( "You need to be authenticated as the profile owner to do this.") - def does_exist(self): """ Returns: @@ -269,7 +269,7 @@ def projects(self, *, limit=40, offset=0): list: The user's shared projects """ _projects = commons.api_iterative( - f"https://api.scratch.mit.edu/users/{self.username}/projects/", limit=limit, offset=offset, headers = self._headers) + f"https://api.scratch.mit.edu/users/{self.username}/projects/", limit=limit, offset=offset, _headers= self._headers) for p in _projects: p["author"] = {"username":self.username} return commons.parse_object_list(_projects, project.Project, self._session) @@ -391,7 +391,7 @@ def favorites(self, *, limit=40, offset=0): list: The user's favorite projects """ _projects = commons.api_iterative( - f"https://api.scratch.mit.edu/users/{self.username}/favorites/", limit=limit, offset=offset, headers = self._headers) + f"https://api.scratch.mit.edu/users/{self.username}/favorites/", limit=limit, offset=offset, _headers= self._headers) return commons.parse_object_list(_projects, project.Project, self._session) def favorites_count(self): @@ -420,46 +420,44 @@ def viewed_projects(self, limit=24, offset=0): """ self._assert_permission() _projects = commons.api_iterative( - f"https://api.scratch.mit.edu/users/{self.username}/projects/recentlyviewed", limit=limit, offset=offset, headers = self._headers) + f"https://api.scratch.mit.edu/users/{self.username}/projects/recentlyviewed", limit=limit, offset=offset, _headers= self._headers) return commons.parse_object_list(_projects, project.Project, self._session) + def set_pfp(self, image: bytes): + """ + Sets the user's profile picture. You can only use this function if this object was created using :meth:`scratchattach.session.Session.connect_user` + """ + # Teachers can set pfp! - Should update this method to check for that + # self._assert_permission() + requests.post( + f"https://scratch.mit.edu/site-api/users/all/{self.username}/", + headers=self._headers, + cookies=self._cookies, + files={"file": image}) + def set_bio(self, text): """ Sets the user's "About me" section. You can only use this function if this object was created using :meth:`scratchattach.session.Session.connect_user` """ - self._assert_permission() + # Teachers can set bio! - Should update this method to check for that + # self._assert_permission() requests.put( f"https://scratch.mit.edu/site-api/users/all/{self.username}/", - headers = self._json_headers, - cookies = self._cookies, - data = json.dumps(dict( - comments_allowed = True, - id = self.username, - bio = text, - thumbnail_url = self.icon_url, - userId = self.id, - username = self.username - )) - ) + headers=self._json_headers, + cookies=self._cookies, + json={"bio": text}) def set_wiwo(self, text): """ Sets the user's "What I'm working on" section. You can only use this function if this object was created using :meth:`scratchattach.session.Session.connect_user` """ - self._assert_permission() + # Teachers can also change your wiwo + # self._assert_permission() requests.put( f"https://scratch.mit.edu/site-api/users/all/{self.username}/", - headers = self._json_headers, - cookies = self._cookies, - data = json.dumps(dict( - comments_allowed = True, - id = self.username, - status = text, - thumbnail_url = self.icon_url, - userId = self.id, - username = self.username - )) - ) + headers=self._json_headers, + cookies=self._cookies, + json={"status": text}) def set_featured(self, project_id, *, label=""): """ @@ -474,9 +472,9 @@ def set_featured(self, project_id, *, label=""): self._assert_permission() requests.put( f"https://scratch.mit.edu/site-api/users/all/{self.username}/", - headers = self._json_headers, - cookies = self._cookies, - data = json.dumps({"featured_project":int(project_id),"featured_project_label":label}) + headers=self._json_headers, + cookies=self._cookies, + json={"featured_project": int(project_id), "featured_project_label": label} ) def set_forum_signature(self, text): @@ -514,14 +512,14 @@ def post_comment(self, content, *, parent_id="", commentee_id=""): """ self._assert_auth() data = { - "commentee_id": commentee_id, - "content": str(content), - "parent_id": parent_id, + "commentee_id": commentee_id, + "content": str(content), + "parent_id": parent_id, } r = requests.post( f"https://scratch.mit.edu/site-api/comments/user/{self.username}/add/", - headers = headers, - cookies = self._cookies, + headers=headers, + cookies=self._cookies, data=json.dumps(data), ) if r.status_code != 200: @@ -534,7 +532,7 @@ def post_comment(self, content, *, parent_id="", commentee_id=""): text = r.text data = { 'id': text.split('
')[1].split('"
')[0], 'reply_count': 0, 'cached_replies': [] @@ -547,7 +545,7 @@ def post_comment(self, content, *, parent_id="", commentee_id=""): raise(exceptions.CommentPostFailure( "You are being rate-limited for running this operation too often. Implement a cooldown of about 10 seconds.")) else: - raise(exceptions.FetchError("Couldn't parse API response")) + raise(exceptions.FetchError(f"Couldn't parse API response: {r.text!r}")) def reply_comment(self, content, *, parent_id, commentee_id=""): """ @@ -713,7 +711,7 @@ def comments(self, *, page=1, limit=None): DATA.append(_comment) return DATA - def comment_by_id(self, comment_id): + def comment_by_id(self, comment_id) -> comment.Comment: """ Gets a comment on this user's profile by id. diff --git a/scratchattach/utils/commons.py b/scratchattach/utils/commons.py index a5ee3467..33ef63d1 100644 --- a/scratchattach/utils/commons.py +++ b/scratchattach/utils/commons.py @@ -1,17 +1,25 @@ """v2 ready: Common functions used by various internal modules""" +from __future__ import annotations + +from types import FunctionType +from typing import Final, Any, TYPE_CHECKING from . import exceptions -from threading import Thread from .requests import Requests as requests -headers = { - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36", +if TYPE_CHECKING: + # Having to do this is quite inelegant, but this is commons.py, so this is done to avoid cyclic imports + from ..site._base import BaseSiteComponent + +headers: Final = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36", "x-csrftoken": "a", "x-requested-with": "XMLHttpRequest", "referer": "https://scratch.mit.edu", -} # headers recommended for accessing API endpoints that don't require verification +} # headers recommended for accessing API endpoints that don't require verification -empty_project_json = { +empty_project_json: Final = { 'targets': [ { 'isStage': True, @@ -52,57 +60,68 @@ 'meta': { 'semver': '3.0.0', 'vm': '2.3.0', - 'agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36', + 'agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/124.0.0.0 Safari/537.36', }, } -def api_iterative_data(fetch_func, limit, offset, max_req_limit=40, unpack=True): +def api_iterative_data(fetch_func: FunctionType, limit: int, offset: int, max_req_limit: int = 40, + unpack: bool = True): """ Iteratively gets data by calling fetch_func with a moving offset and a limit. Once fetch_func returns None, the retrieval is completed. """ if limit is None: limit = max_req_limit + end = offset + limit api_data = [] for offs in range(offset, end, max_req_limit): - d = fetch_func( - offs, max_req_limit - ) # Mimick actual scratch by only requesting the max amount - if d is None: + # Mimic actual scratch by only requesting the max amount + data = fetch_func(offs, max_req_limit) + if data is None: break + if unpack: - api_data.extend(d) + api_data.extend(data) else: - api_data.append(d) - if len(d) < max_req_limit: + api_data.append(data) + + if len(data) < max_req_limit: break + api_data = api_data[:limit] return api_data -def api_iterative( - url, *, limit, offset, max_req_limit=40, add_params="", headers=headers, cookies={} -): + +def api_iterative(url: str, *, limit: int, offset: int, max_req_limit: int = 40, add_params: str = "", + _headers: dict = None, cookies: dict = None): """ Function for getting data from one of Scratch's iterative JSON API endpoints (like /users//followers, or /users//projects) """ + if _headers is None: + _headers = headers.copy() + if cookies is None: + cookies = {} + if offset < 0: raise exceptions.BadRequest("offset parameter must be >= 0") if limit < 0: raise exceptions.BadRequest("limit parameter must be >= 0") - - def fetch(o, l): + + def fetch(off: int, lim: int): """ - Performs a singla API request + Performs a single API request """ resp = requests.get( - f"{url}?limit={l}&offset={o}{add_params}", headers=headers, cookies=cookies, timeout=10 + f"{url}?limit={lim}&offset={off}{add_params}", headers=_headers, cookies=cookies, timeout=10 ).json() + if not resp: return None if resp == {"code": "BadRequest", "message": ""}: - raise exceptions.BadRequest("the passed arguments are invalid") + raise exceptions.BadRequest("The passed arguments are invalid") return resp api_data = api_iterative_data( @@ -110,39 +129,65 @@ def fetch(o, l): ) return api_data -def _get_object(identificator_name, identificator, Class, NotFoundException, session=None): - # Interal function: Generalization of the process ran by get_user, get_studio etc. + +def _get_object(identificator_name, identificator, Class: type, NotFoundException, session=None) -> BaseSiteComponent: + # Internal function: Generalization of the process ran by get_user, get_studio etc. # Builds an object of class that is inheriting from BaseSiteComponent # # Class must inherit from BaseSiteComponent try: - _object = Class(**{identificator_name:identificator, "_session":session}) + _object = Class(**{identificator_name: identificator, "_session": session}) r = _object.update() if r == "429": - raise(exceptions.Response429("Your network is blocked or rate-limited by Scratch.\nIf you're using an online IDE like replit.com, try running the code on your computer.")) + raise exceptions.Response429( + "Your network is blocked or rate-limited by Scratch.\n" + "If you're using an online IDE like replit.com, try running the code on your computer.") if not r: # Target is unshared. The cases that this can happen in are hardcoded: from ..site import project - if Class is project.Project: # Case: Target is an unshared project. - return project.PartialProject(**{identificator_name:identificator, "shared":False, "_session":session}) + if Class is project.Project: # Case: Target is an unshared project. + return project.PartialProject(**{identificator_name: identificator, + "shared": False, "_session": session}) else: raise NotFoundException else: return _object except KeyError as e: - raise(NotFoundException("Key error at key "+str(e)+" when reading API response")) + raise NotFoundException(f"Key error at key {e} when reading API response") except Exception as e: - raise(e) + raise e + + +def webscrape_count(raw, text_before, text_after, cls: type = int) -> int | Any: + return cls(raw.split(text_before)[1].split(text_after)[0]) -def webscrape_count(raw, text_before, text_after): - return int(raw.split(text_before)[1].split(text_after)[0]) -def parse_object_list(raw, Class, session=None, primary_key="id"): +def parse_object_list(raw, Class, session=None, primary_key="id") -> list[BaseSiteComponent]: results = [] for raw_dict in raw: try: - _obj = Class(**{primary_key:raw_dict[primary_key], "_session":session}) + _obj = Class(**{primary_key: raw_dict[primary_key], "_session": session}) _obj._update_from_dict(raw_dict) results.append(_obj) except Exception as e: print("Warning raised by scratchattach: failed to parse ", raw_dict, "error", e) - return results \ No newline at end of file + return results + + +def get_class_sort_mode(mode: str) -> tuple[str, str]: + """ + Returns the sort mode for the given mode for classes only + """ + ascsort = '' + descsort = '' + + mode = mode.lower() + if mode == "last created": + pass + elif mode == "students": + descsort = "student_count" + elif mode == "a-z": + ascsort = "title" + elif mode == "z-a": + descsort = "title" + + return ascsort, descsort diff --git a/scratchattach/utils/encoder.py b/scratchattach/utils/encoder.py index dd063898..7df83e09 100644 --- a/scratchattach/utils/encoder.py +++ b/scratchattach/utils/encoder.py @@ -1,3 +1,4 @@ +from __future__ import annotations import math from . import exceptions diff --git a/scratchattach/utils/enums.py b/scratchattach/utils/enums.py index 0fbc0ffc..67d4d914 100644 --- a/scratchattach/utils/enums.py +++ b/scratchattach/utils/enums.py @@ -2,6 +2,7 @@ List of supported languages of scratch's translate and text2speech extensions. Adapted from https://translate-service.scratch.mit.edu/supported?language=en """ +from __future__ import annotations from enum import Enum from dataclasses import dataclass diff --git a/scratchattach/utils/exceptions.py b/scratchattach/utils/exceptions.py index 82e1514a..a8b7526c 100644 --- a/scratchattach/utils/exceptions.py +++ b/scratchattach/utils/exceptions.py @@ -1,4 +1,5 @@ # Authentication / Authorization: +from __future__ import annotations class Unauthenticated(Exception): """ @@ -28,7 +29,9 @@ class Unauthorized(Exception): """ def __init__(self, message=""): - self.message = "The user corresponding to the connected login / session is not allowed to perform this action." + self.message = ( + f"The user corresponding to the connected login / session is not allowed to perform this action. " + f"{message}") super().__init__(self.message) @@ -88,6 +91,7 @@ class CommentNotFound(Exception): # Invalid inputs + class InvalidLanguage(Exception): """ Raised when an invalid language/language code/language object is provided, for TTS or Translate @@ -101,6 +105,7 @@ class InvalidTTSGender(Exception): """ pass + # API errors: class LoginFailure(Exception): diff --git a/scratchattach/utils/requests.py b/scratchattach/utils/requests.py index f772db04..c015cfe6 100644 --- a/scratchattach/utils/requests.py +++ b/scratchattach/utils/requests.py @@ -1,17 +1,20 @@ +from __future__ import annotations + import requests from . import exceptions proxies = None + class Requests: """ Centralized HTTP request handler (for better error handling and proxies) """ @staticmethod - def check_response(r : requests.Response): + def check_response(r: requests.Response): if r.status_code == 403 or r.status_code == 401: - raise exceptions.Unauthorized + raise exceptions.Unauthorized(f"Request content: {r.content}") if r.status_code == 500: raise exceptions.APIError("Internal Scratch server error") if r.status_code == 429: @@ -24,16 +27,18 @@ def check_response(r : requests.Response): @staticmethod def get(url, *, data=None, json=None, headers=None, cookies=None, timeout=None, params=None): try: - r = requests.get(url, data=data, json=json, headers=headers, cookies=cookies, params=params, timeout=timeout, proxies=proxies) + r = requests.get(url, data=data, json=json, headers=headers, cookies=cookies, params=params, + timeout=timeout, proxies=proxies) except Exception as e: raise exceptions.FetchError(e) Requests.check_response(r) return r - + @staticmethod - def post(url, *, data=None, json=None, headers=None, cookies=None, timeout=None, params=None, errorhandling=True): + def post(url, *, data=None, json=None, headers=None, cookies=None, timeout=None, params=None, files=None, errorhandling=True, ): try: - r = requests.post(url, data=data, json=json, headers=headers, cookies=cookies, params=params, timeout=timeout, proxies=proxies) + r = requests.post(url, data=data, json=json, headers=headers, cookies=cookies, params=params, + timeout=timeout, proxies=proxies, files=files) except Exception as e: raise exceptions.FetchError(e) if errorhandling: @@ -43,7 +48,8 @@ def post(url, *, data=None, json=None, headers=None, cookies=None, timeout=None, @staticmethod def delete(url, *, data=None, json=None, headers=None, cookies=None, timeout=None, params=None): try: - r = requests.delete(url, data=data, json=json, headers=headers, cookies=cookies, params=params, timeout=timeout, proxies=proxies) + r = requests.delete(url, data=data, json=json, headers=headers, cookies=cookies, params=params, + timeout=timeout, proxies=proxies) except Exception as e: raise exceptions.FetchError(e) Requests.check_response(r) @@ -52,8 +58,10 @@ def delete(url, *, data=None, json=None, headers=None, cookies=None, timeout=Non @staticmethod def put(url, *, data=None, json=None, headers=None, cookies=None, timeout=None, params=None): try: - r = requests.put(url, data=data, json=json, headers=headers, cookies=cookies, params=params, timeout=timeout, proxies=proxies) + r = requests.put(url, data=data, json=json, headers=headers, cookies=cookies, params=params, + timeout=timeout, proxies=proxies) except Exception as e: raise exceptions.FetchError(e) Requests.check_response(r) - return r \ No newline at end of file + return r +