|
2 | 2 | title: Use Change Feed for cross-partition query optimization with materialized views |
3 | 3 | impact: HIGH |
4 | 4 | impactDescription: eliminates cross-partition query overhead for admin/analytics scenarios |
5 | | -tags: pattern, change-feed, materialized-views, cross-partition, query-optimization |
| 5 | +tags: pattern, change-feed, materialized-views, cross-partition, query-optimization, idempotency, at-least-once |
6 | 6 | --- |
7 | 7 |
|
8 | 8 | ## Use Change Feed for Materialized Views or Global Secondary Index |
@@ -186,14 +186,110 @@ var query = ordersByStatusContainer.GetItemQueryIterator<OrderStatusView>( |
186 | 186 | | Better scalability | Eventual consistency (slight delay) | |
187 | 187 | | Lower RU cost per query | RU cost for writes to both containers | |
188 | 188 |
|
| 189 | +**⚠️ Change Feed delivers events at-least-once.** Your handler MUST be idempotent — processing the same event twice must produce the same result. Never use `counter += 1` or `get() + 1` patterns in Change Feed handlers, as event replay will silently double-count. |
| 190 | + |
| 191 | +**Incorrect — non-idempotent handler (counter drift on replay):** |
| 192 | + |
| 193 | +```java |
| 194 | +// ❌ WRONG — at-least-once replay doubles counts |
| 195 | +private void handleChanges(List<JsonNode> changes, ChangeFeedProcessorContext context) { |
| 196 | + for (JsonNode node : changes) { |
| 197 | + GameScore score = objectMapper.treeToValue(node, GameScore.class); |
| 198 | + PlayerProfile profile = playerRepository.findById(score.getPlayerId()).orElseGet(PlayerProfile::new); |
| 199 | + profile.setTotalGamesPlayed(profile.getTotalGamesPlayed() + 1); // NON-IDEMPOTENT |
| 200 | + profile.setTotalScore(profile.getTotalScore() + score.getScore()); // NON-IDEMPOTENT |
| 201 | + playerRepository.save(profile); |
| 202 | + } |
| 203 | +} |
| 204 | +``` |
| 205 | + |
| 206 | +```csharp |
| 207 | +// ❌ WRONG — same problem in .NET |
| 208 | +async Task HandleChangesAsync(IReadOnlyCollection<GameScore> changes, CancellationToken ct) |
| 209 | +{ |
| 210 | + foreach (var score in changes) |
| 211 | + { |
| 212 | + var profile = await GetProfileAsync(score.PlayerId); |
| 213 | + profile.TotalGamesPlayed += 1; // NON-IDEMPOTENT |
| 214 | + profile.TotalScore += score.Score; // NON-IDEMPOTENT |
| 215 | + await SaveProfileAsync(profile); |
| 216 | + } |
| 217 | +} |
| 218 | +``` |
| 219 | + |
| 220 | +**Correct — idempotent alternatives:** |
| 221 | + |
| 222 | +Use one of these patterns to ensure safe replay: |
| 223 | + |
| 224 | +**1. Replace pattern — write absolute values, not deltas:** |
| 225 | + |
| 226 | +```java |
| 227 | +// ✅ CORRECT — replace with absolute value from the event |
| 228 | +private void handleChanges(List<JsonNode> changes, ChangeFeedProcessorContext context) { |
| 229 | + for (JsonNode node : changes) { |
| 230 | + GameScore score = objectMapper.treeToValue(node, GameScore.class); |
| 231 | + PlayerProfile profile = playerRepository.findById(score.getPlayerId()).orElseGet(PlayerProfile::new); |
| 232 | + // Idempotent: same event replayed produces same result |
| 233 | + profile.setHighScore(Math.max(profile.getHighScore(), score.getScore())); |
| 234 | + playerRepository.save(profile); |
| 235 | + } |
| 236 | +} |
| 237 | +``` |
| 238 | + |
| 239 | +**2. Conditional write — use ETags to detect duplicate processing:** |
| 240 | + |
| 241 | +```csharp |
| 242 | +// ✅ CORRECT — ETag prevents duplicate processing |
| 243 | +async Task HandleChangesAsync(IReadOnlyCollection<GameScore> changes, CancellationToken ct) |
| 244 | +{ |
| 245 | + foreach (var score in changes) |
| 246 | + { |
| 247 | + var response = await container.ReadItemAsync<PlayerProfile>( |
| 248 | + score.PlayerId, new PartitionKey(score.PlayerId)); |
| 249 | + var profile = response.Resource; |
| 250 | + profile.HighScore = Math.Max(profile.HighScore, score.Score); |
| 251 | + await container.ReplaceItemAsync(profile, profile.Id, |
| 252 | + new PartitionKey(profile.Id), |
| 253 | + new ItemRequestOptions { IfMatchEtag = response.ETag }); |
| 254 | + } |
| 255 | +} |
| 256 | +``` |
| 257 | + |
| 258 | +**3. Mark-and-rebuild — flag affected records and recalculate from source of truth:** |
| 259 | + |
| 260 | +```python |
| 261 | +# ✅ CORRECT — mark dirty and rebuild from source data |
| 262 | +async def handle_changes(changes): |
| 263 | + for change in changes: |
| 264 | + player_id = change["playerId"] |
| 265 | + # Mark the profile as needing recalculation |
| 266 | + await profiles_container.patch_item( |
| 267 | + item=player_id, |
| 268 | + partition_key=player_id, |
| 269 | + patch_operations=[ |
| 270 | + {"op": "set", "path": "/needsRecalc", "value": True} |
| 271 | + ] |
| 272 | + ) |
| 273 | + # Separate process recalculates from source of truth |
| 274 | +``` |
| 275 | + |
| 276 | +| Idempotent Pattern | When to Use | Trade-off | |
| 277 | +|--------------------|-------------|-----------| |
| 278 | +| Replace (absolute value) | High scores, latest status, max/min values | Only works for non-cumulative data | |
| 279 | +| Conditional write (ETag) | Any update where you can detect duplicates | Extra read + possible retry on conflict | |
| 280 | +| Mark-and-rebuild | Counters, aggregations, cumulative totals | Higher latency, requires rebuild process | |
| 281 | + |
189 | 282 | **Key Points:** |
| 283 | +- **Change Feed delivers at-least-once** — handlers MUST be idempotent |
190 | 284 | - Change Feed provides reliable, ordered event stream of all document changes |
191 | 285 | - Materialized views trade storage cost for query efficiency |
192 | 286 | - Updates are eventually consistent (typically <1 second delay) |
193 | 287 | - Use lease container to track processor progress (enables resume after failures) |
| 288 | +- Never use `counter += 1`, `total += value`, or `get() + 1` patterns in Change Feed handlers |
194 | 289 | - Consider Azure Functions with Cosmos DB trigger for serverless implementation |
195 | | -- Consider Global Secondary Index (GSI) implementation as alternative for automatic sync between containers with different partition keys. |
| 290 | +- Consider Global Secondary Index (GSI) implementation as alternative for automatic sync between containers with different partition keys |
196 | 291 |
|
197 | 292 | Reference(s): |
198 | 293 | [Change feed in Azure Cosmos DB](https://learn.microsoft.com/azure/cosmos-db/change-feed) |
| 294 | +[Change feed design patterns in Azure Cosmos DB](https://learn.microsoft.com/azure/cosmos-db/nosql/change-feed-design-patterns) |
199 | 295 | [Global Secondary Indexes (GSI) in Azure Cosmos DB](https://learn.microsoft.com/en-us/azure/cosmos-db/global-secondary-indexes) |
0 commit comments