@@ -23,6 +23,7 @@ import (
2323 "github.com/prometheus/common/model"
2424 "github.com/prometheus/prometheus/model/labels"
2525 "github.com/prometheus/prometheus/prompb"
26+ writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2627 "github.com/prometheus/prometheus/promql/parser"
2728 "github.com/stretchr/testify/require"
2829 "github.com/thanos-io/thanos/pkg/block"
@@ -1701,6 +1702,161 @@ func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
17011702 runQueryFuzzTestCases (t , ps , c1 , c2 , end , start , end , scrapeInterval , 1000 , false )
17021703}
17031704
1705+ // TestRW1vsRW2QueryFuzz pushes the same time series data to two isolated Cortex instances,
1706+ // one via PRW1 and one via PRW2, then uses promqlsmith to generate random PromQL queries,
1707+ // and verifies that both instances return identical results.
1708+ func TestRW1vsRW2QueryFuzz (t * testing.T ) {
1709+ s , err := e2e .NewScenario (networkName )
1710+ require .NoError (t , err )
1711+ defer s .Close ()
1712+
1713+ consul1 := e2edb .NewConsulWithName ("consul-rw1" )
1714+ consul2 := e2edb .NewConsulWithName ("consul-rw2" )
1715+ require .NoError (t , s .StartAndWaitReady (consul1 , consul2 ))
1716+
1717+ flags := mergeFlags (
1718+ AlertmanagerLocalFlags (),
1719+ map [string ]string {
1720+ "-store.engine" : blocksStorageEngine ,
1721+ "-blocks-storage.backend" : "filesystem" ,
1722+ "-blocks-storage.tsdb.head-compaction-interval" : "4m" ,
1723+ "-blocks-storage.tsdb.block-ranges-period" : "2h" ,
1724+ "-blocks-storage.tsdb.ship-interval" : "1h" ,
1725+ "-blocks-storage.bucket-store.sync-interval" : "15m" ,
1726+ "-blocks-storage.tsdb.retention-period" : "2h" ,
1727+ "-blocks-storage.bucket-store.index-cache.backend" : tsdb .IndexCacheBackendInMemory ,
1728+ // Ingester.
1729+ "-ring.store" : "consul" ,
1730+ // Distributor.
1731+ "-distributor.replication-factor" : "1" ,
1732+ "-distributor.remote-writev2-enabled" : "true" ,
1733+ // Alert manager.
1734+ "-alertmanager.web.external-url" : "http://localhost/alertmanager" ,
1735+ },
1736+ )
1737+ require .NoError (t , writeFileToSharedDir (s , "alertmanager_configs" , []byte {}))
1738+
1739+ path1 := path .Join (s .SharedDir (), "cortex-rw1" )
1740+ path2 := path .Join (s .SharedDir (), "cortex-rw2" )
1741+
1742+ flags1 := mergeFlags (flags , map [string ]string {
1743+ "-blocks-storage.filesystem.dir" : path1 ,
1744+ "-consul.hostname" : consul1 .NetworkHTTPEndpoint (),
1745+ })
1746+ flags2 := mergeFlags (flags , map [string ]string {
1747+ "-blocks-storage.filesystem.dir" : path2 ,
1748+ "-consul.hostname" : consul2 .NetworkHTTPEndpoint (),
1749+ })
1750+
1751+ cortexRW1 := e2ecortex .NewSingleBinary ("cortex-rw1" , flags1 , "" )
1752+ cortexRW2 := e2ecortex .NewSingleBinary ("cortex-rw2" , flags2 , "" )
1753+ require .NoError (t , s .StartAndWaitReady (cortexRW1 , cortexRW2 ))
1754+
1755+ require .NoError (t , cortexRW1 .WaitSumMetrics (e2e .Equals (float64 (512 )), "cortex_ring_tokens_total" ))
1756+ require .NoError (t , cortexRW2 .WaitSumMetrics (e2e .Equals (float64 (512 )), "cortex_ring_tokens_total" ))
1757+
1758+ c1 , err := e2ecortex .NewClient (cortexRW1 .HTTPEndpoint (), cortexRW1 .HTTPEndpoint (), "" , "" , "user-1" )
1759+ require .NoError (t , err )
1760+ c2 , err := e2ecortex .NewClient (cortexRW2 .HTTPEndpoint (), cortexRW2 .HTTPEndpoint (), "" , "" , "user-1" )
1761+ require .NoError (t , err )
1762+
1763+ now := time .Now ()
1764+ start := now .Add (- time .Hour * 2 )
1765+ end := now .Add (- time .Hour )
1766+ numSeries := 3
1767+ numSamples := 60
1768+ scrapeInterval := time .Minute
1769+
1770+ // Generate the same series data once as prompb (PRW1 format).
1771+ // The exact same timestamps/values will be used for PRW2 via convertSeriesToPRW2.
1772+ lbls := make ([]labels.Labels , numSeries * 2 )
1773+ serieses := make ([]prompb.TimeSeries , numSeries * 2 )
1774+
1775+ for i := 0 ; i < numSeries ; i ++ {
1776+ series := e2e .GenerateSeriesWithSamples ("test_series_a" , start , scrapeInterval , i * numSamples , numSamples ,
1777+ prompb.Label {Name : "job" , Value : "test" },
1778+ prompb.Label {Name : "series" , Value : strconv .Itoa (i )},
1779+ )
1780+ serieses [i ] = series
1781+ builder := labels .NewBuilder (labels .EmptyLabels ())
1782+ for _ , lbl := range series .Labels {
1783+ builder .Set (lbl .Name , lbl .Value )
1784+ }
1785+ lbls [i ] = builder .Labels ()
1786+ }
1787+ for i := numSeries ; i < 2 * numSeries ; i ++ {
1788+ prompbLabels := []prompb.Label {
1789+ {Name : "job" , Value : "test" },
1790+ {Name : "series" , Value : strconv .Itoa (i )},
1791+ }
1792+ switch i % 3 {
1793+ case 0 :
1794+ prompbLabels = append (prompbLabels , prompb.Label {Name : "status_code" , Value : "200" })
1795+ case 1 :
1796+ prompbLabels = append (prompbLabels , prompb.Label {Name : "status_code" , Value : "400" })
1797+ default :
1798+ prompbLabels = append (prompbLabels , prompb.Label {Name : "status_code" , Value : "500" })
1799+ }
1800+ series := e2e .GenerateSeriesWithSamples ("test_series_b" , start , scrapeInterval , i * numSamples , numSamples , prompbLabels ... )
1801+ serieses [i ] = series
1802+ builder := labels .NewBuilder (labels .EmptyLabels ())
1803+ for _ , lbl := range series .Labels {
1804+ builder .Set (lbl .Name , lbl .Value )
1805+ }
1806+ lbls [i ] = builder .Labels ()
1807+ }
1808+
1809+ // Push via PRW1 to cortex-rw1.
1810+ res , err := c1 .Push (serieses )
1811+ require .NoError (t , err )
1812+ require .Equal (t , 200 , res .StatusCode )
1813+
1814+ // Convert the same series to PRW2 format and push to cortex-rw2.
1815+ symbols , v2Series := convertSeriesToPRW2 (serieses )
1816+ _ , err = c2 .PushV2 (symbols , v2Series )
1817+ require .NoError (t , err )
1818+
1819+ seed := now .Unix ()
1820+ rnd := rand .New (rand .NewSource (seed ))
1821+
1822+ ctx := context .Background ()
1823+ waitUntilReady (t , ctx , c1 , c2 , `{job="test"}` , start , end )
1824+
1825+ opts := []promqlsmith.Option {
1826+ promqlsmith .WithEnabledFunctions (enabledFunctions ),
1827+ promqlsmith .WithEnabledAggrs (enabledAggrs ),
1828+ promqlsmith .WithEnableExperimentalPromQLFunctions (true ),
1829+ }
1830+ ps := promqlsmith .New (rnd , lbls , opts ... )
1831+
1832+ runQueryFuzzTestCases (t , ps , c1 , c2 , end , start , end , scrapeInterval , 1000 , false )
1833+ }
1834+
1835+ func convertSeriesToPRW2 (timeSeries []prompb.TimeSeries ) ([]string , []writev2.TimeSeries ) {
1836+ st := writev2 .NewSymbolTable ()
1837+ v2Series := make ([]writev2.TimeSeries , 0 , len (timeSeries ))
1838+
1839+ for _ , ts := range timeSeries {
1840+ lb := labels .NewScratchBuilder (len (ts .Labels ))
1841+ for _ , l := range ts .Labels {
1842+ lb .Add (l .Name , l .Value )
1843+ }
1844+ lb .Sort ()
1845+
1846+ samples := make ([]writev2.Sample , len (ts .Samples ))
1847+ for i , s := range ts .Samples {
1848+ samples [i ] = writev2.Sample {Timestamp : s .Timestamp , Value : s .Value }
1849+ }
1850+
1851+ v2Series = append (v2Series , writev2.TimeSeries {
1852+ LabelsRefs : st .SymbolizeLabels (lb .Labels (), nil ),
1853+ Samples : samples ,
1854+ })
1855+ }
1856+
1857+ return st .Symbols (), v2Series
1858+ }
1859+
17041860// waitUntilReady is a helper function to wait and check if both servers to test load the expected data.
17051861func waitUntilReady (t * testing.T , ctx context.Context , c1 , c2 * e2ecortex.Client , query string , start , end time.Time ) {
17061862 retries := backoff .New (ctx , backoff.Config {
0 commit comments