-
Notifications
You must be signed in to change notification settings - Fork 5
Spot GraphNav multi session map recording command line interface
Kaiyu Zheng edited this page Jun 20, 2022
·
1 revision
The following code is based on recording_command_line.py
and graph_nav_command_line.py
, provided by Spot SDK as examples.
This modified code allows uploading an existing map to continue mapping on top of it, enabling multi-session mapping. One
use case is if you forgot to do loop closure for a previously recorded map, you can upload it and then do loop closure using this
interface.
# Copyright (c) 2022 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Command line interface integrating options to record maps with WASD controls. """
import argparse
import grpc
import logging
import os
import sys
import time
from bosdyn.api.graph_nav import map_pb2, map_processing_pb2, recording_pb2
from bosdyn.client import create_standard_sdk, ResponseError, RpcError
import bosdyn.client.channel
from bosdyn.client.graph_nav import GraphNavClient
from bosdyn.client.math_helpers import SE3Pose, Quat
from bosdyn.client.recording import GraphNavRecordingServiceClient
from bosdyn.client.map_processing import MapProcessingServiceClient
import bosdyn.client.util
import google.protobuf.timestamp_pb2
from google.protobuf import wrappers_pb2 as wrappers
import graph_nav_util
class RecordingMultiSessionInterface(object):
"""Recording service command line interface."""
def __init__(self, robot, download_filepath, upload_path=None):
# Keep the robot instance and it's ID.
self._robot = robot
# Force trigger timesync.
self._robot.time_sync.wait_for_sync()
# Filepath for uploading a saved graph's and snapshots too.
if upload_path is not None:
if upload_path[-1] == "/":
self._upload_filepath = upload_path[:-1]
else:
self._upload_filepath = upload_path
else:
self._upload_filepath = None
# Filepath for the location to put the downloaded graph and snapshots.
if download_filepath[-1] == "/":
self._download_filepath = download_filepath + "downloaded_graph"
else:
self._download_filepath = download_filepath + "/downloaded_graph"
# Setup the recording service client.
self._recording_client = self._robot.ensure_client(
GraphNavRecordingServiceClient.default_service_name)
# Setup the graph nav service client.
self._graph_nav_client = robot.ensure_client(GraphNavClient.default_service_name)
self._map_processing_client = robot.ensure_client(
MapProcessingServiceClient.default_service_name)
# Store the most recent knowledge of the state of the robot based on rpc calls.
self._current_graph = None
self._current_edges = dict() #maps to_waypoint to list(from_waypoint)
self._current_waypoint_snapshots = dict() # maps id to waypoint snapshot
self._current_edge_snapshots = dict() # maps id to edge snapshot
self._current_annotation_name_to_wp_id = dict()
# Add recording service properties to the command line dictionary.
self._command_dictionary = {
'0': self._clear_map,
'1': self._start_recording,
'2': self._stop_recording,
'3': self._get_recording_status,
'4': self._create_default_waypoint,
'5': self._download_full_graph,
'6': self._list_graph_waypoint_and_edge_ids,
'7': self._create_new_edge,
'8': self._create_loop,
'9': self._auto_close_loops_prompt,
'a': self._optimize_anchoring,
'b': self._upload_graph_and_snapshots,
}
def should_we_start_recording(self):
# Before starting to record, check the state of the GraphNav system.
graph = self._graph_nav_client.download_graph()
if graph is not None:
# Check that the graph has waypoints. If it does, then we need to be localized to the graph
# before starting to record
if len(graph.waypoints) > 0:
localization_state = self._graph_nav_client.get_localization_state()
if not localization_state.localization.waypoint_id:
# Not localized to anything in the map. The best option is to clear the graph or
# attempt to localize to the current map.
# Returning false since the GraphNav system is not in the state it should be to
# begin recording.
return False
# If there is no graph or there exists a graph that we are localized to, then it is fine to
# start recording, so we return True.
return True
def _clear_map(self, *args):
"""Clear the state of the map on the robot, removing all waypoints and edges."""
return self._graph_nav_client.clear_graph()
def _start_recording(self, *args):
"""Start recording a map."""
should_start_recording = self.should_we_start_recording()
if not should_start_recording:
print("The system is not in the proper state to start recording.", \
"Try using the graph_nav_command_line to either clear the map or", \
"attempt to localize to the map.")
return
try:
status = self._recording_client.start_recording()
print("Successfully started recording a map.")
except Exception as err:
print("Start recording failed: " + str(err))
def _stop_recording(self, *args):
"""Stop or pause recording a map."""
first_iter = True
while True:
try:
status = self._recording_client.stop_recording()
print("Successfully stopped recording a map.")
break
except bosdyn.client.recording.NotReadyYetError as err:
# It is possible that we are not finished recording yet due to
# background processing. Try again every 1 second.
if first_iter:
print("Cleaning up recording...")
first_iter = False
time.sleep(1.0)
continue
except Exception as err:
print("Stop recording failed: " + str(err))
break
def _get_recording_status(self, *args):
"""Get the recording service's status."""
status = self._recording_client.get_record_status()
if status.is_recording:
print("The recording service is on.")
else:
print("The recording service is off.")
def _create_default_waypoint(self, *args):
"""Create a default waypoint at the robot's current location."""
resp = self._recording_client.create_waypoint(waypoint_name="default")
if resp.status == recording_pb2.CreateWaypointResponse.STATUS_OK:
print("Successfully created a waypoint.")
else:
print("Could not create a waypoint.")
def _download_full_graph(self, *args):
"""Download the graph and snapshots from the robot."""
graph = self._graph_nav_client.download_graph()
if graph is None:
print("Failed to download the graph.")
return
self._write_full_graph(graph)
print("Graph downloaded with {} waypoints and {} edges".format(
len(graph.waypoints), len(graph.edges)))
# Download the waypoint and edge snapshots.
self._download_and_write_waypoint_snapshots(graph.waypoints)
self._download_and_write_edge_snapshots(graph.edges)
def _write_full_graph(self, graph):
"""Download the graph from robot to the specified, local filepath location."""
graph_bytes = graph.SerializeToString()
self._write_bytes(self._download_filepath, '/graph', graph_bytes)
def _download_and_write_waypoint_snapshots(self, waypoints):
"""Download the waypoint snapshots from robot to the specified, local filepath location."""
num_waypoint_snapshots_downloaded = 0
for waypoint in waypoints:
if len(waypoint.snapshot_id) == 0:
continue
try:
waypoint_snapshot = self._graph_nav_client.download_waypoint_snapshot(
waypoint.snapshot_id)
except Exception:
# Failure in downloading waypoint snapshot. Continue to next snapshot.
print("Failed to download waypoint snapshot: " + waypoint.snapshot_id)
continue
self._write_bytes(self._download_filepath + '/waypoint_snapshots',
'/' + waypoint.snapshot_id, waypoint_snapshot.SerializeToString())
num_waypoint_snapshots_downloaded += 1
print("Downloaded {} of the total {} waypoint snapshots.".format(
num_waypoint_snapshots_downloaded, len(waypoints)))
def _download_and_write_edge_snapshots(self, edges):
"""Download the edge snapshots from robot to the specified, local filepath location."""
num_edge_snapshots_downloaded = 0
num_to_download = 0
for edge in edges:
if len(edge.snapshot_id) == 0:
continue
num_to_download += 1
try:
edge_snapshot = self._graph_nav_client.download_edge_snapshot(edge.snapshot_id)
except Exception:
# Failure in downloading edge snapshot. Continue to next snapshot.
print("Failed to download edge snapshot: " + edge.snapshot_id)
continue
self._write_bytes(self._download_filepath + '/edge_snapshots', '/' + edge.snapshot_id,
edge_snapshot.SerializeToString())
num_edge_snapshots_downloaded += 1
print("Downloaded {} of the total {} edge snapshots.".format(
num_edge_snapshots_downloaded, num_to_download))
def _write_bytes(self, filepath, filename, data):
"""Write data to a file."""
os.makedirs(filepath, exist_ok=True)
with open(filepath + filename, 'wb+') as f:
f.write(data)
f.close()
def _update_graph_waypoint_and_edge_ids(self, do_print=False):
# Download current graph
graph = self._graph_nav_client.download_graph()
if graph is None:
print("Empty graph.")
return
self._current_graph = graph
localization_id = self._graph_nav_client.get_localization_state().localization.waypoint_id
# Update and print waypoints and edges
self._current_annotation_name_to_wp_id, self._current_edges = graph_nav_util.update_waypoints_and_edges(
graph, localization_id, do_print)
def _list_graph_waypoint_and_edge_ids(self, *args):
"""List the waypoint ids and edge ids of the graph currently on the robot."""
self._update_graph_waypoint_and_edge_ids(do_print=True)
def _create_new_edge(self, *args):
"""Create new edge between existing waypoints in map."""
if len(args[0]) != 2:
print("ERROR: Specify the two waypoints to connect (short code or annotation).")
return
self._update_graph_waypoint_and_edge_ids(do_print=False)
from_id = graph_nav_util.find_unique_waypoint_id(args[0][0], self._current_graph,
self._current_annotation_name_to_wp_id)
to_id = graph_nav_util.find_unique_waypoint_id(args[0][1], self._current_graph,
self._current_annotation_name_to_wp_id)
print("Creating edge from {} to {}.".format(from_id, to_id))
from_wp = self._get_waypoint(from_id)
if from_wp is None:
return
to_wp = self._get_waypoint(to_id)
if to_wp is None:
return
# Get edge transform based on kinematic odometry
edge_transform = self._get_transform(from_wp, to_wp)
# Define new edge
new_edge = map_pb2.Edge()
new_edge.id.from_waypoint = from_id
new_edge.id.to_waypoint = to_id
new_edge.from_tform_to.CopyFrom(edge_transform)
print("edge transform =", new_edge.from_tform_to)
# Send request to add edge to map
self._recording_client.create_edge(edge=new_edge)
def _create_loop(self, *args):
"""Create edge from last waypoint to first waypoint."""
self._update_graph_waypoint_and_edge_ids(do_print=False)
if len(self._current_graph.waypoints) < 2:
self._add_message(
"Graph contains {} waypoints -- at least two are needed to create loop.".format(
len(self._current_graph.waypoints)))
return False
sorted_waypoints = graph_nav_util.sort_waypoints_chrono(self._current_graph)
edge_waypoints = [sorted_waypoints[-1][0], sorted_waypoints[0][0]]
self._create_new_edge(edge_waypoints)
def _auto_close_loops_prompt(self, *args):
print("""
Options:
(0) Close all loops.
(1) Close only fiducial-based loops.
(2) Close only odometry-based loops.
(q) Back.
""")
try:
inputs = input('>')
except NameError:
return
req_type = str.split(inputs)[0]
close_fiducial_loops = False
close_odometry_loops = False
if req_type == '0':
close_fiducial_loops = True
close_odometry_loops = True
elif req_type == '1':
close_fiducial_loops = True
elif req_type == '2':
close_odometry_loops = True
elif req_type == 'q':
return
else:
print("Unrecognized command. Going back.")
return
self._auto_close_loops(close_fiducial_loops, close_odometry_loops)
def _auto_close_loops(self, close_fiducial_loops, close_odometry_loops, *args):
"""Automatically find and close all loops in the graph."""
response = self._map_processing_client.process_topology(
params=map_processing_pb2.ProcessTopologyRequest.Params(
do_fiducial_loop_closure=wrappers.BoolValue(value=close_fiducial_loops),
do_odometry_loop_closure=wrappers.BoolValue(value=close_odometry_loops)),
modify_map_on_server=True)
print("Created {} new edge(s).".format(len(response.new_subgraph.edges)))
def _optimize_anchoring(self, *args):
"""Call anchoring optimization on the server, producing a globally optimal reference frame for waypoints to be expressed in."""
response = self._map_processing_client.process_anchoring(
params=map_processing_pb2.ProcessAnchoringRequest.Params(),
modify_anchoring_on_server=True, stream_intermediate_results=False)
if response.status == map_processing_pb2.ProcessAnchoringResponse.STATUS_OK:
print("Optimized anchoring after {} iteration(s).".format(response.iteration))
else:
print("Error optimizing {}".format(response))
def _get_waypoint(self, id):
"""Get waypoint from graph (return None if waypoint not found)"""
if self._current_graph is None:
self._current_graph = self._graph_nav_client.download_graph()
for waypoint in self._current_graph.waypoints:
if waypoint.id == id:
return waypoint
print('ERROR: Waypoint {} not found in graph.'.format(id))
return None
def _get_transform(self, from_wp, to_wp):
"""Get transform from from-waypoint to to-waypoint."""
from_se3 = from_wp.waypoint_tform_ko
from_tf = SE3Pose(
from_se3.position.x, from_se3.position.y, from_se3.position.z,
Quat(w=from_se3.rotation.w, x=from_se3.rotation.x, y=from_se3.rotation.y,
z=from_se3.rotation.z))
to_se3 = to_wp.waypoint_tform_ko
to_tf = SE3Pose(
to_se3.position.x, to_se3.position.y, to_se3.position.z,
Quat(w=to_se3.rotation.w, x=to_se3.rotation.x, y=to_se3.rotation.y,
z=to_se3.rotation.z))
from_T_to = from_tf.mult(to_tf.inverse())
return from_T_to.to_proto()
def _upload_graph_and_snapshots(self, *args):
"""Upload the graph and snapshots to the robot."""
if self._upload_filepath is None:
print("No upload path specified.")
return
print("Loading the graph from disk into local storage...")
with open(self._upload_filepath + "/graph", "rb") as graph_file:
# Load the graph from disk.
data = graph_file.read()
self._current_graph = map_pb2.Graph()
self._current_graph.ParseFromString(data)
print("Loaded graph has {} waypoints and {} edges".format(
len(self._current_graph.waypoints), len(self._current_graph.edges)))
for waypoint in self._current_graph.waypoints:
# Load the waypoint snapshots from disk.
with open(self._upload_filepath + "/waypoint_snapshots/{}".format(waypoint.snapshot_id),
"rb") as snapshot_file:
waypoint_snapshot = map_pb2.WaypointSnapshot()
waypoint_snapshot.ParseFromString(snapshot_file.read())
self._current_waypoint_snapshots[waypoint_snapshot.id] = waypoint_snapshot
for edge in self._current_graph.edges:
if len(edge.snapshot_id) == 0:
continue
# Load the edge snapshots from disk.
with open(self._upload_filepath + "/edge_snapshots/{}".format(edge.snapshot_id),
"rb") as snapshot_file:
edge_snapshot = map_pb2.EdgeSnapshot()
edge_snapshot.ParseFromString(snapshot_file.read())
self._current_edge_snapshots[edge_snapshot.id] = edge_snapshot
# Upload the graph to the robot.
print("Uploading the graph and snapshots to the robot...")
true_if_empty = not len(self._current_graph.anchoring.anchors)
response = self._graph_nav_client.upload_graph(lease=None,
graph=self._current_graph,
generate_new_anchoring=true_if_empty)
# Upload the snapshots to the robot.
for snapshot_id in response.unknown_waypoint_snapshot_ids:
waypoint_snapshot = self._current_waypoint_snapshots[snapshot_id]
self._graph_nav_client.upload_waypoint_snapshot(waypoint_snapshot)
print("Uploaded {}".format(waypoint_snapshot.id))
for snapshot_id in response.unknown_edge_snapshot_ids:
edge_snapshot = self._current_edge_snapshots[snapshot_id]
self._graph_nav_client.upload_edge_snapshot(edge_snapshot)
print("Uploaded {}".format(edge_snapshot.id))
# The upload is complete! Check that the robot is localized to the graph,
# and if it is not, prompt the user to localize the robot before attempting
# any navigation commands.
localization_state = self._graph_nav_client.get_localization_state()
if not localization_state.localization.waypoint_id:
# The robot is not localized to the newly uploaded graph.
print("\n")
print("Upload complete! The robot is currently not localized to the map; please localize", \
"the robot using commands (2) or (3) before attempting a navigation command.")
def run(self):
"""Main loop for the command line interface."""
while True:
print("""
Options:
(0) Clear map.
(1) Start recording a map.
(2) Stop recording a map.
(3) Get the recording service's status.
(4) Create a default waypoint in the current robot's location.
(5) Download the map after recording.
(6) List the waypoint ids and edge ids of the map on the robot.
(7) Create new edge between existing waypoints using odometry.
(8) Create new edge from last waypoint to first waypoint using odometry.
(9) Automatically find and close loops.
(a) Optimize the map's anchoring.
(b) Upload the graph and its snapshots.
(q) Exit.
""")
try:
inputs = input('>')
except NameError:
pass
req_type = str.split(inputs)[0]
if req_type == 'q':
break
if req_type not in self._command_dictionary:
print("Request not in the known command dictionary.")
continue
try:
cmd_func = self._command_dictionary[req_type]
cmd_func(str.split(inputs)[1:])
except Exception as e:
print(e)
def main(argv):
"""Run the command-line interface."""
parser = argparse.ArgumentParser(description=__doc__)
bosdyn.client.util.add_base_arguments(parser)
parser.add_argument('-d', '--download-filepath',
help='Full filepath for where to download graph and snapshots.',
default=os.getcwd())
parser.add_argument('-u', '--upload-filepath',
help='Full filepath to graph and snapshots to be uploaded.')
options = parser.parse_args(argv)
# Create robot object.
sdk = bosdyn.client.create_standard_sdk('RecordingClient')
robot = sdk.create_robot(options.hostname)
bosdyn.client.util.authenticate(robot)
recording_command_line = RecordingMultiSessionInterface(robot, options.download_filepath,
upload_path=options.upload_filepath)
try:
recording_command_line.run()
return True
except Exception as exc: # pylint: disable=broad-except
print(exc)
print("Recording command line client threw an error.")
return False
if __name__ == '__main__':
if not main(sys.argv[1:]):
sys.exit(1)