Skip to content

Commit 69d62c1

Browse files
authored
Merge pull request #375 from capocchi/version-5.1
Version 5.1
2 parents 94a2aea + e5e28be commit 69d62c1

File tree

13 files changed

+242
-213
lines changed

13 files changed

+242
-213
lines changed

devsimpy/DEVSKernel/KAFKADEVS/InMemoryKafkaWorker.py

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import logging
88
import time
99

10+
from abc import ABC, abstractmethod
11+
1012
from DEVSKernel.KafkaDEVS.logconfig import LOGGING_LEVEL, worker_kafka_logger
1113

1214
logger = logging.getLogger("DEVSKernel.KafkaDEVS.InMemoryKafkaWorker")
@@ -21,24 +23,22 @@
2123
class InMemoryKafkaWorker(threading.Thread):
2224
"""Worker thread that manages one atomic model in memory."""
2325

24-
def __init__(self, aDEVS, index, bootstrap_servers, in_topic=None, out_topic=None):
26+
def __init__(self, model_name, aDEVS, bootstrap_server, in_topic=None, out_topic=None):
2527
super().__init__(daemon=True)
2628
self.aDEVS = aDEVS
27-
self.index = index
28-
self.bootstrap_servers = bootstrap_servers
29+
self.model_name = model_name
30+
self.bootstrap_server = bootstrap_server
2931
self.running = True
3032

3133
# Topics explicitement fournis par la stratégie
3234
self.in_topic = in_topic ### from coodinator
3335
self.out_topic = out_topic ### to coordinator
34-
35-
self.aBlock = aDEVS.getBlockModel()
3636

37-
group_id = f"worker-thread-{index}-{int(time.time() * 1000)}"
37+
group_id = f"worker-thread-{self.aDEVS.myID}-{int(time.time() * 1000)}"
3838

3939
# Kafka consumer pour le topic de travail dédié
4040
self.consumer = Consumer({
41-
"bootstrap.servers": bootstrap_servers,
41+
"bootstrap.servers": bootstrap_server,
4242
"group.id": group_id,
4343
"auto.offset.reset": "latest",
4444
"enable.auto.commit": True,
@@ -47,24 +47,20 @@ def __init__(self, aDEVS, index, bootstrap_servers, in_topic=None, out_topic=Non
4747

4848
# Kafka producer pour renvoyer les réponses
4949
self.producer = Producer({
50-
"bootstrap.servers": bootstrap_servers
50+
"bootstrap.servers": bootstrap_server
5151
})
5252

5353
logger.info(
5454
" [Thread-%s] Created for model %s (in topic=%s, out topic=%s)",
55-
index, self.aBlock.label, self.in_topic, self.out_topic
55+
self.aDEVS.myID, self.model_name, self.in_topic, self.out_topic
5656
)
57-
58-
def get_index(self):
59-
"""Returns the index of the model"""
60-
return self.index
6157

6258
def get_model(self):
6359
"""Returns the atomic DEVS model managed by this worker."""
6460
return self.aDEVS
6561

6662
def get_model_label(self):
67-
return self.aDEVS.getBlockModel().label
63+
return self.model_name
6864

6965
def get_model_time_next(self):
7066
return self.aDEVS.timeNext
@@ -132,12 +128,23 @@ def do_output_function(self):
132128

133129
self.aDEVS.outputFnc()
134130

131+
@abstractmethod
132+
def _process_standard(self, data):
133+
"""
134+
Process standard DEVS message format.
135+
Must be implemented by subclasses.
136+
137+
Args:
138+
data (dict): Parsed JSON message data
139+
"""
140+
pass
141+
135142
# ------------------------------------------------------------------
136143
# Boucle principale
137144
# ------------------------------------------------------------------
138145

139146
def run(self):
140-
logger.info(" [Thread-%s] Started", self.index)
147+
logger.info(f" [Thread-{self.aDEVS.myID}] Started")
141148

142149
while self.running:
143150
msg = self.consumer.poll(timeout=0.5)
@@ -148,20 +155,15 @@ def run(self):
148155
raw = msg.value().decode("utf-8")
149156
data = json.loads(raw)
150157

151-
worker_kafka_logger.debug(
152-
"[Thread-%s] IN: topic=%s value=%s",
153-
self.index,
154-
msg.topic(),
155-
raw,
156-
)
158+
worker_kafka_logger.debug(f"[Thread-{self.aDEVS.myID}] IN: topic={msg.topic()} value={raw}")
157159

158160
self._process_standard(data)
159161

160162
except Exception as e:
161-
logger.exception("[Thread-%s] Error in run loop: %s", self.index, e)
163+
logger.exception("[Thread-%s] Error in run loop: %s", self.aDEVS.myID, e)
162164

163165
self.consumer.close()
164-
logger.info(" [Thread-%s] Stopped", self.index)
166+
logger.info(f" [Thread-{self.aDEVS.myID}] Stopped")
165167

166168

167169
def stop(self):
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
FROM python:3.11-slim
2+
3+
WORKDIR /app
4+
5+
RUN apt-get update && \
6+
apt-get install -y --no-install-recommends build-essential && \
7+
rm -rf /var/lib/apt/lists/*
8+
9+
COPY requirements.txt .
10+
RUN pip install --no-cache-dir -r requirements.txt
11+
12+
COPY . .
13+
14+
CMD ["python", "--version"] # Commande par défaut

devsimpy/DEVSKernel/KAFKADEVS/MS4Me/MS4MeKafkaWorker.py

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,33 +30,25 @@ class MS4MeKafkaWorker(InMemoryKafkaWorker):
3030

3131
OUT_TOPIC = "ms4meOut"
3232

33-
def __init__(self, aDEVS, index, bootstrap_servers):
34-
super().__init__(aDEVS, index, bootstrap_servers, in_topic=f"ms4me{aDEVS.getBlockModel().label}In", out_topic=MS4MeKafkaWorker.OUT_TOPIC)
33+
def __init__(self, model_name, aDEVS, bootstrap_server):
34+
""" Constructor
35+
"""
36+
super().__init__(model_name, aDEVS, bootstrap_server, in_topic=f"ms4me{model_name}In", out_topic=MS4MeKafkaWorker.OUT_TOPIC)
3537

3638
self.wire = StandardWireAdapter
3739

3840
def get_topic_to_write(self) -> str:
41+
""" Return the topic used to contact the model
3942
"""
40-
Retourne le nom de topic à utiliser pour contacter ce modèle (obj)
41-
"""
42-
4343
return self.in_topic
44-
45-
@staticmethod
46-
def get_topic_to_read() -> str:
47-
"""
48-
Retourne le nom de topic à utiliser pour lire les résultats de tous les modèles
44+
45+
def get_topic_to_read(self) -> str:
46+
""" Return the name of the topic to used to read the results od the model
4947
"""
50-
return MS4MeKafkaWorker.OUT_TOPIC
48+
return self.out_topic
5149

5250
def output_msg_mapping(self) -> ModelOutputMessage:
53-
"""
54-
55-
Args:
56-
aDEVS (AtomicDEVS): DEVS atomic model from DomainBehaviorInterface
57-
58-
Returns:
59-
ModelOutputMessage: ModelOutputMessage containing the output port values from Ms4me
51+
""" Returns aModelOutputMessage message that contain the output port values from Ms4me
6052
"""
6153

6254
result_portvalue_list = []
@@ -85,16 +77,15 @@ def output_msg_mapping(self) -> ModelOutputMessage:
8577
return result_portvalue_list
8678

8779
def _handle_devs_message(self, msg: BaseMessage) -> BaseMessage:
88-
"""
89-
Reçoit un BaseMessage (InitSim, ExecuteTransition, SendOutput, ...)
90-
et renvoie un BaseMessage de réponse (NextTime, ModelOutputMessage, ...).
80+
""" Receive a BaseMessage (InitSim, ExecuteTransition, SendOutput, ...)
81+
and send a BasMessage in repsonce (NextTime, ModelOutputMessage, ...).
9182
"""
9283
t = msg.time.t
9384

9485
# --- InitSim : initialisation + timeAdvance initial ---
9586
if isinstance(msg, InitSim):
9687
self.do_initialize(t)
97-
return NextTime(SimTime(t=float(self.aDEVS.timeNext)), sender=self.aBlock.label)
88+
return NextTime(SimTime(t=float(self.aDEVS.timeNext)), sender=self.model_name)
9889

9990
# --- ExecuteTransition
10091
if isinstance(msg, ExecuteTransition):
@@ -146,7 +137,7 @@ def temp_peek(port, *args):
146137

147138
ta = float(ta) if ta is not None else float("inf")
148139

149-
return TransitionDone(time=SimTime(t=t), nextTime=SimTime(t=ta), sender=self.aBlock.label)
140+
return TransitionDone(time=SimTime(t=t), nextTime=SimTime(t=ta), sender=self.model_name)
150141

151142
# --- SendOutput : outputFnc + calcul des sorties ---
152143
if isinstance(msg, SendOutput):
@@ -159,42 +150,39 @@ def temp_peek(port, *args):
159150
return ModelOutputMessage(
160151
modelOutput = port_values,
161152
nextTime = next_time,
162-
sender = self.aBlock.label,
153+
sender = self.model_name,
163154
)
164155

165156
if isinstance(msg, SimulationDone):
166157
self.running = False
167158
return ModelDone(
168159
time = msg.time,
169-
sender = self.aDEVS.getBlockModel().label,
160+
sender = self.model_name,
170161
)
171-
172-
# --- NextTime, ModelOutputMessage, etc. ---
173-
# Ces types sont normalement utilisés comme réponses, pas comme requêtes
174-
# côté worker, donc on ne les traite pas ici.
162+
175163
raise ValueError(f"Unsupported message type in worker: {type(msg).__name__}")
176164

177165
def _process_standard(self, data):
178166
"""Process standard DEVS message format."""
179167

180168
devs_msg = self.wire.from_wire(data)
181169

182-
# Traiter le message DEVS
170+
# handel the DEVS msessage
183171
try:
184172
reply_msg = self._handle_devs_message(devs_msg)
185173
except Exception as e:
186-
logger.exception(" [Thread-%s] Error handling message: %s", self.index, e)
174+
logger.exception(" [Thread-%s] Error handling message: %s", self.aDEVS.myID, e)
187175

188-
# Préparer le message de réponse
176+
# define a msg to send
189177
reply_wire = self.wire.to_wire(reply_msg)
190178
reply_json = json.dumps(reply_wire).encode("utf-8")
191179

192-
# Log et envoi de la réponse
180+
# Log
193181
worker_kafka_logger.debug(
194-
f"[Thread-{self.index}] OUT: topic={self.out_topic} value={reply_json.decode('utf-8')}"
182+
f"[Thread-{self.aDEVS.myID}] OUT: topic={self.out_topic} value={reply_json.decode('utf-8')}"
195183
)
196184

197-
# Envoi de la réponse
185+
# send the msg to the out_topic
198186
self.producer.produce(
199187
self.out_topic,
200188
value=reply_json,

devsimpy/DEVSKernel/KAFKADEVS/MS4Me/ms4me_kafka_messages.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,30 @@
33
from __future__ import annotations
44
from dataclasses import dataclass
55
from typing import Any, Dict, List, Optional
6-
6+
import numpy as np
77
# ---------- Types de base ----------
88

99
@dataclass
1010
class SimTime:
11-
timeType: str = "devs.msg.time.LongSimTime"
11+
timeType: str = "devs.msg.time.DoubleSimTime"
1212
t: float = 0.0
1313

1414
def to_dict(self) -> Dict[str, Any]:
15+
if self.t == float("inf"):
16+
self.t = np.finfo(np.float64).max
1517
return {
1618
"timeType": self.timeType,
1719
"t": self.t,
1820
}
1921

2022
@staticmethod
2123
def from_dict(d: Dict[str, Any]) -> "SimTime":
24+
time =float(d.get("t", 0.0))
25+
if time == np.finfo(np.float64).max:
26+
time = float("inf")
2227
return SimTime(
23-
timeType=d.get("timeType", "devs.msg.time.LongSimTime"),
24-
t=float(d.get("t", 0.0)),
28+
timeType=d.get("timeType", "devs.msg.time.DoubleSimTime"),
29+
t=time,
2530
)
2631

2732

0 commit comments

Comments
 (0)