-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
425 lines (348 loc) · 13.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
from datetime import date, datetime
import subprocess
import os
import requests
import json
import re
from bs4 import BeautifulSoup
# Per-request state. These start empty and are reassigned for every issue
# that the processing loop at the bottom of the file handles.
NAME = EMAIL = branch_name = None

# Fixed names: the JSON log of served users and the target repository.
served_file = 'Served.json'
repository_name = 'Get-Pair-Extraordinaire'
AUTHOR_NAME = 'Prakash4844'

# Run from the directory that holds this file so that the relative paths
# used below (the JSON log, the Markdown table) resolve next to the script.
PATH = os.path.abspath(os.path.dirname(__file__))
os.chdir(PATH)

# Committer e-mail, overridable through the GIT_EMAIL environment variable.
# NOTE(review): the fallback literal looks like an obfuscated placeholder
# rather than a real address - confirm the intended default.
AUTHOR_EMAIL = os.environ.get('GIT_EMAIL', '[email protected]')

# Credentials come from the environment. When EXTRAORDINAIREPAT is not set,
# the string form of GITHUB_TOKEN is used instead (the text 'None' if that
# variable is missing as well).
GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN')
GITHUB_PAT = os.environ.get('EXTRAORDINAIREPAT', str(GITHUB_TOKEN))

# ISO formatted (YYYY-MM-DD) date, captured once when the module is loaded.
today = date.today().isoformat()
def fetch_issues():
    """
    Fetch the open issues labelled "Request" from the repository's GitHub issue API.

    Only the first page of results returned by the API is read.

    :param: none
    :return: list of issue dicts (pull requests excluded, possibly empty),
             or None if the API does not answer with status 200
    """
    # Set up GitHub API URL
    issue_api_url = f'https://api.github.com/repos/{AUTHOR_NAME}/{repository_name}/issues'

    # API parameters for filtering issues by label and state
    api_params = {
        "labels": "Request",
        "state": "open",
    }

    # requests applies no timeout by default, so without one a stalled
    # connection would block the whole run indefinitely.
    issue_response = requests.get(url=issue_api_url, params=api_params, timeout=30)

    if issue_response.status_code != 200:
        print("Failed to fetch issues. Status code:", issue_response.status_code)
        return None

    # The issues endpoint also lists pull requests; those entries carry a
    # "pull_request" key and are not user requests, so leave them out.
    return [entry for entry in issue_response.json() if 'pull_request' not in entry]
def add_entry_to_table(username, avatar_url, github_link):
    """
    Append one contributor cell to the 'contributors' table in Extraordinary.md.

    The file is parsed with BeautifulSoup's 'html.parser', the table (created
    if absent) receives a new <td> holding a link that wraps the avatar image,
    a line break and the bold username, and the prettified document is
    written back over the same file.

    :param username: text shown in bold under the avatar, also the image alt text
    :param avatar_url: value for the image 'src' attribute
    :param github_link: value for the link 'href' attribute
    :return: None
    """
    markdown_path = 'Extraordinary.md'

    # Load and parse the current document
    with open(markdown_path, 'r') as source:
        document = BeautifulSoup(source.read(), 'html.parser')

    # Locate the contributors table, creating it when the file has none
    contributors = document.find('table', {'class': 'contributors'})
    if not contributors:
        contributors = document.new_tag('table')
        contributors['class'] = 'contributors'
        document.append(contributors)

    # The entry goes into the last row; start a new row when the table has
    # no rows yet or the last row already holds exactly 7 cells
    existing_rows = contributors.find_all('tr')
    target_row = existing_rows[-1] if existing_rows else None
    if not target_row or len(target_row.find_all('td')) == 7:
        target_row = document.new_tag('tr')
        contributors.append(target_row)

    # Cell -> link -> (avatar image, line break, bold username)
    entry_cell = document.new_tag('td', align='center')
    target_row.append(entry_cell)

    profile_link = document.new_tag('a', href=github_link)
    entry_cell.append(profile_link)

    avatar = document.new_tag('img', src=avatar_url, width="100;", alt=username)
    profile_link.append(avatar)
    profile_link.append(document.new_tag('br'))

    caption = document.new_tag('b')
    caption.string = username
    profile_link.append(caption)

    # Serialise the modified tree and overwrite the file with it
    rendered = document.prettify()
    with open(markdown_path, 'w') as target:
        target.write(rendered)
def update_served_json():
    """
    Record in the JSON log (served_file) that the current user has been served today.

    The log is nested by year, month, week, date and finally the user's name:
    {
        "Year 2021": {
            "January": {
                "Week 01": {
                    "2021-01-01": {
                        "<NAME>": {
                            "Messages": "served to <NAME> (<EMAIL>) on 2021-01-01"
                        }
                    }
                }
            }
        }
    }

    Reads the module-level NAME, EMAIL, served_file and today.

    :param: none
    :return: None
    :raises SystemExit: with code 0 when NAME already has an entry for today
    """
    try:
        with open(served_file) as file:
            data = json.load(file)
    except (FileNotFoundError, json.decoder.JSONDecodeError):
        # No usable history (first run, or an empty/corrupt file): start fresh.
        # Any other read error propagates without the log being rewritten.
        data = {}

    # Sample the clock once so year, month and week always describe the same instant
    now = datetime.now()
    current_year = 'Year ' + str(now.year)
    current_month = now.strftime('%B')
    current_week = 'Week ' + now.strftime('%U')
    current_date = today

    # Walk down to today's bucket, creating any missing level on the way
    served_today = (
        data.setdefault(current_year, {})
        .setdefault(current_month, {})
        .setdefault(current_week, {})
        .setdefault(current_date, {})
    )

    if NAME in served_today:
        print("User has already been served, today.")
        # Same effect as the interactive exit(0) helper, without relying on
        # the site module having installed it.
        raise SystemExit(0)

    served_today[NAME] = {
        'Messages': "served to " + NAME + " (" + EMAIL + ") on " + today,
    }

    # Write updated JSON data back to file
    with open(served_file, 'w') as file:
        json.dump(data, file, indent=4)
def git_config_commit_push():
    """
    Configure the git identity, create the request branch, stage and commit
    everything, then push the branch to the remote with upstream tracking.

    Reads the module-level AUTHOR_EMAIL, AUTHOR_NAME, branch_name,
    issue_creator, today, NAME, EMAIL, GITHUB_PAT and repository_name.
    The result of each git command is not inspected.

    :param: none
    :return: None
    """
    def run_git(*arguments):
        # Thin wrapper so each step below reads as a plain git command line
        subprocess.run(['git', *arguments])

    # Identity used for the commit
    run_git('config', 'user.email', AUTHOR_EMAIL)
    run_git('config', 'user.name', AUTHOR_NAME)

    # New branch holding all current changes
    run_git('checkout', '-b', f'{branch_name}')
    run_git('add', '.')

    # The trailer at the end credits the requester as co-author of the commit
    run_git('commit', '-m', f'''Add {issue_creator} to the list of served users on {today}
\n\n\nCo-authored-by: {NAME} <{EMAIL}>''')

    # Publish the branch; the token is embedded in the remote URL
    run_git('push', '-u',
            f'https://{GITHUB_PAT}@github.com/{AUTHOR_NAME}/{repository_name}.git',
            f'{branch_name}')
def git_cleanup():
    """
    Return to the main branch, remove the request branch locally and on the
    remote, prune stale remote references and pull the latest main.

    Reads the module-level branch_name, GITHUB_PAT, AUTHOR_NAME and
    repository_name. The result of each git command is not inspected.

    :param: none
    :return: None
    """
    # Token-authenticated remote shared by the push and pull steps
    remote_url = f'https://{GITHUB_PAT}@github.com/{AUTHOR_NAME}/{repository_name}.git'

    def run_git(*arguments):
        subprocess.run(['git', *arguments])

    run_git('checkout', 'main')
    # Drop the request branch, first locally and then on the remote
    run_git('branch', '-D', f'{branch_name}')
    run_git('push', remote_url, '--delete', f'{branch_name}')
    # Refresh remote-tracking references and bring main up to date
    run_git('fetch', '--prune')
    run_git('pull', remote_url, 'main')
def comment_on_issue(comment_text, issue_no):
    """
    Post comment_text as a new comment on the given issue and print the outcome.

    :param comment_text: body of the comment to post
    :param issue_no: number of the issue that receives the comment
    :return: None
    """
    endpoint = f"https://api.github.com/repos/{AUTHOR_NAME}/{repository_name}/issues/{issue_no}/comments"
    payload = json.dumps({"body": comment_text})

    reply = requests.post(
        endpoint,
        headers={
            "Authorization": f"Bearer {GITHUB_PAT}",
            "Accept": "application/vnd.github.v3+json"
        },
        data=payload,
    )

    # 201 is treated as success; anything else is reported but not fatal
    if reply.status_code != 201:
        print(f"Failed to add comment. Response: {reply.text}")
        return
    print("Comment added successfully.")
def create_pull_request(title, body, head_branch, base_branch="main"):
    """
    Open a pull request from head_branch into base_branch.

    Prints a confirmation on status 201; on any other status prints the
    response text and terminates the script with exit code 1.

    :param title: title of the pull request
    :param body: description of the pull request
    :param head_branch: branch that contains the changes
    :param base_branch: branch the changes are pulled into (default: main)
    :return: None
    """
    endpoint = f"https://api.github.com/repos/{AUTHOR_NAME}/{repository_name}/pulls"
    auth_headers = {
        "Authorization": f"Bearer {GITHUB_PAT}",
        "Accept": "application/vnd.github.v3+json"
    }
    payload = json.dumps({
        "title": title,
        "body": body,
        "head": head_branch,
        "base": base_branch,
    })

    reply = requests.post(endpoint, headers=auth_headers, data=payload)
    if reply.status_code == 201:
        print("Pull request created successfully.")
        return

    # Without a pull request the remaining steps cannot run, so stop here
    print(f"Failed to create pull request. Response: {reply.text}")
    print("Aborting the process.\nPlease check the logs and try again.")
    exit(1)
def get_pull_requests():
    """
    Retrieve the repository's open pull requests.

    On a non-200 status the response text is printed and the script
    terminates with exit code 1.

    :return: decoded JSON body of the response
    """
    endpoint = f"https://api.github.com/repos/{AUTHOR_NAME}/{repository_name}/pulls"
    auth_headers = {
        "Authorization": f"Bearer {GITHUB_PAT}",
        "Accept": "application/vnd.github.v3+json"
    }
    # "state" accepts open, closed or all
    query = {"state": "open"}

    reply = requests.get(endpoint, headers=auth_headers, params=query)
    if reply.status_code != 200:
        print(f"Failed to fetch pull requests. Response: {reply.text}")
        print("Aborting the process.\nPlease check the logs and try again.")
        exit(1)
    return reply.json()
def merge_pull_request(pull_number):
    """
    Merge the pull request with the given number using a merge commit.

    The commit message is built from the module-level issue_number and NAME.
    On a non-200 status the response text is printed and the script
    terminates with exit code 1.

    :param pull_number: number of the pull request to merge
    :return: None
    """
    endpoint = f"https://api.github.com/repos/{AUTHOR_NAME}/{repository_name}/pulls/{pull_number}/merge"
    auth_headers = {
        "Authorization": f"Bearer {GITHUB_PAT}",
        "Accept": "application/vnd.github.v3+json"
    }
    merge_request = {
        # "merge_method" accepts merge, squash or rebase
        "merge_method": "merge",
        # Custom message for the resulting merge commit
        "commit_message": f"merged PR for issue #{issue_number}, Created by {NAME} ",
    }

    reply = requests.put(endpoint, headers=auth_headers, json=merge_request)
    if reply.status_code == 200:
        print("Pull request merged successfully.")
        return

    print(f"Failed to merge pull request. Response: {reply.text}")
    print("Aborting the process.\nPlease check the logs and try again.")
    exit(1)
def close_issue_with_comment(issue_no):
    """
    Post a closing comment on the given issue, then set the issue to closed.

    The comment mentions the module-level pr_number. Failures of either
    request are printed; neither one stops the script.

    :param issue_no: number of the issue to comment on and close
    :return: None
    """
    issue_url = f"https://api.github.com/repos/{AUTHOR_NAME}/{repository_name}/issues/{issue_no}"
    comment_url = f"https://api.github.com/repos/{AUTHOR_NAME}/{repository_name}/issues/{issue_no}/comments"
    auth_headers = {
        "Authorization": f"Bearer {GITHUB_PAT}",
        "Accept": "application/vnd.github.v3+json"
    }

    # Step 1: leave the closing comment (sent before the issue is closed)
    farewell = {
        "body": f"Your Request has been processed in PR #{pr_number}\nThank you for your contribution."
                "\n\nPlease star the repository if you liked it.\n"
    }
    comment_reply = requests.post(comment_url, headers=auth_headers, data=json.dumps(farewell))
    if comment_reply.status_code != 201:
        print(f"Failed to add comment. Response: {comment_reply.text}")
    else:
        print("Comment added successfully.")

    # Step 2: close the issue
    close_reply = requests.patch(issue_url, headers=auth_headers, data=json.dumps({"state": "closed"}))
    if close_reply.status_code != 200:
        print(f"Failed to close issue. Response: {close_reply.text}")
        print(f"close manually issue #{issue_no}.")
    else:
        print("Issue closed successfully.")
def _parse_request(issue_body):
    """
    Extract the requester's name and e-mail address from an issue body.

    The name is the text between the first colon and the end of that line
    (any line-ending style); the e-mail is the first address-like token
    anywhere in the body.

    :param issue_body: raw issue text, may be None or empty
    :return: (name, email) tuple; either element is None when it is not found
    """
    if not issue_body:
        return None, None

    name = None
    _, colon, remainder = issue_body.partition(':')
    if colon and remainder:
        # splitlines() handles \r\n, \n and \r alike, so the name never
        # spills into the following lines of the body
        name = remainder.splitlines()[0].strip() or None

    # [A-Za-z] for the top-level domain: a '|' inside a character class is a
    # literal pipe, not an alternation
    email_match = re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b', issue_body)
    email = email_match.group() if email_match else None

    return name, email


# Get the list of issues
issue_list = fetch_issues()
if not issue_list:
    print('No issue with "Request" Label found. Exiting...')
    exit(0)

# Loop through the list of issues and process each one
for issue in issue_list:
    issue_creator = issue['user']['login']
    issue_number = issue['number']

    # NAME and EMAIL are module-level because the helper functions read them
    NAME, EMAIL = _parse_request(issue['body'])
    if NAME is None or EMAIL is None:
        # A malformed request must not abort the run for everybody else
        print(f"Skipping issue #{issue_number}: could not find a name and e-mail in the issue body.")
        continue

    # Set branch name for issue
    branch_name = f'{issue_creator}-request-{today}'

    # Update Extraordinary.md
    add_entry_to_table(username=issue_creator, avatar_url=issue['user']['avatar_url'],
                       github_link=issue['user']['html_url'])

    # Update Served.json
    update_served_json()

    # Perform git operations to create a new branch,
    # add the changes, commit them, and push the branch to the remote repository with tracking enabled
    git_config_commit_push()

    # Comment on current issue
    Issue_comment = f"Hi @{issue_creator},\n\nProcess has been started for your request." \
                    "\n\nThank you for using Get Pair Extraordinaire! :smile:\n"
    comment_on_issue(Issue_comment, issue_number)

    # Create Pull Request
    pr_title = f"Serving Pair-Extraordinaire, Add {issue_creator} to the list of served users on {today}"
    pr_body = f"Committed with, co-author, @{issue_creator} for providing Pair Extraordinaire badge. " \
              f"This PR is in Relation to Issue #{issue_number}"
    create_pull_request(pr_title, pr_body, head_branch=f"{branch_name}")

    # Find the pull request that was just opened by matching its title.
    # pr_number falls back to 0 when there is no match (including an empty list).
    pr_list = get_pull_requests()
    pr_number = next((pr['number'] for pr in pr_list if pr['title'] == pr_title), 0)

    # Merge Pull Request with pr_number
    merge_pull_request(pr_number)

    # Close Issue with issue_number
    close_issue_with_comment(issue_number)

    # Clean up local and remote git repository
    git_cleanup()

print('Served everyone on the list! ;)')