forked from Daffan/the-barn-challenge
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgazebo_simulation.py
94 lines (78 loc) · 3.3 KB
/
gazebo_simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import rospy
import numpy as np
from std_srvs.srv import Empty
from gazebo_msgs.msg import ModelState
from gazebo_msgs.srv import SetModelState, GetModelState
from geometry_msgs.msg import Quaternion
from sensor_msgs.msg import LaserScan
from std_msgs.msg import Bool
def create_model_state(x, y, z, angle):
# the rotation of the angle is in (0, 0, 1) direction
model_state = ModelState()
model_state.model_name = 'jackal'
model_state.pose.position.x = x
model_state.pose.position.y = y
model_state.pose.position.z = z
model_state.pose.orientation = Quaternion(0, 0, np.sin(angle/2.), np.cos(angle/2.))
model_state.reference_frame = "world"
return model_state
class GazeboSimulation():
def __init__(self, init_position = [0, 0, 0]):
self._pause = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
self._unpause = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
self._reset = rospy.ServiceProxy('/gazebo/set_model_state', SetModelState)
self._model_state = rospy.ServiceProxy('/gazebo/get_model_state', GetModelState)
self._init_model_state = create_model_state(init_position[0],init_position[1],0,init_position[2])
self.collision_count = 0
self._collision_sub = rospy.Subscriber("/collision", Bool, self.collision_monitor)
def collision_monitor(self, msg):
if msg.data:
self.collision_count += 1
def get_hard_collision(self):
# hard collision count since last call
collided = self.collision_count > 0
self.collision_count = 0
return collided
def pause(self):
rospy.wait_for_service('/gazebo/pause_physics')
try:
self._pause()
except rospy.ServiceException:
print ("/gazebo/pause_physics service call failed")
def unpause(self):
rospy.wait_for_service('/gazebo/unpause_physics')
try:
self._unpause()
except rospy.ServiceException:
print ("/gazebo/unpause_physics service call failed")
def reset(self):
"""
/gazebo/reset_world or /gazebo/reset_simulation will
destroy the world setting, here we used set model state
to put the model back to the origin
"""
rospy.wait_for_service("/gazebo/set_model_state")
try:
self._reset(self._init_model_state)
except (rospy.ServiceException):
rospy.logwarn("/gazebo/set_model_state service call failed")
def get_laser_scan(self):
data = None
while data is None:
try:
data = rospy.wait_for_message('front/scan', LaserScan, timeout=5)
except:
pass
return data
def get_model_state(self):
rospy.wait_for_service("/gazebo/get_model_state")
try:
return self._model_state('jackal', 'world')
except (rospy.ServiceException):
rospy.logwarn("/gazebo/get_model_state service call failed")
def reset_init_model_state(self, init_position = [0, 0, 0]):
"""Overwrite the initial model state
Args:
init_position (list, optional): initial model state in x, y, z. Defaults to [0, 0, 0].
"""
self._init_model_state = create_model_state(init_position[0],init_position[1],0,init_position[2])