From e594de21b82267dc2f389244e7a476b76029f430 Mon Sep 17 00:00:00 2001 From: Maxim Talimanchuk Date: Mon, 22 Aug 2022 21:59:36 +0300 Subject: [PATCH] Feat/run tg rework (#114) Co-authored-by: Fedor Ignatov Co-authored-by: dilyararimovna Co-authored-by: Denis Kuznetosv --- MANIFEST.in | 1 + deeppavlov_agent/channels/__init__.py | 0 .../channels/telegram/__init__.py | 0 deeppavlov_agent/channels/telegram/bot.py | 173 ++++++++++++++++++ .../telegram/config/telegram_config.yml | 7 + .../telegram/config/telegram_keyboards.yml | 7 + .../telegram/config/telegram_messages.yml | 61 ++++++ deeppavlov_agent/channels/telegram/utils.py | 150 +++++++++++++++ deeppavlov_agent/cmd_client.py | 49 ----- deeppavlov_agent/core/agent.py | 23 ++- deeppavlov_agent/core/connectors.py | 28 ++- deeppavlov_agent/core/log.py | 7 +- deeppavlov_agent/core/state_manager.py | 40 +++- deeppavlov_agent/core/state_schema.py | 45 +++-- deeppavlov_agent/core/telegram_client.py | 31 ---- .../core/transport/gateways/rabbitmq.py | 13 +- deeppavlov_agent/http_api/__init__.py | 28 +-- deeppavlov_agent/parse_config.py | 2 +- deeppavlov_agent/run.py | 50 ++--- deeppavlov_agent/run_cmd.py | 54 ++++++ deeppavlov_agent/run_http.py | 31 ++-- deeppavlov_agent/run_tg.py | 30 +-- deeppavlov_agent/settings.py | 91 +++++---- deeppavlov_agent/settings.yaml | 30 +++ deeppavlov_agent/setup_agent.py | 92 ++++++---- deeppavlov_agent/utils/__init__.py | 0 deeppavlov_agent/utils/config_tools.py | 16 ++ .../utils/http_api_stress_test.py | 9 + deeppavlov_agent/utils/http_api_test.py | 7 + docs/source/intro/overview.rst | 5 +- requirements.txt | 7 +- setup.py | 3 +- 32 files changed, 823 insertions(+), 267 deletions(-) create mode 100644 deeppavlov_agent/channels/__init__.py create mode 100644 deeppavlov_agent/channels/telegram/__init__.py create mode 100644 deeppavlov_agent/channels/telegram/bot.py create mode 100644 deeppavlov_agent/channels/telegram/config/telegram_config.yml create mode 100644 
deeppavlov_agent/channels/telegram/config/telegram_keyboards.yml create mode 100644 deeppavlov_agent/channels/telegram/config/telegram_messages.yml create mode 100644 deeppavlov_agent/channels/telegram/utils.py delete mode 100644 deeppavlov_agent/cmd_client.py delete mode 100644 deeppavlov_agent/core/telegram_client.py create mode 100644 deeppavlov_agent/run_cmd.py create mode 100644 deeppavlov_agent/settings.yaml create mode 100644 deeppavlov_agent/utils/__init__.py create mode 100644 deeppavlov_agent/utils/config_tools.py diff --git a/MANIFEST.in b/MANIFEST.in index 9ee1f729..751491a6 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,4 @@ include requirements.txt include deeppavlov_agent/http_api/templates/*.html include deeppavlov_agent/log_config.yml +include deeppavlov_agent/channels/telegram/config/*.yml diff --git a/deeppavlov_agent/channels/__init__.py b/deeppavlov_agent/channels/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deeppavlov_agent/channels/telegram/__init__.py b/deeppavlov_agent/channels/telegram/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deeppavlov_agent/channels/telegram/bot.py b/deeppavlov_agent/channels/telegram/bot.py new file mode 100644 index 00000000..987229a3 --- /dev/null +++ b/deeppavlov_agent/channels/telegram/bot.py @@ -0,0 +1,173 @@ +import asyncio +import logging +from pathlib import Path + +from aiogram import Bot, Dispatcher, types +from aiogram.contrib.fsm_storage.memory import MemoryStorage +from aiogram.dispatcher import FSMContext +from aiogram.dispatcher.filters.state import State, StatesGroup +from aiogram.utils import executor + +from .utils import MessageResponder + +config_dir = Path(__file__).resolve().parent / 'config' + +logging.basicConfig( + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO +) +logger = logging.getLogger(__name__) + + +class DialogState(StatesGroup): + active = State() + awaiting_rating = State() + inactive = 
State() + + +def run_tg(token, proxy, agent): + loop = asyncio.get_event_loop() + bot = Bot(token=token, loop=loop, proxy=proxy) + storage = MemoryStorage() # TODO change to actual storage maybe? + dp = Dispatcher(bot, storage=storage) + responder = MessageResponder( + config_path=config_dir / "telegram_config.yml", + messages_path=config_dir / "telegram_messages.yml", + keyboards_path=config_dir / "telegram_keyboards.yml", + ) + + @dp.message_handler(commands="start") + async def start_handler(message: types.Message): + text = responder.message("start") + reply_markup = responder.reply_keyboard("dialog_inactive") + + await message.answer(text, reply_markup=reply_markup) + + @dp.message_handler(commands="help", state="*") + async def help_handler(message: types.Message): + text = responder.message("help") + + await message.answer(text) + + @dp.message_handler(commands="complain", state="*") + async def complain_handler(message: types.Message, state: FSMContext): + # TODO Add actual complaint logic + if await state.get_state() == DialogState.active.state: + text = responder.message("complain_success") + else: + text = responder.message("complain_fail") + + await message.answer(text) + + @dp.message_handler(commands="begin", state="*") + async def begin_dialog(message: types.Message, state: FSMContext): + state = await state.get_state() + must_evaluate = ( + state == DialogState.awaiting_rating.state + and responder.config.evaluation_options.user_must_evaluate + ) + is_not_finished = state == DialogState.active + + if must_evaluate or is_not_finished: + text = responder.message("begin_fail") + reply_markup = None + + else: + await DialogState.active.set() + + text = responder.message("begin_success") + reply_markup = responder.reply_keyboard("dialog_active") + + await message.answer(text, reply_markup=reply_markup) + + @dp.message_handler(commands="end", state="*") + async def end_dialog(message: types.Message, state: FSMContext): + if await state.get_state() != 
DialogState.active.state: + text = responder.message("end_fail") + reply_markup = responder.reply_keyboard("dialog_inactive") + else: + text = responder.message("end_success") + dialog_id = await agent.state_manager.drop_active_dialog( + str(message.from_user.id) + ) + reply_markup = responder.dialog_rating_inline_keyboard(dialog_id) + + await DialogState.awaiting_rating.set() + + await message.answer(text, reply_markup=reply_markup) + + @dp.callback_query_handler( + lambda c: c.data.startswith("utt"), state=DialogState.active + ) + async def handle_utterance_rating( + callback_query: types.CallbackQuery, state: FSMContext + ): + _, utterance_id, rating = callback_query.data.split("-") + await agent.state_manager.set_rating_utterance( + str(callback_query.from_user.id), utterance_id, rating + ) + await bot.answer_callback_query(callback_query.id, text=rating.capitalize()) + + @dp.callback_query_handler(lambda c: c.data.startswith("dialog"), state="*") + async def handle_dialog_rating( + callback_query: types.CallbackQuery, state: FSMContext + ): + if await state.get_state() != DialogState.active.state: + _, dialog_id, rating = callback_query.data.split("-") + + await agent.state_manager.set_rating_dialog( + str(callback_query.from_user.id), dialog_id, rating + ) + + edited_inline_keyboard = responder.dialog_rating_inline_keyboard( + dialog_id, chosen_rating=rating + ) + + await bot.edit_message_reply_markup( + chat_id=callback_query.from_user.id, + message_id=callback_query.message.message_id, + reply_markup=edited_inline_keyboard, + ) + + if responder.config.dialog_options.reveal_dialog_id: + message_text = responder.message( + "evaluate_dialog_success_reveal_id", dialog_id=dialog_id + ) + else: + message_text = responder.message("evaluate_dialog_success") + callback_text = "Evaluation saved!" 
+ reply_markup = responder.reply_keyboard("dialog_inactive") + + await DialogState.inactive.set() + + else: + callback_text = "" + message_text = responder.message("evaluate_dialog_success") + reply_markup = None + + await bot.answer_callback_query(callback_query.id, text=callback_text) + await bot.send_message( + callback_query.from_user.id, message_text, reply_markup=reply_markup + ) + + @dp.message_handler(state="*") + async def handle_message(message: types.Message, state: FSMContext): + if await state.get_state() == DialogState.active.state: + response_data = await agent.register_msg( + utterance=message.text, + user_external_id=str(message.from_user.id), + user_device_type="telegram", + date_time=message.date, + location="", + channel_type="telegram", + require_response=True, + ) + text = response_data["dialog"].utterances[-1].text + utterance_id = response_data["dialog"].utterances[-1].utt_id + reply_markup = responder.utterance_rating_inline_keyboard(utterance_id) + else: + text = responder.message("unexpected_message") + reply_markup = None + + await message.answer(text, reply_markup=reply_markup) + + executor.start_polling(dp, skip_updates=True) diff --git a/deeppavlov_agent/channels/telegram/config/telegram_config.yml b/deeppavlov_agent/channels/telegram/config/telegram_config.yml new file mode 100644 index 00000000..886b1352 --- /dev/null +++ b/deeppavlov_agent/channels/telegram/config/telegram_config.yml @@ -0,0 +1,7 @@ +dialog_options: + reveal_dialog_id: true + +evaluation_options: + user_must_evaluate: true + min_score: 1 + max_score: 5 diff --git a/deeppavlov_agent/channels/telegram/config/telegram_keyboards.yml b/deeppavlov_agent/channels/telegram/config/telegram_keyboards.yml new file mode 100644 index 00000000..9d294d38 --- /dev/null +++ b/deeppavlov_agent/channels/telegram/config/telegram_keyboards.yml @@ -0,0 +1,7 @@ +dialog_inactive: + - '/begin' + - '/help' + +dialog_active: + - '/end' + - '/complain' diff --git 
a/deeppavlov_agent/channels/telegram/config/telegram_messages.yml b/deeppavlov_agent/channels/telegram/config/telegram_messages.yml new file mode 100644 index 00000000..46cb0322 --- /dev/null +++ b/deeppavlov_agent/channels/telegram/config/telegram_messages.yml @@ -0,0 +1,61 @@ +start: > + This chatbot is developed by Neural Networks and Deep Learning Lab at MIPT. + Please have a chat with it and evaluate its performance. + To begin a conversation enter /begin. To end a conversation: enter /end. + When the dialogue is finished, you will be asked to evaluate it. + You will have to rate the conversation from 1 (bad) to 5 (excellent). + Your conversations will be recorded for further use. + By starting a chat you give permission for your anonymized conversation data + to be released publicly under Apache License Version 2.0 + https://www.apache.org / licenses / LICENSE – 2.0. + +help: > + This chatbot is developed by Neural Networks and Deep Learning Lab at MIPT. + Please have a chat with it and evaluate its performance. + To begin a conversation enter /begin. To end a conversation: enter /end. + When the dialogue is finished, you will be asked to evaluate it. + You will have to rate the conversation from 1 (bad) to 5 (excellent). + Your conversations will be recorded for further use. + By starting a chat you give permission for your anonymized conversation data + to be released publicly under Apache License Version 2.0 + https://www.apache.org / licenses / LICENSE – 2.0. + +complain_success: > + Your complaint has been recorded and will be examined by the system administrator. + Note that your conversation is still active. + You can always use /end command to end it. + +complain_fail: > + Could not save your complaint. Did you /begin the dialog? + You cannot complain when there are no messages in the dialog. + +begin_success: > + Starting a dialog. To finish the dialog enter /end. + +begin_fail: > + Cannot start a new conversation. 
+ Please finish and evaluate your current dialog first. + Use /help command for usage instructions. + +end_success: > + Dialog is finished. + Please evaluate the whole dialog using one of the buttons below: + +end_fail: > + You're not in a dialog now. + +evaluate_dialog_success: > + Thank you for participation! + +evaluate_dialog_success_reveal_id: > + Thank you for participation! + Evaluated dialog secret id: ${dialog_id} + +evaluate_dialog_fail: > + Evaluation is not allowed at the moment. + Use /help command for usage instructions. + +unexpected_message: > + Unexpected message. + You are not in a dialog yet or the dialog has already been finished. + Use /help command for usage instructions. diff --git a/deeppavlov_agent/channels/telegram/utils.py b/deeppavlov_agent/channels/telegram/utils.py new file mode 100644 index 00000000..64e54865 --- /dev/null +++ b/deeppavlov_agent/channels/telegram/utils.py @@ -0,0 +1,150 @@ +from pathlib import Path +from string import Template +from typing import Union + +from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ReplyKeyboardMarkup +from pydantic import BaseModel +from yaml import load +try: + from yaml import CLoader as Loader, CDumper as Dumper +except ImportError: + from yaml import Loader, Dumper + + +def _load_yml(path): + with open(path, "r") as yml_f: + data = load(yml_f, Loader=Loader) + return data + + +class YmlModel(BaseModel): + @classmethod + def from_yml(cls, path): + return cls.parse_obj(_load_yml(path)) + + +class DialogOptions(BaseModel): + reveal_dialog_id: bool + + +class EvaluationOptions(BaseModel): + user_must_evaluate: bool + min_score: int + max_score: int + + +class Config(YmlModel): + dialog_options: DialogOptions + evaluation_options: EvaluationOptions + + +class Messages(YmlModel): + start: str + help: str + complain_success: str + complain_fail: str + begin_success: str + begin_fail: str + end_success: str + end_fail: str + evaluate_dialog_success: str + 
evaluate_dialog_success_reveal_id: str + evaluate_dialog_fail: str + unexpected_message: str + + +class Keyboards(YmlModel): + dialog_inactive: list + dialog_active: list + + +class MessageResponder: + def __init__( + self, + config_path: Union[Path, str], + messages_path: Union[Path, str], + keyboards_path: Union[Path, str], + ): + self._config = Config.from_yml(config_path) + self._messages = Messages.from_yml(messages_path) + self._keyboards = Keyboards.from_yml(keyboards_path) + + @property + def config(self) -> Config: + return self._config + + def message(self, key: str, **kwargs) -> str: + """Create text from template with substituted placeholders + + Args: + key: message key + **kwargs: substitutions if the message is a template + + Returns: string with substituted placeholders + + """ + template = Template(self._messages.dict()[key]) + return template.safe_substitute(**kwargs) + + def dialog_rating_inline_keyboard( + self, dialog_id: str, chosen_rating: Union[str, int] = None + ) -> InlineKeyboardMarkup: + """Create inline keyboard with rating buttons. Min and max score are set via config. 
+ Provide chosen_rating argument if the keyboard is edited after the conversation was rated + + Args: + dialog_id: dialog uuid + chosen_rating: this rating button will be shown with a star in front of the rating value + + Returns: instance of InlineKeyboardMarkup + + """ + reply_markup = InlineKeyboardMarkup(row_width=5) + min_range = self.config.evaluation_options.min_score + max_range = self.config.evaluation_options.max_score + 1 + + for rating in range(min_range, max_range): + btn_text = str(rating) + if chosen_rating: + if int(chosen_rating) == int(rating): + btn_text = f"⭐️{btn_text}" + btn = InlineKeyboardButton( + btn_text, callback_data=f"dialog-{dialog_id}-{rating}" + ) + reply_markup.insert(btn) + + return reply_markup + + def utterance_rating_inline_keyboard(self, utterance_id: str) -> InlineKeyboardMarkup: + """Create inline keyboard with thumbs up/down buttons + + Args: + utterance_id: utterance uuid + + Returns: instance of InlineKeyboardMarkup + + """ + reply_markup = InlineKeyboardMarkup(row_width=2) + reply_markup.insert( + InlineKeyboardButton("👍", callback_data=f"utt-{utterance_id}-like") + ) + reply_markup.insert( + InlineKeyboardButton("👎", callback_data=f"utt-{utterance_id}-dislike") + ) + + return reply_markup + + def reply_keyboard(self, state: str) -> ReplyKeyboardMarkup: + """Create keyboard with buttons + + Args: + state: pre-configured keyboard state + + Returns: reply keyboard with pre-configured buttons + + """ + buttons = self._keyboards.dict()[state] + reply_markup = ReplyKeyboardMarkup(resize_keyboard=True, selective=True) + reply_markup.add(*buttons) + + return reply_markup diff --git a/deeppavlov_agent/cmd_client.py b/deeppavlov_agent/cmd_client.py deleted file mode 100644 index e6dcf407..00000000 --- a/deeppavlov_agent/cmd_client.py +++ /dev/null @@ -1,49 +0,0 @@ -import argparse -import asyncio - -from aioconsole import ainput - -from .setup_agent import setup_agent - - -async def message_processor(register_msg): - user_id = 
await ainput('Provide user id: ') - while True: - msg = await ainput(f'You ({user_id}): ') - msg = msg.strip() - if msg: - response = await register_msg(utterance=msg, user_external_id=user_id, user_device_type='cmd', - location='lab', channel_type='cmd_client', - deadline_timestamp=None, require_response=True) - print('Bot: ', response['dialog'].utterances[-1].text) - - -def run_cmd(pipeline_configs, debug): - agent, session, workers = setup_agent(pipeline_configs=pipeline_configs) - loop = asyncio.get_event_loop() - loop.set_debug(debug) - future = asyncio.ensure_future(message_processor(agent.register_msg)) - for i in workers: - loop.create_task(i.call_service(agent.process)) - try: - loop.run_until_complete(future) - except KeyboardInterrupt: - pass - except Exception as e: - raise e - finally: - future.cancel() - if session: - loop.run_until_complete(session.close()) - loop.stop() - loop.close() - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('-pl', '--pipeline_configs', help='Pipeline config (overwrite value, defined in settings)', - type=str, action='append') - parser.add_argument('-d', '--debug', help='run in debug mode', action='store_true') - args = parser.parse_args() - - run_cmd(args.pipeline_configs, args.debug) diff --git a/deeppavlov_agent/core/agent.py b/deeppavlov_agent/core/agent.py index a0c4877c..16d1030c 100644 --- a/deeppavlov_agent/core/agent.py +++ b/deeppavlov_agent/core/agent.py @@ -1,6 +1,9 @@ import asyncio from time import time from typing import Any +import os + +import sentry_sdk from .log import BaseResponseLogger from .pipeline import Pipeline @@ -8,6 +11,9 @@ from .workflow_manager import WorkflowManager +sentry_sdk.init(os.getenv('DP_AGENT_SENTRY_DSN')) + + class Agent: _response_logger: BaseResponseLogger @@ -58,9 +64,24 @@ async def process(self, task_id, response: Any = None, **kwargs): if not workflow_record: return service = task_data['service'] - + # 
self._response_logger._logger.info(f"Service {service.label}: {response}") self._response_logger.log_end(task_id, workflow_record, service) + if service.label in set(['last_chance_service', 'timeout_service']): + # extract services from workflow_record and group them by status + done = [k for k, v in workflow_record['services'].items() if v['done'] and not v.get('error', False)] + in_progress = [k for k, v in workflow_record['services'].items() + if not v['done'] and not v.get('error', False)] + with_errors = [k for k, v in workflow_record['services'].items() if v.get('error', False)] + with sentry_sdk.push_scope() as scope: + scope.set_extra('user_id', workflow_record['dialog'].human.external_id) + scope.set_extra('dialog_id', workflow_record['dialog'].id) + scope.set_extra('response', response) + scope.set_extra('done', done) + scope.set_extra('in_progress', in_progress) + scope.set_extra('with_errors', with_errors) + sentry_sdk.capture_message(f"{service.label} was called") + if isinstance(response, Exception): # Skip all services, which are depends on failured one for i in service.dependent_services: diff --git a/deeppavlov_agent/core/connectors.py b/deeppavlov_agent/core/connectors.py index 7620648b..181b11f3 100644 --- a/deeppavlov_agent/core/connectors.py +++ b/deeppavlov_agent/core/connectors.py @@ -1,32 +1,38 @@ import asyncio from typing import Any, Callable, Dict, List from collections import defaultdict +from logging import getLogger +import os +import sentry_sdk import aiohttp from .transport.base import ServiceGatewayConnectorBase +logger = getLogger(__name__) +sentry_sdk.init(os.getenv('DP_AGENT_SENTRY_DSN')) + class HTTPConnector: - def __init__(self, session: aiohttp.ClientSession, url: str): + def __init__(self, session: aiohttp.ClientSession, url: str, timeout: float): self.session = session self.url = url + self.timeout = aiohttp.ClientTimeout(total=timeout) async def send(self, payload: Dict, callback: Callable): try: - async with 
self.session.post(self.url, json=payload['payload']) as resp: + async with self.session.post(self.url, json=payload["payload"], timeout=self.timeout) as resp: resp.raise_for_status() response = await resp.json() - await callback( - task_id=payload['task_id'], - response=response[0] - ) + await callback(task_id=payload["task_id"], response=response[0]) except Exception as e: + with sentry_sdk.push_scope() as scope: + scope.set_extra("payload", payload) + scope.set_extra("url", self.url) + sentry_sdk.capture_exception(e) + logger.exception(Exception(e, {"payload": payload, "url": self.url})) response = e - await callback( - task_id=payload['task_id'], - response=response - ) + await callback(task_id=payload["task_id"], response=response) class AioQueueConnector: @@ -85,6 +91,8 @@ async def send(self, payload: Dict, callback: Callable): response=best_skill ) except Exception as e: + sentry_sdk.capture_exception(e) + logger.exception(e) await callback( task_id=payload['task_id'], response=e diff --git a/deeppavlov_agent/core/log.py b/deeppavlov_agent/core/log.py index 03d805e5..e6cff1f8 100644 --- a/deeppavlov_agent/core/log.py +++ b/deeppavlov_agent/core/log.py @@ -72,9 +72,10 @@ def __init__(self, enabled: bool, cleanup_timedelta: int = 300) -> None: self._logger.addHandler(fh) def _log(self, time: datetime, task_id: str, workflow_record: dict, service: Service, status: str) -> None: - service_name = service.name - dialog_id = workflow_record['dialog'].id - self._logger.info(f"{time.strftime('%Y-%m-%d %H:%M:%S.%f')}\t{dialog_id}\t{task_id}\t{status}\t{service_name}") + # service_name = service.name + # dialog_id = workflow_record['dialog'].id + # self._logger.info(f"{time.strftime('%Y-%m-%d %H:%M:%S.%f')}\t{dialog_id}\t{task_id}\t{status}\t{service_name}") + pass def _cleanup(self, time): time_threshold = time - self._timedelta diff --git a/deeppavlov_agent/core/state_manager.py b/deeppavlov_agent/core/state_manager.py index 9aad0d7e..b6f3f96f 100644 --- 
a/deeppavlov_agent/core/state_manager.py +++ b/deeppavlov_agent/core/state_manager.py @@ -1,3 +1,4 @@ +from copy import deepcopy from typing import Dict from datetime import datetime @@ -41,7 +42,9 @@ async def add_hypothesis_annotation_batch(self, dialog: Dialog, payload: Dict, l dialog.utterances[-1].hypotheses[i]['annotations'][label] = {} else: for i in range(len(payload["batch"])): - dialog.utterances[-1].hypotheses[i]['annotations'][label] = payload["batch"][i] + new_val = deepcopy(dialog.utterances[-1].hypotheses[i]) + new_val['annotations'][label] = payload["batch"][i] + dialog.utterances[-1].hypotheses[i] = new_val async def add_text(self, dialog: Dialog, payload: str, label: str, **kwargs): dialog.utterances[-1].text = payload @@ -124,9 +127,15 @@ async def get_dialogs_by_user_ext_id(self, user_external_id): async def get_all_dialogs(self): return await Dialog.get_all(self._db) + async def get_active_dialog(self, user_external_id): + user = await Human.get_or_create(self._db, user_external_id) + dialog_id = await Dialog.get_active(self._db, user._id) + return dialog_id + async def drop_active_dialog(self, user_external_id): user = await Human.get_or_create(self._db, user_external_id) - await Dialog.drop_active(self._db, user._id) + dialog_id = await Dialog.drop_active(self._db, user._id) + return dialog_id async def set_rating_dialog(self, user_external_id, dialog_id, rating): dialog = await Dialog.get_by_dialog_id(self._db, dialog_id, False) @@ -134,7 +143,9 @@ async def set_rating_dialog(self, user_external_id, dialog_id, rating): return False if 'ratings' not in dialog.attributes: dialog.attributes['ratings'] = [] - dialog.attributes['ratings'].append({'rating': rating, 'user_external_id': user_external_id, 'datetime': datetime.now()}) + dialog.attributes['ratings'].append( + {'rating': rating, 'user_external_id': user_external_id, 'datetime': datetime.now()} + ) await dialog.save(self._db) async def set_rating_utterance(self, user_external_id, utt_id, 
rating): @@ -143,10 +154,12 @@ async def set_rating_utterance(self, user_external_id, utt_id, rating): return False if 'ratings' not in utt.attributes: utt.attributes['ratings'] = [] - utt.attributes['ratings'].append({'rating': rating, 'user_external_id': user_external_id, 'datetime': datetime.now()}) - await utt.save(self._db) + utt.attributes['ratings'].append( + {'rating': rating, 'user_external_id': user_external_id, 'datetime': datetime.now()} + ) + await utt.save(self._db, force_encode_date=False) - async def drop_and_rating_active_dialog(self, user_external_id, rating): + async def drop_and_rate_active_dialog(self, user_external_id, rating): user = await Human.get_or_create(self._db, user_external_id) await Dialog.set_rating_drop_active(self._db, user._id, rating) @@ -158,3 +171,18 @@ async def prepare_db(self): async def get_channels(self): return await Dialog.get_channels(self._db) + + +class ExtendedStateManager(StateManager): + async def update_attributes(self, dialog, payload, label: str, **kwargs): + if isinstance(payload.get("human_attributes"), dict): + await self.update_human(dialog.human, payload) + if isinstance(payload.get("bot_attributes"), dict): + await self.update_bot(dialog.bot, payload) + + async def add_annotation_and_reset_human_attributes_for_first_turn( + self, dialog: Dialog, payload: Dict, label: str, **kwargs + ): + dialog.utterances[-1].annotations[label] = payload + if len(dialog.utterances) == 1: + dialog.human.attributes = {"disliked_skills": dialog.human.attributes.get("disliked_skills", [])} diff --git a/deeppavlov_agent/core/state_schema.py b/deeppavlov_agent/core/state_schema.py index 71b00774..fcacc585 100644 --- a/deeppavlov_agent/core/state_schema.py +++ b/deeppavlov_agent/core/state_schema.py @@ -1,3 +1,4 @@ +import logging import uuid from hashlib import md5 from collections import defaultdict @@ -49,8 +50,11 @@ async def prepare_collection(cls, db): await db[cls.collection_name].create_index('date_time') await 
db[cls.collection_name].create_index('utt_id') - def to_dict(self): - dumped_attrs = json.loads(json.dumps(self.attributes, default=bson.json_util.default)) + def to_dict(self, force_encode_date=True): + if force_encode_date: + dumped_attrs = json.loads(json.dumps(self.attributes, default=bson.json_util.default)) + else: + dumped_attrs = self.attributes return { 'utt_id': self.utt_id, 'text': self.text, @@ -61,8 +65,8 @@ def to_dict(self): 'attributes': dumped_attrs } - async def save(self, db): - data = self.to_dict() + async def save(self, db, force_encode_date=True): + data = self.to_dict(force_encode_date) data['date_time'] = self.date_time data['_dialog_id'] = self._dialog_id data['_in_dialog_id'] = self._in_dialog_id @@ -125,8 +129,11 @@ async def prepare_collection(cls, db): await db[cls.collection_name].create_index('date_time') await db[cls.collection_name].create_index('utt_id') - def to_dict(self): - dumped_attrs = json.loads(json.dumps(self.attributes, default=bson.json_util.default)) + def to_dict(self, force_encode_date=True): + if force_encode_date: + dumped_attrs = json.loads(json.dumps(self.attributes, default=bson.json_util.default)) + else: + dumped_attrs = self.attributes return { 'utt_id': self.utt_id, 'text': self.text, @@ -139,8 +146,8 @@ def to_dict(self): 'attributes': dumped_attrs } - async def save(self, db): - data = self.to_dict() + async def save(self, db, force_encode_date=True): + data = self.to_dict(force_encode_date) data['date_time'] = self.date_time data['_dialog_id'] = self._dialog_id data['_in_dialog_id'] = self._in_dialog_id @@ -337,23 +344,35 @@ async def get_by_dialog_id(cls, db, dialog_id, full=False): return dialog_obj return None + @classmethod + async def get_active(cls, db, human_id): + dialog = await db[cls.collection_name].find_one({'_human_id': human_id, '_active': True}) + if dialog: + return dialog["dialog_id"] + @classmethod async def drop_active(cls, db, human_id): dialog = await 
db[cls.collection_name].find_one({'_human_id': human_id, '_active': True}) if dialog: await db[cls.collection_name].update_one({'_id': dialog['_id']}, {'$set': {'_active': False}}) + return dialog["dialog_id"] @classmethod async def set_rating_drop_active(cls, db, human_id, rating=None): dialog = await db[cls.collection_name].find_one({'_human_id': human_id, '_active': True}) - attributes = dialog.attributes + attributes = dialog["attributes"] if rating: if 'ratings' not in attributes: attributes['ratings'] = [] - attributes['ratings'].append({'rating': rating, 'human_id': human_id, 'datetime': datetime.now()}) - + attributes['ratings'].append( + {'rating': rating, 'human_id': human_id, 'datetime': datetime.now()} + ) + if dialog: - await db[cls.collection_name].update_one({'_id': dialog['_id']}, {'$set': {'_active': False, 'attributes': attributes}}) + await db[cls.collection_name].update_one( + {'_id': dialog['_id']}, {'$set': {'_active': False, 'attributes': attributes}} + ) + return dialog["dialog_id"] @classmethod async def get_or_create_by_ext_id(cls, db, external_id, channel_type): @@ -407,7 +426,7 @@ async def save(self, db, force=False): if utt.actual and not force: break utt._dialog_id = self._id - await utt.save(db) + await utt.save(db, force_encode_date=False) class Human: diff --git a/deeppavlov_agent/core/telegram_client.py b/deeppavlov_agent/core/telegram_client.py deleted file mode 100644 index 4c26d094..00000000 --- a/deeppavlov_agent/core/telegram_client.py +++ /dev/null @@ -1,31 +0,0 @@ -import asyncio - -from aiogram import Bot -from aiogram.dispatcher import Dispatcher -from aiogram.utils import executor - - -class TelegramMessageProcessor: - def __init__(self, register_msg): - self.register_msg = register_msg - - async def handle_message(self, message): - response = await self.register_msg( - utterance=message.text, - user_external_id=str(message.from_user.id), - user_device_type='telegram', - date_time=message.date, location='', 
channel_type='telegram', - require_response=True - ) - await message.answer(response['dialog'].utterances[-1].text) - - -def run_tg(token, proxy, agent): - loop = asyncio.get_event_loop() - bot = Bot(token=token, loop=loop, proxy=proxy) - dp = Dispatcher(bot) - tg_msg_processor = TelegramMessageProcessor(agent.register_msg) - - dp.message_handler()(tg_msg_processor.handle_message) - - executor.start_polling(dp, skip_updates=True) diff --git a/deeppavlov_agent/core/transport/gateways/rabbitmq.py b/deeppavlov_agent/core/transport/gateways/rabbitmq.py index 0d5870a3..8bd57644 100644 --- a/deeppavlov_agent/core/transport/gateways/rabbitmq.py +++ b/deeppavlov_agent/core/transport/gateways/rabbitmq.py @@ -3,7 +3,9 @@ import time from logging import getLogger from typing import Dict, List, Optional, Callable +import os +import sentry_sdk import aio_pika from aio_pika import Connection, Channel, Exchange, Queue, IncomingMessage, Message @@ -24,6 +26,7 @@ logger = getLogger(__name__) +sentry_sdk.init(os.getenv('DP_AGENT_SENTRY_DSN')) # TODO: add proper RabbitMQ SSL authentication class RabbitMQTransportBase: @@ -61,7 +64,8 @@ async def _connect(self) -> None: logger.info('RabbitMQ connected') break - except ConnectionError: + except ConnectionError as e: + sentry_sdk.capture_exception(e) reconnect_timeout = 5 logger.error(f'RabbitMQ connection error, making another attempt in {reconnect_timeout} secs') time.sleep(reconnect_timeout) @@ -241,6 +245,9 @@ async def _on_message_callback(self, message: IncomingMessage) -> None: elif self._add_to_buffer_lock.locked(): self._add_to_buffer_lock.release() + except Exception as e: + sentry_sdk.capture_exception(e) + logger.exception(e) finally: self._infer_lock.release() @@ -264,7 +271,9 @@ async def _process_tasks(self, tasks_batch: List[ServiceTaskMessage]) -> bool: await asyncio.gather(*results_replies) logger.debug(f'Processed tasks {str(task_uuids_batch)}') return True - except asyncio.TimeoutError: + except asyncio.TimeoutError 
as e: + sentry_sdk.capture_exception(e) + logger.exception(e) return False async def _send_results(self, task: ServiceTaskMessage, response: Dict) -> None: diff --git a/deeppavlov_agent/http_api/__init__.py b/deeppavlov_agent/http_api/__init__.py index f1ad184f..a09a266f 100644 --- a/deeppavlov_agent/http_api/__init__.py +++ b/deeppavlov_agent/http_api/__init__.py @@ -2,27 +2,27 @@ from .api import init_app -from ..settings import ( - TIME_LIMIT, OUTPUT_FORMATTER, DEBUG_OUTPUT_FORMATTER, DEBUG, RESPONSE_LOGGER, CORS -) - from ..setup_agent import setup_agent -from ..core.log import LocalResponseLogger +from ..utils import config_tools -def app_factory(pipeline_configs=None, debug=None, response_time_limit=None, cors=None): - agent, session, workers = setup_agent(pipeline_configs) +def app_factory(agent_config): + agent, session, workers = setup_agent(agent_config) response_logger = agent._response_logger - if DEBUG: - output_formatter = DEBUG_OUTPUT_FORMATTER + if agent_config.debug: + output_formatter_qualname = agent_config.debug_output_formatter else: - output_formatter = OUTPUT_FORMATTER + output_formatter_qualname = agent_config.output_formatter app = init_app( - agent=agent, session=session, consumers=workers, - logger_stats=response_logger, output_formatter=output_formatter, - debug=debug or DEBUG, response_time_limit=response_time_limit or TIME_LIMIT, - cors=CORS if cors is None else cors + agent=agent, + session=session, + consumers=workers, + logger_stats=response_logger, + output_formatter=config_tools.import_class(output_formatter_qualname), + debug=agent_config.debug, + response_time_limit=agent_config.response_time_limit, + cors=agent_config.cors, ) return app diff --git a/deeppavlov_agent/parse_config.py b/deeppavlov_agent/parse_config.py index c431413d..bb0953c7 100644 --- a/deeppavlov_agent/parse_config.py +++ b/deeppavlov_agent/parse_config.py @@ -94,7 +94,7 @@ def make_connector(self, name: str, data: Dict): for url in urllist: 
workers.append(QueueListenerBatchifyer(self.get_session(), url, queue, batch_size)) else: - connector = HTTPConnector(self.get_session(), data['url']) + connector = HTTPConnector(self.get_session(), data['url'], timeout=data.get("timeout", 0)) elif data['protocol'] == 'AMQP': gateway = self.get_gateway() diff --git a/deeppavlov_agent/run.py b/deeppavlov_agent/run.py index 3725c02f..1496134d 100644 --- a/deeppavlov_agent/run.py +++ b/deeppavlov_agent/run.py @@ -1,30 +1,32 @@ -import argparse +import logging -from .cmd_client import run_cmd +import hydra +from omegaconf import DictConfig + +from .run_cmd import run_cmd from .run_http import run_http from .run_tg import run_telegram -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('-pl', '--pipeline_configs', help='Pipeline config (overwrite value, defined in settings)', - type=str, action='append') - parser.add_argument("-ch", "--channel", help="run agent in telegram, cmd_client or http_client", type=str, - choices=['cmd_client', 'http_client', 'telegram'], default='cmd_client') - parser.add_argument('-p', '--port', help='port for http client, default 4242', default=4242) - parser.add_argument('-c', '--cors', help='whether to add CORS middleware to http_client', - action='store_true', default=None) - parser.add_argument('-d', '--debug', help='run in debug mode', action='store_true') - parser.add_argument('-tl', '--time_limit', help='response time limit, 0 = no limit', type=int, default=0) - args = parser.parse_args() - - if args.channel == 'cmd_client': - run_cmd(args.pipeline_configs, args.debug) - elif args.channel == 'http_client': - run_http(args.port, args.pipeline_configs, args.debug, args.time_limit, args.cors) - elif args.channel == 'telegram': - run_telegram(args.pipeline_configs) - - -if __name__ == '__main__': +logger = logging.getLogger(__name__) + +CHANNELS = { + "cmd": run_cmd, + "http": run_http, + "telegram": run_telegram, +} + + +@hydra.main(config_path=".", 
config_name="settings") +def main(cfg: DictConfig): + try: + run_channel = CHANNELS[cfg.agent.channel] + run_channel(cfg) + except KeyError: + logger.error( + f"agent.channel value must be one of: {', '.join(CHANNELS.keys())} (not {cfg.agent.channel})" + ) + + +if __name__ == "__main__": main() diff --git a/deeppavlov_agent/run_cmd.py b/deeppavlov_agent/run_cmd.py new file mode 100644 index 00000000..a9c0698e --- /dev/null +++ b/deeppavlov_agent/run_cmd.py @@ -0,0 +1,54 @@ +import asyncio +import os +from logging import getLogger + +import sentry_sdk +from aioconsole import ainput +from omegaconf import DictConfig + +from .setup_agent import setup_agent + +logger = getLogger(__name__) + +sentry_sdk.init(os.getenv("DP_AGENT_SENTRY_DSN")) + + +async def message_processor(register_msg): + user_id = await ainput("Provide user id: ") + while True: + msg = await ainput(f"You ({user_id}): ") + msg = msg.strip() + if msg: + response = await register_msg( + utterance=msg, + user_external_id=user_id, + user_device_type="cmd", + location="lab", + channel_type="cmd_client", + deadline_timestamp=None, + require_response=True, + ) + print("Bot: ", response["dialog"].utterances[-1].text) + + +def run_cmd(cfg: DictConfig): + agent, session, workers = setup_agent(cfg.agent) + loop = asyncio.get_event_loop() + loop.set_debug(cfg.agent.debug) + future = asyncio.ensure_future(message_processor(agent.register_msg)) + for i in workers: + loop.create_task(i.call_service(agent.process)) + try: + loop.run_until_complete(future) + except KeyboardInterrupt: + pass + except Exception as e: + sentry_sdk.capture_exception(e) + logger.exception(e) + raise e + finally: + future.cancel() + if session: + loop.run_until_complete(session.close()) + loop.stop() + loop.close() diff --git a/deeppavlov_agent/run_http.py b/deeppavlov_agent/run_http.py index 35fa73ec..3e518459 100644 --- a/deeppavlov_agent/run_http.py +++ b/deeppavlov_agent/run_http.py @@ -1,24 +1,21 @@ -import argparse +import os +from 
logging import getLogger +import sentry_sdk from aiohttp import web +from omegaconf import DictConfig from .http_api import app_factory -from .settings import PORT +logger = getLogger(__name__) +sentry_sdk.init(os.getenv("DP_AGENT_SENTRY_DSN")) -def run_http(port, pipeline_configs=None, debug=None, time_limit=None, cors=None): - app = app_factory(pipeline_configs=pipeline_configs, debug=debug, response_time_limit=time_limit, cors=cors) - web.run_app(app, port=port) - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('-p', '--port', help=f'port for http client, default {PORT}', type=int) - parser.add_argument('-pl', '--pipeline_configs', help='Pipeline config (overwrite value, defined in settings)', - type=str, action='append') - parser.add_argument('-d', '--debug', help='run in debug mode', action='store_true') - parser.add_argument('-tl', '--time_limit', help='response time limit, 0 = no limit', type=int) - args = parser.parse_args() - - port = args.port or PORT - run_http(port, args.pipeline_configs, args.debug, args.time_limit) +def run_http(cfg: DictConfig): + try: + app = app_factory(cfg.agent) + web.run_app(app, port=cfg.agent.port) + except Exception as e: + sentry_sdk.capture_exception(e) + logger.exception(e) + raise e diff --git a/deeppavlov_agent/run_tg.py b/deeppavlov_agent/run_tg.py index e284582e..a89ba6df 100644 --- a/deeppavlov_agent/run_tg.py +++ b/deeppavlov_agent/run_tg.py @@ -1,24 +1,24 @@ -import argparse +import os +from logging import getLogger -from .settings import TELEGRAM_TOKEN, TELEGRAM_PROXY -from .core.telegram_client import run_tg +import sentry_sdk +from omegaconf import DictConfig + +from .channels.telegram.bot import run_tg from .setup_agent import setup_agent +logger = getLogger(__name__) +sentry_sdk.init(os.getenv("DP_AGENT_SENTRY_DSN")) + -def run_telegram(pipeline_configs=None): - agent, session, workers = setup_agent(pipeline_configs) +def run_telegram(cfg: DictConfig): + agent, session, 
workers = setup_agent(cfg.agent) try: - run_tg(TELEGRAM_TOKEN, TELEGRAM_PROXY, agent) + run_tg(cfg.agent.telegram_token, cfg.agent.telegram_proxy, agent) + except Exception as e: + sentry_sdk.capture_exception(e) + logger.exception(e) finally: session.close() for i in workers: i.cancel() - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('-pl', '--pipeline_config', help='Pipeline config (overwrite value, defined in settings)', - type=str, action='append') - args = parser.parse_args() - - run_telegram(args.pipeline_config) diff --git a/deeppavlov_agent/settings.py b/deeppavlov_agent/settings.py index 41486751..8bfcaa60 100644 --- a/deeppavlov_agent/settings.py +++ b/deeppavlov_agent/settings.py @@ -1,32 +1,33 @@ import logging from importlib import import_module +from typing import Dict from .core.db import DataBase -from .core.state_manager import StateManager +from .core.state_manager import StateManager, Dialog from .core.workflow_manager import WorkflowManager -from .state_formatters.output_formatters import (http_api_output_formatter, - http_debug_output_formatter) +from .state_formatters.output_formatters import http_api_output_formatter, http_debug_output_formatter + # Default parameters BASE_PARAMETERS = { - 'debug': True, - 'state_manager_class': StateManager, - 'workflow_manager_class': WorkflowManager, - 'db_class': DataBase, - 'pipeline_config': 'pipeline_conf.json', - 'db_config': 'db_conf.json', - 'overwrite_last_chance': None, - 'overwrite_timeout': None, - 'formatters_module': None, - 'connectors_module': None, - 'response_logger': True, - 'time_limit': 0, - 'output_formatter': http_api_output_formatter, - 'debug_output_formatter': http_debug_output_formatter, - 'port': 4242, - 'cors': False, - 'telegram_token': '', - 'telegram_proxy': '' + "debug": True, + "state_manager_class": StateManager, + "workflow_manager_class": WorkflowManager, + "db_class": DataBase, + "pipeline_config": "pipeline_conf.json", + 
"db_config": "db_conf.json", + "overwrite_last_chance": None, + "overwrite_timeout": None, + "formatters_module": None, + "connectors_module": None, + "response_logger": True, + "time_limit": 0, + "output_formatter": http_api_output_formatter, + "debug_output_formatter": http_debug_output_formatter, + "port": 4242, + "cors": False, + "telegram_token": "", + "telegram_proxy": "", } @@ -42,36 +43,52 @@ def setup_parameter(name, user_settings): user_settings = None try: - user_settings = import_module('dp_agent_settings') + user_settings = import_module("dp_agent_settings") except ModuleNotFoundError: - logging.info('settings.py was not found. Default settings are used') + logging.info("settings.py was not found. Default settings are used") # Set up common parameters -DEBUG = setup_parameter('debug', user_settings) +DEBUG = setup_parameter("debug", user_settings) + + +class ExtendedStateManager(StateManager): + async def update_attributes(self, dialog, payload, label: str, **kwargs): + if isinstance(payload.get("human_attributes"), dict): + await self.update_human(dialog.human, payload) + if isinstance(payload.get("bot_attributes"), dict): + await self.update_bot(dialog.bot, payload) + + async def add_annotation_and_reset_human_attributes_for_first_turn( + self, dialog: Dialog, payload: Dict, label: str, **kwargs + ): + dialog.utterances[-1].annotations[label] = payload + if len(dialog.utterances) == 1: + dialog.human.attributes = {"disliked_skills": dialog.human.attributes.get("disliked_skills", [])} + # Basic agent configuration parameters (some are currently unavailable) -STATE_MANAGER_CLASS = StateManager +STATE_MANAGER_CLASS = ExtendedStateManager WORKFLOW_MANAGER_CLASS = WorkflowManager DB_CLASS = DataBase -PIPELINE_CONFIG = setup_parameter('pipeline_config', user_settings) -DB_CONFIG = setup_parameter('db_config', user_settings) +PIPELINE_CONFIG = setup_parameter("pipeline_config", user_settings) +DB_CONFIG = setup_parameter("db_config", user_settings) 
-OVERWRITE_LAST_CHANCE = setup_parameter('overwrite_last_chance', user_settings) -OVERWRITE_TIMEOUT = setup_parameter('overwrite_timeout', user_settings) +OVERWRITE_LAST_CHANCE = setup_parameter("overwrite_last_chance", user_settings) +OVERWRITE_TIMEOUT = setup_parameter("overwrite_timeout", user_settings) -RESPONSE_LOGGER = setup_parameter('response_logger', user_settings) +RESPONSE_LOGGER = setup_parameter("response_logger", user_settings) # HTTP app configuraion parameters -TIME_LIMIT = setup_parameter('time_limit', user_settings) # Without engaging the timeout by default -CORS = setup_parameter('cors', user_settings) +TIME_LIMIT = setup_parameter("time_limit", user_settings) # Without engaging the timeout by default +CORS = setup_parameter("cors", user_settings) -OUTPUT_FORMATTER = setup_parameter('output_formatter', user_settings) -DEBUG_OUTPUT_FORMATTER = setup_parameter('debug_output_formatter', user_settings) +OUTPUT_FORMATTER = setup_parameter("output_formatter", user_settings) +DEBUG_OUTPUT_FORMATTER = setup_parameter("debug_output_formatter", user_settings) # HTTP api run parameters -PORT = setup_parameter('port', user_settings) +PORT = setup_parameter("port", user_settings) # Telegram client configuration parameters -TELEGRAM_TOKEN = setup_parameter('telegram_token', user_settings) -TELEGRAM_PROXY = setup_parameter('telegram_proxy', user_settings) +TELEGRAM_TOKEN = setup_parameter("telegram_token", user_settings) +TELEGRAM_PROXY = setup_parameter("telegram_proxy", user_settings) diff --git a/deeppavlov_agent/settings.yaml b/deeppavlov_agent/settings.yaml new file mode 100644 index 00000000..5a403a88 --- /dev/null +++ b/deeppavlov_agent/settings.yaml @@ -0,0 +1,30 @@ +agent: + channel: http + debug: true + state_manager_class: deeppavlov_agent.core.state_manager.ExtendedStateManager + workflow_manager_class: deeppavlov_agent.core.workflow_manager.WorkflowManager + db_class: deeppavlov_agent.core.db.DataBase + pipeline_config: pipeline_conf.json + 
db_config: db_conf.json + overwrite_last_chance: null + overwrite_timeout: null + formatters_module: null + connectors_module: null + enable_response_logger: true + response_time_limit: 0 + output_formatter: deeppavlov_agent.state_formatters.output_formatters.http_api_output_formatter + debug_output_formatter: deeppavlov_agent.state_formatters.output_formatters.http_debug_output_formatter + port: 4242 + cors: false + telegram_token: "" + telegram_proxy: "" + +defaults: + - _self_ + - override hydra/hydra_logging: disabled + - override hydra/job_logging: stdout + +hydra: + run: + dir: . + output_subdir: null diff --git a/deeppavlov_agent/setup_agent.py b/deeppavlov_agent/setup_agent.py index cfdc8106..ce405937 100644 --- a/deeppavlov_agent/setup_agent.py +++ b/deeppavlov_agent/setup_agent.py @@ -1,18 +1,16 @@ import json +import logging import os import yaml -from .settings import (DB_CLASS, DB_CONFIG, OVERWRITE_LAST_CHANCE, - OVERWRITE_TIMEOUT, PIPELINE_CONFIG, - RESPONSE_LOGGER, STATE_MANAGER_CLASS, - WORKFLOW_MANAGER_CLASS) from .core.agent import Agent from .core.connectors import EventSetOutputConnector from .core.log import LocalResponseLogger from .core.pipeline import Pipeline from .core.service import Service from .parse_config import PipelineConfigParser +from .utils import config_tools def merge_two_configs(d1, d2): @@ -26,60 +24,74 @@ def merge_two_configs(d1, d2): d1[k] = v -def setup_agent(pipeline_configs=None): - with open(DB_CONFIG, 'r') as db_config: - if DB_CONFIG.endswith('.json'): +def setup_agent(agent_config): + with open(agent_config.db_config, "r") as db_config: + if agent_config.db_config.endswith(".json"): db_data = json.load(db_config) - elif DB_CONFIG.endswith('.yml'): + elif agent_config.db_config.endswith(".yml"): db_data = yaml.load(db_config, Loader=yaml.FullLoader) else: - raise ValueError(f'unknown format for db_config file: {DB_CONFIG}') + raise ValueError( + f"unknown format for db_config file: {agent_config.db_config}" + ) - if 
db_data.pop('env', False): + if db_data.pop("env", False): for k, v in db_data.items(): db_data[k] = os.getenv(v) - db = DB_CLASS(**db_data) - - sm = STATE_MANAGER_CLASS(db.get_db()) - if pipeline_configs: - pipeline_data = {} - for name in pipeline_configs: - with open(name, 'r') as pipeline_config: - if name.endswith('.json'): - merge_two_configs(pipeline_data, json.load(pipeline_config)) - elif name.endswith('.yml'): - merge_two_configs(pipeline_data, yaml.load(pipeline_config, Loader=yaml.FullLoader)) - else: - raise ValueError(f'unknown format for pipeline_config file from command line: {name}') - - else: - with open(PIPELINE_CONFIG, 'r') as pipeline_config: - if PIPELINE_CONFIG.endswith('.json'): - pipeline_data = json.load(pipeline_config) - elif PIPELINE_CONFIG.endswith('.yml'): - pipeline_data = yaml.load(pipeline_config, Loader=yaml.FullLoader) - else: - raise ValueError(f'unknown format for pipeline_config file from setitngs: {PIPELINE_CONFIG}') + db_class = config_tools.import_class(agent_config.db_class) + db = db_class(**db_data) + + sm_class = config_tools.import_class(agent_config.state_manager_class) + sm = sm_class(db.get_db()) + # if pipeline_configs: + # pipeline_data = {} + # for name in pipeline_configs: + # with open(name, 'r') as pipeline_config: + # if name.endswith('.json'): + # merge_two_configs(pipeline_data, json.load(pipeline_config)) + # elif name.endswith('.yml'): + # merge_two_configs(pipeline_data, yaml.load(pipeline_config, Loader=yaml.FullLoader)) + # else: + # raise ValueError(f'unknown format for pipeline_config file from command line: {name}') + # + # else: + with open(agent_config.pipeline_config, "r") as pipeline_config_f: + if agent_config.pipeline_config.endswith(".json"): + pipeline_data = json.load(pipeline_config_f) + elif agent_config.pipeline_config.endswith(".yml"): + pipeline_data = yaml.load(pipeline_config_f, Loader=yaml.FullLoader) + else: + raise ValueError( + f"unknown format for pipeline_config file from 
setitngs: {agent_config.pipeline_config}" + ) pipeline_config = PipelineConfigParser(sm, pipeline_data) - input_srv = Service('input', None, sm.add_human_utterance, 1, ['input']) - responder_srv = Service('responder', EventSetOutputConnector('responder').send, - sm.save_dialog, 1, ['responder']) + input_srv = Service("input", None, sm.add_human_utterance, 1, ["input"]) + responder_srv = Service( + "responder", + EventSetOutputConnector("responder").send, + sm.save_dialog, + 1, + ["responder"], + ) last_chance_srv = None - if not OVERWRITE_LAST_CHANCE: + if not agent_config.overwrite_last_chance: last_chance_srv = pipeline_config.last_chance_service timeout_srv = None - if not OVERWRITE_TIMEOUT: + if not agent_config.overwrite_timeout: timeout_srv = pipeline_config.timeout_service - pipeline = Pipeline(pipeline_config.services, input_srv, responder_srv, last_chance_srv, timeout_srv) + pipeline = Pipeline( + pipeline_config.services, input_srv, responder_srv, last_chance_srv, timeout_srv + ) - response_logger = LocalResponseLogger(RESPONSE_LOGGER) + response_logger = LocalResponseLogger(agent_config.enable_response_logger) - agent = Agent(pipeline, sm, WORKFLOW_MANAGER_CLASS(), response_logger=response_logger) + wf_class = config_tools.import_class(agent_config.workflow_manager_class) + agent = Agent(pipeline, sm, wf_class(), response_logger=response_logger) if pipeline_config.gateway: pipeline_config.gateway.on_channel_callback = agent.register_msg pipeline_config.gateway.on_service_callback = agent.process diff --git a/deeppavlov_agent/utils/__init__.py b/deeppavlov_agent/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deeppavlov_agent/utils/config_tools.py b/deeppavlov_agent/utils/config_tools.py new file mode 100644 index 00000000..dea790d7 --- /dev/null +++ b/deeppavlov_agent/utils/config_tools.py @@ -0,0 +1,16 @@ +import importlib + + +def import_class(qualname: str): + """Dynamically imports class from a qualified name + + Args: + 
qualname: fully qualified class name including package name, + e.g. ``core.state_manager.StateManager`` + + Returns: + imported class + """ + module_name, class_name = qualname.rsplit(".", maxsplit=1) + klass = getattr(importlib.import_module(module_name), class_name) + return klass diff --git a/deeppavlov_agent/utils/http_api_stress_test.py b/deeppavlov_agent/utils/http_api_stress_test.py index 79713c7b..eff0fb20 100644 --- a/deeppavlov_agent/utils/http_api_stress_test.py +++ b/deeppavlov_agent/utils/http_api_stress_test.py @@ -3,9 +3,16 @@ import uuid from statistics import mean, median from time import time +import os +from logging import getLogger +import sentry_sdk import aiohttp +logger = getLogger(__name__) + +sentry_sdk.init(os.getenv('DP_AGENT_SENTRY_DSN')) + parser = argparse.ArgumentParser() parser.add_argument('-u', '--url', type=str) parser.add_argument('-pf', '--phrasesfile', help='name of the file with phrases for dialog', type=str, default="") @@ -19,6 +26,8 @@ with open(args.phrasesfile, 'r') as file: payloads = [line.rstrip('\n') for line in file] except Exception as e: + sentry_sdk.capture_exception(e) + logger.exception(e) raise e diff --git a/deeppavlov_agent/utils/http_api_test.py b/deeppavlov_agent/utils/http_api_test.py index 373565c6..d458818f 100644 --- a/deeppavlov_agent/utils/http_api_test.py +++ b/deeppavlov_agent/utils/http_api_test.py @@ -6,6 +6,9 @@ from time import time from random import randrange import uuid +import os + +import sentry_sdk from tqdm import tqdm ''' @@ -19,6 +22,8 @@ structure of phrase file (-pf) simple text file. 
One phrase per line ''' +sentry_sdk.init(os.getenv('DP_AGENT_SENTRY_DSN')) + parser = argparse.ArgumentParser() parser.add_argument('-u', '--url', type=str) parser.add_argument('-uc', '--usercount', help='count of test users, which will send the message', @@ -37,12 +42,14 @@ with open(args.dialogfile, 'r') as file: payloads = json.load(file) except Exception as e: + sentry_sdk.capture_exception(e) raise e elif args.phrasesfile: try: with open(args.phrasesfile, 'r') as file: phrases = [line.rstrip('\n') for line in file] except Exception as e: + sentry_sdk.capture_exception(e) raise e payloads = {uuid.uuid4().hex: [phrases[randrange(len(phrases))] for j in range(args.phrasecount)] for i in range(args.usercount)} diff --git a/docs/source/intro/overview.rst b/docs/source/intro/overview.rst index 44a02e21..bcf74b6a 100644 --- a/docs/source/intro/overview.rst +++ b/docs/source/intro/overview.rst @@ -62,9 +62,9 @@ To launch the agent enter: .. code:: bash - python -m deeppavlov_agent.run -ch http_client -p 4242 -pl pipeline_conf.json -db db_conf.json -rl -d + python -m deeppavlov_agent.run_http -Command parameters are: +Command parameters are set via deeppavlov_agent/settings.yaml: * -ch - output channel for agent. Could be either ``http_client`` or ``cmd_client`` * -p - port for http_client, default value is 4242 @@ -73,6 +73,7 @@ Command parameters are: * -rl - include response logger * -d - launch in debug mode (additional data in http output) +The agent can send information about exceptions to `Sentry <https://sentry.io>`__ when the ``DP_AGENT_SENTRY_DSN`` environment variable is set.
**HTTP api server** ------------------- diff --git a/requirements.txt b/requirements.txt index 3c825ffc..81e358e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,9 @@ pyyaml==5.1 aio-pika==5.6.0 motor==2.0.0 tqdm==4.36.1 -pymongo==3.10.1 \ No newline at end of file +pymongo==3.10.1 +sentry-sdk==0.19.5 +pydantic==1.8.2 +hydra-core==1.1.1 +jinja2==3.0.3 +Werkzeug==2.0.3 diff --git a/setup.py b/setup.py index e19d6339..0359706e 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ def read_requirements(): setuptools.setup( name='deeppavlov_agent', - version='2.1.1', + version='2.2.0', include_package_data=True, description='An open source library, allowing you to create data processing systems based on a sequence graph, ' 'alongside with saving sample processing results in database.', @@ -34,6 +34,7 @@ def read_requirements(): keywords=['chatbots', 'microservices', 'dialog systems', 'NLP'], packages=setuptools.find_packages(exclude=('docs',)), python_requires='>=3.7', + data_files=[('.', ['deeppavlov_agent/settings.yaml'])], url="https://github.com/deepmipt/dp-agent", **read_requirements() )