From 0b0e5d21badb36d583101a5c83759054e77dc8b1 Mon Sep 17 00:00:00 2001 From: Izan Date: Sat, 24 Sep 2022 20:33:18 +0100 Subject: [PATCH 1/3] Migrate all message utils from bot and sir-lancebot. --- botcore/utils/messages.py | 330 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 330 insertions(+) create mode 100644 botcore/utils/messages.py diff --git a/botcore/utils/messages.py b/botcore/utils/messages.py new file mode 100644 index 000000000..e9a7cc489 --- /dev/null +++ b/botcore/utils/messages.py @@ -0,0 +1,330 @@ +import asyncio +import random +import re +from functools import partial +from io import BytesIO +from typing import Callable, Sequence + +import discord +from discord.ext import commands + +from botcore.utils import scheduling +from botcore.utils.logging import get_logger + + +MODERATION_ROLES = ( + 476190234653229056, # admin + 831924068852826163, # moderators + 476190357927886848, # moderation team + 476190391595433985 # owners +) + +NEGATIVE_REPLIES = [ + "Noooooo!!", + "Nope.", + "I'm sorry Dave, I'm afraid I can't do that.", + "I don't think so.", + "Not gonna happen.", + "Out of the question.", + "Huh? No.", + "Nah.", + "Naw.", + "Not likely.", + "No way, José.", + "Not in a million years.", + "Fat chance.", + "Certainly not.", + "NEGATORY.", + "Nuh-uh.", + "Not in my house!", +] + +log = get_logger(__name__) + + +def reaction_check( + reaction: discord.Reaction, + user: discord.abc.User, + *, + message_id: int, + allowed_emoji: Sequence[str], + allowed_users: Sequence[int], + allow_mods: bool = True, +) -> bool: + """ + Check if a reaction's emoji and author are allowed and the message is `message_id`. + + If the user is not allowed, remove the reaction. Ignore reactions made by the bot. + If `allow_mods` is True, allow users with moderator roles even if they're not in `allowed_users`. + """ + right_reaction = ( + not user.bot + and reaction.message.id == message_id + and str(reaction.emoji) in allowed_emoji + ) + if not right_reaction: + return False + + is_moderator = ( + allow_mods + and any(role.id in MODERATION_ROLES for role in getattr(user, "roles", [])) + ) + + if user.id in allowed_users or is_moderator: + log.trace(f"Allowed reaction {reaction} by {user} on {reaction.message.id}.") + return True + else: + log.trace(f"Removing reaction {reaction} by {user} on {reaction.message.id}: disallowed user.") + scheduling.create_task( + reaction.message.remove_reaction(reaction.emoji, user), + suppressed_exceptions=(discord.HTTPException,), + name=f"remove_reaction-{reaction}-{reaction.message.id}-{user}" + ) + return False + + +async def wait_for_deletion( + bot: commands.Bot, + message: discord.Message, + user_ids: Sequence[int], + deletion_emojis: Sequence[str] = ("<:trashcan:675729438528503910>",), + timeout: float = 60 * 5, + attach_emojis: bool = True, + allow_mods: bool = True +) -> None: + """ + Wait for any of `user_ids` to react with one of the `deletion_emojis` within `timeout` seconds to delete `message`. + + If `timeout` expires then reactions are cleared to indicate the option to delete has expired. + + An `attach_emojis` bool may be specified to determine whether to attach the given + `deletion_emojis` to the message in the given `context`. + An `allow_mods` bool may also be specified to allow anyone with a role in `MODERATION_ROLES` to delete + the message. + """ + if message.guild is None: + raise ValueError("Message must be sent on a guild") + + if attach_emojis: + for emoji in deletion_emojis: + try: + await message.add_reaction(emoji) + except discord.NotFound: + log.trace(f"Aborting wait_for_deletion: message {message.id} deleted prematurely.") + return + + check = partial( + reaction_check, + message_id=message.id, + allowed_emoji=deletion_emojis, + allowed_users=user_ids, + allow_mods=allow_mods, + ) + + try: + try: + await bot.wait_for('reaction_add', check=check, timeout=timeout) + except asyncio.TimeoutError: + await message.clear_reactions() + else: + await message.delete() + except discord.NotFound: + log.trace(f"wait_for_deletion: message {message.id} deleted prematurely.") + + +async def send_attachments( + message: discord.Message, + destination: discord.TextChannel | discord.Webhook, + link_large: bool = True, + use_cached: bool = False, + **kwargs +) -> list[str]: + """ + Re-upload the message's attachments to the destination and return a list of their new URLs. + + Each attachment is sent as a separate message to more easily comply with the request/file size + limit. If link_large is True, attachments which are too large are instead grouped into a single + embed which links to them. Extra kwargs will be passed to send() when sending the attachment. + """ + webhook_send_kwargs = { + 'username': message.author.display_name, + 'avatar_url': message.author.display_avatar.url, + } + webhook_send_kwargs.update(kwargs) + webhook_send_kwargs['username'] = sub_clyde(webhook_send_kwargs['username']) + + large = [] + urls = [] + for attachment in message.attachments: + failure_msg = ( + f"Failed to re-upload attachment {attachment.filename} from message {message.id}" + ) + + try: + # Allow 512 bytes of leeway for the rest of the request. + # This should avoid most files that are too large, + # but some may get through hence the try-catch. + if attachment.size <= destination.guild.filesize_limit - 512: + with BytesIO() as file: + await attachment.save(file, use_cached=use_cached) + attachment_file = discord.File(file, filename=attachment.filename) + + if isinstance(destination, discord.TextChannel): + msg = await destination.send(file=attachment_file, **kwargs) + urls.append(msg.attachments[0].url) + else: + await destination.send(file=attachment_file, **webhook_send_kwargs) + elif link_large: + large.append(attachment) + else: + log.info(f"{failure_msg} because it's too large.") + except discord.HTTPException as e: + if link_large and e.status == 413: + large.append(attachment) + else: + log.warning(f"{failure_msg} with status {e.status}.", exc_info=e) + + if link_large and large: + desc = "\n".join(f"[{attachment.filename}]({attachment.url})" for attachment in large) + embed = discord.Embed(description=desc) + embed.set_footer(text="Attachments exceed upload size limit.") + + if isinstance(destination, discord.TextChannel): + await destination.send(embed=embed, **kwargs) + else: + await destination.send(embed=embed, **webhook_send_kwargs) + + return urls + + +async def count_unique_users_reaction( + message: discord.Message, + reaction_predicate: Callable[[discord.Reaction], bool] = lambda _: True, + user_predicate: Callable[[discord.User], bool] = lambda _: True, + count_bots: bool = True +) -> int: + """ + Count the amount of unique users who reacted to the message. + + A reaction_predicate function can be passed to check if this reaction should be counted, + another user_predicate to check if the user should also be counted along with a count_bot flag. + """ + unique_users = set() + + for reaction in message.reactions: + if reaction_predicate(reaction): + async for user in reaction.users(): + if (count_bots or not user.bot) and user_predicate(user): + unique_users.add(user.id) + + return len(unique_users) + + +async def pin_no_system_message(message: discord.Message) -> bool: + """Pin the given message, wait a couple of seconds and try to delete the system message.""" + await message.pin() + + # Make sure that we give it enough time to deliver the message + await asyncio.sleep(2) + # Search for the system message in the last 10 messages + async for historical_message in message.channel.history(limit=10): + if historical_message.type == discord.MessageType.pins_add: + await historical_message.delete() + return True + + return False + + +async def send_denial(ctx: commands.Context, reason: str) -> discord.Message: + """Send an embed denying the user with the given reason.""" + embed = discord.Embed() + embed.colour = discord.Colour.red() + embed.title = random.choice(NEGATIVE_REPLIES) + embed.description = reason + + return await ctx.send(embed=embed) + + +def format_user(user: discord.abc.User) -> str: + """Return a string for `user` which has their mention and ID.""" + return f"{user.mention} (`{user.id}`)" + + +def sub_clyde(username: str | None) -> str | None: + """ + Replace "e"/"E" in any "clyde" in `username` with a Cyrillic "е"/"E" and return the new string. + + Discord disallows "clyde" anywhere in the username for webhooks. It will return a 400. + Return None only if `username` is None. + """ + def replace_e(match: re.Match) -> str: + char = "е" if match[2] == "e" else "Е" + return match[1] + char + + if username: + return re.sub(r"(clyd)(e)", replace_e, username, flags=re.I) + else: + return username # Empty string or None + + +async def get_discord_message(ctx: commands.Context, text: str) -> discord.Message | str: + """ + Attempts to convert a given `text` to a discord Message object and return it. + + Conversion will succeed if given a discord Message ID or link. + Returns `text` if the conversion fails. + """ + try: + text = await commands.MessageConverter().convert(ctx, text) + except commands.BadArgument: + pass + + return text + + +async def get_text_and_embed(ctx: commands.Context, text: str) -> tuple[str, discord.Embed | None]: + """ + Attempts to extract the text and embed from a possible link to a discord Message. + + Does not retrieve the text and embed from the Message if it is in a channel the user does + not have read permissions in. + + Returns a tuple of: + str: If `text` is a valid discord Message, the contents of the message, else `text`. + Embed | None: The embed if found in the valid Message, else `None` + """ + embed: discord.Embed | None = None + + msg = await get_discord_message(ctx, text) + # Ensure the user has read permissions for the channel the message is in + if isinstance(msg, discord.Message): + permissions = msg.channel.permissions_for(ctx.author) + if permissions.read_messages: + text = msg.clean_content + # Take first embed because we can't send multiple embeds + if msg.embeds: + embed = msg.embeds[0] + + return text, embed + + +def convert_embed(func: Callable[[str, ], str], embed: discord.Embed) -> discord.Embed: + """ + Converts the text in an embed using a given conversion function, then return the embed. + + Only modifies the following fields: title, description, footer, fields + """ + embed_dict = embed.to_dict() + + embed_dict["title"] = func(embed_dict.get("title", "")) + embed_dict["description"] = func(embed_dict.get("description", "")) + + if "footer" in embed_dict: + embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", "")) + + if "fields" in embed_dict: + for field in embed_dict["fields"]: + field["name"] = func(field.get("name", "")) + field["value"] = func(field.get("value", "")) + + return discord.Embed.from_dict(embed_dict) From 7c4060e3428d146bd948328d7c5cd4efb304ba26 Mon Sep 17 00:00:00 2001 From: TizzySaurus Date: Sun, 23 Oct 2022 11:18:17 +0100 Subject: [PATCH 2/3] Replace utils.messages constants with args in respective functions. --- botcore/utils/messages.py | 40 ++++++++------------------------------- 1 file changed, 8 insertions(+), 32 deletions(-) diff --git a/botcore/utils/messages.py b/botcore/utils/messages.py index e9a7cc489..185575b1e 100644 --- a/botcore/utils/messages.py +++ b/botcore/utils/messages.py @@ -12,33 +12,6 @@ from botcore.utils.logging import get_logger -MODERATION_ROLES = ( - 476190234653229056, # admin - 831924068852826163, # moderators - 476190357927886848, # moderation team - 476190391595433985 # owners -) - -NEGATIVE_REPLIES = [ - "Noooooo!!", - "Nope.", - "I'm sorry Dave, I'm afraid I can't do that.", - "I don't think so.", - "Not gonna happen.", - "Out of the question.", - "Huh? No.", - "Nah.", - "Naw.", - "Not likely.", - "No way, José.", - "Not in a million years.", - "Fat chance.", - "Certainly not.", - "NEGATORY.", - "Nuh-uh.", - "Not in my house!", -] - log = get_logger(__name__) @@ -49,13 +22,14 @@ def reaction_check( message_id: int, allowed_emoji: Sequence[str], allowed_users: Sequence[int], + mod_roles: Sequence[int], allow_mods: bool = True, ) -> bool: """ Check if a reaction's emoji and author are allowed and the message is `message_id`. If the user is not allowed, remove the reaction. Ignore reactions made by the bot. - If `allow_mods` is True, allow users with moderator roles even if they're not in `allowed_users`. + If `allow_mods` is True, allow users with `mod_roles` even if they're not in `allowed_users`. """ right_reaction = ( not user.bot @@ -67,7 +41,7 @@ def reaction_check( is_moderator = ( allow_mods - and any(role.id in MODERATION_ROLES for role in getattr(user, "roles", [])) + and any(role.id in mod_roles for role in getattr(user, "roles", [])) ) if user.id in allowed_users or is_moderator: @@ -87,6 +61,7 @@ async def wait_for_deletion( bot: commands.Bot, message: discord.Message, user_ids: Sequence[int], + mod_roles: Sequence[int], deletion_emojis: Sequence[str] = ("<:trashcan:675729438528503910>",), timeout: float = 60 * 5, attach_emojis: bool = True, @@ -99,7 +74,7 @@ async def wait_for_deletion( An `attach_emojis` bool may be specified to determine whether to attach the given `deletion_emojis` to the message in the given `context`. - An `allow_mods` bool may also be specified to allow anyone with a role in `MODERATION_ROLES` to delete + An `allow_mods` bool may also be specified to allow anyone with a role in `mod_roles` to delete the message. """ if message.guild is None: @@ -118,6 +93,7 @@ async def wait_for_deletion( message_id=message.id, allowed_emoji=deletion_emojis, allowed_users=user_ids, + mod_roles=mod_roles, allow_mods=allow_mods, ) @@ -235,11 +211,11 @@ async def pin_no_system_message(message: discord.Message) -> bool: return False -async def send_denial(ctx: commands.Context, reason: str) -> discord.Message: +async def send_denial(ctx: commands.Context, reason: str, *, negative_replies: Sequence[str]) -> discord.Message: """Send an embed denying the user with the given reason.""" embed = discord.Embed() embed.colour = discord.Colour.red() - embed.title = random.choice(NEGATIVE_REPLIES) + embed.title = random.choice(negative_replies) embed.description = reason return await ctx.send(embed=embed) From ed8b4f787a251227a1da3572459c5722c3f99133 Mon Sep 17 00:00:00 2001 From: TizzySaurus Date: Sun, 23 Oct 2022 11:57:39 +0100 Subject: [PATCH 3/3] Edit `reaction_check` and `wait_for_deletion` to have an `allowed_users` and `allowed_roles` kwarg. --- botcore/utils/messages.py | 42 +++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/botcore/utils/messages.py b/botcore/utils/messages.py index 185575b1e..fdf692534 100644 --- a/botcore/utils/messages.py +++ b/botcore/utils/messages.py @@ -20,31 +20,27 @@ def reaction_check( user: discord.abc.User, *, message_id: int, - allowed_emoji: Sequence[str], + allowed_emojis: Sequence[str], allowed_users: Sequence[int], - mod_roles: Sequence[int], - allow_mods: bool = True, + allowed_roles: Sequence[int], ) -> bool: """ - Check if a reaction's emoji and author are allowed and the message is `message_id`. + Checks if a reaction's emoji and author are allowed. - If the user is not allowed, remove the reaction. Ignore reactions made by the bot. - If `allow_mods` is True, allow users with `mod_roles` even if they're not in `allowed_users`. + A reaction's emoji is allowed when it's not by a bot, is on `message_id`, and in `allowed_emojis`. + A user is allowed when their id is in `allowed_users`, or they have a role that's in `allowed_roles`. + + If the user is not allowed, removes the reaction. """ right_reaction = ( not user.bot and reaction.message.id == message_id - and str(reaction.emoji) in allowed_emoji + and str(reaction.emoji) in allowed_emojis ) if not right_reaction: return False - is_moderator = ( - allow_mods - and any(role.id in mod_roles for role in getattr(user, "roles", [])) - ) - - if user.id in allowed_users or is_moderator: + if user.id in allowed_users or any(role.id in allowed_roles for role in getattr(user, "roles", [])): log.trace(f"Allowed reaction {reaction} by {user} on {reaction.message.id}.") return True else: @@ -60,22 +56,21 @@ def reaction_check( async def wait_for_deletion( bot: commands.Bot, message: discord.Message, - user_ids: Sequence[int], - mod_roles: Sequence[int], + *, + allowed_users: Sequence[int], + allowed_roles: Sequence[int], deletion_emojis: Sequence[str] = ("<:trashcan:675729438528503910>",), timeout: float = 60 * 5, attach_emojis: bool = True, - allow_mods: bool = True ) -> None: """ - Wait for any of `user_ids` to react with one of the `deletion_emojis` within `timeout` seconds to delete `message`. + Waits for an allowed user to react with one of the `deletion_emojis` within `timeout` seconds to delete `message`. + + A user is defined as allowed if their id is in `allowed_users`, or they have a role that's in `allowed_roles`. If `timeout` expires then reactions are cleared to indicate the option to delete has expired. - An `attach_emojis` bool may be specified to determine whether to attach the given - `deletion_emojis` to the message in the given `context`. - An `allow_mods` bool may also be specified to allow anyone with a role in `mod_roles` to delete - the message. + An `attach_emojis` bool may be provided to determine whether to attach the given `deletion_emojis` to the `message`. """ if message.guild is None: raise ValueError("Message must be sent on a guild") @@ -92,9 +87,8 @@ async def wait_for_deletion( reaction_check, message_id=message.id, allowed_emoji=deletion_emojis, - allowed_users=user_ids, - mod_roles=mod_roles, - allow_mods=allow_mods, + allowed_users=allowed_users, + allowed_roles=allowed_roles, ) try: