Skip to content

Commit c920842

Browse files
create executor for dag groups
1 parent 796b431 commit c920842

9 files changed

Lines changed: 483 additions & 133 deletions

File tree

src/main/python/systemds/scuro/dataloader/image_loader.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,22 @@ def get_stats(self, source_path: str):
8282

8383
for file in self.indices:
8484
path = os.path.join(source_path, f"{file}{self._ext}")
85-
if self.chunk_size is None:
86-
self.extract(path)
87-
md = self.metadata[path]
88-
max_width = max(max_width, md["width"])
89-
max_height = max(max_height, md["height"])
90-
max_channels = max(max_channels, md["num_channels"])
91-
num_instances += 1
92-
else:
93-
self.file_sanity_check(path)
94-
image = cv2.imread(path, cv2.IMREAD_COLOR)
95-
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
96-
height, width, channels = image.shape
97-
max_width = max(max_width, width)
98-
max_height = max(max_height, height)
99-
max_channels = max(max_channels, channels)
100-
num_instances += 1
85+
# if self.chunk_size is None:
86+
# self.extract(path)
87+
# md = self.metadata[path]
88+
# max_width = max(max_width, md["width"])
89+
# max_height = max(max_height, md["height"])
90+
# max_channels = max(max_channels, md["num_channels"])
91+
# num_instances += 1
92+
# else:
93+
self.file_sanity_check(path)
94+
image = cv2.imread(path, cv2.IMREAD_COLOR)
95+
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
96+
height, width, channels = image.shape
97+
max_width = max(max_width, width)
98+
max_height = max(max_height, height)
99+
max_channels = max(max_channels, channels)
100+
num_instances += 1
101101
return ImageStats(
102102
max_width,
103103
max_height,

src/main/python/systemds/scuro/dataloader/json_loader.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,33 @@ def get_stats(self, source_path: str):
9595

9696
text = " ".join(text) if isinstance(text, list) else text
9797
num_instances += 1
98-
max_length = max(max_length, len(text))
98+
max_length = max(max_length, len(text)) # number of characters
9999
avg_length += len(text)
100100

101101
avg_length /= num_instances
102102
return JSONStats(num_instances, max_length, avg_length, (max_length,))
103+
104+
def estimate_peak_memory_bytes(self) -> dict:
105+
s = self.stats
106+
n = max(1, s.num_instances)
107+
108+
avg_len = s.avg_length / n if s.avg_length > s.max_length else s.avg_length
109+
avg_len = max(1.0, avg_len)
110+
111+
bytes_per_char = 2
112+
str_overhead = 49
113+
ptr_size = 8
114+
list_header = 56
115+
list_overalloc = 1.125
116+
metadata_per_instance = 192
117+
118+
resident_strings = n * (str_overhead + bytes_per_char * avg_len)
119+
resident_list = list_header + int(ptr_size * n * list_overalloc)
120+
resident_metadata = n * metadata_per_instance
121+
resident_total = resident_strings + resident_list + resident_metadata
122+
123+
parse_factor = 2.0
124+
transient_parse = parse_factor * (n * bytes_per_char * avg_len)
125+
126+
cpu_peak = int(resident_total + transient_parse)
127+
return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0}

src/main/python/systemds/scuro/dataloader/video_loader.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,19 @@ def get_stats(self, source_path: str):
133133
return VideoStats(
134134
fps, max_length, max_width, max_height, max_num_channels, num_instances
135135
)
136+
137+
def estimate_peak_memory_bytes(self) -> dict:
138+
s = self.stats
139+
if self.chunk_size is not None:
140+
n = self.chunk_size
141+
else:
142+
n = s.num_instances
143+
return {
144+
"cpu_peak_bytes": n
145+
* s.output_shape[0]
146+
* s.output_shape[1]
147+
* s.output_shape[2]
148+
* s.output_shape[3]
149+
* 4,
150+
"gpu_peak_bytes": 0,
151+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
# -------------------------------------------------------------
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
# -------------------------------------------------------------
21+
from concurrent.futures import ProcessPoolExecutor, as_completed
22+
import multiprocessing as mp
23+
import os
24+
import pickle
25+
26+
import time
27+
from typing import Any, Dict, List, Optional
28+
from systemds.scuro import Modality
29+
from systemds.scuro.drsearch.representation_dag import (
30+
LRUCache,
31+
RepresentationDag,
32+
group_dags_by_dependencies,
33+
)
34+
from systemds.scuro.utils.checkpointing import CheckpointManager
35+
from systemds.scuro.drsearch.dag_group_scheduler import DAGGroupScheduler
36+
37+
38+
def _process_dag_group(
39+
dag_group_pickle: bytes,
40+
modality_pickle: bytes,
41+
tasks_pickle: bytes,
42+
modality_id: int,
43+
dag_group_idx: int,
44+
) -> Dict[str, Any]:
45+
checkpoint_manager = CheckpointManager(
46+
checkpoint_dir=os.getcwd(),
47+
prefix=f"unimodal_checkpoint_group_{modality_id}_{dag_group_idx}_",
48+
checkpoint_every=1,
49+
resume=False,
50+
)
51+
results = []
52+
53+
dag_group = pickle.loads(dag_group_pickle)
54+
modality = pickle.loads(modality_pickle)
55+
tasks = pickle.loads(tasks_pickle)
56+
57+
group_cache = LRUCache(max_size=6)
58+
59+
for i, dag in enumerate(dag_group):
60+
representation = dag.execute([modality], external_cache=group_cache)
61+
62+
for task in tasks:
63+
start = time.perf_counter()
64+
scores = task.run(representation.data)
65+
end = time.perf_counter()
66+
67+
results.append(
68+
{
69+
"scores": scores,
70+
"transform_time": representation.transform_time,
71+
"task_name": task.model.name,
72+
"task_time": end - start,
73+
"dag": dag,
74+
"modality_id": modality_id,
75+
}
76+
)
77+
78+
checkpoint_manager.increment(modality_id, 1, dag_group_idx=dag_group_idx)
79+
checkpoint_manager.checkpoint_if_due(results)
80+
81+
return {"results": results}
82+
83+
84+
class DAGGroupExecutor:
85+
def __init__(
86+
self,
87+
dags: List[RepresentationDag],
88+
modalities: List[Modality],
89+
tasks: List[Any],
90+
checkpoint_manager: Optional[CheckpointManager] = None,
91+
max_workers: Optional[int] = None,
92+
):
93+
self.dags = dags
94+
self.dag_groups = group_dags_by_dependencies(dags)
95+
self.modalities = modalities
96+
self.tasks = tasks
97+
self.max_workers = max_workers or mp.cpu_count()
98+
self.checkpoint_manager = checkpoint_manager
99+
self.scheduler = DAGGroupScheduler(
100+
dag_groups=self.dag_groups, modality=modalities[0]
101+
)
102+
103+
def run(self):
104+
results = []
105+
ctx = mp.get_context("spawn")
106+
max_workers = min(len(self.dag_groups), self.max_workers)
107+
108+
modality_pickle = pickle.dumps(
109+
self.modalities[0]
110+
) # TODO: handle multiple modalities
111+
tasks_pickle = pickle.dumps(self.tasks)
112+
113+
pending_dag_groups = set(range(len(self.dag_groups)))
114+
running_dag_groups = {}
115+
all_groups_succeeded = True
116+
with ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) as executor:
117+
while pending_dag_groups or running_dag_groups:
118+
pending_resources = [
119+
(
120+
i,
121+
self.scheduler.group_resources[i][0],
122+
self.scheduler.group_resources[i][1],
123+
)
124+
for i in pending_dag_groups
125+
]
126+
ready_to_execute = self.scheduler.get_runnable(
127+
pending_resources, max_concurrent=max_workers
128+
)
129+
for group_id, gpu_id in ready_to_execute:
130+
pending_dag_groups.remove(group_id)
131+
dag_group = self.dag_groups[group_id]
132+
cpu_mem, gpu_mem = self.scheduler.group_resources[group_id]
133+
134+
future = executor.submit(
135+
_process_dag_group,
136+
pickle.dumps(dag_group),
137+
modality_pickle,
138+
tasks_pickle,
139+
self.modalities[0].modality_id,
140+
group_id,
141+
)
142+
running_dag_groups[future] = (group_id, cpu_mem, gpu_mem, gpu_id)
143+
if not running_dag_groups:
144+
break
145+
done = next(as_completed(running_dag_groups), None)
146+
if done is None:
147+
break
148+
group_id, cpu_mem, gpu_mem, gpu_id = running_dag_groups.pop(done)
149+
self.scheduler.release(cpu_mem, gpu_mem, gpu_id)
150+
151+
try:
152+
result_dict = future.result()
153+
154+
for result_entry in result_dict["results"]:
155+
results.append(
156+
{
157+
"scores": result_entry["scores"],
158+
"transform_time": result_entry["transform_time"],
159+
"task_name": result_entry["task_name"],
160+
"task_time": result_entry["task_time"],
161+
"dag": result_entry["dag"],
162+
"modality_id": self.modalities[0].modality_id,
163+
}
164+
)
165+
except Exception as e:
166+
all_groups_succeeded = False
167+
print(
168+
f"Error processing DAG group {group_id} for modality {self.modalities[0].modality_id}: {e}"
169+
)
170+
return results, all_groups_succeeded

src/main/python/systemds/scuro/drsearch/dag_group_scheduler.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ def __init__(
5050
gpu_margin: float = 0.8,
5151
shared_state: Optional[Dict[str, Any]] = None,
5252
lock=None,
53+
dag_groups: List[List[RepresentationDag]] = None,
54+
modality: Modality = None,
5355
):
5456
self._margin = (cpu_margin, gpu_margin)
5557
self._n_gpu = (
@@ -65,6 +67,10 @@ def __init__(
6567
else:
6668
self._shared.setdefault("cpu_in_use", 0.0)
6769
self._shared.setdefault("gpu_in_use", {})
70+
self.group_resources = []
71+
for dag_group in dag_groups:
72+
cpu_mem, gpu_mem = get_peak_memory_from_dag_group(dag_group, modality)
73+
self.group_resources.append((cpu_mem, gpu_mem))
6874

6975
def _avail_cpu(self) -> float:
7076
available_memory = (psutil.virtual_memory().available) if psutil else 4096.0

0 commit comments

Comments
 (0)