Skip to content

Commit 33cb8e0

Browse files
authored
shard orchestrator mappings improvement (#1783)
1 parent 72e8a25 commit 33cb8e0

1 file changed

Lines changed: 26 additions & 1 deletion

File tree

pkg/workflows/shardorchestrator/store.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,19 +168,44 @@ func (s *Store) GetAllWorkflowMappings(ctx context.Context) ([]*WorkflowMappingS
168168

169169
// ReportShardRegistration is called when a shard reports its registered workflows
170170
// This helps track which workflows each shard has successfully loaded
171+
// It also updates workflowMappings so GetWorkflowShardMapping returns correct data
171172
func (s *Store) ReportShardRegistration(ctx context.Context, shardID uint32, workflowIDs []string) error {
172173
s.mu.Lock()
173174
defer s.mu.Unlock()
174175

175-
// Clear and update
176+
now := time.Now()
177+
178+
// Clear and update shard registrations
176179
s.shardRegistrations[shardID] = make(map[string]bool)
177180
for _, wfID := range workflowIDs {
178181
s.shardRegistrations[shardID][wfID] = true
179182
}
180183

184+
// Also update workflowMappings - when a shard reports it has a workflow,
185+
// that's authoritative information about where the workflow is running
186+
for _, wfID := range workflowIDs {
187+
existing, ok := s.workflowMappings[wfID]
188+
if !ok || existing.NewShardID != shardID {
189+
s.workflowMappings[wfID] = &WorkflowMappingState{
190+
WorkflowID: wfID,
191+
OldShardID: 0,
192+
NewShardID: shardID,
193+
TransitionState: StateSteady,
194+
UpdatedAt: now,
195+
}
196+
if ok {
197+
s.workflowMappings[wfID].OldShardID = existing.NewShardID
198+
}
199+
}
200+
}
201+
202+
s.mappingVersion++
203+
s.lastUpdateTime = now
204+
181205
s.logger.Debugw("Updated shard registrations",
182206
"shardID", shardID,
183207
"workflowCount", len(workflowIDs),
208+
"version", s.mappingVersion,
184209
)
185210

186211
return nil

0 commit comments

Comments
 (0)