Skip to content

Commit 76bfdf7

Browse files
sandy2008claude
andcommitted
fix(compactor): make DeleteLocalSyncFiles tests deterministic (pin ring tokens, drive compaction manually)
TestCompactor_DeleteLocalSyncFiles and its partition twin relied on timer-driven compaction cycles and on the random per-instance ring tokens drawn at startup. With 2 compactors x 512 random tokens and only 10 fixed users there is a measured ~0.103% (~1-in-960 per run) chance that the second compactor owns zero of the users. In such runs every c2 cycle completes without ever creating a meta-sync directory, so the condition polled since #7567 (CompactionRunsCompleted >= 2 && len(dirs) > 0) is permanently false and the test burns the entire 30s budget: the exact 32.8s arm64 CI failure of #7608, and the true root cause of #7565 (#7567's "transient ring-view skew" was a misdiagnosis - no timeout can fix a permanently false condition). Restore the manual-drive structure the test had when introduced in #3851 (removed by #6510) and pin the ownership split, identically in both twins: - CompactionInterval = 10m: timers are out of the picture; the test drives compaction cycles itself. - Pin per-instance ring tokens via ShardingRing.TokensFilePath (new pinnedTokens helper): a guard token fnv32a(user)+1 for alternating users makes compactor-1 own user-1,3,5,7,9 and compactor-2 own user-2,4,6,8,10 deterministically, since the integer interval (h, h+1) is empty and ring lookup picks the first token strictly greater than the user hash. - Replace the timed CompactionRunsCompleted polls with direct compactUsers() calls on both compactors, gated by one poll waiting until BOTH compactors' ring views see two healthy ACTIVE instances. - Assert exact ownership counts (numUsers/2 each) instead of NotZero/NotEqual, so any silent un-pinning fails loudly. - Tolerate (only) context.Canceled from StopAndAwaitTerminated in the test cleanup: with a 10m interval the compactor is usually still in its initial jittered wait when stopped, and running() returns ctx.Err() there since #6510. Supersedes #7567's poll-based approach. Fixes #7608 Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com>
1 parent 74185ef commit 76bfdf7

3 files changed

Lines changed: 127 additions & 34 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
* [BUGFIX] Query Frontend: Fix native histogram responses not being handled correctly in `minTime()` sort ordering for split_by_interval merge. #7555
5858
* [BUGFIX] Distributor: Release the push worker pool goroutines on shutdown by stopping the async executor during the stopping phase when `-distributor.num-push-workers` is set. #7602
5959
* [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573
60+
* [BUGFIX] Compactor: Fix flake in `TestCompactor_DeleteLocalSyncFiles` and `TestPartitionCompactor_DeleteLocalSyncFiles` by pinning per-instance ring tokens and driving compaction cycles manually; with random tokens the second compactor owned zero of the ten test users in ~1-in-1000 runs, making the previous wait condition permanently unsatisfiable. #7619
6061

6162
## 1.21.0 2026-04-24
6263

pkg/compactor/compactor_paritioning_test.go

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"io"
1010
"os"
1111
"path"
12+
"path/filepath"
1213
"strings"
1314
"testing"
1415
"time"
@@ -1588,10 +1589,27 @@ func TestPartitionCompactor_DeleteLocalSyncFiles(t *testing.T) {
15881589
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
15891590
cfg.ShardingRing.KVStore.Mock = kvstore
15901591

1592+
cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually.
1593+
1594+
// Pin deterministic ring tokens so that each compactor owns exactly half of
1595+
// the test users (compactor-1: user-1,3,5,7,9; compactor-2: user-2,4,6,8,10).
1596+
// With random tokens there is a ~1-in-1000 chance per run that the second
1597+
// compactor owns zero users, which made the previous wait condition
1598+
// permanently unsatisfiable (#7565, #7608).
1599+
cfg.ShardingRing.TokensFilePath = filepath.Join(t.TempDir(), "tokens")
1600+
require.NoError(t, ring.TokenFile{PreviousState: ring.ACTIVE, Tokens: pinnedTokens(t, userIDs, i)}.StoreToFile(cfg.ShardingRing.TokensFilePath))
1601+
15911602
// Each compactor will get its own temp dir for storing local files.
15921603
c, _, tsdbPlanner, _, _ := prepareForPartitioning(t, cfg, inmem, nil, nil)
15931604
t.Cleanup(func() {
1594-
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
1605+
// With the long compaction interval the compactor is usually still
1606+
// waiting for its initial jittered compaction run when the test ends.
1607+
// Stopping it at that point makes running() return the context
1608+
// cancellation, which is reported as a service failure: tolerate it
1609+
// (and only it).
1610+
if err := services.StopAndAwaitTerminated(context.Background(), c); err != nil {
1611+
require.ErrorIs(t, err, context.Canceled)
1612+
}
15951613
})
15961614

15971615
compactors = append(compactors, c)
@@ -1610,38 +1628,51 @@ func TestPartitionCompactor_DeleteLocalSyncFiles(t *testing.T) {
16101628
// Start first compactor
16111629
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c1))
16121630

1613-
// Wait until a run has been completed on first compactor. This happens as soon as compactor starts.
1614-
cortex_testutil.Poll(t, 20*time.Second, true, func() any {
1615-
return prom_testutil.ToFloat64(c1.CompactionRunsCompleted) >= 1
1616-
})
1631+
// Run a compaction cycle on the first compactor: it is alone in the ring, so
1632+
// it owns (and syncs) all the users.
1633+
c1.compactUsers(context.Background())
1634+
require.Equal(t, numUsers, len(c1.listTenantsWithMetaSyncDirectories()))
16171635

16181636
require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600))
16191637

16201638
// Verify that first compactor has synced all the users, plus there is one extra we have just created.
16211639
require.Equal(t, numUsers+1, len(c1.listTenantsWithMetaSyncDirectories()))
16221640

1623-
// Now start second compactor, and wait until it runs compaction.
1641+
// Now start second compactor.
16241642
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2))
1625-
// Wait for at least two completed cycles so we sample after a steady-state
1626-
// ownership cycle, not mid-cycle following a zero-owned first cycle. The
1627-
// first cycle's CompactionRunsCompleted can increment with zero owned users
1628-
// due to transient ring-view skew at startup; sampling then would race with
1629-
// the second cycle's fetcher.NewBaseFetcher creating meta-sync directories
1630-
// and return a partial count.
1631-
cortex_testutil.Poll(t, 30*time.Second, true, func() any {
1632-
return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) >= 2 &&
1633-
len(c2.listTenantsWithMetaSyncDirectories()) > 0
1643+
1644+
// Before driving ownership-dependent compaction cycles, wait until BOTH
1645+
// compactors' ring views see two healthy ACTIVE instances (RingOp is the
1646+
// operation ownUser itself queries). c2's own view is already barriered by
1647+
// starting() — it waits until c2 is ACTIVE in its own view, and every KV
1648+
// snapshot containing c2 also contains the earlier-registered c1 — but c1's
1649+
// ring watcher ingests c2's registration asynchronously, and the final
1650+
// c1.compactUsers() cleanup below depends on c1's view.
1651+
cortex_testutil.Poll(t, 10*time.Second, true, func() any {
1652+
for _, c := range compactors {
1653+
rs, err := c.ring.GetAllHealthy(RingOp)
1654+
if err != nil || len(rs.Instances) != 2 {
1655+
return false
1656+
}
1657+
}
1658+
return true
16341659
})
16351660

1661+
// Run a compaction cycle on the second compactor: with pinned tokens it owns
1662+
// exactly half of the users and creates a meta-sync directory for each of them.
1663+
c2.compactUsers(context.Background())
1664+
16361665
// Let's check how many users second compactor has.
16371666
c2Users := len(c2.listTenantsWithMetaSyncDirectories())
1667+
require.Equal(t, numUsers/2, c2Users)
16381668

16391669
// Force new compaction cycle on first compactor. It will run the cleanup of un-owned users at the end of compaction cycle.
16401670
c1.compactUsers(context.Background())
16411671
c1Users := len(c1.listTenantsWithMetaSyncDirectories())
16421672

1643-
// Now compactor 1 should have cleaned old sync files.
1644-
require.NotEqual(t, numUsers, c1Users)
1673+
// Now compactor 1 should have cleaned the sync files of the users it no longer
1674+
// owns (including "new-user"), keeping exactly its own half.
1675+
require.Equal(t, numUsers-numUsers/2, c1Users)
16451676
require.Equal(t, numUsers, c1Users+c2Users)
16461677
}
16471678

pkg/compactor/compactor_test.go

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"encoding/json"
88
"flag"
99
"fmt"
10+
"hash/fnv"
1011
"io"
12+
"math"
1113
"os"
1214
"path"
1315
"path/filepath"
@@ -1779,6 +1781,35 @@ func mockParquetMarker() string {
17791781
return string(content)
17801782
}
17811783

1784+
// pinnedTokens returns exactly 512 deterministic ring tokens (NumTokens is
1785+
// hardcoded to 512 in RingConfig.ToLifecyclerConfig; shorter token files are
1786+
// silently topped up with random tokens by the lifecycler) pinning ring
1787+
// ownership so that instance i (1-based) owns every 2nd test user: the guard
1788+
// token fnv32a(user)+1 is always the first token strictly greater than that
1789+
// user's hash, while the filler tokens sit far below every user hash. Pinning
1790+
// removes the ~1-in-1000 random-token draw where one compactor owns zero of
1791+
// the test users — the root cause of both #7565 and #7608. Do NOT replace the
1792+
// guards with evenly-spaced tokens: fnv32a("user-N") hashes step by the FNV
1793+
// prime 16777619 and resonate with regular spacings, which can degenerate to
1794+
// a 0/10 ownership split.
1795+
func pinnedTokens(t *testing.T, userIDs []string, instance int) ring.Tokens {
1796+
t.Helper()
1797+
1798+
tokens := make(ring.Tokens, 0, 512)
1799+
for k := instance - 1; k < len(userIDs); k += 2 {
1800+
h := fnv.New32a()
1801+
_, _ = h.Write([]byte(userIDs[k]))
1802+
require.NotEqual(t, uint32(math.MaxUint32), h.Sum32()) // the +1 below must not wrap
1803+
tokens = append(tokens, h.Sum32()+1)
1804+
}
1805+
for j := uint32(0); len(tokens) < 512; j++ {
1806+
tokens = append(tokens, uint32(instance)+2*j) // instance 1: odd 1..1013, instance 2: even 2..1014
1807+
}
1808+
require.Len(t, tokens, 512)
1809+
1810+
return tokens
1811+
}
1812+
17821813
func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
17831814
numUsers := 10
17841815

@@ -1812,10 +1843,27 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
18121843
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
18131844
cfg.ShardingRing.KVStore.Mock = kvstore
18141845

1846+
cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually.
1847+
1848+
// Pin deterministic ring tokens so that each compactor owns exactly half of
1849+
// the test users (compactor-1: user-1,3,5,7,9; compactor-2: user-2,4,6,8,10).
1850+
// With random tokens there is a ~1-in-1000 chance per run that the second
1851+
// compactor owns zero users, which made the previous wait condition
1852+
// permanently unsatisfiable (#7565, #7608).
1853+
cfg.ShardingRing.TokensFilePath = filepath.Join(t.TempDir(), "tokens")
1854+
require.NoError(t, ring.TokenFile{PreviousState: ring.ACTIVE, Tokens: pinnedTokens(t, userIDs, i)}.StoreToFile(cfg.ShardingRing.TokensFilePath))
1855+
18151856
// Each compactor will get its own temp dir for storing local files.
18161857
c, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
18171858
t.Cleanup(func() {
1818-
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
1859+
// With the long compaction interval the compactor is usually still
1860+
// waiting for its initial jittered compaction run when the test ends.
1861+
// Stopping it at that point makes running() return the context
1862+
// cancellation, which is reported as a service failure: tolerate it
1863+
// (and only it).
1864+
if err := services.StopAndAwaitTerminated(context.Background(), c); err != nil {
1865+
require.ErrorIs(t, err, context.Canceled)
1866+
}
18191867
})
18201868

18211869
compactors = append(compactors, c)
@@ -1834,38 +1882,51 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
18341882
// Start first compactor
18351883
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c1))
18361884

1837-
// Wait until a run has been completed on first compactor. This happens as soon as compactor starts.
1838-
cortex_testutil.Poll(t, 10*time.Second, 1.0, func() any {
1839-
return prom_testutil.ToFloat64(c1.CompactionRunsCompleted)
1840-
})
1885+
// Run a compaction cycle on the first compactor: it is alone in the ring, so
1886+
// it owns (and syncs) all the users.
1887+
c1.compactUsers(context.Background())
1888+
require.Equal(t, numUsers, len(c1.listTenantsWithMetaSyncDirectories()))
18411889

18421890
require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600))
18431891

18441892
// Verify that first compactor has synced all the users, plus there is one extra we have just created.
18451893
require.Equal(t, numUsers+1, len(c1.listTenantsWithMetaSyncDirectories()))
18461894

1847-
// Now start second compactor, and wait until it runs compaction.
1895+
// Now start second compactor.
18481896
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2))
1849-
// Wait for at least two completed cycles so we sample after a steady-state
1850-
// ownership cycle, not mid-cycle following a zero-owned first cycle. The
1851-
// first cycle's CompactionRunsCompleted can increment with zero owned users
1852-
// due to transient ring-view skew at startup; sampling then would race with
1853-
// the second cycle's fetcher.NewBaseFetcher creating meta-sync directories
1854-
// and return a partial count.
1855-
cortex_testutil.Poll(t, 30*time.Second, true, func() any {
1856-
return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) >= 2 &&
1857-
len(c2.listTenantsWithMetaSyncDirectories()) > 0
1897+
1898+
// Before driving ownership-dependent compaction cycles, wait until BOTH
1899+
// compactors' ring views see two healthy ACTIVE instances (RingOp is the
1900+
// operation ownUser itself queries). c2's own view is already barriered by
1901+
// starting() — it waits until c2 is ACTIVE in its own view, and every KV
1902+
// snapshot containing c2 also contains the earlier-registered c1 — but c1's
1903+
// ring watcher ingests c2's registration asynchronously, and the final
1904+
// c1.compactUsers() cleanup below depends on c1's view.
1905+
cortex_testutil.Poll(t, 10*time.Second, true, func() any {
1906+
for _, c := range compactors {
1907+
rs, err := c.ring.GetAllHealthy(RingOp)
1908+
if err != nil || len(rs.Instances) != 2 {
1909+
return false
1910+
}
1911+
}
1912+
return true
18581913
})
18591914

1915+
// Run a compaction cycle on the second compactor: with pinned tokens it owns
1916+
// exactly half of the users and creates a meta-sync directory for each of them.
1917+
c2.compactUsers(context.Background())
1918+
18601919
// Let's check how many users second compactor has.
18611920
c2Users := len(c2.listTenantsWithMetaSyncDirectories())
1921+
require.Equal(t, numUsers/2, c2Users)
18621922

18631923
// Force new compaction cycle on first compactor. It will run the cleanup of un-owned users at the end of compaction cycle.
18641924
c1.compactUsers(context.Background())
18651925
c1Users := len(c1.listTenantsWithMetaSyncDirectories())
18661926

1867-
// Now compactor 1 should have cleaned old sync files.
1868-
require.NotEqual(t, numUsers, c1Users)
1927+
// Now compactor 1 should have cleaned the sync files of the users it no longer
1928+
// owns (including "new-user"), keeping exactly its own half.
1929+
require.Equal(t, numUsers-numUsers/2, c1Users)
18691930
require.Equal(t, numUsers, c1Users+c2Users)
18701931
}
18711932

0 commit comments

Comments
 (0)