diff --git a/requirements.txt b/requirements.txt index d24d88d..cde41ee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,4 @@ python-magic ipdb ipython configargparse -https://github.com/LonamiWebs/Telethon/archive/sync-stale.zip +Telethon diff --git a/telegramircd.py b/telegramircd.py index 581a6ad..f671ebd 100755 --- a/telegramircd.py +++ b/telegramircd.py @@ -5,6 +5,7 @@ from datetime import datetime, timezone from itertools import chain import subprocess +from typing import Optional, Dict, Tuple, Any from telethon import TelegramClient from telethon.errors import SessionPasswordNeededError @@ -14,14 +15,14 @@ import telethon.tl.functions.contacts import telethon.tl.functions.messages -import aiohttp.web, asyncio, base64, inspect, json, logging.handlers, magic, os, pprint, random, re, \ - shlex, signal, socket, ssl, string, sys, tempfile, time, traceback, uuid, weakref +import aiohttp.web, asyncio, base64, inspect, logging.handlers, magic, os, pprint, re, \ + signal, socket, ssl, string, sys, tempfile, time, traceback, weakref logger = logging.getLogger('telegramircd') im_name = 'Telegram' capabilities = set(['away-notify', 'draft/message-tags', 'echo-message', 'multi-prefix', 'sasl', 'server-time']) # http://ircv3.net/irc/ -options = None -server = None +options: Optional[Namespace] = None +server: Optional["Server"] = None web = None @@ -58,19 +59,20 @@ def __init__(self, *msg): ### HTTP server class Web(object): - def __init__(self, tls): + def __init__(self, tls) -> None: global web web = self self.tls = tls - self.id2media = {} - self.id2message = {} - self.webpage_id2sender_to = {} - self.recent_messages = deque() - self.proc = None + self.id2media: Dict[str, Tuple[Any, Any]] = {} + self.id2message: Dict[str, str] = {} + self.webpage_id2sender_to: Dict[str, Tuple[Any, Any]] = {} + self.recent_messages: deque[str] = deque() + self.proc: Optional[TelegramClient] = None self.authorized = False self.two_step = False - async def handle_media(self, typ, request): + async def handle_media(self, typ, request) -> aiohttp.web.Response: + assert options is not None and self.proc is not None id = re.sub(r'\..*', '', request.match_info.get('id')) if id not in self.id2media: return aiohttp.web.Response(status=404, text='Not Found') @@ -80,7 +82,7 @@ async def handle_media(self, typ, request): try: with tempfile.NamedTemporaryFile(dir=options.tg_media_dir, suffix='.blob') as temp: filename = temp.name - self.proc.download_media(media, filename) + await self.proc.download_media(media, filename) self.id2media[id] = (media, filename) except asyncio.TimeoutError: return aiohttp.web.Response(status=504, text='I used to live in 504A') @@ -100,19 +102,20 @@ async def handle_media(self, typ, request): except Exception as ex: return aiohttp.web.Response(status=500, text=str(ex)) - async def handle_document(self, request): + async def handle_document(self, request) -> aiohttp.web.Response: return await self.handle_media('document', request) - def run_telethon(self): + async def run_telethon(self) -> None: + assert options is not None and server is not None if self.proc: - self.proc.disconnect() + await self.proc.disconnect() self.proc = TelegramClient(options.tg_session, options.tg_api_id, options.tg_api_hash) try: - self.proc.connect() + await self.proc.connect() except: error('Failed to connect to Telegram server') sys.exit(2) - self.authorized = self.proc.is_user_authorized() + self.authorized = await self.proc.is_user_authorized() if not self.authorized and not options.tg_phone: error('Not authorized. Please set --tg-phone') sys.exit(2) @@ -122,7 +125,7 @@ async def restart_telegram_cli(self): traceback.print_stack() if self.proc: try: - self.proc.disconnect() + await self.proc.disconnect() time.sleep(1) except: pass @@ -137,9 +140,9 @@ def start(self, listens, port, loop): for i in listens: self.srv.append(loop.run_until_complete( loop.create_server(self.handler, i, port, ssl=self.tls))) - self.run_telethon() + loop.run_until_complete(self.run_telethon()) if self.authorized: - self.init() + loop.run_until_complete(self.init()) #async def poll(): # while 1: @@ -151,7 +154,7 @@ def start(self, listens, port, loop): # self.poll = loop.create_task(poll()) def stop(self): - self.proc.disconnect() + self.loop.run_until_complete(self.proc.disconnect()) for i in self.srv: i.close() self.loop.run_until_complete(i.wait_closed()) @@ -173,11 +176,11 @@ def append_history(self, record): self.id2message[record['id']] = record # TODO admin channel.update_admins(members) - def channel_get_participants(self, channel): + async def channel_get_participants(self, channel): tg_users = [] offset = 0 while True: - participants = self.proc(tl.functions.channels.GetParticipantsRequest( + participants = await self.proc(tl.functions.channels.GetParticipantsRequest( channel.tg_room, tl.types.ChannelParticipantsSearch(''), offset, @@ -187,19 +190,19 @@ def channel_get_participants(self, channel): if not participants.users: break tg_users.extend(participants.users) offset += len(participants.users) - channel.update_members(tg_users) + await channel.update_members(tg_users) - def channel_invite(self, client, channel, user): + async def channel_invite(self, client, channel, user): try: if channel.is_type(tl.types.PeerChannel): - self.proc(tl.functions.channels.InviteToChannelRequest( + await self.proc(tl.functions.channels.InviteToChannelRequest( channel.peer.channel_id, - [self.proc.get_input_entity(user.user_id)] + [await self.proc.get_input_entity(user.user_id)] )) elif channel.is_type(tl.types.PeerChat): - self.proc(tl.functions.messages.AddChatUserRequest( + await self.proc(tl.functions.messages.AddChatUserRequest( channel.peer.chat_id, - self.proc.get_input_entity(user.user_id), + await self.proc.get_input_entity(user.user_id), 0, )) except telethon.errors.rpcerrorlist.UserAlreadyParticipantError: @@ -226,38 +229,38 @@ async def channel_set_admin(self, client, channel, user, ty): else: client.err_chanoprivsneeded(channel.name) - def channel_kick(self, client, channel, user): + async def channel_kick(self, client, channel, user): try: if channel.is_type(tl.types.PeerChannel): pass elif channel.is_type(tl.types.PeerChat): - self.proc(tl.functions.messages.DeleteChatUserRequest( + await self.proc(tl.functions.messages.DeleteChatUserRequest( channel.peer.chat_id, - self.proc.get_input_entity(user.user_id), + await self.proc.get_input_entity(user.user_id), )) except telethon.errors.rpcerrorlist.UserNotParticipantError: client.err_usernotinchannel(user.nick, channel.name) except telethon.errors.rpcbaseerrors.RPCError: pass - def chat_get_full(self, channel): - chatfull = self.proc(tl.functions.messages.GetFullChatRequest( + async def chat_get_full(self, channel): + chatfull = await self.proc(tl.functions.messages.GetFullChatRequest( channel.tg_room.id )) - channel.update_members(chatfull.users) + await channel.update_members(chatfull.users) - def contact_list(self): - contacts = self.proc(tl.functions.contacts.GetContactsRequest(0)) + async def contact_list(self): + contacts = await self.proc(tl.functions.contacts.GetContactsRequest(0)) for tg_user in contacts.users: - server.ensure_special_user(tg_user.id, tg_user) + await server.ensure_special_user(tg_user.id, tg_user) - def channel_list(self): + async def channel_list(self): # https://github.com/LonamiWebs/Telethon/wiki/Retrieving-all-dialogs last_date = None chunk_size = 20 while True: debug('channel_list %r', last_date) - r = self.proc(tl.functions.messages.GetDialogsRequest( + r = await self.proc(tl.functions.messages.GetDialogsRequest( offset_date=last_date, offset_id=0, offset_peer=tl.types.InputPeerEmpty(), @@ -265,66 +268,67 @@ def channel_list(self): hash=0 )) for tg_user in r.users: - server.ensure_special_user(tg_user.id, tg_user) + await server.ensure_special_user(tg_user.id, tg_user) # Channel & Chat & ChatForbidden & ... for tg_room in r.chats: if isinstance(tg_room, (tl.types.Channel, tl.types.Chat)): - server.ensure_special_room(tg_room.id, tg_room) + await server.ensure_special_room(tg_room.id, tg_room) if not r.messages: break date = min(msg.date for msg in r.messages) if date == last_date: break last_date = date time.sleep(0.7) - def channel_message_get(self, channel, id): - messages = self.proc(tl.functions.channels.GetMessagesRequest( - self.proc.get_input_entity(channel.peer), [id])).messages + async def channel_message_get(self, channel, id): + messages = await self.proc(tl.functions.channels.GetMessagesRequest( + await self.proc.get_input_entity(channel.peer), [id])).messages return messages[0] if messages else None - def message_get(self, id): - messages = self.proc(tl.functions.messages.GetMessagesRequest([id])).messages + async def message_get(self, id): + messages = await self.proc(tl.functions.messages.GetMessagesRequest([id])).messages return messages[0] if messages else None - def get_self(self): - data = self.proc.get_me() - server.user_id = data.id + async def get_self(self): + data = await self.proc.get_me() + if data is not None: + server.user_id = data.id - def init(self): + async def init(self): try: - web.get_self() - web.channel_list() - web.contact_list() + await web.get_self() + await web.channel_list() + await web.contact_list() except Exception as ex: traceback.print_exc() - def mark_read(self, peer, max_id): - self.proc.send_read_acknowledge(peer, max_id=max_id) + async def mark_read(self, peer, max_id): + await self.proc.send_read_acknowledge(peer, max_id=max_id) - def channel_members(self, channel): + async def channel_members(self, channel): try: if channel.is_type(tl.types.PeerChannel): - self.channel_get_participants(channel) + await self.channel_get_participants(channel) elif channel.is_type(tl.types.PeerChat): - self.chat_get_full(channel) + await self.chat_get_full(channel) except Exception as ex: error('channel_members %r', channel) traceback.print_exc() - def send_file(self, client, peer, filename, body): + async def send_file(self, client, peer, filename, body): with tempfile.TemporaryDirectory() as directory: filename = os.path.join(directory, filename) try: with open(filename, 'wb') as f: f.write(body) f.flush() - self.proc.send_file(peer, f.name) + await self.proc.send_file(peer, f.name) except telethon.errors.rpcbaseerrors.RPCError: client.err_cannotsendtochan(peer.nick, 'Cannot send the file') os.unlink(filename) - def msg(self, client, to, text, reply_to=None): + async def msg(self, client, to, text, reply_to=None): try: - msg = self.proc.send_message(to.peer, text, reply_to=reply_to) + msg = await self.proc.send_message(to.peer, text, reply_to=reply_to) irc_log(to, to, msg.date, client, msg.message) except telethon.errors.rpcbaseerrors.RPCError: traceback.print_exc() @@ -430,12 +434,12 @@ def irc_log(where, peer, local_time, sender, line): where.log_file.flush() -def irc_privmsg(client, command, to, text): +async def irc_privmsg(client, command, to, text): if command == 'PRIVMSG' and client.ctcp(to.peer, text): return - def send(): - web.msg(client, to, to.privmsg_text, to.privmsg_reply) + async def send(): + await web.msg(client, to, to.privmsg_text, to.privmsg_reply) to.last_text_by_client[client] = to.privmsg_text to.privmsg_reply = None to.privmsg_text = '' @@ -443,11 +447,11 @@ def send(): async def wait(seq): await asyncio.sleep(options.paste_wait) if to.privmsg_seq == seq: - send() + await send() reply, text = process_text(to, text) if reply and len(to.privmsg_text): - send() + await send() to.privmsg_reply = reply to.privmsg_seq = to.privmsg_seq+1 if len(to.privmsg_text): @@ -470,7 +474,7 @@ def wrapped(fn): class Command: @staticmethod @registered(7) - def authenticate(client, arg): + async def authenticate(client, arg): if arg.upper() == 'PLAIN': client.write('AUTHENTICATE +') return @@ -481,19 +485,19 @@ def authenticate(client, arg): client.authenticated = True client.reply('900 {} {} {} :You are now logged in as {}', client.nick, client.user, client.nick, client.nick) client.reply('903 {} :SASL authentication successful', client.nick) - client.register() + await client.register() else: client.reply('904 {} :SASL authentication failed', client.nick) except: client.reply('904 {} :SASL authentication failed', client.nick) @staticmethod - def away(client): + async def away(client): pass @staticmethod @registered(7) - def cap(client, *args): + async def cap(client, *args): if not args: return comm = args[0].lower() if comm == 'list': @@ -511,26 +515,26 @@ def cap(client, *args): client.reply('CAP * ACK :{}', ' '.join(client.capabilities)) @staticmethod - def info(client): + async def info(client): client.rpl_info('{} users', len(server.nicks)) client.rpl_info('{} {} users', im_name, len(client.nick2special_user)) client.rpl_info('{} {} rooms', im_name, len(client.name2special_room)) @staticmethod - def invite(client, nick, channelname): + async def invite(client, nick, channelname): if client.is_in_channel(channelname): server.get_channel(channelname).on_invite(client, nick) else: client.err_notonchannel(channelname) @staticmethod - def ison(client, *nicks): + async def ison(client, *nicks): client.reply('303 {} :{}', client.nick, ' '.join(nick for nick in nicks if server.has_nick(nick))) @staticmethod - def join(client, *args): + async def join(client, *args): if not args: self.err_needmoreparams('JOIN') else: @@ -547,7 +551,7 @@ def join(client, *args): web.proc(tl.functions.messages.ImportChatInviteRequest(channelname[len(JOINCHAT):])) else: if server.has_special_room(channelname): - server.get_special_room(channelname).on_join(client) + await server.get_special_room(channelname).on_join(client) else: try: server.ensure_channel(channelname).on_join(client) @@ -555,14 +559,14 @@ def join(client, *args): client.err_nosuchchannel(channelname) @staticmethod - def kick(client, channelname, nick, reason=None): + async def kick(client, channelname, nick, reason=None): if client.is_in_channel(channelname): server.get_channel(channelname).on_kick(client, nick, reason) else: client.err_notonchannel(channelname) @staticmethod - def kill(client, nick, reason=None): + async def kill(client, nick, reason=None): if not server.has_nick(nick): client.err_nosuchnick(nick) return @@ -573,7 +577,7 @@ def kill(client, nick, reason=None): user.disconnect(reason) @staticmethod - def list(client, arg=None): + async def list(client, arg=None): if arg: channels = [] for channelname in arg.split(','): @@ -593,7 +597,7 @@ def list(client, arg=None): client.reply('323 {} :End of LIST', client.nick) @staticmethod - def lusers(client): + async def lusers(client): client.reply('251 :There are {} users and {} {} users on 1 server', len(server.nicks), len(server.nick2special_user), @@ -601,7 +605,7 @@ def lusers(client): ) @staticmethod - def mode(client, target, *args): + async def mode(client, target, *args): if server.has_nick(target): if args: client.err_umodeunknownflag() @@ -613,7 +617,7 @@ def mode(client, target, *args): client.err_nosuchchannel(target) @staticmethod - def motd(client): + async def motd(client): async def do(): try: async with aiohttp.ClientSession() as session: @@ -628,17 +632,17 @@ async def do(): server.loop.create_task(do()) @staticmethod - def names(client, target): + async def names(client, target): if not client.is_in_channel(target): client.err_notonchannel(target) return channel = server.get_channel(target) - web.channel_members(channel) + await web.channel_members(channel) channel.on_names(client) @staticmethod @registered(7) - def nick(client, *args): + async def nick(client, *args): if len(options.irc_password) and not client.authenticated: client.err_passwdmismatch('NICK') return @@ -649,17 +653,17 @@ def nick(client, *args): @staticmethod @registered(7) - def oper(client, _, password): + async def oper(client, _, password): ok = False StatusChannel.instance.respond(client, 'Signing in...') if web.two_step: web.two_step = False - ok = web.proc.sign_in(password=password) + ok = await web.proc.sign_in(password=password) if not ok: StatusChannel.instance.respond(client, 'Wrong password. Please type /oper a $login_code; /oper a $password') else: try: - ok = web.proc.sign_in(options.tg_phone, password) + ok = await web.proc.sign_in(options.tg_phone, password) if not ok: StatusChannel.instance.respond(client, 'Wrong login code. Please type /oper a $login_code') except SessionPasswordNeededError: @@ -667,14 +671,15 @@ def oper(client, _, password): StatusChannel.instance.respond(client, 'Two step verification enabled. Please type /oper a $password') if ok: StatusChannel.instance.respond(client, 'Authorized. Initializing...') - web.init() + await web.init() + @staticmethod - def notice(client, *args): - Command.notice_or_privmsg(client, 'NOTICE', *args) + async def notice(client, *args): + await Command.notice_or_privmsg(client, 'NOTICE', *args) @staticmethod - def part(client, arg, *args): + async def part(client, arg, *args): partmsg = args[0] if args else None for channelname in arg.split(','): if client.is_in_channel(channelname): @@ -684,14 +689,14 @@ def part(client, arg, *args): @staticmethod @registered(6) - def pass_(client, password): + async def pass_(client, password): if len(options.irc_password) and password == options.irc_password: client.authenticated = True client.register() @staticmethod @registered(7) - def ping(client, *args): + async def ping(client, *args): if not args: client.err_noorigin() return @@ -699,24 +704,24 @@ def ping(client, *args): @staticmethod @registered(7) - def pong(client, *args): + async def pong(client, *args): pass @staticmethod - def privmsg(client, *args): - Command.notice_or_privmsg(client, 'PRIVMSG', *args) + async def privmsg(client, *args): + await Command.notice_or_privmsg(client, 'PRIVMSG', *args) @staticmethod @registered(7) - def quit(client, *args): + async def quit(client, *args): client.disconnect(args[0] if args else client.prefix) @staticmethod - def squit(client, *args): + async def squit(client, *args): client.err_unknowncommand('SQUIT') @staticmethod - def stats(client, query): + async def stats(client, query): if len(query) == 1: if query == 'u': td = datetime.now() - server._boot @@ -726,23 +731,23 @@ def stats(client, query): client.reply('219 {} {} :End of STATS report', client.nick, query) @staticmethod - def summon(client, nick, msg): + async def summon(client, nick, msg): client.err_nologin(nick) @staticmethod - def time(client): + async def time(client): client.reply('391 {} {} :{}Z', client.nick, server.name, datetime.utcnow().isoformat()) @staticmethod - def topic(client, channelname, new=None): + async def topic(client, channelname, new=None): if not client.is_in_channel(channelname): client.err_notonchannel(channelname) return server.get_channel(channelname).on_topic(client, new) @staticmethod - def who(client, target): + async def who(client, target): if server.has_channel(target): server.get_channel(target).on_who(client) elif server.has_nick(target): @@ -750,7 +755,7 @@ def who(client, target): client.reply('315 {} {} :End of WHO list', client.nick, target) @staticmethod - def whois(client, *args): + async def whois(client, *args): if not args: client.err_nonicknamegiven() return @@ -766,7 +771,7 @@ def whois(client, *args): client.reply('318 {} {} :End of WHOIS list', client.nick, target) @classmethod - def notice_or_privmsg(cls, client, command, *args): + async def notice_or_privmsg(cls, client, command, *args): if not args: client.err_norecipient(command) return @@ -781,7 +786,7 @@ def notice_or_privmsg(cls, client, command, *args): if isinstance(user, Client): user.write(':{} PRIVMSG {} :{}'.format(client.prefix, user.nick, msg)) else: - user.on_notice_or_privmsg(client, command, msg) + await user.on_notice_or_privmsg(client, command, msg) # IRC channel or special chatroom elif client.is_in_channel(target): server.get_channel(target).on_notice_or_privmsg( @@ -791,13 +796,13 @@ def notice_or_privmsg(cls, client, command, *args): @staticmethod @registered(6) - def user(client, user, mode, _, realname): + async def user(client, user, mode, _, realname): if len(options.irc_password) and not client.authenticated: client.err_passwdmismatch('USER') return client.user = user client.realname = realname - client.register() + await client.register() ### Channels: StandardChannel, StatusChannel, SpecialChannel @@ -1104,7 +1109,6 @@ def __init__(self, tg_room): assert False self.joined = {} # `client` has not joined self.explicit_parted = set() - self.update(tg_room) self.log_file = None self.last_text_by_client = weakref.WeakKeyDictionary() self.max_id = 0 @@ -1123,7 +1127,7 @@ def nick(self): def is_type(self, type): return isinstance(self.peer, type) - def update(self, tg_room): + async def update(self, tg_room): self.tg_room = tg_room old_name = getattr(self, 'name', None) base = options.special_channel_prefix + irc_escape(tg_room.title) @@ -1142,7 +1146,7 @@ def update(self, tg_room): self.on_part(client, 'Changing name') self.name = name for client in joined: - self.on_join(client) + await self.on_join(client) if self.is_type(tl.types.PeerChannel): topic = '{} {}'.format(self.peer.channel_id, tg_room.title.replace('\n', '\\n')) elif self.is_type(tl.types.PeerChat): @@ -1152,11 +1156,12 @@ def update(self, tg_room): for client in server.auth_clients(): client.reply('332 {} {} :{}', client.nick, self.name, self.topic) - def update_admins(self, admins): + async def update_admins(self, admins) -> None: + assert server is not None seen_me = False seen = set() for admin in admins: - user = server.ensure_special_user(admin) + user = await server.ensure_special_user(admin, None) if user == server: seen_me = True elif user in self.members: @@ -1176,16 +1181,16 @@ def update_admins(self, admins): self.set_cmode(user, 'o') self.op_event(user) - def update_members(self, tg_users): + async def update_members(self, tg_users): seen = {} for tg_user in tg_users: - user = server.ensure_special_user(tg_user.id, tg_user) + user = await server.ensure_special_user(tg_user.id, tg_user) if user != server: seen[user] = 'v' if user.is_contact else '' for user in self.members.keys() - seen.keys(): self.on_part(user, self.name) for user in seen.keys() - self.members.keys(): - self.on_join(user) + await self.on_join(user) for user, mode in seen.items(): old = self.members.get(user, '') if 'h' in old and 'h' not in mode: @@ -1249,8 +1254,8 @@ def on_mode(self, client, *args): def on_names(self, client): self.on_names_impl(client, chain(self.joined.items(), self.members.items())) - def on_notice_or_privmsg(self, client, command, text): - irc_privmsg(client, command, self, text) + async def on_notice_or_privmsg(self, client, command, text): + await irc_privmsg(client, command, self, text) def on_invite(self, client, nick): if server.has_special_user(nick): @@ -1262,13 +1267,13 @@ def on_invite(self, client, nick): else: client.err_nosuchnick(nick) - def on_join(self, member): + async def on_join(self, member): if isinstance(member, Client): if member in self.joined: return False self.joined[member] = '' self.explicit_parted.discard(member) - web.channel_members(self) + await web.channel_members(self) super().on_join(member) else: if member in self.members: @@ -1336,14 +1341,14 @@ def enter(self, channel): def leave(self, channel): del self.channels[irc_lower(channel.name)] - def auto_join(self, room): + async def auto_join(self, room): for regex in options.ignore or []: if re.search(regex, room.name): return for regex in options.ignore_topic or []: if re.search(regex, room.topic): return - room.on_join(self) + await room.on_join(self) def is_in_channel(self, name): return irc_lower(name) in self.channels @@ -1474,7 +1479,7 @@ def message_related(self, include_self, fmt, *args): for client in clients: client.write(line) - def register(self): + async def register(self): if self.registered: return if self.user and self.nick and (not (options.irc_password or options.sasl_password) or self.authenticated): @@ -1483,18 +1488,18 @@ def register(self): self.reply('001 {} :Hi, welcome to IRC', self.nick) self.reply('002 {} :Your host is {}', self.nick, server.name) self.reply('005 {} PREFIX=(ohv)@%+ CHANTYPES=!#&+ CHANMODES=,,,m SAFELIST :are supported by this server', self.nick) - Command.lusers(self) - Command.motd(self) + await Command.lusers(self) + await Command.motd(self) - Command.join(self, StatusChannel.instance.name) + await Command.join(self, StatusChannel.instance.name) StatusChannel.instance.respond(self, 'Your contacts are listed in this channel') if not web.authorized: StatusChannel.instance.respond(self, 'This session is unauthorized. Requesting login code. Please type /oper a $login_code') - web.proc.send_code_request(options.tg_phone) + await web.proc.send_code_request(options.tg_phone) else: StatusChannel.instance.respond(self, 'Reuse {}.session . Initializing...', options.tg_session) - def handle_command(self, command, args): + async def handle_command(self, command, args): cmd = irc_lower(command) if cmd == 'pass': cmd = cmd+'_' @@ -1517,7 +1522,7 @@ def handle_command(self, command, args): except TypeError: self.err_needmoreparams(command) return - fn(*ba.args) + await fn(*ba.args) async def handle_irc(self): sent_ping = False @@ -1561,7 +1566,7 @@ async def handle_irc(self): if len(y) == 2: args.append(y[1]) try: - self.handle_command(command, args) + await self.handle_command(command, args) except: traceback.print_exc() self.disconnect('client error') @@ -1627,19 +1632,19 @@ def UpdateChannelWebPage(server, update): info('UpdateChannelWebpage %r', update.to_dict()) @staticmethod - def UpdateChatParticipantAdd(server, update): - user = server.ensure_special_user(update.user_id, None) + async def UpdateChatParticipantAdd(server, update): + user = await server.ensure_special_user(update.user_id, None) if user is not server: - room = server.ensure_special_room(update.chat_id, None) - room.on_join(user) + room = await server.ensure_special_room(update.chat_id, None) + await room.on_join(user) if user.is_contact: room.voice_event(user) room.set_cmode(user, 'v') @staticmethod - def UpdateChatParticipantDelete(server, update): - room = server.ensure_special_room(update.chat_id, None) - user = server.ensure_special_user(update.user_id, None) + async def UpdateChatParticipantDelete(server, update): + room = await server.ensure_special_room(update.chat_id, None) + user = await server.ensure_special_user(update.user_id, None) if user is server: joined = [client for client in server.auth_clients() if client in channel.joined] for client in joined: @@ -1648,79 +1653,79 @@ def UpdateChatParticipantDelete(server, update): room.on_part(user) @staticmethod - def UpdateChatUserTyping(server, update): + async def UpdateChatUserTyping(server, update): pass @staticmethod - def UpdateContactLink(server, update): + async def UpdateContactLink(server, update): pass @staticmethod - def UpdateContactRegistered(server, update): + async def UpdateContactRegistered(server, update): pass @staticmethod - def UpdateDeleteChannelMessages(server, update): + async def UpdateDeleteChannelMessages(server, update): pass @staticmethod - def UpdateDeleteMessages(server, update): + async def UpdateDeleteMessages(server, update): pass @staticmethod - def UpdateEditChannelMessage(server, update): - server.on_telegram_update_message(update, update.message) + async def UpdateEditChannelMessage(server, update): + await server.on_telegram_update_message(update, update.message) @staticmethod - def UpdateEditMessage(server, update): - server.on_telegram_update_message(update, update.message) + async def UpdateEditMessage(server, update): + await server.on_telegram_update_message(update, update.message) @staticmethod - def UpdateNewChannelMessage(server, update): - server.on_telegram_update_message(update, update.message) + async def UpdateNewChannelMessage(server, update): + await server.on_telegram_update_message(update, update.message) @staticmethod - def UpdateNewMessage(server, update): - server.on_telegram_update_message(update, update.message) + async def UpdateNewMessage(server, update): + await server.on_telegram_update_message(update, update.message) @staticmethod - def UpdateReadChannelInbox(server, update): + async def UpdateReadChannelInbox(server, update): pass @staticmethod - def UpdateReadChannelOutbox(server, update): + async def UpdateReadChannelOutbox(server, update): pass @staticmethod - def UpdateReadHistoryInbox(server, update): + async def UpdateReadHistoryInbox(server, update): pass @staticmethod - def UpdateReadHistoryOutbox(server, update): + async def UpdateReadHistoryOutbox(server, update): pass @staticmethod - def UpdateShortChatMessage(server, update): - from_ = server.ensure_special_user(update.from_id, None) - to = server.ensure_special_room(update.chat_id, None) - server.on_telegram_update_message(update, update, from_, to) + async def UpdateShortChatMessage(server, update): + from_ = await server.ensure_special_user(update.from_id, None) + to = await server.ensure_special_room(update.chat_id, None) + await server.on_telegram_update_message(update, update, from_, to) @staticmethod - def UpdateShortMessage(server, update): - from_ = server.ensure_special_user(update.user_id, None) + async def UpdateShortMessage(server, update): + from_ = await server.ensure_special_user(update.user_id, None) to = server if update.out: from_, to = to, from_ - server.on_telegram_update_message(update, update, from_, to) + await server.on_telegram_update_message(update, update, from_, to) @staticmethod - def UpdateUserName(server, update): + async def UpdateUserName(server, update): pass @staticmethod - def UpdateUserStatus(server, update): + async def UpdateUserStatus(server, update): try: - user = server.ensure_special_user(update.user_id, None) + user = await server.ensure_special_user(update.user_id, None) except: return if user is not server: @@ -1732,11 +1737,11 @@ def UpdateUserStatus(server, update): user.event('AWAY') @staticmethod - def UpdateUserTyping(server, update): + async def UpdateUserTyping(server, update): pass @staticmethod - def UpdateWebPage(server, update): + async def UpdateWebPage(server, update): webpage = update.webpage info('UpdateWebpage %r %r', type(webpage), webpage.to_dict()) if isinstance(webpage, tl.types.WebPage): @@ -1744,7 +1749,7 @@ def UpdateWebPage(server, update): if value is None: return sender, to = value - server.deliver_message(None, sender, to, datetime.utcnow(), + await server.deliver_message(None, sender, to, datetime.utcnow(), '[WebPage] {} {}'.format(webpage.url.replace('\n', '\\n'), (webpage.title or webpage.display_url).replace('\n', '\\n'))) @@ -1842,12 +1847,12 @@ def enter(self, channel): def leave(self, channel): self.channels.remove(channel) - def on_notice_or_privmsg(self, client, command, text): - irc_privmsg(client, command, self, text) + async def on_notice_or_privmsg(self, client, command, text): + await irc_privmsg(client, command, self, text) if 'a' in self.mode: client.write(':{} AWAY away'.format(self.prefix)) if options.mark_read == 'reply': - web.mark_read(self.peer, self.max_id) + await web.mark_read(self.peer, self.max_id) def on_who_member(self, client, channel): client.reply('352 {} {} {} {} {} {} H :0 {}', client.nick, channel.name, @@ -1956,7 +1961,7 @@ def ensure_channel(self, channelname): self.channels[irc_lower(channelname)] = channel return channel - def ensure_special_room(self, peer_id, tg_room): + async def ensure_special_room(self, peer_id, tg_room): debug('ensure_special_room %r %r', peer_id, tg_room) if peer_id in self.peer_id2special_room: room = self.peer_id2special_room[peer_id] @@ -1964,8 +1969,9 @@ def ensure_special_room(self, peer_id, tg_room): #room.update(record) else: if tg_room is None: - tg_room = web.proc.get_entity(peer_id) + tg_room = await web.proc.get_entity(peer_id) room = SpecialChannel(tg_room) + await room.update(tg_room) self.peer_id2special_room[peer_id] = room if options.join == 'all': for client in self.auth_clients(): @@ -1973,17 +1979,18 @@ def ensure_special_room(self, peer_id, tg_room): self.name2special_room[irc_lower(room.name)] = room return room - def ensure_special_user(self, user_id, tg_user): + async def ensure_special_user(self, user_id, tg_user): debug('ensure_special_user %r %r', user_id, tg_user) if user_id == self.user_id: return self if user_id in self.user_id2special_user: + print(user_id) user = self.user_id2special_user[user_id] self.remove_special_user(user.nick) #user.update(tg_user) else: if tg_user is None: - tg_user = web.proc.get_entity(user_id) + tg_user = await web.proc.get_entity(user_id) user = SpecialUser(tg_user) self.user_id2special_user[user.user_id] = user self.nick2special_user[irc_lower(user.nick)] = user @@ -2021,38 +2028,41 @@ def stop(self): i.close() self.loop.run_until_complete(i.wait_closed()) - def resolve_from_to(self, msg): + async def resolve_from_to(self, msg): if isinstance(msg.to_id, tl.types.PeerUser): - to = server.ensure_special_user(msg.to_id.user_id, None) + to = await server.ensure_special_user(msg.to_id.user_id, None) elif isinstance(msg.to_id, tl.types.PeerChannel): - to = server.ensure_special_room(msg.to_id.channel_id, None) + to = await server.ensure_special_room(msg.to_id.channel_id, None) elif isinstance(msg.to_id, tl.types.PeerChat): - to = server.ensure_special_room(msg.to_id.chat_id, None) + to = await server.ensure_special_room(msg.to_id.chat_id, None) else: assert False try: - from_ = server.ensure_special_user(msg.from_id, None) + if isinstance(msg.from_id, tl.types.PeerUser): + from_ = await server.ensure_special_user(msg.from_id.user_id, None) + else: + from_ = await server.ensure_special_user(msg.from_id, None) except: # Haven't seen the peer before. Retry. if isinstance(msg.to_id, (tl.types.PeerChannel, tl.types.PeerChat)): - web.channel_members(to) - from_ = server.ensure_special_user(msg.from_id, None) + await web.channel_members(to) + from_ = await server.ensure_special_user(msg.from_id, None) return from_, to def is_type(self, _type): return False - def on_telegram_update(self, update): + async def on_telegram_update(self, update): name = type(update).__name__ if type(TelegramUpdate.__dict__.get(name)) is staticmethod: - getattr(TelegramUpdate, name)(self, update) + await getattr(TelegramUpdate, name)(self, update) else: info('on_telegram_update %r %r', type(update).__name__, update.to_dict()) - def on_telegram_update_message(self, update, msg, sender=None, to=None): + async def on_telegram_update_message(self, update, msg, sender=None, to=None): if sender is None: - sender, to = self.resolve_from_to(msg) + sender, to = await self.resolve_from_to(msg) if options.ignore_bot and isinstance(sender, SpecialUser) and sender.bot: return @@ -2108,13 +2118,13 @@ def on_telegram_update_message(self, update, msg, sender=None, to=None): else: text = msg.message - self.deliver_message(msg.id, sender, to, msg.date, text, fwd_from=msg.fwd_from, reply_to_msg_id=msg.reply_to_msg_id) + await self.deliver_message(msg.id, sender, to, msg.date, text, fwd_from=msg.fwd_from, reply_to_msg_id=msg.reply_to_msg_id) - def deliver_message(self, msg_id, sender, to, date, text, fwd_from=None, reply_to_msg_id=None): + async def deliver_message(self, msg_id, sender, to, date, text, fwd_from=None, reply_to_msg_id=None): for line in text.splitlines(): if fwd_from is not None: try: - from1 = server.ensure_special_user(fwd_from.from_id, None) + from1 = await server.ensure_special_user(fwd_from.from_id, None) for client in server.auth_clients(): line = '\x0315「Fwd {}」\x0f{}'.format( client.nick if from1 == server else from1.nick, line) @@ -2126,13 +2136,13 @@ def deliver_message(self, msg_id, sender, to, date, text, fwd_from=None, reply_t refer = web.id2message[reply_to_msg_id] else: if to.is_type(tl.types.PeerChannel): - message = web.channel_message_get(to, reply_to_msg_id) + message = await web.channel_message_get(to, reply_to_msg_id) else: - message = web.message_get(reply_to_msg_id) + message = await web.message_get(reply_to_msg_id) if isinstance(message, tl.types.MessageEmpty): refer = None else: - from1, to1 = self.resolve_from_to(message) + from1, to1 = await self.resolve_from_to(message) refer = {'id': message.id, 'date': message.date, 'from': from1, 'to': to1, 'message': message.message, 'inferred': True} web.append_history(refer) if refer is not None: @@ -2177,9 +2187,9 @@ def deliver_message(self, msg_id, sender, to, date, text, fwd_from=None, reply_t if msg_id is not None and options.mark_read == 'always' and isinstance(sender, SpecialUser): if to is server: if sender is not server: - web.mark_read(sender.peer, msg_id) + await web.mark_read(sender.peer, msg_id) else: - web.mark_read(to.peer, msg_id) + await web.mark_read(to.peer, msg_id) def on_disconnect(self, peername): # PART all special channels, these chatrooms will be garbage collected