-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathevent_publisher.py
More file actions
63 lines (55 loc) · 2.42 KB
/
event_publisher.py
File metadata and controls
63 lines (55 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the specific language governing permissions and
# limitations under the License.
import uuid
from inference_endpoint.async_utils.loop_manager import LoopManager
from inference_endpoint.async_utils.transport.zmq.context import ManagedZMQContext
from inference_endpoint.async_utils.transport.zmq.pubsub import ZmqMessagePublisher
from inference_endpoint.core.record import EventRecord, EventRecordCodec
class EventPublisherService(ZmqMessagePublisher[EventRecord]):
"""Publisher for publishing event records over ZMQ PUB socket.
Wraps ZmqMessagePublisher[EventRecord] with LoopManager integration and
auto-generated socket names.
"""
def __init__(
self,
managed_zmq_context: ManagedZMQContext,
extra_eager: bool = False,
isolated_event_loop: bool = False,
send_threshold: int = 1000,
):
"""Creates a new EventPublisherService.
Args:
managed_zmq_context: The managed ZMQ context to use.
extra_eager: If True, publish() blocks until the message is sent.
Useful for testing or when EventRecords are used as a
synchronization mechanism (e.g., ENDED as a stop signal).
isolated_event_loop: If True, runs on a separate event loop thread.
send_threshold: Minimum number of buffered records before an
automatic flush is triggered. See ZmqMessagePublisher.
"""
if extra_eager:
loop = None
elif isolated_event_loop:
loop = LoopManager().create_loop("ev_pub")
else:
loop = LoopManager().default_loop
self.socket_name = f"ev_pub_{uuid.uuid4().hex[:8]}"
super().__init__(
EventRecordCodec(),
self.socket_name,
managed_zmq_context,
loop=loop,
send_threshold=send_threshold,
)