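"""EnvironmentSim.py

Simulation environment in which a multirotor UAV learns to track a moving
car, built on the MultiRotorConnector and CarConnector wrappers (presumably
around an AirSim-style simulator, given the NED convention of negative z
pointing up).

Each step returns a gym-style (observation, reward, done) triple: the
observation is the car-to-UAV position delta normalized by max_dist and
max_dist_z, the reward combines planar proximity to the car with an
altitude-deviation penalty and a small per-step survival bonus, and an
episode terminates when the UAV leaves its altitude band, lets the car get
more than max_dist away, or the car's own z leaves its expected range.
"""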
import os
import math

import cv2
import numpy as np

from MultiRotorConnector import MultiRotorConnector
from CarConnector import CarConnector


class State:
    """Container for the position offsets between the car and the UAV."""
    DELTA_X = 0.0
    DELTA_Y = 0.0
    DELTA_Z = 0.0


class EnvironmentSim:
    def __init__(self, image_shape=(720, 1280)):
        if not os.path.exists('data'):
            os.mkdir('data')
        self.current_episode = 0
        self.current_timestep = 0
        self.im_width = image_shape[1]
        self.im_height = image_shape[0]
        self.max_dist = 80.0    # normalization bound for XY offsets (meters)
        self.max_dist_z = 5.0   # normalization bound for altitude error (meters)
        # NED convention: z is negative upwards, so MIN_ALTITUDE is the lowest
        # and MAX_ALTITUDE the highest point the UAV is allowed to fly.
        self.MIN_ALTITUDE = -0.5
        self.MAX_ALTITUDE = -40.0
        self.current_frame = None
        self.current_output = None
        self._uav_connector = MultiRotorConnector()
        self._car_connector = CarConnector()

    def state_to_array(self, state):
        """Pack the state deltas into a normalized float32 observation."""
        out = np.zeros((3,), dtype='float32')
        out[0] = float(state.DELTA_X) / float(self.max_dist)
        out[1] = float(state.DELTA_Y) / float(self.max_dist)
        out[2] = float(state.DELTA_Z) / float(self.max_dist_z)
        return out
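
    # Example: with max_dist = 80.0, an offset of DELTA_X = 40.0 m becomes
    # out[0] = 40.0 / 80.0 = 0.5. Since an episode ends once dist_xy exceeds
    # max_dist, the planar components stay within [-1, 1] for any
    # non-terminal state.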

    def reset(self):
        self.current_episode += 1
        self.current_timestep += 1
        car_pos, car_ort = self._car_connector.reset()
        self._uav_connector.reset()
        # self._uav_connector.move_by_angle(car_ort, self._uav_connector.INIT_Z)
        self._car_connector.drive()
        car_pos = self._car_connector.get_position()
        uav_pos = self._uav_connector.get_position()
        _state = State()
        _state.DELTA_X = car_pos.x_val - uav_pos.x_val
        _state.DELTA_Y = car_pos.y_val - uav_pos.y_val
        _state.DELTA_Z = uav_pos.z_val - self._uav_connector.INIT_Z
        print("\n-----Parameters-----")
        print("Delta X:", _state.DELTA_X)
        print("Delta Y:", _state.DELTA_Y)
        print("Delta Z:", _state.DELTA_Z)
        self.current_state = _state
        return self.state_to_array(_state)

    def step(self, action):
        done = 0
        reward = 0.0
        uav_vel = self._uav_connector.get_velocity()
        print("\n-----Parameters-----")
        print("Delta X :", self.current_state.DELTA_X)
        print("Delta Y :", self.current_state.DELTA_Y)
        print("Delta Z :", self.current_state.DELTA_Z)
        print("UAV Vel :", (uav_vel.x_val, uav_vel.y_val, uav_vel.z_val))
        # Advance the simulation: the car keeps driving while the UAV applies
        # the velocity action chosen by the agent.
        self._car_connector.drive()
        self._uav_connector.move_by_velocity(action)
        car_pos = self._car_connector.get_position()
        uav_pos = self._uav_connector.get_position()
        print("Car Pos :", car_pos.x_val, car_pos.y_val, car_pos.z_val)
        print("UAV Pos :", uav_pos.x_val, uav_pos.y_val, uav_pos.z_val)
        # Planar tracking reward: 1.0 directly above the car, falling
        # linearly to 0.0 at max_dist.
        dist_x = car_pos.x_val - uav_pos.x_val
        dist_y = car_pos.y_val - uav_pos.y_val
        dist_xy = np.linalg.norm([dist_x, dist_y])
        reward_xy = 1.0 - dist_xy / self.max_dist
        # Altitude penalty: linear inside the max_dist_z band, exponential
        # once the UAV strays further from its initial altitude.
        dist_z = abs(uav_pos.z_val - self._uav_connector.INIT_Z)
        reward_z = dist_z / self.max_dist_z
        if reward_z > 1.0:
            reward_z = -1.0 * math.exp(reward_z)
        else:
            reward_z = -1.0 * reward_z
        reward = reward_xy + reward_z
        print("Distance XY:", dist_xy, "\nDistance Z :", dist_z)
        print("Reward XY :", reward_xy, "\nReward Z :", reward_z)
        print("Reward (+T):", reward)
        reward += 0.20  # small per-step survival bonus
        frame = self._uav_connector.get_frame(path='data/' + str(self.current_timestep).zfill(4) + '.jpg')
        cv2.imshow('Simulation', frame)
        cv2.waitKey(5)
        # Terminal conditions: UAV outside the altitude band, UAV more than
        # max_dist from the car (reward_xy < 0), or the car's z outside its
        # expected range. On termination the returned observation is the
        # zeroed default state.
        _state = State()
        if (uav_pos.z_val > self.MIN_ALTITUDE) or \
           (uav_pos.z_val < self.MAX_ALTITUDE) or \
           reward_xy < 0.0 or \
           car_pos.z_val <= -2.0:
            reward = -10
            done = 1
        else:
            _state.DELTA_X = car_pos.x_val - uav_pos.x_val
            _state.DELTA_Y = car_pos.y_val - uav_pos.y_val
            _state.DELTA_Z = uav_pos.z_val - self._uav_connector.INIT_Z
            self.current_state = _state
        print("Reward :", reward)
        print("Action RL :", action, "+m/s")
        print("Done :", done)
        self.current_timestep += 1
        return self.state_to_array(_state), reward, done
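

# Minimal usage sketch, assuming a simulator session with both vehicles is
# already running and that the action format matches whatever
# MultiRotorConnector.move_by_velocity expects; the zero vector below is
# only a placeholder for a real policy's output.
if __name__ == '__main__':
    env = EnvironmentSim()
    observation = env.reset()
    done = 0
    while not done:
        action = np.zeros(3, dtype='float32')  # placeholder policy action
        observation, reward, done = env.step(action)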