-
Notifications
You must be signed in to change notification settings - Fork 188
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for platforms to return an optional profile link
- Switch to using keyword-only arguments for response message and link to improve clarity
- Loading branch information
Showing
2 changed files
with
83 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,51 +53,55 @@ async def get_token(self): | |
def is_taken(self, message): | ||
return any(x in message for x in self.USERNAME_TAKEN_MSGS) | ||
|
||
def response_failure(self, query, message="Failure"): | ||
def response_failure(self, query, *, message="Failure"): | ||
return PlatformResponse( | ||
platform=Platforms(self.__class__), | ||
query=query, | ||
available=False, | ||
valid=False, | ||
success=False, | ||
message=message, | ||
link=None, | ||
) | ||
|
||
def response_available(self, query, message="Available"): | ||
def response_available(self, query, *, message="Available"): | ||
return PlatformResponse( | ||
platform=Platforms(self.__class__), | ||
query=query, | ||
available=True, | ||
valid=True, | ||
success=True, | ||
message=message, | ||
link=None, | ||
) | ||
|
||
def response_unavailable(self, query, message="Unavailable"): | ||
def response_unavailable(self, query, *, message="Unavailable", link=None): | ||
return PlatformResponse( | ||
platform=Platforms(self.__class__), | ||
query=query, | ||
available=False, | ||
valid=True, | ||
success=True, | ||
message=message, | ||
link=link, | ||
) | ||
|
||
def response_invalid(self, query, message="Invalid"): | ||
def response_invalid(self, query, *, message="Invalid"): | ||
return PlatformResponse( | ||
platform=Platforms(self.__class__), | ||
query=query, | ||
available=False, | ||
valid=False, | ||
success=True, | ||
message=message, | ||
link=None, | ||
) | ||
|
||
def response_unavailable_or_invalid(self, query, message): | ||
def response_unavailable_or_invalid(self, query, *, message, link=None): | ||
if self.is_taken(message): | ||
return self.response_unavailable(query, message) | ||
return self.response_unavailable(query, message=message, link=link) | ||
else: | ||
return self.response_invalid(query, message) | ||
return self.response_invalid(query, message=message) | ||
|
||
def _request(self, method, url, **kwargs): | ||
proxy = ( | ||
|
@@ -166,7 +170,7 @@ async def check_username(self, username): | |
json_body = await self.get_json(r) | ||
if "error_message" in json_body["reference"]: | ||
return self.response_unavailable_or_invalid( | ||
username, json_body["reference"]["error_message"] | ||
username, message=json_body["reference"]["error_message"] | ||
) | ||
elif json_body["reference"]["status_code"] == "OK": | ||
return self.response_available(username) | ||
|
@@ -181,6 +185,7 @@ class Instagram(PlatformChecker): | |
"This username isn't available.", | ||
"A user with that username already exists.", | ||
] | ||
USERNAME_LINK_FORMAT = "https://www.instagram.com/{}" | ||
|
||
async def prerequest(self): | ||
async with self.get(self.URL) as r: | ||
|
@@ -196,10 +201,12 @@ async def check_username(self, username): | |
json_body = await self.get_json(r) | ||
# Too many requests | ||
if json_body["status"] == "fail": | ||
return self.response_failure(username, json_body["message"]) | ||
return self.response_failure(username, message=json_body["message"]) | ||
if "username" in json_body["errors"]: | ||
return self.response_unavailable_or_invalid( | ||
username, json_body["errors"]["username"][0]["message"] | ||
username, | ||
message=json_body["errors"]["username"][0]["message"], | ||
link=Instagram.USERNAME_LINK_FORMAT.format(username), | ||
) | ||
else: | ||
return self.response_available(username) | ||
|
@@ -212,15 +219,15 @@ async def check_email(self, email): | |
json_body = await self.get_json(r) | ||
# Too many requests | ||
if json_body["status"] == "fail": | ||
return self.response_failure(email, json_body["message"]) | ||
return self.response_failure(email, message=json_body["message"]) | ||
if "email" not in json_body["errors"]: | ||
return self.response_available(email) | ||
else: | ||
message = json_body["errors"]["email"][0]["message"] | ||
if json_body["errors"]["email"][0]["code"] == "invalid_email": | ||
return self.response_invalid(email, message) | ||
return self.response_invalid(email, message=message) | ||
else: | ||
return self.response_unavailable(email, message) | ||
return self.response_unavailable(email, message=message) | ||
|
||
|
||
class GitHub(PlatformChecker): | ||
|
@@ -229,6 +236,7 @@ class GitHub(PlatformChecker): | |
EMAIL_ENDPOINT = "https://github.com/signup_check/email" | ||
# [username taken, reserved keyword (Username __ is unavailable)] | ||
USERNAME_TAKEN_MSGS = ["already taken", "unavailable", "not available"] | ||
USERNAME_LINK_FORMAT = "https://github.com/{}" | ||
|
||
token_regex = re.compile( | ||
r'<auto-check src="/signup_check/username[\s\S]*value="([\S]*)"[\s\S]*<auto-check src="/signup_check/email[\s\S]*value="([\S]*)"[\s\S]*</auto-check>' | ||
|
@@ -255,16 +263,18 @@ async def check_username(self, username): | |
if r.status == 422: | ||
text = await r.text() | ||
text = self.tag_regex.sub("", text).strip() | ||
return self.response_unavailable_or_invalid(username, text) | ||
return self.response_unavailable_or_invalid( | ||
username, message=text, link=GitHub.USERNAME_LINK_FORMAT.format(username) | ||
) | ||
elif r.status == 200: | ||
return self.response_available(username) | ||
elif r.status == 429: | ||
return self.response_failure(username, self.TOO_MANY_REQUEST_ERROR_MESSAGE) | ||
return self.response_failure(username, message=self.TOO_MANY_REQUEST_ERROR_MESSAGE) | ||
|
||
async def check_email(self, email): | ||
pr = await self.get_token() | ||
if pr is None: | ||
return self.response_failure(email, self.TOKEN_ERROR_MESSAGE) | ||
return self.response_failure(email, message=self.TOKEN_ERROR_MESSAGE) | ||
else: | ||
(_, email_token, cookies) = pr | ||
async with self.post( | ||
|
@@ -274,11 +284,11 @@ async def check_email(self, email): | |
) as r: | ||
if r.status == 422: | ||
text = await r.text() | ||
return self.response_unavailable(email, text) | ||
return self.response_unavailable(email, message=text) | ||
elif r.status == 200: | ||
return self.response_available(email) | ||
elif r.status == 429: | ||
return self.response_failure(email, self.TOO_MANY_REQUEST_ERROR_MESSAGE) | ||
return self.response_failure(email, message=self.TOO_MANY_REQUEST_ERROR_MESSAGE) | ||
|
||
|
||
class Tumblr(PlatformChecker): | ||
|
@@ -289,6 +299,7 @@ class Tumblr(PlatformChecker): | |
"Someone beat you to that username", | ||
"Try something else, that one is spoken for", | ||
] | ||
USERNAME_LINK_FORMAT = "https://{}.tumblr.com" | ||
|
||
SAMPLE_UNUSED_EMAIL = "[email protected]" | ||
SAMPLE_PASSWORD = "correcthorsebatterystaple" | ||
|
@@ -320,14 +331,18 @@ async def _check(self, email=SAMPLE_UNUSED_EMAIL, username=SAMPLE_UNUSED_USERNAM | |
json_body = await self.get_json(r) | ||
if username == query: | ||
if "usernames" in json_body or len(json_body["errors"]) > 0: | ||
return self.response_unavailable_or_invalid(query, json_body["errors"][0]) | ||
return self.response_unavailable_or_invalid( | ||
query, | ||
message=json_body["errors"][0], | ||
link=Tumblr.USERNAME_LINK_FORMAT.format(query), | ||
) | ||
elif json_body["errors"] == []: | ||
return self.response_available(query) | ||
elif email == query: | ||
if "This email address is already in use." in json_body["errors"]: | ||
return self.response_unavailable(query, json_body["errors"][0]) | ||
return self.response_unavailable(query, message=json_body["errors"][0],) | ||
elif "This email address isn't correct. Please try again." in json_body["errors"]: | ||
return self.response_invalid(query, json_body["errors"][0]) | ||
return self.response_invalid(query, message=json_body["errors"][0]) | ||
elif json_body["errors"] == []: | ||
return self.response_available(query) | ||
|
||
|
@@ -341,24 +356,29 @@ async def check_email(self, email): | |
class GitLab(PlatformChecker): | ||
URL = "https://gitlab.com/users/sign_in" | ||
ENDPOINT = "https://gitlab.com/users/{}/exists" | ||
USERNAME_LINK_FORMAT = "https://gitlab.com/{}" | ||
|
||
async def check_username(self, username): | ||
# Custom matching required as validation is implemented locally and not server-side by GitLab | ||
if not re.fullmatch( | ||
r"[a-zA-Z0-9_\.][a-zA-Z0-9_\-\.]*[a-zA-Z0-9_\-]|[a-zA-Z0-9_]", username | ||
): | ||
return self.response_invalid( | ||
username, "Please create a username with only alphanumeric characters." | ||
username, message="Please create a username with only alphanumeric characters." | ||
) | ||
async with self.get( | ||
self.ENDPOINT.format(username), headers={"X-Requested-With": "XMLHttpRequest"} | ||
) as r: | ||
# Special case for usernames | ||
if r.status == 401: | ||
return self.response_unavailable(username) | ||
return self.response_unavailable( | ||
username, link=GitLab.USERNAME_LINK_FORMAT.format(username) | ||
) | ||
json_body = await self.get_json(r) | ||
if json_body["exists"]: | ||
return self.response_unavailable(username) | ||
return self.response_unavailable( | ||
username, link=GitLab.USERNAME_LINK_FORMAT.format(username) | ||
) | ||
else: | ||
return self.response_available(username) | ||
|
||
|
@@ -372,16 +392,19 @@ class Reddit(PlatformChecker): | |
"that username is already taken", | ||
"that username is taken by a deleted account", | ||
] | ||
USERNAME_LINK_FORMAT = "https://www.reddit.com/u/{}" | ||
|
||
async def check_username(self, username): | ||
# Custom user agent required to overcome rate limits for Reddit API | ||
async with self.post(self.ENDPOINT, data={"user": username}) as r: | ||
json_body = await self.get_json(r) | ||
if "error" in json_body and json_body["error"] == 429: | ||
return self.response_failure(username, self.TOO_MANY_REQUEST_ERROR_MESSAGE) | ||
return self.response_failure(username, message=self.TOO_MANY_REQUEST_ERROR_MESSAGE) | ||
elif "json" in json_body: | ||
return self.response_unavailable_or_invalid( | ||
username, json_body["json"]["errors"][0][1] | ||
username, | ||
message=json_body["json"]["errors"][0][1], | ||
link=Reddit.USERNAME_LINK_FORMAT.format(username), | ||
) | ||
elif json_body == {}: | ||
return self.response_available(username) | ||
|
@@ -395,34 +418,38 @@ class Twitter(PlatformChecker): | |
EMAIL_ENDPOINT = "https://api.twitter.com/i/users/email_available.json" | ||
# [account in use, account suspended] | ||
USERNAME_TAKEN_MSGS = ["That username has been taken", "unavailable"] | ||
USERNAME_LINK_FORMAT = "https://twitter.com/{}" | ||
|
||
async def check_username(self, username): | ||
async with self.get(self.USERNAME_ENDPOINT, params={"username": username}) as r: | ||
json_body = await self.get_json(r) | ||
message = json_body["desc"] | ||
if json_body["valid"]: | ||
return self.response_available(username, message) | ||
return self.response_available(username, message=message) | ||
else: | ||
return self.response_unavailable_or_invalid(username, message) | ||
return self.response_unavailable_or_invalid( | ||
username, message=message, link=Twitter.USERNAME_LINK_FORMAT.format(username) | ||
) | ||
|
||
async def check_email(self, email): | ||
async with self.get(self.EMAIL_ENDPOINT, params={"email": email}) as r: | ||
json_body = await self.get_json(r) | ||
message = json_body["msg"] | ||
if not json_body["valid"] and not json_body["taken"]: | ||
return self.response_invalid(email, message) | ||
return self.response_invalid(email, message=message) | ||
|
||
if json_body["taken"]: | ||
return self.response_unavailable(email, message) | ||
return self.response_unavailable(email, message=message) | ||
else: | ||
return self.response_available(email, message) | ||
return self.response_available(email, message=message) | ||
|
||
|
||
class Pastebin(PlatformChecker): | ||
URL = "https://pastebin.com/signup" | ||
USERNAME_ENDPOINT = "https://pastebin.com/ajax/check_username.php" | ||
EMAIL_ENDPOINT = "https://pastebin.com/ajax/check_email.php" | ||
USERNAME_TAKEN_MSGS = ["Username not available!"] | ||
USERNAME_LINK_FORMAT = "https://pastebin.com/u/{}" | ||
|
||
regex = re.compile(r"^<font color=\"(red|green)\">([^<>]+)<\/font>$") | ||
|
||
|
@@ -431,19 +458,23 @@ async def _check(self, query, endpoint, data, is_email): | |
text = await r.text() | ||
match = self.regex.match(text) | ||
if not match: | ||
return self.response_failure(query, self.UNEXPECTED_CONTENT_TYPE_ERROR_MESSAGE) | ||
return self.response_failure( | ||
query, message=self.UNEXPECTED_CONTENT_TYPE_ERROR_MESSAGE | ||
) | ||
else: | ||
message = match[2] | ||
if match[1] == "green": | ||
return self.response_available(query, message) | ||
return self.response_available(query, message=message) | ||
else: | ||
if is_email: | ||
if message == "Please use a valid email address.": | ||
return self.response_invalid(query, message) | ||
return self.response_invalid(query, message=message) | ||
else: | ||
return self.response_unavailable(query, message) | ||
return self.response_unavailable(query, message=message) | ||
else: | ||
return self.response_unavailable_or_invalid(query, message) | ||
return self.response_unavailable_or_invalid( | ||
query, message=message, link=Pastebin.USERNAME_LINK_FORMAT.format(query) | ||
) | ||
|
||
async def check_username(self, username): | ||
return await self._check( | ||
|
@@ -481,6 +512,7 @@ class Lastfm(PlatformChecker): | |
URL = "https://www.last.fm/join" | ||
ENDPOINT = "https://www.last.fm/join/partial/validate" | ||
USERNAME_TAKEN_MSGS = ["Sorry, this username isn't available."] | ||
USERNAME_LINK_FORMAT = "https://www.last.fm/user/{}" | ||
|
||
async def prerequest(self): | ||
async with self.get(self.URL) as r: | ||
|
@@ -501,17 +533,23 @@ async def _check(self, username="", email=""): | |
json_body = await self.get_json(r) | ||
if email: | ||
if json_body["email"]["valid"]: | ||
return self.response_available(email, json_body["email"]["success_message"]) | ||
return self.response_available( | ||
email, message=json_body["email"]["success_message"] | ||
) | ||
else: | ||
return self.response_unavailable(email, json_body["email"]["error_messages"][0]) | ||
return self.response_unavailable( | ||
email, message=json_body["email"]["error_messages"][0] | ||
) | ||
elif username: | ||
if json_body["userName"]["valid"]: | ||
return self.response_available( | ||
username, json_body["userName"]["success_message"] | ||
username, message=json_body["userName"]["success_message"] | ||
) | ||
else: | ||
return self.response_unavailable_or_invalid( | ||
username, re.sub("<[^<]+?>", "", json_body["userName"]["error_messages"][0]) | ||
username, | ||
message=re.sub("<[^<]+?>", "", json_body["userName"]["error_messages"][0]), | ||
link=Lastfm.USERNAME_LINK_FORMAT.format(username), | ||
) | ||
|
||
async def check_email(self, email): | ||
|
@@ -535,7 +573,7 @@ async def check_email(self, email): | |
elif text == "false": | ||
return self.response_unavailable(email) | ||
else: | ||
return self.response_failure(email, self.TOO_MANY_REQUEST_ERROR_MESSAGE) | ||
return self.response_failure(email, message=self.TOO_MANY_REQUEST_ERROR_MESSAGE) | ||
|
||
|
||
class Yahoo(PlatformChecker): | ||
|
@@ -580,9 +618,9 @@ async def check_username(self, username): | |
error = json_body["errors"][2]["error"] | ||
error_pretty = self.error_messages.get(error, error.replace("_", " ").capitalize()) | ||
if error == "IDENTIFIER_EXISTS" or error == "RESERVED_WORD_PRESENT": | ||
return self.response_unavailable(username, error_pretty) | ||
return self.response_unavailable(username, message=error_pretty) | ||
else: | ||
return self.response_invalid(username, error_pretty) | ||
return self.response_invalid(username, message=error_pretty) | ||
|
||
|
||
class Platforms(Enum): | ||
|
@@ -614,3 +652,4 @@ class PlatformResponse: | |
valid: bool | ||
success: bool | ||
message: str | ||
link: str |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters