From f4434eeeef4d0ff4860daa08f6fe5718a39d3f23 Mon Sep 17 00:00:00 2001 From: Burrup Lambert Date: Fri, 11 Oct 2024 13:52:16 +0000 Subject: [PATCH 1/2] Added support for backup code detection, maximum retry counts, automatic TOPT generation from MFA token, api v1.1 support and v1.1 endpoint users/email_phone_info.json --- tweeterpy/constants.py | 3 ++ tweeterpy/login_util.py | 81 +++++++++++++++++++++++++++++------------ tweeterpy/tweeterpy.py | 27 +++++++++++++- tweeterpy/util.py | 17 +++++++++ 4 files changed, 103 insertions(+), 25 deletions(-) diff --git a/tweeterpy/constants.py b/tweeterpy/constants.py index 66b6f0d..7456a36 100644 --- a/tweeterpy/constants.py +++ b/tweeterpy/constants.py @@ -41,6 +41,7 @@ class Path: DOMAIN = "x.com" BASE_URL = "https://x.com/" API_URL = "https://x.com/i/api/graphql/" + API_V1_URL = "https://x.com/i/api/1.1/" TASK_URL = "https://api.x.com/1.1/onboarding/task.json" GUEST_TOKEN_URL = "https://api.x.com/1.1/guest/activate.json" JAVSCRIPT_INSTRUMENTATION_URL = "https://twitter.com/i/js_inst" @@ -71,6 +72,8 @@ class Path: TWEET_LIKES_ENDPOINT = "mpMee2WCjo7Nm4gRRHHnvA/Favoriters" RETWEETED_BY_ENDPOINT = "7Fwe5A6kE05QIybims116A/Retweeters" USER_HIGHLIGHTS_ENDPOINT = "w9-i9VNm_92GYFaiyGT1NA/UserHighlightsTweets" + # V1 ENDPOINTS + EMAIL_PHONE_INFO_ENDPOINT = "users/email_phone_info.json" class FeatureSwitch: diff --git a/tweeterpy/login_util.py b/tweeterpy/login_util.py index 12419c1..eaa30c3 100644 --- a/tweeterpy/login_util.py +++ b/tweeterpy/login_util.py @@ -1,6 +1,7 @@ import random +import time from tweeterpy.constants import Path -from tweeterpy.util import find_nested_key +from tweeterpy.util import find_nested_key, generate_totp_token from tweeterpy.request_util import RequestClient from tweeterpy.logging_util import disable_logger @@ -9,7 +10,7 @@ class TaskHandler: def __init__(self, request_client: RequestClient = None): self.request_client = request_client - def _create_task_mapper(self, username, password, verification_input_data): + def _create_task_mapper(self, username, password, mfa_token, verification_input_data): # fmt: off - Turns off formatting for this block of code. Just for the readability purpose. task_flow_mapper = {"LoginJsInstrumentationSubtask":{"task_executor": self._get_user_flow_token,"task_parameter":None}, "LoginEnterUserIdentifierSSO":{"task_executor": self._get_password_flow_token,"task_parameter":username}, @@ -18,7 +19,7 @@ def _create_task_mapper(self, username, password, verification_input_data): "DenyLoginSubtask":{"task_executor": self._check_suspicious_login,"task_parameter":None}, "AccountDuplicationCheck":{"task_executor": self._check_account_duplication,"task_parameter":None}, "LoginAcid":{"task_executor":self._handle_suspicious_login,"task_parameter":verification_input_data}, - "LoginTwoFactorAuthChallenge":{"task_executor":self._handle_suspicious_login,"task_parameter":verification_input_data}, + "LoginTwoFactorAuthChallenge":{"task_executor":self._handle_two_factor_auth,"task_parameter":mfa_token}, "LoginSuccessSubtask":{"task_output": "\nPlease Wait... Logging In...\n"}} return task_flow_mapper @@ -75,21 +76,49 @@ def _check_account_duplication(self, flow_token, subtask_id="AccountDuplicationC 'subtask_inputs': [{'subtask_id': subtask_id, 'check_logged_in_account': {'link': 'AccountDuplicationCheck_false'}}]} return self.request_client.request(Path.TASK_URL, method="POST", json=payload) - def _handle_suspicious_login(self, flow_token, subtask_id="LoginAcid",verification_input_data=None): + def _handle_auth_challenge(self, flow_token, subtask_id, get_input, verification_input_data=None, max_retries=float('inf')): + """Handles the authentication flow for both suspicious login and two-factor auth.""" + input_value = verification_input_data if verification_input_data is not None else get_input() payload = {"flow_token": flow_token, - "subtask_inputs": [{"subtask_id": subtask_id, "enter_text": {"text": verification_input_data,"link":"next_link"}}]} - handle_incorrect_input = True - while handle_incorrect_input: + "subtask_inputs": [{"subtask_id": subtask_id, "enter_text": {"text": input_value,"link":"next_link"}}]} + retries = 0 + while retries <= max_retries: response = self.request_client.request(Path.TASK_URL, method="POST", json=payload, skip_error_checking=True) if isinstance(response, dict) and "errors" in response.keys(): + retries += 1 + if retries > max_retries: + maximum_retry_error = f"We exceeded the self imposed maximum retry limit of {max_retries}." + raise Exception(maximum_retry_error) error_message = "\n".join([error['message'] for error in response['errors']]) - payload['subtask_inputs'][0]['enter_text']['text'] = str(input(f"{error_message} - Type again ==> ")) + print(f"{error_message} - ", end="") + input_value = get_input() + payload['subtask_inputs'][0]['enter_text']['text'] = input_value else: - handle_incorrect_input = False + break return response + def _handle_suspicious_login(self, flow_token, subtask_id="LoginAcid", verification_input_data=None): + """Handles suspicious login challenges.""" + def get_input(): + return input("Input the verification data ==> ") + + return self._handle_auth_challenge(flow_token, subtask_id, get_input, verification_input_data) + + def _handle_two_factor_auth(self, flow_token, subtask_id="LoginTwoFactorAuthChallenge", mfa_token=None): + """Handles two-factor authentication challenges.""" + def get_totp(): + """Generates or prompts for a TOTP.""" + if mfa_token: + totp = generate_totp_token(mfa_token) + print(f"\nGenerated TOTP {totp} from MFA token {mfa_token}\n") + return totp + return input("Input the 6-digit OTP ==> ") + + return self._handle_auth_challenge(flow_token, subtask_id, get_totp, max_retries=1) + @disable_logger - def login(self, username, password, email=None, phone=None, **kwargs): + def login(self, username, password, email=None, phone=None, mfa_token=None, **kwargs): + # MANUAL WAY OF HANDLING LOG IN """ initital_flow_token = self._get_flow_token() @@ -108,7 +137,7 @@ def login(self, username, password, email=None, phone=None, **kwargs): tasks_pending = True verification_input_data = None try: - task_flow_mapper = self._create_task_mapper(username or email, password, verification_input_data) + task_flow_mapper = self._create_task_mapper(username or email, password, mfa_token, verification_input_data) response = self._get_flow_token() self._get_javscript_instrumentation_subtask() while tasks_pending: @@ -127,18 +156,24 @@ def login(self, username, password, email=None, phone=None, **kwargs): if task_id in ['LoginAcid', 'LoginEnterAlternateIdentifierSubtask', 'LoginTwoFactorAuthChallenge']: input_type = (find_nested_key(response,"keyboard_type") or "").strip().lower() hint_message = (find_nested_key(response,"hint_text") or "").strip().lower() - input_message = f"{hint_message} (Input Type - {input_type}) ==> " - otp_required = True if hint_message == "confirmation code" and input_type == "text" else False - email_verification = True if input_type == "email" or (hint_message == "phone or email" and input_type == "text") else False - phone_verification = True if hint_message == "phone number" and input_type == "telephone" else False - identity_verification = True if hint_message == "phone or username" and input_type == "text" else False - two_fac_auth = True if task_id == "LoginTwoFactorAuthChallenge" else False - if input_type and hint_message and error_message: - print(f"\n{error_message}\n") - verification_input_data = phone if (phone_verification or identity_verification) and phone else email if email_verification and email else str(input(input_message)) if two_fac_auth or otp_required else None - if not verification_input_data: - raise Exception(error_message) - task_flow_mapper[task_id].update({"task_parameter":verification_input_data}) + if task_id == 'LoginTwoFactorAuthChallenge': + # Not required because LoginTwoFactorAuthChallenge is handled in self._handle_two_factor_auth() + # otp_required = True if hint_message == "enter code" and input_type == "number" else False + backup_code_required = True if hint_message == "enter code" and input_type == "text" else False + if backup_code_required: + raise Exception(error_message) # "Enter your backup code" + else: + email_verification = True if input_type == "email" or (hint_message == "phone or email" and input_type == "text") else False + phone_verification = True if hint_message == "phone number" and input_type == "telephone" else False + identity_verification = True if hint_message == "phone or username" and input_type == "text" else False + if input_type and hint_message and error_message: + print(f"\n{error_message}\n") + verification_input_data = (phone if (phone_verification or identity_verification) and phone else + email if email_verification and email else + None) + if not verification_input_data: + raise Exception(error_message) + task_flow_mapper[task_id].update({"task_parameter":verification_input_data}) if task_id == 'LoginSuccessSubtask': tasks_pending = False print(task['task_output']) diff --git a/tweeterpy/tweeterpy.py b/tweeterpy/tweeterpy.py index 848074f..2fb22c2 100644 --- a/tweeterpy/tweeterpy.py +++ b/tweeterpy/tweeterpy.py @@ -38,6 +38,12 @@ def __init__(self): logger.warn(error) self.request_client.session.headers.update({"Authorization": token}) + def _generate_v1_request_data(self, endpoint, params={}): + url = util.generate_url(domain=Path.API_V1_URL, url_path=endpoint) + request_payload = {"url": url, "params": params} + logger.debug(f"Request Payload => {request_payload}") + return request_payload + def _generate_request_data(self, endpoint, variables=None, **kwargs): # fmt: off - Turns off formatting for this block of code. Just for the readability purpose. url = util.generate_url(domain=Path.API_URL, url_path=endpoint) @@ -232,7 +238,7 @@ def logged_in(self): return True return False - def login(self, username=None, password=None, email=None, phone=None, **kwargs): + def login(self, username=None, password=None, email=None, phone=None, mfa_token=None, **kwargs): """Log into an account. Args: @@ -240,6 +246,7 @@ def login(self, username=None, password=None, email=None, phone=None, **kwargs): password (str, optional): Password. Defaults to None. email (str, optional): Twitter email. Defaults to None. phone (str, optional): Twitter phone. Defaults to None. + mfa_token (str, optional): Twitter MFA/2FA manual code. Defaults to None. """ self.generate_session() if username is None: @@ -247,7 +254,7 @@ def login(self, username=None, password=None, email=None, phone=None, **kwargs): if password is None: password = getpass.getpass() TaskHandler(request_client=self.request_client).login( - username, password, email=email, phone=phone, **kwargs) + username, password, email=email, phone=phone, mfa_token=mfa_token, **kwargs) util.generate_headers(session=self.request_client.session) try: user = self.me @@ -296,6 +303,22 @@ def get_user_info(self, user_id): response = self.request_client.request(**request_payload) return response['data']['user']['result'] + @login_decorator + def get_user_emails_and_phone_numbers(self): + """Gets all emails and phone numbers associated with a users account. + + Args: + None + + Returns: + dict: User information. + """ + params = {'include_pending_email': True} + request_payload = self._generate_v1_request_data( + Path.EMAIL_PHONE_INFO_ENDPOINT, params) + response = self.request_client.request(**request_payload) + return response + def get_user_data(self, username): """Extracts user details as same as get_user_info method. Except this one returns info about blue tick verification badge as well. diff --git a/tweeterpy/util.py b/tweeterpy/util.py index a04cc40..e7f81b9 100644 --- a/tweeterpy/util.py +++ b/tweeterpy/util.py @@ -1,6 +1,9 @@ import re import bs4 import time +import base64 +import struct +import hmac import datetime import logging.config from functools import reduce @@ -10,6 +13,7 @@ from tweeterpy.constants import Path, PUBLIC_TOKEN from dataclasses import dataclass, field, fields, asdict, _MISSING_TYPE + logging.config.dictConfig(config.LOGGING_CONFIG) logger = logging.getLogger(__name__) @@ -192,6 +196,19 @@ def get_nested_data(dataset, nested_key, placeholder): return [get_nested_data(data, nested_key, []) for data in dataset] if isinstance(dataset, list) else get_nested_data(dataset, nested_key, []) +def generate_hotp_token(key, counter, digits=6, digest='sha1'): + key = base64.b32decode(key.upper() + '=' * ((8 - len(key)) % 8)) + counter = struct.pack('>Q', counter) + mac = hmac.new(key, counter, digest).digest() + offset = mac[-1] & 0x0f + binary = struct.unpack('>L', mac[offset:offset+4])[0] & 0x7fffffff + return str(binary)[-digits:].zfill(digits) + + +def generate_totp_token(key, time_step=30, digits=6, digest='sha1'): + return generate_hotp_token(key, int(time.time() / time_step), digits, digest) + + @dataclass class _Item: # Base Item class for other data-classes to manipulate data. From 9b2be3fd45154887ad0b7ba9522e451973d4f0a2 Mon Sep 17 00:00:00 2001 From: Burrup Lambert Date: Fri, 11 Oct 2024 16:01:03 +0000 Subject: [PATCH 2/2] Backup code required error message update --- tweeterpy/login_util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tweeterpy/login_util.py b/tweeterpy/login_util.py index eaa30c3..b9b09b3 100644 --- a/tweeterpy/login_util.py +++ b/tweeterpy/login_util.py @@ -161,7 +161,8 @@ def login(self, username, password, email=None, phone=None, mfa_token=None, **kw # otp_required = True if hint_message == "enter code" and input_type == "number" else False backup_code_required = True if hint_message == "enter code" and input_type == "text" else False if backup_code_required: - raise Exception(error_message) # "Enter your backup code" + backup_code_required_error_message = 'Backup code required.' + raise Exception(backup_code_required_error_message) else: email_verification = True if input_type == "email" or (hint_message == "phone or email" and input_type == "text") else False phone_verification = True if hint_message == "phone number" and input_type == "telephone" else False