Skip to content

Commit 322a44e

Browse files
authored
Merge pull request #372 from capocchi/version-5.1
update
2 parents 1c3f2a0 + f5c7b70 commit 322a44e

File tree

5 files changed

+1080
-644
lines changed

5 files changed

+1080
-644
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__all__ = ["DEVS","simulator","kafka_devs"]
1+
__all__ = ["DEVS","simulator","atomic_worker"]
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# atomic_worker.py
2+
import os, json, argparse, sys
3+
from confluent_kafka import Consumer, Producer
4+
from simulator import AtomicSolver
5+
6+
def build_atomic_instance(atomic_id: str):
7+
# TODO: adapter à votre modèle: soit via une fabrique, soit via un registre picklable.
8+
# Exemples:
9+
# if atomic_id == "1": return MyGenerator("G1")
10+
# if atomic_id == "2": return MyProcessor("P1")
11+
# Ici, on suppose un point d’extension externalisé.
12+
raise NotImplementedError("Provide a factory to create the atomic model for id=" + atomic_id)
13+
14+
def main():
15+
parser = argparse.ArgumentParser()
16+
parser.add_argument("--single", action="store_true")
17+
parser.add_argument("--atomic-id", type=str, default=os.getenv("ATOMIC_ID"))
18+
args = parser.parse_args()
19+
20+
BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP", "localhost:9092")
21+
CMD_TOPIC = os.getenv("CMD_TOPIC", "strategy.commands")
22+
EVT_TOPIC = os.getenv("EVT_TOPIC", "strategy.events")
23+
GROUP_ID = os.getenv("WORKER_GROUP_ID", "atomic-workers")
24+
25+
prod = Producer({"bootstrap.servers": BOOTSTRAP, "enable.idempotence": True, "acks": "all"})
26+
cons = Consumer({
27+
"bootstrap.servers": BOOTSTRAP,
28+
"group.id": GROUP_ID,
29+
"auto.offset.reset": "earliest"
30+
})
31+
cons.subscribe([CMD_TOPIC])
32+
33+
# Mono-atomic
34+
if args.single:
35+
if not args.atomic_id:
36+
print("Missing --atomic-id", file=sys.stderr); sys.exit(2)
37+
aid = str(args.atomic_id)
38+
a = build_atomic_instance(aid)
39+
40+
def send_evt(payload):
41+
prod.produce(EVT_TOPIC, json.dumps(payload), key=aid)
42+
prod.poll(0)
43+
44+
def loop_single():
45+
while True:
46+
msg = cons.poll(0.2)
47+
if msg is None or msg.error():
48+
continue
49+
data = json.loads(msg.value().decode())
50+
if data.get("atomic_id") != aid:
51+
continue
52+
op = data.get("op"); t = data.get("t"); corr_id = data.get("corr_id")
53+
if op == "init":
54+
AtomicSolver.receive(a, [0, None, t])
55+
send_evt({"op": "ack", "t": t, "corr_id": corr_id, "myTimeAdvance": a.myTimeAdvance})
56+
elif op == "step-int":
57+
y = AtomicSolver.receive(a, [1, None, t])
58+
y_named = {}
59+
if isinstance(y, dict):
60+
for p, v in y.items():
61+
y_named[getattr(p, "name", "OUT")] = v
62+
send_evt({"op": "y", "t": t, "corr_id": corr_id, "y": y_named, "myTimeAdvance": a.myTimeAdvance})
63+
elif op == "step-ext":
64+
xin = {}
65+
n2p = {p.name: p for p in a.IPorts}
66+
for name, val in (data.get("inputs") or {}).items():
67+
if name in n2p:
68+
xin[n2p[name]] = val
69+
AtomicSolver.receive(a, [xin, None, t])
70+
send_evt({"op": "ack", "t": t, "corr_id": corr_id, "myTimeAdvance": a.myTimeAdvance})
71+
elif op == "state-ack":
72+
send_evt({"op": "ack-state", "t": t, "corr_id": corr_id, "myTimeAdvance": a.myTimeAdvance})
73+
elif op == "shutdown":
74+
send_evt({"op": "ack", "t": t, "corr_id": corr_id})
75+
break
76+
77+
try:
78+
loop_single()
79+
finally:
80+
cons.close()
81+
return
82+
83+
# Mode multi-atomics (REGISTRY) si vous le conservez
84+
# ...
85+
86+
if __name__ == "__main__":
87+
main()

0 commit comments

Comments
 (0)