Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for local emoji sources. 新增对本地emoji资源的支持。 (Sourcery refactored) #89

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 35 additions & 62 deletions ssrspeed/speedtest/method/st_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,7 @@
nf_ip_re = re.compile(r'"requestIpAddress":"(.*)"')


def retry(count=5):
def wrapper(func):
async def inner(*args, **kwargs):
for _ in range(count):
result = await func(*args, **kwargs)
if result is True:
break
else:
return False
return True

return inner

return wrapper


class StreamTest:
session = None

@classmethod
async def netflix(cls, host, headers, inner_dict, port, outbound_ip):
logger.info(f"Performing netflix test LOCAL_PORT: {port}.")
Expand Down Expand Up @@ -73,53 +55,44 @@ async def netflix(cls, host, headers, inner_dict, port, outbound_ip):
logger.error(f"Netflix error: {str(e)}")
return {}

@classmethod
@retry(5)
async def netflix_inner_method_1(cls, outbound_ip, inner_dict):
async with cls.session.get(
url="https://www.netflix.com/title/70143836"
) as response:
if response.status != 200:
return False
text = str(await response.read())
locate = text.find("preferredLocale")
netflix_ip = nf_ip_re.findall(text)[0].split(",")[0]
logger.info(f"Netflix IP : {netflix_ip}")
region = text[locate + 29 : locate + 31] if locate > 0 else "Unknown"
if outbound_ip == netflix_ip:
logger.info("Netflix test result: Full Native.")
inner_dict["Ntype"] = f"Full Native({region})"
else:
logger.info("Netflix test result: Full DNS.")
inner_dict["Ntype"] = f"Full DNS({region})"
return True

@classmethod
@retry(5)
async def netflix_inner_method_2(cls, inner_dict):
async with cls.session.get(
url="https://www.netflix.com/title/70242311"
) as response:
if response.status == 200:
logger.info("Netflix test result: Only Original.")
inner_dict["Ntype"] = "Only Original"
return True
logger.info("Netflix test result: None.")
inner_dict["Ntype"] = "None"
return False

@classmethod
async def netflix_new(cls, host, headers, inner_dict, port, outbound_ip):
logger.info(f"Performing netflix(new) test LOCAL_PORT: {port}.")
try:
async with aiohttp.ClientSession(
headers=headers,
connector=ProxyConnector(host=host, port=port),
timeout=aiohttp.ClientTimeout(connect=10),
) as cls.session:
result = await cls.netflix_inner_method_1(outbound_ip, inner_dict)
if result is False:
await cls.netflix_inner_method_2(inner_dict)
async with (
aiohttp.ClientSession(
headers=headers,
connector=ProxyConnector(host=host, port=port),
timeout=aiohttp.ClientTimeout(connect=10),
) as session,
session.get(
url="https://www.netflix.com/title/70143836" # "https://www.netflix.com/title/70242311"
) as response1,
):
if response1.status == 200:
text = str(await response1.read())
locate = text.find("preferredLocale")
netflix_ip = nf_ip_re.findall(text)[0].split(",")[0]
logger.info(f"Netflix IP : {netflix_ip}")
region = text[locate + 29 : locate + 31] if locate > 0 else "未知"
if outbound_ip == netflix_ip:
logger.info("Netflix test result: Full Native.")
inner_dict["Ntype"] = f"Full Native({region})"
else:
logger.info("Netflix test result: Full DNS.")
inner_dict["Ntype"] = f"Full DNS({region})"
return
async with session.get(
url="https://www.netflix.com/title/70242311"
) as response2:
rg = ""
if response2.status == 200:
logger.info("Netflix test result: Only Original.")
inner_dict["Ntype"] = "Only Original"
else:
logger.info("Netflix test result: None.")
inner_dict["Ntype"] = "None"
# 测试连接状态
except Exception as e:
logger.error(f"Netflix error: {str(e)}")
return {}
Expand Down Expand Up @@ -376,7 +349,7 @@ async def start_stream_test(port, stream_cfg, outbound_ip):
if stream_cfg["NETFLIX_TEST"]:
test_list.append(
asyncio.create_task(
StreamTest.netflix_new(host, headers, inner_dict, port, outbound_ip)
StreamTest.netflix(host, headers, inner_dict, port, outbound_ip)
)
)
await asyncio.wait(test_list)
Expand Down
137 changes: 136 additions & 1 deletion ssrspeed/util/emo.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import abc
import asyncio
import contextlib
import os
import re
import shutil
from io import BytesIO
from typing import ClassVar, Optional

import pilmoji
import requests
from aiohttp import ClientSession
from emoji import demojize
from loguru import logger
from unidecode import unidecode


Expand Down Expand Up @@ -105,6 +111,121 @@ class TossFacePediaSource(EmojiPediaSource):
STYLE = "toss-face/342/"


class LocalSource(pilmoji.source.BaseSource):
"""
emoji本地源基类
"""

def get_emoji(self, emoji: str, /) -> Optional[BytesIO]:
file_path = self.get_file_path(emoji)
with contextlib.suppress(FileNotFoundError):
with open(file_path, "rb") as file:
return BytesIO(file.read())
return None

def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]:
raise NotImplementedError

@abc.abstractmethod
def get_file_path(self, emoji: str) -> str:
return ""

@abc.abstractmethod
def download_emoji(self, download_url):
pass


class OpenmojiLocalSource(LocalSource):
"""
图片源:https://github.com/hfg-gmuend/openmoji/tree/master/color/72x72
安装路径:./resources/emoji/openmoji
"""

def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]:
pass

def get_file_path(self, emoji: str) -> str:
code_points = [f"{ord(c):04X}" for c in emoji]
return f"./resources/emoji/openmoji/{'-'.join(code_points)}.png"

def download_emoji(self, download_url):
pass


class TwemojiLocalSource(LocalSource):
"""
图片源:https://github.com/twitter/twemoji/tree/master/assets/72x72
安装路径:./resources/emoji/twemoji
"""

def __init__(self, init: str = None, proxy=None):
"""
构造函数中,如果init不为none,则提供下载emoji资源包的url地址
"""
self.savepath = "./resources/emoji/twemoji.zip"
self._download_url = (
"https://github.com/twitter/twemoji/archive/refs/tags/v14.0.2.zip"
)
if init is None:
return
self.download_emoji(init, proxy=proxy)
self.init_emoji(self.savepath)

@property
def download_url(self):
return self._download_url

@staticmethod
def init_emoji(savepath: str):
# 解压下载好的文件
shutil.unpack_archive(savepath, "./resources/emoji/", format="zip")
# print("解压完成")
# 重命名
dirs = os.listdir("./resources/emoji/")
for d in dirs:
if d.startswith("twemoji") and not d.endswith(".zip"):
os.rename(
os.path.join(os.path.abspath("./resources/emoji/"), d),
os.path.join(os.path.abspath("./resources/emoji/"), "twemoji"),
)
break
return os.path.isdir("./resources/emoji/twemoji")

async def download_emoji(
self,
download_url: str = None,
savepath="./resources/emoji/twemoji.zip",
proxy=None,
):
# 如果本地已存在,便无需重新下载
if os.path.isdir("./resources/emoji/twemoji"):
return
_url = (
self.download_url if download_url is None else download_url
) # 如果没有提供下载地址则用默认的
print("Download URL:", _url)
# 从网络上下载
async with ClientSession(headers={"user-agent": "SSRSpeedN"}) as session:
async with session.get(_url, proxy=proxy, timeout=20) as resp:
if resp.status != 200:
raise Exception(f"NetworkError: {resp.status}==>\t{_url}")
with open(savepath, "wb") as f:
while True:
block = await resp.content.read(1024)
if not block:
break
f.write(block)

def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]:
pass

def get_file_path(self, emoji: str) -> str:
code_points = [f"{ord(c):x}" for c in emoji]
if emoji in {"4️⃣", "6️⃣"}:
del code_points[1]
return f"./resources/emoji/twemoji/assets/72x72/{'-'.join(code_points)}.png"


__all__ = [
"ApplePediaSource",
"GooglePediaSource",
Expand All @@ -117,16 +238,30 @@ class TossFacePediaSource(EmojiPediaSource):
"SkypePediaSource",
"JoyPixelsPediaSource",
"TossFacePediaSource",
"TwemojiLocalSource",
"OpenmojiLocalSource",
]


if __name__ == "__main__":
from PIL import Image, ImageFont

my_string = """
Hello, world! 👋 Here are some flags: 🇧🇦 🇷🇪 🇨🇼 🇺🇲
"""

def check_init():
if not os.path.isdir("./resources/emoji/twemoji"):
twemoji = TwemojiLocalSource()
print("检测到未安装emoji资源包,正在初始化本地emoji...")
asyncio.get_event_loop().run_until_complete(
twemoji.download_emoji(proxy=None)
)
if twemoji.init_emoji(twemoji.savepath):
logger.info("初始化emoji成功")
else:
logger.warning("初始化emoji失败")

# check_init()
with Image.new("RGB", (550, 80), (255, 255, 255)) as image:
font = ImageFont.truetype("arial.ttf", 24)

Expand Down