diff --git a/.gitignore b/.gitignore index a59d39d..c0c980e 100644 --- a/.gitignore +++ b/.gitignore @@ -105,6 +105,7 @@ celerybeat.pid *.sage.py # Environments +.env.* *.env *.venv env/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..11d7e15 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,34 @@ +# Use the official Python image as the base image +FROM python:3.10 + +# Install necessary dependencies for Chrome and ChromeDriver +RUN apt-get update && apt-get install -y wget gnupg +RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - +RUN echo "deb http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list +RUN apt-get update && apt-get install -y google-chrome-stable + +# Create a new non-root user with a home directory +RUN useradd -m -s /bin/bash spellsbot + +# Set the working directory to /app +WORKDIR /app + +# Copy the requirements file into the container at /app +COPY requirements.txt . + +# Install project dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the rest of the application code +COPY . /app + +# Change the ownership of the /app directory to the newly created user +RUN chown -R spellsbot:spellsbot /app +# Set the user for subsequent commands +USER spellsbot + +ENV CHROME_HEADLESS=1 +ENV PYTHONPATH=/app + +# Run the run.py script +CMD ["python", "spells_bot/run.py"] diff --git a/README.md b/README.md index 91b1746..48378eb 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,30 @@ Pathfinder Spellbook for Telegram. Available at [@SpellsBot](https://t.me/SpellsBot). + +### Run +Build +``` +docker build --rm -t spellsbot . +``` + +Run the container providing `--env-file` and `-v` mounts for database file and table data directory +``` +docker run -d --name spellsbot-dev \ + --env-file .env.dev \ + -v /botdata/db_files:/db_data \ + -v /botdata/data:/table_data \ + spellsbot + +``` + +### Telegram Commands +`/start` - Приветствие + +`/menu` - Поиск по классам и кругам + +`/spellbook` - Моя книга заклинаний + +`/help` - Как пользоваться книгой + +`/settings` - Настройки поиска diff --git a/bot.py b/bot.py deleted file mode 100644 index 1d5d89d..0000000 --- a/bot.py +++ /dev/null @@ -1,164 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -from argparse import ArgumentParser -from pathlib import Path - -from telegram import Update -from telegram.ext import ( - CallbackContext, - Updater, - InlineQueryHandler, - CommandHandler, - CallbackQueryHandler, -) - -from spells_bot.config import BotSettings -from spells_bot.responder import Responder -from spells_bot.utils.log import create_logger - - -logger = create_logger("SpellsBot") - - -def start(update: Update, context: CallbackContext): - """Send a message when the command /start is issued.""" - try: - chat_id = update.effective_user.id - spell_id = context.args[0] - - context.bot.send_media_group(**responder.send_spell_tables(chat_id, spell_id)) - except IndexError: - update.message.reply_text(**responder.greet()) - - -def help(update: Update, context: CallbackContext): - """Send a message when the command /help is issued.""" - update.message.reply_text(**responder.help()) - - -def inline_query(update: Update, context: CallbackContext): - """Handle the inline query.""" - chat = update.inline_query.from_user["username"] - chat_id = update.inline_query.from_user.id - query = update.inline_query.query.lower().strip() - - if not query: - return - - logger.info(f'User @{chat} [{chat_id}] searched "{query}"') - - update.inline_query.answer(**responder.inline_search(query, chat_id)) - - -def menu(update: Update, context: CallbackContext): - """Send main menu message""" - chat_id = update.effective_user.id - update.message.reply_text(**responder.menu(chat_id)) - - -def tables_callback(update: Update, context: CallbackContext): - user_id = update.effective_user.id - cmd, spell_id = Responder.decode_callback(update.callback_query.data) - - try: - context.bot.send_media_group(**responder.send_spell_tables(user_id, spell_id)) - update.callback_query.answer("") - except Exception as e: - logger.info( - f"Redirecting to bot chat on 'show tables' click because {type(e)}: {e}" - ) - - update.callback_query.answer( - "Перейдите в чат с ботом, чтобы посмотреть таблицы", show_alert=True - ) - update.callback_query.edit_message_reply_markup( - **responder.redirect_button(spell_id) - ) - - -def home_callback(update: Update, context: CallbackContext): - cmd = Responder.decode_callback(update.callback_query.data) - chat_id = update.callback_query.from_user.id - - update.effective_message.edit_text(**responder.menu(chat_id)) - update.callback_query.answer("") - - -def class_callback(update: Update, context: CallbackContext): - cmd, class_id = Responder.decode_callback(update.callback_query.data) - update.effective_message.edit_text(**responder.menu_class(class_id)) - update.callback_query.answer("") - - -def class_info_callback(update: Update, context: CallbackContext): - cmd, class_id = Responder.decode_callback(update.callback_query.data) - update.effective_message.reply_text(**responder.class_info_delimiter(class_id)) - update.effective_message.reply_media_group( - **responder.send_class_info_tables(class_id) - ) - update.effective_message.delete() - update.effective_message.reply_text( - **responder.menu_class(class_id, tables_button=False) - ) - update.callback_query.answer("") - - -def level_callback(update: Update, context: CallbackContext): - cmd, class_id, level, page = Responder.decode_callback(update.callback_query.data) - chat_id = update.callback_query.from_user.id - - update.effective_message.edit_text( - **responder.menu_level(class_id, level, chat_id, page) - ) - update.callback_query.answer("") - - -def search_settings_callback(update: Update, context: CallbackContext): - cmd, book = Responder.decode_callback(update.callback_query.data) - chat_id = update.callback_query.from_user.id - - update.callback_query.edit_message_reply_markup( - **responder.update_search_settings(chat_id, book) - ) - update.callback_query.answer(f"Updated settings for {chat_id}") - - -def pass_callback(update: Update, context: CallbackContext): - update.callback_query.answer("") - - -def error(update: Update, context: CallbackContext): - """Log Errors caused by Updates.""" - logger.warning(f"Update {update} caused error {context.error}") - - -if __name__ == "__main__": - parser = ArgumentParser() - parser.add_argument("-e", "--env", type=Path, required=True) - options = parser.parse_args() - - settings = BotSettings() - - updater = Updater(settings.telegram.bot_token) - responder = Responder(settings, updater.bot.link) - - h = updater.dispatcher.add_handler - eh = updater.dispatcher.add_error_handler - - h(CommandHandler("start", start)) - h(CommandHandler("help", help)) - h(CommandHandler("menu", menu)) - h(InlineQueryHandler(inline_query)) - h(CallbackQueryHandler(search_settings_callback, pattern=r"SETTINGS")) - h(CallbackQueryHandler(tables_callback, pattern=r"TABLE:.*")) - h(CallbackQueryHandler(home_callback, pattern=r"^HOME")) - h(CallbackQueryHandler(class_callback, pattern=r"CLASS:.*")) - h(CallbackQueryHandler(class_info_callback, pattern=r"CLASSINFO:.*")) - h(CallbackQueryHandler(level_callback, pattern=r"LEVEL:.*")) - h(CallbackQueryHandler(pass_callback, pattern=r"^PASS$")) - eh(error) - - updater.start_polling() - logger.info("BOT DEPLOYED. Ctrl+C to terminate") - - updater.idle() diff --git a/requirements.txt b/requirements.txt index c1bab87..5c3b7c7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,10 @@ -pydantic[dotenv]==1.9.0 -python-telegram-bot==13.4.1 -SQLAlchemy==1.4.31 -requests-html==0.10.0 +git+https://github.com/aiogram/aiogram@v3.0.0rc1 +pydantic==2.1.1 +pydantic-settings==2.0.1 +SQLAlchemy==2.0.18 +aiohttp==3.8.5 +beautifulsoup4==4.12.2 +cachetools==5.3.1 +types-cachetools==5.3.0.5 +html2image==2.0.3 +certifi==2023.7.22 diff --git a/spells_bot/__init__.py b/spells_bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spells_bot/bot/__init__.py b/spells_bot/bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spells_bot/bot/callback_schema.py b/spells_bot/bot/callback_schema.py new file mode 100644 index 0000000..e509629 --- /dev/null +++ b/spells_bot/bot/callback_schema.py @@ -0,0 +1,53 @@ +from aiogram.filters.callback_data import CallbackData + + +class MenuClassCallback(CallbackData, prefix="MENU_CLS"): + """""" + + +class BaseClassesCallback(CallbackData, prefix="CLS"): + id: int + + +class ClassesTableCallback(BaseClassesCallback, prefix="CLS_TABLE"): + """""" + + +class ClassesSpellsCallback(BaseClassesCallback, prefix="CLS_SPELL"): + """""" + spell_level: int + page: int | None = 0 + + +class SpellTablesCallback(CallbackData, prefix="SPELL_TABLE"): + spell_id: int + + +class SpellbookReadCallback(CallbackData, prefix="BOOK_R"): + index: int + spell_id: int | None = None + extended: bool = False + + +class SpellbookCreateCallback(CallbackData, prefix="BOOK_C"): + spell_id: int | None = None + + +class SpellbookPromptDeleteCallback(SpellbookReadCallback, prefix="BOOK_TRY_D"): + """""" + + +class SpellbookConfirmDeleteCallback(SpellbookReadCallback, prefix="BOOK_D"): + spell_id: int + + +class ChatSettingsAddFilterCallback(CallbackData, prefix="SETTINGS_ADD"): + rulebook_id: int + + +class ChatSettingsRemoveFilterCallback(CallbackData, prefix="SETTINGS_REMOVE"): + rulebook_id: int + + +class EmptyCallback(CallbackData, prefix="IGNORE"): + """""" diff --git a/spells_bot/bot/messages/__init__.py b/spells_bot/bot/messages/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spells_bot/bot/messages/buttons.py b/spells_bot/bot/messages/buttons.py new file mode 100644 index 0000000..36b14d3 --- /dev/null +++ b/spells_bot/bot/messages/buttons.py @@ -0,0 +1,162 @@ +from typing import Sequence + +from aiogram.filters.callback_data import CallbackData +from aiogram.types import InlineKeyboardButton + +from spells_bot.bot.callback_schema import ( + SpellbookReadCallback, + EmptyCallback, + SpellbookPromptDeleteCallback, + SpellbookConfirmDeleteCallback, + SpellbookCreateCallback, + SpellTablesCallback, + ClassesTableCallback, + ClassesSpellsCallback, + MenuClassCallback, +) +from spells_bot.config import settings + +EMPTY_BUTTON_TEXT = "⠀" + + +def try_inline_search_button(text: str = None, query: str = None): + return InlineKeyboardButton(text=text or "Поиск по названию", switch_inline_query_current_chat=query or "") + + +def empty_callback_button(text: str): + return InlineKeyboardButton(text=text, callback_data=EmptyCallback().pack()) + + +def spell_show_tables_button(spell_id: int): + return InlineKeyboardButton( + text="Показать таблицы", + callback_data=SpellTablesCallback(spell_id=spell_id).pack(), + ) + + +def class_show_tables_button(class_id: int): + return InlineKeyboardButton( + text="Показать таблицы", + callback_data=ClassesTableCallback(id=class_id).pack(), + ) + + +def spell_website_redirect_button(alias: str): + return InlineKeyboardButton(text="🌐 На сайте", url=f"{settings.api.spell_info_url_prefix}/{alias}") + + +def spell_add_to_spellbook_button(spell_id: int): + return InlineKeyboardButton( + text="📖 Добавить в книгу заклинаний", + callback_data=SpellbookCreateCallback(spell_id=spell_id).pack(), + ) + + +def spellbook_add_spell_button(): + return InlineKeyboardButton(text="Добавить новое", switch_inline_query_current_chat="") + + +def spellbook_toggle_extended_button(extended: bool, index: int): + if extended: + button = InlineKeyboardButton( + text="Краткое описание", callback_data=SpellbookReadCallback(index=index, extended=False).pack() + ) + else: + button = InlineKeyboardButton( + text="Полное описание", callback_data=SpellbookReadCallback(index=index, extended=True).pack() + ) + + return button + + +def arrow_selector_button(text: str, callback_data: CallbackData, is_active: bool = True): + if is_active: + button = InlineKeyboardButton(text=text, callback_data=callback_data.pack()) + else: + button = InlineKeyboardButton(text=EMPTY_BUTTON_TEXT, callback_data=EmptyCallback().pack()) + + return button + + +def index_view_selector_button(text: str): + return empty_callback_button(text) + + +def spellbook_left_arrow_button(index: int, extended: bool, is_active: bool = True): + return arrow_selector_button("<", SpellbookReadCallback(index=index - 1, extended=extended), is_active) + + +def spellbook_right_arrow_button(index: int, extended: bool, is_active: bool = True): + return arrow_selector_button(">", SpellbookReadCallback(index=index + 1, extended=extended), is_active) + + +def spellbook_selector_buttons(extended: bool, index: int, index_max: int): + row = [ + spellbook_left_arrow_button(index, extended, is_active=index > 0), + index_view_selector_button(f"{index + 1}/{index_max}"), + spellbook_right_arrow_button(index, extended, is_active=index < index_max - 1), + ] + return row + + +def class_left_arrow_button(class_id: int, spell_level: int, page: int, is_active: bool = True): + return arrow_selector_button( + "<", ClassesSpellsCallback(id=class_id, spell_level=spell_level, page=page - 1), is_active + ) + + +def class_right_arrow_button(class_id: int, spell_level: int, page: int, is_active: bool = True): + return arrow_selector_button( + ">", ClassesSpellsCallback(id=class_id, spell_level=spell_level, page=page + 1), is_active + ) + + +def class_spell_level_page_buttons(class_id: int, spell_level: int, page: int, page_max: int): + row = [ + class_left_arrow_button(class_id, spell_level, page, is_active=page > 0), + index_view_selector_button(f"Страница {page + 1}/{page_max + 1}"), + class_right_arrow_button(class_id, spell_level, page, is_active=page < page_max), + ] + return row + + +def class_spell_level_buttons(class_id: int, spell_levels: Sequence[int], active_spell_level: int = None): + spell_level_button_rows = [[]] + spell_level_row_idx = 0 + max_columns_per_row = 5 + + for level in spell_levels: + + if len(spell_level_button_rows[spell_level_row_idx]) >= max_columns_per_row: + spell_level_row_idx += 1 + spell_level_button_rows.append([]) + + if active_spell_level is not None and level == active_spell_level: + text = "🔘" + callback_data = EmptyCallback().pack() + else: + text = str(level) + callback_data = ClassesSpellsCallback(id=class_id, spell_level=level).pack() + + button = InlineKeyboardButton(text=text, callback_data=callback_data) + spell_level_button_rows[spell_level_row_idx].append(button) + + return spell_level_button_rows + + +def class_main_menu_button(): + return InlineKeyboardButton(text="Назад в меню", callback_data=MenuClassCallback().pack()) + + +def spellbook_delete_button(extended: bool, index: int, spell_id: int): + return InlineKeyboardButton( + text="🗑️ Удалить", + callback_data=SpellbookPromptDeleteCallback(index=index, spell_id=spell_id, extended=extended).pack(), + ) + + +def spellbook_delete_confirm_button(extended: bool, index: int, spell_id: int): + return InlineKeyboardButton( + text="⚠ Подтвердить удаление", + callback_data=SpellbookConfirmDeleteCallback(index=index, spell_id=spell_id, extended=extended).pack(), + ) diff --git a/spells_bot/bot/messages/keyboards.py b/spells_bot/bot/messages/keyboards.py new file mode 100644 index 0000000..9c174ca --- /dev/null +++ b/spells_bot/bot/messages/keyboards.py @@ -0,0 +1,148 @@ +from typing import Sequence + +from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup + +from spells_bot.bot.callback_schema import ( + BaseClassesCallback, + ChatSettingsRemoveFilterCallback, + ChatSettingsAddFilterCallback, +) +from spells_bot.bot.messages import buttons +from spells_bot.pathfinder_api.schemas import BotClassInfo, BotSpellInfo, BotBook + + +def try_inline_search(text: str = None, query: str = None): + keyboard = [[buttons.try_inline_search_button(text, query)]] + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def start_main(): + return try_inline_search() + + +def help_main(): + return try_inline_search() + + +def menu_main(classes: [BotClassInfo]): + keyboard = [[]] + row_idx = 0 + max_columns_per_row = 3 + + for c in classes: + + if len(keyboard[row_idx]) >= max_columns_per_row: + row_idx += 1 + keyboard.append([]) + + button = InlineKeyboardButton(text=c.name, callback_data=BaseClassesCallback(id=c.id).pack()) + keyboard[row_idx].append(button) + + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def menu_class(class_: BotClassInfo, show_tables_button: bool = True): + keyboard = [] + + if show_tables_button: + keyboard.append([buttons.class_show_tables_button(class_.id)]) + + spell_level_button_rows = buttons.class_spell_level_buttons(class_.id, class_.spellLevels) + keyboard += spell_level_button_rows + + keyboard.append([buttons.class_main_menu_button()]) + + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def menu_class_spell_level( + class_: BotClassInfo, + active_spell_level: int, + show_tables_button: bool = True, + active_page: int = 0, + page_max: int = 0, +): + keyboard = [] + + if show_tables_button: + keyboard.append([buttons.class_show_tables_button(class_.id)]) + + if page_max > 0: + spell_level_page_buttons = buttons.class_spell_level_page_buttons( + class_.id, active_spell_level, active_page, page_max + ) + keyboard.append(spell_level_page_buttons) + + spell_level_button_rows = buttons.class_spell_level_buttons(class_.id, class_.spellLevels, active_spell_level) + keyboard += spell_level_button_rows + + keyboard.append([buttons.class_main_menu_button()]) + + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def spellbook_empty(): + keyboard = [[buttons.spellbook_add_spell_button()]] + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def spellbook_main(spell: BotSpellInfo, index: int, index_max: int, extended: bool, tables: bool = False): + keyboard = [] + if tables: + keyboard.append([buttons.spell_show_tables_button(spell.id)]) + + keyboard += [ + [buttons.spellbook_toggle_extended_button(extended, index)], + buttons.spellbook_selector_buttons(extended, index, index_max), + [buttons.spell_website_redirect_button(spell.alias)], + [buttons.spellbook_delete_button(extended, index, spell.id)], + ] + + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def spellbook_main_prompt_delete_spell(spell: BotSpellInfo, index: int, index_max: int, extended: bool): + keyboard = [ + [buttons.spellbook_toggle_extended_button(extended, index)], + buttons.spellbook_selector_buttons(extended, index, index_max), + [buttons.spell_website_redirect_button(spell.alias)], + [buttons.spellbook_delete_confirm_button(extended, index, spell.id)], + ] + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def spellbook_main_after_delete_spell(index: int, index_max: int, extended: bool): + keyboard = [buttons.spellbook_selector_buttons(extended, index, index_max), [buttons.spellbook_add_spell_button()]] + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def extended_description_message(spell: BotSpellInfo, tables: bool = False): + keyboard = [[buttons.spell_show_tables_button(spell.id)] if tables else []] + + keyboard += [ + [buttons.spell_website_redirect_button(spell.alias)], + [buttons.spell_add_to_spellbook_button(spell.id)], + ] + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def extended_description_message_saved_spell(spell: BotSpellInfo): + keyboard = [[buttons.spell_website_redirect_button(spell.alias)]] + return InlineKeyboardMarkup(inline_keyboard=keyboard) + + +def chat_settings(all_rulebooks: Sequence[BotBook], user_rulebooks: Sequence[int]): + keyboard = [] + + for book in all_rulebooks: + if book.id in user_rulebooks: + icon = "☑" + callback_data = ChatSettingsRemoveFilterCallback(rulebook_id=book.id).pack() + else: + icon = "☐" + callback_data = ChatSettingsAddFilterCallback(rulebook_id=book.id).pack() + + button = InlineKeyboardButton(text=f"{icon} {book.name}", callback_data=callback_data) + keyboard.append([button]) + + return InlineKeyboardMarkup(inline_keyboard=keyboard) diff --git a/spells_bot/bot/messages/texts.py b/spells_bot/bot/messages/texts.py new file mode 100644 index 0000000..87c5ff6 --- /dev/null +++ b/spells_bot/bot/messages/texts.py @@ -0,0 +1,274 @@ +import warnings +from typing import Sequence + +from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning + +from spells_bot.config import settings +from spells_bot.pathfinder_api.schemas import BotClassInfo, BotSpellInfo + +warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning) + + +def _validate_message_len(msg: str): + """Validate message length against telegram API limits + + Args: + msg: text to be validated. Any markup should be included in the string + + Returns: + True if text can fit in one telegram message, False otherwise + """ + return len(msg.encode("utf-8")) < 4096 + + +def _menu_message_header(text: str): + return f"{text}\n\n" + + +def start_main(): + return ( + f"{_menu_message_header('📖 Книга Заклинаний Pathfinder 1e 🔮')}" + "/spellbook - открыть мою книгу\n" + "/help - как пользоваться книгой\n" + "/menu - поиск по классам\n" + "/settings - настройки поиска\n" + ) + + +def help_main(bot_name: str): + return ( + f"{_menu_message_header('📖 Как пользоваться книгой ℹ')}" + "В этой книге можно искать и записывать заклинания.\n\n" + f"Начните писать\n\n@{bot_name} название заклинания\n\n" + "в любом чате, выберите нужный результат и полное описание заклинания отправится в текущий чат.\n" + "Его можно будет записать в свою книгу избранных заклинаний, которая открывается по команде /spellbook\n\n" + "Для поиска доступных заклинаний для определенного класса, перейдите в /menu.\n\n" + "Настройте фильтр по книгам правил в /settings или во время поиска, выбрав результат внизу списка." + ) + + +def menu_main(): + return f"{_menu_message_header('📖 Поиск по классам 🔮')}" + + +def menu_class(class_: BotClassInfo): + header = _menu_message_header(f"📖 {class_.name} 🔮") + return f"{header}{class_.description}" + + +def menu_class_spell_level(class_: BotClassInfo, spells: Sequence[BotSpellInfo], spell_level: int): + spell_description_pages = [] + header = _menu_message_header(f"{class_.name} {spell_level} круг") + current_page = header + + for s in spells: + spell_line = f"{s.name}: {s.shortDescription}" + possible_current_page = "\n".join([current_page, spell_line]) + + if _validate_message_len(possible_current_page): + current_page = possible_current_page + else: + spell_description_pages.append(current_page) + current_page = "\n".join([header, spell_line]) + + spell_description_pages.append(current_page) + + return spell_description_pages + + +def spellbook_empty(): + return 'Книга заклинаний пуста\n\nВыберите нужное заклинание через поиск и нажмите "Добавить в книгу заклинаний"' + + +def spellbook_main(spell: BotSpellInfo, extended: bool): + tables = None + if extended: + text, tables = extended_description_message(spell) + else: + text = short_description_message_with_optional_values(spell) + return text, tables + + +def spellbook_main_after_delete_spell(): + return "Удалено" + + +def settings_main(bot_name: str): + return ( + f"{_menu_message_header('📖 Настройки поиска ⚙')}" + "Выберите книги правил, в которых хотите искать заклинания. " + f"Настройки распространяются на /menu и поиск через @{bot_name}. " + ) + + +def _insert_into_html_template(html: str): + return f""" + + + + + Title + + + {html} + + + """ + + +def _remove_fraction_values_from_table(soup: BeautifulSoup): + soup.find("a", {"class": "changeFraction"}).extract() + + for fraction_hidden in soup.find_all("span", {"class": "fraction hidden"}): + fraction_hidden.extract() + + +def _remove_html_tags(html: str, keep_links: bool): + soup = BeautifulSoup(html, "html.parser") + + table_divs = soup.find_all("div", {"class": "tableBlock"}) + tables = [] + for div in table_divs: + t = div.extract() + tables.append(str(t)) + + subheader_spans = soup.find_all("span", {"class": "textSubheader"}) + for span in subheader_spans: + new_tag = soup.new_tag("i") + new_tag.string = span.text + span.replace_with(new_tag) + + dfn_tags = soup.find_all("dfn") + for dfn_tag in dfn_tags: + new_tag = soup.new_tag("i") + new_tag.string = dfn_tag.text + dfn_tag.replace_with(new_tag) + + ul_tags = soup.find_all("ul") + for ul in ul_tags: + for li in ul.find_all_next("li"): + li.replace_with(f"- {li.text}") + ul.replace_with(ul.text) + + a_tags = soup.find_all("a") + for a_tag in a_tags: + if keep_links: + a_tag["href"] = f"{settings.api.base_web_url}{a_tag['href']}" + else: + a_tag.replace_with(a_tag.text) + + clean_html = str(soup) + + return clean_html, tables + + +def _make_header(spell: BotSpellInfo): + helpers = [] + if spell.helpers: + for h in spell.helpers: + if h.alias: + helper = f'{h.name}' + else: + helper = h.name + helpers.append(helper) + + if helpers: + helpers_line = "\nПеревод: " + ", ".join(helpers) + "\n" + else: + helpers_line = "" + + return f"{spell.name.upper()} ({spell.engName.upper()})\n{spell.school.name}\n{helpers_line}" + + +def _make_optional_values(spell: BotSpellInfo): + optional_rows = [] + + if spell.spellResistance: + if spell.spellResistance == 0: + spell_resistance = "Нет" + elif spell.spellResistance == 1: + spell_resistance = "Да" + else: + spell_resistance = spell.spellResistance + else: + spell_resistance = None + + value_map = { + f"Источник": spell.book.name, + f"Круг": ", ".join(c.name + " " + str(c.level) for c in spell.classes), + f"Время сотворения": spell.castingTime, + f"Компоненты": spell.components, + f"Дистанция": spell.area, + f"Эффект": spell.effect, + f"Цель": spell.target, + f"Длительность": spell.duration, + f"Испытание": spell.savingThrow, + f"Устойчивость к магии": spell_resistance, + } + for k, v in value_map.items(): + if v is not None: + optional_rows.append(f"{k}: {v}") + return "\n".join(optional_rows) + + +def _make_extended_description(spell: BotSpellInfo): + return _remove_html_tags(spell.description, keep_links=True) + + +def _make_short_description(spell: BotSpellInfo, keep_links: bool): + clean_html, _ = _remove_html_tags(spell.shortDescription, keep_links=keep_links) + return clean_html + + +def class_table_feature(class_: BotClassInfo): + soup = BeautifulSoup(class_.tableFeatures, "html.parser") + + _remove_fraction_values_from_table(soup) + + clean_html = str(soup) + return clean_html + + +def class_table_spell_count(class_: BotClassInfo): + soup = BeautifulSoup(class_.tableSpellCount, "html.parser") + + clean_html = str(soup) + return clean_html + + +def short_description_message(spell: BotSpellInfo, keep_links: bool): + text = _make_short_description(spell, keep_links=keep_links) + + return text + + +def short_description_message_with_optional_values(spell: BotSpellInfo): + text = f"{_make_header(spell)}\n{_make_short_description(spell, keep_links=True)}\n\n{_make_optional_values(spell)}" + + return text + + +def extended_description_message(spell: BotSpellInfo): + extended_description, tables = _make_extended_description(spell) + text = "\n\n".join([_make_header(spell), _make_optional_values(spell), extended_description]) + + if not _validate_message_len(text): + text = short_description_message_with_optional_values(spell) + text = ( + f"{text}\n\n" + "Полное описание заклинания слишком длинное, чтобы уместить его в сообщение - перейдите по ссылке на сайт." + ) + + return text, tables + + +def empty_inline_results_message(query: str): + return ( + f'По запросу "{query[:100]}" ничего не найдено.\n\n' + "Проверьте фильтр по книгам в меню /settings " + "и удостоверьтесь, что вы вводите название заклинания либо на русском, либо на английском языке." + ) + + +def toast_drawing_tables(): + return "Рисую таблицы, это может занять несколько секунд" diff --git a/spells_bot/bot/messages/views.py b/spells_bot/bot/messages/views.py new file mode 100644 index 0000000..0febba4 --- /dev/null +++ b/spells_bot/bot/messages/views.py @@ -0,0 +1,153 @@ +from typing import Sequence +from uuid import uuid4 + +from aiogram.types import ( + InlineQueryResultArticle, + InputTextMessageContent, +) + +from spells_bot.config import settings +from spells_bot.bot.messages import texts +from spells_bot.bot.messages import keyboards +from spells_bot.pathfinder_api.schemas import BotSpellInfo, BotClassInfo, BotBook, SchoolForList + + +def any_text_message(message_text: str): + text = "Поиск работает в инлайн-режиме. Для справки перейдите в /help." + keyboard = keyboards.try_inline_search(f'Искать "{message_text}"', message_text) + return text, keyboard + + +def start_main(): + text = texts.start_main() + keyboard = keyboards.start_main() + return text, keyboard + + +def help_main(bot_name: str): + text = texts.help_main(bot_name) + keyboard = keyboards.help_main() + return text, keyboard + + +def menu_main(classes: Sequence[BotClassInfo]): + text = texts.menu_main() + keyboard = keyboards.menu_main(classes) + return text, keyboard + + +def menu_class(class_: BotClassInfo, show_tables_button: bool = True): + text = texts.menu_class(class_) + keyboard = keyboards.menu_class(class_, show_tables_button) + return text, keyboard + + +def menu_class_spell_level( + class_: BotClassInfo, + spells: Sequence[BotSpellInfo], + active_spell_level: int, + page: int = 0, + show_tables_button: bool = True, +): + text_pages = texts.menu_class_spell_level(class_, spells, active_spell_level) + + keyboard = keyboards.menu_class_spell_level( + class_, active_spell_level, show_tables_button, page, len(text_pages) - 1 + ) + return text_pages[page], keyboard + + +def extended_description_message(spell: BotSpellInfo): + text, tables = texts.extended_description_message(spell) + keyboard = keyboards.extended_description_message(spell, True if tables else False) + return text, tables, keyboard + + +def spellbook_empty(): + text = texts.spellbook_empty() + keyboard = keyboards.spellbook_empty() + return text, keyboard + + +def spellbook_main(index: int, index_max: int, spell: BotSpellInfo, extended: bool = False): + text, tables = texts.spellbook_main(spell, extended) + keyboard = keyboards.spellbook_main(spell, index, index_max, extended, tables=True if tables else False) + return text, keyboard + + +def spellbook_main_after_delete(index: int, index_max: int, extended: bool = False): + text = texts.spellbook_main_after_delete_spell() + keyboard = keyboards.spellbook_main_after_delete_spell(index, index_max, extended) + return text, keyboard + + +def settings_main(all_rulebooks: Sequence[BotBook], user_rulebooks: Sequence[int], bot_name: str): + text = texts.settings_main(bot_name) + keyboard = keyboards.chat_settings(all_rulebooks, user_rulebooks) + return text, keyboard + + +def _spell_url(spell: BotSpellInfo): + prefix = settings.source.spell_info_url_prefix.rstrip("/") + return f"{prefix}/{spell.alias}" + + +def _school_icon_url(school: SchoolForList): + prefix = settings.storage.image_storage_url_root.rstrip("/") + return f"{prefix}/schoolicons/{school.alias}.jpg" + + +def _settings_icon_url(): + return f"{settings.storage.settings_icon_url}" + + +def _warning_icon_url(): + return f"{settings.storage.warning_icon_url}" + + +def inline_results( + query: str, + spells: Sequence[BotSpellInfo], + all_rulebooks: Sequence[BotBook], + user_rulebooks: Sequence[int], + chat_id: int, + max_results: int = 20, +) -> [InlineQueryResultArticle]: + articles = [] + + if query and not spells: + a = InlineQueryResultArticle( + id=str(uuid4()), + title=f"Ничего не найдено", + description=f'по запросу "{query}"', + input_message_content=InputTextMessageContent( + message_text=texts.empty_inline_results_message(query), disable_web_page_preview=True + ), + reply_markup=keyboards.try_inline_search(), + thumbnail_url=_warning_icon_url(), + ) + articles.append(a) + + for s in spells[:max_results]: + message_text, tables = texts.extended_description_message(s) + a = InlineQueryResultArticle( + id=str(uuid4()), + title=s.name, + description=texts.short_description_message(s, keep_links=False), + input_message_content=InputTextMessageContent(message_text=message_text, disable_web_page_preview=True), + reply_markup=keyboards.extended_description_message(s, tables=True if tables else False), + thumbnail_url=_school_icon_url(s.school), + ) + articles.append(a) + + settings_article = InlineQueryResultArticle( + id=str(uuid4()), + title="Настройки поиска", + description=f"Фильтр по книгам ({len(user_rulebooks)}/{len(all_rulebooks)})", + input_message_content=InputTextMessageContent(message_text=f"Настройте фильтр для чата {chat_id}"), + reply_markup=keyboards.chat_settings(all_rulebooks, user_rulebooks), + thumbnail_url=_settings_icon_url(), + ) + articles.append(settings_article) + + return articles diff --git a/spells_bot/config.py b/spells_bot/config.py index a28b6d4..7cc88d3 100644 --- a/spells_bot/config.py +++ b/spells_bot/config.py @@ -1,33 +1,35 @@ from pathlib import Path -from pydantic import BaseSettings, BaseModel +from pydantic import BaseModel +from pydantic_settings import BaseSettings class DatabaseSettings(BaseModel): sqlalchemy_url: str + drop: bool = False class StorageSettings(BaseModel): data_root_dir: Path image_storage_url_root: str settings_icon_url: str + warning_icon_url: str class TelegramSettings(BaseModel): bot_token: str -class DataSourceSettings(BaseModel): +class ApiSettings(BaseModel): + base_web_url: str + base_api_url: str spell_list_url: str class_list_url: str prestige_class_list_url: str spell_info_url_prefix: str -class HctiSettings(BaseModel): - url: str - user_id: str - api_key: str +class HtmlToImageSettings(BaseModel): css_file: Path @@ -39,11 +41,14 @@ class BotSettings(BaseSettings): # telegram telegram: TelegramSettings # spell data source - source: DataSourceSettings + api: ApiSettings # html to image - hcti: HctiSettings + hti: HtmlToImageSettings class Config: env_file = ".env" env_file_encoding = "utf-8" env_nested_delimiter = "__" + + +settings = BotSettings() diff --git a/spells_bot/database/__init__.py b/spells_bot/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spells_bot/database/models.py b/spells_bot/database/models.py new file mode 100644 index 0000000..7e7dc69 --- /dev/null +++ b/spells_bot/database/models.py @@ -0,0 +1,145 @@ +from typing import List, Sequence + +from sqlalchemy import ( + create_engine, + Column, + Integer, + ForeignKey, + update, + select, + delete, + UniqueConstraint, +) +from sqlalchemy.dialects.sqlite import JSON +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker, relationship, Session + +from spells_bot.config import settings +from spells_bot.utils.logger import create_logger + +logger = create_logger("database") + +Base = declarative_base() + + +class User(Base): + __tablename__ = "user" + + id = Column(Integer, primary_key=True, index=True) + chat_id = Column(Integer, unique=True) + + chat_settings = relationship("ChatSettings", back_populates="user", uselist=False) + saved_spells = relationship("SavedSpell", back_populates="user", uselist=True) + + +class ChatSettings(Base): + __tablename__ = "chat_settings" + + id = Column(Integer, primary_key=True, index=True) + user_id = Column(Integer, ForeignKey("user.id"), unique=True) + user = relationship("User", back_populates="chat_settings") + book_filter = Column(JSON) + + +class SavedSpell(Base): + __tablename__ = "saved_spell" + + __table_args__ = (UniqueConstraint("user_id", "spell_id", name="uq_user_spell"),) + + id = Column(Integer, primary_key=True, index=True) + user_id = Column(Integer, ForeignKey("user.id")) + user = relationship("User", back_populates="saved_spells") + spell_id = Column(Integer) + + +def init_db(sqlalchemy_database_url: str, drop: bool = False) -> sessionmaker: + """Create sqlalchemy sessionmaker + + :param sqlalchemy_database_url: currently tested only against sqlite + :param drop: drop existing and recreate database if True + :return: + """ + engine = create_engine(sqlalchemy_database_url) + session_callable = sessionmaker(bind=engine) + + if drop: + Base.metadata.drop_all(bind=engine) + + Base.metadata.create_all(bind=engine) + + return session_callable + + +get_db = init_db(settings.db.sqlalchemy_url, drop=settings.db.drop) + + +def get_or_create_user(db: Session, chat_id: int): + user = db.scalar(select(User).filter_by(chat_id=chat_id)) + + if not user: + user = User(chat_id=chat_id) + db.add(user) + + book_filter = [1] + chat_settings = ChatSettings(user=user, book_filter=book_filter) + db.add(chat_settings) + + db.commit() + db.refresh(user) + + return user + + +def get_chat_settings(db: Session, chat_id: int): + return db.scalar(select(ChatSettings).filter_by(user=get_or_create_user(db, chat_id))) + + +def update_chat_settings(db: Session, chat_settings_id: int, book_filter: Sequence[int]): + db.execute(update(ChatSettings).filter_by(id=chat_settings_id).values(book_filter=book_filter)) + db.commit() + + +def chat_settings_add_rulebook(db: Session, chat_id: int, rulebook_id: int): + chat_settings = get_chat_settings(db, chat_id) + chat_settings.book_filter.append(rulebook_id) + + update_chat_settings(db, chat_settings.id, chat_settings.book_filter) + db.refresh(chat_settings) + + return chat_settings + + +def chat_settings_remove_rulebook(db: Session, chat_id: int, rulebook_id: int): + chat_settings = get_chat_settings(db, chat_id) + chat_settings.book_filter.remove(rulebook_id) + + update_chat_settings(db, chat_settings.id, chat_settings.book_filter) + db.refresh(chat_settings) + + return chat_settings + + +def create_saved_spell(db: Session, chat_id: int, spell_id: int): + saved_spell = SavedSpell(user=get_or_create_user(db, chat_id), spell_id=spell_id) + db.add(saved_spell) + db.commit() + + return saved_spell + + +def delete_saved_spell(db: Session, chat_id: int, spell_id: int): + db.execute(delete(SavedSpell).filter_by(user=get_or_create_user(db, chat_id), spell_id=spell_id)) + db.commit() + + +def get_saved_spells(db: Session, chat_id: int) -> List[SavedSpell]: + return list(db.scalars(select(SavedSpell).filter_by(user=get_or_create_user(db, chat_id)))) + + +def get_saved_spell_by_index(db: Session, chat_id: int, index: int) -> [SavedSpell, int]: + saved_spells = get_saved_spells(db, chat_id) + + saved_spell = saved_spells[index] + index_max = len(saved_spells) + + return saved_spell, index_max diff --git a/spells_bot/image_generator/__init__.py b/spells_bot/image_generator/__init__.py new file mode 100644 index 0000000..6204387 --- /dev/null +++ b/spells_bot/image_generator/__init__.py @@ -0,0 +1 @@ +from spells_bot.image_generator.html_to_image import HtmlToImage diff --git a/spells_bot/image_generator/html_to_image.py b/spells_bot/image_generator/html_to_image.py new file mode 100644 index 0000000..fe2d155 --- /dev/null +++ b/spells_bot/image_generator/html_to_image.py @@ -0,0 +1,62 @@ +from os import getcwd +from pathlib import Path +from typing import Union, List, Sequence, Tuple + +from html2image import Html2Image + + +class HtmlToImage: + def __init__(self, data_root_dir: Union[Path, str], css_file: Union[Path, str]): + self.data_root_dir = Path(data_root_dir) + self.css_file = Path(css_file) + with self.css_file.open("r") as css_f: + self.css_str = css_f.read() + + self.hti = Html2Image( + custom_flags=[ + "--no-sandbox", + "--headless", + "--disable-gpu", + "--disable-software-rasterizer", + "--disable-dev-shm-usage", + "--remote-allow-origins=*", + "--hide-scrollbars", + ], + temp_path=f"{getcwd()}/.tmp", + output_path=data_root_dir, + ) + + def _class_table_dir(self, class_alias: str): + class_dir = self.data_root_dir / "class_tables" / class_alias + class_dir.mkdir(parents=True, exist_ok=True) + return class_dir + + def _spell_table_dir(self, spell_alias: str): + class_dir = self.data_root_dir / "spell_tables" / spell_alias + class_dir.mkdir(parents=True, exist_ok=True) + return class_dir + + def _load_or_create_screenshots( + self, html_strs: Sequence[str], size: Tuple[int, int] = None, force_create: bool = False + ): + if not size: + size = (1920, 1080) + + result_paths = [] + + for idx, html_str in enumerate(html_strs): + filename = f"{idx}.jpg" + file_path = Path(self.hti.output_path) / filename + if force_create or not file_path.exists(): + self.hti.screenshot(html_str=html_str, css_str=self.css_str, save_as=filename, size=size) + result_paths.append(file_path) + + return result_paths + + def class_tables(self, html_strs: List[str], class_alias: str): + self.hti.output_path = self._class_table_dir(class_alias) + return self._load_or_create_screenshots(html_strs) + + def spell_tables(self, html_strs: List[str], spell_alias: str): + self.hti.output_path = self._spell_table_dir(spell_alias) + return self._load_or_create_screenshots(html_strs) diff --git a/spells_bot/pathfinder_api/__init__.py b/spells_bot/pathfinder_api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spells_bot/pathfinder_api/api.py b/spells_bot/pathfinder_api/api.py new file mode 100644 index 0000000..1b1a0c6 --- /dev/null +++ b/spells_bot/pathfinder_api/api.py @@ -0,0 +1,119 @@ +import ssl +from operator import attrgetter +from typing import Sequence + +import aiohttp + +from spells_bot.config import settings +from spells_bot.pathfinder_api import schemas + + +class ApiError(Exception): + def __init__(self, message: str, error: schemas.ErrorResponse): + super().__init__(message) + + self.error = error + + +CACHE_MAX_SIZE = 100 +TTL_SECONDS = 60 * 10 + + +class HttpClient: + session: aiohttp.ClientSession | None = None + + def start(self): + self.session = aiohttp.ClientSession() + + async def stop(self): + await self.session.close() + self.session = None + + +async def _make_request(session: aiohttp.ClientSession, endpoint: str, params: dict): + params_refined = {} + + for k, v in params.items(): + if v is None: + continue + if isinstance(v, bool): + v = str(v).lower() + if isinstance(v, list): + v = ",".join(str(item) for item in v) + + params_refined[k] = v + + url = f"{settings.api.base_api_url}/{endpoint}" + async with session.get(url, params=params_refined, ssl=ssl.SSLContext()) as response: + if response.status != 200: + error_response = await response.json() + raise ApiError(f"Pathfinder API error: {error_response}", error_response) + + data = await response.json() + + return data + + +async def get_class(session: aiohttp.ClientSession, class_id: int, extended: bool = False, magical_only: bool = None): + classes = await _make_request( + session, "classes", {"id": class_id, "extended": extended, "magicClass": magical_only} + ) + class_ = classes[0] + + return schemas.BotClassInfo(**class_) + + +async def get_classes(session: aiohttp.ClientSession, extended: bool = False, magical_only: bool = None): + classes = [] + + for class_ in await _make_request(session, "classes", {"extended": extended, "magicClass": magical_only}): + classes.append(schemas.BotClassInfo(**class_)) + + return classes + + +async def get_rulebooks(session: aiohttp.ClientSession, with_spells: bool = False): + rulebooks = [] + + for class_ in await _make_request(session, "rulebooks", {"withSpells": with_spells}): + rulebooks.append(schemas.BotBook(**class_)) + + return sorted(rulebooks, key=attrgetter("id")) + + +async def get_spells( + session: aiohttp.ClientSession, + ru_name: str = None, + en_name: str = None, + class_id: int = None, + alias: str = None, + level: int = None, + rulebook_ids: Sequence[int] = None, + extended: bool = False, +): + spells = [] + + for spell in await _make_request( + session, + "spells", + { + "name": ru_name, + "engName": en_name, + "classId": class_id, + "alias": alias, + "level": level, + "rulebookIds": rulebook_ids, + "extended": extended, + }, + ): + spells.append(schemas.BotSpellInfo(**spell)) + + spells.sort(key=attrgetter("name")) + return spells + + +# @ttl_cache(maxsize=CACHE_MAX_SIZE, ttl=TTL_SECONDS) +async def get_spell(session: aiohttp.ClientSession, spell_id: int, extended: bool = False): + spells = await _make_request(session, "spells", {"id": spell_id, "extended": extended}) + spell = spells[0] + return schemas.BotSpellInfo(**spell) diff --git a/spells_bot/pathfinder_api/schemas.py b/spells_bot/pathfinder_api/schemas.py new file mode 100644 index 0000000..82e787a --- /dev/null +++ b/spells_bot/pathfinder_api/schemas.py @@ -0,0 +1,92 @@ +from typing import List + +from pydantic import BaseModel, field_validator + + +class ErrorResponse(BaseModel): + message: str + title: str + + +class BotClassInfo(BaseModel): + alias: str + description: str + id: int + name: str + spellLevels: List[int] | None = None + tableFeatures: str | None = None + tableSpellCount: str | None = None + + @field_validator("*") + def convert_empty_string_to_none(cls, v): + if v == "": + return None + return v + + +class BotBook(BaseModel): + id: int + name: str + + +class BookForBotSpellList(BaseModel): + abbreviation: str + alias: str + id: int + name: str + order: int + + +class ClassForBotSpellList(BaseModel): + alias: str + id: int + level: int + name: str + + +class Helper(BaseModel): + alias: str | None = None + isMain: bool + name: str + + +class NameAlias(BaseModel): + alias: str + name: str + + +class SchoolForList(BaseModel): + alias: str + name: str + type: NameAlias + + +class BotSpellInfo(BaseModel): + id: int + alias: str + name: str + engName: str + shortDescription: str + classes: List["ClassForBotSpellList"] + shortDescriptionComponents: str | None = None + area: str | None = None + book: BookForBotSpellList | None = None + castingTime: str | None = None + components: str | None = None + description: str | None = None + duration: str | None = None + effect: str | None = None + helpers: List[Helper] | None = None + races: List[NameAlias] | None = None + range: str | None = None + savingThrow: str | None = None + spellResistance: int | None = None + school: SchoolForList | None = None + subSchool: str | None = None + target: str | None = None + + @field_validator("*") + def convert_empty_string_to_none(cls, v): + if v == "": + return None + return v diff --git a/spells_bot/responder.py b/spells_bot/responder.py deleted file mode 100644 index 8ef84fd..0000000 --- a/spells_bot/responder.py +++ /dev/null @@ -1,423 +0,0 @@ -import json -from typing import Dict, Union -from uuid import uuid4 - -from telegram import ( - InlineKeyboardMarkup, - InlineKeyboardButton, - ParseMode, - InlineQueryResultArticle, - InputTextMessageContent, - InputMediaPhoto, -) -from telegram.constants import MAX_MESSAGE_LENGTH, MAX_INLINE_QUERY_RESULTS - -from spells_bot.config import BotSettings -from spells_bot.search import SpellSearch - - -school_translation = { - "преграждения": "abjuration", - "воплощения": "conjuration", - "прорицания": "divination", - "очарования": "enchantment", - "разрушения": "evocation", - "иллюзии": "illusion", - "некромантии": "necromancy", - "превращения": "transmutation", - "универсализма": "universalist", -} - - -def _school_ru2en(full_ru_name: str) -> str: - ru_name = full_ru_name.split(" ")[1].lower() - return school_translation[ru_name] - - -def _book_alias_to_readable_name(book_alias: str) -> str: - words = [] - current_word = "" - - for char in book_alias: - if char.isupper(): - words.append(current_word) - current_word = "" - current_word += char - - if current_word: - words.append(current_word) - - return " ".join(w.capitalize() for w in words) - - -class Responder: - """ - - Public methods should return a dict of kwargs for a specific ptb response method, e.g.: - ``` - def hello(self, name: str): - text = f"*Hello, {name}!*" - return dict(text=text, parse_mode=ParseMode.MARKDOWN) - ``` - - The returned value should be unpacked and sent to the ptb method, e.g.: - ``` - def start(update: Update, context: CallbackContext): - update.message.reply_text(**responder.hello()) - ``` - """ - - def __init__( - self, - settings: BotSettings, - bot_url_root: str, - ) -> None: - self._settings = settings - - self.search = SpellSearch( - settings.db, settings.storage, settings.source, settings.hcti - ) - self.bot_url_root = bot_url_root.rstrip("/") - - def _spell_url(self, spell_id: str): - prefix = self._settings.source.spell_info_url_prefix.rstrip("/") - return f"{prefix}/{spell_id}" - - def _school_icon_url(self, school: str): - prefix = self._settings.storage.image_storage_url_root.rstrip("/") - return f"{prefix}/schoolicons/{school}.jpg" - - def _settings_icon_url(self): - return f"{self._settings.storage.settings_icon_url}" - - def _bot_url(self, payload: str = None): - url = f"{self.bot_url_root}" - if payload: - f"{url}?start={payload}" - - return url - - @staticmethod - def decode_callback(data: str, n_args: int = -1): - cmd, *payload = data.split(":", maxsplit=n_args) - return cmd, *payload - - @staticmethod - def encode_callback(cmd: Union[int, str], *payload: Union[int, str]): - return ":".join(str(i) for i in [cmd, *payload]) - - def _book_filter_markup(self, book_filter: Dict[str, bool]): - - book_filter_buttons = [] - for book, value in book_filter.items(): - icon = "☑" if value else "☐" - book_readable_name = _book_alias_to_readable_name(book) - btn = InlineKeyboardButton( - f"{book_readable_name} {icon}", - callback_data=self.encode_callback("SETTINGS", book), - ) - book_filter_buttons.append(btn) - - return InlineKeyboardMarkup.from_column(book_filter_buttons) - - def greet(self): - text = ( - "Начните вводить:\n\n@SpellsBot название заклинания\n\n" - "в любом чате и выберите нужный результат. Этот результат отправится в текущий чат.\n\n" - "Для поиска по классам и кругам используйте /menu" - ) - kb_markup = InlineKeyboardMarkup.from_button( - InlineKeyboardButton( - "Попробуйте поиск в этом чате", switch_inline_query_current_chat="" - ) - ) - - return dict(text=text, reply_markup=kb_markup, parse_mode=ParseMode.HTML) - - def help(self): - text = ( - "Начните вводить *@SpellsBot название заклинания* в любом чате " - "и выберите нужный результат. Этот результат отправится в текущий чат." - ) - - return dict(text=text, parse_mode=ParseMode.MARKDOWN) - - def inline_search(self, query: str, chat_id: int): - articles = [] - for spell in self.search.short_info( - query, chat_id, top_n=MAX_INLINE_QUERY_RESULTS - 1 - ): - spell_ext = self.search.extended_info(spell.alias) - - title = spell.name - if spell.short_description_components: - title += f" ({spell.short_description_components})" - - class_restrictions = ", ".join(f"{c.name} {c.level}" for c in spell.classes) - school = ", ".join(s.name for s in spell.schools) - description = f"{class_restrictions}\n{spell.short_description}" - - if spell_ext: - text_parts = [ - f"{spell_ext.full_name.upper()}", - f"{school}\n", - "\n".join(f"{k}: {v}" for k, v in spell_ext.variables.items()), - f"\n{spell_ext.text}", - ] - text = "\n".join(text_parts) - else: - text = ( - f"{title.upper()}\n\n" - "Невозможно загрузить данные с сайта, смотрите описание заклинания по ссылке" - ) - - if len(text) >= MAX_MESSAGE_LENGTH: - text_ending = " ... продолжение по ссылке" - cutoff = MAX_MESSAGE_LENGTH - len(text_ending) - text = text[:cutoff] + text_ending - - buttons = [] - buttons.append( - InlineKeyboardButton( - "🌐 На сайте", - url=self._spell_url(spell.alias), - ) - ) - try: - if spell_ext: - n_tables = len(spell_ext.tables) - if n_tables > 0: - buttons.append( - InlineKeyboardButton( - f"📜 Показать таблицы ({n_tables})", - callback_data=self.encode_callback("TABLE", spell.alias), - ) - ) - except json.decoder.JSONDecodeError: - pass - - if spell_ext: - en_school_name = _school_ru2en(spell_ext.school) - thumb_url = self._school_icon_url(en_school_name) - else: - thumb_url = None - - a = InlineQueryResultArticle( - id=str(uuid4()), - title=title, - description=description, - input_message_content=InputTextMessageContent( - text, parse_mode=ParseMode.HTML - ), - reply_markup=InlineKeyboardMarkup.from_column(buttons), - thumb_url=thumb_url, - ) - articles.append(a) - - chat_settings = self.search.get_chat_settings(chat_id) - current_book_filter = [ - _book_alias_to_readable_name(k) - for k, v in chat_settings.book_filter.items() - if v - ] - current_book_filter = ", ".join(current_book_filter) - settings_article_description = f"Фильтр по книгам: {current_book_filter}" - - settings_article = InlineQueryResultArticle( - id=str(uuid4()), - title="Настройки поиска", - description=settings_article_description, - input_message_content=InputTextMessageContent( - f"Настройте фильтр для чата {chat_id}" - ), - reply_markup=self._book_filter_markup(chat_settings.book_filter), - thumb_url=self._settings_icon_url(), - ) - articles.append(settings_article) - return dict(results=articles) - - # def send_search_settings(self, chat_id: int): - # return dict( - # chat_id=chat_id, - # text=f"Настройте фильтр для чата {chat_id}", - # reply_markup=self._book_filter_markup(chat_id), - # ) - - def update_search_settings(self, chat_id: int, book: str): - chat_settings = self.search.update_chat_settings(chat_id, book) - return dict(reply_markup=self._book_filter_markup(chat_settings.book_filter)) - - def send_spell_tables(self, chat_id: str, spell_alias: str): - media_group = [] - spell_short, spell_ext = self.search.full_info(spell_alias) - for t in spell_ext.tables: - class_restrictions = ", ".join( - f"{c.name} {c.level}" for c in spell_short.classes - ) - caption = ( - f"{spell_ext.full_name.upper()} " - f"таблица {int(t.path.stem) + 1}\n" - f"{class_restrictions}" - ) - m = InputMediaPhoto( - t.path.open("rb"), - caption=caption, - parse_mode=ParseMode.HTML, - ) - media_group.append(m) - - return dict(chat_id=chat_id, media=media_group) - - def redirect_button(self, spell_id: str): - buttons = [] - buttons.append( - InlineKeyboardButton( - "🌐 На сайте", - url=self._spell_url(spell_id), - ) - ) - buttons.append( - InlineKeyboardButton( - "Перейти в чат с ботом", - url=self._bot_url(spell_id), - ) - ) - - return dict(reply_markup=InlineKeyboardMarkup.from_column(buttons)) - - def menu(self, chat_id: int): - text = "📖 МЕНЮ 🔮\n\nВыберите класс" - buttons = [ - InlineKeyboardButton( - c.name, callback_data=self.encode_callback("CLASS", c.id) - ) - for c in self.search.iter_classes(chat_id) - ] - button_rows = [buttons[i : i + 3] for i in range(0, len(buttons) + 1, 3)] - - return dict( - text=text, - reply_markup=InlineKeyboardMarkup(button_rows), - parse_mode=ParseMode.HTML, - ) - - def menu_class(self, class_id: str, tables_button: bool = True): - c = self.search.class_info(int(class_id)) - - text = f"{c.name}\n\n{c.short_description}" - buttons = [] - for lvl in self.search.iter_levels(int(class_id)): - b = InlineKeyboardButton( - lvl, callback_data=self.encode_callback("LEVEL", c.id, lvl, 0) - ) - buttons.append(b) - - class_info_button = InlineKeyboardButton( - " О классе", callback_data=self.encode_callback("CLASSINFO", class_id) - ) - home_button = InlineKeyboardButton( - "🔮 Назад в меню", callback_data=self.encode_callback("HOME") - ) - - button_rows = [buttons[i : i + 5] for i in range(0, len(buttons) + 1, 5)] - has_tables = len(list(self.search.iter_class_info_tables(int(class_id)))) > 0 - if tables_button and has_tables: - button_rows.insert(0, [class_info_button]) - - button_rows.append([home_button]) - - return dict( - text=text, - reply_markup=InlineKeyboardMarkup(button_rows), - parse_mode=ParseMode.HTML, - ) - - def class_info_delimiter(self, class_id: str): - c = self.search.class_info(int(class_id)) - - return dict( - text=f"📜 {c.name.upper()} 📜", - parse_mode=ParseMode.HTML, - ) - - def send_class_info_tables(self, class_id: str): - media_group = [] - for p, class_info in self.search.iter_class_info_tables(int(class_id)): - caption = ( - f"{class_info.name.upper()} таблица {int(p.stem) + 1}" - ) - m = InputMediaPhoto( - p.open("rb"), - caption=caption, - parse_mode=ParseMode.HTML, - ) - media_group.append(m) - - return dict(media=media_group) - - def menu_level(self, class_id: str, level: str, chat_id: str, page: str = 0): - class_id, level, page, chat_id = ( - int(class_id), - int(level), - int(page), - int(chat_id), - ) - - class_info = self.search.class_info(class_id) - - text_parts = [f"{class_info.name} {level} круг"] - n_pages, spells = self.search.paginate_short_info_by_level( - class_id, level, chat_id, page - ) - for s in spells: - text_parts.append(f"{s.name}: {s.short_description}") - text = "\n".join(text_parts) - - buttons = [] - - pagination_buttons = [] - if n_pages >= 1: - if page >= 1: - b = InlineKeyboardButton( - f"<< Страница {page}", - callback_data=self.encode_callback( - "LEVEL", class_id, level, page - 1 - ), - ) - pagination_buttons.append(b) - if page < n_pages - 1: - b = InlineKeyboardButton( - f"Страница {page + 2} >>", - callback_data=self.encode_callback( - "LEVEL", class_id, level, page + 1 - ), - ) - pagination_buttons.append(b) - - for lvl in self.search.iter_levels(class_id): - b_text = lvl - b_callback_data = self.encode_callback("LEVEL", class_id, lvl, 0) - if lvl == level: - b_text = "🔘" - b_callback_data = "PASS" - b = InlineKeyboardButton(b_text, callback_data=b_callback_data) - buttons.append(b) - - button_rows = [buttons[i : i + 5] for i in range(0, len(buttons) + 1, 5)] - - if pagination_buttons: - button_rows.insert(0, pagination_buttons) - - button_rows.append( - [ - InlineKeyboardButton( - "🔮 Назад в меню", callback_data=self.encode_callback("HOME") - ) - ] - ) - - return dict( - text=text, - parse_mode=ParseMode.HTML, - reply_markup=InlineKeyboardMarkup(button_rows), - ) diff --git a/spells_bot/run.py b/spells_bot/run.py new file mode 100644 index 0000000..ad0b2b3 --- /dev/null +++ b/spells_bot/run.py @@ -0,0 +1,408 @@ +import asyncio +import logging + +from aiogram import Bot, Dispatcher, Router, types, F +from aiogram.filters import Command +from aiogram.methods import EditMessageReplyMarkup, SendMediaGroup +from aiogram.types import ( + Message, + CallbackQuery, + InputMediaPhoto, + FSInputFile, +) +from sqlalchemy.exc import IntegrityError + +from spells_bot.utils import text_parser +from spells_bot.bot.callback_schema import ( + BaseClassesCallback, + SpellbookReadCallback, + SpellbookCreateCallback, + SpellbookPromptDeleteCallback, + SpellbookConfirmDeleteCallback, + EmptyCallback, + ChatSettingsAddFilterCallback, + ChatSettingsRemoveFilterCallback, + MenuClassCallback, + ClassesTableCallback, + ClassesSpellsCallback, + SpellTablesCallback, +) +from spells_bot.config import settings +from spells_bot.bot.messages import views, texts +from spells_bot.bot.messages import keyboards +from spells_bot.database.models import ( + get_db, + get_or_create_user, + get_saved_spells, + create_saved_spell, + delete_saved_spell, + get_saved_spell_by_index, + get_chat_settings, + chat_settings_add_rulebook, + chat_settings_remove_rulebook, +) +from spells_bot.image_generator import HtmlToImage +from spells_bot.pathfinder_api import api + +router = Router() +logging.basicConfig( + format="%(asctime)s %(levelname)s:%(name)s:%(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +api_client = api.HttpClient() + + +@router.startup() +async def on_startup(): + """""" + logging.info("API session started") + api_client.start() + + +@router.shutdown() +async def on_shutdown(): + """""" + logging.info("API session stopped") + await api_client.stop() + + +@router.message(Command(commands=["start"])) +async def command_start_handler(message: Message) -> None: + """ + This handler receive messages with `/start` command + """ + # Most event objects have aliases for API methods that can be called in events' context + # For example if you want to answer to incoming message you can use `message.answer(...)` alias + # and the target chat will be passed to :ref:`aiogram.methods.send_message.SendMessage` + # method automatically or call API method directly via + # Bot instance: `bot.send_message(chat_id=message.chat.id, ...)` + # await message.answer(f"Hello, {message.from_user.full_name}!") + # Create a keyboard with a single button + with get_db() as db: + user = get_or_create_user(db, message.from_user.id) + text, keyboard = views.start_main() + + await message.answer(text=text, reply_markup=keyboard) + + +@router.message(Command(commands=["help"])) +async def command_menu_handler(message: Message, bot: Bot) -> None: + """ + This handler receive messages with `/help` command + """ + bot_user = await bot.me() + text, markup = views.help_main(bot_user.username) + await message.answer(text=text, reply_markup=markup) + + +@router.message(Command(commands=["menu"])) +async def command_menu_handler(message: Message) -> None: + """ + This handler receive messages with `/menu` command + """ + classes = await api.get_classes(api_client.session, extended=True, magical_only=True) + text, markup = views.menu_main(classes) + await message.answer(text=text, reply_markup=markup) + + +@router.message(Command(commands=["spellbook"])) +async def command_spellbook_handler(message: Message) -> None: + """ + This handler receive messages with `/spellbook` command + """ + with get_db() as db: + saved_spells = get_saved_spells(db, message.from_user.id) + + try: + spell_id = saved_spells[0].spell_id + except IndexError: + text, keyboard = views.spellbook_empty() + await message.answer(text=text, reply_markup=keyboard) + return + + spell_data = await api.get_spell(api_client.session, spell_id, extended=True) + + text, keyboard = views.spellbook_main( + index=0, + index_max=len(saved_spells), + spell=spell_data, + ) + + await message.answer(text=text, reply_markup=keyboard, disable_web_page_preview=True) + + +@router.message(Command(commands=["settings"])) +async def command_settings_handler(message: Message, bot: Bot) -> None: + with get_db() as db: + chat_settings = get_chat_settings(db, message.from_user.id) + user_rulebooks = chat_settings.book_filter + + all_rulebooks = await api.get_rulebooks(api_client.session, with_spells=True) + + bot_user = await bot.me() + text, keyboard = views.settings_main(all_rulebooks, user_rulebooks, bot_user.username) + + await message.answer(text=text, reply_markup=keyboard) + + +@router.message(~(F.via_bot | F.from_user.is_bot)) +async def any_text_message_handler(message: Message) -> None: + text, keyboard = views.any_text_message(message.text) + await message.answer(text=text, reply_markup=keyboard) + + +@router.inline_query() +async def inline_search(inline_query: types.InlineQuery): + with get_db() as db: + chat_settings = get_chat_settings(db, inline_query.from_user.id) + user_rulebooks = chat_settings.book_filter + + try: + ru_name, en_name = text_parser.clean_spell_search_query(inline_query.query) + except ValueError as e: + logging.info(f"Ignored query {inline_query.query} ({e})") + spells = [] + else: + spells = await api.get_spells( + api_client.session, ru_name=ru_name, en_name=en_name, extended=True, rulebook_ids=user_rulebooks + ) + logging.info( + f'User @{inline_query.from_user.username} [{inline_query.from_user.id}] searched "{inline_query.query}"' + ) + + all_rulebooks = await api.get_rulebooks(api_client.session, with_spells=True) + + await inline_query.answer( + results=views.inline_results( + inline_query.query, spells, all_rulebooks, user_rulebooks, inline_query.from_user.id + ) + ) + + +@router.callback_query(SpellTablesCallback.filter()) +async def spell_tables_callback(query: CallbackQuery, callback_data: SpellTablesCallback, bot: Bot): + await query.answer(texts.toast_drawing_tables()) + + spell = await api.get_spell(api_client.session, callback_data.spell_id, extended=True) + text, tables = texts.extended_description_message(spell) + + hti = HtmlToImage(settings.storage.data_root_dir, settings.hti.css_file) + spell_table_images = hti.spell_tables(tables, spell_alias=spell.alias) + + media_group = [] + for image_idx, image_path in enumerate(spell_table_images): + photo = InputMediaPhoto( + media=FSInputFile(image_path), caption=f"{spell.name.upper()} таблица {image_idx + 1}" + ) + media_group.append(photo) + + await bot(SendMediaGroup(chat_id=query.from_user.id, media=media_group)) + + +@router.callback_query(ChatSettingsAddFilterCallback.filter()) +async def chat_settings_add_rulebook_callback( + query: CallbackQuery, callback_data: ChatSettingsAddFilterCallback, bot: Bot +): + with get_db() as db: + chat_settings = chat_settings_add_rulebook(db, query.from_user.id, callback_data.rulebook_id) + user_rulebooks = chat_settings.book_filter + + all_rulebooks = await api.get_rulebooks(api_client.session, with_spells=True) + keyboard = keyboards.chat_settings(all_rulebooks, user_rulebooks) + + await query.answer() + if query.message: + await query.message.edit_reply_markup(reply_markup=keyboard) + else: + await bot(EditMessageReplyMarkup(inline_message_id=query.inline_message_id, reply_markup=keyboard)) + + +@router.callback_query(ChatSettingsRemoveFilterCallback.filter()) +async def chat_settings_remove_rulebook_callback( + query: CallbackQuery, callback_data: ChatSettingsRemoveFilterCallback, bot: Bot +): + with get_db() as db: + chat_settings = chat_settings_remove_rulebook(db, query.from_user.id, callback_data.rulebook_id) + user_rulebooks = chat_settings.book_filter + + all_rulebooks = await api.get_rulebooks(api_client.session, with_spells=True) + keyboard = keyboards.chat_settings(all_rulebooks, user_rulebooks) + + await query.answer() + if query.message: + await query.message.edit_reply_markup(reply_markup=keyboard) + else: + await bot(EditMessageReplyMarkup(inline_message_id=query.inline_message_id, reply_markup=keyboard)) + + +@router.callback_query(ClassesSpellsCallback.filter()) +async def classes_with_id_spell_level_callback(query: CallbackQuery, callback_data: ClassesSpellsCallback): + await query.answer() + + with get_db() as db: + chat_settings = get_chat_settings(db, query.from_user.id) + user_rulebooks = chat_settings.book_filter + + class_ = await api.get_class(api_client.session, callback_data.id, extended=True, magical_only=True) + spells = await api.get_spells( + api_client.session, class_id=class_.id, level=callback_data.spell_level, rulebook_ids=user_rulebooks + ) + + text, markup = views.menu_class_spell_level( + class_, spells, active_spell_level=callback_data.spell_level, page=callback_data.page + ) + await query.message.edit_text(text=text, reply_markup=markup, disable_web_page_preview=True) + + +@router.callback_query(ClassesTableCallback.filter()) +async def classes_with_id_tables_callback(query: CallbackQuery, callback_data: ClassesTableCallback): + await query.answer(texts.toast_drawing_tables()) + + class_ = await api.get_class(api_client.session, callback_data.id, extended=True, magical_only=True) + text, markup = views.menu_class(class_, show_tables_button=False) + + hti = HtmlToImage(settings.storage.data_root_dir, settings.hti.css_file) + + class_tables = [] + if class_.tableFeatures: + class_tables.append(texts.class_table_feature(class_)) + + if class_.tableSpellCount: + class_tables.append(texts.class_table_spell_count(class_)) + + class_table_images = hti.class_tables(class_tables, class_.alias) + + await query.message.delete() + await query.message.answer(f"📜 {class_.name.upper()} 📜") + + media_group = [] + for image_idx, image_path in enumerate(class_table_images): + photo = InputMediaPhoto( + media=FSInputFile(image_path), caption=f"{class_.name.upper()} таблица {image_idx + 1}" + ) + media_group.append(photo) + + await query.message.answer_media_group(media=media_group) + await query.message.answer(text=text, reply_markup=markup, disable_web_page_preview=True) + + +@router.callback_query(BaseClassesCallback.filter()) +async def classes_with_id_callback(query: CallbackQuery, callback_data: BaseClassesCallback): + await query.answer() + + class_ = await api.get_class(api_client.session, callback_data.id, extended=True, magical_only=True) + text, markup = views.menu_class(class_) + + await query.message.edit_text(text=text, reply_markup=markup, disable_web_page_preview=True) + + +@router.callback_query(MenuClassCallback.filter()) +async def classes_callback(query: CallbackQuery, callback_data: MenuClassCallback): + await query.answer() + + classes = await api.get_classes(api_client.session, extended=False, magical_only=True) + text, markup = views.menu_main(classes) + await query.message.edit_text(text=text, reply_markup=markup, disable_web_page_preview=True) + + +@router.callback_query(SpellbookReadCallback.filter()) +async def spellbook_read_callback(query: CallbackQuery, callback_data: SpellbookReadCallback): + await query.answer() + + with get_db() as db: + try: + spell, index_max = get_saved_spell_by_index(db, query.from_user.id, callback_data.index) + except IndexError: + await query.message.edit_text( + text=f"No spell for index {callback_data.index}", disable_web_page_preview=True + ) + return + + spell_data = await api.get_spell(api_client.session, spell.spell_id, extended=True) + + text, keyboard = views.spellbook_main( + index=callback_data.index, + index_max=index_max, + spell=spell_data, + extended=callback_data.extended, + ) + + await query.message.edit_text(text=text, reply_markup=keyboard, disable_web_page_preview=True) + + +@router.callback_query(SpellbookCreateCallback.filter()) +async def spellbook_create_callback(query: CallbackQuery, callback_data: SpellbookReadCallback, bot: Bot): + with get_db() as db: + try: + create_saved_spell(db, query.from_user.id, callback_data.spell_id) + spell_data = await api.get_spell(api_client.session, callback_data.spell_id, extended=True) + + await query.answer("Добавлено в книгу заклинаний") + await bot( + EditMessageReplyMarkup( + inline_message_id=query.inline_message_id, + reply_markup=keyboards.extended_description_message_saved_spell(spell_data), + ) + ) + except IntegrityError: + await query.answer("Заклинание уже есть в книге") + + +@router.callback_query(SpellbookPromptDeleteCallback.filter()) +async def spellbook_prompt_delete_callback(query: CallbackQuery, callback_data: SpellbookPromptDeleteCallback): + await query.answer("Стереть заклинание из книги?") + with get_db() as db: + spell, index_max = get_saved_spell_by_index(db, query.from_user.id, callback_data.index) + spell_data = await api.get_spell(api_client.session, spell.spell_id, extended=callback_data.extended) + + keyboard = keyboards.spellbook_main_prompt_delete_spell( + spell_data, callback_data.index, index_max, callback_data.extended + ) + await query.message.edit_reply_markup(reply_markup=keyboard) + + +@router.callback_query(SpellbookConfirmDeleteCallback.filter()) +async def spellbook_delete_callback(query: CallbackQuery, callback_data: SpellbookConfirmDeleteCallback): + with get_db() as db: + delete_saved_spell(db, query.from_user.id, callback_data.spell_id) + updated_index = callback_data.index - 1 + try: + spell, index_max = get_saved_spell_by_index(db, query.from_user.id, updated_index) + + spell_data = await api.get_spell(api_client.session, spell.spell_id, extended=True) + + text, keyboard = views.spellbook_main( + index=updated_index, + index_max=index_max, + spell=spell_data, + extended=callback_data.extended, + ) + except IndexError: + text, keyboard = views.spellbook_empty() + + await query.answer("Вы стерли заклинание из книги") + await query.message.edit_text(text=text, reply_markup=keyboard, disable_web_page_preview=True) + + +@router.callback_query(EmptyCallback.filter()) +async def empty_callback(query: CallbackQuery, callback_data: EmptyCallback): + await query.answer() + + +async def main() -> None: + # Dispatcher is a root router + dp = Dispatcher() + # ... and all other routers should be attached to Dispatcher + dp.include_router(router) + + # Initialize Bot instance with a default parse mode which will be passed to all API calls + bot = Bot(settings.telegram.bot_token, parse_mode="HTML") + # And the run events dispatching + await dp.start_polling(bot, on_startup=on_startup) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + asyncio.run(main()) diff --git a/spells_bot/search/__init__.py b/spells_bot/search/__init__.py deleted file mode 100644 index b32a4dc..0000000 --- a/spells_bot/search/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from spells_bot.search.spell_search import SpellSearch diff --git a/spells_bot/search/sourcing/__init__.py b/spells_bot/search/sourcing/__init__.py deleted file mode 100644 index cd744fe..0000000 --- a/spells_bot/search/sourcing/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from spells_bot.search.sourcing.database import Database -from spells_bot.search.sourcing.html2image import HctiApi -from spells_bot.search.sourcing.spellsource import SourceUpdater diff --git a/spells_bot/search/sourcing/database.py b/spells_bot/search/sourcing/database.py deleted file mode 100644 index 44b925d..0000000 --- a/spells_bot/search/sourcing/database.py +++ /dev/null @@ -1,609 +0,0 @@ -from typing import List, Generator, Optional, Dict, Iterable, Union, Tuple - -from sqlalchemy import ( - create_engine, - Column, - Integer, - String, - Boolean, - ForeignKey, - update, - and_, - asc, - UnicodeText, -) -from sqlalchemy.dialects.sqlite import JSON -from sqlalchemy.exc import IntegrityError -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker, relationship, Session - -from spells_bot.config import DatabaseSettings -from spells_bot.search.sourcing.datatypes import ( - ShortSpellInfo, - ExtendedSpellInfo, - SpellTable, - ChatSettings, - ClassInfo, - SchoolInfo, - ClassInfoSpellRestriction, - BasicShortSpellInfo, -) -from spells_bot.utils.log import create_logger - -logger = create_logger("database") - -Base = declarative_base() - - -class ShortSpellInfoRecord(Base): - __tablename__ = "short_spell_info" - - id = Column(Integer, primary_key=True, index=True) - alias = Column(String, unique=True, index=True) - short_description_components = Column(String) - book_abbreviation = Column(String) - book_alias = Column(String) - short_description = Column(String) - schools = Column(JSON) - classes = Column(JSON) - name = Column(UnicodeText) - is_race_spell = Column(Boolean) - - extended_spell_info = relationship( - "ExtendedSpellInfoRecord", back_populates="short_spell_info", uselist=False - ) - - -class ExtendedSpellInfoRecord(Base): - __tablename__ = "extended_spell_info" - - id = Column(Integer, primary_key=True, index=True) - full_name = Column(String) - school = Column(String) - variables = Column(JSON) - text = Column(String) - short_spell_info_id = Column( - Integer, ForeignKey("short_spell_info.id"), unique=True - ) - - short_spell_info = relationship( - "ShortSpellInfoRecord", back_populates="extended_spell_info" - ) - tables = relationship("SpellTableRecord", back_populates="extended_spell_info") - - -class SpellTableRecord(Base): - __tablename__ = "spell_table" - - id = Column(Integer, primary_key=True, index=True) - html = Column(String) - url = Column(String, default="") - path = Column(String, default="") - extended_spell_info_id = Column(Integer, ForeignKey("extended_spell_info.id")) - - extended_spell_info = relationship( - "ExtendedSpellInfoRecord", back_populates="tables" - ) - - -class ClassRecord(Base): - __tablename__ = "class" - - id = Column(Integer, primary_key=True, index=True) - alias = Column(String, unique=True, index=True) - book_abbreviation = Column(String, default="") - book_alias = Column(String, default="") - short_description = Column(String, default="") - name = Column(UnicodeText, unique=True, index=True) - is_own_spell_list = Column(Boolean, nullable=True) - max_spell_lvl = Column(Integer, nullable=True) - parent_class_ids = Column(JSON) - - -class SchoolRecord(Base): - __tablename__ = "school" - - id = Column(Integer, primary_key=True, index=True) - name = Column(UnicodeText, index=True) - type_id = Column(Integer) - type_name = Column(UnicodeText) - - -class ChatSettingsRecord(Base): - __tablename__ = "chat_settings" - - id = Column(Integer, primary_key=True, index=True) - chat_id = Column(Integer, unique=True) - book_filter = Column(JSON) - - -def init_db(sqlalchemy_database_url: str, drop: bool = False) -> sessionmaker: - """Create sqlalchemy sessionmaker - - :param sqlalchemy_database_url: currently tested only against sqlite - :param drop: drop existing and recreate database if True - :return: - """ - engine = create_engine(sqlalchemy_database_url) - session_callable = sessionmaker(bind=engine) - - if drop: - Base.metadata.drop_all(bind=engine) - - Base.metadata.create_all(bind=engine) - - return session_callable - - -class Database: - """Implements CRUD operations. - - Private methods are basic reusable functions which must accept ``db: Session`` as their first argument - - Public methods must always instantiate sessions as ``with self._db() as db: ...`` - - """ - def __init__(self, settings: DatabaseSettings, drop_on_startup: bool = False): - self._settings = settings - self._db = init_db(settings.sqlalchemy_url, drop_on_startup) - - @staticmethod - def _create_or_update_registry_item( - db: Session, - db_item_type, - item: Union[ShortSpellInfo, ClassInfo, SchoolInfo], - where_db: str, - where_item: str, - ): - """Wrapper over try: add, except not unique: update existing - - :param db: sqlalchemy session - :param db_item_type: database class for this item - :param item: an instance of pydantic model - :param where_db: name of database item attribute for equality comparison - :param where_item: name of pydantic item attribute for equality comparison - :return: - """ - try: - db.add(db_item_type(**item.to_orm())) - db.commit() - except IntegrityError: - db.rollback() - stmt_update = ( - update(db_item_type) - .where(getattr(db_item_type, where_db) == getattr(item, where_item)) - .values(item.to_orm()) - ) - db.execute(stmt_update) - db.commit() - - @staticmethod - def _iter_classes( - db: Session, book_filter: list = None - ) -> Generator[ClassInfo, None, None]: - """Yield classes which appear in the ``book_filter`` list - - :param db: sqlalchemy session - :param book_filter: list of aliases in camelCase - :return: - """ - if book_filter: - classes = ( - db.query(ClassRecord) - .where(ClassRecord.book_alias.in_(book_filter)) - .all() - ) - else: - classes = db.query(ClassRecord).all() - - for c in classes: - yield ClassInfo.from_orm(c) - - @staticmethod - def _create_or_update_classes(db: Session, classes: List[ClassInfo]): - """Create or update classes in ``class`` table - - :param db: sqlalchemy session - :param classes: list of ClassInfo items - :return: - """ - for c in classes: - Database._create_or_update_registry_item(db, ClassRecord, c, "id", "id") - - @staticmethod - def _iter_schools(db: Session) -> Generator[SchoolInfo, None, None]: - """Yield all SchoolInfo records - - :param db: sqlalchemy session - :return: - """ - for s in db.query(SchoolRecord).all(): - yield SchoolInfo.from_orm(s) - - @staticmethod - def _create_or_update_schools(db: Session, schools: List[SchoolInfo]): - """Create or update schools in ``school`` table - - :param db: sqlalchemy session - :param schools: list of SchoolInfo items - :return: - """ - for s in schools: - Database._create_or_update_registry_item(db, SchoolRecord, s, "id", "id") - - @staticmethod - def _create_or_update_spells(db: Session, spells: List[ShortSpellInfo]): - """Create or update spells in ``short_spell_info`` table - - :param db: sqlalchemy session - :param spells: list of ShortSpellInfo items - :return: - """ - for s in spells: - Database._create_or_update_registry_item( - db, ShortSpellInfoRecord, s, "alias", "alias" - ) - - @staticmethod - def _convert_short_spell_info_rows(db: Session, rows: List[ShortSpellInfoRecord]): - """Serialize list of ShortSpellInfoRecord items to list of ShortSpellInfo items. - - Since class and school info are stored in a json column as ids, - we need to get corresponding class and school info from database - before we can instantiate a ``ShortSpellInfo`` model - - :param db: sqlalchemy session - :param rows: list of ShortSpellInfoRecord items - :return: - """ - id2class = {c.id: c for c in Database._iter_classes(db)} - id2school = {s.id: s for s in Database._iter_schools(db)} - - for result in rows: - schools = [id2school[s] for s in result.schools] - classes = [] - for c, lvl in result.classes.items(): - class_info = id2class[int(c)] - class_info_restriction = ClassInfoSpellRestriction( - **class_info.dict(), level=lvl - ) - classes.append(class_info_restriction) - - basic_spell_info = BasicShortSpellInfo.from_orm(result) - yield ShortSpellInfo( - **basic_spell_info.dict(), classes=classes, schools=schools - ) - - @staticmethod - def _iter_rulebooks(db: Session) -> Generator[str, None, None]: - """Yield rulebook aliases in camelCase - - :param db: sqlalchemy session - :return: - """ - for book_alias in db.query(ShortSpellInfoRecord.book_alias).distinct(): - yield book_alias[0] - - @staticmethod - def _default_book_filter(book_alias_list: Iterable[str]): - """Custom default book filter which includes only coreRulebook and advancedPlayerGuide - - :param book_alias_list: list of all available book aliases - :return: - """ - book_filter = {book: False for book in book_alias_list} - book_filter["coreRulebook"] = True - book_filter["advancedPlayerGuide"] = True - return book_filter - - @staticmethod - def _get_chat_settings(db: Session, chat_id: int) -> ChatSettingsRecord: - """Return chat settings for the given chat_id - - :param db: sqlalchemy session - :param chat_id: chat id - :return: - """ - return ( - db.query(ChatSettingsRecord) - .filter(ChatSettingsRecord.chat_id == chat_id) - .first() - ) - - @staticmethod - def _create_chat_settings( - db: Session, chat_id: int, book_filter: Dict[str, bool] = None - ) -> ChatSettingsRecord: - """Create chat settings record with the given book filter or a default one - - :param db: sqlalchemy session - :param chat_id: chat id - :param book_filter: dict of aliases and their values, e.g. {"aliasNameOne": True, "aliasNameTwo": False}. - If not provided, will create a default alias. - :return: - """ - - book_filter = book_filter or Database._default_book_filter(Database._iter_rulebooks(db)) - chat_settings = ChatSettingsRecord(chat_id=chat_id, book_filter=book_filter) - - db.add(chat_settings) - db.commit() - db.refresh(chat_settings) - - return chat_settings - - @staticmethod - def _get_or_create_chat_settings( - db: Session, chat_id: int, book_filter: Dict[str, bool] = None - ) -> ChatSettingsRecord: - """Wrapper over get or create chat settings - - :param db: sqlalchemy session - :param chat_id: chat id - :param book_filter: used only if _get_chat_settings fails. - Dict of aliases and their values, e.g. {"aliasNameOne": True, "aliasNameTwo": False}. - If not provided, will create a default alias. - :return: - """ - chat_settings = Database._get_chat_settings(db, chat_id) - - if not chat_settings: - chat_settings = Database._create_chat_settings(db, chat_id, book_filter) - - return chat_settings - - @staticmethod - def _get_book_filter(db: Session, chat_id: int = None) -> List[str]: - """Return list of enabled book aliases in camelCase. - - :param db: sqlalchemy session - :param chat_id: chat id. if not provided, will return all available aliases - :return: - """ - if chat_id: - chat_settings = Database._get_or_create_chat_settings(db, chat_id) - include_books = [k for k, v in chat_settings.book_filter.items() if v] - else: - include_books = list(Database._iter_rulebooks(db)) - - return include_books - - def has_spell_list(self, min_n: int = 1000): - """Check if there are at least ``min_n`` spells. - Use it to check if you need to update registry - - :param min_n: min number of spells to return True - :return: - """ - with self._db() as db: - short_spell_info_count = db.query(ShortSpellInfoRecord).count() - - return short_spell_info_count > min_n - - def create_or_update_registry( - self, - spells: List[ShortSpellInfo], - classes: List[ClassInfo], - schools: List[SchoolInfo], - ): - """Create or update short spell info, class and school tables - with provided values - - :param spells: list of ShortSpellInfo objects - :param classes: list of ClassInfo objects - :param schools: list of SchoolInfo objects - :return: - """ - with self._db() as db: - self._create_or_update_classes(db, classes) - self._create_or_update_schools(db, schools) - self._create_or_update_spells(db, spells) - - def iter_short_spell_info_by_name( - self, name: str, chat_id: int = None - ) -> Generator[ShortSpellInfo, None, None]: - """Yield short spell info filtering by name and chat's book filter if chat_id is provided - - :param name: spell name in cyrillic - :param chat_id: if provided and not found in database, will create a default filter - :return: - """ - - with self._db() as db: - include_books = self._get_book_filter(db, chat_id) - - rows = ( - db.query(ShortSpellInfoRecord) - .where(ShortSpellInfoRecord.book_alias.in_(include_books)) - .order_by(asc(ShortSpellInfoRecord.name)) - .all() - ) - - for result in self._convert_short_spell_info_rows(db, rows): - if name in result.name.lower(): - yield result - - def iter_short_spell_info_by_class_level( - self, class_id: int, spell_level: int, chat_id: int = None - ) -> Generator[ShortSpellInfo, None, None]: - """Yield short spell info filtering by class, spell level restriction, - and chat's book filter if chat_id is provided - - :param class_id: class id - :param spell_level: spell circle level - :param chat_id: if provided and not found in database, will create a default filter - :return: - """ - - with self._db() as db: - include_books = self._get_book_filter(db, chat_id) - - rows = ( - db.query(ShortSpellInfoRecord) - .where( - and_( - ShortSpellInfoRecord.classes[str(class_id)].as_integer() - == spell_level, - ShortSpellInfoRecord.book_alias.in_(include_books), - ) - ) - .order_by(asc(ShortSpellInfoRecord.name)) - .all() - ) - - yield from self._convert_short_spell_info_rows(db, rows) - - def get_class(self, class_id: int) -> ClassInfo: - """Get class info by id - - :param class_id: class id - :return: - """ - with self._db() as db: - c = db.query(ClassRecord).where(ClassRecord.id == class_id).first() - return ClassInfo.from_orm(c) - - def iter_classes(self, chat_id: int): - """Yield classes filtering by chat settings - - :param chat_id: chat id - :return: - """ - with self._db() as db: - include_books = self._get_book_filter(db, chat_id) - yield from self._iter_classes(db, include_books) - - def iter_levels(self, class_id: int) -> Generator[int, None, None]: - """Yield all level numbers which appear in spells for the given class id - - :param class_id: class id - :return: - """ - class_id_str = str(class_id) - with self._db() as db: - rows = ( - db.query(ShortSpellInfoRecord) - .where(ShortSpellInfoRecord.classes[class_id_str].as_integer() >= 0) - .all() - ) - - unique_levels = set() - for r in rows: - unique_levels.add(r.classes[class_id_str]) - - yield from unique_levels - - def iter_rulebooks(self) -> Generator[str, None, None]: - """Yield rulebook aliases in camelCase - - :return: - """ - with self._db() as db: - yield from self._iter_rulebooks(db) - - def get_full_spell_info( - self, spell_alias: str - ) -> Tuple[ShortSpellInfo, Optional[ExtendedSpellInfo]]: - """Get all spell info by english alias if it exists - - :param spell_alias: spell name in ascii - :return: - """ - extended_spell_info = None - - with self._db() as db: - short_spell_info = ( - db.query(ShortSpellInfoRecord) - .filter(ShortSpellInfoRecord.alias == spell_alias) - .first() - ) - - if short_spell_info: - extended_spell_info = short_spell_info.extended_spell_info - short_spell_info = list(self._convert_short_spell_info_rows(db, [short_spell_info]))[0] - - if extended_spell_info: - extended_spell_info = ExtendedSpellInfo.from_orm( - extended_spell_info - ) - - return short_spell_info, extended_spell_info - - def get_extended_spell_info(self, spell_alias: str) -> Optional[ExtendedSpellInfo]: - """Get extended spell info by english alias if it exists - - :param spell_alias: spell name in ascii - :return: - """ - _, extended_spell_info = self.get_full_spell_info(spell_alias) - return extended_spell_info - - def create_extended_spell_info( - self, - spell_alias: str, - extended_spell_info: ExtendedSpellInfo, - tables: List[SpellTable], - ) -> ExtendedSpellInfo: - """Create extended spell info record attaching it to its parent short spell info record - - :param spell_alias: spell name in ascii - :param extended_spell_info: extended spell info - :param tables: discovered or created spell tables - :return: - """ - tables = [SpellTableRecord(**t.to_orm()) for t in tables] - - with self._db() as db: - short_spell_info = ( - db.query(ShortSpellInfoRecord) - .filter(ShortSpellInfoRecord.alias == spell_alias) - .first() - ) - extended_spell_info = ExtendedSpellInfoRecord( - **extended_spell_info.to_orm(), - tables=tables, - short_spell_info=short_spell_info, - ) - db.add(extended_spell_info) - db.commit() - db.refresh(extended_spell_info) - extended_spell_info = ExtendedSpellInfo.from_orm(extended_spell_info) - - return extended_spell_info - - def get_or_create_chat_settings(self, chat_id: int) -> ChatSettings: - """Return existing chat settings or new ones with the default filter - - :param chat_id: chat id - :return: existing or new default ChatSettings - """ - with self._db() as db: - chat_settings = self._get_or_create_chat_settings(db, chat_id) - chat_settings = ChatSettings.from_orm(chat_settings) - - return chat_settings - - def update_book_filter(self, chat_id: int, book_alias: str) -> ChatSettings: - """Toggle book filter value for the given alias and return updated chat settings - - :param chat_id: chat id - :param book_alias: alias name in camelCase whose value will be toggled - :return: updated ChatSettings - """ - with self._db() as db: - chat_settings = self._get_or_create_chat_settings(db, chat_id) - - new_book_filter = chat_settings.book_filter - new_book_filter[book_alias] = not new_book_filter[book_alias] - - stmt_update = ( - update(ChatSettingsRecord) - .where(ChatSettingsRecord.chat_id == chat_id) - .values(book_filter=new_book_filter) - ) - db.execute(stmt_update) - db.commit() - db.refresh(chat_settings) - chat_settings = ChatSettings.from_orm(chat_settings) - - return chat_settings diff --git a/spells_bot/search/sourcing/datatypes.py b/spells_bot/search/sourcing/datatypes.py deleted file mode 100644 index 2343235..0000000 --- a/spells_bot/search/sourcing/datatypes.py +++ /dev/null @@ -1,109 +0,0 @@ -from pathlib import Path -from typing import List, Optional, Dict - -from pydantic import BaseModel - - -class OrmSerializableBaseModel(BaseModel): - """Base pydantic model with orm_mode enabled. - - - ``.from_orm(...)`` is implemented by default when orm_mode = True - - ``.to_orm()`` is implemented for convenience - and can be overridden in child models, e.g. to serialize ``Path``s - - """ - - class Config: - orm_mode = True - - def to_orm(self): - return self.dict() - - -class SpellTable(OrmSerializableBaseModel): - html: str - url: Optional[str] - path: Optional[Path] - - def to_orm(self): - return {"html": self.html, "url": self.url, "path": str(self.path)} - - -class ClassInfo(OrmSerializableBaseModel): - id: int - alias: str = "" - book_abbreviation: str = "" - book_alias: str = "" - short_description: str = "" - name: str - is_own_spell_list: Optional[bool] - max_spell_lvl: Optional[int] - parent_class_ids: list - - -class ClassInfoSpellRestriction(ClassInfo): - level: int - - -class SchoolInfo(OrmSerializableBaseModel): - id: int - name: str - type_id: int - type_name: str - - -class BasicShortSpellInfo(OrmSerializableBaseModel): - """Part of ShortSpellInfo which can be serialized from orm and to orm - - """ - alias: str - short_description_components: str - book_abbreviation: str - book_alias: str - short_description: str - name: str - is_race_spell: bool - - -class ShortSpellInfo(BasicShortSpellInfo): - """Part of ShortSpellInfo which cannot be serialized from orm but can be serialized to orm - - """ - schools: List[SchoolInfo] - classes: List[ClassInfoSpellRestriction] - - @classmethod - def from_orm(cls, *args, **kwargs): - raise NotImplementedError(f"{cls.__name__} should be instantiated with __init__ rather than from_orm") - - def to_orm(self): - d = self.dict(exclude={"schools", "classes"}) - d["schools"] = [s.id for s in self.schools] - d["classes"] = {c.id: c.level for c in self.classes} - return d - - -class ExtendedSpellInfo(OrmSerializableBaseModel): - full_name: str - school: str - variables: dict - text: str - tables: List[SpellTable] - - def to_orm(self): - return { - "full_name": self.full_name, - "school": self.school, - "variables": self.variables, - "text": self.text, - } - - -class ChatSettings(OrmSerializableBaseModel): - chat_id: int - book_filter: Dict[str, bool] - - def to_orm(self): - return self.dict() diff --git a/spells_bot/search/sourcing/html2image.py b/spells_bot/search/sourcing/html2image.py deleted file mode 100644 index 3f7bf2b..0000000 --- a/spells_bot/search/sourcing/html2image.py +++ /dev/null @@ -1,86 +0,0 @@ -from pathlib import Path -from typing import Union - -import requests - -from spells_bot.config import HctiSettings -from spells_bot.utils.log import create_logger -from spells_bot.search.sourcing.datatypes import SpellTable - - -logger = create_logger("hcti") - - -class HctiApi: - def __init__(self, settings: HctiSettings): - self._settings = settings - self.css = self._load_css(settings.css_file) - - @staticmethod - def _load_css(path): - with open(path, "r", encoding="utf-8") as css_f: - css = css_f.read() - return css - - def create_image(self, html: str): - """Create image via hcti api from html and pre-configured css - - :param html: raw html - :return: - """ - data = { - "html": html, - "css": self.css, - "device_scale": 1, - } - response = requests.post( - url=self._settings.url, - data=data, - auth=(self._settings.user_id, self._settings.api_key), - ) - - image_url = None - try: - image_url = response.json()["url"] - except KeyError: - logger.error(str(response.json())) - - return image_url - - @staticmethod - def download_image( - url: str, download_path: Union[Path, str], overwrite: bool = False - ) -> None: - """Download image - - :param url: image url - :param download_path: path to file - :param overwrite: overwrite existing file if True - :return: - """ - download_path = Path(download_path) - if overwrite or not download_path.exists(): - try: - download_path.parent.mkdir(exist_ok=True, parents=True) - response = requests.get(url) - with open(download_path, "wb") as f: - f.write(response.content) - logger.info(f"Downloaded {url} to {download_path}") - except Exception as e: - logger.error(f"Failed to save {url} to {download_path} because {e}") - - def find_or_create(self, html: str, path: Union[Path, str, None]): - """Discover image locally or create and download - - :param html: raw html - :param path: possible image path - :return: - """ - url = None - path = Path(path) - - if not path.is_file(): - url = self.create_image(html) - self.download_image(url, path) - - return SpellTable(html=html, url=url, path=path) diff --git a/spells_bot/search/sourcing/spellsource.py b/spells_bot/search/sourcing/spellsource.py deleted file mode 100644 index 8be3946..0000000 --- a/spells_bot/search/sourcing/spellsource.py +++ /dev/null @@ -1,310 +0,0 @@ -import json -import re -from typing import List, Tuple - -from bs4 import BeautifulSoup -from requests_html import HTMLSession - -from spells_bot.config import DataSourceSettings -from spells_bot.search.sourcing.datatypes import ( - ExtendedSpellInfo, - ClassInfo, - SchoolInfo, - ShortSpellInfo, - ClassInfoSpellRestriction, - SpellTable, -) -from spells_bot.utils.log import create_logger - - -logger = create_logger("source_updater") - -CLASSES_KEY_MAP = { - "Id": "id", - "Name": "name", - "IsOwnSpellList": "is_own_spell_list", - "MaxSpellLvl": "max_spell_lvl", - "ParentClassIds": "parent_class_ids", -} - -SCHOOLS_KEY_MAP = { - "Id": "id", - "Name": "name", - "TypeId": "type_id", - "TypeName": "type_name", -} - -SPELLS_KEY_MAP = { - "Alias": "alias", - "ShortDescriptionComponents": "short_description_components", - "BookAbbreviation": "book_abbreviation", - "BookAlias": "book_alias", - "ShortDescription": "short_description", - "SchoolIds": "schools", - "ClassSpell": "classes", - "Name": "name", - "IsRaceSpell": "is_race_spell", -} - - -def _rename_keys(original_dict, key_map): - return {key_map[k]: v for k, v in original_dict.items()} - - -class SourceUpdater: - def __init__(self, settings: DataSourceSettings, save_dir: str = None): - self._settings = settings - self.spell_list_url = settings.spell_list_url.rstrip("/") - self.class_list_url = settings.class_list_url.rstrip("/") - self.prestige_class_list_url = settings.prestige_class_list_url.rstrip("/") - self.spell_info_url_prefix = settings.spell_info_url_prefix.rstrip("/") - self.save_dir = save_dir - - @staticmethod - def _extract_data_from_js( - raw_data: str, - data_type: str, - variable_prefix: str = None, - variable_postfix: str = None, - ) -> List[dict]: - """Strip syntax and extra characters and load data as json - - :param raw_data: string with js var containing data inside its value - :param data_type: "spells", "classes" or "schools" - :param variable_prefix: (optional) string before data value, defaults to "var {data_type} = \\'" - :param variable_postfix: (optional) string after data value, defaults to "\\'; " - :return: list of dicts with data - """ - variable_prefix = variable_prefix or f"var {data_type} = \\'" - variable_postfix = variable_postfix or "\\'; " - start, end = len(variable_prefix) - 1, -(len(variable_postfix) - 1) - - raw_json = raw_data.strip()[start:end] - json_data = json.loads(raw_json) - - return json_data - - @staticmethod - def _iter_class_extra_info(raw_data): - """Yield class name and extracted class info from one of the class lists - - :param raw_data: - :return: - """ - for p in raw_data.find("p.indent"): - try: - header = p.find("span.textHeader")[0] - name_html = header.find("a")[0] - alias = list(name_html.links)[0].split("/")[-1] - name = name_html.text - - sup_html = header.find("sup")[0] - book_alias = list(sup_html.links)[0].split("/")[-1] - book_abbreviation = sup_html.text - - short_description = p.text.split(":")[-1] - - extra_class_info = { - "alias": alias, - "name": name, - "book_alias": book_alias, - "book_abbreviation": book_abbreviation, - "short_description": short_description, - } - - yield name, extra_class_info - except IndexError: - pass - - def _extract_classes_from_js_and_extra_data( - self, - raw_classes_data: str, - raw_basic_class_extra_data, - raw_prestige_class_extra_data, - ) -> List[ClassInfo]: - """Serialize class info from class data taken from spell list and class lists - - :param raw_classes_data: data taken from spell list - :param raw_basic_class_extra_data: data taken from class list - :param raw_prestige_class_extra_data: data taken from prestige class list - :return: - """ - classes = [] - - extra_basic_class_data = list( - self._iter_class_extra_info(raw_basic_class_extra_data) - ) - extra_prestige_class_data = list( - self._iter_class_extra_info(raw_prestige_class_extra_data) - ) - extra_class_data_map = { - name: data - for name, data in extra_basic_class_data + extra_prestige_class_data - } - - for class_info in self._extract_data_from_js(raw_classes_data, "classes"): - class_info_kwargs = _rename_keys(class_info, CLASSES_KEY_MAP) - class_name = class_info_kwargs["name"] - - extra_class_info_kwargs = extra_class_data_map.get(class_name, {}) - class_info_kwargs.update(extra_class_info_kwargs) - - classes.append(ClassInfo(**class_info_kwargs)) - - return classes - - def _extract_schools_from_js(self, raw_schools_data: str) -> List[SchoolInfo]: - """Serialize school info - - :param raw_schools_data: raw school data - :return: - """ - schools = [] - - for school_info in self._extract_data_from_js(raw_schools_data, "schools"): - school_info_kwargs = _rename_keys(school_info, SCHOOLS_KEY_MAP) - schools.append(SchoolInfo(**school_info_kwargs)) - - return schools - - def _extract_spells_from_js( - self, raw_spell_data: str, classes: List[ClassInfo], schools: List[SchoolInfo] - ) -> List[ShortSpellInfo]: - """Serialize short spell info with mapped classes and schools - - :param raw_spell_data: raw spell data - :param classes: list of serialized ClassInfo - :param schools: list of serialized Schools - :return: - """ - spells = [] - - id2class = {c.id: c for c in classes} - id2school = {s.id: s for s in schools} - - for spell in self._extract_data_from_js(raw_spell_data, "spells"): - class_restrictions = [] - for restriction in spell["ClassSpell"]: - class_info = id2class[restriction["ClassId"]] - class_info_restriction = ClassInfoSpellRestriction( - **class_info.dict(), level=restriction["Level"] - ) - class_restrictions.append(class_info_restriction) - - spell["ClassSpell"] = class_restrictions - spell["SchoolIds"] = [id2school[idx] for idx in spell["SchoolIds"]] - spell["ShortDescription"] = re.sub( - "|", "", spell["ShortDescription"] - ) - - spell_kwargs = _rename_keys(spell, SPELLS_KEY_MAP) - spells.append(ShortSpellInfo(**spell_kwargs)) - - return spells - - def _collect_spell_list_js_lines(self) -> Tuple[str, str, str]: - """Scrape raw js strings containing spells, classes, schools data - - :return: - """ - with HTMLSession() as sess: - response = sess.get(self.spell_list_url) - - script_with_data = response.html.find("script")[1].full_text - spells_raw, classes_raw, schools_raw = script_with_data.split("\n")[1:4] - - return spells_raw, classes_raw, schools_raw - - def _collect_extra_class_data(self): - """Scrape class info from class and class/prestige lists - - :return: - """ - with HTMLSession() as sess: - basic_classes_response = sess.get(self._settings.class_list_url) - prestige_classes_response = sess.get(self._settings.prestige_class_list_url) - - return basic_classes_response.html, prestige_classes_response.html - - def _collect_spell_info(self, spell_alias: str) -> ExtendedSpellInfo: - """Scrape extended spell info - - :param spell_alias: camelCase spell name - :return: - """ - with HTMLSession() as sess: - response = sess.get(f"{self.spell_info_url_prefix}/{spell_alias}") - - # using lxml soup to correctly parse tables, because they're mangled with

tags - soup = BeautifulSoup(response.html.html, "lxml") - - full_name_raw = soup.find("h1", class_="detailPage").text - full_name = full_name_raw.strip().split("\n")[0] - school = None - variables = {} - text_lines = [] - tables = [] - - for table in soup.find_all("table"): - table = re.sub('

|

', "", str(table)) - tables.append(SpellTable(html=table)) - - for p in response.html.find("p.indent"): - var_header = p.find("span.textHeader") - table_row = p.find("table, thead, tbody, tr, td") - - if p.text.startswith("Школа"): - school = p.text - elif var_header: - full_text_raw = p.full_text - var_name_raw = var_header[0].text - var_value_raw = full_text_raw[len(var_name_raw) :] - - var_name, var_value = var_name_raw.strip(" :"), var_value_raw.strip() - variables[var_name] = var_value - elif table_row: - pass - else: - if p.text: - text_lines.append(p.text) - - return ExtendedSpellInfo( - full_name=full_name, - school=school, - variables=variables, - text="\n".join(text_lines), - tables=tables, - ) - - def update_spell_info(self, spell_alias: str) -> ExtendedSpellInfo: - """Get spell info - - :param spell_alias: - :return: - """ - extended_spell_info = self._collect_spell_info(spell_alias) - - logger.info(f"Collected extended spell info for {spell_alias}") - return extended_spell_info - - def update_registry(self): - """Get spells, classes, schools data - - :return: - """ - spells_raw, classes_raw, schools_raw = self._collect_spell_list_js_lines() - ( - basic_classes_extra_raw, - prestige_classes_extra_raw, - ) = self._collect_extra_class_data() - - schools = self._extract_schools_from_js(schools_raw) - classes = self._extract_classes_from_js_and_extra_data( - classes_raw, basic_classes_extra_raw, prestige_classes_extra_raw - ) - spells = self._extract_spells_from_js(spells_raw, classes, schools) - - logger.info( - f"Collected registry, found: {len(spells)} spells, {len(classes)} classes, {len(schools)} schools" - ) - return spells, classes, schools diff --git a/spells_bot/search/spell_search.py b/spells_bot/search/spell_search.py deleted file mode 100644 index 70f4702..0000000 --- a/spells_bot/search/spell_search.py +++ /dev/null @@ -1,111 +0,0 @@ -import math - -from spells_bot.config import ( - DatabaseSettings, - DataSourceSettings, - StorageSettings, - HctiSettings, -) -from spells_bot.search.sourcing import Database, SourceUpdater, HctiApi -from spells_bot.utils.log import create_logger - -logger = create_logger("search") - - -class SpellSearch: - def __init__( - self, - db_settings: DatabaseSettings, - storage_settings: StorageSettings, - source_settings: DataSourceSettings, - hcti_settings: HctiSettings, - ) -> None: - self._db_name = db_settings.sqlalchemy_url - self.db = Database(db_settings) - - self.data_root_dir = storage_settings.data_root_dir - self.classinfo_tables_dir = storage_settings.data_root_dir / "classinfo" - self.spell_tables_dir = storage_settings.data_root_dir / "tables" - - self.source = SourceUpdater(source_settings) - self.hcti = HctiApi(hcti_settings) - self._check_db_readiness() - - def _check_db_readiness(self): - """Check and update short spell info, classes and schools registry if needed - - :return: - """ - if not self.db.has_spell_list(): - logger.info(f"{self._db_name} is not prepared, updating spell list...") - self.update_sources() - - logger.info(f"{self._db_name} is ready") - - def get_chat_settings(self, chat_id: int): - return self.db.get_or_create_chat_settings(chat_id) - - def update_chat_settings(self, chat_id: int, book: str): - return self.db.update_book_filter(chat_id, book) - - def update_sources(self): - spells, classes, schools = self.source.update_registry() - self.db.create_or_update_registry(spells, classes, schools) - - def short_info(self, query: str, chat_id: int, top_n: int = 10): - return list(self.db.iter_short_spell_info_by_name(query, chat_id))[:top_n] - - def extended_info(self, spell_alias: str): - _, extended_spell_info = self.full_info(spell_alias) - return extended_spell_info - - def full_info(self, spell_alias: str): - short_spell_info, extended_spell_info = self.db.get_full_spell_info(spell_alias) - - if not extended_spell_info: - try: - extended_spell_info = self.source.update_spell_info(spell_alias) - - updated_tables = [] - for t_idx, t in enumerate(extended_spell_info.tables): - table_image_path = self.spell_tables_dir / spell_alias / f"{t_idx}.png" - table = self.hcti.find_or_create(t.html, table_image_path) - updated_tables.append(table) - - extended_spell_info = self.db.create_extended_spell_info( - spell_alias, extended_spell_info, updated_tables - ) - except Exception as e: - pass - - return short_spell_info, extended_spell_info - - def class_info(self, class_id: int): - return self.db.get_class(class_id) - - def iter_classes(self, chat_id: int): - yield from self.db.iter_classes(chat_id) - - def iter_class_info_tables(self, class_id: int): - class_info = self.class_info(class_id) - class_info_dir = self.classinfo_tables_dir / class_info.alias - - for p in class_info_dir.glob("*.png"): - yield p, class_info - - def iter_levels(self, class_id: int): - yield from self.db.iter_levels(class_id) - - def paginate_short_info_by_level( - self, - class_id: int, - level: int, - chat_id: int, - page: int = 0, - n_per_page: int = 50, - ): - all_spells = list( - self.db.iter_short_spell_info_by_class_level(class_id, level, chat_id) - ) - n_pages_total = math.ceil(len(all_spells) / n_per_page) - return n_pages_total, all_spells[n_per_page * page : n_per_page * (page + 1)] diff --git a/spells_bot/utils/__init__.py b/spells_bot/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spells_bot/utils/log.py b/spells_bot/utils/logger.py similarity index 100% rename from spells_bot/utils/log.py rename to spells_bot/utils/logger.py diff --git a/spells_bot/utils/text_parser.py b/spells_bot/utils/text_parser.py new file mode 100644 index 0000000..d9cf600 --- /dev/null +++ b/spells_bot/utils/text_parser.py @@ -0,0 +1,34 @@ +from string import ascii_lowercase + + +# Allow alphabet + space + roman numbers +RU_CHARACTERS = "абвгдеёжзийклмнопрстуфхцчшщъыьэюя" + " " + "ivx" +EN_CHARACTERS = ascii_lowercase + " " + "ivx" + + +def clean_spell_search_query(raw_query: str): + """Cleans up and returns either ru name or en name + + Args: + raw_query: search text + + Returns: + tuple of either (ru_name, None) or (None, en_name) + + Raises: + ValueError: if impossible to detect language or query is empty + """ + query = raw_query.lower().strip() + + ru_name = en_name = None + if all(char in RU_CHARACTERS for char in query): + ru_name = query + elif all(char in EN_CHARACTERS for char in query): + en_name = query + else: + raise ValueError("Query must be in either english or russian") + + if not (ru_name or en_name): + raise ValueError("Empty query") + + return ru_name, en_name diff --git a/templates/table.css b/templates/table.css index 7bdc86c..66b04dd 100644 --- a/templates/table.css +++ b/templates/table.css @@ -5,7 +5,16 @@ html { -moz-background-size: cover; -o-background-size: cover; background-size: cover; - device-scale: 1; + /*device-scale: 1;*/ +} + +.tableBlock { + width: 100%; +} + +.tableHeader { + font: italic small-caps bold 1.1rem/2 cursive; + font-weight: bolder; } .commonTable { @@ -13,7 +22,7 @@ html { border-style: hidden; color: rgb(10, 10, 10); text-shadow: rgb(255, 255, 255) 1px 0 10px; - font: italic small-caps bold 3rem/2 cursive; + font: italic small-caps bold 1.1rem/2 cursive; font-weight: bolder; padding: 4px; margin: 2%; @@ -22,11 +31,13 @@ html { .commonTable thead td { border-bottom: 3px solid #000000; } + table td { border-left: 1px solid rgba(0,0,0,0.2); border-right: 1px solid rgba(0,0,0,0.2); border-top: 1px solid rgba(0,0,0,0.2); } + table td{ /* Added padding for better layout after collapsing */ padding: 4px 8px; }