From 025dff6cef87d07c4024582a5326575616abf2a0 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 2 May 2024 19:17:20 -0400 Subject: [PATCH] Various changes to support the table movement experiments (#505) This is to make running them slightly less painful: - We avoid deleting tables from Athena - Add experiment configs for table movement (specialized scenario first) - Add tool to make physical alterations to the blueprint and placement Part of #487. --- .../15-e2e-scenarios-v2/specialized/COND | 10 ++ .../specialized/run_vector_workload_tm.sh | 64 +++++++ .../specialized/specialized_config_tm.yml | 166 ++++++++++++++++++ src/brad/admin/table_adjustments.py | 112 ++++++++++++ src/brad/blueprint/sql_gen/table.py | 17 ++ src/brad/config/file.py | 16 ++ src/brad/daemon/transition_orchestrator.py | 25 ++- src/brad/exec/admin.py | 2 + 8 files changed, 403 insertions(+), 9 deletions(-) create mode 100755 experiments/15-e2e-scenarios-v2/specialized/run_vector_workload_tm.sh create mode 100644 experiments/15-e2e-scenarios-v2/specialized/specialized_config_tm.yml create mode 100644 src/brad/admin/table_adjustments.py diff --git a/experiments/15-e2e-scenarios-v2/specialized/COND b/experiments/15-e2e-scenarios-v2/specialized/COND index 439d850b..c7d38ced 100644 --- a/experiments/15-e2e-scenarios-v2/specialized/COND +++ b/experiments/15-e2e-scenarios-v2/specialized/COND @@ -19,6 +19,16 @@ run_experiment( }, ) +run_experiment( + name="brad_100g_vector_tm", + run="./run_vector_workload_tm.sh", + options={ + # NOTE: This has table movement enabled. 
+ "system-config-file": "specialized_config_tm.yml", + **COMMON_CONFIGS, + }, +) + run_experiment( name="hand_designed_100g_vector", run="./run_vector_workload.sh", diff --git a/experiments/15-e2e-scenarios-v2/specialized/run_vector_workload_tm.sh b/experiments/15-e2e-scenarios-v2/specialized/run_vector_workload_tm.sh new file mode 100755 index 00000000..a6209ec4 --- /dev/null +++ b/experiments/15-e2e-scenarios-v2/specialized/run_vector_workload_tm.sh @@ -0,0 +1,64 @@ +#! /bin/bash + +script_loc=$(cd $(dirname $0) && pwd -P) +cd $script_loc +source ../common.sh + +# Arguments: +# --config-file +# --planner-config-file +# --query-indexes +extract_named_arguments $@ + +# Repeating query indexes: +# 51, 53, 58, 61, 62, 64, 65, 66, 69, 72, 73, 74, 77, 86, 91 +# +# Touch `title`: +# 65, 69, 73 +# +# Heavy repeating query indexes: +# 14, 54, 59, 60, 71, 75 +# +# Touch `title`: +# 14, 54, 59, 75 + +# General scenario: +# Aurora is being used for queries involving `title` because of the vector +# similarity queries that also touch `title`. After deploying BRAD, it realizes +# that it's better to replicate `title` and route the rest of the queries onto +# Redshift. + +query_indices="62,64,65,66,69,72,73,74,91,59" +heavier_queries="14,54,60,71,75" +all_queries="${query_indices},${heavier_queries}" + +start_brad $system_config_file $physical_config_file +log_workload_point "brad_start_initiated" +sleep 30 + +log_workload_point "clients_starting" +start_repeating_olap_runner 8 5 5 $all_queries "ra_8" +rana_pid=$runner_pid + +start_other_repeating_runner 2 8 5 "ra_vector" 8 +other_pid=$runner_pid + +start_txn_runner_serial 4 # Implicit: --dataset-type +txn_pid=$runner_pid +log_workload_point "clients_started" + +function inner_cancel_experiment() { + cancel_experiment $rana_pid $txn_pid $other_pid +} + +trap "inner_cancel_experiment" INT +trap "inner_cancel_experiment" TERM + +# Note that this line is different from the TM-disabled version (3 hours instead of 2). 
+sleep $((3 * 60 * 60)) # Wait for 3 hours. +log_workload_point "experiment_done" + +# Shut down everything now. +>&2 echo "Experiment done. Shutting down runners..." +graceful_shutdown $rana_pid $txn_pid $other_pid +log_workload_point "shutdown_complete" diff --git a/experiments/15-e2e-scenarios-v2/specialized/specialized_config_tm.yml b/experiments/15-e2e-scenarios-v2/specialized/specialized_config_tm.yml new file mode 100644 index 00000000..41d3ca39 --- /dev/null +++ b/experiments/15-e2e-scenarios-v2/specialized/specialized_config_tm.yml @@ -0,0 +1,166 @@ +# This file contains configurations that are used by BRAD. These are default +# values and should be customized for specific situations. + +# BRAD's front end servers will listen for client connections on this interface +# and port. If `num_front_ends` is greater than one, subsequent front ends will +# listen on successive ports (e.g., 6584, 6585, etc.). +front_end_interface: "0.0.0.0" +front_end_port: 6583 +num_front_ends: 16 + +# Logging paths. If the value is in ALL_CAPS (with underscores), it is +# interpreted as an environment variable (BRAD will log to the path stored in +# the environment variable). + +# Where BRAD's daemon process will write its logs. +daemon_log_file: COND_OUT + +# Where BRAD's front end processes will write their logs. +front_end_log_path: COND_OUT + +# Where BRAD's blueprint planner will write debug logs. +planner_log_path: COND_OUT + +# Where BRAD's metrics loggers will write their logs. +metrics_log_path: COND_OUT + +# Probability that each transactional query will be logged. +txn_log_prob: 0.01 + +# Set to a non-zero value to enable automatic data syncing. When this is set to 0, +# automatic syncing is disabled. +data_sync_period_seconds: 0 + +# BRAD's front end servers will report their metrics at regular intervals. +front_end_metrics_reporting_period_seconds: 30 +front_end_query_latency_buffer_size: 100 + +# `default` means to use the policy encoded in the blueprint. 
Other values will +# override the blueprint. +routing_policy: default + +# Whether to disable table movement for benchmark purposes (i.e., keep all +# tables on all engines.) +disable_table_movement: false +skip_sync_before_table_movement: true + +# Epoch length for metrics and forecasting. This is the granularity at which +# metrics/forecasting will be performed. +epoch_length: + weeks: 0 + days: 0 + hours: 0 + minutes: 1 + +# Blueprint planning strategy. +strategy: fp_query_based_beam + +# Used to specify the period of time over which to use data for planning. +# Currently, this is a "look behind" window for the workload. +planning_window: + weeks: 0 + days: 0 + hours: 1 + minutes: 0 + +# Used to aggregate metrics collected in the planning window. +metrics_agg: + method: ewm # 'mean' is another option + alpha: 0.86466472 # 1 - 1 / e^2 + +# Used during planning. +reinterpret_second_as: 1 + +# The query distribution must change by at least this much for a new blueprint +# to be accepted. +query_dist_change_frac: 0.1 + +# The search bound for the provisioning. +max_provisioning_multiplier: 2.5 + +# Flag options for blueprint planning. +use_io_optimized_aurora: true +use_recorded_routing_if_available: true +ensure_tables_together_on_one_engine: true + +# Loads used to prime the system when no information is available. +aurora_initialize_load_fraction: 0.25 +redshift_initialize_load_fraction: 0.25 + +# BRAD will not reduce predicted load lower than these values. Raise these +# values to be more conservative against mispredictions. +aurora_min_load_removal_fraction: 0.8 +redshift_min_load_removal_fraction: 0.9 + +aurora_max_query_factor: 4.0 +aurora_max_query_factor_replace: 10000.0 +redshift_peak_load_threshold: 99.0 +redshift_peak_load_multiplier: 1.5 + +# Blueprint planning performance ceilings. +query_latency_p90_ceiling_s: 30.0 +txn_latency_p90_ceiling_s: 0.030 + +# Used for ordering blueprints during planning. 
+comparator: + type: benefit_perf_ceiling # or `perf_ceiling` + + benefit_horizon: # Only used by the `benefit_perf_ceiling` comparator + weeks: 0 + days: 0 + hours: 24 + minutes: 0 + + penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator + penalty_power: 2 # Only used by the `benefit_perf_ceiling` comparator + +# Used for precomputed predictions. +std_datasets: + - name: regular + path: workloads/IMDB_100GB/regular_test/ + - name: adhoc + path: workloads/IMDB_100GB/adhoc_test/ + +use_preset_redshift_clusters: false + +aurora_provisioning_search_distance: 1500.0 +redshift_provisioning_search_distance: 400.0 + +planner_max_workers: 16 + +# Blueprint planning trigger configs. + +triggers: + enabled: true + check_period_s: 90 # Triggers are checked every X seconds. + check_period_offset_s: 360 # Wait 6 mins before starting. + observe_new_blueprint_mins: 3 + + elapsed_time: + disabled: true + multiplier: 60 # Multiplier over `planning_window`. + + redshift_cpu: + lo: 15 + hi: 85 + sustained_epochs: 3 + + aurora_cpu: + lo: 10 + hi: 85 + sustained_epochs: 3 + + variable_costs: + disabled: true + threshold: 1.0 + + query_latency_ceiling: + ceiling_s: 30.0 + sustained_epochs: 3 + + txn_latency_ceiling: + ceiling_s: 0.030 + sustained_epochs: 3 + + recent_change: + delay_epochs: 5 diff --git a/src/brad/admin/table_adjustments.py b/src/brad/admin/table_adjustments.py new file mode 100644 index 00000000..75c35b4f --- /dev/null +++ b/src/brad/admin/table_adjustments.py @@ -0,0 +1,112 @@ +import asyncio +import logging + +from brad.asset_manager import AssetManager +from brad.blueprint.manager import BlueprintManager +from brad.config.engine import Engine +from brad.config.file import ConfigFile +from brad.blueprint.blueprint import Blueprint +from brad.blueprint.sql_gen.table import TableSqlGenerator +from brad.front_end.engine_connections import EngineConnections + +logger = logging.getLogger(__name__) + + +def register_admin_action(subparser) -> None: + 
parser = subparser.add_parser( + "table_adjustments", + help="Used to manually modify the physical tables in BRAD's underlying infrastructure.", + ) + parser.add_argument( + "--physical-config-file", + type=str, + required=True, + help="Path to BRAD's physical configuration file.", + ) + parser.add_argument( + "--schema-name", + type=str, + required=True, + help="The schema name to use.", + ) + parser.add_argument( + "action", + type=str, + help="The action to run {remove_blueprint_table, rename_table}.", + ) + parser.add_argument( + "--table-name", type=str, help="The name of the table.", required=True + ) + parser.add_argument("--engines", type=str, nargs="+", help="The engines involved.") + parser.add_argument( + "--new-table-name", type=str, help="The new table name, when applicable." + ) + parser.set_defaults(admin_action=table_adjustments) + + +async def table_adjustments_impl(args) -> None: + # 1. Load the config, blueprint, and provisioning. + config = ConfigFile.load_from_physical_config(phys_config=args.physical_config_file) + assets = AssetManager(config) + + blueprint_mgr = BlueprintManager(config, assets, args.schema_name) + await blueprint_mgr.load() + blueprint = blueprint_mgr.get_blueprint() + directory = blueprint_mgr.get_directory() + + if args.action == "remove_blueprint_table": + # NOTE: This only removes the table from the blueprint. You need to + # manually remove it from the physical engines (if appropriate). 
+ table_to_remove = args.table_name + new_blueprint = Blueprint( + schema_name=blueprint.schema_name(), + table_schemas=[ + table for table in blueprint.tables() if table.name != table_to_remove + ], + table_locations={ + table_name: locations + for table_name, locations in blueprint.table_locations().items() + if table_name != table_to_remove + }, + aurora_provisioning=blueprint.aurora_provisioning(), + redshift_provisioning=blueprint.redshift_provisioning(), + full_routing_policy=blueprint.get_routing_policy(), + ) + blueprint_mgr.force_new_blueprint_sync(new_blueprint, score=None) + + elif args.action == "rename_table": + engines = {Engine.from_str(engine_str) for engine_str in args.engines} + connections = EngineConnections.connect_sync( + config, + directory, + schema_name=args.schema_name, + autocommit=False, + specific_engines=engines, + ) + sqlgen = TableSqlGenerator(config, blueprint) + for engine in engines: + table = blueprint.get_table(args.table_name) + logger.info( + "On %s: Renaming table %s to %s", + str(engine), + table.name, + args.new_table_name, + ) + statements, run_on = sqlgen.generate_rename_table_sql( + table, engine, args.new_table_name + ) + conn = connections.get_connection(run_on) + cursor = conn.cursor_sync() + for stmt in statements: + cursor.execute_sync(stmt) + cursor.commit_sync() + + else: + logger.error("Unknown action %s", args.action) + + logger.info("Done.") + + +# This method is called by `brad.exec.admin.main`. 
+def table_adjustments(args): + asyncio.run(table_adjustments_impl(args)) diff --git a/src/brad/blueprint/sql_gen/table.py b/src/brad/blueprint/sql_gen/table.py index efc8e2c4..5ae0ab6d 100644 --- a/src/brad/blueprint/sql_gen/table.py +++ b/src/brad/blueprint/sql_gen/table.py @@ -232,6 +232,23 @@ def generate_extraction_progress_init( queries.append(initialize_template.format(table_name=table_name)) return (queries, Engine.Aurora) + def generate_rename_table_sql( + self, table: Table, location: Engine, new_name: str + ) -> Tuple[List[str], Engine]: + """ + Generates the SQL statements needed to rename a table on the given engine. + """ + if location == Engine.Aurora: + # Aurora is more complicated because we use a view with other + # metadata too. This is not currently needed. + raise RuntimeError("Aurora renames are currently unimplemented.") + + elif location == Engine.Redshift or location == Engine.Athena: + return ([f"ALTER TABLE {table.name} RENAME TO {new_name}"], location) + + else: + raise RuntimeError(f"Unsupported location {str(location)}") + def generate_create_index_sql( table: Table, indexes: List[Tuple[Column, ...]] diff --git a/src/brad/config/file.py b/src/brad/config/file.py index e7eda3d3..b8ef4054 100644 --- a/src/brad/config/file.py +++ b/src/brad/config/file.py @@ -190,6 +190,22 @@ def disable_table_movement(self) -> bool: # Table movement disabled by default. return True + @property + def skip_sync_before_movement(self) -> bool: + try: + return self._raw["skip_sync_before_table_movement"] + except KeyError: + # Skip by default. + return True + + @property + def skip_athena_table_deletion(self) -> bool: + try: + return self._raw["skip_athena_table_deletion"] + except KeyError: + # Skip by default. 
+ return True + @property def use_preset_redshift_clusters(self) -> bool: try: diff --git a/src/brad/daemon/transition_orchestrator.py b/src/brad/daemon/transition_orchestrator.py index 06bda3d9..4b2d02cc 100644 --- a/src/brad/daemon/transition_orchestrator.py +++ b/src/brad/daemon/transition_orchestrator.py @@ -131,14 +131,17 @@ async def run_prepare_then_transition( # 2. Sync tables (TODO: discuss more efficient alternatives - # possibly add a filter of tables to run_sync) - await self._data_sync_executor.establish_connections() - ran_sync = await self._data_sync_executor.run_sync( - self._blueprint_mgr.get_blueprint() - ) - logger.debug( - """Completed data sync step during transition. """ - f"""There were {'some' if ran_sync else 'no'} new writes to sync""" - ) + if not self._config.skip_sync_before_movement: + await self._data_sync_executor.establish_connections() + ran_sync = await self._data_sync_executor.run_sync( + self._blueprint_mgr.get_blueprint() + ) + logger.debug( + """Completed data sync step during transition. """ + f"""There were {'some' if ran_sync else 'no'} new writes to sync""" + ) + else: + logger.info("Not running table sync before movement.") # 3. 
Create tables in new locations as needed directory = self._blueprint_mgr.get_directory() @@ -628,7 +631,11 @@ async def _run_athena_post_transition( ) -> None: # Drop removed tables to_drop = [] - if table_diffs is not None and self._config.disable_table_movement is False: + if ( + table_diffs is not None + and self._config.disable_table_movement is False + and self._config.skip_athena_table_deletion is False + ): for table_diff in table_diffs: if Engine.Athena in table_diff.removed_locations(): to_drop.append(table_diff.table_name()) diff --git a/src/brad/exec/admin.py b/src/brad/exec/admin.py index 13a970ed..d70e7d7d 100644 --- a/src/brad/exec/admin.py +++ b/src/brad/exec/admin.py @@ -15,6 +15,7 @@ import brad.admin.replay_planner as replay_planner import brad.admin.clean_dataset as clean_dataset import brad.admin.alter_schema as alter_schema +import brad.admin.table_adjustments as table_adjustments logger = logging.getLogger(__name__) @@ -43,6 +44,7 @@ def register_command(subparsers) -> None: replay_planner.register_admin_action(admin_subparsers) clean_dataset.register_admin_action(admin_subparsers) alter_schema.register_admin_action(admin_subparsers) + table_adjustments.register_admin_action(admin_subparsers) parser.set_defaults(func=main)