Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for backup code detection, maximum retry counts, automatic TOPT generation from MFA token, api v1.1 support and v1.1 endpoint users/email_phone_info.json #85

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tweeterpy/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Path:
DOMAIN = "x.com"
BASE_URL = "https://x.com/"
API_URL = "https://x.com/i/api/graphql/"
API_V1_URL = "https://x.com/i/api/1.1/"
TASK_URL = "https://api.x.com/1.1/onboarding/task.json"
GUEST_TOKEN_URL = "https://api.x.com/1.1/guest/activate.json"
JAVSCRIPT_INSTRUMENTATION_URL = "https://twitter.com/i/js_inst"
Expand Down Expand Up @@ -71,6 +72,8 @@ class Path:
TWEET_LIKES_ENDPOINT = "mpMee2WCjo7Nm4gRRHHnvA/Favoriters"
RETWEETED_BY_ENDPOINT = "7Fwe5A6kE05QIybims116A/Retweeters"
USER_HIGHLIGHTS_ENDPOINT = "w9-i9VNm_92GYFaiyGT1NA/UserHighlightsTweets"
# V1 ENDPOINTS
EMAIL_PHONE_INFO_ENDPOINT = "users/email_phone_info.json"


class FeatureSwitch:
Expand Down
82 changes: 59 additions & 23 deletions tweeterpy/login_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import random
import time
from tweeterpy.constants import Path
from tweeterpy.util import find_nested_key
from tweeterpy.util import find_nested_key, generate_totp_token
from tweeterpy.request_util import RequestClient
from tweeterpy.logging_util import disable_logger

Expand All @@ -9,7 +10,7 @@ class TaskHandler:
def __init__(self, request_client: RequestClient = None):
self.request_client = request_client

def _create_task_mapper(self, username, password, verification_input_data):
def _create_task_mapper(self, username, password, mfa_token, verification_input_data):
# fmt: off - Turns off formatting for this block of code. Just for the readability purpose.
task_flow_mapper = {"LoginJsInstrumentationSubtask":{"task_executor": self._get_user_flow_token,"task_parameter":None},
"LoginEnterUserIdentifierSSO":{"task_executor": self._get_password_flow_token,"task_parameter":username},
Expand All @@ -18,7 +19,7 @@ def _create_task_mapper(self, username, password, verification_input_data):
"DenyLoginSubtask":{"task_executor": self._check_suspicious_login,"task_parameter":None},
"AccountDuplicationCheck":{"task_executor": self._check_account_duplication,"task_parameter":None},
"LoginAcid":{"task_executor":self._handle_suspicious_login,"task_parameter":verification_input_data},
"LoginTwoFactorAuthChallenge":{"task_executor":self._handle_suspicious_login,"task_parameter":verification_input_data},
"LoginTwoFactorAuthChallenge":{"task_executor":self._handle_two_factor_auth,"task_parameter":mfa_token},
"LoginSuccessSubtask":{"task_output": "\nPlease Wait... Logging In...\n"}}
return task_flow_mapper

Expand Down Expand Up @@ -75,21 +76,49 @@ def _check_account_duplication(self, flow_token, subtask_id="AccountDuplicationC
'subtask_inputs': [{'subtask_id': subtask_id, 'check_logged_in_account': {'link': 'AccountDuplicationCheck_false'}}]}
return self.request_client.request(Path.TASK_URL, method="POST", json=payload)

def _handle_suspicious_login(self, flow_token, subtask_id="LoginAcid",verification_input_data=None):
def _handle_auth_challenge(self, flow_token, subtask_id, get_input, verification_input_data=None, max_retries=float('inf')):
"""Handles the authentication flow for both suspicious login and two-factor auth."""
input_value = verification_input_data if verification_input_data is not None else get_input()
payload = {"flow_token": flow_token,
"subtask_inputs": [{"subtask_id": subtask_id, "enter_text": {"text": verification_input_data,"link":"next_link"}}]}
handle_incorrect_input = True
while handle_incorrect_input:
"subtask_inputs": [{"subtask_id": subtask_id, "enter_text": {"text": input_value,"link":"next_link"}}]}
retries = 0
while retries <= max_retries:
response = self.request_client.request(Path.TASK_URL, method="POST", json=payload, skip_error_checking=True)
if isinstance(response, dict) and "errors" in response.keys():
retries += 1
if retries > max_retries:
maximum_retry_error = f"We exceeded the self imposed maximum retry limit of {max_retries}."
raise Exception(maximum_retry_error)
error_message = "\n".join([error['message'] for error in response['errors']])
payload['subtask_inputs'][0]['enter_text']['text'] = str(input(f"{error_message} - Type again ==> "))
print(f"{error_message} - ", end="")
input_value = get_input()
payload['subtask_inputs'][0]['enter_text']['text'] = input_value
else:
handle_incorrect_input = False
break
return response

def _handle_suspicious_login(self, flow_token, subtask_id="LoginAcid", verification_input_data=None):
"""Handles suspicious login challenges."""
def get_input():
return input("Input the verification data ==> ")

return self._handle_auth_challenge(flow_token, subtask_id, get_input, verification_input_data)

def _handle_two_factor_auth(self, flow_token, subtask_id="LoginTwoFactorAuthChallenge", mfa_token=None):
"""Handles two-factor authentication challenges."""
def get_totp():
"""Generates or prompts for a TOTP."""
if mfa_token:
totp = generate_totp_token(mfa_token)
print(f"\nGenerated TOTP {totp} from MFA token {mfa_token}\n")
return totp
return input("Input the 6-digit OTP ==> ")

return self._handle_auth_challenge(flow_token, subtask_id, get_totp, max_retries=1)

@disable_logger
def login(self, username, password, email=None, phone=None, **kwargs):
def login(self, username, password, email=None, phone=None, mfa_token=None, **kwargs):

# MANUAL WAY OF HANDLING LOG IN
"""
initital_flow_token = self._get_flow_token()
Expand All @@ -108,7 +137,7 @@ def login(self, username, password, email=None, phone=None, **kwargs):
tasks_pending = True
verification_input_data = None
try:
task_flow_mapper = self._create_task_mapper(username or email, password, verification_input_data)
task_flow_mapper = self._create_task_mapper(username or email, password, mfa_token, verification_input_data)
response = self._get_flow_token()
self._get_javscript_instrumentation_subtask()
while tasks_pending:
Expand All @@ -127,18 +156,25 @@ def login(self, username, password, email=None, phone=None, **kwargs):
if task_id in ['LoginAcid', 'LoginEnterAlternateIdentifierSubtask', 'LoginTwoFactorAuthChallenge']:
input_type = (find_nested_key(response,"keyboard_type") or "").strip().lower()
hint_message = (find_nested_key(response,"hint_text") or "").strip().lower()
input_message = f"{hint_message} (Input Type - {input_type}) ==> "
otp_required = True if hint_message == "confirmation code" and input_type == "text" else False
email_verification = True if input_type == "email" or (hint_message == "phone or email" and input_type == "text") else False
phone_verification = True if hint_message == "phone number" and input_type == "telephone" else False
identity_verification = True if hint_message == "phone or username" and input_type == "text" else False
two_fac_auth = True if task_id == "LoginTwoFactorAuthChallenge" else False
if input_type and hint_message and error_message:
print(f"\n{error_message}\n")
verification_input_data = phone if (phone_verification or identity_verification) and phone else email if email_verification and email else str(input(input_message)) if two_fac_auth or otp_required else None
if not verification_input_data:
raise Exception(error_message)
task_flow_mapper[task_id].update({"task_parameter":verification_input_data})
if task_id == 'LoginTwoFactorAuthChallenge':
# Not required because LoginTwoFactorAuthChallenge is handled in self._handle_two_factor_auth()
# otp_required = True if hint_message == "enter code" and input_type == "number" else False
backup_code_required = True if hint_message == "enter code" and input_type == "text" else False
if backup_code_required:
backup_code_required_error_message = 'Backup code required.'
raise Exception(backup_code_required_error_message)
else:
email_verification = True if input_type == "email" or (hint_message == "phone or email" and input_type == "text") else False
phone_verification = True if hint_message == "phone number" and input_type == "telephone" else False
identity_verification = True if hint_message == "phone or username" and input_type == "text" else False
if input_type and hint_message and error_message:
print(f"\n{error_message}\n")
verification_input_data = (phone if (phone_verification or identity_verification) and phone else
email if email_verification and email else
None)
if not verification_input_data:
raise Exception(error_message)
task_flow_mapper[task_id].update({"task_parameter":verification_input_data})
if task_id == 'LoginSuccessSubtask':
tasks_pending = False
print(task['task_output'])
Expand Down
27 changes: 25 additions & 2 deletions tweeterpy/tweeterpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ def __init__(self):
logger.warn(error)
self.request_client.session.headers.update({"Authorization": token})

def _generate_v1_request_data(self, endpoint, params={}):
url = util.generate_url(domain=Path.API_V1_URL, url_path=endpoint)
request_payload = {"url": url, "params": params}
logger.debug(f"Request Payload => {request_payload}")
return request_payload

def _generate_request_data(self, endpoint, variables=None, **kwargs):
# fmt: off - Turns off formatting for this block of code. Just for the readability purpose.
url = util.generate_url(domain=Path.API_URL, url_path=endpoint)
Expand Down Expand Up @@ -232,22 +238,23 @@ def logged_in(self):
return True
return False

def login(self, username=None, password=None, email=None, phone=None, **kwargs):
def login(self, username=None, password=None, email=None, phone=None, mfa_token=None, **kwargs):
"""Log into an account.

Args:
username (str, optional): Twitter username. Defaults to None.
password (str, optional): Password. Defaults to None.
email (str, optional): Twitter email. Defaults to None.
phone (str, optional): Twitter phone. Defaults to None.
mfa_token (str, optional): Twitter MFA/2FA manual code. Defaults to None.
"""
self.generate_session()
if username is None:
username = str(input("Enter Your Username or Email : ")).strip()
if password is None:
password = getpass.getpass()
TaskHandler(request_client=self.request_client).login(
username, password, email=email, phone=phone, **kwargs)
username, password, email=email, phone=phone, mfa_token=mfa_token, **kwargs)
util.generate_headers(session=self.request_client.session)
try:
user = self.me
Expand Down Expand Up @@ -296,6 +303,22 @@ def get_user_info(self, user_id):
response = self.request_client.request(**request_payload)
return response['data']['user']['result']

@login_decorator
def get_user_emails_and_phone_numbers(self):
"""Gets all emails and phone numbers associated with a users account.

Args:
None

Returns:
dict: User information.
"""
params = {'include_pending_email': True}
request_payload = self._generate_v1_request_data(
Path.EMAIL_PHONE_INFO_ENDPOINT, params)
response = self.request_client.request(**request_payload)
return response

def get_user_data(self, username):
"""Extracts user details as same as get_user_info method. Except this one returns info about blue tick verification badge as well.

Expand Down
17 changes: 17 additions & 0 deletions tweeterpy/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import re
import bs4
import time
import base64
import struct
import hmac
import datetime
import logging.config
from functools import reduce
Expand All @@ -10,6 +13,7 @@
from tweeterpy.constants import Path, PUBLIC_TOKEN
from dataclasses import dataclass, field, fields, asdict, _MISSING_TYPE


logging.config.dictConfig(config.LOGGING_CONFIG)
logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -192,6 +196,19 @@ def get_nested_data(dataset, nested_key, placeholder):
return [get_nested_data(data, nested_key, []) for data in dataset] if isinstance(dataset, list) else get_nested_data(dataset, nested_key, [])


def generate_hotp_token(key, counter, digits=6, digest='sha1'):
key = base64.b32decode(key.upper() + '=' * ((8 - len(key)) % 8))
counter = struct.pack('>Q', counter)
mac = hmac.new(key, counter, digest).digest()
offset = mac[-1] & 0x0f
binary = struct.unpack('>L', mac[offset:offset+4])[0] & 0x7fffffff
return str(binary)[-digits:].zfill(digits)


def generate_totp_token(key, time_step=30, digits=6, digest='sha1'):
return generate_hotp_token(key, int(time.time() / time_step), digits, digest)


@dataclass
class _Item:
# Base Item class for other data-classes to manipulate data.
Expand Down