-
Notifications
You must be signed in to change notification settings - Fork 12
/
telus.py
255 lines (198 loc) · 7.88 KB
/
telus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import urllib, urllib2, re
from cookies import Cookies
from urlparse import urlparse
def parseForm(inputs, html):
    """
    Return a dictionary containing the inputs and values of a form. If there is
    a form action then it is included as an element with key 'action'.

    Matching is regex based and deliberately simple: a field is only found
    when its name="..." attribute appears before its value="..." attribute
    inside the same <input> or <button> tag, and both use double quotes.
    Attribute values are returned verbatim (HTML entities are not decoded).

    @param inputs iterable of form field names to look for
    @param html the html to parse
    @return dict mapping each field name that was found to its value, plus
            'action' (if present). Always a dict, possibly empty; never None.
    """
    values = {}

    # [^>] keeps the match inside one tag (it also spans newlines, so tags
    # whose attributes are wrapped over several lines are still found).
    action = re.search(r'<form[^>]*?action="([^"]*)"', html)
    if action:
        values['action'] = action.group(1)

    for field in inputs:
        # re.escape: the field name is literal text, not a regex fragment.
        pattern = (r'<(?:input|button)[^>]*?name="' + re.escape(field) +
                   r'"[^>]*?value="([^"]*)"')
        result = re.search(pattern, html)
        if result:
            values[field] = result.group(1)

    return values
class Telus:
    """
    @class Telus
    MSO class to handle authorization with the Telus MSO.

    IMPORTANT (if you are ever trying to figure out what goes on here). The
    Telus authentication calls the SOS page, then the bookend page twice.
    Then it calls SOS again, and then bookend twice again. Finally, on that
    fourth call to bookend, we are sent to the logon page.

    Every step returns None on failure. The only non-None result produced in
    this class is True, returned by authGateway (see there).
    """

    # Safety net for getBookendAgain, which otherwise re-posts the bookend
    # form for as long as the server keeps answering with another bookend
    # page. The flow described above needs only a handful of hops.
    MAX_BOOKEND_HOPS = 10

    # Matches a javascript redirect such as: location.href = "http://..."
    _REDIRECT_RE = re.compile(r'location\.href.*?=.*?"(.*?)"')

    @staticmethod
    def getID():
        """
        @return the identifier of this MSO
        """
        return 'telus_auth-gateway_net'

    @staticmethod
    def _fetch(url, data=None):
        """
        Open a URL using the shared cookie jar and persist the jar afterwards.
        @param url the URL to open
        @param data url-encoded body; when given it is passed to the opener
                    as request data, otherwise no data is sent
        @return tuple (final url after redirects, response body)
        Exceptions from the opener (e.g. urllib2.URLError) are not caught here.
        """
        jar = Cookies.getCookieJar()
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(jar))
        resp = opener.open(url, data)
        Cookies.saveCookieJar(jar)
        return resp.url, resp.read()

    @staticmethod
    def _findRedirect(html):
        """
        Extract the target of a javascript 'location.href = "..."' redirect.
        @param html the page to search
        @return the quoted target (may be an empty string) or None if the
                page contains no such redirect
        """
        found = Telus._REDIRECT_RE.search(html)
        if found is None:
            return None
        return found.group(1)

    @staticmethod
    def authorize(streamProvider, username, password):
        """
        Perform authorization with Telus
        @param streamProvider the stream provider object. Needs to handle the
                              getAuthURI.
        @param username the username to authenticate with
        @param password the password to authenticate with
        @return the result of the authentication chain, or None on failure
        """
        uri = streamProvider.getAuthURI(Telus.getID())
        try:
            _, html = Telus._fetch(uri)
        except Exception:
            # Entry point of the chain: any failure here (bad URI, network,
            # HTTP error) just means we cannot authorize. Unlike a bare
            # except this lets KeyboardInterrupt / SystemExit through.
            print("Unable get Telus OAUTH location")
            return None

        values = parseForm(['SAMLRequest', 'RelayState'], html)
        # parseForm never returns None; a page without a form shows up as a
        # missing 'action' key instead.
        action = values.pop('action', None)
        if action is None:
            print("Form parsing failed in authorize")
            return None

        return Telus.getBookend(username, password, values, action)

    @staticmethod
    def getBookend(username, password, values, url):
        """
        Post the SAML request form and start the bookend sequence
        @param username the username
        @param password the password
        @param values the form data to post (SAML request / relay state)
        @param url the URL the form is posted to
        @return the result of the authentication chain, or None on failure
        """
        try:
            final_url, html = Telus._fetch(url, urllib.urlencode(values))
        except urllib2.URLError as e:
            print(e.args)
            return None

        values = parseForm(['AuthState', 'id', 'coeff'], html)
        values['history'] = '2'
        return Telus.getBookendAgain(username, password, values,
                                     final_url.split('?')[0])

    @staticmethod
    def getBookendAgain(username, password, values, url, _hops=0):
        """
        Submit the bookend form and follow whatever the server answers with:
        the login page, a javascript redirect, or yet another bookend form.
        @param username the username
        @param password the password
        @param values the bookend form data
        @param url the bookend URL (without query string)
        @param _hops internal counter of bookend pages already followed by
                     this method; callers should leave it at its default
        @return the result of the authentication chain, or None on failure
        """
        if _hops >= Telus.MAX_BOOKEND_HOPS:
            print("Too many bookend redirects in getBookendAgain")
            return None

        # The encoded form is sent both as query string and as request data,
        # exactly as the original implementation did.
        encoded = urllib.urlencode(values)
        try:
            final_url, html = Telus._fetch(url + '?' + encoded, encoded)
        except urllib2.URLError as e:
            print(e.args)
            return None

        # we might have been redirected to the login
        if final_url.find('login.php') > 0:
            return Telus.login(username, password, html)

        # if login was previously successful we might just get sent off to
        # the next step via a javascript redirect
        redirect = Telus._findRedirect(html)
        if redirect is not None:
            return Telus.discoveryAssociations(redirect)

        # otherwise it is another bookend page: submit it again
        values = parseForm(['AuthState', 'id', 'coeff'], html)
        values['history'] = '7'
        return Telus.getBookendAgain(username, password, values,
                                     final_url.split('?')[0], _hops + 1)

    @staticmethod
    def login(username, password, html):
        """
        Fill in and submit the login form
        @param username the username
        @param password the password
        @param html the login page
        @return the result of the authentication chain, or None on failure
        """
        values = parseForm(['login_type', 'remember_me', 'source',
                            'source_button'], html)
        action = values.pop('action', None)
        if action is None:
            # without an action there is nowhere to post the credentials
            print("Form parsing failed in login")
            return None

        values['username'] = username
        values['password'] = password

        try:
            _, html = Telus._fetch(action, urllib.urlencode(values))
        except urllib2.URLError as e:
            print(e.args)
            return None

        redirect = Telus._findRedirect(html)
        if redirect is None:
            print("Unable to parse URL from login result")
            return None

        return Telus.discoveryAssociations(redirect)

    @staticmethod
    def discoveryAssociations(url):
        """
        Follow a post-login redirect and hand the resulting form over to
        lastBookend
        @param url the redirect target
        @return the result of the authentication chain, or None on failure
        """
        # NOTE(review): the slice starts AT the first '=', so the posted
        # AuthState value begins with '=' (and is just the last character of
        # the url when there is no '='). Kept as is because the flow was
        # written against this behaviour -- confirm before changing.
        values = {'AuthState': url[url.find('='):]}

        try:
            final_url, html = Telus._fetch(url, urllib.urlencode(values))
        except urllib2.URLError as e:
            print(e.args)
            return None

        values = parseForm(['AuthState', 'id', 'coeff'], html)
        return Telus.lastBookend(values, final_url.split('?')[0])

    @staticmethod
    def lastBookend(values, url):
        """
        Make the lastBookend call
        @param values the bookend form data (not modified)
        @param url the bookend URL (without query string)
        @return the result of the authentication chain, or None on failure
        """
        params = dict(values)
        params['history'] = '7'

        try:
            _, html = Telus._fetch(url + '?' + urllib.urlencode(params))
        except urllib2.URLError as e:
            print(e.args)
            return None

        values = parseForm(['SAMLResponse', 'RelayState'], html)
        action = values.pop('action', None)
        if action is None:
            print("Failed to parse last bookend")
            return None

        # note, the action could be either
        # - https://adobe.auth-gateway.net/saml/module.php/saml/sp/saml2-acs.php/proxy_telus.auth-gateway.net
        # - https://sp.auth.adobe.com/sp/saml/SAMLAssertionConsumer
        return Telus.authGateway(values, action)

    @staticmethod
    def authGateway(values, url):
        """
        Post the SAML response to the assertion consumer
        @param values the form data to post (SAML response / relay state)
        @param url the assertion consumer URL
        @return True when the request fails with "No address associated with
                hostname" (treated as success here -- presumably the final
                redirect targets a host that is not meant to resolve; TODO
                confirm), None on any other failure, otherwise the result of
                following the returned redirect through discoveryAssociations
        """
        try:
            _, html = Telus._fetch(url, urllib.urlencode(values))
        except urllib2.URLError as e:
            # e.reason may be a plain string or an exception with fewer than
            # two args (e.g. a timeout); only inspect args[1] if it exists.
            reason = getattr(e, 'reason', None)
            reason_args = getattr(reason, 'args', None) or ()
            if (len(reason_args) > 1 and
                    reason_args[1] == "No address associated with hostname"):
                return True
            print(e.args)
            return None

        redirect = Telus._findRedirect(html)
        if redirect is None:
            print("Unable to parse auth gateway return")
            return None

        return Telus.discoveryAssociations(redirect)