Skip to content

Commit cf1e412

Browse files
authored
Add WorkerNetworkBandwidth chart to dashboard (#5104)
1 parent b37ac9d commit cf1e412

File tree

5 files changed

+146
-5
lines changed

5 files changed

+146
-5
lines changed

distributed/dashboard/components/scheduler.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def __init__(self, scheduler, **kwargs):
142142
@without_property_validation
143143
def update(self):
144144
with log_errors():
145-
workers = list(self.scheduler.workers.values())
145+
workers = self.scheduler.workers.values()
146146

147147
y = list(range(len(workers)))
148148
occupancy = [ws.occupancy for ws in workers]
@@ -722,6 +722,98 @@ def name(address):
722722
update(self.source, result)
723723

724724

725+
class WorkerNetworkBandwidth(DashboardComponent):
    """Worker network bandwidth chart

    Plots horizontal bars with the read_bytes and write_bytes worker state.

    Each worker gets two bars, one red ("read") and one blue ("write"),
    whose lengths are taken from the ``read_bytes`` and ``write_bytes``
    entries of the worker's ``metrics`` mapping on the scheduler.
    """

    def __init__(self, scheduler, **kwargs):
        """Build the data source and the figure.

        Parameters
        ----------
        scheduler
            Object exposing a ``workers`` mapping; it is stored and read
            again on every call to ``update``.
        **kwargs
            Forwarded unchanged to ``figure``.
        """
        with log_errors():
            self.scheduler = scheduler
            # One column per bar coordinate; filled in by ``update``.
            self.source = ColumnDataSource(
                {
                    "y_read": [],
                    "y_write": [],
                    "x_read": [],
                    "x_write": [],
                }
            )
            self.root = figure(
                title="Worker Network Bandwidth",
                tools="",
                id="bk-worker-net-bandwidth",
                name="worker_network_bandwidth",
                **kwargs,
            )

            # read_bytes
            self.root.hbar(
                y="y_read",
                right="x_read",
                line_color=None,
                left=0,
                height=0.5,
                fill_color="red",
                legend_label="read",
                source=self.source,
            )

            # write_bytes
            self.root.hbar(
                y="y_write",
                right="x_write",
                line_color=None,
                left=0,
                height=0.5,
                fill_color="blue",
                legend_label="write",
                source=self.source,
            )

            # NOTE(review): axis[0] is presumably the x axis (the formatter
            # below is set on xaxis[0]) -- confirm.
            self.root.axis[0].ticker = BasicTicker(**TICKS_1024)
            self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
            self.root.xaxis.minor_tick_line_alpha = 0
            self.root.x_range = Range1d(start=0)
            self.root.yaxis.visible = False
            self.root.ygrid.visible = False
            self.root.toolbar_location = None

    @without_property_validation
    def update(self):
        """Refresh bar positions and lengths from the scheduler's workers.

        With no workers the data source is emptied and the x-range is left
        untouched, instead of failing on ``max()`` of an empty sequence.
        """
        with log_errors():
            workers = self.scheduler.workers.values()

            # Worker ``i`` occupies the unit slot starting at ``i``, pushed
            # up by an extra ``h`` per preceding worker; the write bar sits
            # at +0.25 and the read bar at +0.75 within that slot.
            h = 0.1
            y_read = [i + 0.75 + i * h for i in range(len(workers))]
            y_write = [i + 0.25 + i * h for i in range(len(workers))]

            x_read = [ws.metrics["read_bytes"] for ws in workers]
            x_write = [ws.metrics["write_bytes"] for ws in workers]

            # max() raises ValueError on an empty sequence, so only rescale
            # the axis when there is at least one worker.
            if x_read:
                # The axis end never drops below 100_000_000 and never
                # shrinks by more than 5% of its previous value per update.
                self.root.x_range.end = max(
                    max(x_read),
                    max(x_write),
                    100_000_000,
                    0.95 * self.root.x_range.end,
                )

            result = {
                "y_read": y_read,
                "y_write": y_write,
                "x_read": x_read,
                "x_write": x_write,
            }

            update(self.source, result)
815+
816+
725817
class ComputePerKey(DashboardComponent):
726818
"""Bar chart showing time spend in action by key prefix"""
727819

distributed/dashboard/components/shared.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,8 +482,20 @@ def __init__(self, worker, height=150, last_count=None, **kwargs):
482482
tools=tools,
483483
**kwargs,
484484
)
485-
self.bandwidth.line(source=self.source, x="time", y="read_bytes", color="red")
486-
self.bandwidth.line(source=self.source, x="time", y="write_bytes", color="blue")
485+
self.bandwidth.line(
486+
source=self.source,
487+
x="time",
488+
y="read_bytes",
489+
color="red",
490+
legend_label="read",
491+
)
492+
self.bandwidth.line(
493+
source=self.source,
494+
x="time",
495+
y="write_bytes",
496+
color="blue",
497+
legend_label="write",
498+
)
487499
self.bandwidth.yaxis.axis_label = "Bytes / second"
488500

489501
# self.cpu.yaxis[0].formatter = NumeralTickFormatter(format='0%')

distributed/dashboard/scheduler.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
TaskGroupGraph,
2020
TaskProgress,
2121
TaskStream,
22+
WorkerNetworkBandwidth,
2223
WorkersMemory,
2324
WorkerTable,
2425
events_doc,
@@ -67,6 +68,9 @@
6768
"/individual-workers": individual_doc(WorkerTable, 500),
6869
"/individual-bandwidth-types": individual_doc(BandwidthTypes, 500),
6970
"/individual-bandwidth-workers": individual_doc(BandwidthWorkers, 500),
71+
"/individual-workers-network-bandwidth": individual_doc(
72+
WorkerNetworkBandwidth, 500
73+
),
7074
"/individual-memory-by-key": individual_doc(MemoryByKey, 500),
7175
"/individual-compute-time-per-key": individual_doc(ComputePerKey, 500),
7276
"/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500),

distributed/dashboard/tests/test_scheduler_bokeh.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
TaskGroupGraph,
3636
TaskProgress,
3737
TaskStream,
38+
WorkerNetworkBandwidth,
3839
WorkersMemory,
3940
WorkersMemoryHistogram,
4041
WorkerTable,
@@ -476,6 +477,37 @@ async def test_WorkerTable_with_memory_limit_as_0(c, s, a, b):
476477
assert wt.source.data["memory_percent"][0] == ""
477478

478479

480+
@gen_cluster(client=True)
async def test_WorkerNetworkBandwidth(c, s, a, b):
    """Check column lengths and bar positions after a single update.

    Every data column must hold two entries, and the read/write bars must
    sit at the expected y offsets.
    """
    chart = WorkerNetworkBandwidth(s)
    chart.update()

    data = chart.source.data
    for column in data.values():
        assert len(column) == 2

    assert data["y_read"] == [0.75, 1.85]
    assert data["y_write"] == [0.25, 1.35]
489+
490+
491+
@gen_cluster(client=True)
async def test_WorkerNetworkBandwidth_metrics(c, s, a, b):
    """Check that bar lengths equal the metrics stored on the scheduler."""
    workers = (a, b)

    # Stop the periodic system-monitor callbacks so the only metric refresh
    # is the manual one triggered below.
    for worker in workers:
        worker.periodic_callbacks["monitor"].stop()

    # Refresh each worker's system monitor, then heartbeat so the updated
    # metrics are sent to the scheduler.
    for worker in workers:
        worker.monitor.update()
    await asyncio.gather(*(worker.heartbeat() for worker in workers))

    chart = WorkerNetworkBandwidth(s)
    chart.update()

    data = chart.source.data
    for position, state in enumerate(s.workers.values()):
        assert state.metrics["read_bytes"] == data["x_read"][position]
        assert state.metrics["write_bytes"] == data["x_write"][position]
509+
510+
479511
@gen_cluster(client=True)
480512
async def test_TaskGraph(c, s, a, b):
481513
gp = TaskGraph(s)

docs/source/http_services.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,15 @@ Individual bokeh plots
5757
- ``/individual-groups``
5858
- ``/individual-profile``
5959
- ``/individual-profile-server``
60-
- ``/individual-nbytes``
61-
- ``/individual-nbytes-cluster``
60+
- ``/individual-workers-memory``
61+
- ``/individual-cluster-memory``
6262
- ``/individual-cpu``
6363
- ``/individual-nprocessing``
6464
- ``/individual-occupancy``
6565
- ``/individual-workers``
6666
- ``/individual-bandwidth-types``
6767
- ``/individual-bandwidth-workers``
68+
- ``/individual-workers-network-bandwidth``
6869
- ``/individual-memory-by-key``
6970
- ``/individual-compute-time-per-key``
7071
- ``/individual-aggregate-time-per-action``

0 commit comments

Comments
 (0)