Skip to content

Commit

Permalink
[planning][hand] format code using black
Browse files Browse the repository at this point in the history
  • Loading branch information
Li-Jinjie committed Jan 21, 2025
1 parent e5f9c51 commit de72a6e
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 62 deletions.
48 changes: 37 additions & 11 deletions aerial_robot_planning/scripts/mapping_file/instance_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def decorator(cls):
def wrapper(self, *args, **kwargs):
available_topics = [topic for topic, _ in rospy.get_published_topics()]
if topic_name not in available_topics:
rospy.logerr(f"Topic {topic_name} does not exist. Initialization failed.")
rospy.logerr(
f"Topic {topic_name} does not exist. Initialization failed."
)
raise RuntimeError(f"Topic {topic_name} does not exist.")
original_init(self, *args, **kwargs)

Expand All @@ -52,7 +54,9 @@ class HandPosition:
def __init__(self, topic_name="/hand/mocap/pose"):
self.hand_position = None
self.topic_name = topic_name
self.subscriber = rospy.Subscriber(self.topic_name, PoseStamped, self.hand_position_callback)
self.subscriber = rospy.Subscriber(
self.topic_name, PoseStamped, self.hand_position_callback
)
rospy.loginfo(f"Subscribed to {self.topic_name}")

while self.hand_position is None:
Expand All @@ -71,7 +75,9 @@ def get_position(self):
class ArmPosition:
def __init__(self):
self.arm_position = None
self.arm_pose_sub = rospy.Subscriber("/arm/mocap/pose", PoseStamped, self.arm_position_callback)
self.arm_pose_sub = rospy.Subscriber(
"/arm/mocap/pose", PoseStamped, self.arm_position_callback
)
rospy.loginfo("Subscribed to /arm/mocap/pose")

while self.arm_position is None:
Expand Down Expand Up @@ -123,7 +129,9 @@ class ControlMode:
def __init__(self):
self.control_mode = None

self.control_mode_sub = rospy.Subscriber("/hand/control_mode", UInt8, self.callback_control_mode)
self.control_mode_sub = rospy.Subscriber(
"/hand/control_mode", UInt8, self.callback_control_mode
)

while self.control_mode is None:
rospy.loginfo("Waiting for control mode...")
Expand Down Expand Up @@ -160,8 +168,12 @@ def __init__(
r_acc, p_acc, y_acc = 0.0, 0.0, 0.0
vx, vy, vz = 0.0, 0.0, 0.0
ax, ay, az = 0.0, 0.0, 0.0
self.vel_twist = Twist(linear=Vector3(vx, vy, vz), angular=Vector3(r_rate, p_rate, y_rate))
self.acc_twist = Twist(linear=Vector3(ax, ay, az), angular=Vector3(r_acc, p_acc, y_acc))
self.vel_twist = Twist(
linear=Vector3(vx, vy, vz), angular=Vector3(r_rate, p_rate, y_rate)
)
self.acc_twist = Twist(
linear=Vector3(ax, ay, az), angular=Vector3(r_acc, p_acc, y_acc)
)

self._check_last_time = None
self._check_last_position = None
Expand Down Expand Up @@ -201,20 +213,32 @@ def _check_finish_auto(self):
self.hand.hand_position.pose.orientation.w,
]

position_change = [abs(current_check_position[i] - self._check_last_position[i]) for i in range(3)]
position_change = [
abs(current_check_position[i] - self._check_last_position[i])
for i in range(3)
]

orientation_change = [abs(current_check_orientation[i] - self._check_check_orientation[i]) for i in range(4)]
orientation_change = [
abs(current_check_orientation[i] - self._check_check_orientation[i])
for i in range(4)
]

# self._check_last_position[:] = current_check_position
# self._check_check_orientation[:] = current_check_orientation

goal_reached = all(change < self._check_position_tolerance for change in position_change) and all(
goal_reached = all(
change < self._check_position_tolerance for change in position_change
) and all(
change < self._check_orientation_tolerance for change in orientation_change
)

new_state = "goal_reached" if goal_reached else "goal_not_reached"
if new_state != self.last_state:
rospy.loginfo("Now state: reach the goal" if goal_reached else "Now state:not reach the goal")
rospy.loginfo(
"Now state: reach the goal"
if goal_reached
else "Now state:not reach the goal"
)
self.last_state = new_state

if goal_reached:
Expand Down Expand Up @@ -249,7 +273,9 @@ def fill_trajectory_points(self, t_elapsed: float) -> MultiDOFJointTrajectory:
self.hand.hand_position.pose.position.z,
]

self.position_change = [current_position[i] - self.initial_hand_position[i] for i in range(3)]
self.position_change = [
current_position[i] - self.initial_hand_position[i] for i in range(3)
]

direction = [
self.initial_drone_position[0] + self.position_change[0],
Expand Down
122 changes: 71 additions & 51 deletions aerial_robot_planning/scripts/mpc_smach_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,30 @@ def execute(self, userdata):

max_traj_idx = len(traj_cls_list) + len(csv_files) - 1

traj_type_str = input(f"Enter trajectory type (0..{max_traj_idx}) or 'q' to quit or 'h' to hand control: ")
traj_type_str = input(
f"Enter trajectory type (0..{max_traj_idx}) or 'q' to quit or 'h' to hand control: "
)
if traj_type_str.lower() == "q":
return "shutdown"

if traj_type_str.lower() == "h":
userdata.traj_type = "h"
userdata.mapping_config = {"is_arm_active": False, "is_glove_active": False}
userdata.mapping_config = {
"is_arm_active": False,
"is_glove_active": False,
}

devices = {"is_arm_active": "Arm mocap", "is_glove_active": "Glove"}
print("Whether activate the following devices. ([Y]/Yes or [N]/No, case-insensitive)")
print(
"Whether activate the following devices. ([Y]/Yes or [N]/No, case-insensitive)"
)
for mapping_config_key, device in devices.items():
while True:
user_input = input(f"Whether activate {device} ([Y]/[N]): ").strip().lower()
user_input = (
input(f"Whether activate {device} ([Y]/[N]): ")
.strip()
.lower()
)
if user_input == "y":
userdata.mapping_config[mapping_config_key] = True
rospy.loginfo(f"Decide to activate {device}.")
Expand All @@ -117,7 +128,9 @@ def execute(self, userdata):
rospy.loginfo(f"Decided not to activate {device}.")
break
else:
rospy.logwarn("Invalid input. Please enter Y or N (case-insensitive).")
rospy.logwarn(
"Invalid input. Please enter Y or N (case-insensitive)."
)

return "go_mapping_init"

Expand Down Expand Up @@ -166,16 +179,22 @@ def __init__(self):
self.rate = rospy.Rate(20) # 20 Hz loop checking if finished

def execute(self, userdata):
rospy.loginfo("State: INIT -- Start to reach the first point of the trajectory.")
rospy.loginfo(
"State: INIT -- Start to reach the first point of the trajectory."
)

if userdata.traj_type < len(traj_cls_list):
rospy.loginfo(f"Using trajs.{traj_cls_list[userdata.traj_type].__name__} trajectory.")
rospy.loginfo(
f"Using trajs.{traj_cls_list[userdata.traj_type].__name__} trajectory."
)
traj = traj_factory(userdata.traj_type, userdata.loop_num)

x, y, z, vx, vy, vz, ax, ay, az = traj.get_3d_pt(0.0)

try:
(qw, qx, qy, qz, r_rate, p_rate, y_rate, r_acc, p_acc, y_acc) = traj.get_3d_orientation(0.0)
(qw, qx, qy, qz, r_rate, p_rate, y_rate, r_acc, p_acc, y_acc) = (
traj.get_3d_orientation(0.0)
)
except AttributeError:
qw, qx, qy, qz = 1.0, 0.0, 0.0, 0.0

Expand All @@ -184,7 +203,9 @@ def execute(self, userdata):
rospy.loginfo(f"Using CSV file: {csv_file}")
# csv_traj = np.loadtxt(os.path.join(csv_folder_path, csv_file), delimiter=',', max_rows=1)
# TODO: change the order of csv file. one row for one point is better.
csv_traj = np.loadtxt(os.path.join(csv_folder_path, csv_file), delimiter=",")
csv_traj = np.loadtxt(
os.path.join(csv_folder_path, csv_file), delimiter=","
)
x, y, z = csv_traj[0:3, 0]
qw, qx, qy, qz = csv_traj[6:10, 0]

Expand Down Expand Up @@ -237,7 +258,9 @@ def execute(self, userdata):
else:
csv_file = csv_files[userdata.traj_type - len(traj_cls_list)]
rospy.loginfo(f"Using CSV file: {csv_file}")
mpc_node = MPCPubCSVPredXU(userdata.robot_name, os.path.join(csv_folder_path, csv_file))
mpc_node = MPCPubCSVPredXU(
userdata.robot_name, os.path.join(csv_folder_path, csv_file)
)

# Wait here until the node signals it is finished or ROS shuts down
while not rospy.is_shutdown():
Expand All @@ -252,7 +275,12 @@ def execute(self, userdata):

class InitObjectState(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=["go_lock"], input_keys=["robot_name", "mapping_config"], output_keys=[])
smach.State.__init__(
self,
outcomes=["go_lock"],
input_keys=["robot_name", "mapping_config"],
output_keys=[],
)

def execute(self, userdata):

Expand All @@ -277,7 +305,9 @@ def execute(self, userdata):

class LockState(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=["go_unlock"], input_keys=[], output_keys=[])
smach.State.__init__(
self, outcomes=["go_unlock"], input_keys=[], output_keys=[]
)

self.last_threshold_time = None
self.xy_angle_threshold = 8
Expand All @@ -291,7 +321,9 @@ def execute(self, userdata):
global shared_data

rospy.loginfo("Current state: Lock")
rospy.loginfo("Please align the direction of your hand with the drone's hand direction.")
rospy.loginfo(
"Please align the direction of your hand with the drone's hand direction."
)

while not rospy.is_shutdown():
drone_orientation = [
Expand Down Expand Up @@ -325,7 +357,12 @@ def execute(self, userdata):
is_z_angular_alignment = abs(euler_angles[2]) < self.z_angle_threshold
is_z_height_alignment = abs(z_difference) < self.height_threshold

is_in_thresh = (is_x_angular_alignment and is_y_angular_alignment and is_z_angular_alignment and is_z_height_alignment)
is_in_thresh = (
is_x_angular_alignment
and is_y_angular_alignment
and is_z_angular_alignment
and is_z_height_alignment
)

if not is_in_thresh:
self.last_threshold_time = None
Expand All @@ -334,7 +371,10 @@ def execute(self, userdata):
if self.last_threshold_time is None:
self.last_threshold_time = rospy.get_time()

if rospy.get_time() - self.last_threshold_time > self.direction_hold_time:
if (
rospy.get_time() - self.last_threshold_time
> self.direction_hold_time
):
break

self.rate.sleep()
Expand All @@ -358,10 +398,7 @@ class OneToOneMapState(smach.State):
def __init__(self) -> None:

smach.State.__init__(
self,
outcomes=["done_track"],
input_keys=["robot_name"],
output_keys=[],
self, outcomes=["done_track"], input_keys=["robot_name"], output_keys=[]
)

self.pub_object = None
Expand Down Expand Up @@ -391,25 +428,23 @@ def execute(self, userdata):

def create_hand_control_state_machine():
"""HandControlStateMachine"""
sm_sub = smach.StateMachine(outcomes=["DONE"], input_keys=["robot_name", "mapping_config"])
sm_sub = smach.StateMachine(
outcomes=["DONE"], input_keys=["robot_name", "mapping_config"]
)

with sm_sub:
# InitObjectState
smach.StateMachine.add(
"HAND_CONTROL_INIT",
InitObjectState(),
transitions={"go_lock": "LOCK"},
"HAND_CONTROL_INIT", InitObjectState(), transitions={"go_lock": "LOCK"}
)

# LockState
smach.StateMachine.add(
"LOCK",
LockState(),
transitions={"go_unlock": "UNLOCK"},
)
smach.StateMachine.add("LOCK", LockState(), transitions={"go_unlock": "UNLOCK"})

# UnlockState
smach.StateMachine.add("UNLOCK", UnlockState(), transitions={"go_one_to_one_map": "ONE_TO_ONE_MAP"})
smach.StateMachine.add(
"UNLOCK", UnlockState(), transitions={"go_one_to_one_map": "ONE_TO_ONE_MAP"}
)

# One_To_One_MapState
smach.StateMachine.add(
Expand All @@ -426,7 +461,9 @@ def create_hand_control_state_machine():
###############################################
def main():
parser = argparse.ArgumentParser(description="SMACH-based MPC Trajectory Publisher")
parser.add_argument("robot_name", type=str, help="Robot name, e.g., beetle1, gimbalrotors")
parser.add_argument(
"robot_name", type=str, help="Robot name, e.g., beetle1, gimbalrotors"
)
args = parser.parse_args()

# Initialize a single ROS node for the entire SMACH-based system
Expand All @@ -441,12 +478,7 @@ def main():
sm.userdata.loop_num = np.inf

global shared_data
shared_data = {
"hand": None,
"arm": None,
"drone": None,
"control_mode": None,
}
shared_data = {"hand": None, "arm": None, "drone": None, "control_mode": None}

# Open the container
with sm:
Expand All @@ -463,29 +495,17 @@ def main():
)

# INIT
smach.StateMachine.add(
"INIT",
InitState(),
transitions={
"go_track": "TRACK",
},
)
smach.StateMachine.add("INIT", InitState(), transitions={"go_track": "TRACK"})

# TRACK
smach.StateMachine.add(
"TRACK",
TrackState(),
transitions={
"done_track": "IDLE",
},
"TRACK", TrackState(), transitions={"done_track": "IDLE"}
)

smach.StateMachine.add(
"HAND_CONTROL",
create_hand_control_state_machine(),
transitions={
"DONE": "IDLE",
},
transitions={"DONE": "IDLE"},
remapping={"robot_name": "robot_name", "mapping_config": "mapping_config"},
)

Expand Down

0 comments on commit de72a6e

Please sign in to comment.