@@ -72,35 +72,98 @@ public CommunityExecutionPlanner(RunnerId instanceId,
7272 this .configuration = coreConfiguration ;
7373 }
7474
75+ /**
76+ * Gets the next execution plan using a two-phase audit read to prevent race conditions in concurrent executions.
77+ *
78+ * <p><b>How it works:</b></p>
79+ *
80+ * <ol>
81+ * <li><b>Initial Audit Read (Optimistic)</b> - Reads the audit log without holding the lock to determine
82+ * if any changes need execution. This avoids acquiring the lock when all changes are already executed.</li>
83+ *
84+ * <li><b>Early Exit Check</b> - If no changes require execution based on the initial read, returns immediately
85+ * without acquiring the lock.</li>
86+ *
87+ * <li><b>Lock Acquisition</b> - Acquires the lock to ensure only one instance executes changes.
88+ * If another instance holds the lock, this call blocks until the lock becomes available.</li>
89+ *
90+ * <li><b>Validated Audit Read (With Lock)</b> - Re-reads the audit log while holding the lock to get
91+ * the authoritative state. This detects if another instance executed changes while we waited for the lock.</li>
92+ *
93+ * <li><b>Plan Validation</b> - Rebuilds the execution plan with the validated audit data and checks if
94+ * execution is still needed. If another instance already executed the changes, releases the lock
95+ * and returns without executing.</li>
96+ *
97+ * <li><b>Concurrent Execution Detection</b> - Logs any differences between the initial and validated plans
98+ * to track when concurrent instances attempted to execute the same changes.</li>
99+ *
100+ * <li><b>Lock Refresh Daemon</b> - If enabled, starts a background daemon to periodically refresh the lock
101+ * during long-running executions to prevent lock expiration.</li>
102+ *
103+ * <li><b>Execution</b> - Returns an execution plan containing the next stage to execute while holding the lock.
104+ * The lock will be released after execution completes.</li>
105+ * </ol>
106+ *
107+ * <p><b>Concurrent Execution Handling:</b></p>
108+ * <pre>
109+ * Instance A: Read audit → Get lock → Re-read → Execute → Release
110+ * Instance B: Read audit → Wait for lock → Get lock → Re-read → Detect already executed → Release
111+ * </pre>
112+ *
113+ * <p><b>Error Handling:</b> If any exception occurs after acquiring the lock, the lock is released
114+ * in the catch block to prevent lock leaks.</p>
115+ *
116+ * @param loadedStages the list of loaded stages containing all defined changes
117+ * @return ExecutionPlan containing either stages to execute (with lock held) or CONTINUE (no lock)
118+ * @throws LockException if unable to acquire the distributed lock within the configured timeout
119+ */
75120 @ Override
76121 public ExecutionPlan getNextExecution (List <AbstractLoadedStage > loadedStages ) throws LockException {
77- Map <String , AuditEntry > auditSnapshot = auditReader .getAuditSnapshotByChangeId ();
78- logger .debug ("Pulled remote state:\n {}" , auditSnapshot );
79-
80- List <ExecutableStage > executableStages = loadedStages
81- .stream ()
82- .map (loadedStage -> {
83- // Convert audit status to action plan using the new action-based architecture
84- ChangeActionMap changeActionMap = CommunityChangeActionBuilder .build (loadedStage .getTasks (), auditSnapshot );
85- return loadedStage .applyActions (changeActionMap );
86- })
87- .collect (Collectors .toList ());
122+ Map <String , AuditEntry > initialSnapshot = auditReader .getAuditSnapshotByChangeId ();
123+ logger .debug ("Pulled initial remote state:\n {}" , initialSnapshot );
88124
89- Optional <ExecutableStage > nextStageOpt = executableStages .stream ()
90- .filter (ExecutableStage ::isExecutionRequired )
91- .findFirst ();
125+ List <ExecutableStage > initialStages = buildExecutableStages (loadedStages , initialSnapshot );
126+
127+ if (!hasExecutableStages (initialStages )) {
128+ return ExecutionPlan .CONTINUE (initialStages );
129+ }
130+
131+ Lock lock = acquireLock ();
132+
133+ try {
134+ Map <String , AuditEntry > validatedSnapshot = auditReader .getAuditSnapshotByChangeId ();
135+
136+ List <ExecutableStage > validatedStages = buildExecutableStages (loadedStages , validatedSnapshot );
92137
138+ Optional <ExecutableStage > nextStageOpt = getFirstExecutableStage (validatedStages );
139+
140+ if (!nextStageOpt .isPresent ()) {
141+ logger .info (
142+ "Execution plan invalidated after lock acquisition. " +
143+ "All changes were executed by another instance during lock wait. " +
144+ "Releasing lock and continuing."
145+ );
146+ lock .release ();
147+ return ExecutionPlan .CONTINUE (validatedStages );
148+ }
149+
150+ logPlanChanges (initialStages , validatedStages );
93151
94- if (nextStageOpt .isPresent ()) {
95- Lock lock = acquireLock ();
96152 if (configuration .isEnableRefreshDaemon ()) {
97153 new LockRefreshDaemon (lock , TimeService .getDefault ()).start ();
98154 }
155+
99156 String executionId = ExecutionId .getNewExecutionId ();
100- return ExecutionPlan .newExecution (executionId , lock , Collections .singletonList (nextStageOpt .get ()));
157+ return ExecutionPlan .newExecution (
158+ executionId ,
159+ lock ,
160+ Collections .singletonList (nextStageOpt .get ())
161+ );
101162
102- } else {
103- return ExecutionPlan .CONTINUE (executableStages );
163+ } catch (Exception e ) {
164+ logger .error ("Error during execution planning - releasing lock" , e );
165+ lock .release ();
166+ throw e ;
104167 }
105168 }
106169
@@ -115,6 +178,86 @@ private Lock acquireLock() {
115178 );
116179 }
117180
181+ /**
182+ * Builds executable stages from audit snapshot.
183+ *
184+ * @param loadedStages the loaded stages to process
185+ * @param auditSnapshot the audit snapshot containing change states
186+ * @return list of executable stages
187+ */
188+ private List <ExecutableStage > buildExecutableStages (
189+ List <AbstractLoadedStage > loadedStages ,
190+ Map <String , AuditEntry > auditSnapshot ) {
191+
192+ return loadedStages .stream ()
193+ .map (loadedStage -> {
194+ ChangeActionMap changeActionMap = CommunityChangeActionBuilder .build (
195+ loadedStage .getTasks (),
196+ auditSnapshot
197+ );
198+ return loadedStage .applyActions (changeActionMap );
199+ })
200+ .collect (Collectors .toList ());
201+ }
202+
203+ /**
204+ * Checks if any stage requires execution.
205+ *
206+ * @param stages the list of executable stages
207+ * @return true if at least one stage requires execution
208+ */
209+ private boolean hasExecutableStages (List <ExecutableStage > stages ) {
210+ return stages .stream ().anyMatch (ExecutableStage ::isExecutionRequired );
211+ }
212+
213+ /**
214+ * Gets the first executable stage.
215+ *
216+ * @param stages the list of executable stages
217+ * @return optional containing the first executable stage, or empty if none
218+ */
219+ private Optional <ExecutableStage > getFirstExecutableStage (List <ExecutableStage > stages ) {
220+ return stages .stream ()
221+ .filter (ExecutableStage ::isExecutionRequired )
222+ .findFirst ();
223+ }
224+
225+ /**
226+ * Logs differences between initial and validated plans to detect concurrent executions.
227+ *
228+ * @param initialStages the initially planned stages
229+ * @param validatedStages the validated stages after lock acquisition
230+ */
231+ private void logPlanChanges (List <ExecutableStage > initialStages , List <ExecutableStage > validatedStages ) {
232+ long initialCount = countExecutableTasks (initialStages );
233+ long validatedCount = countExecutableTasks (validatedStages );
234+
235+ if (initialCount != validatedCount ) {
236+ logger .warn (
237+ "Execution plan changed during lock acquisition: {} -> {} executable tasks. " +
238+ "This indicates concurrent execution - {} tasks were executed by another instance." ,
239+ initialCount ,
240+ validatedCount ,
241+ initialCount - validatedCount
242+ );
243+ } else {
244+ logger .debug ("Execution plan validated after lock acquisition: {} executable tasks" , validatedCount );
245+ }
246+ }
247+
248+ /**
249+ * Counts the number of executable tasks across all stages.
250+ *
251+ * @param stages the list of stages
252+ * @return total count of executable tasks
253+ */
254+ private long countExecutableTasks (List <ExecutableStage > stages ) {
255+ return stages .stream ()
256+ .filter (ExecutableStage ::isExecutionRequired )
257+ .mapToLong (stage -> stage .getTasks ().size ())
258+ .sum ();
259+ }
260+
118261
119262 public static class Builder {
120263 private RunnerId runnerId ;
0 commit comments