diff --git a/README.md b/README.md
index 893e613..16c1b25 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ By default logs are saved and can be analyzed afterwards by running:
 
 ## How to reproduce results
 
-The idea and basic principles of this algorithm are presented in Geisslinger et al. 20221. The following describes how the results from the paper can be reproduced. To evaluate the planning algorithm on multiple scenarios execute:
+The idea and basic principles of this algorithm are presented in Geisslinger et al. 20231. The following describes how the results from the paper can be reproduced. To evaluate the planning algorithm on multiple scenarios execute:
 
 * `python planner/Frenet/plannertools/evaluatefrenet.py`
 
@@ -81,16 +81,18 @@ To evaluate with the according config settings of [ethical](/planner/Frenet/conf
 
 To evaluate on all 2000 scenarios, make sure to have at least 200 GB space left on your device for saving the log files. For better runtime, we recommend using [multiprocessing](/planner/Frenet/plannertools/evaluatefrenet.py#L46) and a [GPU](planner/Frenet/configs/prediction.json#L4) for the prediction network. Evaluating all scenarios in 10 parallel threads with a GPU takes around 48 hours. Results and logfiles for each run are stored in `planner/Frenet/results`.
 
-Standard evaluation metrics such as cummulated harm on all scenarios are provided within the results (e.g. `results/eval/harm.json`). `planner/Frenet/analyze_tools/analyze_risk_dist.py` helps to extract risk values out of multiple logfiles. Boxplots with risk distribtuions as in Geisslinger et al. 20221 can be generated using `planner/Frenet/plot_tools/boxplots_risks.py`.
+Standard evaluation metrics such as cumulated harm on all scenarios are provided within the results (e.g. `results/eval/harm.json`). `planner/Frenet/analyze_tools/analyze_risk_dist.py` helps to extract risk values from multiple logfiles. Boxplots with risk distributions as in Geisslinger et al. 20231 can be generated using `planner/Frenet/plot_tools/boxplots_risks.py`.
 
 ## References
 
-1. Geisslinger, M., Poszler, F., Lienkamp, M. An Ethical Trajectory Planning Algorithm for Autonomous Vehicles *(under review)*
+1. Geisslinger, M., Poszler, F., Lienkamp, M. An Ethical Trajectory Planning Algorithm for Autonomous Vehicles. 2023
 
 ## Contributions
 
 * Maximilian Geisslinger (Main Contributor, [maximilian.geisslinger@tum.de](mailto:maximilian.geisslinger@tum.de?subject=[GitHub]%20Ethical%20Trajectory%20Planning))
+* Rainer Trauth (Computing Performance)
 * Florian Pfab (Master Thesis: *Motion Planning with Risk Assessment for Automated Vehicles*)
 * Simon Sagmeister (Master Thesis: *Neural Networks: Real-time Capable Trajectory Planning through Supervised Learning*)
 * Tobias Geissenberger (Bachelor Thesis: *Harm Prediction for Risk-Aware Motion Planning of Automated Vehicles*)
 * Clemens Krispler (Bachelor Thesis: *Motion Planning for Autonomous Vehicles: Developing a Principle of Responsibility for Ethical Decision-Making*)
+* Zhi Zheng (Semester Thesis: *Parallelization of a Planning Algorithm in the Field of Autonomous Driving*, supervised by Rainer Trauth)
diff --git a/planner/Frenet/analyze_tools/analyze_correlations.py b/planner/Frenet/analyze_tools/analyze_correlations.py
index 10487e6..23685d5 100644
--- a/planner/Frenet/analyze_tools/analyze_correlations.py
+++ b/planner/Frenet/analyze_tools/analyze_correlations.py
@@ -11,6 +11,8 @@
 import multiprocessing
 import progressbar
 import json
+import argparse
+import traceback
 
 
 def all_equal(iterable):
@@ -26,15 +28,18 @@
     return next(g, True) and not next(g, False)
 
 
-logdir = "./planner/Frenet/results/logs"
-
 corr_mat_list = []
+long_mat_list = []
+lat_mat_list = []
+dist_mat_list = []
 corr_dict = {}
+long_dict = {}
+lat_dict = {}
+dist_dict = {}
 scenario_list = []
 key_list = []
 cpu_count = 60  # multiprocessing.cpu_count()
-log_file_list = os.listdir(logdir)
 
 
 def eval_func(logfile):
@@ -48,12 +53,14 @@
     """
     try:
        frenet_log = FrenetLogVisualizer(
-            os.path.join(logdir, logfile), visualize=False, verbose=False
+            logfile, visualize=False, verbose=False
        )
        corr_mat, keys = frenet_log.correlation_matrix(plot=False)
+        long_mat, lat_mat, dist_mat = frenet_log.distance_matrix(plot=False)
 
-        return corr_mat, keys, logfile
+        return corr_mat, long_mat, lat_mat, dist_mat, keys, logfile
    except Exception:
+        traceback.print_exc()
        return None
 
 
@@ -64,46 +71,124 @@ def process_return_dict(return_list):
        return_list ([type]): [description]
    """
    if return_list is not None:
-        corr_mat_list.append(return_list[0])
-        key_list.append(return_list[1])
-        scenario_list.append(return_list[2])
-
-
-with progressbar.ProgressBar(max_value=len(log_file_list)).start() as pbar:
-    with multiprocessing.Pool(processes=cpu_count) as pool:
-        for return_list in pool.imap_unordered(eval_func, log_file_list):
-            process_return_dict(return_list)
-            pbar.update(pbar.value + 1)
-
-
-if not all_equal(key_list):
-    print(
-        "Error: Keys are ambiguous, but must be the same. Make sure to run with the same settings."
-    )
-
-for i in range(len(key_list[0])):
-    for j in range(i + 1, len(key_list[0])):
-        corr_dict[str(key_list[0][i]) + "<->" + str(key_list[0][j])] = [
-            corr_mat[i, j] for corr_mat in corr_mat_list
-        ]
-
-for key in corr_dict:
-    corr_dict[key] = [x for x in corr_dict[key] if x == x]
-
-with open("./planner/Frenet/results/corr_dict.json", "w") as fp:
-    json.dump(corr_dict, fp)
-
-with open("./planner/Frenet/results/scen_list.txt", "w") as fp2:
-    json.dump(scenario_list, fp2)
-
-clean_corr_dict = {k: corr_dict[k] for k in corr_dict if not isnan(sum(corr_dict[k]))}
-
-fig, ax = plt.subplots()
-ax.boxplot(clean_corr_dict.values())
-ax.set_xticklabels(clean_corr_dict.keys(), rotation=90)
-
-plt.tight_layout()
-plt.savefig("./planner/Frenet/results/correlations.pdf")
-
-
-print("Done.")
+        # Filter nans
+        if (return_list[0] == return_list[0]).all():
+            corr_mat_list.append(return_list[0])
+            long_mat_list.append(return_list[1])
+            lat_mat_list.append(return_list[2])
+            dist_mat_list.append(return_list[3])
+            key_list.append(return_list[4])
+            scenario_list.append(return_list[5].split("/")[-1])
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--logdir", type=str, default="./planner/Frenet/results/logs")
+    args = parser.parse_args()
+
+    log_file_list = os.listdir(args.logdir)
+    log_file_list = [os.path.join(args.logdir, l) for l in log_file_list]
+    result_dir = os.path.dirname(args.logdir)
+
+    with progressbar.ProgressBar(max_value=len(log_file_list)).start() as pbar:
+        with multiprocessing.Pool(processes=cpu_count) as pool:
+            for return_list in pool.imap_unordered(eval_func, log_file_list):
+                process_return_dict(return_list)
+                pbar.update(pbar.value + 1)
+
+    if not all_equal(key_list):
+        print(
+            "Error: Keys are ambiguous, but must be the same. Make sure to run with the same settings."
+        )
+        longest_len = max([len(i) for i in key_list])
+        new_key_list = []
+        new_scenario_list = []
+        new_corr_mat_list = []
+        for key, scen, corr in zip(key_list, scenario_list, corr_mat_list):
+            if len(key) == longest_len:
+                new_key_list.append(key)
+                new_scenario_list.append(scen)
+                new_corr_mat_list.append(corr)
+            else:
+                print(f"Removing {scen} with incomplete keys {key}")
+
+        print(key_list)
+
+    for i in range(len(key_list[0])):
+        for j in range(i + 1, len(key_list[0])):
+            corr_dict[str(key_list[0][i]) + "<->" + str(key_list[0][j])] = [
+                corr_mat[i, j] for corr_mat in corr_mat_list
+            ]
+            long_dict[str(key_list[0][i]) + "<->" + str(key_list[0][j])] = [
+                long_mat[i, j] for long_mat in long_mat_list
+            ]
+            lat_dict[str(key_list[0][i]) + "<->" + str(key_list[0][j])] = [
+                lat_mat[i, j] for lat_mat in lat_mat_list
+            ]
+            dist_dict[str(key_list[0][i]) + "<->" + str(key_list[0][j])] = [
+                dist_mat[i, j] for dist_mat in dist_mat_list
+            ]
+
+    corr_dict_scenes = {}
+    long_dict_scenes = {}
+    lat_dict_scenes = {}
+    dist_dict_scenes = {}
+
+    for key in corr_dict:
+        corr_dict_scenes[key] = {}
+        long_dict_scenes[key] = {}
+        lat_dict_scenes[key] = {}
+        dist_dict_scenes[key] = {}
+
+        for idx, val in enumerate(corr_dict[key]):
+            corr_dict_scenes[key][scenario_list[idx]] = corr_dict[key][idx]
+            long_dict_scenes[key][scenario_list[idx]] = long_dict[key][idx]
+            lat_dict_scenes[key][scenario_list[idx]] = lat_dict[key][idx]
+            dist_dict_scenes[key][scenario_list[idx]] = dist_dict[key][idx]
+
+        # corr_dict_scenes[key].sort()
+        corr_dict_scenes[key] = dict(sorted(corr_dict_scenes[key].items(), key=lambda item: item[1]))
+        long_dict_scenes[key] = dict(sorted(long_dict_scenes[key].items(), key=lambda item: item[1], reverse=True))
+        lat_dict_scenes[key] = dict(sorted(lat_dict_scenes[key].items(), key=lambda item: item[1], reverse=True))
+        dist_dict_scenes[key] = dict(sorted(dist_dict_scenes[key].items(), key=lambda item: item[1], reverse=True))
+
+    if len(corr_mat_list) != len(scenario_list):
+        print(f"Warning: Scenario list ({len(scenario_list)}) does not have the same length as corr_dict ({len(corr_mat_list)})")
+
+    with open(os.path.join(result_dir, "corr_dict.json"), "w") as fp:
+        json.dump(corr_dict, fp)
+
+    with open(os.path.join(result_dir, "long_dict.json"), "w") as fp:
+        json.dump(long_dict, fp)
+
+    with open(os.path.join(result_dir, "lat_dict.json"), "w") as fp:
+        json.dump(lat_dict, fp)
+
+    with open(os.path.join(result_dir, "dist_dict.json"), "w") as fp:
+        json.dump(dist_dict, fp)
+
+    with open(os.path.join(result_dir, "scen_list.txt"), "w") as fp:
+        json.dump(scenario_list, fp)
+
+    with open(os.path.join(result_dir, "corr_dict_scenes.json"), "w") as fp:
+        json.dump(corr_dict_scenes, fp)
+
+    with open(os.path.join(result_dir, "long_dict_scenes.json"), "w") as fp:
+        json.dump(long_dict_scenes, fp)
+
+    with open(os.path.join(result_dir, "lat_dict_scenes.json"), "w") as fp:
+        json.dump(lat_dict_scenes, fp)
+
+    with open(os.path.join(result_dir, "dist_dict_scenes.json"), "w") as fp:
+        json.dump(dist_dict_scenes, fp)
+
+    clean_corr_dict = {k: corr_dict[k] for k in corr_dict if not isnan(sum(corr_dict[k]))}
+
+    fig, ax = plt.subplots()
+    ax.boxplot(clean_corr_dict.values())
+    ax.set_xticklabels(clean_corr_dict.keys(), rotation=90)
+
+    plt.tight_layout()
+    plt.savefig(os.path.join(result_dir, "correlations.pdf"))
+
+    print("Done.")
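
Usage sketch for the reworked script above, grounded in the argparse defaults shown in this hunk (`--logdir` defaults to `./planner/Frenet/results/logs`; all JSON outputs such as `corr_dict.json` are written to the parent directory of `--logdir`):

    python planner/Frenet/analyze_tools/analyze_correlations.py --logdir ./planner/Frenet/results/logs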
diff --git a/planner/Frenet/analyze_tools/analyze_log.py b/planner/Frenet/analyze_tools/analyze_log.py
index 8250d9f..dd8a4b8 100644
--- a/planner/Frenet/analyze_tools/analyze_log.py
+++ b/planner/Frenet/analyze_tools/analyze_log.py
@@ -9,9 +9,12 @@
 import matplotlib.pyplot as plt
 from matplotlib.lines import Line2D
 from matplotlib.widgets import CheckButtons
+from matplotlib.colors import ListedColormap
 import numpy as np
+import pandas as pd
 import argparse
 from scipy import stats
+from statsmodels.formula.api import ols
 
 mopl_path = os.path.dirname(
     os.path.dirname(
@@ -29,6 +32,7 @@
 )
 from EthicalTrajectoryPlanning.planner.planning import add_ego_vehicles_to_scenario
 from EthicalTrajectoryPlanning.planner.utils.vehicleparams import VehicleParameters
+from EthicalTrajectoryPlanning.risk_assessment.helpers.coll_prob_helpers import distance
 
 
 class FrenetLogVisualizer:
@@ -136,6 +140,10 @@ def _load_log_data(self, logfile):
             # Get number of lines
             self._no_lines_data = len(self._all_log_data)
 
+        else:
+            print(f"Logfile {logfile} does not exist.")
+            sys.exit()
+
     def _load_scenario(self, scneario_id):
 
         self.scenario, self.planning_problem_set = CommonRoadFileReader(
@@ -408,60 +416,183 @@ def visualize(self):
 
     def correlation_matrix(self, plot=True):
         """Calculate and visualize correlation matrix."""
-        self.all_weighted_cost_dict = {}
+        all_weighted_cost_dict = self._get_all_weighted_cost_dict()
+
+        corr_mat = np.zeros(
+            (len(all_weighted_cost_dict), len(all_weighted_cost_dict))
+        )
+        for key1, x in enumerate(all_weighted_cost_dict.values()):
+            for key2, y in enumerate(all_weighted_cost_dict.values()):
+                correlation, p_value = stats.pearsonr(x, y)
+                corr_mat[key1, key2] = correlation
+
+        if plot:
+            self._draw_corr_mat(corr_mat, all_weighted_cost_dict)
+
+        return corr_mat, list(all_weighted_cost_dict.keys())
+
+    def multi_correlation(self, use_keys=None):
+        """Calculate coefficients of multiple correlation.
+
+        See: https://en.wikipedia.org/wiki/Coefficient_of_multiple_correlation
+        """
+        all_weighted_cost_dict = self._get_all_weighted_cost_dict()
+
+        if use_keys is not None:
+            all_weighted_cost_dict = {k: v for k, v in all_weighted_cost_dict.items() if k in use_keys}
+
+        multi_corr_dict = analyze_multi_correlation(all_weighted_cost_dict)
+
+        return multi_corr_dict
+
+    def distance_matrix(self, plot=False):
+        """Calculate distances for the best chosen trajectory according to each principle."""
+        self.long_dict = {}
+        self.lat_dict = {}
         weights = self.weights
+        weights["total"] = -1
 
         for key in weights:
             if weights[key] != 0 and "risk" not in key:
-                self.all_weighted_cost_dict[key] = []
+                self.long_dict[key] = []
+                self.lat_dict[key] = []
+
+                for data in self._all_log_data:
+                    (_, _, ft_list_valid, _, _, _, _) = data
+
+                    best_traj = self._get_best_traj_for_principle(ft_list_valid, key)
+                    self.long_dict[key].append(best_traj["s"][-1])
+                    self.lat_dict[key].append(best_traj["d"][-1])
+
+        long_mat = np.zeros(
+            (len(self.long_dict), len(self.long_dict))
+        )
+        long_std = np.zeros(
+            (len(self.long_dict), len(self.long_dict))
+        )
+        lat_mat = np.zeros(
+            (len(self.lat_dict), len(self.lat_dict))
+        )
+        lat_std = np.zeros(
+            (len(self.lat_dict), len(self.lat_dict))
+        )
+        dist_mat = np.zeros(
+            (len(self.lat_dict), len(self.lat_dict))
+        )
+        dist_std = np.zeros(
+            (len(self.lat_dict), len(self.lat_dict))
+        )
+
+        for key1, x in enumerate(self.long_dict.values()):
+            for key2, y in enumerate(self.long_dict.values()):
+                diff = [a - b for a, b in zip(x, y)]
+                long_mat[key1, key2] = np.mean(np.abs(diff))
+                long_std[key1, key2] = np.std(np.abs(diff))
+
+        for key1, x in enumerate(self.lat_dict.values()):
+            for key2, y in enumerate(self.lat_dict.values()):
+                diff = [a - b for a, b in zip(x, y)]
+                lat_mat[key1, key2] = np.mean(np.abs(diff))
+                lat_std[key1, key2] = np.std(np.abs(diff))
+
+        for key1, (x, y) in enumerate(zip(self.lat_dict.values(), self.long_dict.values())):
+            for key2, (xx, yy) in enumerate(zip(self.lat_dict.values(), self.long_dict.values())):
+                diff = [distance([a, b], [aa, bb]) for a, b, aa, bb in zip(x, y, xx, yy)]
+                dist_mat[key1, key2] = np.mean(np.abs(diff))
+                dist_std[key1, key2] = np.std(np.abs(diff))
+
+        if plot:
+            self._draw_corr_mat(lat_mat, self.lat_dict, stdw_mat=lat_std, inverse=True, title="Lateral Distance Matrix")
+            self._draw_corr_mat(long_mat, self.long_dict, stdw_mat=long_std, inverse=True, title="Longitudinal Distance Matrix")
+            self._draw_corr_mat(dist_mat, self.lat_dict, stdw_mat=dist_std, inverse=True, title="Total Distance Matrix")
+            plt.show()
 
-        for key in self.all_weighted_cost_dict:
+        return long_mat, lat_mat, dist_mat
+
+    def _get_all_weighted_cost_dict(self):
+        all_weighted_cost_dict = {}
+        weights = self.weights
+
+        for key in weights:
+            if weights[key] > 0 and "risk" not in key:
+                all_weighted_cost_dict[key] = []
+
+        for key in all_weighted_cost_dict:
             for data in self._all_log_data:
-                (_, _, ft_list_valid, ft_list_invalid, _, _, _) = data
+                (_, _, ft_list_valid, _, _, _, _) = data
 
-                for traj in ft_list_valid + ft_list_invalid:
+                for traj in ft_list_valid:
                     if key in self.risk_keys:
-                        self.all_weighted_cost_dict[key].append(
+                        all_weighted_cost_dict[key].append(
                             traj["cost_dict"]["risk_cost_dict"][key]
                             * weights[key]
                             * weights["risk_cost"]
                         )
                     else:
-                        self.all_weighted_cost_dict[key].append(
+                        all_weighted_cost_dict[key].append(
                             traj["cost_dict"]["unweighted_cost"][key] * weights[key]
                         )
 
-        corr_mat = np.zeros(
-            (len(self.all_weighted_cost_dict), len(self.all_weighted_cost_dict))
-        )
-        for key1, x in enumerate(self.all_weighted_cost_dict.values()):
-            for key2, y in enumerate(self.all_weighted_cost_dict.values()):
-                correlation, p_value = stats.pearsonr(x, y)
-                corr_mat[key1, key2] = correlation
+        return all_weighted_cost_dict
 
-        if plot:
-            self._draw_corr_mat(corr_mat)
+    def _get_best_traj_for_principle(self, traj_list, cost_key):
+        """Return the trajectory with the lowest weighted cost for the given principle."""
+        curr_best_weighted_cost = np.inf
+        weights = self.weights
 
-        return corr_mat, list(self.all_weighted_cost_dict.keys())
+        for traj in traj_list:
+            if cost_key in self.risk_keys:
+                weighted_cost = traj["cost_dict"]["risk_cost_dict"][cost_key] * weights[cost_key] * weights["risk_cost"]
 
-    def _draw_corr_mat(self, corr_mat):
-        fig = plt.figure("Correlation Matrix", figsize=(9, 8))
+            elif cost_key == "total":
+                weighted_cost = traj["cost"]
+            else:
+                weighted_cost = traj["cost_dict"]["unweighted_cost"][cost_key] * weights[cost_key]
+
+            if weighted_cost < curr_best_weighted_cost:
+                curr_best_weighted_cost = weighted_cost
+                curr_best_traj = traj
+
+        return curr_best_traj
+
+    def _draw_corr_mat(self, corr_mat, raw_dict, stdw_mat=None, inverse=False, title="Correlation Matrix"):
+        fig = plt.figure(title, figsize=(9, 8))
         ax = fig.add_subplot(111)
+
+        top = plt.cm.get_cmap('Oranges_r', 128)
+        bottom = plt.cm.get_cmap('Blues', 128)
+
+        newcolors = np.vstack((top(np.linspace(0, 1, 128)), bottom(np.linspace(0, 1, 128))))
+        newcmp = ListedColormap(newcolors, name='OrangeBlue')
+
+        if inverse:
+            cmap = newcmp
+        else:
+            cmap = plt.cm.RdYlGn
+
         im = ax.imshow(
             corr_mat,
             aspect='auto',
-            cmap=plt.cm.RdYlGn,
+            cmap=cmap,
             interpolation='nearest',
         )
 
-        ax.set_xticks(np.arange(len(self.all_weighted_cost_dict)))
-        ax.set_yticks(np.arange(len(self.all_weighted_cost_dict)))
-        ax.set_xticklabels(list(self.all_weighted_cost_dict.keys()))
-        ax.set_yticklabels(list(self.all_weighted_cost_dict.keys()))
+        ax.set_xticks(np.arange(len(raw_dict)))
+        ax.set_yticks(np.arange(len(raw_dict)))
+        ax.set_xticklabels(list(raw_dict.keys()))
+        ax.set_yticklabels(list(raw_dict.keys()))
 
         fig.colorbar(im)
 
-        plt.show()
+        for i in range(len(raw_dict)):
+            for j in range(len(raw_dict)):
+                if stdw_mat is not None:
+                    text_str = "{0:.1f}\n{1:.1f}".format(corr_mat[i, j], stdw_mat[i, j])
+                else:
+                    text_str = "{0:.1f}".format(corr_mat[i, j])
+
+                ax.text(j, i, text_str, ha="center", va="center", color="black")
 
     def _visualize_selected(self, label):
         """
@@ -490,6 +621,35 @@ def __init__(self, *args, **kwargs):
         self.__dict__ = self
 
 
+def analyze_multi_correlation(data):
+    """Analyze correlation between multiple principles.
+
+    Args:
+        data (dict): mapping of cost-term names to lists of weighted cost values.
+
+    Returns:
+        dict: per-key multiple-correlation coefficient (r_value) and regression parameters.
+    """
+    multi_corr_dict = {}
+
+    df = pd.DataFrame(data=data)
+
+    for key in data:
+        reg_str = key + " ~"
+        for key2 in data:
+            if key != key2:
+                reg_str += " + " + key2
+
+        reg_model = ols(reg_str, data=df).fit()
+        multi_corr_dict[key] = {
+            "r_value": np.sqrt(reg_model.rsquared)
+        }
+        for idx, val in zip(reg_model.params.index, reg_model.params.values):
+            multi_corr_dict[key][idx] = val
+
+    return multi_corr_dict
+
+
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
     parser.add_argument("--logfile", type=str, default=None)
@@ -498,6 +658,8 @@
     logvisualizer = FrenetLogVisualizer(args.logfile, visualize=True)
 
     # Show correlation matrix of cost terms
+    # logvisualizer.multi_correlation()
+    # logvisualizer.distance_matrix(plot=True)
     # logvisualizer.correlation_matrix()
 
     logvisualizer.visualize()
diff --git a/planner/Frenet/configs/load_json.py b/planner/Frenet/configs/load_json.py
index 9539cad..35ad9ed 100644
--- a/planner/Frenet/configs/load_json.py
+++ b/planner/Frenet/configs/load_json.py
@@ -5,7 +5,7 @@
 import os
 
 
-def load_risk_json():
+def load_risk_json(filename="risk.json"):
     """
     Load the risk.json with harm weights.
 
@@ -14,11 +14,13 @@ def load_risk_json():
     """
     risk_config_path = os.path.join(
         os.path.dirname(os.path.abspath(__file__)),
-        "risk.json",
+        filename,
     )
     with open(risk_config_path, "r") as f:
         jsondata = json.load(f)
 
+    print(f"Loaded settings from {risk_config_path}")
+
     return jsondata
 
 
@@ -52,7 +54,7 @@ def load_weight_json(filename="weights.json"):
     with open(weight_config_path, "r") as f:
         jsondata = json.load(f)
 
-    print(f"\nLoaded weights from {weight_config_path}")
+    print(f"Loaded weights from {weight_config_path}")
 
     return jsondata
 
diff --git a/planner/Frenet/configs/risk.json b/planner/Frenet/configs/risk.json
index 9c28be5..44d5c8b 100644
--- a/planner/Frenet/configs/risk.json
+++ b/planner/Frenet/configs/risk.json
@@ -7,10 +7,12 @@
     "reduced_angle_areas": true,
     "trajectory_risk": "max",
     "max_acceptable_risk": 1,
+    "multiple_cost_functions": false,
 
     "scale_factor_time": 0.9,
 
     "crash_angle_accuracy": 10,
     "crash_angle_simplified": true,
+    "fast_prob_mahalanobis": false,
 
     "figures": {
         "create_figures": false,
diff --git a/planner/Frenet/frenet_planner.py b/planner/Frenet/frenet_planner.py
index f9c4bfb..eada652 100644
--- a/planner/Frenet/frenet_planner.py
+++ b/planner/Frenet/frenet_planner.py
@@ -88,6 +88,7 @@ def __init__(
         sensor_radius: float = 50.0,
         plot_frenet_trajectories: bool = False,
         weights=None,
+        settings=None,
     ):
         """
         Initialize a frenét planner.
@@ -134,12 +135,16 @@ def __init__(
         self.p = vehicle_params
 
         # load parameters
-        self.params_mode = load_risk_json()
         self.params_harm = load_harm_parameter_json()
         if weights is None:
             self.params_weights = load_weight_json()
         else:
             self.params_weights = weights
+        if settings is not None and "risk_dict" in settings:
+            self.params_mode = settings["risk_dict"]
+        else:
+            # fall back to the default risk.json so params_mode is always set
+            self.params_mode = load_risk_json()
 
         self.params_dict = {
             'weights': self.params_weights,
@@ -572,13 +577,13 @@
 
 
 if __name__ == "__main__":
-    import argparse
     from planner.plannertools.evaluate import ScenarioEvaluator
     from planner.Frenet.plannertools.frenetcreator import FrenetCreator
 
     parser = argparse.ArgumentParser()
     parser.add_argument("--scenario", default="recorded/hand-crafted/ZAM_Tjunction-1_486_T-1.xml")
+    parser.add_argument("--time", action="store_true")
     args = parser.parse_args()
 
     if "commonroad" in args.scenario:
@@ -588,7 +593,9 @@
 
     # load settings from planning_fast.json
     settings_dict = load_planning_json("planning_fast.json")
-    settings_dict["evaluation_settings"]["show_visualization"] = True
+    settings_dict["risk_dict"] = risk_dict = load_risk_json()
+    if not args.time:
+        settings_dict["evaluation_settings"]["show_visualization"] = True
     eval_directory = (
         pathlib.Path(__file__).resolve().parents[0].joinpath("results").joinpath("eval")
     )
@@ -607,5 +614,20 @@
         timing_enabled=settings_dict["evaluation_settings"]["timing_enabled"],
     )
 
-    _ = evaluator.eval_scenario(scenario_path)
+    def main():
+        """Loop for cProfile."""
+        _ = evaluator.eval_scenario(scenario_path)
+
+    if args.time:
+        import cProfile
+        cProfile.run('main()', "output.dat")
+        no_trajectories = settings_dict["frenet_settings"]["frenet_parameters"]["n_v_samples"] * len(settings_dict["frenet_settings"]["frenet_parameters"]["d_list"])
+        import pstats
+        sortby = pstats.SortKey.CUMULATIVE
+        with open(f"cProfile/{scenario_path.split('/')[-1]}_{no_trajectories}.txt", "w") as f:
+            p = pstats.Stats("output.dat", stream=f).sort_stats(sortby)
+            p.sort_stats(sortby).print_stats()
+    else:
+        main()
+
 # EOF
diff --git a/planner/Frenet/plannertools/evaluatefrenet.py b/planner/Frenet/plannertools/evaluatefrenet.py
index 9deab44..cd739db 100644
--- a/planner/Frenet/plannertools/evaluatefrenet.py
+++ b/planner/Frenet/plannertools/evaluatefrenet.py
@@ -10,7 +10,7 @@
         DatasetEvaluator,
     )
     from EthicalTrajectoryPlanning.planner.Frenet.plannertools.frenetcreator import FrenetCreator
-    from EthicalTrajectoryPlanning.planner.Frenet.configs.load_json import load_planning_json, load_weight_json
+    from EthicalTrajectoryPlanning.planner.Frenet.configs.load_json import load_planning_json, load_weight_json, load_risk_json
 
 if __name__ == "__main__":
 
@@ -18,6 +18,7 @@ if __name__ == "__main__":
     import argparse
     parser = argparse.ArgumentParser()
     parser.add_argument('--weights', default="ethical")
+    parser.add_argument('--settings', default=None)
     parser.add_argument("--all", action="store_true")
     args = parser.parse_args()
 
@@ -27,6 +28,11 @@ if __name__ == "__main__":
         pathlib.Path(__file__).resolve().parents[1].joinpath("results").joinpath("eval")
     )
     weights = load_weight_json(filename=f"weights_{args.weights}.json")
+    if args.settings is None:
+        risk_dict = load_risk_json()
+    else:
+        risk_dict = load_risk_json(filename=f"risk_{args.settings}.json")
+    settings_dict["risk_dict"] = risk_dict
 
     # Create the frenet creator
     frenet_creator = FrenetCreator(settings_dict, weights=weights)
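
A usage sketch for the new `--settings` flag above: it loads `risk_<name>.json` from `planner/Frenet/configs/`, mirroring the existing `weights_<name>.json` pattern (the concrete file name `risk_standard.json` is illustrative only and not part of this diff):

    python planner/Frenet/plannertools/evaluatefrenet.py --weights ethical --settings standard --all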
diff --git a/planner/Frenet/plannertools/frenetcreator.py b/planner/Frenet/plannertools/frenetcreator.py
index 961c4aa..14544ac 100644
--- a/planner/Frenet/plannertools/frenetcreator.py
+++ b/planner/Frenet/plannertools/frenetcreator.py
@@ -15,6 +15,7 @@ def __init__(self, settings, weights=None):
         # Settings specific for a frenet planner.
         self.show_visualization = settings["evaluation_settings"]["show_visualization"]
         self.frenet_settings = settings["frenet_settings"]
+        self.settings = settings
         self.weights = weights
 
     def get_planner(self, scenario_handler, ego_vehicle_id):
@@ -41,6 +42,7 @@ def get_planner(self, scenario_handler, ego_vehicle_id):
             plot_frenet_trajectories=self.show_visualization,
             frenet_parameters=self.frenet_settings["frenet_parameters"],
             weights=self.weights,
+            settings=self.settings,
         )
 
     @staticmethod
diff --git a/planner/Frenet/plot_tools/boxplots_correlations.py b/planner/Frenet/plot_tools/boxplots_correlations.py
index 089282f..a73f968 100644
--- a/planner/Frenet/plot_tools/boxplots_correlations.py
+++ b/planner/Frenet/plot_tools/boxplots_correlations.py
@@ -4,6 +4,8 @@
 import matplotlib.pyplot as plt
 from matplotlib.colors import LinearSegmentedColormap, ListedColormap
 import numpy as np
+import argparse
+import os
 
 TUM_BLUE = (0 / 255, 101 / 255, 189 / 255)
 TUM_BLUE_TRANS20 = (0 / 255, 101 / 255, 189 / 255, 0.2)
@@ -86,58 +88,80 @@ def box_plot(data, edge_color, fill_color):
     return bp
 
 
-with open('./planner/Frenet/results/corr_dict.json') as json_file:
-    data = json.load(json_file)
+def plot_and_save(plt_data, title):
+    """Plot boxplots of the given data and save them as a PDF.
 
+    Args:
+        plt_data (dict): mapping of keys to lists of values to plot.
+        title (str): file name (without extension) of the saved figure.
+    """
+    _, ax1 = plt.subplots()
+    _ = box_plot(plt_data.values(), TUM_DARKBLUE, TUM_LIGHTBLUE)
+    ax1.set_xticklabels(plt_data.keys(), rotation=90)
+    plt.tight_layout()
+    plt.savefig(os.path.join(args.resultdir, title + ".pdf"))
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--resultdir", type=str, default="./planner/Frenet/results")
+    args = parser.parse_args()
+
+    with open(os.path.join(args.resultdir, 'corr_dict.json')) as json_file:
+        corr_data = json.load(json_file)
+
+    with open(os.path.join(args.resultdir, 'long_dict.json')) as json_file:
+        long_data = json.load(json_file)
+
+    with open(os.path.join(args.resultdir, 'lat_dict.json')) as json_file:
+        lat_data = json.load(json_file)
+
-key_list = [
-    "bayes",
-    "equality",
-    "maximin",
-    "responsibility",
-    "ego",
-    "velocity",
-    "dist_to_global_path",
-]
-mean_dict = {key: np.mean(data[key]) for key in data}
-
-corr_mat = np.zeros((len(key_list), len(key_list)))
-for id1, key1 in enumerate(key_list):
-    for id2, key2 in enumerate(key_list):
-        if id1 == id2:
-            corr_mat[id1, id2] = 1
-
-        else:
-            for keypair in mean_dict:
-                if key1 in keypair and key2 in keypair:
-                    corr_mat[id1, id2] = mean_dict[keypair]
-
-fig = plt.figure("Correlation Matrix", figsize=(9, 8))
-ax = fig.add_subplot(111)
-im = ax.imshow(
-    corr_mat,
-    aspect='auto',
-    cmap=newcmp,  # plt.cm.RdYlBu,
-    interpolation='nearest',
-)
-
-ax.set_xticks(np.arange(len(key_list)))
-ax.set_yticks(np.arange(len(key_list)))
-ax.set_xticklabels(key_list)
-ax.set_yticklabels(key_list)
-
-cbar = fig.colorbar(im)
-im.set_clim(-1, 1)
-plt.savefig("./planner/Frenet/results/correlations.pdf")
-
-
-data = {key: data[key] for key in data if key in PLOT_LIST}
-
-fig, ax = plt.subplots()
-bp = box_plot(data.values(), TUM_DARKBLUE, TUM_LIGHTBLUE)
-
-ax.set_xticklabels(data.keys(), rotation=90)
-
-plt.tight_layout()
-plt.savefig("./planner/Frenet/results/correlations_boxplot.pdf")
-
-print("Done.")
+    key_list = [
+        "bayes",
+        "equality",
+        "maximin",
+        "responsibility",
+        "ego",
+        "velocity",
+        "dist_to_global_path",
+    ]
+    mean_dict = {key: np.mean(corr_data[key]) for key in corr_data}
+
+    corr_mat = np.zeros((len(key_list), len(key_list)))
+    for id1, key1 in enumerate(key_list):
+        for id2, key2 in enumerate(key_list):
+            if id1 == id2:
+                corr_mat[id1, id2] = 1
+
+            else:
+                for keypair in mean_dict:
+                    if key1 in keypair and key2 in keypair:
+                        corr_mat[id1, id2] = mean_dict[keypair]
+
+    fig = plt.figure("Correlation Matrix", figsize=(9, 8))
+    ax = fig.add_subplot(111)
+    im = ax.imshow(
+        corr_mat,
+        aspect='auto',
+        cmap=newcmp,  # plt.cm.RdYlBu,
+        interpolation='nearest',
+    )
+
+    ax.set_xticks(np.arange(len(key_list)))
+    ax.set_yticks(np.arange(len(key_list)))
+    ax.set_xticklabels(key_list)
+    ax.set_yticklabels(key_list)
+
+    cbar = fig.colorbar(im)
+    im.set_clim(-1, 1)
+    plt.savefig(os.path.join(args.resultdir, "correlations.pdf"))
+
+    # corr_data = {key: corr_data[key] for key in corr_data if key in PLOT_LIST}
+    # long_data = {key: long_data[key] for key in long_data if key in PLOT_LIST}
+    # lat_data = {key: lat_data[key] for key in lat_data if key in PLOT_LIST}
+
+    plot_and_save(corr_data, "correlations_boxplot")
+    plot_and_save(long_data, "long_boxplot")
+    plot_and_save(lat_data, "lat_boxplot")
+
+    print("Done.")
diff --git a/planner/Frenet/plot_tools/boxplots_risks.py b/planner/Frenet/plot_tools/boxplots_risks.py
index 04542dc..947039c 100644
--- a/planner/Frenet/plot_tools/boxplots_risks.py
+++ b/planner/Frenet/plot_tools/boxplots_risks.py
@@ -5,6 +5,7 @@
 import matplotlib.pyplot as plt
 from matplotlib.colors import LinearSegmentedColormap, ListedColormap
 import numpy as np
+from colors import COLORS
 
 RAW_DATA_PATHS = [
     './planner/Frenet/RISK_VALUES/ethical.json',
@@ -14,28 +15,15 @@
 
 NO_TOP_VALUES = 100
 
-BLACK = (0, 0, 0)
-DARK_GRAY = (0.2, 0.2, 0.2)
-TUM_BLUE = (0 / 255, 101 / 255, 189 / 255)
-TUM_BLUE_TRANS20 = (0 / 255, 101 / 255, 189 / 255, 0.2)
-TUM_BLUE_TRANS50 = (0 / 255, 101 / 255, 189 / 255, 0.5)
-TUM_DARKBLUE = (0 / 255, 82 / 255, 147 / 255)
-TUM_LIGHTBLUE = (100 / 255, 160 / 255, 200 / 255)
-TUM_ORANGE = (227 / 255, 114 / 255, 34 / 255)
-TUM_ORANGE_TRANS50 = (227 / 255, 114 / 255, 34 / 255, 0.5)
-TUM_GREEN = (162 / 255, 173 / 255, 0 / 255)
-TUM_GREEN_TRANS50 = (162 / 255, 173 / 255, 0 / 255, 0.5)
-
-
 # Create dictionary of keyword aruments to pass to plt.boxplot
 red_dict = {
     'patch_artist': True,
-    'boxprops': {"color": DARK_GRAY, "facecolor": TUM_BLUE_TRANS50},
-    'capprops': {"color": DARK_GRAY},
+    'boxprops': {"color": COLORS["DARK_GRAY"], "facecolor": COLORS["TUM_BLUE_TRANS50"]},
+    'capprops': {"color": COLORS["DARK_GRAY"]},
     "showfliers": False,
-    'flierprops': {"color": TUM_BLUE_TRANS20, "markeredgecolor": TUM_BLUE_TRANS20},
-    'medianprops': {"color": DARK_GRAY},
-    'whiskerprops': {"color": DARK_GRAY},
+    'flierprops': {"color": COLORS["TUM_BLUE_TRANS20"], "markeredgecolor": COLORS["TUM_BLUE_TRANS20"]},
+    'medianprops': {"color": COLORS["DARK_GRAY"]},
+    'whiskerprops': {"color": COLORS["DARK_GRAY"]},
 }
 
@@ -108,10 +96,10 @@ def box_plot(data, edge_color, fill_color, ax):
 
     for patch, key in zip(bp['boxes'], data.keys()):
         if "EGO" in key:
-            patch.set_facecolor(TUM_ORANGE_TRANS50)
+            patch.set_facecolor(COLORS["TUM_ORANGE_TRANS50"])
 
         elif "STANDARD" in key:
-            patch.set_facecolor(TUM_GREEN_TRANS50)
+            patch.set_facecolor(COLORS["TUM_GREEN_TRANS50"])
 
     return bp
 
@@ -129,7 +117,7 @@ def box_plot(data, edge_color, fill_color, ax):
 boxplot_data_sorted = {key: boxplot_data[key] for key in sorted_keys}
 
 fig, ax = plt.subplots()
-bp = box_plot(boxplot_data_sorted, TUM_DARKBLUE, TUM_LIGHTBLUE, ax=ax)
+bp = box_plot(boxplot_data_sorted, COLORS["TUM_DARKBLUE"], COLORS["TUM_LIGHTBLUE"], ax=ax)
 
 ax.set_xticklabels(boxplot_data_sorted.keys(), rotation=90)
 
@@ -165,7 +153,7 @@
 
 fig2, ax2 = plt.subplots()
-bp2 = box_plot(boxplot_data_high, TUM_DARKBLUE, TUM_LIGHTBLUE, ax=ax2)
+bp2 = box_plot(boxplot_data_high, COLORS["TUM_DARKBLUE"], COLORS["TUM_LIGHTBLUE"], ax=ax2)
 ax2.set_xticklabels(boxplot_data_high.keys(), rotation=90, fontsize=4)
 
 plt.savefig(
diff --git a/planner/Frenet/plot_tools/colors.py b/planner/Frenet/plot_tools/colors.py
new file mode 100644
index 0000000..6ed14ef
--- /dev/null
+++ b/planner/Frenet/plot_tools/colors.py
@@ -0,0 +1,15 @@
+"""Color dict according to TUM CI."""
+
+COLORS = {
+    "BLACK": (0, 0, 0),
+    "DARK_GRAY": (0.2, 0.2, 0.2),
+    "TUM_BLUE": (0 / 255, 101 / 255, 189 / 255),
+    "TUM_BLUE_TRANS20": (0 / 255, 101 / 255, 189 / 255, 0.2),
+    "TUM_BLUE_TRANS50": (0 / 255, 101 / 255, 189 / 255, 0.5),
+    "TUM_DARKBLUE": (0 / 255, 82 / 255, 147 / 255),
+    "TUM_LIGHTBLUE": (100 / 255, 160 / 255, 200 / 255),
+    "TUM_ORANGE": (227 / 255, 114 / 255, 34 / 255),
+    "TUM_ORANGE_TRANS50": (227 / 255, 114 / 255, 34 / 255, 0.5),
+    "TUM_GREEN": (162 / 255, 173 / 255, 0 / 255),
+    "TUM_GREEN_TRANS50": (162 / 255, 173 / 255, 0 / 255, 0.5)
+}
diff --git a/planner/Frenet/utils/calc_trajectory_cost.py b/planner/Frenet/utils/calc_trajectory_cost.py
index 18ae8b0..d770eca 100644
--- a/planner/Frenet/utils/calc_trajectory_cost.py
+++ b/planner/Frenet/utils/calc_trajectory_cost.py
@@ -117,7 +117,7 @@ def calc_trajectory_costs(
 
     ego_cost = get_ego_costs(ego_risk_max=traj.ego_risk_dict, boundary_harm=traj.bd_harm)
 
-    responsibility_cost = get_responsibility_cost(
+    responsibility_cost, bool_contain_cache = get_responsibility_cost(
         scenario=scenario,
         traj=traj,
         ego_state=ego_state,
@@ -155,24 +155,33 @@
         "simulation/sort trajectories/calculate costs/calculate responsibility/total"
     ):
         if weights["responsibility"] > 0.0:
-            responsibility_cost = 0.0
-            for obj_id, rs in reach_set.reach_sets[ego_state.time_step].items():
-                # time_steps = [float(list(entry.keys())[0]) for entry in rs]
-                responsibility = True
-                for part_set in rs:
-                    time_t = list(part_set.keys())[0]
-                    time_step = int(time_t / dt - 1)
+            if bool_contain_cache is not None:
+                key_list = list(reach_set.reach_sets[ego_state.time_step].keys())
+                for i in range(len(key_list)):
+                    if 1 not in bool_contain_cache[i]:
+                        obj_id = key_list[i]
+                        traj.risk_dict["responsibility"] -= traj.obst_risk_dict[obj_id]
+                        traj.risk_dict["total"] -= traj.obst_risk_dict[obj_id]
 
-                    ego_pos = Point(traj.x[time_step], traj.y[time_step])
-                    obj_rs = Polygon(list(part_set.values())[0])
+            else:
+                responsibility_cost = 0.0
+                for obj_id, rs in reach_set.reach_sets[ego_state.time_step].items():
+                    # time_steps = [float(list(entry.keys())[0]) for entry in rs]
+                    responsibility = True
+                    for part_set in rs:
+                        time_t = list(part_set.keys())[0]
+                        time_step = int(time_t / dt - 1)
+
+                        ego_pos = Point(traj.x[time_step], traj.y[time_step])
+                        obj_rs = Polygon(list(part_set.values())[0])
 
-                    if obj_rs.contains(ego_pos):
-                        responsibility = False
-                        break
+                        if obj_rs.contains(ego_pos):
+                            responsibility = False
+                            break
 
-                    if responsibility:
-                        traj.risk_dict["responsibility"] -= traj.obst_risk_dict[obj_id]
-                        traj.risk_dict["total"] -= traj.obst_risk_dict[obj_id]
+                    if responsibility:
traj.risk_dict["responsibility"] -= traj.obst_risk_dict[obj_id] + traj.risk_dict["total"] -= traj.obst_risk_dict[obj_id] with timer.time_with_cm( "simulation/sort trajectories/calculate costs/calculate visible area" @@ -310,7 +319,7 @@ def calc_trajectory_costs( "simulation/sort trajectories/calculate costs/multiply weights and costs" ): curr_weights = {} - if validity_level < 10: + if validity_level < 10 and modes["multiple_cost_functions"]: for key in weights: if key in RISK_COSTS_KEY: curr_weights[key] = weights[key] diff --git a/planner/Frenet/utils/frenet_functions.py b/planner/Frenet/utils/frenet_functions.py index ccd9703..02cf8d8 100644 --- a/planner/Frenet/utils/frenet_functions.py +++ b/planner/Frenet/utils/frenet_functions.py @@ -262,20 +262,77 @@ def calc_frenet_trajectories( with timer.time_with_cm( "simulation/calculate trajectories/calculate quartic polynomial" ): + # use universal function feature to perform array operation # time vector - t = list(np.arange(0.0, tT, dt)) + t = np.arange(0.0, tT, dt) # longitudinal position and derivatives - s = [qp_long.calc_point(t) for t in t] - s_d = [qp_long.calc_first_derivative(t) for t in t] - s_dd = [qp_long.calc_second_derivative(t) for t in t] - s_ddd = [qp_long.calc_third_derivative(t) for t in t] + s = qp_long.calc_point(t) + s_d = qp_long.calc_first_derivative(t) + s_dd = qp_long.calc_second_derivative(t) + s_ddd = qp_long.calc_third_derivative(t) + with timer.time_with_cm( + "simulation/calculate trajectories/calculate global trajectory/calculate reference points" + ): + # use CubicSpineLine's internal function to perform array operation + # calculate the position of the reference path + global_path_x, global_path_y = csp.calc_position(s) + # global_path_x = np.zeros(len(s),dtype=np.float64) + # global_path_y = np.zeros(len(s),dtype=np.float64) + + # for i in range(len(s)): + # global_path_x[i] = csp.sx(s[i]) + # global_path_y[i] = csp.sy(s[i]) + + # move gradient calculation and deviation calculation out from calc_global_trajectory(), avoid unnecessary repeat calculation + with timer.time_with_cm( + "simulation/calculate trajectories/calculate global trajectory/calculate reference gradients" + ): + # calculate derivations necessary to get the curvature + dx = np.gradient(global_path_x, s) + ddx = np.gradient(dx, s) + dddx = np.gradient(ddx, s) + dy = np.gradient(global_path_y, s) + ddy = np.gradient(dy, s) + dddy = np.gradient(ddy, s) + + with timer.time_with_cm( + "simulation/calculate trajectories/calculate global trajectory/calculate reference yaw" + ): + # calculate yaw of the global path + global_path_yaw = np.arctan2(dy, dx) + + with timer.time_with_cm( + "simulation/calculate trajectories/calculate global trajectory/calculate reference curvature" + ): + # calculate the curvature of the global path + global_path_curv = (np.multiply(dx, ddy) - np.multiply(ddx, dy)) / ( + np.power(dx, 2) + np.power(dy, 2) ** (3 / 2) + ) + + with timer.time_with_cm( + "simulation/calculate trajectories/calculate global trajectory/calculate reference curvature derivation" + ): + # calculate the derivation of the global path's curvature + z = np.multiply(dx, ddy) - np.multiply(ddx, dy) + z_d = np.multiply(dx, dddy) - np.multiply(dddx, dy) + n = (np.power(dx, 2) + np.power(dy, 2)) ** (3 / 2) + n_d = (3 / 2) * np.multiply( + np.power((np.power(dx, 2) + np.power(dy, 2)), 0.5), + (2 * np.multiply(dx, ddx) + 2 * np.multiply(dy, ddy)), + ) + global_path_curv_d = (np.multiply(z_d, n) - np.multiply(z, n_d)) / ( + np.power(n, 2) + ) s0 = s[0] + ds = 
 
         # all lateral distances
         for dT in d_list:
             # quintic polynomial in lateral direction
             # for high velocities we have ds/dt and dd/dt
-            ds = s[-1] - s0
+            if ds <= abs(dT):
+                continue
+
             if lat_mode == "high_velocity":
                 with timer.time_with_cm(
@@ -294,14 +351,19 @@
                 with timer.time_with_cm(
                     "simulation/calculate trajectories/calculate quintic polynomial"
                 ):
+
+                    # use NumPy's ufunc support to evaluate the whole time array at once
                     # lateral distance and derivatives
-                    d = [qp_lat.calc_point(t) for t in t]
-                    d_d = [qp_lat.calc_first_derivative(t) for t in t]
-                    d_dd = [qp_lat.calc_second_derivative(t) for t in t]
-                    d_ddd = [qp_lat.calc_third_derivative(t) for t in t]
+                    d = qp_lat.calc_point(t)
+                    d_d = qp_lat.calc_first_derivative(t)
+                    d_dd = qp_lat.calc_second_derivative(t)
+                    d_ddd = qp_lat.calc_third_derivative(t)
                     d_d_time = d_d
                     d_dd_time = d_dd
 
+                    d_d = d_d / s_d
+                    d_dd = (d_dd - d_d * s_dd) / np.power(s_d, 2)
+
             # for low velocities, we have ds/dt and dd/ds
             elif lat_mode == "low_velocity":
                 # singularity
@@ -337,34 +399,53 @@
                 with timer.time_with_cm(
                     "simulation/calculate trajectories/calculate quintic polynomial"
                 ):
+                    # use NumPy's ufunc support to evaluate the whole array at once
                    # lateral distance and derivatives
-                    d = [qp_lat.calc_point(s - s0) for s in s]
-                    d_d = [qp_lat.calc_first_derivative(s - s0) for s in s]
-                    d_dd = [qp_lat.calc_second_derivative(s - s0) for s in s]
-                    d_ddd = [qp_lat.calc_third_derivative(s - s0) for s in s]
+                    d = qp_lat.calc_point(s - s0)
+                    d_d = qp_lat.calc_first_derivative(s - s0)
+                    d_dd = qp_lat.calc_second_derivative(s - s0)
+                    d_ddd = qp_lat.calc_third_derivative(s - s0)
 
                     # since dd/ds, a conversion to dd/dt is needed
-                    d_d_time = [s_d[i] * d_d[i] for i in range(len(d))]
-                    d_dd_time = [
-                        s_dd[i] * d_d[i] + (s_d[i] ** 2) * d_dd[i]
-                        for i in range(len(d))
-                    ]
+                    d_d_time = s_d * d_d
+                    d_dd_time = s_dd * d_d + np.power(s_d, 2) * d_dd
+
+            # with timer.time_with_cm(
+            #     "simulation/calculate trajectories/calculate global trajectory/total"
+            # ):
 
             with timer.time_with_cm(
-                "simulation/calculate trajectories/calculate global trajectory/total"
+                "simulation/calculate trajectories/calculate global trajectory/calculate trajectory states"
             ):
-                # calculate global path with the cubic spline planner (input is s, ds/dt, dds/ddt and for high velocities, d, dd/dt, ddd/ddt is later converted to d, dd/ds, ddd/dds)
-                x, y, yaw, curv, v, a = calc_global_trajectory(
-                    csp=csp,
-                    s=s,
-                    s_d=s_d,
-                    s_dd=s_dd,
-                    d=d,
-                    d_d_lat=d_d,
-                    d_dd_lat=d_dd,
-                    lat_mode=lat_mode,
-                    exec_timer=timer,
-                )
+                yaw_diff_array = np.arctan(d_d / (1 - global_path_curv * d))
+                yaw = yaw_diff_array + global_path_yaw
+                x = global_path_x - d * np.sin(global_path_yaw)
+                y = global_path_y + d * np.cos(global_path_yaw)
+                v = (s_d * (1 - global_path_curv * d)) / np.cos(yaw_diff_array)
+                curv = (
+                    (
+                        (
+                            d_dd
+                            + (global_path_curv_d * d + global_path_curv * d_d)
+                            * np.tan(yaw_diff_array)
+                        )
+                        * (np.power(np.cos(yaw_diff_array), 2) / (1 - global_path_curv * d))
+                    )
+                    + global_path_curv
+                ) * (np.cos(yaw_diff_array) / (1 - global_path_curv * d))
+
+                # transform acceleration
+                # a = s_dd * ((1 - global_path_curv * d) / np.cos(yaw_diff_array)) + (
+                #     np.power(s_d, 2) / (np.cos(yaw_diff_array))
+                # ) * (
+                #     (1 - global_path_curv * d)
+                #     * np.tan(yaw_diff_array)
+                #     * (
+                #         curv * ((1 - global_path_curv * d) / np.cos(yaw_diff_array))
+                #         - global_path_curv
+                #     )
+                #     - (global_path_curv_d * d + global_path_curv * d_d)
+                # )
 
             with timer.time_with_cm(
"simulation/calculate trajectories/initialize trajectory" @@ -387,8 +468,8 @@ def calc_frenet_trajectories( curv=curv, ) - if ds > abs(dT): - fp_list.append(fp) + # if ds > abs(dT): + fp_list.append(fp) return fp_list @@ -718,11 +799,12 @@ def get_v_list( # return for the linspace mode if mode == "linspace": if v_goal_min is None: - return np.linspace(v_min, v_max, n_samples) + v_list = np.linspace(v_min, v_max, n_samples - 1) else: - return np.linspace( - max(min(v_goal_min, v_min), 0.001), max(v_goal_max, v_max), n_samples + v_list = np.linspace( + max(min(v_goal_min, v_min), 0.001), max(v_goal_max, v_max), n_samples - 1 ) + return np.append(v_list, v_cur) # check if n_samples is valid for the chosen mode if mode == "deterministic": diff --git a/planner/Frenet/utils/polynomials.py b/planner/Frenet/utils/polynomials.py index 2595924..e5ebc2e 100644 --- a/planner/Frenet/utils/polynomials.py +++ b/planner/Frenet/utils/polynomials.py @@ -67,10 +67,10 @@ def calc_point(self, t): xt = ( self.a0 + self.a1 * t - + self.a2 * t ** 2 - + self.a3 * t ** 3 - + self.a4 * t ** 4 - + self.a5 * t ** 5 + + self.a2 * np.power(t, 2) + + self.a3 * np.power(t, 3) + + self.a4 * np.power(t, 4) + + self.a5 * np.power(t, 5) ) return xt @@ -85,12 +85,19 @@ def calc_first_derivative(self, t): Returns: float: First derivative at time t. """ + # xt = ( + # self.a1 + # + 2 * self.a2 * t + # + 3 * self.a3 * t ** 2 + # + 4 * self.a4 * t ** 3 + # + 5 * self.a5 * t ** 4 + # ) xt = ( self.a1 + 2 * self.a2 * t - + 3 * self.a3 * t ** 2 - + 4 * self.a4 * t ** 3 - + 5 * self.a5 * t ** 4 + + 3 * self.a3 * np.power(t, 2) + + 4 * self.a4 * np.power(t, 3) + + 5 * self.a5 * np.power(t, 4) ) return xt @@ -105,11 +112,17 @@ def calc_second_derivative(self, t): Returns: float: Second derivative at time t. """ + # xt = ( + # 2 * self.a2 + # + 6 * self.a3 * t + # + 12 * self.a4 * t ** 2 + # + 20 * self.a5 * t ** 3 + # ) xt = ( 2 * self.a2 + 6 * self.a3 * t - + 12 * self.a4 * t ** 2 - + 20 * self.a5 * t ** 3 + + 12 * self.a4 * np.power(t, 2) + + 20 * self.a5 * np.power(t, 3) ) return xt @@ -124,7 +137,8 @@ def calc_third_derivative(self, t): Returns: float: Third derivative at time t. 
""" - xt = 6 * self.a3 + 24 * self.a4 * t + 60 * self.a5 * t ** 2 + # xt = 6 * self.a3 + 24 * self.a4 * t + 60 * self.a5 * t ** 2 + xt = 6 * self.a3 + 24 * self.a4 * t + 60 * self.a5 * np.power(t, 2) return xt diff --git a/planner/Frenet/utils/prediction_helpers.py b/planner/Frenet/utils/prediction_helpers.py index 6b1c898..f41a735 100644 --- a/planner/Frenet/utils/prediction_helpers.py +++ b/planner/Frenet/utils/prediction_helpers.py @@ -175,9 +175,8 @@ def collision_checker_prediction( pred_orientation = predictions[obstacle_id]['orientation_list'] # create a time variant collision object for the predicted ehicle - traj = [] - for i in range(pred_length): - traj.append([x[i], y[i], pred_orientation[i]]) + traj = [[x[i], y[i], pred_orientation[i]] for i in range(pred_length)] + prediction_collision_object_raw = create_tvobstacle( traj_list=traj, box_length=length / 2, diff --git a/planner/Frenet/utils/validity_checks.py b/planner/Frenet/utils/validity_checks.py index dd25024..bee280c 100644 --- a/planner/Frenet/utils/validity_checks.py +++ b/planner/Frenet/utils/validity_checks.py @@ -14,7 +14,6 @@ from EthicalTrajectoryPlanning.planner.utils.timers import ExecTimer from EthicalTrajectoryPlanning.planner.Frenet.utils.helper_functions import ( create_tvobstacle, - get_max_curvature, ) from EthicalTrajectoryPlanning.planner.Frenet.utils.prediction_helpers import ( collision_checker_prediction, @@ -80,14 +79,17 @@ def check_validity( ): # check maximum curvature reason_curvature_invalid = curvature_valid(ft, vehicle_params) + if reason_curvature_invalid is not None: return 0, f"curvature ({reason_curvature_invalid})" with timer.time_with_cm( "simulation/sort trajectories/check validity/check collision" ): + # get collision_object collision_object = create_collision_object(ft, vehicle_params, ego_state) + # collision checkking with obstacles if not collision_valid( ft, @@ -104,9 +106,13 @@ def check_validity( "simulation/sort trajectories/check validity/check road boundaries" ): # check road boundaries - if not boundary_valid(vehicle_params, collision_object, road_boundary): - return 2, "boundaries" - + # if ft.bd_harm is calculated, use it for boundary check + if predictions is None: + if not boundary_valid(vehicle_params, collision_object, road_boundary): + return 2, "boundaries" + else: + if ft.bd_harm: + return 2, "boundaries" with timer.time_with_cm( "simulation/sort trajectories/check validity/check max risk" ): @@ -118,10 +124,7 @@ def check_validity( def create_collision_object(ft, vehicle_params, ego_state): """Create a collision_object of the trajectory for collision checking with road boundary and with other vehicles.""" - traj_list = [] - for i in range(len(ft.t)): - traj_list.append([ft.x[i], ft.y[i], ft.yaw[i]]) - + traj_list = [[ft.x[i], ft.y[i], ft.yaw[i]] for i in range(len(ft.t))] collision_object_raw = create_tvobstacle( traj_list=traj_list, box_length=vehicle_params.l / 2, @@ -148,10 +151,14 @@ def velocity_valid(ft, vehicle_params): Returns: [bool]: [True if valid, false else] """ - for vi in ft.s_d: - if np.abs(vi) > vehicle_params.longitudinal.v_max: - return False - return True + # for vi in ft.s_d: + # if np.abs(vi) > vehicle_params.longitudinal.v_max: + # return False + # return True + if np.all(np.abs(ft.s_d) <= vehicle_params.longitudinal.v_max): + return True + else: + return False def acceleration_valid(ft, vehicle_params): @@ -180,14 +187,29 @@ def curvature_valid(ft, vehicle_params): Returns: [bool]: [True if valid, false else] """ + # numpy absolute value 
+    # NumPy absolute-value array operation; the velocity threshold
+    # is calculated once outside the for loop
+    # calculate the turning radius
+    turning_radius = np.sqrt(
+        (vehicle_params.l ** 2 / np.tan(vehicle_params.steering.max) ** 2)
+        + (vehicle_params.l_r ** 2)
+    )
+    # below this velocity, it is assumed that the vehicle can follow the turning radius
+    # above this velocity, the curvature is calculated differently
+    threshold_low_velocity = np.sqrt(vehicle_params.lateral_a_max * turning_radius)
+
+    c = abs(ft.curv)
     for i in range(len(ft.t)):
-        ci = ft.curv[i]
-        c_max_current, vel_mode = get_max_curvature(
-            vehicle_params=vehicle_params, v=ft.v[i]
-        )
-        if np.abs(ci) > c_max_current:
+        # get curvature via turning radius
+        if ft.v[i] < threshold_low_velocity:
+            c_max_current, vel_mode = 1.0 / turning_radius, "lvc"
+        # get velocity dependent curvature (curvature derived from the lateral acceleration)
+        else:
+            c_max_current = vehicle_params.lateral_a_max / (ft.v[i] ** 2)
+            vel_mode = "hvc"
+
+        if c[i] > c_max_current:
             return vel_mode
-    return None
diff --git a/planner/plannertools/evaluate.py b/planner/plannertools/evaluate.py
index 163bf87..1f6fef5 100644
--- a/planner/plannertools/evaluate.py
+++ b/planner/plannertools/evaluate.py
@@ -292,6 +292,7 @@ def create_eval_files(self):
         self._create_harm_evaluation()
         self._create_eval_statistic_exec_times_dict()
         self._store_weights()
+        self._store_settings()
 
     def _create_completion_list(self):
         """WIP."""
@@ -489,3 +490,13 @@ def _store_weights(self):
 
         with open(file_path, "w") as output:
             json.dump(weights, output, indent=6)
+
+    def _store_settings(self):
+        file_path = self.dataset_evaluator.eval_directory.joinpath("settings").with_suffix(
+            ".json"
+        )
+        settings = self.dataset_evaluator.scenario_evaluator.planner_creator.settings
+        settings["frenet_settings"]["frenet_parameters"]["d_list"] = settings["frenet_settings"]["frenet_parameters"]["d_list"].tolist()
+
+        with open(file_path, "w") as output:
+            json.dump(settings, output, indent=6)
diff --git a/planner/plannertools/scenario_handler.py b/planner/plannertools/scenario_handler.py
index aea6a9f..fb6cd14 100644
--- a/planner/plannertools/scenario_handler.py
+++ b/planner/plannertools/scenario_handler.py
@@ -29,10 +29,7 @@
 )
 from EthicalTrajectoryPlanning.risk_assessment.utils.logistic_regression_symmetrical import get_protected_inj_prob_log_reg_ignore_angle
 from EthicalTrajectoryPlanning.planner.Frenet.utils.helper_functions import create_tvobstacle
-from EthicalTrajectoryPlanning.planner.Frenet.configs.load_json import (
-    load_harm_parameter_json,
-    load_risk_json,
-)
+from EthicalTrajectoryPlanning.planner.Frenet.configs.load_json import load_harm_parameter_json
 from commonroad.scenario.obstacle import (
     ObstacleRole,
     ObstacleType,
@@ -201,7 +198,7 @@ def _create_planner_agent_for_ego_vehicle(self, ego_vehicle_id):
 
     def _check_collision(self, agent, time_step):
         # check if the current state is collision-free
-        vis = load_risk_json()
+        vis = self.planner_creator.settings["risk_dict"]
         coeffs = load_harm_parameter_json()
 
         # get ego position and orientation
diff --git a/planner/utils/reachable_set.py b/planner/utils/reachable_set.py
index 95c2bdc..5ee23c8 100644
--- a/planner/utils/reachable_set.py
+++ b/planner/utils/reachable_set.py
@@ -4,11 +4,13 @@
 
 import numpy as np
 from commonroad.scenario.scenario import Scenario
-from shapely.geometry import Polygon
 from EthicalTrajectoryPlanning.planner.GlobalPath.lanelet_based_planner import (
     find_lanelet_by_position_and_orientation,
 )
 from EthicalTrajectoryPlanning.planner.utils import reachable_set_simple
+from EthicalTrajectoryPlanning.planner.utils.reachable_set_simple import simple_reachable_set
+from EthicalTrajectoryPlanning.planner.utils.responsibility import polygon_padding
+import pygeos
 
 
 class ReachSet(object):
@@ -92,6 +94,20 @@ def calc_reach_sets(self, ego_state, obstacle_list=None):
             obstacles = self.scenario.obstacles
         self.reach_sets[ego_state.time_step] = {}
 
+        # build the polygon array for self._reach_set_difference() once, to avoid repeated work
+        b_dict = {}
+        for step in self.ego_reach_set[ego_state.time_step][0]:
+            b_poly_pygeos = [b_set[step] for b_set in self.ego_reach_set[ego_state.time_step]]
+            len_max = max(len(b_set[step]) for b_set in self.ego_reach_set[ego_state.time_step])
+            b_poly_pygeos = polygon_padding(len_max, b_poly_pygeos)
+            b_poly_pygeos = pygeos.polygons(b_poly_pygeos)
+            # b_poly_pygeos_test = [pygeos.polygons(b_set[step]) for b_set in self.ego_reach_set[ego_state.time_step]]
+
+            b_poly_pygeos = pygeos.union_all(b_poly_pygeos)
+
+            # b[step] = b_poly
+            b_dict[step] = b_poly_pygeos
+
         # uncomment to log ego
         # self.reach_sets[ego_state.time_step][self.ego_id] = self.ego_reach_set[ego_state.time_step]
         for obstacle in obstacles:
@@ -129,29 +145,57 @@
                         self.reach_set_objs[lnlet_id].append(obj)
 
             self.reach_sets[ego_state.time_step][o_id] = []
+
+            # call simple_reachable_set() once before the loop to avoid unnecessary recomputation
+            srs = simple_reachable_set(
+                obj_pos=obstacle.prediction.trajectory.state_list[
+                    ego_state.time_step
+                ].position,
+                obj_heading=obstacle.prediction.trajectory.state_list[
+                    ego_state.time_step
+                ].orientation,
+                obj_vel=obstacle.prediction.trajectory.state_list[
+                    ego_state.time_step
+                ].velocity,
+                obj_length=obstacle.obstacle_shape.length,
+                obj_width=obstacle.obstacle_shape.width,
+                dt=self.reach_set_params["parameters"]["dt"],
+                t_max=self.reach_set_params["parameters"]["t_max"],
+                a_max=self.reach_set_params["parameters"]["a_max"]
+            )
+            srs_t = pygeos.polygons([srs[t_key] for t_key in srs.keys()])
+
             for l_id in l_ids:
                 for reach_set_obj in self.reach_set_objs[l_id]:
-                    rs = reach_set_obj.calc_reach_set(
-                        obj_pos=obstacle.prediction.trajectory.state_list[
-                            ego_state.time_step
-                        ].position,
-                        obj_heading=obstacle.prediction.trajectory.state_list[
-                            ego_state.time_step
-                        ].orientation,
-                        obj_vel=obstacle.prediction.trajectory.state_list[
-                            ego_state.time_step
-                        ].velocity,
-                        obj_length=obstacle.obstacle_shape.length,
-                        obj_width=obstacle.obstacle_shape.width,
-                        dt=self.reach_set_params["parameters"]["dt"],
-                        t_max=self.reach_set_params["parameters"]["t_max"],
-                        a_max=self.reach_set_params["parameters"]["a_max"],
-                    )
+                    # adjusted call of calc_reach_set()
+                    # rs = reach_set_obj.calc_reach_set(
+                    #     obj_pos=obstacle.prediction.trajectory.state_list[
+                    #         ego_state.time_step
+                    #     ].position,
+                    #     obj_heading=obstacle.prediction.trajectory.state_list[
+                    #         ego_state.time_step
+                    #     ].orientation,
+                    #     obj_vel=obstacle.prediction.trajectory.state_list[
+                    #         ego_state.time_step
+                    #     ].velocity,
+                    #     obj_length=obstacle.obstacle_shape.length,
+                    #     obj_width=obstacle.obstacle_shape.width,
+                    #     dt=self.reach_set_params["parameters"]["dt"],
+                    #     t_max=self.reach_set_params["parameters"]["t_max"],
+                    #     a_max=self.reach_set_params["parameters"]["a_max"],
+                    # )
+                    rs = reach_set_obj.calc_reach_set(srs, srs_t)
+
                     if "safe_distance" in self.reach_set_params["rules"]:
+
+                        # adjusted call of self._reach_set_difference()
                         # subtract safe distance polygon
+                        # reach_set_diffs = self._reach_set_difference(
+                        #     rs, self.ego_reach_set[ego_state.time_step]
+                        # )
                         reach_set_diffs = self._reach_set_difference(
-                            rs, self.ego_reach_set[ego_state.time_step]
-                        )
+                            rs, b_dict)
+
                         self.reach_sets[ego_state.time_step][o_id] += reach_set_diffs
                     else:
                         self.reach_sets[ego_state.time_step][o_id].append(rs)
@@ -262,68 +306,107 @@ def _get_non_parallel_lanelets(self, lanelets):
             final = [l for l in final if l not in parallel or l == lnlet]
         return set(final)
 
-    def _reach_set_difference(self, a, b):
+    def _reach_set_difference(self, a, b_dict):
         """
         Calculate the difference between two reachable sets.
 
         Subtracts b from a.
         """
-        rs_list = []
-        for step in a:
-            a_poly = Polygon(a[step])
-            b_poly = Polygon()
-            for b_set in b:
-                poly = Polygon(b_set[step])
-                b_poly = b_poly.union(poly)
-
-            diff = a_poly.difference(b_poly)
-            rs_list += self._geom_to_reach_set(diff, step)
-        return rs_list
+        rs_list_pygeos = []
+
+        a_poly_pygeos = [a[step] for step in a]
+        len_max = max(len(a[step]) for step in a)
+        a_poly_pygeos = polygon_padding(len_max, a_poly_pygeos)
+        a_poly_pygeos = pygeos.polygons(a_poly_pygeos)
+
+        # a_poly_pygeos_test = [pygeos.polygons(a[step]) for step in a]
 
-    def _add_safe_distance(self, rs, rs_obj, safe_distance_frac, velocity):
+        diff_pygeos = pygeos.difference(a_poly_pygeos, [b_dict[step] for step in a])
+        key_list = list(a.keys())
+        for i in range(len(key_list)):
+            rs_list_pygeos += self._geom_to_reach_set(diff_pygeos[i], key_list[i])
+        return rs_list_pygeos
+
+    def _add_safe_distance(self, rs, rs_obj, safe_distance):
         """
         Extend a reachable set in longitudinal direction.
 
         Extend a reachable set in longitudinal direction.
         Applys the two-second heuristic for safe distances.
         """
-        # 2-second safe distance heuristic
-        # urban scenarios (< 30 km/h)
-        if velocity <= 8:
-            safe_distance_factor = 0.75
-        # built-up area / residential area (< 54 km/h)
-        elif velocity <= 15:
-            safe_distance_factor = 1.0
-        # freeway (>= 54 km/h)
-        else:
-            safe_distance_factor = 2.0
-        min_safe_distance = safe_distance_factor * velocity
-        safe_distance = min_safe_distance * safe_distance_frac
-
+        # the safe_distance calculation has moved out to the caller function
+        # # 2-second safe distance heuristic
+        # # urban scenarios (< 30 km/h)
+        # if velocity <= 8:
+        #     safe_distance_factor = 0.75
+        # # built-up area / residential area (< 54 km/h)
+        # elif velocity <= 15:
+        #     safe_distance_factor = 1.0
+        # # freeway (>= 54 km/h)
+        # else:
+        #     safe_distance_factor = 2.0
+        # min_safe_distance = safe_distance_factor * velocity
+        # safe_distance = min_safe_distance * safe_distance_frac
+
+        # # Lane bounds used for reach set
+        # patch = rs_obj.intersection_patch_pygeos
+        # extended = {}
+        # for step in rs:
+        #     poly = Polygon(rs[step])
+        #     buffer = Polygon(poly.buffer(safe_distance).exterior)
+        #     if patch is not None:
+        #         # trim extended with patch
+        #         intersection = patch.intersection(buffer)
+        #         if intersection.geom_type == "Polygon" and not intersection.is_empty:
+        #             # convert intersection poly to points
+        #             outline = list(zip(*intersection.exterior.coords.xy))
+        #             extended[step] = np.array(outline)
+        #         else:
+        #             # intersection is no polygon
+        #             # extended[step] = rs[step]
+        #             raise ValueError(
+        #                 "Unhandled geometry type: " + repr(intersection.geom_type)
+        #             )
+        #     elif buffer.geom_type == "Polygon" and not buffer.is_empty:
+        #         outline = list(zip(*buffer.exterior.coords.xy))
+        #         extended[step] = np.array(outline)
+        #     else:
+        #         extended[step] = rs[step]
+
+# replace shapely operations by pygeos
         # Lane bounds used for reach set
-        patch = rs_obj.intersection_patch
rs_obj.intersection_patch_pygeos extended = {} - for step in rs: - poly = Polygon(rs[step]) - buffer = Polygon(poly.buffer(safe_distance).exterior) - if patch is not None: + poly_pygeos = [rs[step] for step in rs] + len_max = max(len(rs[step]) for step in rs) + poly_pygeos = polygon_padding(len_max, poly_pygeos) + poly_pygeos = pygeos.polygons(poly_pygeos) + # poly_pygeos = [pygeos.polygons(rs[step]) for step in rs] + buffer_pygeos = pygeos.polygons(pygeos.get_exterior_ring(pygeos.buffer(poly_pygeos, safe_distance, quadsegs=16))) + key_list = list(rs.keys()) + if patch is not None: + intersection_pygeos = pygeos.intersection(patch, buffer_pygeos) + for i in range(len(key_list)): # trim extended with patch - intersection = patch.intersection(buffer) - if intersection.geom_type == "Polygon" and not intersection.is_empty: + # if intersection.geom_type == "Polygon" and not intersection.is_empty: + if pygeos.get_type_id(intersection_pygeos[i]) == 3 and not pygeos.is_empty(intersection_pygeos[i]): # convert intersection poly to points - outline = list(zip(*intersection.exterior.coords.xy)) - extended[step] = np.array(outline) + outline = pygeos.get_coordinates(pygeos.get_exterior_ring(intersection_pygeos[i])) + extended[key_list[i]] = outline else: - # intersection is no polygon - # extended[step] = rs[step] + # raise ValueError( + # "Unhandled geometry type: " + repr(intersection.geom_type) + # ) raise ValueError( - "Unhandled geometry type: " + repr(intersection.geom_type) + "Unhandled geometry type: " + str(pygeos.get_type_id(intersection_pygeos[i])) ) - elif buffer.geom_type == "Polygon" and not buffer.is_empty: - outline = list(zip(*buffer.exterior.coords.xy)) - extended[step] = np.array(outline) - else: - extended[step] = rs[step] + else: + for i in range(len(key_list)): + if pygeos.get_type_id(buffer_pygeos[i]) == 3 and not pygeos.is_empty(buffer_pygeos[i]): + outline = pygeos.get_coordinates(pygeos.get_exterior_ring(buffer_pygeos[i])) + extended[key_list[i]] = outline + else: + extended[key_list[i]] = rs[key_list[i]] return extended def _geom_to_reach_set(self, geometry, step): @@ -334,18 +417,21 @@ def _geom_to_reach_set(self, geometry, step): """ rs_list = [] # if polygons don't intersect, the result is a MultiPolygon - if geometry.geom_type == "Polygon": - if geometry.is_empty: + # if geometry.geom_type == "Polygon": + if pygeos.get_type_id(geometry) == 3: + if pygeos.is_empty(geometry): return rs_list # convert difference to points rs = {} rs[step] = self._get_points_of_polygon(geometry) rs_list.append(rs) - elif geometry.geom_type == "MultiPolygon": - for poly in geometry: - rs_list += self._geom_to_reach_set(poly, step) + # elif geometry.geom_type == "MultiPolygon": + elif pygeos.get_type_id(geometry) == 6: + for i in range(pygeos.get_num_geometries(geometry)): + rs_list += self._geom_to_reach_set(pygeos.get_geometry(geometry, i), step) else: - raise ValueError("Unhandled geometry type: " + repr(geometry.geom_type)) + # raise ValueError("Unhandled geometry type: " + repr(geometry.geom_type)) + raise ValueError("Unhandled geometry type: " + str(pygeos.get_type_id(geometry))) return rs_list def _get_points_of_polygon(self, polygon): @@ -355,10 +441,16 @@ def _get_points_of_polygon(self, polygon): Convert a shapely polygon to a numpy array with columns [x,y]. 
""" + # replace polygon coordinates extraction for shapely by pygeos + # interior_line = [] + # for interior in polygon.interiors: + # interior_line = list(zip(*interior.coords.xy)) + # outline = list(zip(*polygon.exterior.coords.xy)) + # return np.array(outline + interior_line) interior_line = [] - for interior in polygon.interiors: - interior_line = list(zip(*interior.coords.xy)) - outline = list(zip(*polygon.exterior.coords.xy)) + for i in range(pygeos.get_num_interior_rings(polygon)): + interior_line = pygeos.get_coordinates(pygeos.get_interior_ring(polygon, i)).tolist() + outline = pygeos.get_coordinates(pygeos.get_exterior_ring(polygon)).tolist() return np.array(outline + interior_line) def _ego_reach_set(self, ego_state): @@ -385,25 +477,54 @@ def _ego_reach_set(self, ego_state): self.reach_set_objs_single[l_id].append(obj) self.ego_reach_set[ego_state.time_step] = [] + + # call simple_reachable_set() before for loop, avoid unnecessary repeating + # calculate reachable set + srs = simple_reachable_set( + obj_pos=ego_state.position, + obj_heading=ego_state.orientation, + obj_vel=ego_state.velocity, + obj_length=self.ego_length, + obj_width=self.ego_width, + dt=self.reach_set_params["parameters"]["dt"], + t_max=self.reach_set_params["parameters"]["t_max"], + a_max=0.01, + ) + srs_t = pygeos.polygons([srs[t_key] for t_key in srs.keys()]) + + # calculate safe_distance before for loop + # 2-second safe distance heuristic + # urban scenarios (< 30 km/h) + if ego_state.velocity <= 8: + safe_distance_factor = 0.75 + # built-up area / residential area (< 54 km/h) + elif ego_state.velocity <= 15: + safe_distance_factor = 1.0 + # freeway (>= 54 km/h) + else: + safe_distance_factor = 2.0 + min_safe_distance = safe_distance_factor * ego_state.velocity + safe_distance = min_safe_distance * self.reach_set_params["rules"]["safe_distance"]["safe_distance_frac"] + for reach_set_obj in self.reach_set_objs_single[l_id]: # reach set for ego assumes constant acceleration # essentially, this is the center of the vehicle, # given its current velocity - rs = reach_set_obj.calc_reach_set( - obj_pos=ego_state.position, - obj_heading=ego_state.orientation, - obj_vel=ego_state.velocity, - obj_length=self.ego_length, - obj_width=self.ego_width, - dt=self.reach_set_params["parameters"]["dt"], - t_max=self.reach_set_params["parameters"]["t_max"], - a_max=0.01, - ) + # rs = reach_set_obj.calc_reach_set( + # obj_pos=ego_state.position, + # obj_heading=ego_state.orientation, + # obj_vel=ego_state.velocity, + # obj_length=self.ego_length, + # obj_width=self.ego_width, + # dt=self.reach_set_params["parameters"]["dt"], + # t_max=self.reach_set_params["parameters"]["t_max"], + # a_max=0.01, + # ) + rs = reach_set_obj.calc_reach_set(srs, srs_t) extended_rs = self._add_safe_distance( rs, reach_set_obj, - self.reach_set_params["rules"]["safe_distance"]["safe_distance_frac"], - ego_state.velocity, + safe_distance ) self.ego_reach_set[ego_state.time_step].append(extended_rs) diff --git a/planner/utils/reachable_set_simple.py b/planner/utils/reachable_set_simple.py index d81cc37..fe51626 100644 --- a/planner/utils/reachable_set_simple.py +++ b/planner/utils/reachable_set_simple.py @@ -2,8 +2,8 @@ import numpy as np import math -import shapely.geometry from EthicalTrajectoryPlanning.planner.utils import shapely_conversions +import pygeos class ReachSetSimple(object): @@ -42,31 +42,23 @@ def __init__( # intersection with reachable set) if bound_l is not None and bound_r is not None: if closed: - polygon_l = 
diff --git a/planner/utils/reachable_set_simple.py b/planner/utils/reachable_set_simple.py
index d81cc37..fe51626 100644
--- a/planner/utils/reachable_set_simple.py
+++ b/planner/utils/reachable_set_simple.py
@@ -2,8 +2,8 @@
 import numpy as np
 import math
-import shapely.geometry
 from EthicalTrajectoryPlanning.planner.utils import shapely_conversions
+import pygeos


 class ReachSetSimple(object):
@@ -42,31 +42,23 @@
         # intersection with reachable set)
         if bound_l is not None and bound_r is not None:
             if closed:
-                polygon_l = shapely.geometry.Polygon(bound_l)
-                polygon_r = shapely.geometry.Polygon(bound_r)
-
-                if polygon_l.area > polygon_r.area:
-                    self.__intersection_patch = polygon_l.difference(polygon_r)
+                polygon_l_r = pygeos.polygons([bound_l, bound_r])
+                areas = pygeos.area(polygon_l_r)
+                if areas[0] > areas[1]:
+                    self.__intersection_patch_pygeos = pygeos.difference(polygon_l_r[0], polygon_l_r[1])
                 else:
-                    self.__intersection_patch = polygon_r.difference(polygon_l)
+                    self.__intersection_patch_pygeos = pygeos.difference(polygon_l_r[1], polygon_l_r[0])
             else:
-                self.__intersection_patch = shapely.geometry.Polygon(
+                self.__intersection_patch_pygeos = pygeos.polygons(
                     np.row_stack((bound_l, np.flipud(bound_r)))
                 )
         else:
-            self.__intersection_patch = None
+            self.__intersection_patch_pygeos = None

     def calc_reach_set(
         self,
-        obj_pos: np.ndarray,
-        obj_heading: float,
-        obj_vel: float,
-        obj_length: float,
-        obj_width: float,
-        dt: float,
-        t_max: float,
-        a_max: float,
+        srs,
+        srs_t
     ) -> dict:
         """
         Calculate a simple reachable set approximation, trimmed to bounds.
@@ -91,46 +83,32 @@
                 a np.ndarray with columns [x, y]

         """
-        # calculate reachable set
-        srs = simple_reachable_set(
-            obj_pos=obj_pos,
-            obj_heading=obj_heading,
-            obj_vel=obj_vel,
-            obj_length=obj_length,
-            obj_width=obj_width,
-            dt=dt,
-            t_max=t_max,
-            a_max=a_max,
-        )
-
+        # simple_reachable_set() moved out to the caller function calc_reach_sets()
         # if provided, trim reachable set to track
-        if self.__intersection_patch is not None:
-            for t_key in srs.keys():
-                srs_t = shapely.geometry.Polygon(srs[t_key])
-
-                srs_t = srs_t.intersection(self.__intersection_patch)
+        if self.__intersection_patch_pygeos is not None:
+            # replace shapely by pygeos
+            srs_t_intersection = pygeos.intersection(srs_t, self.__intersection_patch_pygeos)
+            key_list = list(srs.keys())
+            type_ids = pygeos.get_type_id(srs_t_intersection)
+            for i in range(len(key_list)):
-                # if coordinates present (not wiped out completely),
-                # extract outline coordinates
-                red_tmp = shapely_conversions.extract_polygon_outline(
-                    shapely_geometry=srs_t
-                )
+                red_tmp_pygeos = shapely_conversions.extract_polygon_outline_pygeos(srs_t_intersection[i], type_ids[i])

                 # add outline coordinates to reach set
-                if red_tmp is not None:
-                    srs[t_key] = red_tmp
+                if red_tmp_pygeos is not None:
+                    srs[key_list[i]] = red_tmp_pygeos

         return srs

     @property
-    def intersection_patch(self):
+    def intersection_patch_pygeos(self):
         """
         Get intersection patch.

         Returns: The intersection patch.

         """
-        return self.__intersection_patch
+        return self.__intersection_patch_pygeos


 def simple_reachable_set(
@@ -306,12 +284,150 @@
     :returns:
         * **racc** - sequence of radii
     """
-    racc = []
-    for t in np.arange(0.0, t_max + dt / 2, dt):
-        racc.append(0.5 * amax * t ** 2)
+    # racc = []
+    # for t in np.arange(0.0, t_max + dt / 2, dt):
+    #     racc.append(0.5 * amax * t ** 2)
+    racc = 0.5 * amax * np.arange(0.0, t_max + dt / 2, dt) ** 2
     return racc


+# NOTE: the previous pure-Python calc_vertices() implementation was commented
+# out at this point; it is superseded by the vectorized rotate/translate
+# logic in the live function below.
+
+
 def calc_vertices(
     pos: np.ndarray,
     cvehicle: np.ndarray,
@@ -390,6 +506,11 @@

     prev_front = -99.0
     prev_r_t = 0.0
+
+    # calculate sin and cos once before the loop
+    s = np.sin(globalangle)
+    c = np.cos(globalangle)
+
     for j in range(cvehicle.shape[0]):
         # retrieve relevant parameters
         if j > 0:
@@ -428,12 +549,17 @@

         # add vehicle dimensions
         poly = poly + veh_dim
-
+        # use numpy broadcasting and array operations
         # rotate and translate
-        for i in range(np.shape(poly)[0]):
-            poly[i, :] = rotate_vector(poly[i, :], globalangle) + np.transpose(pos)
-
+        poly_new = np.zeros_like(poly)
+        poly_new[:, 0] = c * poly[:, 0] - s * poly[:, 1]
+        poly_new[:, 1] = s * poly[:, 0] + c * poly[:, 1]
+        # xnew = c * poly[:, 0] - s * poly[:, 1]
+        # ynew = s * poly[:, 0] + c * poly[:, 1]
+
+        res_array = poly_new + np.transpose(pos)
+        # res_array = np.stack((xnew, ynew), axis=-1) + np.transpose(pos)
         # add to polypred
-        polypred[round(dt * j, 2)] = poly
-
+        polypred[round(dt * j, 2)] = res_array
     return polypred
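# Editor's sketch (not part of the patch): the broadcast rotate-and-translate
# used in calc_vertices() above, shown standalone; the polygon, angle and
# position values are illustrative.
import numpy as np

poly = np.array([[1.0, 0.0], [2.0, 0.0], [2.0, 1.0], [1.0, 1.0]])
pos = np.array([5.0, 3.0])
angle = np.pi / 6

s, c = np.sin(angle), np.cos(angle)   # hoisted out of the per-step loop
rotated = np.empty_like(poly)
rotated[:, 0] = c * poly[:, 0] - s * poly[:, 1]
rotated[:, 1] = s * poly[:, 0] + c * poly[:, 1]
translated = rotated + pos            # broadcasts over all vertices at once
print(translated)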
diff --git a/planner/utils/responsibility.py b/planner/utils/responsibility.py
index 7885d34..aad84c8 100644
--- a/planner/utils/responsibility.py
+++ b/planner/utils/responsibility.py
@@ -1,6 +1,6 @@
 """Responsibility handling for ethical risk evaluation."""
 import numpy as np
-from shapely.geometry import Point, Polygon
+import pygeos


 def calc_responsibility_reach_set(traj, ego_state, reach_set):
@@ -14,27 +14,44 @@
     Returns:
         _type_: _description_
     """
+    # cache the containment results so the caller can reuse them
+    bool_contain_cache = []
     responsibility_cost = 0.0
-    for obj_id, rs in reach_set.reach_sets[ego_state.time_step].items():
-        # time_steps = [float(list(entry.keys())[0]) for entry in rs]
-        responsibility = True
-        for part_set in rs:
-            time_t = list(part_set.keys())[0]
-            if time_t <= 0:
-                continue
-            time_step = int(time_t / (traj.t[1] - traj.t[0]) - 1)
-
-            ego_pos = Point(traj.x[time_step], traj.y[time_step])
-            obj_rs = Polygon(list(part_set.values())[0])
-
-            if obj_rs.contains(ego_pos):
-                responsibility = False
-                break
-
-        if responsibility:
+    # trajectory time step size
+    dt = traj.t[1] - traj.t[0]
+    # (obstacle id, reach set) items for the current time step
+    dict_items_list = list(reach_set.reach_sets[ego_state.time_step].items())
+
+    for i in range(len(dict_items_list)):
+        obj_id = dict_items_list[i][0]
+        rs = dict_items_list[i][1]
+
+        time_t_list = np.array([list(part_set.keys())[0] for part_set in rs])
+        time_step_list = np.array(time_t_list / dt - 1, dtype=int)
+
+        # ego poses at the evaluated time steps
+        point_array = np.stack((traj.x[time_step_list], traj.y[time_step_list]), axis=-1)
+        ego_pos_array = pygeos.points(point_array)
+        # ego_pos_array = pygeos.points(traj.x[time_step_list], traj.y[time_step_list])
+
+        # prepare polygon array; every polygon array must have the same length
+        len_max = max(len(list(part_set.values())[0]) for part_set in rs)
+        poly_array = [list(part_set.values())[0] for part_set in rs]
+        poly_array_pad = polygon_padding(len_max, poly_array)
+
+        # create polygon datatype
+        obj_rs_array = pygeos.polygons(poly_array_pad)
+
+        # whether the ego point is contained in the polygon (vectorized)
+        bool_contain = np.array(pygeos.contains(obj_rs_array, ego_pos_array), dtype=int)
+        # save cache
+        bool_contain_cache.append(bool_contain)
+        # ignore entries with time_t <= 0
+        mask_array = np.array(time_t_list > 0, dtype=int)
+        if 1 not in bool_contain * mask_array:
             responsibility_cost -= traj.obst_risk_dict[obj_id]

-    return responsibility_cost
+    return responsibility_cost, bool_contain_cache


 def assign_responsibility_by_action_space(scenario, ego_state, predictions):
@@ -70,3 +87,27 @@

     else:
         return False
+
+
+def polygon_padding(max_poly_len, poly_array):
+    """Pad polygon coordinate arrays to a common length.
+
+    Args:
+        max_poly_len (int): Target number of vertices per polygon.
+        poly_array (list): List of [x, y] coordinate arrays of varying length.
+
+    Returns:
+        np.ndarray: Array of shape (n_polygons, max_poly_len, 2); shorter
+        polygons are padded by repeating their last vertex.
+    """
+    res = np.zeros((len(poly_array), max_poly_len, 2))
+
+    for i in range(len(poly_array)):
+        if len(poly_array[i]) < max_poly_len:
+            res[i][:len(poly_array[i])] = poly_array[i]
+            # for j in range(len(poly_array[i]), max_poly_len):
+            #     res[i][j] = poly_array[i][0]
+            res[i][len(poly_array[i]):] = poly_array[i][-1]
+        else:
+            res[i] = poly_array[i]
+
+    return res
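# Editor's sketch (not part of the patch): padding rings by repeating the last
# vertex leaves each polygon geometrically unchanged, so one fixed-size array
# can feed a single vectorized pygeos call; the coordinates are illustrative.
import numpy as np
import pygeos

rings = [
    np.array([[0.0, 0.0], [4.0, 0.0], [4.0, 4.0], [0.0, 4.0]]),
    np.array([[10.0, 10.0], [12.0, 10.0], [11.0, 13.0]]),
]
len_max = max(len(r) for r in rings)
padded = np.zeros((len(rings), len_max, 2))
for i, r in enumerate(rings):
    padded[i, :len(r)] = r
    padded[i, len(r):] = r[-1]           # repeated last vertex as padding

polys = pygeos.polygons(padded)          # build all polygons in one call
pts = pygeos.points([[1.0, 1.0], [11.0, 11.0]])
print(pygeos.contains(polys, pts))       # vectorized point-in-polygon test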
diff --git a/planner/utils/shapely_conversions.py b/planner/utils/shapely_conversions.py
index 961d188..d56db4c 100644
--- a/planner/utils/shapely_conversions.py
+++ b/planner/utils/shapely_conversions.py
@@ -2,6 +2,7 @@

 import numpy as np
 import shapely.geometry
+import pygeos


 def extract_polygon_outline(shapely_geometry: shapely.geometry) -> np.ndarray:
@@ -73,3 +74,89 @@
             polygon_outline[1]))

     return polygon_outline
+
+
+def extract_polygon_outline_pygeos(pygeos_geometry, type_id) -> np.ndarray:
+    """
+    Extract the key outline coordinates of a pygeos geometry.
+
+    Extract the key outline coordinates of a pygeos geometry
+    (including multi-shapes like MultiPolygon). The following
+    types are supported:
+    * Polygon: The outline of the polygon is returned
+    * MultiPolygon: The outline of the largest polygon is returned
+    * GeometryCollection: The largest polygon in the set is returned,
+      if no polygon is present 'None' is returned
+    * LineString: 'None' is returned, since the shape is
+      a line and does not host volume information
+
+    For any other type, an error is raised.
+
+    :param pygeos_geometry: pygeos geometry of interest
+    :param type_id: pygeos type id of the geometry (pygeos.get_type_id)
+    :returns:
+        * **polygon_outline** - outline coordinates in form of
+          a numpy array with columns x, y
+
+    :Authors:
+        * Tim Stahl
+
+    :Created on:
+        09.10.2020
+
+    """
+    polygon_outline = None
+
+    # if shapely_geometry.geom_type == 'Polygon':
+    if type_id == 3:
+        if not pygeos.is_empty(pygeos_geometry):
+            # polygon_outline = pygeos_geometry.exterior.coords.xy
+            polygon_outline = pygeos.get_coordinates(pygeos.get_exterior_ring(pygeos_geometry))
+
+    # elif shapely_geometry.geom_type == 'MultiPolygon':
+    elif type_id == 6:
+        # extract largest polygon
+        # polygon_outline = max(pygeos_geometry, key=lambda a: pygeos.area(a)).exterior.coords.xy
+        polygon_outline = pygeos.get_coordinates(pygeos.get_exterior_ring(
+            max(pygeos.get_geometry(pygeos_geometry, range(pygeos.get_num_geometries(pygeos_geometry))),
+                key=lambda a: pygeos.area(a))))

+    # elif shapely_geometry.geom_type == 'GeometryCollection':
+    elif type_id == 7:
+        # extract polygons
+        polygons = []
+        for i in range(pygeos.get_num_geometries(pygeos_geometry)):
+            geometry = pygeos.get_geometry(pygeos_geometry, i)
+            if (
+                # geometry.geom_type == 'Polygon' and
+                # not pygeos_geometry.is_empty
+                pygeos.get_type_id(geometry) == 3 and not pygeos.is_empty(geometry)
+            ):
+                polygons.append(geometry)
+
+        # extract largest polygon
+        if polygons:
+            # polygon_outline = max(polygons, key=lambda a: pygeos.area(a)).exterior.coords.xy
+            polygon_outline = pygeos.get_coordinates(pygeos.get_exterior_ring(
+                max(polygons, key=lambda a: pygeos.area(a))))
+
+    # elif shapely_geometry.geom_type == 'LineString':
+    elif type_id == 1:
+        # if just a line is left, skip
+        pass
+
+    # else:
+    #     raise ValueError("Faced unsupported shape '" +
+    #                      str(shapely_geometry.geom_type) + "'!")
+
else: + raise ValueError("Faced unsupported shape '" + + str(type_id) + "'!") + + # convert to numpy array + # if polygon_outline is not None: + # polygon_outline = np.column_stack(( + # polygon_outline[0], + # polygon_outline[1])) + + return polygon_outline diff --git a/requirements.txt b/requirements.txt index e992063..defc546 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,9 @@ commonroad-io==2020.3 -commonroad-drivability-checker==2021.4.1 +commonroad-drivability-checker commonroad-helper-functions==1.0.0 commonroad-agent==1.1.0 wale-net==2.0.2 -numpy~=1.21.6 +numpy~=1.19.5 matplotlib~=3.2.2 progressbar2 argparse~=1.4.0 @@ -16,3 +16,6 @@ numba~=0.50.1 tqdm~=4.47.0 setuptools~=49.2.0 networkx==2.5 +pandas~=1.4.3 +statsmodels~=0.13.2 +pygeos~=0.13 \ No newline at end of file diff --git a/risk_assessment/collision_probability.py b/risk_assessment/collision_probability.py index 74d9759..926d6f5 100644 --- a/risk_assessment/collision_probability.py +++ b/risk_assessment/collision_probability.py @@ -5,7 +5,8 @@ import os import sys import numpy as np -from scipy.stats import multivariate_normal +from scipy.stats import multivariate_normal, mvn +from scipy.spatial.distance import mahalanobis import commonroad_dc.pycrcc as pycrcc from EthicalTrajectoryPlanning.risk_assessment.helpers.coll_prob_helpers import ( distance, @@ -132,6 +133,165 @@ def get_collision_probability(traj, predictions: dict, vehicle_params, safety_ma return collision_prob_dict +def get_collision_probability_fast(traj, predictions: dict, vehicle_params, safety_margin=1.0): + """ + Calculate the collision probabilities of a trajectory and predictions. + + Args: + traj (FrenetTrajectory): Considered trajectory. + predictions (dict): Predictions of the visible obstacles. + vehicle_params (VehicleParameters): Parameters of the considered + vehicle. + + Returns: + dict: Collision probability of the trajectory per time step with the + prediction for every visible obstacle. 
+ """ + obstacle_ids = list(predictions.keys()) + collision_prob_dict = {} + + # get the current positions array of the ego vehicles + ego_pos = np.stack((traj.x, traj.y), axis=-1) + + # offset between vehicle center point and corner point + offset = np.array([vehicle_params.l / 6, vehicle_params.w / 2]) + + for obstacle_id in obstacle_ids: + mean_list = predictions[obstacle_id]['pos_list'] + cov_list = predictions[obstacle_id]['cov_list'] + yaw_list = predictions[obstacle_id]['orientation_list'] + length = predictions[obstacle_id]['shape']['length'] + probs = [] + + # mean distance calculation + # determine the length of arrays + min_len = min(len(traj.x), len(mean_list)) + + # adjust array of the ego vehicles + # ego_pos_array = np.stack((traj.x[1:min_len], traj.y[1:min_len]), axis=-1) + ego_pos_array = ego_pos[1:min_len] + + # get the positions array of the front and the back of the obsatcle vehicle + mean_deviation_array = np.stack((np.cos(yaw_list[1:min_len]), np.sin(yaw_list[1:min_len])), axis=-1) * length / 2 + mean_array = np.array(mean_list[:min_len - 1]) + mean_front_array = mean_array + mean_deviation_array + mean_back_array = mean_array - mean_deviation_array + + # total_mean_array = np.stack((mean_array, mean_front_array, mean_back_array)) + total_mean_array = np.array([mean_array, mean_front_array, mean_back_array]) + + # distance from ego vehicle + distance_array = total_mean_array - ego_pos_array + distance_array = np.sqrt(distance_array[:, :, 0] ** 2 + distance_array[:, :, 1] ** 2) + # distance_array_test = np.sqrt(np.power(distance_array_test[:,:,0],2) + np.power(distance_array_test[:,:,1],2)) + # distance_array = np.linalg.norm(total_mean_array-ego_pos_array, axis=-1) + + # min distance of each column + min_distance_array = distance_array.min(axis=0) + # bool: whether min distance is larger than 5.0 + min_distance_array = min_distance_array > 5.0 + + for i in range(1, len(traj.x)): + # only calculate probability as the predicted obstacle is visible + if i < len(mean_list): + # if the distance between the vehicles is bigger than 5 meters, + # the collision probability is zero + # avoids huge computation times + + # directly use previous bool result for the if statements + if (min_distance_array[i - 1]): + prob = 0.0 + else: + cov = cov_list[i - 1] + # if the covariance is a zero matrix, the prediction is + # derived from the ground truth + # a zero matrix is not singular and therefore no valid + # covariance matrix + allcovs = [cov[0][0], cov[0][1], cov[1][0], cov[1][1]] + if all(covi == 0 for covi in allcovs): + cov = [[0.1, 0.0], [0.0, 0.1]] + + prob = 0.0 + # means = [mean, mean_front, mean_back] + # means = total_mean_array[:,i-1] + + # the occupancy of the ego vehicle is approximated by three + # axis aligned rectangles + # get the center points of these three rectangles + center_points = get_center_points_for_shape_estimation( + length=vehicle_params.l, + width=vehicle_params.w, + orientation=traj.yaw[i], + pos=ego_pos_array[i - 1], + ) + + # upper_right and lower_left points + center_points = np.array(center_points) + + # in order to get the cdf, the upper right point and the + # lower left point of every rectangle is needed + # upper_right = np.stack((center_points[:,0] + vehicle_params.l / 6, center_points[:,1] + vehicle_params.w / 2), axis=-1) + # lower_left = np.stack((center_points[:,0] - vehicle_params.l / 6, center_points[:,1] - vehicle_params.w / 2), axis=-1) + # offset = np.array([vehicle_params.l / 6, vehicle_params.w / 2]) + upper_right = center_points + 
+
+
+def get_inv_mahalanobis_dist(traj, predictions: dict, vehicle_params, safety_margin=1.0):
+    """
+    Approximate collision probabilities via inverse Mahalanobis distance.
+
+    Args:
+        traj (FrenetTrajectory): Considered trajectory.
+        predictions (dict): Predictions of the visible obstacles.
+        vehicle_params (VehicleParameters): Parameters of the considered
+                                            vehicle.
+
+    Returns:
+        dict: Inverse-Mahalanobis-distance surrogate for the collision
+        probability of the trajectory per time step and visible obstacle.
+    """
+    obstacle_ids = list(predictions.keys())
+    collision_prob_dict = {}
+
+    for obstacle_id in obstacle_ids:
+        mean_list = predictions[obstacle_id]['pos_list']
+        cov_list = predictions[obstacle_id]['cov_list']
+        # note: assumes invertible covariance matrices (all-zero ground-truth
+        # covariances would need the fallback used in the function above)
+        inv_cov_list = np.linalg.inv(cov_list)
+        inv_dist = []
+        for i in range(1, len(traj.x)):
+            if i < len(mean_list):
+                u = [traj.x[i], traj.y[i]]
+                v = mean_list[i - 1]
+                iv = inv_cov_list[i - 1]
+                # 1e-4 is a regression parameter to be similar to the collision probability
+                inv_dist.append(1e-4 / mahalanobis(u, v, iv))
+            else:
+                inv_dist.append(0.0)
+        collision_prob_dict[obstacle_id] = inv_dist
+
+    return collision_prob_dict
+
+
 def get_prob_via_cdf(
     multi_norm, upper_right_point: np.array, lower_left_point: np.array
 ):
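# Editor's sketch (not part of the patch): the inverse-Mahalanobis surrogate
# above for a single time step; the 1e-4 scale is the regression constant from
# the patch, the position and covariance values are illustrative.
import numpy as np
from scipy.spatial.distance import mahalanobis

ego = np.array([2.0, 1.0])                    # ego position
mean = np.array([0.0, 0.0])                   # predicted obstacle mean
cov = np.array([[0.3, 0.05], [0.05, 0.2]])    # prediction covariance
iv = np.linalg.inv(cov)                       # inverse covariance matrix

# small distance in covariance-scaled units -> large surrogate value
surrogate = 1e-4 / mahalanobis(ego, mean, iv)
print(surrogate)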
diff --git a/risk_assessment/harm_estimation.py b/risk_assessment/harm_estimation.py
index a7f1319..4203802 100644
--- a/risk_assessment/harm_estimation.py
+++ b/risk_assessment/harm_estimation.py
@@ -1,9 +1,10 @@
 """Harm estimation function calling models based on risk json."""
+import numpy as np
 from commonroad.scenario.obstacle import ObstacleType

 from EthicalTrajectoryPlanning.risk_assessment.helpers.harm_parameters import HarmParameters
-from EthicalTrajectoryPlanning.risk_assessment.helpers.properties import get_obstacle_mass
+from EthicalTrajectoryPlanning.risk_assessment.helpers.properties import calc_crash_angle, get_obstacle_mass
 from EthicalTrajectoryPlanning.risk_assessment.utils.logistic_regression import (
     get_protected_log_reg_harm,
     get_unprotected_log_reg_harm,
@@ -12,10 +13,29 @@
     get_protected_ref_speed_harm,
     get_unprotected_ref_speed_harm,
 )
+from EthicalTrajectoryPlanning.risk_assessment.utils.reference_speed_symmetrical import (
+    get_protected_inj_prob_ref_speed_complete_sym,
+    get_protected_inj_prob_ref_speed_ignore_angle,
+    get_protected_inj_prob_ref_speed_reduced_sym,
+)
+from EthicalTrajectoryPlanning.risk_assessment.utils.reference_speed_asymmetrical import (
+    get_protected_inj_prob_ref_speed_complete,
+    get_protected_inj_prob_ref_speed_reduced,
+)
 from EthicalTrajectoryPlanning.risk_assessment.utils.gidas import (
     get_protected_gidas_harm,
     get_unprotected_gidas_harm,
 )
+from EthicalTrajectoryPlanning.risk_assessment.utils.logistic_regression_symmetrical import (
+    get_protected_inj_prob_log_reg_complete_sym,
+    get_protected_inj_prob_log_reg_ignore_angle,
+    get_protected_inj_prob_log_reg_reduced_sym,
+)
+from EthicalTrajectoryPlanning.risk_assessment.utils.logistic_regression_asymmetrical import (
+    get_protected_inj_prob_log_reg_complete,
+    get_protected_inj_prob_log_reg_reduced,
+)


 # Dictionary for existence of protective crash structure.
 obstacle_protection = {
@@ -177,3 +197,320 @@
     )

     return ego_vehicle.harm, obstacle.harm, ego_vehicle, obstacle
+
+
+def get_harm(scenario, traj, predictions, ego_id, vehicle_params, modes, coeffs, timer):
+    """Get harm.
+
+    Args:
+        scenario: Considered scenario.
+        traj: Considered trajectory.
+        predictions (dict): Predictions of the visible obstacles.
+        ego_id: ID of the ego vehicle.
+        vehicle_params: Parameters of the ego vehicle.
+        modes (dict): Risk modes from the risk json.
+        coeffs (dict): Harm model coefficients.
+        timer: Execution timer.
+
+    Returns:
+        tuple: Ego and obstacle harm per obstacle and time step.
+    """
+    # get the IDs of the predicted obstacles
+    obstacle_ids = list(predictions.keys())
+
+    # max_pred_length = 0
+
+    ego_harm_traj = {}
+    obst_harm_traj = {}
+
+    # get the ego vehicle size
+    # ego_vehicle_size = vehicle_params.w * vehicle_params.l
+
+    for obstacle_id in obstacle_ids:
+        # choose which model should be used to calculate the harm
+        ego_harm_fun, obstacle_harm_fun = get_model(modes, obstacle_id, scenario)
+        # only calculate the risk as long as both obstacles are in the scenario
+        pred_path = predictions[obstacle_id]['pos_list']
+        pred_length = min(len(traj.t) - 1, len(pred_path))
+        if pred_length == 0:
+            continue
+
+        # get max prediction length
+        # if pred_length > max_pred_length:
+        #     max_pred_length = pred_length
+
+        # get the size, the velocity and the orientation of the predicted
+        # vehicle
+        pred_size = (
+            predictions[obstacle_id]['shape']['length']
+            * predictions[obstacle_id]['shape']['width']
+        )
+        pred_v = np.array(predictions[obstacle_id]['v_list'], dtype=np.float)
+        pred_yaw = np.array(predictions[obstacle_id]['orientation_list'], dtype=np.float)
+
+        # lists to save ego and obstacle harm as well as ego and obstacle risk
+        # one list per obstacle
+        ego_harm_obst = []
+        obst_harm_obst = []
+
+        # call get_obstacle_mass() once per obstacle instead of in every harm_model() call
+        # get the predicted obstacle vehicle mass
+        obstacle_mass = get_obstacle_mass(
+            obstacle_type=scenario.obstacle_by_id(obstacle_id).obstacle_type, size=pred_size
+        )
+
+        # calc crash angle if comprehensive mode selected
+        if modes["crash_angle_simplified"] is False:
+
+            with timer.time_with_cm(
+                "simulation/sort trajectories/calculate costs/calculate risk/"
+                + "calculate harm/calculate PDOF comp"
+            ):
+
+                pdof, ego_angle, obs_angle = calc_crash_angle(
+                    traj=traj,
+                    predictions=predictions,
+                    scenario=scenario,
+                    obstacle_id=obstacle_id,
+                    modes=modes,
+                    vehicle_params=vehicle_params,
+                )
+
+            for i in range(pred_length):
+                with timer.time_with_cm(
+                    "simulation/sort trajectories/calculate costs/calculate risk/"
+                    + "calculate harm/harm_model"
+                ):
+
+                    # get the ego harm and the harm of the collision opponent
+                    ego_harm, obst_harm, ego_harm_data, obst_harm_data = harm_model(
+                        scenario=scenario,
+                        ego_vehicle_id=ego_id,
+                        vehicle_params=vehicle_params,
+                        ego_velocity=traj.v[i],
+                        ego_yaw=traj.yaw[i],
+                        obstacle_id=obstacle_id,
+                        obstacle_size=pred_size,
+                        obstacle_velocity=pred_v[i],
+                        obstacle_yaw=pred_yaw[i],
+                        pdof=pdof,
+                        ego_angle=ego_angle,
+                        obs_angle=obs_angle,
+                        modes=modes,
+                        coeffs=coeffs,
+                    )
+
+                    # store information to calculate harm and harm value in list
+                    ego_harm_obst.append(ego_harm)
+                    obst_harm_obst.append(obst_harm)
+        else:
+            # calc the risk for every time step
+            with timer.time_with_cm(
+                "simulation/sort trajectories/calculate costs/calculate risk/"
+                + "calculate harm/calculate PDOF simple"
+            ):
+                # crash angle between ego vehicle and considered obstacle [rad]
+                pdof_array = predictions[obstacle_id]["orientation_list"][:pred_length] - traj.yaw[:pred_length] + np.pi
+                rel_angle_array = np.arctan2(predictions[obstacle_id]["pos_list"][:pred_length, 1] - traj.y[:pred_length],
+                                             predictions[obstacle_id]["pos_list"][:pred_length, 0] - traj.x[:pred_length])
+                # angle of impact area for the ego vehicle
+                ego_angle_array = rel_angle_array - traj.yaw[:pred_length]
+                # angle of impact area for the obstacle
+                obs_angle_array = np.pi + rel_angle_array - predictions[obstacle_id]["orientation_list"][:pred_length]
+
+                # calculate the difference between pre-crash and post-crash speed
+                delta_v_array = np.sqrt(
+                    np.power(traj.v[:pred_length], 2)
+                    + np.power(pred_v[:pred_length], 2)
+                    + 2 * traj.v[:pred_length] * pred_v[:pred_length] * np.cos(pdof_array)
+                )
+                ego_delta_v = obstacle_mass / (vehicle_params.m + obstacle_mass) * delta_v_array
+                obstacle_delta_v = vehicle_params.m / (vehicle_params.m + obstacle_mass) * delta_v_array
+
+                # calculate harm based on the selected model
+                ego_harm_obst = ego_harm_fun(velocity=ego_delta_v, angle=ego_angle_array, coeff=coeffs)
+                obst_harm_obst = obstacle_harm_fun(velocity=obstacle_delta_v, angle=obs_angle_array, coeff=coeffs)
+
+        # store harm list for the obstacles in dictionary for current frenét
+        # trajectory
+        ego_harm_traj[obstacle_id] = ego_harm_obst
+        obst_harm_traj[obstacle_id] = obst_harm_obst
+
+    return ego_harm_traj, obst_harm_traj
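# Editor's sketch (not part of the patch): the vectorized delta-v step above
# for a fully plastic two-body collision; masses, speeds and angles are
# illustrative.
import numpy as np

m_ego, m_obs = 1500.0, 1200.0            # vehicle masses [kg]
v_ego = np.array([10.0, 10.0, 9.5])      # ego speed per time step [m/s]
v_obs = np.array([8.0, 7.5, 7.0])        # obstacle speed per time step [m/s]
pdof = np.array([3.1, 3.0, 2.9])         # crash angle per time step [rad]

# speed change of the combined system, evaluated for all time steps at once
delta_v = np.sqrt(v_ego ** 2 + v_obs ** 2 + 2 * v_ego * v_obs * np.cos(pdof))
ego_delta_v = m_obs / (m_ego + m_obs) * delta_v    # share carried by the ego
obs_delta_v = m_ego / (m_ego + m_obs) * delta_v    # share carried by the obstacle
print(ego_delta_v, obs_delta_v)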
+
+
+def get_model(modes, obstacle_id, scenario):
+    """Get the harm model according to the settings.
+
+    Args:
+        modes (dict): Risk modes from the risk json.
+        obstacle_id: ID of the considered obstacle.
+        scenario: Considered scenario.
+
+    Raises:
+        ValueError: If an invalid harm mode is selected.
+
+    Returns:
+        tuple: Harm functions for the ego vehicle and the obstacle.
+    """
+    # obstacle protection type
+    obs_protection = obstacle_protection[scenario.obstacle_by_id(obstacle_id).obstacle_type]
+
+    if modes["harm_mode"] == "log_reg":
+        # select case based on protection structure
+        if obs_protection is True:
+            # calculate harm based on angle mode
+            if modes["ignore_angle"] is False:
+                if modes["sym_angle"] is False:
+                    if modes["reduced_angle_areas"] is False:
+                        # use log reg complete
+                        # calculate harm for the ego vehicle
+                        ego_harm = get_protected_inj_prob_log_reg_complete
+
+                        # calculate harm for the obstacle vehicle
+                        obstacle_harm = get_protected_inj_prob_log_reg_complete
+
+                    else:
+                        # use log reg reduced
+                        # calculate harm for the ego vehicle
+                        ego_harm = get_protected_inj_prob_log_reg_reduced
+
+                        # calculate harm for the obstacle vehicle
+                        obstacle_harm = get_protected_inj_prob_log_reg_reduced
+                else:
+                    if modes["reduced_angle_areas"] is False:
+                        # use log reg sym complete
+                        # calculate harm for the ego vehicle
+                        ego_harm = get_protected_inj_prob_log_reg_complete_sym
+
+                        # calculate harm for the obstacle vehicle
+                        obstacle_harm = get_protected_inj_prob_log_reg_complete_sym
+                    else:
+                        # use log reg sym reduced
+                        # calculate harm for the ego vehicle
+                        ego_harm = get_protected_inj_prob_log_reg_reduced_sym
+
+                        # calculate harm for the obstacle vehicle
+                        obstacle_harm = get_protected_inj_prob_log_reg_reduced_sym
+            else:
+                # use log reg, ignore angle
+                # calculate harm for the ego vehicle
+                ego_harm = get_protected_inj_prob_log_reg_ignore_angle
+
+                # calculate harm for the obstacle vehicle
+                obstacle_harm = get_protected_inj_prob_log_reg_ignore_angle
+
+        elif obs_protection is False:
+            # calc ego harm
+            ego_harm = get_protected_inj_prob_log_reg_ignore_angle
+
+            # calculate obstacle harm
+            # logistic regression model
+            obstacle_harm = lambda velocity, angle, coeff: 1 / (  # noqa E731
+                1
+                + np.exp(
+                    coeff["pedestrian"]["const"]
+                    - coeff["pedestrian"]["speed"] * velocity
+                )
+            )
+        else:
+            ego_harm = lambda velocity, angle, coeff: 1  # noqa E731
+            obstacle_harm = lambda velocity, angle, coeff: 1  # noqa E731
+
+    elif modes["harm_mode"] == "ref_speed":
+        # select case based on protection structure
+        if obs_protection is True:
+            # calculate harm based on angle mode
+            if modes["ignore_angle"] is False:
+                if modes["sym_angle"] is False:
+                    if modes["reduced_angle_areas"] is False:
+                        # use ref speed complete
+                        # calculate harm for the ego vehicle
+                        ego_harm = get_protected_inj_prob_ref_speed_complete
+
+                        # calculate harm for the obstacle vehicle
+                        obstacle_harm = get_protected_inj_prob_ref_speed_complete
+
+                    else:
+                        # use ref speed reduced
+                        # calculate harm for the ego vehicle
+                        ego_harm = get_protected_inj_prob_ref_speed_reduced
+
+                        # calculate harm for the obstacle vehicle
+                        obstacle_harm = get_protected_inj_prob_ref_speed_reduced
+                else:
+                    if modes["reduced_angle_areas"] is False:
+                        # use ref speed sym complete
+                        # calculate harm for the ego vehicle
+                        ego_harm = get_protected_inj_prob_ref_speed_complete_sym
+
+                        # calculate harm for the obstacle vehicle
+                        obstacle_harm = get_protected_inj_prob_ref_speed_complete_sym
+                    else:
+                        # use ref speed sym reduced
+                        # calculate harm for the ego vehicle
+                        ego_harm = get_protected_inj_prob_ref_speed_reduced_sym
+
+                        # calculate harm for the obstacle vehicle
+                        obstacle_harm = get_protected_inj_prob_ref_speed_reduced_sym
+            else:
+                # use ref speed, ignore angle
+                # calculate harm for the ego vehicle
+                ego_harm = get_protected_inj_prob_ref_speed_ignore_angle
+
+                # calculate harm for the obstacle vehicle
+                obstacle_harm = get_protected_inj_prob_ref_speed_ignore_angle
+
+        elif obs_protection is False:
+            # calc ego harm
+            ego_harm = get_protected_inj_prob_ref_speed_ignore_angle
+
+            # calculate obstacle harm
+            # logistic regression model
+            obstacle_harm = lambda velocity, angle, coeff: 1 / (  # noqa E731
+                1
+                + np.exp(
+                    coeff["pedestrian"]["const"]
+                    - coeff["pedestrian"]["speed"] * velocity
+                )
+            )
+        else:
+            ego_harm = lambda velocity, angle, coeff: 1  # noqa E731
+            obstacle_harm = lambda velocity, angle, coeff: 1  # noqa E731
+
+    elif modes["harm_mode"] == "gidas":
+        # select case based on protection structure
+        if obs_protection is True:
+            ego_harm = lambda velocity, angle, coeff: 1 / (  # noqa E731
+                1 + np.exp(-coeff["gidas"]["const"] - coeff["gidas"]["speed"] * velocity)
+            )
+
+            obstacle_harm = lambda velocity, angle, coeff: 1 / (  # noqa E731
+                1
+                + np.exp(-coeff["gidas"]["const"] - coeff["gidas"]["speed"] * velocity)
+            )
+        elif obs_protection is False:
+            # calc ego harm
+            ego_harm = lambda velocity, angle, coeff: 1 / (  # noqa E731
+                1 + np.exp(-coeff["gidas"]["const"] - coeff["gidas"]["speed"] * velocity)
+            )
+
+            # calculate obstacle harm
+            # logistic regression model
+            obstacle_harm = lambda velocity, angle, coeff: 1 / (  # noqa E731
+                1
+                + np.exp(
+                    coeff["pedestrian_MAIS2+"]["const"]
+                    - coeff["pedestrian_MAIS2+"]["speed"] * velocity
+                )
+            )
+        else:
+            ego_harm = lambda velocity, angle, coeff: 1  # noqa E731
+            obstacle_harm = lambda velocity, angle, coeff: 1  # noqa E731
+
+    else:
+        raise ValueError(
+            "Please select a valid mode for harm estimation "
+            "(log_reg, ref_speed, gidas)"
+        )
+
+    return ego_harm, obstacle_harm
diff --git a/risk_assessment/helpers/properties.py
b/risk_assessment/helpers/properties.py index 88e1f24..af9f510 100644 --- a/risk_assessment/helpers/properties.py +++ b/risk_assessment/helpers/properties.py @@ -22,21 +22,28 @@ def get_obstacle_mass(obstacle_type, size): Returns: Mass (float): Estimated mass of considered obstacle. """ - mass_mapping = { - ObstacleType.CAR: -1333.5 + 526.9 * np.power(size, 0.8), - ObstacleType.TRUCK: 25000, - ObstacleType.BUS: 13000, - ObstacleType.BICYCLE: 90, - ObstacleType.PEDESTRIAN: 75, - ObstacleType.PRIORITY_VEHICLE: -1333.5 + 526.9 * np.power(size, 0.8), - ObstacleType.PARKED_VEHICLE: -1333.5 + 526.9 * np.power(size, 0.8), - ObstacleType.TRAIN: 118800, - ObstacleType.MOTORCYCLE: 250, - ObstacleType.TAXI: -1333.5 + 526.9 * np.power(size, 0.8), - ObstacleType.UNKNOWN: 100, - } - - return mass_mapping[obstacle_type] + if obstacle_type == ObstacleType.CAR: + return -1333.5 + 526.9 * np.power(size, 0.8) + elif obstacle_type == ObstacleType.TRUCK: + return 25000 + elif obstacle_type == ObstacleType.BUS: + return 13000 + elif obstacle_type == ObstacleType.BICYCLE: + return 90 + elif obstacle_type == ObstacleType.PEDESTRIAN: + return 75 + elif obstacle_type == ObstacleType.PRIORITY_VEHICLE: + return -1333.5 + 526.9 * np.power(size, 0.8) + elif obstacle_type == ObstacleType.PARKED_VEHICLE: + return -1333.5 + 526.9 * np.power(size, 0.8) + elif obstacle_type == ObstacleType.TRAIN: + return 118800 + elif obstacle_type == ObstacleType.MOTORCYCLE: + return 250 + elif obstacle_type == ObstacleType.TAXI: + return -1333.5 + 526.9 * np.power(size, 0.8) + else: + return 0 def calc_delta_v(vehicle_1, vehicle_2, pdof): diff --git a/risk_assessment/risk_costs.py b/risk_assessment/risk_costs.py index 87a9f79..ce355b2 100644 --- a/risk_assessment/risk_costs.py +++ b/risk_assessment/risk_costs.py @@ -3,13 +3,10 @@ from EthicalTrajectoryPlanning.planner.Frenet.utils.validity_checks import create_collision_object from commonroad_dc.collision.trajectory_queries import trajectory_queries -from EthicalTrajectoryPlanning.risk_assessment.harm_estimation import harm_model +from EthicalTrajectoryPlanning.risk_assessment.harm_estimation import get_harm from EthicalTrajectoryPlanning.risk_assessment.collision_probability import ( - get_collision_probability, -) -from EthicalTrajectoryPlanning.risk_assessment.helpers.properties import ( - calc_crash_angle, - calc_crash_angle_simple, + get_collision_probability_fast, + get_inv_mahalanobis_dist ) from EthicalTrajectoryPlanning.risk_assessment.helpers.timers import ExecTimer from EthicalTrajectoryPlanning.planner.utils.responsibility import assign_responsibility_by_action_space, calc_responsibility_reach_set @@ -60,11 +57,20 @@ def calc_risk( "simulation/sort trajectories/calculate costs/calculate risk/" + "collision probability" ): - coll_prob_dict = get_collision_probability( - traj=traj, - predictions=predictions, - vehicle_params=vehicle_params, - ) + + if modes["fast_prob_mahalanobis"]: + coll_prob_dict = get_inv_mahalanobis_dist( + traj=traj, + predictions=predictions, + vehicle_params=vehicle_params, + ) + + else: + coll_prob_dict = get_collision_probability_fast( + traj=traj, + predictions=predictions, + vehicle_params=vehicle_params, + ) ego_harm_traj, obst_harm_traj = get_harm( scenario, traj, predictions, ego_id, vehicle_params, modes, coeffs, timer @@ -232,8 +238,9 @@ def get_responsibility_cost(scenario, traj, ego_state, obst_risk_max, prediction _type_: _description_ """ + bool_contain_cache = None if "reach_set" in mode and reach_set is not None: - resp_cost = 
calc_responsibility_reach_set(traj, ego_state, reach_set) + resp_cost, bool_contain_cache = calc_responsibility_reach_set(traj, ego_state, reach_set) else: # Assign responsibility to predictions @@ -245,123 +252,4 @@ def get_responsibility_cost(scenario, traj, ego_state, obst_risk_max, prediction for key in predictions: resp_cost -= predictions[key]["responsibility"] * obst_risk_max[key] - return resp_cost - - -def get_harm(scenario, traj, predictions, ego_id, vehicle_params, modes, coeffs, timer): - """Get harm. - - Args: - scenario (_type_): _description_ - traj (_type_): _description_ - predictions (_type_): _description_ - ego_id (_type_): _description_ - vehicle_params (_type_): _description_ - modes (_type_): _description_ - coeffs (_type_): _description_ - timer (_type_): _description_ - - Returns: - _type_: _description_ - """ - # get the IDs of the predicted obstacles - obstacle_ids = list(predictions.keys()) - - max_pred_length = 0 - - ego_harm_traj = {} - obst_harm_traj = {} - for obstacle_id in obstacle_ids: - - # only calculate the risk as long as both obstacles are in the scenario - pred_path = predictions[obstacle_id]['pos_list'] - pred_length = min(len(traj.t) - 1, len(pred_path)) - if pred_length == 0: - continue - - # get max prediction length - if pred_length > max_pred_length: - max_pred_length = pred_length - - # get the size, the velocity and the orientation of the predicted - # vehicle - pred_size = ( - predictions[obstacle_id]['shape']['length'] - * predictions[obstacle_id]['shape']['width'] - ) - pred_v = predictions[obstacle_id]['v_list'] - pred_yaw = predictions[obstacle_id]['orientation_list'] - - # lists to save ego and obstacle harm as well as ego and obstacle risk - # one list per obstacle - ego_harm_obst = [] - obst_harm_obst = [] - - # calc crash angle if comprehensive mode selected - if modes["crash_angle_simplified"] is False: - - with timer.time_with_cm( - "simulation/sort trajectories/calculate costs/calculate risk/" - + "calculate harm/calculate PDOF comp" - ): - - pdof, ego_angle, obs_angle = calc_crash_angle( - traj=traj, - predictions=predictions, - scenario=scenario, - obstacle_id=obstacle_id, - modes=modes, - vehicle_params=vehicle_params, - ) - - # calc the risk for every time step - for i in range(pred_length): - - # calc crash angle simplified if selected - if modes["crash_angle_simplified"] is True: - - with timer.time_with_cm( - "simulation/sort trajectories/calculate costs/calculate risk/" - + "calculate harm/calculate PDOF simple" - ): - - pdof, ego_angle, obs_angle = calc_crash_angle_simple( - traj=traj, - predictions=predictions, - obstacle_id=obstacle_id, - time_step=i, - ) - - with timer.time_with_cm( - "simulation/sort trajectories/calculate costs/calculate risk/" - + "calculate harm/harm_model" - ): - - # get the harm ego harm and the harm of the collision opponent - ego_harm, obst_harm, ego_harm_data, obst_harm_data = harm_model( - scenario=scenario, - ego_vehicle_id=ego_id, - vehicle_params=vehicle_params, - ego_velocity=traj.v[i], - ego_yaw=traj.yaw[i], - obstacle_id=obstacle_id, - obstacle_size=pred_size, - obstacle_velocity=pred_v[i], - obstacle_yaw=pred_yaw[i], - pdof=pdof, - ego_angle=ego_angle, - obs_angle=obs_angle, - modes=modes, - coeffs=coeffs, - ) - - # store information to calculate harm and harm value in list - ego_harm_obst.append(ego_harm) - obst_harm_obst.append(obst_harm) - - # store harm list for the obstacles in dictionary for current frenét - # trajectory - ego_harm_traj[obstacle_id] = ego_harm_obst - 
obst_harm_traj[obstacle_id] = obst_harm_obst - - return ego_harm_traj, obst_harm_traj + return resp_cost, bool_contain_cache diff --git a/risk_assessment/utils/logistic_regression_asymmetrical.py b/risk_assessment/utils/logistic_regression_asymmetrical.py index 3fa2fa3..a38f73d 100644 --- a/risk_assessment/utils/logistic_regression_asymmetrical.py +++ b/risk_assessment/utils/logistic_regression_asymmetrical.py @@ -23,36 +23,37 @@ def get_protected_inj_prob_log_reg_complete(velocity, float: MAIS 3+ probability """ # get angle coefficient - if -15 / 180 * np.pi < angle < 15 / 180 * np.pi: # impact 12 - area = 0 - elif 15 / 180 * np.pi <= angle < 45 / 180 * np.pi: # impact 11 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_11"] - elif -15 / 180 * np.pi >= angle > -45 / 180 * np.pi: # impact 1 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_1"] - elif 45 / 180 * np.pi <= angle < 75 / 180 * np.pi: # impact 10 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_10"] - elif -45 / 180 * np.pi >= angle > -75 / 180 * np.pi: # impact 2 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_2"] - elif 75 / 180 * np.pi <= angle < 105 / 180 * np.pi: # impact 9 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_9"] - elif -75 / 180 * np.pi >= angle > -105 / 180 * np.pi: # impact 3 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_3"] - elif 105 / 180 * np.pi <= angle < 135 / 180 * np.pi: # impact 8 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_8"] - elif -105 / 180 * np.pi >= angle > -135 / 180 * np.pi: # impact 4 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_4"] - elif 135 / 180 * np.pi <= angle < 165 / 180 * np.pi: # impact 7 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_7"] - elif -135 / 180 * np.pi >= angle > -165 / 180 * np.pi: # impact 5 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_5"] - else: # impact 6 - area = coeff["log_reg"]["complete_angle_areas"]["Imp_6"] + for i in range(len(angle)): + if -15 / 180 * np.pi < angle[i] < 15 / 180 * np.pi: # impact 12 + angle[i] = 0 + elif 15 / 180 * np.pi <= angle[i] < 45 / 180 * np.pi: # impact 11 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_11"] + elif -15 / 180 * np.pi >= angle[i] > -45 / 180 * np.pi: # impact 1 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_1"] + elif 45 / 180 * np.pi <= angle[i] < 75 / 180 * np.pi: # impact 10 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_10"] + elif -45 / 180 * np.pi >= angle[i] > -75 / 180 * np.pi: # impact 2 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_2"] + elif 75 / 180 * np.pi <= angle[i] < 105 / 180 * np.pi: # impact 9 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_9"] + elif -75 / 180 * np.pi >= angle[i] > -105 / 180 * np.pi: # impact 3 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_3"] + elif 105 / 180 * np.pi <= angle[i] < 135 / 180 * np.pi: # impact 8 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_8"] + elif -105 / 180 * np.pi >= angle[i] > -135 / 180 * np.pi: # impact 4 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_4"] + elif 135 / 180 * np.pi <= angle[i] < 165 / 180 * np.pi: # impact 7 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_7"] + elif -135 / 180 * np.pi >= angle[i] > -165 / 180 * np.pi: # impact 5 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_5"] + else: # impact 6 + angle[i] = coeff["log_reg"]["complete_angle_areas"]["Imp_6"] # logistic regression model p_mais = 1 / (1 + np.exp(- 
coeff["log_reg"]["complete_angle_areas"] ["const"] - coeff["log_reg"] ["complete_angle_areas"]["speed"] * velocity - - area)) + angle)) return p_mais @@ -76,19 +77,20 @@ def get_protected_inj_prob_log_reg_reduced(velocity, float: MAIS 3+ probability """ # get angle coefficient - if -45 / 180 * np.pi < angle < 45 / 180 * np.pi: # front crash - area = 0 - elif 45 / 180 * np.pi <= angle < 135 / 180 * np.pi: # driver-side crash - area = coeff["log_reg"]["reduced_angle_areas"]["driver_side"] - elif -45 / 180 * np.pi >= angle > -135 / 180 * np.pi: # right-side crash - area = coeff["log_reg"]["reduced_angle_areas"]["right_side"] - else: # rear crash - area = coeff["log_reg"]["reduced_angle_areas"]["rear"] + for i in range(len(angle)): + if -45 / 180 * np.pi < angle[i] < 45 / 180 * np.pi: # front crash + angle[i] = 0 + elif 45 / 180 * np.pi <= angle[i] < 135 / 180 * np.pi: # driver-side crash + angle[i] = coeff["log_reg"]["reduced_angle_areas"]["driver_side"] + elif -45 / 180 * np.pi >= angle[i] > -135 / 180 * np.pi: # right-side crash + angle[i] = coeff["log_reg"]["reduced_angle_areas"]["right_side"] + else: # rear crash + angle[i] = coeff["log_reg"]["reduced_angle_areas"]["rear"] # logistic regression model p_mais = 1 / (1 + np.exp(- coeff["log_reg"]["reduced_angle_areas"] ["const"] - coeff["log_reg"] ["reduced_angle_areas"]["speed"] * velocity - - area)) + angle)) return p_mais diff --git a/risk_assessment/utils/logistic_regression_symmetrical.py b/risk_assessment/utils/logistic_regression_symmetrical.py index 598f439..d5bc2ec 100644 --- a/risk_assessment/utils/logistic_regression_symmetrical.py +++ b/risk_assessment/utils/logistic_regression_symmetrical.py @@ -23,36 +23,37 @@ def get_protected_inj_prob_log_reg_complete_sym(velocity, float: MAIS 3+ probability """ # get angle coefficient - if -15 / 180 * np.pi < angle < 15 / 180 * np.pi: # impact 12 - area = 0 - elif 15 / 180 * np.pi <= angle < 45 / 180 * np.pi: # impact 11 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_1_11"] - elif -15 / 180 * np.pi >= angle > -45 / 180 * np.pi: # impact 1 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_1_11"] - elif 45 / 180 * np.pi <= angle < 75 / 180 * np.pi: # impact 10 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_2_10"] - elif -45 / 180 * np.pi >= angle > -75 / 180 * np.pi: # impact 2 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_2_10"] - elif 75 / 180 * np.pi <= angle < 105 / 180 * np.pi: # impact 9 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_3_9"] - elif -75 / 180 * np.pi >= angle > -105 / 180 * np.pi: # impact 3 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_3_9"] - elif 105 / 180 * np.pi <= angle < 135 / 180 * np.pi: # impact 8 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_4_8"] - elif -105 / 180 * np.pi >= angle > -135 / 180 * np.pi: # impact 4 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_4_8"] - elif 135 / 180 * np.pi <= angle < 165 / 180 * np.pi: # impact 7 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_5_7"] - elif -135 / 180 * np.pi >= angle > -165 / 180 * np.pi: # impact 5 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_5_7"] - else: # impact 6 - area = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_6"] + for i in range(len(angle)): + if -15 / 180 * np.pi < angle[i] < 15 / 180 * np.pi: # impact 12 + angle[i] = 0 + elif 15 / 180 * np.pi <= angle[i] < 45 / 180 * np.pi: # impact 11 + angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_1_11"] + elif -15 / 180 * 
np.pi >= angle[i] > -45 / 180 * np.pi:  # impact 1
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_1_11"]
+        elif 45 / 180 * np.pi <= angle[i] < 75 / 180 * np.pi:  # impact 10
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_2_10"]
+        elif -45 / 180 * np.pi >= angle[i] > -75 / 180 * np.pi:  # impact 2
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_2_10"]
+        elif 75 / 180 * np.pi <= angle[i] < 105 / 180 * np.pi:  # impact 9
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_3_9"]
+        elif -75 / 180 * np.pi >= angle[i] > -105 / 180 * np.pi:  # impact 3
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_3_9"]
+        elif 105 / 180 * np.pi <= angle[i] < 135 / 180 * np.pi:  # impact 8
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_4_8"]
+        elif -105 / 180 * np.pi >= angle[i] > -135 / 180 * np.pi:  # impact 4
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_4_8"]
+        elif 135 / 180 * np.pi <= angle[i] < 165 / 180 * np.pi:  # impact 7
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_5_7"]
+        elif -135 / 180 * np.pi >= angle[i] > -165 / 180 * np.pi:  # impact 5
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_5_7"]
+        else:  # impact 6
+            angle[i] = coeff["log_reg"]["complete_sym_angle_areas"]["Imp_6"]

     # logistic regression model
     p_mais = 1 / (1 + np.exp(- coeff["log_reg"]["complete_sym_angle_areas"]
                              ["const"] - coeff["log_reg"]
                              ["complete_sym_angle_areas"]["speed"] * velocity -
-                             area))
+                             angle))

     return p_mais

@@ -76,26 +77,37 @@
         float: MAIS 3+ probability
     """
     # get angle coefficient
-    if -45 / 180 * np.pi < angle < 45 / 180 * np.pi:  # front crash
-        area = 0
-    elif 45 / 180 * np.pi <= angle < 135 / 180 * np.pi:  # driver-side crash
-        area = coeff["log_reg"]["reduced_sym_angle_areas"]["side"]
-    elif -45 / 180 * np.pi >= angle > -135 / 180 * np.pi:  # right-side crash
-        area = coeff["log_reg"]["reduced_sym_angle_areas"]["side"]
-    else:  # rear crash
-        area = coeff["log_reg"]["reduced_sym_angle_areas"]["rear"]
+    t_a = 45 / 180 * np.pi
+    t_b = 3 * t_a
+    unpack = False
+    if isinstance(angle, float):
+        angle = [angle]
+        unpack = True
+    for i in range(len(angle)):
+        if -t_a < angle[i] < t_a:  # front crash
+            angle[i] = 0
+        elif t_a <= angle[i] < t_b:  # driver-side crash
+            angle[i] = coeff["log_reg"]["reduced_sym_angle_areas"]["side"]
+        elif -t_a >= angle[i] > -t_b:  # right-side crash
+            angle[i] = coeff["log_reg"]["reduced_sym_angle_areas"]["side"]
+        else:  # rear crash
+            angle[i] = coeff["log_reg"]["reduced_sym_angle_areas"]["rear"]

     # logistic regression model
     p_mais = 1 / (1 + np.exp(- coeff["log_reg"]["reduced_sym_angle_areas"]
                              ["const"] - coeff["log_reg"]
                              ["reduced_sym_angle_areas"]["speed"] * velocity -
-                             area))
+                             angle))

+    if unpack:
+        p_mais = p_mais[0]
     return p_mais


+# add an (unused) angle parameter for a uniform harm-model call signature
 def get_protected_inj_prob_log_reg_ignore_angle(velocity,
-                                                coeff):
+                                                coeff,
+                                                angle=0):
     """
     LR1S.
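# Editor's sketch (not part of the patch): a mask-based variant of the
# symmetric angle-area logistic model above, using boolean masks instead of
# the per-element loop; the coefficient values are made up for illustration.
import numpy as np

coeff = {"const": -5.0, "speed": 0.3, "side": 0.4, "rear": -0.2}

def p_mais_reduced_sym(delta_v, angle):
    """MAIS3+ probability for arrays of delta-v [m/s] and impact angles [rad]."""
    t_a = np.pi / 4
    area = np.zeros_like(angle)
    area[(np.abs(angle) >= t_a) & (np.abs(angle) < 3 * t_a)] = coeff["side"]
    area[np.abs(angle) >= 3 * t_a] = coeff["rear"]
    # sigmoid over a linear model in delta-v plus an impact-area offset
    return 1.0 / (1.0 + np.exp(-coeff["const"] - coeff["speed"] * delta_v - area))

print(p_mais_reduced_sym(np.array([5.0, 15.0]), np.array([0.1, 1.6])))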
diff --git a/risk_assessment/utils/reference_speed_asymmetrical.py b/risk_assessment/utils/reference_speed_asymmetrical.py
index 7c74203..a6dc957 100644
--- a/risk_assessment/utils/reference_speed_asymmetrical.py
+++ b/risk_assessment/utils/reference_speed_asymmetrical.py
@@ -23,37 +23,43 @@ def get_protected_inj_prob_ref_speed_complete(velocity,
         float: MAIS 3+ probability
     """
     # get angle coefficient
-    if -15 / 180 * np.pi < angle < 15 / 180 * np.pi:  # impact 12
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_12"]
-    elif 15 / 180 * np.pi <= angle < 45 / 180 * np.pi:  # impact 11
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_11"]
-    elif -15 / 180 * np.pi >= angle > -45 / 180 * np.pi:  # impact 1
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_1"]
-    elif 45 / 180 * np.pi <= angle < 75 / 180 * np.pi:  # impact 10
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_10"]
-    elif -45 / 180 * np.pi >= angle > -75 / 180 * np.pi:  # impact 2
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_2"]
-    elif 75 / 180 * np.pi <= angle < 105 / 180 * np.pi:  # impact 9
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_9"]
-    elif -75 / 180 * np.pi >= angle > -105 / 180 * np.pi:  # impact 3
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_3"]
-    elif 105 / 180 * np.pi <= angle < 135 / 180 * np.pi:  # impact 8
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_8"]
-    elif -105 / 180 * np.pi >= angle > -135 / 180 * np.pi:  # impact 4
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_4"]
-    elif 135 / 180 * np.pi <= angle < 165 / 180 * np.pi:  # impact 7
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_7"]
-    elif -135 / 180 * np.pi >= angle > -165 / 180 * np.pi:  # impact 5
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_5"]
-    else:  # impact 6
-        reference = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_6"]
+    reference = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if -15 / 180 * np.pi < angle[i] < 15 / 180 * np.pi:  # impact 12
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_12"]
+        elif 15 / 180 * np.pi <= angle[i] < 45 / 180 * np.pi:  # impact 11
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_11"]
+        elif -15 / 180 * np.pi >= angle[i] > -45 / 180 * np.pi:  # impact 1
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_1"]
+        elif 45 / 180 * np.pi <= angle[i] < 75 / 180 * np.pi:  # impact 10
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_10"]
+        elif -45 / 180 * np.pi >= angle[i] > -75 / 180 * np.pi:  # impact 2
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_2"]
+        elif 75 / 180 * np.pi <= angle[i] < 105 / 180 * np.pi:  # impact 9
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_9"]
+        elif -75 / 180 * np.pi >= angle[i] > -105 / 180 * np.pi:  # impact 3
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_3"]
+        elif 105 / 180 * np.pi <= angle[i] < 135 / 180 * np.pi:  # impact 8
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_8"]
+        elif -105 / 180 * np.pi >= angle[i] > -135 / 180 * np.pi:  # impact 4
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_4"]
+        elif 135 / 180 * np.pi <= angle[i] < 165 / 180 * np.pi:  # impact 7
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_7"]
+        elif -135 / 180 * np.pi >= angle[i] > -165 / 180 * np.pi:  # impact 5
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_5"]
+        else:  # impact 6
+            reference[i] = coeff["ref_speed"]["complete_angle_areas"]["ref_speed_6"]
+
+    temp = np.power(velocity / reference,
+                    coeff["ref_speed"]["complete_angle_areas"]["exp"])
 
     # model
-    if velocity < reference:
-        p_mais = np.power(velocity / reference,
-                          coeff["ref_speed"]["complete_angle_areas"]["exp"])
-    else:
-        p_mais = 1
+    p_mais = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if velocity[i] < reference[i]:
+            p_mais[i] = temp[i]
+        else:
+            p_mais[i] = 1
 
     return p_mais
 
@@ -77,24 +83,29 @@ def get_protected_inj_prob_ref_speed_reduced(velocity,
         float: MAIS 3+ probability
     """
     # get angle coefficient
-    if -45 / 180 * np.pi < angle < 45 / 180 * np.pi:  # front crash
-        reference = \
-            coeff["ref_speed"]["reduced_angle_areas"]["ref_speed_front"]
-    elif 45 / 180 * np.pi <= angle < 135 / 180 * np.pi:  # driver-side crash
-        reference = \
-            coeff["ref_speed"]["reduced_angle_areas"]["ref_speed_driver_side"]
-    elif -45 / 180 * np.pi >= angle > -135 / 180 * np.pi:  # right-side crash
-        reference = \
-            coeff["ref_speed"]["reduced_angle_areas"]["ref_speed_right_side"]
-    else:  # rear crash
-        reference = \
-            coeff["ref_speed"]["reduced_angle_areas"]["ref_speed_rear"]
+    reference = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if -45 / 180 * np.pi < angle[i] < 45 / 180 * np.pi:  # front crash
+            reference[i] = \
+                coeff["ref_speed"]["reduced_angle_areas"]["ref_speed_front"]
+        elif 45 / 180 * np.pi <= angle[i] < 135 / 180 * np.pi:  # driver-side crash
+            reference[i] = \
+                coeff["ref_speed"]["reduced_angle_areas"]["ref_speed_driver_side"]
+        elif -45 / 180 * np.pi >= angle[i] > -135 / 180 * np.pi:  # right-side crash
+            reference[i] = \
+                coeff["ref_speed"]["reduced_angle_areas"]["ref_speed_right_side"]
+        else:  # rear crash
+            reference[i] = \
+                coeff["ref_speed"]["reduced_angle_areas"]["ref_speed_rear"]
 
     # model
-    if velocity < reference:
-        p_mais = np.power(velocity / reference,
+    temp = np.power(velocity / reference,
                     coeff["ref_speed"]["reduced_angle_areas"]["exp"])
-    else:
-        p_mais = 1
+    p_mais = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if velocity[i] < reference[i]:
+            p_mais[i] = temp[i]
+        else:
+            p_mais[i] = 1
 
     return p_mais
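Both reference-speed variants implement the same saturation model: `p = (v / v_ref) ** e` while the collision speed stays below the reference speed, and `p = 1` once it reaches it. A minimal vectorized sketch of that shared model follows; the helper name is hypothetical, and it assumes `velocity` and `reference` are equal-length NumPy arrays with a scalar exponent taken from the coefficient file.

```python
import numpy as np


def ref_speed_model(velocity, reference, exp):
    """Vectorized saturation model shared by the reference-speed variants."""
    temp = np.power(velocity / reference, exp)
    # the MAIS 3+ probability saturates at 1 once velocity reaches v_ref
    return np.where(velocity < reference, temp, 1.0)
```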
diff --git a/risk_assessment/utils/reference_speed_symmetrical.py b/risk_assessment/utils/reference_speed_symmetrical.py
index 9e6b260..1a6f97d 100644
--- a/risk_assessment/utils/reference_speed_symmetrical.py
+++ b/risk_assessment/utils/reference_speed_symmetrical.py
@@ -23,50 +23,54 @@ def get_protected_inj_prob_ref_speed_complete_sym(velocity,
         float: MAIS 3+ probability
     """
     # get angle coefficient
-    if -15 / 180 * np.pi < angle < 15 / 180 * np.pi:  # impact 12
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_12"]
-    elif 15 / 180 * np.pi <= angle < 45 / 180 * np.pi:  # impact 11
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_1_11"]
-    elif -15 / 180 * np.pi >= angle > -45 / 180 * np.pi:  # impact 1
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_1_11"]
-    elif 45 / 180 * np.pi <= angle < 75 / 180 * np.pi:  # impact 10
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_2_10"]
-    elif -45 / 180 * np.pi >= angle > -75 / 180 * np.pi:  # impact 2
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_2_10"]
-    elif 75 / 180 * np.pi <= angle < 105 / 180 * np.pi:  # impact 9
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_3_9"]
-    elif -75 / 180 * np.pi >= angle > -105 / 180 * np.pi:  # impact 3
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_3_9"]
-    elif 105 / 180 * np.pi <= angle < 135 / 180 * np.pi:  # impact 8
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_4_8"]
-    elif -105 / 180 * np.pi >= angle > -135 / 180 * np.pi:  # impact 4
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_4_8"]
-    elif 135 / 180 * np.pi <= angle < 165 / 180 * np.pi:  # impact 7
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_5_7"]
-    elif -135 / 180 * np.pi >= angle > -165 / 180 * np.pi:  # impact 5
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_5_7"]
-    else:  # impact 6
-        reference = \
-            coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_6"]
+    reference = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if -15 / 180 * np.pi < angle[i] < 15 / 180 * np.pi:  # impact 12
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_12"]
+        elif 15 / 180 * np.pi <= angle[i] < 45 / 180 * np.pi:  # impact 11
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_1_11"]
+        elif -15 / 180 * np.pi >= angle[i] > -45 / 180 * np.pi:  # impact 1
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_1_11"]
+        elif 45 / 180 * np.pi <= angle[i] < 75 / 180 * np.pi:  # impact 10
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_2_10"]
+        elif -45 / 180 * np.pi >= angle[i] > -75 / 180 * np.pi:  # impact 2
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_2_10"]
+        elif 75 / 180 * np.pi <= angle[i] < 105 / 180 * np.pi:  # impact 9
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_3_9"]
+        elif -75 / 180 * np.pi >= angle[i] > -105 / 180 * np.pi:  # impact 3
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_3_9"]
+        elif 105 / 180 * np.pi <= angle[i] < 135 / 180 * np.pi:  # impact 8
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_4_8"]
+        elif -105 / 180 * np.pi >= angle[i] > -135 / 180 * np.pi:  # impact 4
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_4_8"]
+        elif 135 / 180 * np.pi <= angle[i] < 165 / 180 * np.pi:  # impact 7
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_5_7"]
+        elif -135 / 180 * np.pi >= angle[i] > -165 / 180 * np.pi:  # impact 5
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_5_7"]
+        else:  # impact 6
+            reference[i] = \
+                coeff["ref_speed"]["complete_sym_angle_areas"]["ref_speed_6"]
 
     # model
-    if velocity < reference:
-        p_mais = \
-            np.power(velocity / reference,
+    temp = np.power(velocity / reference,
                      coeff["ref_speed"]["complete_sym_angle_areas"]["exp"])
-    else:
-        p_mais = 1
+    p_mais = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if velocity[i] < reference[i]:
+            p_mais[i] = temp[i]
+        else:
+            p_mais[i] = 1
 
     return p_mais
 
@@ -90,31 +94,37 @@ def get_protected_inj_prob_ref_speed_reduced_sym(velocity,
         float: MAIS 3+ probability
     """
     # get angle coefficient
-    if -45 / 180 * np.pi < angle < 45 / 180 * np.pi:  # front crash
-        reference = \
-            coeff["ref_speed"]["reduced_sym_angle_areas"]["ref_speed_front"]
-    elif 45 / 180 * np.pi <= angle < 135 / 180 * np.pi:  # driver-side crash
-        reference = \
-            coeff["ref_speed"]["reduced_sym_angle_areas"]["ref_speed_side"]
-    elif -45 / 180 * np.pi >= angle > -135 / 180 * np.pi:  # right-side crash
-        reference = \
-            coeff["ref_speed"]["reduced_sym_angle_areas"]["ref_speed_side"]
-    else:  # front crash
-        reference = \
-            coeff["ref_speed"]["reduced_sym_angle_areas"]["ref_speed_rear"]
+    reference = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if -45 / 180 * np.pi < angle[i] < 45 / 180 * np.pi:  # front crash
+            reference[i] = \
+                coeff["ref_speed"]["reduced_sym_angle_areas"]["ref_speed_front"]
+        elif 45 / 180 * np.pi <= angle[i] < 135 / 180 * np.pi:  # driver-side crash
+            reference[i] = \
+                coeff["ref_speed"]["reduced_sym_angle_areas"]["ref_speed_side"]
+        elif -45 / 180 * np.pi >= angle[i] > -135 / 180 * np.pi:  # right-side crash
+            reference[i] = \
+                coeff["ref_speed"]["reduced_sym_angle_areas"]["ref_speed_side"]
+        else:  # rear crash
+            reference[i] = \
+                coeff["ref_speed"]["reduced_sym_angle_areas"]["ref_speed_rear"]
 
     # model
-    if velocity < reference:
-        p_mais = \
-            np.power(velocity / reference,
+    temp = np.power(velocity / reference,
                      coeff["ref_speed"]["reduced_sym_angle_areas"]["exp"])
-    else:
-        p_mais = 1
+    p_mais = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if velocity[i] < reference[i]:
+            p_mais[i] = temp[i]
+        else:
+            p_mais[i] = 1
 
     return p_mais
 
 
+# change: add an angle parameter for a uniform interface
 def get_protected_inj_prob_ref_speed_ignore_angle(velocity,
+                                                  angle,
                                                   coeff):
     """
     RS1S.
@@ -131,12 +141,14 @@ def get_protected_inj_prob_ref_speed_ignore_angle(velocity,
         float: MAIS 3+ probability
     """
     # model
-    if velocity < coeff["ref_speed"]["ignore_angle"]["ref_speed"]:
-        p_mais = \
-            np.power(velocity /
+    temp = np.power(velocity /
                     coeff["ref_speed"]["ignore_angle"]["ref_speed"],
                     coeff["ref_speed"]["ignore_angle"]["exp"])
-    else:
-        p_mais = 1
+    p_mais = np.zeros_like(angle)
+    for i in range(len(angle)):
+        if velocity[i] < coeff["ref_speed"]["ignore_angle"]["ref_speed"]:
+            p_mais[i] = temp[i]
+        else:
+            p_mais[i] = 1
 
     return p_mais