Skip to content

Commit

Permalink
Add WorkerNetworkBandwidth chart to dashboard (#5104)
Browse files Browse the repository at this point in the history
  • Loading branch information
ncclementi authored Jul 27, 2021
1 parent b37ac9d commit cf1e412
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 5 deletions.
94 changes: 93 additions & 1 deletion distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def __init__(self, scheduler, **kwargs):
@without_property_validation
def update(self):
with log_errors():
workers = list(self.scheduler.workers.values())
workers = self.scheduler.workers.values()

y = list(range(len(workers)))
occupancy = [ws.occupancy for ws in workers]
Expand Down Expand Up @@ -722,6 +722,98 @@ def name(address):
update(self.source, result)


class WorkerNetworkBandwidth(DashboardComponent):
    """Worker network bandwidth chart

    Plots two horizontal bars per worker: one for the ``read_bytes`` metric
    (red) and one for the ``write_bytes`` metric (blue), both taken from the
    worker state's ``metrics`` mapping.

    Parameters
    ----------
    scheduler
        Object exposing a ``workers`` mapping whose values carry a
        ``metrics`` mapping with ``"read_bytes"`` and ``"write_bytes"``.
    **kwargs
        Forwarded unchanged to the ``figure`` constructor.
    """

    def __init__(self, scheduler, **kwargs):
        with log_errors():
            self.scheduler = scheduler
            self.source = ColumnDataSource(
                {
                    "y_read": [],
                    "y_write": [],
                    "x_read": [],
                    "x_write": [],
                }
            )
            self.root = figure(
                title="Worker Network Bandwidth",
                tools="",
                id="bk-worker-net-bandwidth",
                name="worker_network_bandwidth",
                **kwargs,
            )

            # read_bytes
            self.root.hbar(
                y="y_read",
                right="x_read",
                line_color=None,
                left=0,
                height=0.5,
                fill_color="red",
                legend_label="read",
                source=self.source,
            )

            # write_bytes
            self.root.hbar(
                y="y_write",
                right="x_write",
                line_color=None,
                left=0,
                height=0.5,
                fill_color="blue",
                legend_label="write",
                source=self.source,
            )

            # Address the x-axis explicitly, like the formatter line below,
            # instead of relying on the ordering of the combined axis list.
            self.root.xaxis[0].ticker = BasicTicker(**TICKS_1024)
            self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
            self.root.xaxis.minor_tick_line_alpha = 0
            self.root.x_range = Range1d(start=0)
            self.root.yaxis.visible = False
            self.root.ygrid.visible = False
            self.root.toolbar_location = None

    @without_property_validation
    def update(self):
        """Refresh bar positions and lengths from the current worker metrics."""
        with log_errors():
            workers = self.scheduler.workers.values()

            # Each worker occupies one unit of height plus a gap of ``h``;
            # the read bar sits above the write bar within that slot.
            h = 0.1
            y_read = [i + 0.75 + i * h for i in range(len(workers))]
            y_write = [i + 0.25 + i * h for i in range(len(workers))]

            x_read = [ws.metrics["read_bytes"] for ws in workers]
            x_write = [ws.metrics["write_bytes"] for ws in workers]

            # The axis end never drops below 100_000_000 and shrinks by at
            # most 5% per update.  ``default=0`` keeps this working when no
            # workers are connected, where a bare ``max([])`` would raise
            # ValueError.
            self.root.x_range.end = max(
                max(x_read, default=0),
                max(x_write, default=0),
                100_000_000,
                0.95 * self.root.x_range.end,
            )

            result = {
                "y_read": y_read,
                "y_write": y_write,
                "x_read": x_read,
                "x_write": x_write,
            }

            update(self.source, result)


class ComputePerKey(DashboardComponent):
"""Bar chart showing time spend in action by key prefix"""

Expand Down
16 changes: 14 additions & 2 deletions distributed/dashboard/components/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,20 @@ def __init__(self, worker, height=150, last_count=None, **kwargs):
tools=tools,
**kwargs,
)
self.bandwidth.line(source=self.source, x="time", y="read_bytes", color="red")
self.bandwidth.line(source=self.source, x="time", y="write_bytes", color="blue")
self.bandwidth.line(
source=self.source,
x="time",
y="read_bytes",
color="red",
legend_label="read",
)
self.bandwidth.line(
source=self.source,
x="time",
y="write_bytes",
color="blue",
legend_label="write",
)
self.bandwidth.yaxis.axis_label = "Bytes / second"

# self.cpu.yaxis[0].formatter = NumeralTickFormatter(format='0%')
Expand Down
4 changes: 4 additions & 0 deletions distributed/dashboard/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
TaskGroupGraph,
TaskProgress,
TaskStream,
WorkerNetworkBandwidth,
WorkersMemory,
WorkerTable,
events_doc,
Expand Down Expand Up @@ -67,6 +68,9 @@
"/individual-workers": individual_doc(WorkerTable, 500),
"/individual-bandwidth-types": individual_doc(BandwidthTypes, 500),
"/individual-bandwidth-workers": individual_doc(BandwidthWorkers, 500),
"/individual-workers-network-bandwidth": individual_doc(
WorkerNetworkBandwidth, 500
),
"/individual-memory-by-key": individual_doc(MemoryByKey, 500),
"/individual-compute-time-per-key": individual_doc(ComputePerKey, 500),
"/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500),
Expand Down
32 changes: 32 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
TaskGroupGraph,
TaskProgress,
TaskStream,
WorkerNetworkBandwidth,
WorkersMemory,
WorkersMemoryHistogram,
WorkerTable,
Expand Down Expand Up @@ -476,6 +477,37 @@ async def test_WorkerTable_with_memory_limit_as_0(c, s, a, b):
assert wt.source.data["memory_percent"][0] == ""


@gen_cluster(client=True)
async def test_WorkerNetworkBandwidth(c, s, a, b):
    """Every data column has one entry per worker, at the expected y offsets."""
    chart = WorkerNetworkBandwidth(s)
    chart.update()

    columns = chart.source.data

    # The cluster fixture provides two workers (``a`` and ``b``).
    assert all(len(values) == 2 for values in columns.values())

    assert columns["y_read"] == [0.75, 1.85]
    assert columns["y_write"] == [0.25, 1.35]


@gen_cluster(client=True)
async def test_WorkerNetworkBandwidth_metrics(c, s, a, b):
    """Bar lengths match the read/write metrics held in the scheduler state."""
    cluster_workers = (a, b)

    # Stop the periodic system-monitor callback so that the test decides
    # when the monitor is sampled.
    for worker in cluster_workers:
        worker.periodic_callbacks["monitor"].stop()

    # Sample the monitors by hand, then push the fresh metrics to the
    # scheduler through a heartbeat.
    for worker in cluster_workers:
        worker.monitor.update()
    await asyncio.gather(*(worker.heartbeat() for worker in cluster_workers))

    chart = WorkerNetworkBandwidth(s)
    chart.update()

    for position, state in enumerate(s.workers.values()):
        plotted = chart.source.data
        assert state.metrics["read_bytes"] == plotted["x_read"][position]
        assert state.metrics["write_bytes"] == plotted["x_write"][position]


@gen_cluster(client=True)
async def test_TaskGraph(c, s, a, b):
gp = TaskGraph(s)
Expand Down
5 changes: 3 additions & 2 deletions docs/source/http_services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ Individual bokeh plots
- ``/individual-groups``
- ``/individual-profile``
- ``/individual-profile-server``
- ``/individual-nbytes``
- ``/individual-nbytes-cluster``
- ``/individual-workers-memory``
- ``/individual-cluster-memory``
- ``/individual-cpu``
- ``/individual-nprocessing``
- ``/individual-occupancy``
- ``/individual-workers``
- ``/individual-bandwidth-types``
- ``/individual-bandwidth-workers``
- ``/individual-workers-network-bandwidth``
- ``/individual-memory-by-key``
- ``/individual-compute-time-per-key``
- ``/individual-aggregate-time-per-action``
Expand Down

0 comments on commit cf1e412

Please sign in to comment.