forked from harpaj/acc14
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
103 lines (89 loc) · 3.41 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import json
import os
import threading
import time

from celery.result import ResultSet
from flask import Flask, jsonify, request

from tasks import cpu_monitor, one_angle
app = Flask(__name__)
RESULT_DIR = "results"
@app.route('/run')
def schedule_run():
start_time = time.time()
# check and store parameters
args = request.args
try:
from_angle = int(args.get('from_angle'))
to_angle = int(args.get('to_angle'))
step_size = int(args.get('step_size'))
naca = args.get('naca', '0012')
assert len(naca) == 4
nodes = args.get('nodes', '200')
int(nodes)
refinements = args.get('refinements', '0')
assert 0 <= int(refinements) <= 2
samples = args.get('samples', '10')
int(samples)
viscosity = args.get('viscosity', '0.0001')
float(viscosity)
speed = args.get('speed', '10.')
float(speed)
total_time = args.get('total_time', '1')
int(total_time)
except (TypeError, ValueError, AssertionError) as e:
return jsonify({
"message": "You delivered invalid parameters.",
"required_parameters": {
"from_angle": {"type": "int"},
"to_angle": {"type": "int"},
"step_size": {"type": "int"}
},
"optional parameters": {
"naca": {"type": "char[4]", "default": "0012"},
"nodes": {"type": "int", "default": "20"},
"refinements": {"type": "[0, 1, 2]", "default": "0"},
"samples": {"type": "int", "default": "10"},
"viscosity": {"type": "float", "default": "0.0001"},
"speed": {"type": "float", "default": "10."},
"total_time": {"type": "int", "default": "1"}
},
"python_error": str(e)
})
# load filenames of all currently stored results
stored_results = set([f.name for f in os.scandir(RESULT_DIR) if f.is_file()])
# for each angle, load results from file or schedule task
filename = '_'.join([naca, nodes, refinements, samples, viscosity, speed, total_time])
results = []
tasks = []
for angle in range(from_angle, to_angle+1, step_size):
angle = str(angle)
_filename = angle + "_" + filename + ".json"
if _filename not in stored_results:
tasks.append(one_angle.delay(
angle, *naca, nodes, refinements, samples, viscosity, speed, total_time))
else:
with open(os.path.join(RESULT_DIR, _filename), 'r') as fh:
results.append((angle, json.load(fh)))
# get results from tasks, store them
new_results = ResultSet(tasks).join_native()
for angle, res in new_results:
_filename = angle + "_" + filename + ".json"
with open(os.path.join(RESULT_DIR, _filename), 'w') as fh:
json.dump(res, fh)
response = {
"result": dict(results + new_results),
"duration": time.time() - start_time,
}
return jsonify(response)
def monitor_cluster_cpu(nodes):
cpu_tasks = []
all_cpu_util = []
cpu_util = 0
for node in nodes:
cpu_tasks.append(cpu_monitor.delay())
all_cpu_util = ResultSet(cpu_tasks).join_native()
cpu_util = sum(ResultSet)/nodes
print all_cpu_util, cpu_util
threading.Timer(60, monitor_cluster_cpu).start()
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)
monitor_cluster_cpu(8)