Skip to content

Commit b5bd66c

Browse files
authored
Merge pull request #983 from ottointhesky/task_label_feature
Task label feature refined
2 parents 170f64d + d90ca24 commit b5bd66c

File tree

11 files changed

+338
-52
lines changed

11 files changed

+338
-52
lines changed

.github/workflows/test.yml

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,14 @@ jobs:
8484
8585
- name: Set up slurm
8686
if: ${{ matrix.cluster_type == 'slurm' }}
87+
# docker build can lead to race condition -> image "docker.io/library/ipp-cluster:slurm": already exists
88+
# see https://github.com/mlflow/mlflow/pull/20779
89+
# work-a-round fix: docker compose again if first call failed
8790
run: |
8891
export DOCKER_BUILDKIT=1
8992
export COMPOSE_DOCKER_CLI_BUILD=1
9093
cd ci/slurm
91-
docker compose up -d --build
94+
docker compose up -d --build || docker compose up -d --build
9295
9396
- name: Install Python (conda) ${{ matrix.python }}
9497
if: ${{ matrix.cluster_type == 'mpi' }}
@@ -128,6 +131,23 @@ jobs:
128131
pip install distributed joblib
129132
pip install --only-binary :all: matplotlib
130133
134+
- name: Start MongoDB
135+
if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster
136+
uses: supercharge/mongodb-github-action@1.12.1 # uses latest mongodb per default
137+
138+
- name: Install pymongo package
139+
if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster
140+
run: pip install pymongo
141+
142+
- name: Try to connect to mongodb
143+
if: ${{ (! matrix.runs_on) && (! matrix.cluster_type) }} # only under linux with no cluster
144+
run: |
145+
python3 <<EOF
146+
from pymongo import MongoClient
147+
client = MongoClient('mongodb://localhost:27017/',serverSelectionTimeoutMS=1)
148+
print(client.server_info())
149+
EOF
150+
131151
- name: Show environment
132152
run: pip freeze
133153

docs/make.bat

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ pushd %~dp0
44

55
REM Command file for Sphinx documentation
66

7-
if "%SPHINXBUILD%" == "" (
8-
set SPHINXBUILD=--color -W --keep-going
7+
if "%SPHINXOPTS%" == "" (
8+
set SPHINXOPTS=--color -W --keep-going
99
)
1010
if "%SPHINXBUILD%" == "" (
1111
set SPHINXBUILD=sphinx-build

docs/source/examples/basic_task_label.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
"""Basic task label example"""
1+
"""Basic task label example
2+
3+
Labels can be used for identifying or semantically grouping tasks. Using the $glob operator
4+
relevant task records can be queried from the Task Database.
5+
6+
Authors
7+
-------
8+
* ottointhesky
9+
"""
210

311
import ipyparallel as ipp
412

@@ -18,13 +26,17 @@ def wait(t):
1826
return time.time() - tic
1927

2028

29+
count = 5 # number of tasks per method
30+
2131
# use load balanced view
2232
bview = rc.load_balanced_view()
2333
ar_list_b1 = [
24-
bview.set_flags(label=f"mylabel_map_{i:02}").map_async(wait, [2]) for i in range(10)
34+
bview.set_flags(label=f"bview_map_{i:02}").map_async(wait, [2])
35+
for i in range(count)
2536
]
2637
ar_list_b2 = [
27-
bview.set_flags(label=f"mylabel_map_{i:02}").apply_async(wait, 2) for i in range(10)
38+
bview.set_flags(label=f"bview_apply_{i:02}").apply_async(wait, 2)
39+
for i in range(count)
2840
]
2941
bview.wait(ar_list_b1)
3042
bview.wait(ar_list_b2)
@@ -33,19 +45,36 @@ def wait(t):
3345
# use direct view
3446
dview = rc[:]
3547
ar_list_d1 = [
36-
dview.set_flags(label=f"mylabel_map_{i + 10:02}").apply_async(wait, 2)
37-
for i in range(10)
48+
dview.set_flags(label=f"dview_map_{i + 10:02}").map_async(wait, [2])
49+
for i in range(count)
3850
]
3951
ar_list_d2 = [
40-
dview.set_flags(label=f"mylabel_map_{i + 10:02}").map_async(wait, [2])
41-
for i in range(10)
52+
dview.set_flags(label=f"dview_apply_{i + 10:02}").apply_async(wait, 2)
53+
for i in range(count)
4254
]
4355
dview.wait(ar_list_d1)
4456
dview.wait(ar_list_d2)
4557

58+
59+
def print_records(titel, data):
60+
print(f"{titel} ({len(data)} records)")
61+
for d in data:
62+
print(
63+
f"\tmsg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}"
64+
)
65+
66+
67+
query_keys = ['msg_id', 'label', 'engine_uuid']
68+
4669
# query database
47-
data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid'])
48-
for d in data:
49-
print(f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}")
70+
data = rc.db_query({'label': {"$nin": ""}}, keys=query_keys)
71+
print_records("all entries with labels", data)
72+
73+
data = rc.db_query({'label': {"$glob": "dview_*"}}, keys=query_keys)
74+
print_records("all dview label entries", data)
75+
76+
data = rc.db_query({'label': {"$glob": "*_map_*"}}, keys=query_keys)
77+
print_records("all map label entries", data)
5078

79+
# stop cluster
5180
cluster.stop_cluster_sync()

docs/source/reference/db.md

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,26 @@ TaskRecord keys:
7575
| error | dict | Python traceback (error message content) |
7676
| stdout | str | Stream of stdout data |
7777
| stderr | str | Stream of stderr data |
78+
| label | str | optional user-defined task identifier |
7879

7980
MongoDB operators we emulate on all backends:
8081

81-
| Operator | Python equivalent |
82-
| -------- | ----------------- |
83-
| '\$in' | in |
84-
| '\$nin' | not in |
85-
| '\$eq' | == |
86-
| '\$ne' | != |
87-
| '\$gt' | > |
88-
| '\$gte' | >= |
89-
| '\$le' | \< |
90-
| '\$lte' | \<= |
82+
| Operator | Python equivalent |
83+
| -------- | ----------------------------------------------------------------------------- |
84+
| '\$in' | in |
85+
| '\$nin' | not in |
86+
| '\$eq' | == |
87+
| '\$ne' | != |
88+
| '\$gt' | > |
89+
| '\$gte' | >= |
90+
| '\$le' | \< |
91+
| '\$lte' | \<= |
92+
| '\$glob' | [fnmatch](https://docs.python.org/3/library/fnmatch.html) (wildcard matching) |
93+
94+
Remarks on _$glob_: The operator can be used to find substrings in DB columns based on
95+
[unix style filename pattern matching](https://docs.python.org/3/library/fnmatch.html)
96+
_$glob_ is **not** a regular MongoDB opertor, but is internally translated to a regular
97+
expression (_$regex_) which is natively supported by MongoDB.
9198

9299
The DB Query is useful for two primary cases:
93100

ipyparallel/client/client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1226,7 +1226,9 @@ def relay_comm(msg):
12261226
for callback in msg_future.iopub_callbacks:
12271227
callback(msg)
12281228

1229-
def create_message_futures(self, msg_id, header, async_result=False, track=False):
1229+
def create_message_futures(
1230+
self, msg_id, header, async_result=False, track=False, label=None
1231+
):
12301232
msg_future = MessageFuture(msg_id, header=header, track=track)
12311233
futures = [msg_future]
12321234
self._futures[msg_id] = msg_future
@@ -1237,6 +1239,7 @@ def create_message_futures(self, msg_id, header, async_result=False, track=False
12371239
# hook up metadata
12381240
output.metadata = self.metadata[msg_id]
12391241
output.metadata['submitted'] = util.utcnow()
1242+
output.metadata['label'] = label
12401243
msg_future.output = output
12411244
futures.append(output)
12421245
return futures
@@ -1266,6 +1269,7 @@ def _send(
12661269
msg_id = msg['header']['msg_id']
12671270

12681271
expect_reply = msg_type not in {"comm_msg", "comm_close", "comm_open"}
1272+
label = metadata["label"] if metadata and "label" in metadata else None
12691273

12701274
if expect_reply and track_outstanding:
12711275
# add to outstanding, history
@@ -1289,6 +1293,7 @@ def _send(
12891293
msg['header'],
12901294
async_result=msg_type in {'execute_request', 'apply_request'},
12911295
track=track,
1296+
label=label,
12921297
)
12931298
if message_future_hook is not None:
12941299
message_future_hook(futures[0])

ipyparallel/client/view.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def __len__(self):
138138
def set_flags(self, **kwargs):
139139
"""set my attribute flags by keyword.
140140
141-
Views determine behavior with a few attributes (`block`, `track`, etc.).
141+
Views determine behavior with a few attributes (`block`, `track`, `label`, etc.).
142142
These attributes can be set all at once by name with this method.
143143
144144
Parameters
@@ -149,6 +149,8 @@ def set_flags(self, **kwargs):
149149
whether to create a MessageTracker to allow the user to
150150
safely edit after arrays and buffers during non-copying
151151
sends.
152+
label : str
153+
set an optional user-defined task identifier
152154
"""
153155
for name, value in kwargs.items():
154156
if name not in self._flag_names:
@@ -557,6 +559,8 @@ def _really_apply(
557559
whether to block
558560
track : bool [default: self.track]
559561
whether to ask zmq to track the message, for safe non-copying sends
562+
label : str [default self.label]
563+
set an optional user-defined task identifier
560564
561565
Returns
562566
-------
@@ -642,6 +646,8 @@ def map(
642646
Only for zero-copy sends such as numpy arrays that are going to be modified in-place.
643647
return_exceptions : bool [default False]
644648
Return remote Exceptions in the result sequence instead of raising them.
649+
label : str [default self.label]
650+
set an optional user-defined task identifier
645651
646652
Returns
647653
-------
@@ -672,7 +678,7 @@ def map(
672678

673679
@sync_results
674680
@save_ids
675-
def execute(self, code, silent=True, targets=None, block=None):
681+
def execute(self, code, silent=True, targets=None, block=None, label=None):
676682
"""Executes `code` on `targets` in blocking or nonblocking manner.
677683
678684
``execute`` is always `bound` (affects engine namespace)
@@ -681,18 +687,21 @@ def execute(self, code, silent=True, targets=None, block=None):
681687
----------
682688
code : str
683689
the code string to be executed
684-
block : bool
690+
block : bool [default self.block]
685691
whether or not to wait until done to return
686-
default: self.block
692+
label : str [default self.label]
693+
set an optional user-defined task identifier
687694
"""
688695
block = self.block if block is None else block
689696
targets = self.targets if targets is None else targets
697+
label = self.label if label is None else label
698+
metadata = dict(label=label)
690699

691700
_idents, _targets = self.client._build_targets(targets)
692701
futures = []
693702
for ident in _idents:
694703
future = self.client.send_execute_request(
695-
self._socket, code, silent=silent, ident=ident
704+
self._socket, code, silent=silent, ident=ident, metadata=metadata
696705
)
697706
futures.append(future)
698707
if isinstance(targets, int):
@@ -708,7 +717,7 @@ def execute(self, code, silent=True, targets=None, block=None):
708717
pass
709718
return ar
710719

711-
def run(self, filename, targets=None, block=None):
720+
def run(self, filename, targets=None, block=None, label=None):
712721
"""Execute contents of `filename` on my engine(s).
713722
714723
This simply reads the contents of the file and calls `execute`.
@@ -723,13 +732,15 @@ def run(self, filename, targets=None, block=None):
723732
block : bool
724733
whether or not to wait until done
725734
default: self.block
735+
label : str
736+
set an optional user-defined task identifier
726737
727738
"""
728739
with open(filename) as f:
729740
# add newline in case of trailing indented whitespace
730741
# which will cause SyntaxError
731742
code = f.read() + '\n'
732-
return self.execute(code, block=block, targets=targets)
743+
return self.execute(code, block=block, targets=targets, label=label)
733744

734745
def update(self, ns):
735746
"""update remote namespace with dict `ns`
@@ -1076,7 +1087,6 @@ def map(
10761087
block=None,
10771088
track=False,
10781089
return_exceptions=False,
1079-
label=None,
10801090
):
10811091
"""Parallel version of builtin `map`, using this View's `targets`.
10821092
@@ -1297,6 +1307,8 @@ def set_flags(self, **kwargs):
12971307
DependencyTimeout.
12981308
retries : int
12991309
Number of times a task will be retried on failure.
1310+
label : str
1311+
set an optional user-defined task identifier
13001312
"""
13011313

13021314
super().set_flags(**kwargs)
@@ -1348,6 +1360,8 @@ def _really_apply(
13481360
whether to block
13491361
track : bool [default: self.track]
13501362
whether to ask zmq to track the message, for safe non-copying sends
1363+
label : str [default self.label]
1364+
set an optional user-defined task identifier
13511365
!!!!!! TODO : THE REST HERE !!!!
13521366
13531367
Returns
@@ -1470,6 +1484,8 @@ def map(
14701484
14711485
return_exceptions: bool [default False]
14721486
Return Exceptions instead of raising on the first exception.
1487+
label : str [default self.label]
1488+
set an optional user-defined task identifier
14731489
14741490
Returns
14751491
-------

ipyparallel/controller/dictdb.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
# Copyright (c) IPython Development Team.
3737
# Distributed under the terms of the Modified BSD License.
3838
import copy
39+
import fnmatch
3940
from copy import deepcopy
4041
from datetime import datetime
4142

@@ -59,6 +60,7 @@
5960
'$all': lambda a, b: all([a in bb for bb in b]),
6061
'$mod': lambda a, b: a % b[0] == b[1],
6162
'$exists': lambda a, b: (b and a is not None) or (a is None and not b),
63+
'$glob': lambda a, b: fnmatch.fnmatch(a, b) if a is not None else False,
6264
}
6365

6466

0 commit comments

Comments
 (0)