-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSimClient.py
472 lines (356 loc) · 16.4 KB
/
SimClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
#!/usr/bin/env python
'''
simulation client for AirSimInterface.py
this runs the main loop and holds the settings for the simulation.
'''
from AirSimInterface import AirSimInterface
import airsim
import numpy as np
import pprint
import curses
import os
import time
from math import *
import time
import cv2
from copy import deepcopy
from util import *
# import MAVeric polynomial trajectory planner
import MAVeric.trajectory_planner as maveric
# use custom PID controller
# from VelocityPID import VelocityPID
from UnityPID import VelocityPID
class SimClient(AirSimInterface):
def __init__(self, raceTrackName="track0", *args, **kwargs):
# init super class (AirSimInterface)
super().__init__(raceTrackName=raceTrackName, *args, **kwargs)
# do custom setup here
if self.config.debug:
self.c = curses.initscr()
curses.noecho()
curses.cbreak()
self.gateConfigurations = []
self.currentGateConfiguration = 0
self.timestep = 1. / self.config.framerate
'''
this function should be called at end of lifetime of this object (see contextlib)
close all opened files here
and reset simulation
'''
def close(self):
self.client.simPause(False)
if self.config.debug:
curses.nocbreak()
curses.echo()
curses.endwin()
self.client.simFlushPersistentMarkers()
# close super class (AirSimController)
super().close()
'''
run mission
- initialize velocity pid controller
- generate trajectory through gates
- save current configuration to dataset folder
- follow trajectory with sampled waypoints
showMarkers: boolean, if true, trajectory will be visualized with red markers in simulation
captureImages: boolean, if true, each iteration will capture a frame of each camera, simulation is paused for this
velocity_limit: float, maximum velocity of the vehicle
variable name prefix:
W: coordinates in world frame
B: coordinates in body frame (drone/uav)
'''
def gateMission(self, showMarkers=True, captureImages=True, velocity_limit=2, uav_position=None):
mission = True
# reset sim
self.reset()
# takeoff
self.client.takeoffAsync().join()
if uav_position:
uav_position[3] = uav_position[3] + 90
self.setPositionUAV(uav_position)
self.client.moveByVelocityAsync(float(0), float(0), float(0),
duration=float(3), yaw_mode=airsim.YawMode(False, uav_position[3]))
# make sure drone is not drifting anymore after takeoff
time.sleep(3)
# init pid controller for velocity control
pp = 2
dd = .2
ii = .0
Kp = np.array([pp, pp, pp])
Ki = np.array([ii, ii, ii])
Kd = np.array([dd, dd, dd])
yaw_gain = np.array([.2, 0., 25]) # [.25, 0, 0.25]
distance_threshold = 0.01
angle_threshold = 0.1
ctrl = VelocityPID(Kp, Ki, Kd, yaw_gain, distance_threshold, angle_threshold)
# set initial state and goal to 0
ctrl.setState([0, 0, 0, 0])
ctrl.setGoal([0, 0, 0, 0])
# get trajectory
Wtimed_waypoints, Wtrajectory = self.generateTrajectoryFromCurrentGatePositions(timestep=1)
Wpath, WpathComplete = self.convertTrajectoryToWaypoints(Wtimed_waypoints, Wtrajectory,
evaltime=self.config.roundtime)
# save current configuration and trajectory in data set folder
data = {
"waypoints": WpathComplete
}
self.saveConfigToDataset(self.gateConfigurations[self.currentGateConfiguration], data)
# show trajectory
if showMarkers:
self.client.simPlotPoints(Wpath, color_rgba=[1.0, 0.0, 0.0, .2], size=10.0, duration=-1.0,
is_persistent=True)
lastWP = time.time()
lastImage = time.time()
lastIMU = time.time()
lastPID = time.time()
lastBG = time.time()
WLastAirSimVel = [0., 0., 0., 0.]
timePerWP = float(self.config.roundtime) / len(WpathComplete)
timePerImage = 1. / float(self.config.framerate)
timePerIMU = 1. / float(self.config.imuRate)
timePerPID = 1. / float(self.config.pidRate)
timePerBG = 1. / float(self.config.backgroundChangeRate)
cwpindex = 0
cimageindex = 0
if self.config.debug:
self.c.clear()
def angleDifference(a: float, b: float):
return (a - b + 540) % 360 - 180
# controll loop
while mission:
# if self.config.debug:
# self.c.clear()
# get and plot current waypoint (blue)
wp = WpathComplete[cwpindex]
# show markers if applicable
self.showMarkers(showMarkers, wp)
# get current time and time delta
tn = time.time()
nextWP = tn - lastWP > timePerWP
nextImage = tn - lastImage > timePerImage
nextIMU = tn - lastIMU > timePerIMU
nextPID = tn - lastPID > timePerPID
nextBG = tn - lastBG > timePerBG
if showMarkers:
current_drone_pose = self.getPositionUAV()
self.client.simPlotPoints(
[airsim.Vector3r(current_drone_pose[0], current_drone_pose[1], current_drone_pose[2])],
color_rgba=[1.0, 0.0, 1.0, 1.0],
size=10.0, duration=self.timestep, is_persistent=False)
self.client.simPlotPoints(
[airsim.Vector3r(current_drone_pose[0], current_drone_pose[1], current_drone_pose[2])],
color_rgba=[1.0, 0.6, 1.0, .5],
size=10.0, duration=self.timestep, is_persistent=True)
if self.config.debug:
if nextWP:
self.c.addstr(3, 0, f"wpt: {format(1. / float(tn - lastWP), '.4f')}hz")
if nextImage:
self.c.addstr(4, 0, f"img: {format(1. / float(tn - lastImage), '.4f')}hz")
if nextIMU:
self.c.addstr(5, 0, f"imu: {format(1. / float(tn - lastIMU), '.4f')}hz")
if nextPID:
self.c.addstr(6, 0, f"pid: {format(1. / float(tn - lastPID), '.4f')}hz")
if nextBG:
self.changeBackground()
lastBG = tn
if nextIMU:
self.captureIMU()
lastIMU = tn
if nextImage and captureImages:
# pause simulation
prepause = time.time()
self.client.simPause(True)
Bvel, Byaw = ctrl.getVelocityYaw()
Bvel = Bvel / velocity_limit
# save images of current frame
self.captureAndSaveImages(cwpindex, cimageindex, [*Bvel, Byaw], WLastAirSimVel)
cimageindex += 1
# unpause simulation
self.client.simPause(False)
postpause = time.time()
pausedelta = postpause - prepause
if self.config.debug:
self.c.addstr(10, 0, f"pausedelta: {pausedelta}")
lastWP += pausedelta
lastIMU += pausedelta
tn += pausedelta
lastImage = tn
if nextPID:
# get current state
Wcstate = self.getState()
# set goal state of pid controller
Bgoal = vector_world_to_body(wp[:3], Wcstate[:3], Wcstate[3])
# desired yaw angle is target point yaw angle world minus current uav yaw angle world
ByawGoal = angleDifference(wp[3], degrees(Wcstate[3]))
print(f"angle target: {ByawGoal:5.4f}")
ctrl.setGoal([*Bgoal, ByawGoal])
# update pid controller
ctrl.update(tn - lastPID)
# get current pid output´
Bvel, Byaw = ctrl.getVelocityYaw()
print(f"magnitude: {magnitude(Bvel)}")
Bvel_percent = magnitude(Bvel) / velocity_limit
print(f"percent: {Bvel_percent * 100}")
# if magnitude of pid output is greater than velocity limit, scale pid output to velocity limit
if Bvel_percent > 1:
Bvel = Bvel / Bvel_percent
# rotate velocity command such that it is in world coordinates
Wvel = vector_body_to_world(Bvel, [0, 0, 0], Wcstate[3])
# add pid output for yaw to current yaw position
Wyaw = degrees(Wcstate[3]) + Byaw
WLastAirSimVel = [*Wvel, Wyaw]
'''
Args:
vx (float): desired velocity in world (NED) X axis
vy (float): desired velocity in world (NED) Y axis
vz (float): desired velocity in world (NED) Z axis
duration (float): Desired amount of time (seconds), to send this command for
drivetrain (DrivetrainType, optional):
yaw_mode (YawMode, optional):
vehicle_name (str, optional): Name of the multirotor to send this command to
'''
self.client.moveByVelocityAsync(float(Wvel[0]), float(Wvel[1]), float(Wvel[2]),
duration=float(timePerPID), yaw_mode=airsim.YawMode(False, Wyaw))
# save last PID time
lastPID = tn
# debug output
if self.config.debug:
self.c.refresh()
# increase current waypoint index if time per waypoint passed and if there are more waypoints available in path
if nextWP and len(WpathComplete) > (cwpindex + 1):
cwpindex = cwpindex + 1
lastWP = tn
# end mission when no more waypoints available
if len(WpathComplete) - 10 <= (cwpindex + 1): # ignore last 80 waypoints
mission = False
if showMarkers:
# clear persistent markers
self.client.simFlushPersistentMarkers()
def showMarkers(self, showMarkers, wp):
if showMarkers:
self.client.simPlotPoints([airsim.Vector3r(wp[0], wp[1], wp[2])], color_rgba=[0.0, 0.0, 1.0, 1.0],
size=10.0, duration=self.timestep, is_persistent=False)
# set gate poses in simulation to provided configuration
# gates: [ [x, y, z, yaw], ...]
def loadGatePositions(self, gates):
# load gate positions
for i, gate in enumerate(gates):
self.setPositionGate(i + 1, gate)
# load the next gate pose configuration in self.gateConfigurations
# if index overflows, wraps around to 0 and repeats configurations
def loadNextGatePosition(self):
currentConfiguration = self.gateConfigurations[self.currentGateConfiguration]
self.loadGatePositions(currentConfiguration)
self.currentGateConfiguration += 1
if self.currentGateConfiguration >= len(self.gateConfigurations):
self.currentGateConfiguration = 0
'''
generate random gate pose configurations
'''
def generateGateConfigurations(self):
# reset fields
self.currentGateConfiguration = 0
self.gateConfigurations = []
gridsize = self.config.gates["grid_size"]
# generate range for each axis
xr0 = self.config.gates["range"]["x"][0]
xr1 = self.config.gates["range"]["x"][1]
xr = list(np.linspace(xr0, xr1, gridsize))
yr0 = self.config.gates["range"]["y"][0]
yr1 = self.config.gates["range"]["y"][1]
yr = list(np.linspace(yr0, yr1, gridsize))
zr0 = self.config.gates["range"]["z"][0]
zr1 = self.config.gates["range"]["z"][1]
zr = list(np.linspace(zr0, zr1, gridsize))
wr0 = self.config.gates["range"]["yaw"][0]
wr1 = self.config.gates["range"]["yaw"][1]
wr = list(np.linspace(wr0, wr1, gridsize))
gateCenters = self.config.gates['poses']
# generate configurations
for i in range(self.config.gates['number_configurations']):
currentConfiguration = []
for gate in gateCenters:
# create new gate from gate center
newGate = np.array(deepcopy(gate))
ra = np.random.randint(gridsize, size=4)
# move gate by random offset
newGate += np.array([xr[ra[0]], yr[ra[1]], zr[ra[2]], wr[ra[3]]])
currentConfiguration.append(newGate)
# append configuration
self.gateConfigurations.append(currentConfiguration)
# remove duplicates
self.gateConfigurations = np.array(self.gateConfigurations)
remove_duplicates = lambda data: np.unique([tuple(row) for row in data], axis=0)
self.gateConfigurations = remove_duplicates(self.gateConfigurations)
self.gateConfigurations.tolist()
# helper function for visualizing gate center points and waypoints offset from gate centers with certain distance from gate and same rotation, unused
def printGateWPs(self):
self.reset()
self.client.takeoffAsync().join()
self.loadGatePositions(self.config.gates['poses'])
self.captureAndSaveImages()
gates = [
[5.055624961853027, -0.7640624642372131, -0.75],
[10.555624961853027, 3.7359328269958496, -0.75],
[5.055624961853027, 8.235932350158691, -0.75],
[1.0556249618530273, 3.7359390258789062, -0.75]
]
wps = self.generateTrajectoryFromCurrentGatePositions(1, False)
plot = [self.toVector3r(wp) for wp in wps]
self.client.simPlotPoints(plot, color_rgba=[1.0, 0.0, 0.0, .2], size=10.0, duration=-1, is_persistent=True)
plot = [self.toVector3r(wp) for wp in gates]
self.client.simPlotPoints(plot, color_rgba=[0.0, 1.0, 0.0, .2], size=10.0, duration=-1, is_persistent=True)
print("wps")
for wp in wps:
print(wp)
time.sleep(10)
self.client.simFlushPersistentMarkers()
# test to check roration of aditional waypoints for each gate, unused
def rotationTest(self):
self.reset()
self.client.takeoffAsync().join()
self.loadGatePositions(self.config.gates['poses'])
yaw = radians(90)
gate = np.array([5.055624961853027, -0.7640624642372131, -0.75])
gate1 = deepcopy(gate)
y1 = np.array([0, 1, 0])
# rotation matrix z-axis
rot = np.array([[cos(yaw), sin(yaw), 0], [-1 * sin(yaw), cos(yaw), 0], [0, 0, 1]])
rot1 = rot.dot(np.transpose(y1))
gate1 += rot1
# test vector
tv = np.array([1, 0, 0])
tvout = rot.dot(np.transpose(tv))
print("tv", tv)
print("to", tvout)
gate2 = deepcopy(gate)
gate2 -= rot1
plot = [self.toVector3r(wp) for wp in [gate1, gate2]]
self.client.simPlotPoints(plot, color_rgba=[1.0, 0.0, 0.0, .2], size=10.0, duration=-1, is_persistent=True)
self.client.simPlotPoints([self.toVector3r(gate)], color_rgba=[0.0, 1.0, 0.0, .2], size=10.0, duration=-1,
is_persistent=True)
time.sleep(10)
self.client.simFlushPersistentMarkers()
# wp: [x, y, z]
# returns airsim.Vector3r()
def toVector3r(self, wp):
return airsim.Vector3r(wp[0], wp[1], wp[2])
def printGatePositions(self, count):
for i in range(count):
pos = self.getPositionGate(i + 1)
print(f"gate {i}: ")
self.printPose(pos)
if __name__ == "__main__":
import contextlib
with contextlib.closing(SimClient(configFilePath='config/config_dr_test.json')) as sc:
# generate random gate configurations within bounds set in config.json
sc.generateGateConfigurations()
configurations = deepcopy(sc.gateConfigurations)
for i, gateConfig in enumerate(configurations):
with contextlib.closing(SimClient(raceTrackName=f"track{i}", configFilePath='config/config_dr_test.json')) as sc:
sc.gateConfigurations = [gateConfig]
sc.loadNextGatePosition()
# fly mission
sc.gateMission(False, True, uav_position=sc.config.uav_position)
sc.loadGatePositions(sc.config.gates['poses'])
sc.reset()