From 09a6cde878219055d762d72b1aa66002579ba588 Mon Sep 17 00:00:00 2001 From: Aiden Grossman Date: Mon, 30 Dec 2024 16:45:21 -0800 Subject: [PATCH] Add basic benchmark scheduler This patch introduces the BenchmarkScheduler interface that sets the CPU affinity mask for the current process and gets a benchmark core for use by llvm-exegesis. This patch additionally introduces a BenchmarkScheduler implementation that simply does nothing, which will preserve the behavior that we have currently where we do not explicitly set CPU masks. Reviewers: orodley, ondrasej Reviewed By: ondrasej Pull Request: https://github.com/google/gematria/pull/270 --- gematria/datasets/pipelines/BUILD.bazel | 14 ++ .../pipelines/benchmark_cpu_scheduler.py | 143 ++++++++++++++++++ .../pipelines/benchmark_cpu_scheduler_test.py | 128 ++++++++++++++++ 3 files changed, 285 insertions(+) create mode 100644 gematria/datasets/pipelines/benchmark_cpu_scheduler.py create mode 100644 gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py diff --git a/gematria/datasets/pipelines/BUILD.bazel b/gematria/datasets/pipelines/BUILD.bazel index df51ec31..954bb2f1 100644 --- a/gematria/datasets/pipelines/BUILD.bazel +++ b/gematria/datasets/pipelines/BUILD.bazel @@ -81,3 +81,17 @@ gematria_py_test( "//gematria/proto:execution_annotation_py_pb2", ], ) + +gematria_py_library( + name = "benchmark_cpu_scheduler", + srcs = ["benchmark_cpu_scheduler.py"], +) + +gematria_py_test( + name = "benchmark_cpu_scheduler_test", + size = "small", + srcs = ["benchmark_cpu_scheduler_test.py"], + deps = [ + ":benchmark_cpu_scheduler", + ], +) diff --git a/gematria/datasets/pipelines/benchmark_cpu_scheduler.py b/gematria/datasets/pipelines/benchmark_cpu_scheduler.py new file mode 100644 index 00000000..39b52815 --- /dev/null +++ b/gematria/datasets/pipelines/benchmark_cpu_scheduler.py @@ -0,0 +1,143 @@ +# Copyright 2024 Google Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from typing_extensions import override +from collections.abc import Iterable +import os +import re + + +class BenchmarkScheduler(metaclass=abc.ABCMeta): + """Schedules a benchmark and the parent process to reduce noise. + + BenchmarkScheduler is an abstraction that provides two main pieces of + functionality. Firstly, it provides a function + (setup_and_get_benchmark_core) that allows an implementation to perform any + necessary setup in the parent process and provide a core ID that should be + used to perform any benchmarking. Additionally, implementations are + intended to hold state to verify that the expected state is maintained and + not changed by external software. + """ + + @abc.abstractmethod + def setup_and_get_benchmark_core(self) -> int | None: + """Sets up the parent process and chooses a benchmark core. + + This function will perform any relevant setup in the parent process, + and return a core ID that can be used to run benchmarks on. + + Returns: + Returns an integer core ID to specify a core that should be used for + running benchmarks, or None to indicate that any core can be used. + """ + + @abc.abstractmethod + def verify(self): + """Verifies that conditions match what is expected. + + This function allows for implementations to verify that the original + setup created in setup_and_get_benchmark_core is maintained for every + benchmark that is run. 
+ """ + + +class NoSchedulingBenchmarkScheduler(BenchmarkScheduler): + """A basic BenchmarkScheduler implementation that does nothing. + + This BenchmarkScheduler implementation does nothing. It leaves scheduling + of the benchmarking process to the operating system by specifying any core + can be used for benchmarking, performs no setup in the parent process, and + performs no verification. + """ + + @override + def setup_and_get_benchmark_core(self) -> int | None: + return None + + @override + def verify(self): + pass + + +class DefaultBenchmarkScheduler(BenchmarkScheduler): + """A BenchmarkScheduler that schedules processes separately. + + DefaultBenchmarkScheduler schedules the main process and the benchmark + subprocess on separate cores, making sure to reserve the second hyperthread + on the benchmarking core to prevent interference. It expects that the main + process is initially given a CPU Mask with three active threads, additionally + assuming that two of the threads are neighboring (part of the same core). + Errors are raised if these conditions are not met. The benchmarking core + returned is one of the two neighboring threads. The main process has its + COU mask limited to the thread that neighbors neither of the other threads. 
+ """ + + def __init__(self): + self._cpu_mask = [] + + @staticmethod + def _get_neighboring_threads(cpu_index: int) -> list[int]: + with open( + f'/sys/devices/system/cpu/cpu{cpu_index}/topology/thread_siblings_list' + ) as thread_sibling_list_handle: + neighboring_threads_strings = re.split( + r'[-,]+', thread_sibling_list_handle.read().strip() + ) + neighboring_threads = [ + int(cpu_index_str) for cpu_index_str in neighboring_threads_strings + ] + return neighboring_threads + + def _get_aux_core_and_hyperthread_pair( + self, + cpu_mask: Iterable[int], + ) -> tuple[int, list[int]]: + for cpu_index in cpu_mask: + neighboring_threads = self._get_neighboring_threads(cpu_index) + if len(neighboring_threads) != 2: + raise ValueError('Expected two hyperthreads per CPU.') + + if ( + neighboring_threads[0] in cpu_mask + and neighboring_threads[1] in cpu_mask + ): + cpus = list(cpu_mask) + cpus.remove(neighboring_threads[0]) + cpus.remove(neighboring_threads[1]) + return (cpus[0], [neighboring_threads[0], neighboring_threads[1]]) + raise ValueError( + 'Expected a pair of neighboring hyperthreads in the CPU mask.' + ) + + @override + def setup_and_get_benchmark_core(self) -> int | None: + cpu_mask = os.sched_getaffinity(0) + + if len(cpu_mask) != 3: + raise ValueError('Expected to have three CPUs.') + + aux_core, hyperthread_pair = self._get_aux_core_and_hyperthread_pair( + cpu_mask + ) + os.sched_setaffinity(0, [aux_core]) + self._cpu_mask = [aux_core] + + return hyperthread_pair[0] + + @override + def verify(self): + cpu_mask = list(os.sched_getaffinity(0)) + if self._cpu_mask != cpu_mask: + raise ValueError('Expected the CPU mask to not change.') diff --git a/gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py b/gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py new file mode 100644 index 00000000..be092cd0 --- /dev/null +++ b/gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py @@ -0,0 +1,128 @@ +# Copyright 2024 Google Inc. 
# Copyright 2024 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from collections.abc import Iterable

from absl.testing import absltest

from gematria.datasets.pipelines import benchmark_cpu_scheduler


class BenchmarkSchedulerTests(absltest.TestCase):
  """Tests for the BenchmarkScheduler implementations.

  Several tests change the CPU affinity of the test process. The original
  mask is restored through a cleanup registered in setUp, so that a failing
  assertion cannot leak a restricted mask into the tests that run afterwards.
  """

  def setUp(self):
    super().setUp()
    self.addCleanup(self._reset_cpu_affinity, os.sched_getaffinity(0))

  def test_no_scheduling(self):
    scheduler = benchmark_cpu_scheduler.NoSchedulingBenchmarkScheduler()
    self.assertIsNone(scheduler.setup_and_get_benchmark_core())
    scheduler.verify()

  def test_default_scheduler_get_neighboring_threads(self):
    scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
    neighboring_threads = scheduler._get_neighboring_threads(0)

    # Just check that we get two CPU ids back that are not the same. We cannot
    # do much more without knowing more about the system topology, and this
    # should be a reasonable enough test.
    self.assertLen(neighboring_threads, 2)
    self.assertNotEqual(neighboring_threads[0], neighboring_threads[1])

  @staticmethod
  def _set_normal_affinity():
    """Restricts the process to one hyperthread pair plus one other CPU.

    Returns:
      A tuple of the auxiliary CPU, the hyperthread pair, and the CPU mask
      (as a list) that was in effect before the affinity was changed.

    Raises:
      RuntimeError: If the current mask has no CPU outside of the pair.
    """
    cpu_mask_list = list(os.sched_getaffinity(0))
    scheduler_class = benchmark_cpu_scheduler.DefaultBenchmarkScheduler
    hyperthread_pair = scheduler_class._get_neighboring_threads(
        cpu_mask_list[0]
    )
    # The auxiliary CPU must not be one of the siblings, as the resulting
    # mask would otherwise only contain two distinct CPUs.
    aux_cpu = next(
        (cpu for cpu in cpu_mask_list if cpu not in hyperthread_pair), None
    )
    if aux_cpu is None:
      raise RuntimeError('Expected a CPU outside of the hyperthread pair.')

    os.sched_setaffinity(0, [aux_cpu, *hyperthread_pair])
    return (aux_cpu, hyperthread_pair, cpu_mask_list)

  @staticmethod
  def _reset_cpu_affinity(cpu_mask: Iterable[int]):
    """Sets the CPU affinity of the current process to cpu_mask."""
    os.sched_setaffinity(0, cpu_mask)

  def test_default_scheduler_get_cores(self):
    expected_aux_cpu, expected_hyperthread_pair, _ = (
        self._set_normal_affinity()
    )
    scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
    cpu_mask = os.sched_getaffinity(0)
    aux_cpu, hyperthread_pair = scheduler._get_aux_core_and_hyperthread_pair(
        cpu_mask
    )
    self.assertEqual(aux_cpu, expected_aux_cpu)
    self.assertContainsSubsequence(hyperthread_pair, expected_hyperthread_pair)

  def test_default_scheduler_get_cores_no_neighboring_threads(self):
    scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()

    # Pick one thread from each of three different cores, so that the mask is
    # guaranteed not to contain a complete hyperthread pair.
    three_cores = []
    covered_threads = set()
    for cpu_index in sorted(os.sched_getaffinity(0)):
      if cpu_index in covered_threads:
        continue
      three_cores.append(cpu_index)
      covered_threads.update(scheduler._get_neighboring_threads(cpu_index))
      if len(three_cores) == 3:
        break
    if len(three_cores) < 3:
      self.skipTest('Need CPUs from three different cores.')

    with self.assertRaises(ValueError):
      scheduler._get_aux_core_and_hyperthread_pair(three_cores)

  def test_default_scheduler_setup(self):
    expected_aux_cpu, expected_hyperthread_pair, _ = (
        self._set_normal_affinity()
    )

    scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
    benchmark_core = scheduler.setup_and_get_benchmark_core()
    self.assertIn(benchmark_core, expected_hyperthread_pair)
    set_cpu_mask = os.sched_getaffinity(0)
    self.assertLen(set_cpu_mask, 1)
    self.assertEqual(set_cpu_mask.pop(), expected_aux_cpu)

  def test_default_scheduler_not_three_cpus(self):
    cpu_mask_list = list(os.sched_getaffinity(0))
    os.sched_setaffinity(0, cpu_mask_list[0:2])

    scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
    with self.assertRaises(ValueError):
      scheduler.setup_and_get_benchmark_core()

  def test_default_scheduler_verify(self):
    self._set_normal_affinity()

    scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
    scheduler.setup_and_get_benchmark_core()
    scheduler.verify()

  def test_default_scheduler_verify_mask_changed(self):
    _, _, old_cpu_mask = self._set_normal_affinity()

    scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
    scheduler.setup_and_get_benchmark_core()

    # Two CPUs can never match the single-CPU mask applied during setup.
    cpu_mask_list = list(old_cpu_mask)
    os.sched_setaffinity(0, cpu_mask_list[1:3])
    with self.assertRaises(ValueError):
      scheduler.verify()


if __name__ == '__main__':
  absltest.main()