From de72a6e2ebecf4db06f59c0cf74fb2c5390ad890 Mon Sep 17 00:00:00 2001 From: "LI, Jinjie" <45286479+Li-Jinjie@users.noreply.github.com> Date: Tue, 21 Jan 2025 21:26:00 +0900 Subject: [PATCH] [planning][hand] format code using black --- .../scripts/mapping_file/instance_objects.py | 48 +++++-- .../scripts/mpc_smach_node.py | 122 ++++++++++-------- 2 files changed, 108 insertions(+), 62 deletions(-) diff --git a/aerial_robot_planning/scripts/mapping_file/instance_objects.py b/aerial_robot_planning/scripts/mapping_file/instance_objects.py index 2ed6649e1..3b728f40f 100644 --- a/aerial_robot_planning/scripts/mapping_file/instance_objects.py +++ b/aerial_robot_planning/scripts/mapping_file/instance_objects.py @@ -36,7 +36,9 @@ def decorator(cls): def wrapper(self, *args, **kwargs): available_topics = [topic for topic, _ in rospy.get_published_topics()] if topic_name not in available_topics: - rospy.logerr(f"Topic {topic_name} does not exist. Initialization failed.") + rospy.logerr( + f"Topic {topic_name} does not exist. Initialization failed." + ) raise RuntimeError(f"Topic {topic_name} does not exist.") original_init(self, *args, **kwargs) @@ -52,7 +54,9 @@ class HandPosition: def __init__(self, topic_name="/hand/mocap/pose"): self.hand_position = None self.topic_name = topic_name - self.subscriber = rospy.Subscriber(self.topic_name, PoseStamped, self.hand_position_callback) + self.subscriber = rospy.Subscriber( + self.topic_name, PoseStamped, self.hand_position_callback + ) rospy.loginfo(f"Subscribed to {self.topic_name}") while self.hand_position is None: @@ -71,7 +75,9 @@ def get_position(self): class ArmPosition: def __init__(self): self.arm_position = None - self.arm_pose_sub = rospy.Subscriber("/arm/mocap/pose", PoseStamped, self.arm_position_callback) + self.arm_pose_sub = rospy.Subscriber( + "/arm/mocap/pose", PoseStamped, self.arm_position_callback + ) rospy.loginfo("Subscribed to /arm/mocap/pose") while self.arm_position is None: @@ -123,7 +129,9 @@ class ControlMode: def __init__(self): self.control_mode = None - self.control_mode_sub = rospy.Subscriber("/hand/control_mode", UInt8, self.callback_control_mode) + self.control_mode_sub = rospy.Subscriber( + "/hand/control_mode", UInt8, self.callback_control_mode + ) while self.control_mode is None: rospy.loginfo("Waiting for control mode...") @@ -160,8 +168,12 @@ def __init__( r_acc, p_acc, y_acc = 0.0, 0.0, 0.0 vx, vy, vz = 0.0, 0.0, 0.0 ax, ay, az = 0.0, 0.0, 0.0 - self.vel_twist = Twist(linear=Vector3(vx, vy, vz), angular=Vector3(r_rate, p_rate, y_rate)) - self.acc_twist = Twist(linear=Vector3(ax, ay, az), angular=Vector3(r_acc, p_acc, y_acc)) + self.vel_twist = Twist( + linear=Vector3(vx, vy, vz), angular=Vector3(r_rate, p_rate, y_rate) + ) + self.acc_twist = Twist( + linear=Vector3(ax, ay, az), angular=Vector3(r_acc, p_acc, y_acc) + ) self._check_last_time = None self._check_last_position = None @@ -201,20 +213,32 @@ def _check_finish_auto(self): self.hand.hand_position.pose.orientation.w, ] - position_change = [abs(current_check_position[i] - self._check_last_position[i]) for i in range(3)] + position_change = [ + abs(current_check_position[i] - self._check_last_position[i]) + for i in range(3) + ] - orientation_change = [abs(current_check_orientation[i] - self._check_check_orientation[i]) for i in range(4)] + orientation_change = [ + abs(current_check_orientation[i] - self._check_check_orientation[i]) + for i in range(4) + ] # self._check_last_position[:] = current_check_position # self._check_check_orientation[:] = current_check_orientation - goal_reached = all(change < self._check_position_tolerance for change in position_change) and all( + goal_reached = all( + change < self._check_position_tolerance for change in position_change + ) and all( change < self._check_orientation_tolerance for change in orientation_change ) new_state = "goal_reached" if goal_reached else "goal_not_reached" if new_state != self.last_state: - rospy.loginfo("Now state: reach the goal" if goal_reached else "Now state:not reach the goal") + rospy.loginfo( + "Now state: reach the goal" + if goal_reached + else "Now state:not reach the goal" + ) self.last_state = new_state if goal_reached: @@ -249,7 +273,9 @@ def fill_trajectory_points(self, t_elapsed: float) -> MultiDOFJointTrajectory: self.hand.hand_position.pose.position.z, ] - self.position_change = [current_position[i] - self.initial_hand_position[i] for i in range(3)] + self.position_change = [ + current_position[i] - self.initial_hand_position[i] for i in range(3) + ] direction = [ self.initial_drone_position[0] + self.position_change[0], diff --git a/aerial_robot_planning/scripts/mpc_smach_node.py b/aerial_robot_planning/scripts/mpc_smach_node.py index 240735a40..d17610015 100644 --- a/aerial_robot_planning/scripts/mpc_smach_node.py +++ b/aerial_robot_planning/scripts/mpc_smach_node.py @@ -95,19 +95,30 @@ def execute(self, userdata): max_traj_idx = len(traj_cls_list) + len(csv_files) - 1 - traj_type_str = input(f"Enter trajectory type (0..{max_traj_idx}) or 'q' to quit or 'h' to hand control: ") + traj_type_str = input( + f"Enter trajectory type (0..{max_traj_idx}) or 'q' to quit or 'h' to hand control: " + ) if traj_type_str.lower() == "q": return "shutdown" if traj_type_str.lower() == "h": userdata.traj_type = "h" - userdata.mapping_config = {"is_arm_active": False, "is_glove_active": False} + userdata.mapping_config = { + "is_arm_active": False, + "is_glove_active": False, + } devices = {"is_arm_active": "Arm mocap", "is_glove_active": "Glove"} - print("Whether activate the following devices. ([Y]/Yes or [N]/No, case-insensitive)") + print( + "Whether activate the following devices. ([Y]/Yes or [N]/No, case-insensitive)" + ) for mapping_config_key, device in devices.items(): while True: - user_input = input(f"Whether activate {device} ([Y]/[N]): ").strip().lower() + user_input = ( + input(f"Whether activate {device} ([Y]/[N]): ") + .strip() + .lower() + ) if user_input == "y": userdata.mapping_config[mapping_config_key] = True rospy.loginfo(f"Decide to activate {device}.") @@ -117,7 +128,9 @@ def execute(self, userdata): rospy.loginfo(f"Decided not to activate {device}.") break else: - rospy.logwarn("Invalid input. Please enter Y or N (case-insensitive).") + rospy.logwarn( + "Invalid input. Please enter Y or N (case-insensitive)." + ) return "go_mapping_init" @@ -166,16 +179,22 @@ def __init__(self): self.rate = rospy.Rate(20) # 20 Hz loop checking if finished def execute(self, userdata): - rospy.loginfo("State: INIT -- Start to reach the first point of the trajectory.") + rospy.loginfo( + "State: INIT -- Start to reach the first point of the trajectory." + ) if userdata.traj_type < len(traj_cls_list): - rospy.loginfo(f"Using trajs.{traj_cls_list[userdata.traj_type].__name__} trajectory.") + rospy.loginfo( + f"Using trajs.{traj_cls_list[userdata.traj_type].__name__} trajectory." + ) traj = traj_factory(userdata.traj_type, userdata.loop_num) x, y, z, vx, vy, vz, ax, ay, az = traj.get_3d_pt(0.0) try: - (qw, qx, qy, qz, r_rate, p_rate, y_rate, r_acc, p_acc, y_acc) = traj.get_3d_orientation(0.0) + (qw, qx, qy, qz, r_rate, p_rate, y_rate, r_acc, p_acc, y_acc) = ( + traj.get_3d_orientation(0.0) + ) except AttributeError: qw, qx, qy, qz = 1.0, 0.0, 0.0, 0.0 @@ -184,7 +203,9 @@ def execute(self, userdata): rospy.loginfo(f"Using CSV file: {csv_file}") # csv_traj = np.loadtxt(os.path.join(csv_folder_path, csv_file), delimiter=',', max_rows=1) # TODO: change the order of csv file. one row for one point is better. - csv_traj = np.loadtxt(os.path.join(csv_folder_path, csv_file), delimiter=",") + csv_traj = np.loadtxt( + os.path.join(csv_folder_path, csv_file), delimiter="," + ) x, y, z = csv_traj[0:3, 0] qw, qx, qy, qz = csv_traj[6:10, 0] @@ -237,7 +258,9 @@ def execute(self, userdata): else: csv_file = csv_files[userdata.traj_type - len(traj_cls_list)] rospy.loginfo(f"Using CSV file: {csv_file}") - mpc_node = MPCPubCSVPredXU(userdata.robot_name, os.path.join(csv_folder_path, csv_file)) + mpc_node = MPCPubCSVPredXU( + userdata.robot_name, os.path.join(csv_folder_path, csv_file) + ) # Wait here until the node signals it is finished or ROS shuts down while not rospy.is_shutdown(): @@ -252,7 +275,12 @@ def execute(self, userdata): class InitObjectState(smach.State): def __init__(self): - smach.State.__init__(self, outcomes=["go_lock"], input_keys=["robot_name", "mapping_config"], output_keys=[]) + smach.State.__init__( + self, + outcomes=["go_lock"], + input_keys=["robot_name", "mapping_config"], + output_keys=[], + ) def execute(self, userdata): @@ -277,7 +305,9 @@ def execute(self, userdata): class LockState(smach.State): def __init__(self): - smach.State.__init__(self, outcomes=["go_unlock"], input_keys=[], output_keys=[]) + smach.State.__init__( + self, outcomes=["go_unlock"], input_keys=[], output_keys=[] + ) self.last_threshold_time = None self.xy_angle_threshold = 8 @@ -291,7 +321,9 @@ def execute(self, userdata): global shared_data rospy.loginfo("Current state: Lock") - rospy.loginfo("Please align the direction of your hand with the drone's hand direction.") + rospy.loginfo( + "Please align the direction of your hand with the drone's hand direction." + ) while not rospy.is_shutdown(): drone_orientation = [ @@ -325,7 +357,12 @@ def execute(self, userdata): is_z_angular_alignment = abs(euler_angles[2]) < self.z_angle_threshold is_z_height_alignment = abs(z_difference) < self.height_threshold - is_in_thresh = (is_x_angular_alignment and is_y_angular_alignment and is_z_angular_alignment and is_z_height_alignment) + is_in_thresh = ( + is_x_angular_alignment + and is_y_angular_alignment + and is_z_angular_alignment + and is_z_height_alignment + ) if not is_in_thresh: self.last_threshold_time = None @@ -334,7 +371,10 @@ def execute(self, userdata): if self.last_threshold_time is None: self.last_threshold_time = rospy.get_time() - if rospy.get_time() - self.last_threshold_time > self.direction_hold_time: + if ( + rospy.get_time() - self.last_threshold_time + > self.direction_hold_time + ): break self.rate.sleep() @@ -358,10 +398,7 @@ class OneToOneMapState(smach.State): def __init__(self) -> None: smach.State.__init__( - self, - outcomes=["done_track"], - input_keys=["robot_name"], - output_keys=[], + self, outcomes=["done_track"], input_keys=["robot_name"], output_keys=[] ) self.pub_object = None @@ -391,25 +428,23 @@ def execute(self, userdata): def create_hand_control_state_machine(): """HandControlStateMachine""" - sm_sub = smach.StateMachine(outcomes=["DONE"], input_keys=["robot_name", "mapping_config"]) + sm_sub = smach.StateMachine( + outcomes=["DONE"], input_keys=["robot_name", "mapping_config"] + ) with sm_sub: # InitObjectState smach.StateMachine.add( - "HAND_CONTROL_INIT", - InitObjectState(), - transitions={"go_lock": "LOCK"}, + "HAND_CONTROL_INIT", InitObjectState(), transitions={"go_lock": "LOCK"} ) # LockState - smach.StateMachine.add( - "LOCK", - LockState(), - transitions={"go_unlock": "UNLOCK"}, - ) + smach.StateMachine.add("LOCK", LockState(), transitions={"go_unlock": "UNLOCK"}) # UnlockState - smach.StateMachine.add("UNLOCK", UnlockState(), transitions={"go_one_to_one_map": "ONE_TO_ONE_MAP"}) + smach.StateMachine.add( + "UNLOCK", UnlockState(), transitions={"go_one_to_one_map": "ONE_TO_ONE_MAP"} + ) # One_To_One_MapState smach.StateMachine.add( @@ -426,7 +461,9 @@ def create_hand_control_state_machine(): ############################################### def main(): parser = argparse.ArgumentParser(description="SMACH-based MPC Trajectory Publisher") - parser.add_argument("robot_name", type=str, help="Robot name, e.g., beetle1, gimbalrotors") + parser.add_argument( + "robot_name", type=str, help="Robot name, e.g., beetle1, gimbalrotors" + ) args = parser.parse_args() # Initialize a single ROS node for the entire SMACH-based system @@ -441,12 +478,7 @@ def main(): sm.userdata.loop_num = np.inf global shared_data - shared_data = { - "hand": None, - "arm": None, - "drone": None, - "control_mode": None, - } + shared_data = {"hand": None, "arm": None, "drone": None, "control_mode": None} # Open the container with sm: @@ -463,29 +495,17 @@ def main(): ) # INIT - smach.StateMachine.add( - "INIT", - InitState(), - transitions={ - "go_track": "TRACK", - }, - ) + smach.StateMachine.add("INIT", InitState(), transitions={"go_track": "TRACK"}) # TRACK smach.StateMachine.add( - "TRACK", - TrackState(), - transitions={ - "done_track": "IDLE", - }, + "TRACK", TrackState(), transitions={"done_track": "IDLE"} ) smach.StateMachine.add( "HAND_CONTROL", create_hand_control_state_machine(), - transitions={ - "DONE": "IDLE", - }, + transitions={"DONE": "IDLE"}, remapping={"robot_name": "robot_name", "mapping_config": "mapping_config"}, )