[orchagent]: Add ZmqRouteServer for concurrent route updates#4564
venkit-nexthop wants to merge 1 commit into
Introduce a dedicated ZmqRouteServer (and ZmqRouteOrch/ZmqRouteConsumer) used by RouteOrch for receiving APPL_DB route updates from fpmsyncd. Non-fabric/non-dpu switches now create a ZmqRouteServer instead of the generic ZmqServer; fabric and DPU continue to use ZmqServer.

ZmqRouteConsumer merges incoming tuples into m_toSync from the mqPollThread ingress callback under m_toSyncMutex, and only notifies the orch main loop once a batch (gMaxBulkSize) has accumulated. To support this concurrent access, ConsumerBase::addToSync and dumpPendingTasks are made virtual so the route consumer can wrap them in a lock, while the default single-threaded base remains lock-free.

Also:
- create_zmq_route_server() factory added in lib/orch_zmq_config.
- getCfgSwitchType() moved earlier in orchagent main so the server type can be chosen based on switch type.
- Update fake_zmqserver to return a ZmqMessageHandler* from handleReceivedData to match the new upstream signature.
- Add zmq_route_orch_ut.cpp unit tests.

Signed-off-by: Venkit Kasiviswanathan <venkit@nexthop.ai>
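For readers skimming the thread, the locking and batching idea in the commit message can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `SketchConsumerBase`, `SketchRouteConsumer`, `Tuple`, the `batchSize` parameter, and the notification counter are all hypothetical stand-ins, and the merge logic is reduced to "last update per key wins".

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical, simplified stand-ins; not the real swss classes.
using Tuple = std::pair<std::string, std::string>;  // (route key, operation)

class SketchConsumerBase {
public:
    virtual ~SketchConsumerBase() = default;

    // Lock-free default: assumes only the orch main loop touches m_toSync.
    virtual void addToSync(const std::deque<Tuple>& entries) {
        for (const auto& e : entries) {
            m_toSync[e.first] = e.second;  // a later update for a key replaces the earlier one
        }
    }

protected:
    std::map<std::string, std::string> m_toSync;
};

class SketchRouteConsumer : public SketchConsumerBase {
public:
    explicit SketchRouteConsumer(std::size_t batchSize) : m_batchSize(batchSize) {}

    // Intended to be called from a poll thread: merge under the mutex and
    // record a notification only once a full batch is pending.
    void addToSync(const std::deque<Tuple>& entries) override {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        SketchConsumerBase::addToSync(entries);
        if (m_toSync.size() >= m_batchSize) {
            ++m_notifications;  // stand-in for waking the orch main loop
        }
    }

    std::size_t pending() {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        return m_toSync.size();
    }

    int notifications() {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        return m_notifications;
    }

private:
    std::mutex m_toSyncMutex;
    std::size_t m_batchSize;
    int m_notifications = 0;
};
```

Making `addToSync` virtual keeps the base class free of any mutex cost, while the derived consumer pays for locking only where a second thread is actually involved.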
/azp run

Azure Pipelines successfully started running 1 pipeline(s).
Tagging @deepak-singhal0408 @prabhataravind @prsunny @qiluo-msft @vivekrnv for visibility.

Also refer to the following PRs:
- Per Task instrumentation:
- Update m_toSync from mqPollThread:
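Possible deadlock during resync, where we call consumer.addToSync(x) from inside doTask(ConsumerBase&): drain() already holds m_toSyncMutex at that point, so a locking addToSync would try to acquire the same mutex again on the same thread.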
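One possible way to avoid the re-entrant lock, sketched under assumptions: split the entry point into a locking `addToSync` for other threads and an `addToSyncLocked` for code that already runs under the mutex. The class `ResyncSafeConsumer` and the `addToSyncLocked` name are hypothetical and not part of this PR. The split matters because locking a `std::mutex` that the calling thread already owns is undefined behavior (in practice, usually a hang).

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <string>

// Hypothetical sketch; names are illustrative, not taken from the PR.
class ResyncSafeConsumer {
public:
    // Entry point for other threads (e.g. a poll thread): takes the mutex.
    void addToSync(const std::string& key, const std::string& op) {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        addToSyncLocked(key, op);
    }

    // Precondition: the caller already holds m_toSyncMutex, which is the
    // case for any code invoked from inside drain().
    void addToSyncLocked(const std::string& key, const std::string& op) {
        m_toSync[key] = op;
    }

    // Holds the mutex for the whole task, mirroring the reviewed drain().
    void drain(const std::function<void(ResyncSafeConsumer&)>& doTask) {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        doTask(*this);
    }

    std::size_t pending() {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        return m_toSync.size();
    }

private:
    std::mutex m_toSyncMutex;
    std::map<std::string, std::string> m_toSync;
};
```

A `std::recursive_mutex` would be another option, at the cost of hiding where the lock is actually held.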
void ZmqRouteConsumer::drain()
{
    std::lock_guard<std::mutex> lk(m_toSyncMutex);
With drain() holding the lock for the entire duration of doTask(), how would the new behavior of pushing directly to m_toSync work, given that ZmqRouteConsumerStateTable::handleReceivedData cannot acquire the lock in the meantime? Can we add a TODO comment noting that we will yield in the future based on some time quantum?
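To make the "yield based on a time quantum" idea concrete, here is a rough sketch of what such a drain loop might look like. It is an assumption about a possible future design, not what the PR does today: `QuantumConsumer` and its `quantum` constructor parameter are hypothetical (the PR only declares `gZmqExecuteTimeQuantaMsecs` without using it yet).

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical sketch of a drain loop that releases the lock periodically.
class QuantumConsumer {
public:
    explicit QuantumConsumer(std::chrono::milliseconds quantum) : m_quantum(quantum) {}

    void addToSync(const std::string& key, const std::string& op) {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        m_toSync[key] = op;
    }

    // Processes all pending entries. Whenever the current time slice has
    // used up its quantum, the lock is dropped briefly so a producer thread
    // can get in. Returns how many times the lock was released.
    // Note: `process` runs with the lock held and must not call addToSync.
    template <typename Fn>
    int drain(Fn&& process) {
        using Clock = std::chrono::steady_clock;
        int yields = 0;
        std::unique_lock<std::mutex> lk(m_toSyncMutex);
        auto sliceStart = Clock::now();
        while (!m_toSync.empty()) {
            auto it = m_toSync.begin();
            std::pair<std::string, std::string> entry = *it;
            m_toSync.erase(it);
            process(entry.first, entry.second);
            if (Clock::now() - sliceStart >= m_quantum) {
                lk.unlock();                // let blocked producers merge their tuples
                std::this_thread::yield();
                lk.lock();
                sliceStart = Clock::now();
                ++yields;
            }
        }
        return yields;
    }

private:
    std::chrono::milliseconds m_quantum;
    std::mutex m_toSyncMutex;
    std::map<std::string, std::string> m_toSync;
};
```

With a quantum of zero the loop yields after every entry, which is useful for testing; a larger quantum trades producer latency for fewer lock handoffs.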
#include "zmqrouteserver.h"
#include "zmqrouteconsumerstatetable.h"

extern int gZmqExecuteTimeQuantaMsecs;
Would it be good to add a TODO comment explaining how this will be used, since we are not referencing it yet?
Can we add a test where producers call addToSync while drain() is in progress?
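A test along these lines could be shaped roughly like the sketch below. It is only an outline under assumptions: `LockedConsumer` and `produceWhileDraining` are hypothetical helpers, not the fixtures in zmq_route_orch_ut.cpp, and a real test would drive the actual ZmqRouteConsumer instead.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical minimal consumer: both paths take the same mutex.
class LockedConsumer {
public:
    void addToSync(const std::string& key) {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        m_toSync[key] = "SET";
    }

    // Removes everything currently pending and returns how many entries
    // were taken; the lock is held for the whole call.
    std::size_t drain() {
        std::lock_guard<std::mutex> lk(m_toSyncMutex);
        std::size_t n = m_toSync.size();
        m_toSync.clear();
        return n;
    }

private:
    std::mutex m_toSyncMutex;
    std::map<std::string, std::string> m_toSync;
};

// Runs one producer thread that adds `total` unique keys while the calling
// thread drains in a loop. Returns the number of entries drained.
std::size_t produceWhileDraining(std::size_t total) {
    LockedConsumer consumer;
    std::thread producer([&consumer, total] {
        for (std::size_t i = 0; i < total; ++i) {
            consumer.addToSync("route-" + std::to_string(i));
        }
    });

    std::size_t processed = 0;
    while (processed < total) {
        processed += consumer.drain();
        std::this_thread::yield();
    }
    producer.join();
    return processed;
}
```

Because every key is unique, no update can be merged away, so the drained count must equal the produced count; a lost or duplicated entry would show up as a mismatch or a hang.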
void ZmqRouteOrch::doTask(Consumer &consumer)
{
    // When ZMQ disabled, forward data from Consumer
    doTask((ConsumerBase &)consumer);
Suggested change:
- doTask((ConsumerBase &)consumer);
+ doTask(static_cast<ConsumerBase&>(consumer));
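For context on why the cast is needed at all, a small standalone sketch: with two `doTask` overloads, the derived-reference overload has to upcast explicitly to reach the base-reference one, otherwise it would call itself. The types below are simplified stand-ins that only borrow the names from the diff, and `SketchOrch` is hypothetical.

```cpp
#include <cassert>
#include <string>

// Simplified stand-ins for the class hierarchy referenced in the diff.
struct ConsumerBase {
    virtual ~ConsumerBase() = default;
};
struct Consumer : ConsumerBase {};

struct SketchOrch {
    std::string doTask(ConsumerBase&) { return "base"; }

    // Without the explicit upcast, overload resolution would pick this same
    // function again and recurse forever.
    std::string doTask(Consumer& consumer) {
        return "derived->" + doTask(static_cast<ConsumerBase&>(consumer));
    }
};
```

`static_cast` does the same upcast as the C-style cast here, but it fails to compile if the types are ever unrelated, whereas a C-style cast would silently fall back to a reinterpreting conversion.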
What I did
Introduce a dedicated ZmqRouteServer (with ZmqRouteOrch and ZmqRouteConsumer) for RouteOrch to receive APPL_DB route updates from fpmsyncd. Non-fabric / non-dpu switches now create a ZmqRouteServer instead of the generic ZmqServer; fabric and DPU continue to use the existing ZmqServer.

ZmqRouteConsumer merges incoming tuples into m_toSync from the mqPollThread ingress callback under m_toSyncMutex, and only notifies the orch main loop once a batch (gMaxBulkSize) has accumulated. To support this concurrent access, ConsumerBase::addToSync and dumpPendingTasks are made virtual so the route consumer can wrap them in a lock, while the default single-threaded base remains lock-free.

Other changes:
- create_zmq_route_server() factory added in lib/orch_zmq_config.
- getCfgSwitchType() moved earlier in orchagent main so the server type can be chosen by switch type.
- fake_zmqserver updated to return a ZmqMessageHandler* from handleReceivedData to match the new signature.
- Unit tests added in tests/mock_tests/zmq_route_orch_ut.cpp.

Why I did it
The generic ZmqServer serializes all message handling on a single thread. For route-heavy workloads, dispatching route updates on a dedicated server with its own poll thread improves throughput and decouples the route ingestion path from the rest of the orchagent ZMQ channel.

How I verified it
- make_exit=0).
- zmq_route_orch_ut.cpp covering the new consumer.

Details if related
N/A
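As a rough illustration of the switch-type-based server selection described under "What I did", the sketch below picks a server class from a switch-type string. It is not the actual `create_zmq_route_server()` implementation: the `Sketch*` classes, the `makeServerForSwitchType` helper, and the literal strings "fabric" and "dpu" are assumptions made for the example.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-ins for the generic and route-specific servers.
struct SketchZmqServer {
    virtual ~SketchZmqServer() = default;
    virtual std::string kind() const { return "ZmqServer"; }
};

struct SketchZmqRouteServer : SketchZmqServer {
    std::string kind() const override { return "ZmqRouteServer"; }
};

// Fabric and DPU keep the generic server; every other switch type gets the
// route server. The switch-type strings are assumed values.
std::unique_ptr<SketchZmqServer> makeServerForSwitchType(const std::string& switchType) {
    if (switchType == "fabric" || switchType == "dpu") {
        return std::make_unique<SketchZmqServer>();
    }
    return std::make_unique<SketchZmqRouteServer>();
}
```

This also shows why the switch type has to be known before the server is constructed, which is the reason given for moving getCfgSwitchType() earlier in main.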