Skip to content

Commit 3a94710

Browse files
Merge pull request #459 from volcengine/feat/rmq-in-hub
feat: support rocketmq middleware in a2a
1 parent fa08e59 commit 3a94710

File tree

1 file changed

+106
-0
lines changed

1 file changed

+106
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import time
16+
from abc import ABC, abstractmethod
17+
from typing import Callable
18+
19+
from rocketmq.client import (
20+
ConsumeStatus,
21+
Message,
22+
Producer,
23+
PushConsumer,
24+
ReceivedMessage,
25+
)
26+
27+
from veadk import Agent
28+
from veadk.utils.logger import get_logger
29+
30+
logger = get_logger(__name__)
31+
32+
33+
class RocketMQClient:
34+
def __init__(
35+
self,
36+
name: str,
37+
producer_group: str,
38+
name_server_addr: str,
39+
access_key: str,
40+
access_secret: str,
41+
):
42+
self.name = name
43+
44+
self.producer_group = producer_group
45+
self.name_server_addr = name_server_addr
46+
self.access_key = access_key
47+
self.access_secret = access_secret
48+
49+
self.producer = Producer(producer_group)
50+
self.producer.set_name_server_address(name_server_addr)
51+
self.producer.set_session_credentials(access_key, access_secret, "")
52+
self.producer.start()
53+
54+
def send_msg(self, topic: str, msg_body: str, key: str = "", tag: str = ""):
55+
msg = Message(topic)
56+
msg.set_keys(key)
57+
msg.set_tags(tag)
58+
msg.set_body(msg_body)
59+
60+
logger.info(
61+
f"Middleware client {self.name} send one-way message to topic {topic}: {msg_body}"
62+
)
63+
self.producer.send_oneway(msg)
64+
65+
# self.producer.shutdown()
66+
67+
def start_consumer(self, topic: str, group: str, callback: Callable):
68+
consumer = PushConsumer(group)
69+
consumer.set_name_server_address(self.name_server_addr)
70+
consumer.set_session_credentials(self.access_key, self.access_secret, "")
71+
72+
# for trial, subscribe all tags
73+
consumer.subscribe(topic, callback, "")
74+
75+
consumer.start()
76+
77+
while True:
78+
time.sleep(3600)
79+
80+
81+
class RocketMQAgentClient(ABC):
82+
def __init__(
83+
self,
84+
agent: Agent,
85+
rocketmq_client: RocketMQClient,
86+
subscribe_topic: str,
87+
group: str,
88+
):
89+
self.agent = agent
90+
self.rocketmq_client = rocketmq_client
91+
92+
self.subscribe_topic = subscribe_topic
93+
self.group = group
94+
95+
def listen(self):
96+
logger.info(
97+
f"RocketMQ agent client {self.agent.name} start listening on topic {self.subscribe_topic}"
98+
)
99+
self.rocketmq_client.start_consumer(
100+
topic=self.subscribe_topic,
101+
group=self.group,
102+
callback=self.recv_msg_callback,
103+
)
104+
105+
@abstractmethod
106+
def recv_msg_callback(self, msg: ReceivedMessage) -> ConsumeStatus: ...

0 commit comments

Comments
 (0)