-
Notifications
You must be signed in to change notification settings - Fork 33
/
getstickers.py
224 lines (209 loc) · 9.29 KB
/
getstickers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import asyncio
import os
import shutil
import zipfile
from collections import defaultdict

from PIL import Image
from telethon.errors import MessageNotModifiedError
from telethon.errors.rpcerrorlist import StickersetInvalidError
from telethon.tl.functions.messages import GetAllStickersRequest
from telethon.tl.functions.messages import GetStickerSetRequest
from telethon.tl.types import (
    DocumentAttributeSticker,
    InputStickerSetID,
)

from pagermaid import working_dir, version
from pagermaid.listener import listener
from pagermaid.utils import alias_command
# Optional dependency: the tgs -> gif conversion is only available when the
# `lottie` package can be imported. `lottie_import` records the outcome so the
# rest of the module can skip the conversion when it is missing.
try:
    from lottie.exporters.gif import export_gif
    from lottie.importers.core import import_tgs
except ImportError:
    lottie_import = False
else:
    lottie_import = True
# Root folder (relative to the process working directory) that holds one
# sub-folder per exported sticker set.
_STICKER_ROOT = 'data/sticker/'


def _sticker_set_extension(sticker_set):
    """Return "tgs" when any document of the set is an animated sticker, else "webp".

    Uses the same "tgsticker" mime-type test as `is_it_animated_sticker`, but
    applied to the documents of a fetched sticker set instead of a message.
    """
    for document in sticker_set.documents:
        if "tgsticker" in (getattr(document, "mime_type", None) or ""):
            return "tgs"
    return "webp"


def _zip_pack(pack_dir, archive_path):
    """Write every file below `pack_dir` into a new zip at `archive_path`.

    Archive member names are made relative to the sticker root, so entries
    look like "<short_name>/<file>". `zipdir` is not reused here because it
    names members after the path it is given, which is what previously forced
    a process-wide `os.chdir`.
    """
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for folder, _subdirs, filenames in os.walk(pack_dir):
            for filename in filenames:
                full_path = os.path.join(folder, filename)
                archive.write(full_path, arcname=os.path.relpath(full_path, _STICKER_ROOT))


async def _export_sticker_set(context, sticker_set, file_ext, tgs_gif, status=None, reply_to=None):
    """Download one sticker set, convert it, zip it and upload the archive.

    Args:
        context: the triggering message; its `client` and `chat_id` are used.
        sticker_set: result of `GetStickerSetRequest` (`set`, `packs` and
            `documents` are read).
        file_ext: "webp" or "tgs"; extension used for every downloaded file.
        tgs_gif: whether tgs files should additionally be exported as gif.
        status: message that is edited to show progress and deleted at the
            end. When None, a new message is sent to the chat and used.
        reply_to: optional message id the uploaded archive replies to.
    """
    short_name = sticker_set.set.short_name
    pack_dir = os.path.join(_STICKER_ROOT, short_name)
    pack_file = os.path.join(pack_dir, "pack.txt")
    archive_path = os.path.join(_STICKER_ROOT, short_name + ".zip")
    convert_to_gif = file_ext == 'tgs' and lottie_import and tgs_gif

    os.makedirs(pack_dir, exist_ok=True)
    # pack.txt is opened in append mode below, so drop any stale copy first.
    if os.path.isfile(pack_file):
        os.remove(pack_file)

    # Map each document id to the concatenation of all emoticons bound to it.
    emojis = defaultdict(str)
    for pack in sticker_set.packs:
        for document_id in pack.documents:
            emojis[document_id] += pack.emoticon

    async def download(sticker, filename):
        target = os.path.join(pack_dir, filename)
        await context.client.download_media(sticker, file=target)
        # Explicit UTF-8: the line contains emoji, which a non-UTF-8 locale
        # default encoding cannot encode.
        with open(pack_file, "a", encoding="utf-8") as f:
            f.write(f"{{'image_file': '{filename}','emojis':{emojis[sticker.id]}}},")
        if convert_to_gif:
            export_gif(import_tgs(target), target[:-3] + 'gif')
        elif file_ext == 'webp':
            convert_png(target)

    pending_tasks = {
        asyncio.ensure_future(download(document, f"{i:03d}.{file_ext}"))
        for i, document in enumerate(sticker_set.documents)
    }
    announcement = f"正在下载 {short_name} 中的 {sticker_set.set.count} 张贴纸。。。"
    if status is None:
        status = await context.client.send_message(context.chat_id, announcement)
    else:
        await status.edit(announcement)

    num_tasks = len(pending_tasks)
    # `while pending_tasks` (instead of an unconditional loop) also covers a
    # set without documents: asyncio.wait() raises ValueError on an empty set.
    while pending_tasks:
        _done, pending_tasks = await asyncio.wait(pending_tasks, timeout=2.5,
                                                  return_when=asyncio.FIRST_COMPLETED)
        if convert_to_gif:
            try:
                await status.edit(
                    f"正在下载/转换中,进度: {num_tasks - len(pending_tasks)}/{sticker_set.set.count}")
            except MessageNotModifiedError:
                # Progress text did not change since the last edit.
                pass

    await status.edit("下载完毕,打包上传中。")
    try:
        _zip_pack(pack_dir, archive_path)
        await context.client.send_file(
            context.chat_id,
            archive_path,
            caption=short_name,
            force_document=True,
            allow_cache=False,
            reply_to=reply_to,
        )
    finally:
        # Best-effort cleanup that also runs when zipping or uploading fails.
        try:
            os.remove(archive_path)
        except OSError:
            pass
        # The pack folder is a directory, so it needs rmtree; os.remove()
        # cannot delete it.
        shutil.rmtree(pack_dir, ignore_errors=True)
    await status.delete()


@listener(is_plugin=True, outgoing=True, command=alias_command("getstickers"),
          description="获取整个贴纸包的贴纸,false 关闭 tgs 转 gif;任意值开启下载所有贴纸包;\n"
                      "转 gif 需要手动安装 pypi 依赖 lottie[gif] \n"
                      "Ubuntu 转换出错请先运行 `-sh apt install libpangocairo-1.0-0 -y`。",
          parameters="<任意值>")
async def getstickers(context):
    """Export sticker sets as zip archives into the current chat.

    With one or two parameters: "false" in the arguments disables the
    tgs -> gif conversion, and "all" in the arguments exports every set
    returned by `GetAllStickersRequest`, then returns. Otherwise the command
    must reply to a sticker, and the set that sticker belongs to is exported
    as a reply to that message.
    """
    tgs_gif = True
    os.makedirs(_STICKER_ROOT, exist_ok=True)

    if len(context.parameter) in (1, 2):
        if "false" in context.arguments:
            tgs_gif = False
        if "all" in context.arguments:
            sticker_sets = await context.client(GetAllStickersRequest(0))
            for stickerset in sticker_sets.sets:
                set_id = InputStickerSetID(id=stickerset.id, access_hash=stickerset.access_hash)
                sticker_set = await context.client(GetStickerSetRequest(set_id))
                # Detect the format per set; assuming "webp" for every set
                # mislabels animated packs and sends them to convert_png.
                await _export_sticker_set(context, sticker_set,
                                          _sticker_set_extension(sticker_set), tgs_gif)
            return

    reply_message = await context.get_reply_message()
    if not reply_message or not reply_message.sticker:
        await context.edit("请回复一张贴纸。")
        return

    sticker_attrib = find_instance(reply_message.sticker.attributes, DocumentAttributeSticker)
    # find_instance returns None when the document has no sticker attribute.
    if sticker_attrib is None or not sticker_attrib.stickerset:
        await context.edit("回复的贴纸不属于任何贴纸包。")
        return

    file_ext = "webp"
    if is_it_animated_sticker(reply_message):
        file_ext = "tgs"
        if tgs_gif and not lottie_import:
            await context.reply('`lottie[gif]` 依赖未安装,tgs 无法转换为 gif ,进行标准格式导出。')

    try:
        sticker_set = await context.client(GetStickerSetRequest(sticker_attrib.stickerset))
    except StickersetInvalidError:
        await context.edit('回复的贴纸不存在于任何贴纸包中。')
        return

    await _export_sticker_set(context, sticker_set, file_ext, tgs_gif,
                              status=context, reply_to=reply_message.id)
def find_instance(items, class_or_tuple):
    """Return the first element of `items` that is an instance of `class_or_tuple`.

    `class_or_tuple` is passed straight to `isinstance`, so it may be a single
    class or a tuple of classes. Returns None when nothing matches.
    """
    matches = (candidate for candidate in items if isinstance(candidate, class_or_tuple))
    return next(matches, None)
def is_it_animated_sticker(message):
    """Return True when the message's media document has a "tgsticker" mime type.

    Returns False when the message has no media, the media has no document,
    or any of those attributes is missing or not usable for the check.
    """
    try:
        media = message.media
        if not (media and media.document):
            return False
        return "tgsticker" in media.document.mime_type
    except (AttributeError, TypeError):
        # AttributeError: message/media lacks the expected attribute.
        # TypeError: mime_type is None (or otherwise not searchable).
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return False
def zipdir(path, ziph):
    """Move every file found below `path` into the open zip handle `ziph`.

    Each file is written to the archive under the path produced by the walk
    and is deleted from disk right after it has been written. Directories
    themselves are left in place.
    """
    for folder, _subdirs, filenames in os.walk(path):
        for filename in filenames:
            target = os.path.join(folder, filename)
            ziph.write(target)
            os.remove(target)
def convert_png(path):
    """Save an RGBA PNG copy of the image at `path` and return the new path.

    The PNG is written next to the source file, with the file extension
    swapped for ".png". Only the extension is touched, so a ".webp" that
    appears elsewhere in the path (e.g. in a directory name) is left alone.
    """
    new_path = os.path.splitext(path)[0] + ".png"
    # The context manager closes the source file handle once the pixel data
    # has been copied by convert().
    with Image.open(path) as source:
        rgba = source.convert('RGBA')
    rgba.save(new_path, 'PNG')
    return new_path