Skip to content

Commit 1a16f9b

Browse files
committed
refactor kafka and add update of settings in builtin early in devsimpy start
1 parent c01a7d0 commit 1a16f9b

15 files changed

+713
-1185
lines changed

devsimpy/DEVSKernel/KAFKADEVS/InMemoryKafkaWorker.py

Lines changed: 79 additions & 303 deletions
Large diffs are not rendered by default.
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
import logging
2+
import json
3+
4+
from DEVSKernel.KafkaDEVS.logconfig import LOGGING_LEVEL, worker_kafka_logger
5+
6+
logger = logging.getLogger("DEVSKernel.KafkaDEVS.InMemoryKafkaWorker")
7+
logger.setLevel(LOGGING_LEVEL)
8+
9+
from DEVSKernel.KafkaDEVS.InMemoryKafkaWorker import InMemoryKafkaWorker
10+
11+
from .ms4me_kafka_messages import (
12+
BaseMessage,
13+
SimTime,
14+
InitSim,
15+
ExecuteTransition,
16+
SendOutput,
17+
NextTime,
18+
TransitionDone,
19+
ModelOutputMessage,
20+
PortValue,
21+
SimulationDone,
22+
ModelDone,
23+
)
24+
25+
from .ms4me_kafka_wire_adapters import StandardWireAdapter
26+
from DomainInterface.Object import Message
27+
28+
class MS4MeKafkaWorker(InMemoryKafkaWorker):
    """Worker thread that manages one atomic DEVS model in memory.

    Bridges an atomic model (``aDEVS``) to the MS4Me Kafka protocol: it
    consumes request messages (InitSim, ExecuteTransition, SendOutput,
    SimulationDone) from its per-model input topic and publishes replies
    (NextTime, TransitionDone, ModelOutputMessage, ModelDone) on the
    shared output topic.
    """

    # Single output topic shared by every MS4Me worker.
    OUT_TOPIC = "ms4meOut"

    def __init__(self, aDEVS, index, bootstrap_servers):
        """
        Args:
            aDEVS: atomic DEVS model (from DomainBehaviorInterface) managed
                by this worker.
            index: worker/thread index, used only for log messages.
            bootstrap_servers: Kafka bootstrap servers connection string.
        """
        super().__init__(
            aDEVS,
            index,
            bootstrap_servers,
            in_topic=f"ms4me{aDEVS.getBlockModel().label}In",
            out_topic=MS4MeKafkaWorker.OUT_TOPIC,
        )
        # The wire adapter is stateless (static methods only), so the class
        # itself is stored rather than an instance.
        self.wire = StandardWireAdapter

    def get_topic_to_write(self) -> str:
        """Return the topic name used to contact this model."""
        return self.in_topic

    @staticmethod
    def get_topic_to_read() -> str:
        """Return the topic name used to read the results of all models."""
        return MS4MeKafkaWorker.OUT_TOPIC

    def output_msg_mapping(self) -> list:
        """Map the model's current outputs to MS4Me ``PortValue`` objects.

        Returns:
            list: one ``PortValue`` per entry of ``self.aDEVS.myOutput``.

        Note:
            Bug fix — the original annotated the return type as
            ``ModelOutputMessage`` although it actually returns a list of
            ``PortValue``; the duplicated if/else branches are also merged.
        """
        result_portvalue_list = []
        for port, m in self.aDEVS.myOutput.items():
            # Unwrap DomainInterface Message objects; pass raw values through.
            value = getattr(m, "value", m) if isinstance(m, Message) else m
            port_name = getattr(port, "name", str(port))
            result_portvalue_list.append(
                PortValue(
                    value=value,
                    portIdentifier=port_name,
                    portType=type(value).__name__,
                )
            )
        return result_portvalue_list

    def _handle_devs_message(self, msg: BaseMessage) -> BaseMessage:
        """Dispatch one protocol request and build the corresponding reply.

        Receives a BaseMessage (InitSim, ExecuteTransition, SendOutput,
        SimulationDone) and returns a reply BaseMessage (NextTime,
        TransitionDone, ModelOutputMessage, ModelDone).

        Raises:
            ValueError: for message types that are replies, not requests
                (NextTime, ModelOutputMessage, ...), which must never be
                routed to a worker.
        """
        t = msg.time.t

        # --- InitSim: initialization + initial timeAdvance ---
        if isinstance(msg, InitSim):
            self.do_initialize(t)
            return NextTime(SimTime(t=float(self.aDEVS.timeNext)), sender=self.aBlock.label)

        # --- ExecuteTransition: external or internal transition ---
        if isinstance(msg, ExecuteTransition):
            t = msg.time.t

            if msg.portValueList:
                # Inputs present: this is an external transition.
                # Build {input_port -> Message(value, time)} by matching
                # pv.portIdentifier against the input port names.
                port_inputs = {}
                for pv in msg.portValueList:
                    for iport in self.aDEVS.IPorts:
                        if iport.name == pv.portIdentifier:
                            port_inputs[iport] = Message(pv.value, t)
                            break

                # Temporarily override peek() so the model reads our inputs.
                old_peek = getattr(self.aDEVS, "peek", None)

                def temp_peek(port, *args):
                    if args and isinstance(args[0], dict):
                        return args[0].get(port)
                    return port_inputs.get(port)

                self.aDEVS.peek = temp_peek
                try:
                    self.do_external_transition(t, msg)
                finally:
                    # Restore the previous peek(), or remove ours if none
                    # existed before the override.
                    if old_peek is not None:
                        self.aDEVS.peek = old_peek
                    elif hasattr(self.aDEVS, "peek"):
                        delattr(self.aDEVS, "peek")
            else:
                # No inputs: internal transition.
                self.do_internal_transition(t)

            # Recompute the time advance after the transition.
            ta = self.aDEVS.timeNext
            ta = float(ta) if ta is not None else float("inf")

            return TransitionDone(time=SimTime(t=t), nextTime=SimTime(t=ta), sender=self.aBlock.label)

        # --- SendOutput: outputFnc + collect the produced outputs ---
        if isinstance(msg, SendOutput):
            t = msg.time.t

            self.do_output_function()
            port_values = self.output_msg_mapping()

            return ModelOutputMessage(
                modelOutput=port_values,
                nextTime=SimTime(t=t),
                sender=self.aBlock.label,
            )

        # --- SimulationDone: stop the worker loop ---
        if isinstance(msg, SimulationDone):
            self.running = False
            return ModelDone(
                time=msg.time,
                sender=self.aDEVS.getBlockModel().label,
            )

        # NextTime, ModelOutputMessage, etc. are reply types, not requests,
        # so the worker never handles them here.
        raise ValueError(f"Unsupported message type in worker: {type(msg).__name__}")

    def _process_standard(self, data):
        """Decode one wire message, handle it, and publish the reply.

        Args:
            data: decoded JSON dict read from the input topic.
        """
        devs_msg = self.wire.from_wire(data)

        try:
            reply_msg = self._handle_devs_message(devs_msg)
        except Exception as e:
            # Bug fix: the original logged here and then fell through,
            # hitting a NameError on the undefined reply_msg below. Now we
            # log the failure and skip publishing a reply.
            logger.exception(" [Thread-%s] Error handling message: %s", self.index, e)
            return

        # Prepare the reply payload.
        reply_wire = self.wire.to_wire(reply_msg)
        reply_json = json.dumps(reply_wire).encode("utf-8")

        # Lazy %-formatting also fixes the original f-string whose nested
        # double quotes were a SyntaxError on Python < 3.12.
        worker_kafka_logger.debug(
            "[Thread-%s] OUT: topic=%s value=%s",
            self.index,
            self.out_topic,
            reply_json.decode("utf-8"),
        )

        # Publish the reply on the shared output topic.
        self.producer.produce(
            self.out_topic,
            value=reply_json,
        )
        self.producer.flush()

devsimpy/DEVSKernel/KAFKADEVS/devs_kafka_messages.py renamed to devsimpy/DEVSKernel/KAFKADEVS/MS4Me/ms4me_kafka_messages.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
# devs_kafka_messages.py
2-
from __future__ import annotations
1+
# ms4me_kafka_messages.py
32

3+
from __future__ import annotations
44
from dataclasses import dataclass
55
from typing import Any, Dict, List, Optional
66

7-
87
# ---------- Types de base ----------
98

109
@dataclass
@@ -298,4 +297,4 @@ def from_dict(d: Dict[str, Any]) -> "TransitionDone":
298297
time=SimTime.from_dict(d.get("time", {})),
299298
nextTime=SimTime.from_dict(d.get("nextTime", {})),
300299
sender=d.get("sender", ""),
301-
)
300+
)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# devs_kafka_wire_adapters.py
2+
from __future__ import annotations
3+
from typing import Any, Dict
4+
5+
from .ms4me_kafka_messages import BaseMessage
6+
7+
class StandardWireAdapter:
    """MS4Me message adapter.

    Converts between ``BaseMessage`` objects and plain dicts suitable for
    JSON serialization on the Kafka wire. Stateless: both directions are
    static methods.
    """

    @staticmethod
    def to_wire(msg: BaseMessage) -> Dict[str, Any]:
        """Transform a BaseMessage (InitSim, NextTime, ...) into a dict so
        it can be sent over Kafka."""
        return msg.to_dict()

    @staticmethod
    def from_wire(d: Dict[str, Any]) -> BaseMessage:
        """Transform a dict (a Kafka record value) into a BaseMessage."""
        return BaseMessage.from_dict(d)

devsimpy/DEVSKernel/KAFKADEVS/auto_kafka.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def wait_for_kafka(bootstrap: str, timeout: float = 30.0, interval: float = 1.0)
2121
if time.time() - start > timeout:
2222
raise RuntimeError(
2323
f"Kafka ne répond pas sur {bootstrap} après {timeout} secondes.\n"
24-
f"Vérifie que le conteneur '{KAFKA_CONATINER_NAME}' est démarré et en bonne santé."
24+
f"Vérifier que le conteneur '{KAFKA_CONATINER_NAME}' est démarré et en bonne santé."
2525
) from e
2626
time.sleep(interval)
2727
else:
@@ -57,7 +57,8 @@ def ensure_kafka_broker(
5757
existing = result.stdout.strip().splitlines()
5858
except subprocess.CalledProcessError as e:
5959
raise RuntimeError(f"Impossible d'appeler docker: {e}") from e
60-
60+
61+
6162
# 2) S'il existe déjà
6263
if container_name in existing:
6364
# Vérifier s'il est en cours d'exécution

devsimpy/DEVSKernel/KAFKADEVS/delete_all_topics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
from confluent_kafka.admin import AdminClient
22
import logging
33

4-
from .logconfig import configure_logging, LOGGING_LEVEL, kafka_logger
4+
from logconfig import configure_logging, LOGGING_LEVEL, kafka_logger
55
configure_logging()
66
logger = logging.getLogger("DEVSKernel.Strategies")
77
logger.setLevel(LOGGING_LEVEL)
88

9-
from .kafkaconfig import KAFKA_BOOTSTRAP
9+
from kafkaconfig import KAFKA_BOOTSTRAP
1010

1111

1212
def delete_all_topics(bootstrap_servers=KAFKA_BOOTSTRAP):

devsimpy/DEVSKernel/KAFKADEVS/devs_kafka_wire_adapters.py

Lines changed: 0 additions & 44 deletions
This file was deleted.

devsimpy/DEVSKernel/KAFKADEVS/kafkaconfig.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
### KAFKA_MODE OPTIONS: "local" or "standard"
2-
### - local: topics work_i and atomic_results
3-
### - standard: topics ms4meXXXIn and ms4meOut
1+
### KAFKA_MODE OPTIONS:
2+
### - fromMe4Me: topics ms4meXXXIn and ms4meOut
43
### Default:
5-
KAFKA_MODE = "standard"
4+
KAFKA_MODE = "fromMe4Me"
65

76
### Whether to auto-start a Kafka broker if not already running
87
AUTO_START_KAFKA_BROKER=True
Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
### Fichier de configuration du logging pour KafkaDEVS
1+
### conf file for KafkaDEVS
22
import logging
33
from pathlib import Path
44

5-
### Niveau de logging:
6-
### - logging.DEBUG pour le dev
7-
### - logging.INFO pour des traces plus légères
8-
### - logging.WARNING pour la prod
5+
### logging level:
6+
### - logging.DEBUG for dev
7+
### - logging.INFO for light traces
8+
### - logging.WARNING for prod
99

1010
LOGGING_LEVEL = logging.DEBUG
1111
LOGFILE = "kafkadevs_local.log"
1212

1313
def configure_logging():
14-
# Racine DEVSimPy = dossier contenant devsimpy-nogui.py / devsimpy.py
14+
# DEVSimPy root dir
1515
filename = Path(__file__).resolve()
1616
# filename_without_ext = filename.stem
1717
base_dir = filename.parents[2] # DEVSimPy/
@@ -25,10 +25,13 @@ def configure_logging():
2525
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
2626
handlers=[
2727
logging.FileHandler(log_file, encoding="utf-8"),
28-
logging.StreamHandler(), # console (optionnel)
28+
logging.StreamHandler(), # console (opt.)
2929
],
3030
)
3131

32-
# logger dédié pour les traces Kafka
33-
kafka_logger = logging.getLogger("kafka.trace")
34-
kafka_logger.setLevel(LOGGING_LEVEL)
32+
# logger dedicated to Kafka
33+
worker_kafka_logger = logging.getLogger("worker.kafka.trace")
34+
worker_kafka_logger.setLevel(LOGGING_LEVEL)
35+
36+
coord_kafka_logger = logging.getLogger("coord.kafka.trace")
37+
coord_kafka_logger.setLevel(LOGGING_LEVEL)

0 commit comments

Comments
 (0)