-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache-purger.py
100 lines (80 loc) · 3.04 KB
/
cache-purger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import json
import shutil
import socket
import sys
from hashlib import md5
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
CACHE_DIR = "/mnt/cache"
class CachePurgeException(Exception):
pass
class HTTPServerIPv6(HTTPServer):
address_family = socket.AF_INET6
class handler(BaseHTTPRequestHandler):
def do_POST(self) -> None:
try:
check_headers(self.headers)
content_length = int(self.headers.get("Content-Length", "0"))
if content_length == 0:
purgeEverything()
self.send_response(HTTPStatus.OK, "Purge succeeded")
else:
body = self.rfile.read(content_length)
keys = json.loads(body)
if isinstance(keys, list):
purge(keys)
self.send_response(HTTPStatus.OK, "Purge succeeded")
else:
self.send_error(
HTTPStatus.BAD_REQUEST, "Missing array of keys to purge"
)
except CachePurgeException as e:
self.send_response(HTTPStatus.BAD_REQUEST, str(e))
except json.JSONDecodeError:
self.send_error(HTTPStatus.BAD_REQUEST, "Invalid JSON")
self.end_headers()
def check_headers(headers) -> None:
if "Content-Type" not in headers:
raise CachePurgeException("Missing Content-Type header")
if not headers.get("Content-Type") == "application/json":
raise CachePurgeException("Invalid Content-Type header")
if "Content-Length" not in headers:
raise CachePurgeException("Missing Content-Length header")
try:
int(headers.get("Content-Length"))
except ValueError as e:
raise CachePurgeException("Invalid Content-Length header") from e
def purge(keys: list[str]) -> None:
for key in keys:
filename = md5(key.encode("utf-8")).hexdigest()
path = os.path.join( # because nginx cache_path levels=1:2
CACHE_DIR, filename[-1], filename[-3:-1], filename
)
try:
if os.path.isfile(path):
os.remove(path)
log(f"Purged {key}")
except OSError as e:
log(f"Failed to purge {key}: {e}")
raise CachePurgeException("Purge failed") from e
def purgeEverything() -> None:
for filename in os.listdir(CACHE_DIR):
if filename == "lost+found":
continue
file_path = os.path.join(CACHE_DIR, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
log(f"Failed to purge entire cache: {e}")
raise CachePurgeException("Purge failed") from e
def log(message: str) -> None:
print(message, file=sys.stderr)
def main() -> None:
with HTTPServerIPv6(("::", 8081), handler) as server:
server.serve_forever()
if __name__ == "__main__":
main()