This repository has been archived by the owner on Oct 25, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
yturl.py
executable file
·171 lines (133 loc) · 5.97 KB
/
yturl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python
"""YouTube videos on the command line."""
from __future__ import print_function, unicode_literals
import argparse
import collections
import logging
import sys
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
import requests
LOG = logging.getLogger(__name__)

# Quality name -> selector. Each selector receives the video's itags as an
# indexable sequence ordered from highest to lowest quality and returns the
# single itag that best matches that name. It is consulted when
# `-q low|medium|high` is given on the command line instead of a literal itag.
NAMED_QUALITY_GROUPS = {
    "low": lambda ranked: ranked[-1],
    "medium": lambda ranked: ranked[len(ranked) // 2],
    "high": lambda ranked: ranked[0],
}

# Sent with every API request so that the client identifies itself.
DEFAULT_HEADERS = {
    "User-Agent": "yturl (https://github.com/cdown/yturl)",
}
def construct_youtube_get_video_info_url(video_id):
    """
    Build the URL of YouTube's get_video_info endpoint for a video ID.

    The ID is URL-encoded and placed in the "video_id" query parameter.
    """
    endpoint = urlparse("https://www.youtube.com/get_video_info")
    # ._replace() is public namedtuple API; the underscore only exists so the
    # method name can never clash with a field name.
    endpoint_with_query = endpoint._replace(
        query=urlencode({"video_id": video_id})
    )
    return urlunparse(endpoint_with_query)
def video_id_from_url(url):
    """
    Parse a video ID, either from the "v" parameter or the last URL path slice.

    The "v" query parameter wins when present; otherwise the text after the
    final "/" of the path is used. A bare video ID (no slashes, no query) is
    therefore returned unchanged.

    Raises ValueError (via parse_qs_single) if the query string repeats a key.
    """
    parsed_url = urlparse(url)
    url_params = parse_qs_single(parsed_url.query)
    video_id = url_params.get("v", parsed_url.path.split("/")[-1])
    # Argument order must follow the format string: the ID first, then the
    # URL it was extracted from.
    LOG.debug("Parsed video ID %s from %s", video_id, url)
    return video_id
def itags_for_video(video_id, timeout=30):
    """
    Return itags for a video with their media URLs, sorted by quality.

    video_id is the YouTube video ID to look up. timeout is passed to
    requests.get (seconds) so that an unresponsive server cannot hang the
    program indefinitely; pass None to wait without limit.

    Returns an OrderedDict mapping itag -> media URL, in the order the API
    listed the streams.

    Raises YouTubeAPIError if the API response's status is not "ok", and
    NotImplementedError if the response carries no
    "url_encoded_fmt_stream_map" entry.
    """
    api_url = construct_youtube_get_video_info_url(video_id)
    api_response_raw = requests.get(
        api_url, headers=DEFAULT_HEADERS, timeout=timeout
    )
    LOG.debug("Raw API response: %r", api_response_raw.text)
    api_response = parse_qs_single(api_response_raw.text)
    LOG.debug("parse_qs_single API response: %r", api_response)

    if api_response.get("status") != "ok":
        reason = api_response.get("reason", "Unspecified error.")

        # Unfortunately YouTube returns HTML in this instance, so there's no
        # reasonable way to use api_response directly.
        if "CAPTCHA" in api_response_raw.text:
            reason = "You need to solve a CAPTCHA, visit %s" % api_url

        raise YouTubeAPIError(reason)

    # The YouTube API returns these from highest to lowest quality, which we
    # rely on. From this point forward, we need to make sure we maintain order.
    try:
        streams = api_response["url_encoded_fmt_stream_map"].split(",")
    except KeyError:
        raise NotImplementedError(
            "Live and streaming videos are unsupported."
        ) from None

    # Each stream entry is itself a query string describing one format.
    videos = [parse_qs_single(stream) for stream in streams]
    return collections.OrderedDict((vid["itag"], vid["url"]) for vid in videos)
def itag_from_quality(group_or_itag, itags):
    """
    Resolve a quality specifier to one concrete itag.

    When "group_or_itag" names a quality group, that group's selector from
    NAMED_QUALITY_GROUPS picks an itag out of "itags". When it is already one
    of "itags", it is returned as-is. Anything else raises ValueError.
    """
    # Named groups take priority over literal itags.
    select_itag = NAMED_QUALITY_GROUPS.get(group_or_itag)
    if select_itag is not None:
        return select_itag(itags)

    if group_or_itag not in itags:
        raise ValueError(
            "Group/itag %s unavailable (video itags: %r, known groups: %r)"
            % (group_or_itag, itags, list(NAMED_QUALITY_GROUPS))
        )

    # A literal itag that the video actually offers: pass it through.
    return group_or_itag
def parse_qs_single(query_string):
    """
    Parse a query string like parse_qs, but map each key to a single value.

    parse_qs always hands back lists, because a key may legitimately occur
    more than once in a query string. That is awkward when every key is
    expected exactly once, so the lists are unwrapped here.

    dict(parse_qsl(x)) would also produce single values, but it silently
    keeps only one of any duplicated keys. Here duplicates are treated as an
    error instead: if any key occurs more than once, ValueError is raised
    listing the offending keys.
    """
    single_valued = {}
    repeated_keys = []

    for key, values in parse_qs(query_string).items():
        if len(values) > 1:
            repeated_keys.append(key)
        else:
            single_valued[key] = values[0]

    if repeated_keys:
        raise ValueError("Duplicate keys in query string: %r" % repeated_keys)

    return single_valued
def main(argv=None):
    """
    Command-line entry point.

    Parses "argv" (argparse falls back to sys.argv when it is None), looks up
    the available itags for the requested video, then reports the chosen itag
    on stderr and prints its media URL on stdout.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "-q",
        "--quality",
        default="medium",
        help="low/medium/high or an itag",
    )
    cli.add_argument(
        "--debug",
        action="store_const",
        dest="log_level",
        const=logging.DEBUG,
        default=logging.WARNING,
        help="enable debug logging",
    )
    # The positional accepts either a bare ID or a full URL; the type hook
    # reduces both to a video ID during parsing.
    cli.add_argument(
        "video_id", metavar="video_id/url", type=video_id_from_url
    )
    options = cli.parse_args(argv)

    logging.basicConfig(level=options.log_level)

    url_by_itag = itags_for_video(options.video_id)

    # The NAMED_QUALITY_GROUPS selectors index into the itags, so hand them a
    # list rather than the mapping itself.
    chosen_itag = itag_from_quality(options.quality, list(url_by_itag))

    print("Using itag %s." % chosen_itag, file=sys.stderr)
    print(url_by_itag[chosen_itag])
class YouTubeAPIError(Exception):
    """
    Signals that the YouTube API itself reported a failure.

    Problems that arise while processing data the API did return are not
    reported with this class; more specific exception types are used for
    those.
    """
if __name__ == "__main__":  # pragma: no cover
    # Only run the CLI when executed as a script, never on import.
    main()