-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathplot_num_threads_per_disk.py
172 lines (142 loc) · 6.33 KB
/
plot_num_threads_per_disk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#
# Copyright 2016 The Regents of The University California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This script generates graphs of number of threads per disk vs. JCT. The data is assumed to have been
generated by the run_basic_disk_job.py script.
"""
import argparse
import math
from matplotlib import pyplot
from matplotlib.backends import backend_pdf
import os
from os import path
import parse_event_logs
import utils
def main():
args = __parse_args()
num_threads_to_jcts = {}
log_dir = args.log_dir
for trial_log_dir in os.listdir(log_dir):
trial_log_dir_filepath = path.join(log_dir, trial_log_dir)
if path.isdir(trial_log_dir_filepath):
utils.plot_continuous_monitors(trial_log_dir_filepath)
num_threads = __get_num_threads_from_log_dir(trial_log_dir)
jcts = __get_jcts_from_logs(trial_log_dir_filepath, args.warmup_count)
num_threads_to_jcts[num_threads] = jcts
assert len(num_threads_to_jcts) > 0, "No valid logs found in {}".format(log_dir)
__create_num_threads_vs_jct_graph(num_threads_to_jcts, args.output_dir, phase="write")
__create_num_threads_vs_jct_graph(num_threads_to_jcts, args.output_dir, phase="read")
def __parse_args():
parser = argparse.ArgumentParser(description="")
parser.add_argument(
"-l",
"--log-dir",
help=("A directory that contains several directories of logs. Each log " +
"subdirectory should contain the results from one run of " +
"org.apache.spark.examples.monotasks.disk.DiskThroughputExperiment " +
"and have been generated by the utils.copy_and_zip_all_logs() function."),
required=True)
parser.add_argument(
"-o",
"--output-dir",
help="The directory in which to store the graph PDFs. Defaults to the value of --log-dir.",
required=False)
parser.add_argument(
"-w",
"--warmup-count",
default=0,
help="The number of iterations that are warmup and should be discarded.",
required=True,
type=int)
args = parser.parse_args()
log_dir = args.log_dir
assert path.isdir(log_dir), \
"The supplied log directory does not exist or is a file: {}".format(log_dir)
if args.output_dir is None:
args.output_dir = args.log_dir
return args
def __get_jcts_from_logs(log_dir, warmup_count):
"""
Returns a tuple of (list of write job JCTs, list of read job JCTs) parsed from the event log
contained in the provided directory.
"""
event_log_filepath = path.join(log_dir, "event_log")
sorted_job_pairs = sorted(parse_event_logs.Analyzer(event_log_filepath).jobs.iteritems())
return (__get_jcts_for_phase(sorted_job_pairs, warmup_count, phase="write"),
__get_jcts_for_phase(sorted_job_pairs, warmup_count, phase="read"))
def __get_jcts_for_phase(sorted_job_pairs, warmup_count, phase):
"""
Returns a list of JCTs (in seconds) for the specified phase. sorted_job_pairs should be a list of
the form [ ( job ID, job ) ], sorted by job ID. warmup_count indicates the number of jobs that
should be discarded from the beginning of each phase. Assumes that the jobs correspond to one run
of org.apache.spark.examples.monotasks.disk.DiskThroughputExperiment.
"""
assert phase in ["write", "read"]
filterer = __write_job_filterer if phase == "write" else __read_job_filterer
return [job.runtime() / 1000.0 for job in filterer(warmup_count, sorted_job_pairs)]
def __get_num_threads_from_log_dir(log_dir):
"""
Extracts the number of threads per disk from the specified log directory name. log_dir should
be of the form: experiment_log_<experiment parameters>_<num threads per disk>_<timestamp>
"""
return int(log_dir.split("_")[-2])
def __create_num_threads_vs_jct_graph(num_threads_to_jcts, output_dir, phase):
"""
Create a graph of num threads per disk vs. JCT for the specified phase, which must be either
"write" or "read". num_threads_to_jcts should be a dictionary of the form:
{ num threads : ( list of write JCTs, list of read JCTs ) }
"""
assert phase in ["write", "read"]
num_ticks = len(num_threads_to_jcts) + 2
xmax = num_ticks - 1
max_jct = max([jct
for write_jcts, read_jcts in num_threads_to_jcts.itervalues()
for jct in (write_jcts if phase == "write" else read_jcts)])
ymax = max_jct * 1.1
pyplot.title("Num threads per disk vs. JCT ({} phase)".format(phase))
pyplot.xlabel("Num threads per disk")
pyplot.ylabel("JCT (s)")
pyplot.grid(b=True)
pyplot.xlim(xmin=0, xmax=xmax)
pyplot.ylim(ymin=0, ymax=ymax)
# Build a list of lists of JCTs, sorted by num threads per disk.
all_jcts = [write_jcts if phase == "write" else read_jcts
for _, (write_jcts, read_jcts) in sorted(num_threads_to_jcts.iteritems())]
pyplot.boxplot(all_jcts, whis=[0, 100])
# Replace the visually-correct x-axis values with the numerically correct values.
pyplot.xticks(xrange(num_ticks), [""] + sorted(num_threads_to_jcts.keys()) + [""])
# Save the graph as a PDF.
output_filepath = path.join(output_dir, "{}_phase_num_threads_vs_jct.pdf".format(phase))
with backend_pdf.PdfPages(output_filepath) as pdf:
pdf.savefig()
pyplot.close()
def __write_job_filterer(warmup_count, sorted_job_pairs):
"""
The first job generates the data. Of the remaining jobs, the first half are writes. warmup_count
specifies the number of jobs that are warmup and should be discarded.
"""
max_write_job_id = math.ceil(len(sorted_job_pairs) / 2.0) - 1
return [job for (job_id, job) in sorted_job_pairs
if (job_id > warmup_count) and (job_id <= max_write_job_id)]
def __read_job_filterer(warmup_count, sorted_job_pairs):
"""
The first job generates the data. Of the remaining jobs, the second half are reads. warmup_count
specifies the number of jobs that are warmup and should be discarded.
"""
min_read_job_id = math.ceil(len(sorted_job_pairs) / 2.0) + warmup_count
return [job for (job_id, job) in sorted_job_pairs if job_id >= min_read_job_id]
if __name__ == "__main__":
main()