diff --git a/gematria/datasets/pipelines/BUILD.bazel b/gematria/datasets/pipelines/BUILD.bazel index 954bb2f1..c7fbcdd6 100644 --- a/gematria/datasets/pipelines/BUILD.bazel +++ b/gematria/datasets/pipelines/BUILD.bazel @@ -53,6 +53,7 @@ gematria_py_binary( name = "benchmark_bbs_lib", srcs = ["benchmark_bbs_lib.py"], deps = [ + ":benchmark_cpu_scheduler", "//gematria/datasets/python:exegesis_benchmark", "//gematria/proto:execution_annotation_py_pb2", ], @@ -63,6 +64,7 @@ gematria_py_binary( srcs = ["benchmark_bbs.py"], deps = [ ":benchmark_bbs_lib", + ":benchmark_cpu_scheduler", ], ) @@ -77,6 +79,7 @@ gematria_py_test( ], deps = [ ":benchmark_bbs_lib", + ":benchmark_cpu_scheduler", "//gematria/io/python:tfrecord", "//gematria/proto:execution_annotation_py_pb2", ], diff --git a/gematria/datasets/pipelines/benchmark_bbs.py b/gematria/datasets/pipelines/benchmark_bbs.py index 9e28ee55..d79f61fa 100644 --- a/gematria/datasets/pipelines/benchmark_bbs.py +++ b/gematria/datasets/pipelines/benchmark_bbs.py @@ -20,6 +20,7 @@ from apache_beam.options import pipeline_options from gematria.datasets.pipelines import benchmark_bbs_lib +from gematria.datasets.pipelines import benchmark_cpu_scheduler _INPUT_FILE_PATTERN = flags.DEFINE_string( 'input_file_pattern', @@ -30,6 +31,15 @@ _OUTPUT_FILE_PATTERN = flags.DEFINE_string( 'output_file_pattern', None, 'The output file path/pattern.', required=True ) +_BENCHMARK_SCHEDULER = flags.DEFINE_enum( + 'benchmark_scheduler', + 'NoScheduling', + [ + scheduler_type.name + for scheduler_type in benchmark_cpu_scheduler.BenchmarkSchedulerImplementations + ], + 'The scheduler to use for choosing a core for running benchmarks.', +) def main(argv) -> None: @@ -39,7 +49,11 @@ def main(argv) -> None: beam_options = pipeline_options.PipelineOptions() pipeline_constructor = benchmark_bbs_lib.benchmark_bbs( - _INPUT_FILE_PATTERN.value, _OUTPUT_FILE_PATTERN.value + _INPUT_FILE_PATTERN.value, + _OUTPUT_FILE_PATTERN.value, + benchmark_cpu_scheduler.BenchmarkSchedulerImplementations[ + _BENCHMARK_SCHEDULER.value + ], ) with beam.Pipeline(options=beam_options) as pipeline: diff --git a/gematria/datasets/pipelines/benchmark_bbs_lib.py b/gematria/datasets/pipelines/benchmark_bbs_lib.py index 035a2935..eb3300cb 100644 --- a/gematria/datasets/pipelines/benchmark_bbs_lib.py +++ b/gematria/datasets/pipelines/benchmark_bbs_lib.py @@ -20,6 +20,7 @@ from gematria.proto import execution_annotation_pb2 from gematria.datasets.python import exegesis_benchmark +from gematria.datasets.pipelines import benchmark_cpu_scheduler _BEAM_METRIC_NAMESPACE_NAME = 'benchmark_bbs' @@ -27,8 +28,11 @@ class BenchmarkBasicBlock(beam.DoFn): """A Beam function that benchmarks basic blocks.""" - def setup(self): - self._exegesis_benchmark = exegesis_benchmark.ExegesisBenchmark.create() + def __init__( + self, + benchmark_scheduler_type: benchmark_cpu_scheduler.BenchmarkSchedulerImplementations, + ): + self._benchmark_scheduler_type = benchmark_scheduler_type self._benchmark_success_blocks = metrics.Metrics.counter( _BEAM_METRIC_NAMESPACE_NAME, 'benchmark_bbs_success' ) @@ -36,6 +40,17 @@ def setup(self): _BEAM_METRIC_NAMESPACE_NAME, 'benchmark_blocks_failed' ) + def setup(self): + self._exegesis_benchmark = exegesis_benchmark.ExegesisBenchmark.create() + self._benchmark_scheduler = ( + benchmark_cpu_scheduler.construct_benchmark_scheduler( + self._benchmark_scheduler_type + ) + ) + self._benchmarking_core = ( + self._benchmark_scheduler.setup_and_get_benchmark_core() + ) + def process( self, block_with_annotations: execution_annotation_pb2.BlockWithExecutionAnnotations, @@ -44,8 +59,10 @@ def process( benchmark_code = self._exegesis_benchmark.process_annotated_block( block_with_annotations ) + + self._benchmark_scheduler.verify() benchmark_value = self._exegesis_benchmark.benchmark_basic_block( - benchmark_code + benchmark_code, self._benchmarking_core ) self._benchmark_success_blocks.inc() yield (block_with_annotations.block_hex, benchmark_value) @@ -65,7 +82,9 @@ def process( def benchmark_bbs( - input_file_pattern: str, output_file_pattern: str + input_file_pattern: str, + output_file_pattern: str, + benchmark_scheduler_type: benchmark_cpu_scheduler.BenchmarkSchedulerImplementations, ) -> Callable[[beam.Pipeline], None]: """Creates a pipeline to benchmark BBs.""" @@ -78,7 +97,7 @@ def pipeline(root: beam.Pipeline) -> None: ) annotated_bbs_shuffled = annotated_bbs | 'Shuffle' >> beam.Reshuffle() benchmarked_blocks = annotated_bbs_shuffled | 'Benchmarking' >> beam.ParDo( - BenchmarkBasicBlock() + BenchmarkBasicBlock(benchmark_scheduler_type) ) formatted_output = benchmarked_blocks | 'Formatting' >> beam.ParDo( FormatBBsForOutput() diff --git a/gematria/datasets/pipelines/benchmark_bbs_lib_test.py b/gematria/datasets/pipelines/benchmark_bbs_lib_test.py index 6ab41f5e..2b869c1d 100644 --- a/gematria/datasets/pipelines/benchmark_bbs_lib_test.py +++ b/gematria/datasets/pipelines/benchmark_bbs_lib_test.py @@ -21,6 +21,7 @@ from gematria.datasets.pipelines import benchmark_bbs_lib from gematria.proto import execution_annotation_pb2 from gematria.io.python import tfrecord +from gematria.datasets.pipelines import benchmark_cpu_scheduler BLOCK_FOR_TESTING = execution_annotation_pb2.BlockWithExecutionAnnotations( execution_annotations=execution_annotation_pb2.ExecutionAnnotations( @@ -45,7 +46,9 @@ class BenchmarkBBsTests(absltest.TestCase): def test_benchmark_basic_block(self): - benchmark_transform = benchmark_bbs_lib.BenchmarkBasicBlock() + benchmark_transform = benchmark_bbs_lib.BenchmarkBasicBlock( + benchmark_cpu_scheduler.BenchmarkSchedulerImplementations.NoScheduling + ) benchmark_transform.setup() block_outputs = list(benchmark_transform.process(BLOCK_FOR_TESTING)) @@ -74,7 +77,9 @@ def test_benchmark_bbs(self): output_file_pattern = os.path.join(output_folder, 'bhive-output') pipeline_constructor = benchmark_bbs_lib.benchmark_bbs( - test_tfrecord.full_path, output_file_pattern + test_tfrecord.full_path, + output_file_pattern, + benchmark_cpu_scheduler.BenchmarkSchedulerImplementations.NoScheduling, ) with test_pipeline.TestPipeline() as pipeline_under_test: diff --git a/gematria/datasets/pipelines/benchmark_cpu_scheduler.py b/gematria/datasets/pipelines/benchmark_cpu_scheduler.py index 43b41e6e..bbc36047 100644 --- a/gematria/datasets/pipelines/benchmark_cpu_scheduler.py +++ b/gematria/datasets/pipelines/benchmark_cpu_scheduler.py @@ -17,6 +17,7 @@ from collections.abc import Collection import os import re +from enum import Enum class BenchmarkScheduler(metaclass=abc.ABCMeta): @@ -148,3 +149,19 @@ def verify(self): cpu_mask = list(os.sched_getaffinity(0)) if self._cpu_mask != cpu_mask: raise ValueError('Expected the CPU mask to not change.') + + +class BenchmarkSchedulerImplementations(Enum): + NoScheduling = 1 + Default = 2 + + +def construct_benchmark_scheduler( + scheduler_type: BenchmarkSchedulerImplementations, +) -> BenchmarkScheduler: + if scheduler_type == BenchmarkSchedulerImplementations.NoScheduling: + return NoSchedulingBenchmarkScheduler() + elif scheduler_type == BenchmarkSchedulerImplementations.Default: + return DefaultBenchmarkScheduler() + else: + raise ValueError('Unexpected Benchmark Scheduler Type.')