Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate all message utils from bot and sir-lancebot. #141

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 306 additions & 0 deletions botcore/utils/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
import asyncio
import random
import re
from functools import partial
from io import BytesIO
from typing import Callable, Sequence

import discord
from discord.ext import commands

from botcore.utils import scheduling
from botcore.utils.logging import get_logger


log = get_logger(__name__)


def reaction_check(
reaction: discord.Reaction,
user: discord.abc.User,
*,
message_id: int,
allowed_emoji: Sequence[str],
allowed_users: Sequence[int],
mod_roles: Sequence[int],
allow_mods: bool = True,
TizzySaurus marked this conversation as resolved.
Show resolved Hide resolved
) -> bool:
"""
Check if a reaction's emoji and author are allowed and the message is `message_id`.

If the user is not allowed, remove the reaction. Ignore reactions made by the bot.
If `allow_mods` is True, allow users with `mod_roles` even if they're not in `allowed_users`.
"""
right_reaction = (
not user.bot
and reaction.message.id == message_id
and str(reaction.emoji) in allowed_emoji
)
if not right_reaction:
return False

is_moderator = (
allow_mods
and any(role.id in mod_roles for role in getattr(user, "roles", []))
)

if user.id in allowed_users or is_moderator:
log.trace(f"Allowed reaction {reaction} by {user} on {reaction.message.id}.")
return True
else:
log.trace(f"Removing reaction {reaction} by {user} on {reaction.message.id}: disallowed user.")
scheduling.create_task(
reaction.message.remove_reaction(reaction.emoji, user),
suppressed_exceptions=(discord.HTTPException,),
name=f"remove_reaction-{reaction}-{reaction.message.id}-{user}"
)
return False


async def wait_for_deletion(
bot: commands.Bot,
message: discord.Message,
user_ids: Sequence[int],
TizzySaurus marked this conversation as resolved.
Show resolved Hide resolved
mod_roles: Sequence[int],
deletion_emojis: Sequence[str] = ("<:trashcan:675729438528503910>",),
timeout: float = 60 * 5,
attach_emojis: bool = True,
allow_mods: bool = True
) -> None:
"""
Wait for any of `user_ids` to react with one of the `deletion_emojis` within `timeout` seconds to delete `message`.

If `timeout` expires then reactions are cleared to indicate the option to delete has expired.

An `attach_emojis` bool may be specified to determine whether to attach the given
`deletion_emojis` to the message in the given `context`.
An `allow_mods` bool may also be specified to allow anyone with a role in `mod_roles` to delete
the message.
"""
if message.guild is None:
raise ValueError("Message must be sent on a guild")

if attach_emojis:
for emoji in deletion_emojis:
try:
await message.add_reaction(emoji)
except discord.NotFound:
log.trace(f"Aborting wait_for_deletion: message {message.id} deleted prematurely.")
return

check = partial(
reaction_check,
message_id=message.id,
allowed_emoji=deletion_emojis,
allowed_users=user_ids,
mod_roles=mod_roles,
allow_mods=allow_mods,
)

try:
try:
await bot.wait_for('reaction_add', check=check, timeout=timeout)
except asyncio.TimeoutError:
await message.clear_reactions()
else:
await message.delete()
except discord.NotFound:
log.trace(f"wait_for_deletion: message {message.id} deleted prematurely.")


async def send_attachments(
message: discord.Message,
destination: discord.TextChannel | discord.Webhook,
link_large: bool = True,
use_cached: bool = False,
**kwargs
) -> list[str]:
"""
Re-upload the message's attachments to the destination and return a list of their new URLs.

Each attachment is sent as a separate message to more easily comply with the request/file size
limit. If link_large is True, attachments which are too large are instead grouped into a single
embed which links to them. Extra kwargs will be passed to send() when sending the attachment.
"""
webhook_send_kwargs = {
'username': message.author.display_name,
'avatar_url': message.author.display_avatar.url,
}
webhook_send_kwargs.update(kwargs)
webhook_send_kwargs['username'] = sub_clyde(webhook_send_kwargs['username'])

large = []
urls = []
for attachment in message.attachments:
failure_msg = (
f"Failed to re-upload attachment {attachment.filename} from message {message.id}"
)

try:
# Allow 512 bytes of leeway for the rest of the request.
# This should avoid most files that are too large,
# but some may get through hence the try-catch.
if attachment.size <= destination.guild.filesize_limit - 512:
with BytesIO() as file:
await attachment.save(file, use_cached=use_cached)
attachment_file = discord.File(file, filename=attachment.filename)

if isinstance(destination, discord.TextChannel):
msg = await destination.send(file=attachment_file, **kwargs)
urls.append(msg.attachments[0].url)
else:
await destination.send(file=attachment_file, **webhook_send_kwargs)
elif link_large:
large.append(attachment)
else:
log.info(f"{failure_msg} because it's too large.")
except discord.HTTPException as e:
if link_large and e.status == 413:
large.append(attachment)
else:
log.warning(f"{failure_msg} with status {e.status}.", exc_info=e)

if link_large and large:
desc = "\n".join(f"[{attachment.filename}]({attachment.url})" for attachment in large)
embed = discord.Embed(description=desc)
embed.set_footer(text="Attachments exceed upload size limit.")

if isinstance(destination, discord.TextChannel):
await destination.send(embed=embed, **kwargs)
else:
await destination.send(embed=embed, **webhook_send_kwargs)

return urls


async def count_unique_users_reaction(
message: discord.Message,
reaction_predicate: Callable[[discord.Reaction], bool] = lambda _: True,
user_predicate: Callable[[discord.User], bool] = lambda _: True,
count_bots: bool = True
) -> int:
"""
Count the amount of unique users who reacted to the message.

A reaction_predicate function can be passed to check if this reaction should be counted,
another user_predicate to check if the user should also be counted along with a count_bot flag.
"""
unique_users = set()

for reaction in message.reactions:
if reaction_predicate(reaction):
async for user in reaction.users():
if (count_bots or not user.bot) and user_predicate(user):
unique_users.add(user.id)

return len(unique_users)


async def pin_no_system_message(message: discord.Message) -> bool:
"""Pin the given message, wait a couple of seconds and try to delete the system message."""
await message.pin()

# Make sure that we give it enough time to deliver the message
await asyncio.sleep(2)
# Search for the system message in the last 10 messages
async for historical_message in message.channel.history(limit=10):
if historical_message.type == discord.MessageType.pins_add:
await historical_message.delete()
return True

return False


async def send_denial(ctx: commands.Context, reason: str, *, negative_replies: Sequence[str]) -> discord.Message:
"""Send an embed denying the user with the given reason."""
embed = discord.Embed()
embed.colour = discord.Colour.red()
embed.title = random.choice(negative_replies)
embed.description = reason

return await ctx.send(embed=embed)


def format_user(user: discord.abc.User) -> str:
"""Return a string for `user` which has their mention and ID."""
return f"{user.mention} (`{user.id}`)"


def sub_clyde(username: str | None) -> str | None:
"""
Replace "e"/"E" in any "clyde" in `username` with a Cyrillic "е"/"E" and return the new string.

Discord disallows "clyde" anywhere in the username for webhooks. It will return a 400.
Return None only if `username` is None.
"""
def replace_e(match: re.Match) -> str:
char = "е" if match[2] == "e" else "Е"
return match[1] + char

if username:
return re.sub(r"(clyd)(e)", replace_e, username, flags=re.I)
else:
return username # Empty string or None


async def get_discord_message(ctx: commands.Context, text: str) -> discord.Message | str:
"""
Attempts to convert a given `text` to a discord Message object and return it.

Conversion will succeed if given a discord Message ID or link.
Returns `text` if the conversion fails.
"""
try:
text = await commands.MessageConverter().convert(ctx, text)
except commands.BadArgument:
pass

return text


async def get_text_and_embed(ctx: commands.Context, text: str) -> tuple[str, discord.Embed | None]:
"""
Attempts to extract the text and embed from a possible link to a discord Message.

Does not retrieve the text and embed from the Message if it is in a channel the user does
not have read permissions in.

Returns a tuple of:
str: If `text` is a valid discord Message, the contents of the message, else `text`.
Embed | None: The embed if found in the valid Message, else `None`
"""
embed: discord.Embed | None = None

msg = await get_discord_message(ctx, text)
# Ensure the user has read permissions for the channel the message is in
if isinstance(msg, discord.Message):
permissions = msg.channel.permissions_for(ctx.author)
if permissions.read_messages:
text = msg.clean_content
# Take first embed because we can't send multiple embeds
if msg.embeds:
embed = msg.embeds[0]

return text, embed


def convert_embed(func: Callable[[str, ], str], embed: discord.Embed) -> discord.Embed:
"""
Converts the text in an embed using a given conversion function, then return the embed.

Only modifies the following fields: title, description, footer, fields
"""
embed_dict = embed.to_dict()

embed_dict["title"] = func(embed_dict.get("title", ""))
embed_dict["description"] = func(embed_dict.get("description", ""))

if "footer" in embed_dict:
embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", ""))

if "fields" in embed_dict:
for field in embed_dict["fields"]:
field["name"] = func(field.get("name", ""))
field["value"] = func(field.get("value", ""))

return discord.Embed.from_dict(embed_dict)