Skip to content

Commit d09b9cb

Browse files
committed
update
1 parent 1a16f9b commit d09b9cb

File tree

5 files changed

+651
-5
lines changed

5 files changed

+651
-5
lines changed

devsimpy/DEVSKernel/KAFKADEVS/InMemoryKafkaWorker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import logging
88
import time
99

10-
from .logconfig import LOGGING_LEVEL, worker_kafka_logger
10+
from DEVSKernel.KafkaDEVS.logconfig import LOGGING_LEVEL, worker_kafka_logger
1111

1212
logger = logging.getLogger("DEVSKernel.KafkaDEVS.InMemoryKafkaWorker")
1313
logger.setLevel(LOGGING_LEVEL)

devsimpy/DEVSKernel/KAFKADEVS/MS4Me/MS4MeKafkaWorker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from DEVSKernel.KafkaDEVS.InMemoryKafkaWorker import InMemoryKafkaWorker
1010

11-
from .ms4me_kafka_messages import (
11+
from DEVSKernel.KafkaDEVS.MS4Me.ms4me_kafka_messages import (
1212
BaseMessage,
1313
SimTime,
1414
InitSim,
@@ -22,7 +22,7 @@
2222
ModelDone,
2323
)
2424

25-
from .ms4me_kafka_wire_adapters import StandardWireAdapter
25+
from DEVSKernel.KafkaDEVS.MS4Me.ms4me_kafka_wire_adapters import StandardWireAdapter
2626
from DomainInterface.Object import Message
2727

2828
class MS4MeKafkaWorker(InMemoryKafkaWorker):
@@ -191,7 +191,7 @@ def _process_standard(self, data):
191191

192192
# Log et envoi de la réponse
193193
worker_kafka_logger.debug(
194-
f"[Thread-{self.index}] OUT: topic={self.out_topic} value={reply_json.decode("utf-8")}"
194+
f"[Thread-{self.index}] OUT: topic={self.out_topic} value={reply_json.decode('utf-8')}"
195195
)
196196

197197
# Envoi de la réponse

devsimpy/DEVSKernel/KAFKADEVS/MS4Me/ms4me_kafka_wire_adapters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from __future__ import annotations
33
from typing import Any, Dict
44

5-
from .ms4me_kafka_messages import BaseMessage
5+
from DEVSKernel.KafkaDEVS.MS4Me.ms4me_kafka_messages import BaseMessage
66

77
class StandardWireAdapter:
88
"""
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
"""
4+
Script pour lancer un worker Kafka DEVS à partir du chemin d'un modèle atomique.
5+
Usage: python run_worker.py --model-path <path> --index <int> [options]
6+
Exemple à éxécuter dans une console avant test_worker_messages:
7+
Attention, index ne sert pas à créer le topic mais il est obligatoire dans la classe MS4MeKafkaWorker
8+
python run_worker.py --model-path ..\..\..\Domain\Collector\MessagesCollector.py --class-name MessagesCollector --index 0 --bootstrap localhost:9092
9+
"""
10+
11+
import sys
12+
import argparse
13+
import time
14+
import signal
15+
import logging
16+
17+
# Configuration du logging
18+
logging.basicConfig(
19+
level=logging.INFO,
20+
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s"
21+
)
22+
logger = logging.getLogger("WorkerLauncher")
23+
24+
# Fonction pour charger dynamiquement un modèle depuis un .amd
25+
def load_model_from_amd(amd_path: str, class_name: str = None):
26+
"""Charge un modèle atomique depuis un fichier .amd (zip)"""
27+
import zipfile
28+
import tempfile
29+
import shutil
30+
import importlib.util
31+
32+
amd_path = Path(amd_path).resolve()
33+
tmp_dir = Path(tempfile.mkdtemp(prefix="devsimpy_worker_"))
34+
35+
try:
36+
with zipfile.ZipFile(amd_path, "r") as zf:
37+
zf.extractall(tmp_dir)
38+
39+
py_files = list(tmp_dir.rglob("*.py"))
40+
if not py_files:
41+
raise RuntimeError(f"No .py file found in {amd_path}")
42+
43+
main_py = py_files[0]
44+
spec = importlib.util.spec_from_file_location(main_py.stem, main_py)
45+
module = importlib.util.module_from_spec(spec)
46+
spec.loader.exec_module(module)
47+
48+
if class_name:
49+
model_cls = getattr(module, class_name)
50+
else:
51+
# Prendre la première classe trouvée
52+
candidates = [obj for obj in module.__dict__.values() if isinstance(obj, type)]
53+
if not candidates:
54+
raise RuntimeError(f"No class found in {main_py}")
55+
model_cls = candidates[0]
56+
57+
return model_cls()
58+
59+
finally:
60+
shutil.rmtree(tmp_dir, ignore_errors=True)
61+
62+
63+
def load_model_from_py(py_path: str, class_name: str):
64+
"""Charge un modèle atomique depuis un fichier .py"""
65+
import importlib.util
66+
67+
py_path = Path(py_path).resolve()
68+
69+
spec = importlib.util.spec_from_file_location(py_path.stem, py_path)
70+
module = importlib.util.module_from_spec(spec)
71+
spec.loader.exec_module(module)
72+
73+
model_cls = getattr(module, class_name)
74+
return model_cls()
75+
76+
77+
def load_model(model_path: str, class_name: str = None):
78+
"""Charge un modèle depuis .py ou .amd"""
79+
path = Path(model_path)
80+
81+
if not path.exists():
82+
raise FileNotFoundError(f"Model file not found: {model_path}")
83+
84+
if path.suffix == ".amd":
85+
logger.info(f"Loading model from .amd: {model_path}")
86+
return load_model_from_amd(model_path, class_name)
87+
elif path.suffix == ".py":
88+
if not class_name:
89+
raise ValueError("--class-name is required for .py files")
90+
logger.info(f"Loading model from .py: {model_path} (class: {class_name})")
91+
return load_model_from_py(model_path, class_name)
92+
else:
93+
raise ValueError(f"Unsupported file type: {path.suffix}")
94+
95+
96+
def create_mock_block_model(label: str):
97+
"""Crée un objet mock pour getBlockModel()"""
98+
class MockBlockModel:
99+
def __init__(self, label):
100+
self.label = label
101+
return MockBlockModel(label)
102+
103+
104+
def main():
105+
parser = argparse.ArgumentParser(
106+
description="Lance un worker Kafka DEVS pour un modèle atomique"
107+
)
108+
parser.add_argument(
109+
"--model-path",
110+
required=True,
111+
help="Chemin vers le fichier du modèle (.py ou .amd)"
112+
)
113+
parser.add_argument(
114+
"--class-name",
115+
help="Nom de la classe du modèle (requis pour .py, optionnel pour .amd)"
116+
)
117+
parser.add_argument(
118+
"--index",
119+
type=int,
120+
required=True,
121+
help="Index du modèle atomique dans la simulation"
122+
)
123+
parser.add_argument(
124+
"--bootstrap",
125+
default="localhost:9092",
126+
help="Adresse du serveur Kafka (défaut: localhost:9092)"
127+
)
128+
parser.add_argument(
129+
"--in-topic",
130+
help="Topic d'entrée (par défaut: généré depuis le label)"
131+
)
132+
parser.add_argument(
133+
"--out-topic",
134+
default="ms4meOut",
135+
help="Topic de sortie (défaut: ms4meOut)"
136+
)
137+
parser.add_argument(
138+
"--label",
139+
help="Label du modèle (par défaut: nom de la classe)"
140+
)
141+
142+
args = parser.parse_args()
143+
144+
# Import des classes nécessaires
145+
try:
146+
from MS4MeKafkaWorker import MS4MeKafkaWorker
147+
except ImportError:
148+
logger.error("Impossible d'importer MS4MeKafkaWorker. Vérifiez votre installation.")
149+
sys.exit(1)
150+
151+
# Charger le modèle
152+
try:
153+
model = load_model(args.model_path, args.class_name)
154+
except Exception as e:
155+
logger.error(f"Erreur lors du chargement du modèle: {e}")
156+
sys.exit(1)
157+
158+
# Déterminer le label
159+
label = args.label or model.__class__.__name__
160+
161+
# Ajouter le mock getBlockModel
162+
model.getBlockModel = lambda: create_mock_block_model(label)
163+
164+
# Déterminer le topic d'entrée
165+
in_topic = args.in_topic or f"ms4me{label}_{args.index}In"
166+
167+
logger.info("=" * 60)
168+
logger.info(f"Lancement du worker pour {label}")
169+
logger.info(f" Index: {args.index}")
170+
logger.info(f" Bootstrap: {args.bootstrap}")
171+
logger.info(f" In topic: {in_topic}")
172+
logger.info(f" Out topic: {args.out_topic}")
173+
logger.info("=" * 60)
174+
175+
# Créer et lancer le worker
176+
worker = MS4MeKafkaWorker(
177+
aDEVS=model,
178+
index=args.index,
179+
bootstrap_servers=args.bootstrap
180+
)
181+
182+
# Gestion du signal pour arrêt propre
183+
def signal_handler(sig, frame):
184+
logger.info("\nArrêt du worker en cours...")
185+
worker.stop()
186+
worker.join(timeout=5.0)
187+
logger.info("Worker arrêté")
188+
sys.exit(0)
189+
190+
signal.signal(signal.SIGINT, signal_handler)
191+
signal.signal(signal.SIGTERM, signal_handler)
192+
193+
# Démarrer le worker
194+
worker.start()
195+
logger.info(f"Worker démarré (PID: {worker.ident})")
196+
197+
# Attendre que le worker se termine
198+
try:
199+
while worker.is_alive():
200+
time.sleep(1)
201+
except KeyboardInterrupt:
202+
logger.info("Interruption clavier détectée")
203+
worker.stop()
204+
worker.join(timeout=5.0)
205+
206+
logger.info("Worker terminé")
207+
208+
if __name__ == "__main__":
209+
import builtins
210+
import os
211+
from pathlib import Path
212+
213+
# Ajouter le répertoire racine du projet au PYTHONPATH
214+
# Le script est dans DEVSKernel/KafkaDEVS/MS4Me/
215+
# On remonte de 3 niveaux pour atteindre la racine devsimpy/
216+
script_dir = Path(__file__).parent.resolve()
217+
devskernel_path = script_dir.parents[1]
218+
project_root = script_dir.parents[2] # Remonte de MS4Me -> KafkaDEVS -> DEVSKernel -> devsimpy
219+
220+
if str(project_root) not in sys.path:
221+
sys.path.insert(0, str(project_root))
222+
print(f"Added to PYTHONPATH: {project_root}")
223+
224+
setattr(builtins, 'DEFAULT_SIM_STRATEGY', 'original')
225+
setattr(builtins, 'DEFAULT_DEVS_DIRNAME', 'KafkaDEVS')
226+
setattr(builtins, 'DEVS_SIM_KERNEL_PATH', devskernel_path)
227+
setattr(builtins, 'DEVS_DIR_PATH_DICT', {'KafkaDEVS': os.path.join(devskernel_path, 'KafkaDEVS')})
228+
main()

0 commit comments

Comments
 (0)