Skip to content

Commit a494b31

Browse files
committed
Reflect self-review comments
1 parent 158dff8 commit a494b31

2 files changed

Lines changed: 124 additions & 379 deletions

File tree

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala

Lines changed: 61 additions & 232 deletions
Original file line numberDiff line numberDiff line change
@@ -1633,7 +1633,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16331633
-230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L,
16341634
-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)
16351635

1636-
testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - scan bounded range",
1636+
testWithColumnFamiliesAndEncodingTypes("rocksdb range scan",
16371637
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
16381638

16391639
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
@@ -1648,151 +1648,50 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16481648
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
16491649
}
16501650

1651-
val timestamps = Seq(100L, 200L, 300L, 400L, 500L)
1652-
timestamps.foreach { ts =>
1651+
diverseTimestamps.foreach { ts =>
16531652
store.put(dataToKeyRowWithRangeScan(ts, "a"), dataToValueRow(ts.toInt), cfName)
16541653
}
16551654

1656-
val startKey = dataToKeyRowWithRangeScan(200L, "a")
1657-
val endKey = dataToKeyRowWithRangeScan(401L, "a")
1658-
val iter = store.scan(Some(startKey), Some(endKey), cfName)
1659-
val results = iter.map { pair =>
1655+
// Bounded positive range [0, 100)
1656+
val boundedIter = store.scan(
1657+
Some(dataToKeyRowWithRangeScan(0L, "a")),
1658+
Some(dataToKeyRowWithRangeScan(100L, "a")), cfName)
1659+
val boundedResults = boundedIter.map { pair =>
16601660
(pair.key.getLong(0), pair.value.getInt(0))
16611661
}.toList
1662-
iter.close()
1663-
1664-
assert(results.map(_._1) === Seq(200L, 300L, 400L))
1665-
assert(results.map(_._2) === Seq(200, 300, 400))
1666-
} finally {
1667-
if (!store.hasCommitted) store.abort()
1668-
}
1669-
}
1670-
}
1671-
1672-
testWithColumnFamiliesAndEncodingTypes(
1673-
"rocksdb range scan - scan with None startKey scans from beginning",
1674-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1675-
1676-
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
1677-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
1678-
colFamiliesEnabled)) { provider =>
1679-
val store = provider.getStore(0)
1680-
try {
1681-
val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
1682-
if (colFamiliesEnabled) {
1683-
store.createColFamilyIfAbsent(cfName,
1684-
keySchemaWithRangeScan, valueSchema,
1685-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
1686-
}
1687-
1688-
val timestamps = Seq(100L, 200L, 300L, 400L, 500L)
1689-
timestamps.foreach { ts =>
1690-
store.put(dataToKeyRowWithRangeScan(ts, "a"), dataToValueRow(ts.toInt), cfName)
1691-
}
1692-
1693-
val endKey = dataToKeyRowWithRangeScan(301L, "a")
1694-
val iter = store.scan(None, Some(endKey), cfName)
1695-
val results = iter.map(_.key.getLong(0)).toList
1696-
iter.close()
1697-
1698-
assert(results === Seq(100L, 200L, 300L))
1699-
} finally {
1700-
if (!store.hasCommitted) store.abort()
1701-
}
1702-
}
1703-
}
1704-
1705-
testWithColumnFamiliesAndEncodingTypes(
1706-
"rocksdb range scan - scan with None endKey scans to end",
1707-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1708-
1709-
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
1710-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
1711-
colFamiliesEnabled)) { provider =>
1712-
val store = provider.getStore(0)
1713-
try {
1714-
val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
1715-
if (colFamiliesEnabled) {
1716-
store.createColFamilyIfAbsent(cfName,
1717-
keySchemaWithRangeScan, valueSchema,
1718-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
1719-
}
1720-
1721-
val timestamps = Seq(100L, 200L, 300L, 400L, 500L)
1722-
timestamps.foreach { ts =>
1723-
store.put(dataToKeyRowWithRangeScan(ts, "a"), dataToValueRow(ts.toInt), cfName)
1724-
}
1725-
1726-
val startKey = dataToKeyRowWithRangeScan(300L, "a")
1727-
val iter = store.scan(Some(startKey), None, cfName)
1728-
val results = iter.map(_.key.getLong(0)).toList
1729-
iter.close()
1730-
1731-
assert(results === Seq(300L, 400L, 500L))
1732-
} finally {
1733-
if (!store.hasCommitted) store.abort()
1734-
}
1735-
}
1736-
}
1737-
1738-
testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - scan empty range",
1739-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1740-
1741-
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
1742-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
1743-
colFamiliesEnabled)) { provider =>
1744-
val store = provider.getStore(0)
1745-
try {
1746-
val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
1747-
if (colFamiliesEnabled) {
1748-
store.createColFamilyIfAbsent(cfName,
1749-
keySchemaWithRangeScan, valueSchema,
1750-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
1751-
}
1752-
1753-
store.put(dataToKeyRowWithRangeScan(100L, "a"), dataToValueRow(100), cfName)
1754-
store.put(dataToKeyRowWithRangeScan(500L, "a"), dataToValueRow(500), cfName)
1755-
1756-
val startKey = dataToKeyRowWithRangeScan(200L, "a")
1757-
val endKey = dataToKeyRowWithRangeScan(300L, "a")
1758-
val iter = store.scan(Some(startKey), Some(endKey), cfName)
1759-
assert(!iter.hasNext)
1760-
iter.close()
1761-
} finally {
1762-
if (!store.hasCommitted) store.abort()
1763-
}
1764-
}
1765-
}
1766-
1767-
testWithColumnFamiliesAndEncodingTypes(
1768-
"rocksdb range scan - scan with diverse timestamps bounded range",
1769-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1770-
1771-
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
1772-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
1773-
colFamiliesEnabled)) { provider =>
1774-
val store = provider.getStore(0)
1775-
try {
1776-
val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
1777-
if (colFamiliesEnabled) {
1778-
store.createColFamilyIfAbsent(cfName,
1779-
keySchemaWithRangeScan, valueSchema,
1780-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
1781-
}
1782-
1783-
diverseTimestamps.zipWithIndex.foreach { case (ts, idx) =>
1784-
store.put(dataToKeyRowWithRangeScan(ts, "a"), dataToValueRow(idx), cfName)
1785-
}
1786-
1787-
// Scan negative range: [-300, 0)
1788-
val startKey = dataToKeyRowWithRangeScan(-300L, "a")
1789-
val endKey = dataToKeyRowWithRangeScan(0L, "a")
1790-
val iter = store.scan(Some(startKey), Some(endKey), cfName)
1791-
val results = iter.map(_.key.getLong(0)).toList
1792-
iter.close()
1793-
1794-
val expected = diverseTimestamps.filter(ts => ts >= -300 && ts < 0).sorted
1795-
assert(results === expected)
1662+
boundedIter.close()
1663+
val expectedBoundedTs = diverseTimestamps.filter(ts => ts >= 0 && ts < 100).sorted
1664+
assert(boundedResults.map(_._1) === expectedBoundedTs)
1665+
assert(boundedResults.map(_._2) === expectedBoundedTs.map(_.toInt))
1666+
1667+
// None startKey scans from beginning to 0
1668+
val noneStartIter = store.scan(
1669+
None, Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
1670+
val noneStartResults = noneStartIter.map(_.key.getLong(0)).toList
1671+
noneStartIter.close()
1672+
assert(noneStartResults === diverseTimestamps.filter(_ < 0).sorted)
1673+
1674+
// None endKey scans from 1000 to end
1675+
val noneEndIter = store.scan(
1676+
Some(dataToKeyRowWithRangeScan(1000L, "a")), None, cfName)
1677+
val noneEndResults = noneEndIter.map(_.key.getLong(0)).toList
1678+
noneEndIter.close()
1679+
assert(noneEndResults === diverseTimestamps.filter(_ >= 1000).sorted)
1680+
1681+
// Empty range [10, 31) - no entries between 9 and 32
1682+
val emptyIter = store.scan(
1683+
Some(dataToKeyRowWithRangeScan(10L, "a")),
1684+
Some(dataToKeyRowWithRangeScan(31L, "a")), cfName)
1685+
assert(!emptyIter.hasNext)
1686+
emptyIter.close()
1687+
1688+
// Bounded negative range [-300, 0)
1689+
val negIter = store.scan(
1690+
Some(dataToKeyRowWithRangeScan(-300L, "a")),
1691+
Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
1692+
val negResults = negIter.map(_.key.getLong(0)).toList
1693+
negIter.close()
1694+
assert(negResults === diverseTimestamps.filter(ts => ts >= -300 && ts < 0).sorted)
17961695
} finally {
17971696
if (!store.hasCommitted) store.abort()
17981697
}
@@ -1838,88 +1737,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
18381737
}
18391738

18401739
testWithColumnFamiliesAndEncodingTypes(
1841-
"rocksdb range scan - scanWithMultiValues bounded range",
1842-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1843-
1844-
// Multiple values per key requires column families
1845-
if (colFamiliesEnabled) {
1846-
tryWithProviderResource(newStoreProvider(
1847-
StateStoreId(newDir(), Random.nextInt(), 0),
1848-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
1849-
keySchema = keySchemaWithRangeScan,
1850-
useColumnFamilies = colFamiliesEnabled,
1851-
useMultipleValuesPerKey = true)) { provider =>
1852-
val store = provider.getStore(0)
1853-
try {
1854-
val cfName = "testColFamily"
1855-
store.createColFamilyIfAbsent(cfName,
1856-
keySchemaWithRangeScan, valueSchema,
1857-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
1858-
useMultipleValuesPerKey = true)
1859-
1860-
val timestamps = Seq(100L, 200L, 300L, 400L, 500L)
1861-
timestamps.foreach { ts =>
1862-
store.putList(dataToKeyRowWithRangeScan(ts, "a"),
1863-
Array(dataToValueRow(ts.toInt), dataToValueRow(ts.toInt + 1)), cfName)
1864-
}
1865-
1866-
val startKey = dataToKeyRowWithRangeScan(200L, "a")
1867-
val endKey = dataToKeyRowWithRangeScan(401L, "a")
1868-
val iter = store.scanWithMultiValues(Some(startKey), Some(endKey), cfName)
1869-
val results = iter.map { pair =>
1870-
(pair.key.getLong(0), pair.value.getInt(0))
1871-
}.toList
1872-
iter.close()
1873-
1874-
val resultTimestamps = results.map(_._1).distinct
1875-
assert(resultTimestamps === Seq(200L, 300L, 400L))
1876-
assert(results.length === 6) // 3 timestamps x 2 values each
1877-
} finally {
1878-
if (!store.hasCommitted) store.abort()
1879-
}
1880-
}
1881-
}
1882-
}
1883-
1884-
testWithColumnFamiliesAndEncodingTypes(
1885-
"rocksdb range scan - scanWithMultiValues with None startKey",
1886-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1887-
1888-
if (colFamiliesEnabled) {
1889-
tryWithProviderResource(newStoreProvider(
1890-
StateStoreId(newDir(), Random.nextInt(), 0),
1891-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
1892-
keySchema = keySchemaWithRangeScan,
1893-
useColumnFamilies = colFamiliesEnabled,
1894-
useMultipleValuesPerKey = true)) { provider =>
1895-
val store = provider.getStore(0)
1896-
try {
1897-
val cfName = "testColFamily"
1898-
store.createColFamilyIfAbsent(cfName,
1899-
keySchemaWithRangeScan, valueSchema,
1900-
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
1901-
useMultipleValuesPerKey = true)
1902-
1903-
val timestamps = Seq(100L, 200L, 300L, 400L, 500L)
1904-
timestamps.foreach { ts =>
1905-
store.merge(dataToKeyRowWithRangeScan(ts, "a"), dataToValueRow(ts.toInt), cfName)
1906-
}
1907-
1908-
val endKey = dataToKeyRowWithRangeScan(301L, "a")
1909-
val iter = store.scanWithMultiValues(None, Some(endKey), cfName)
1910-
val results = iter.map(_.key.getLong(0)).toList
1911-
iter.close()
1912-
1913-
assert(results === Seq(100L, 200L, 300L))
1914-
} finally {
1915-
if (!store.hasCommitted) store.abort()
1916-
}
1917-
}
1918-
}
1919-
}
1920-
1921-
testWithColumnFamiliesAndEncodingTypes(
1922-
"rocksdb range scan - scanWithMultiValues with diverse timestamps",
1740+
"rocksdb range scan - scanWithMultiValues",
19231741
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
19241742

19251743
if (colFamiliesEnabled) {
@@ -1942,19 +1760,30 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
19421760
Array(dataToValueRow(idx * 10), dataToValueRow(idx * 10 + 1)), cfName)
19431761
}
19441762

1945-
// Scan [0, 1000] (inclusive via endKey = 1001)
1946-
val startKey = dataToKeyRowWithRangeScan(0L, "a")
1947-
val endKey = dataToKeyRowWithRangeScan(1001L, "a")
1948-
val iter = store.scanWithMultiValues(Some(startKey), Some(endKey), cfName)
1949-
val results = iter.map { pair =>
1763+
// Bounded range [0, 1001)
1764+
val boundedIter = store.scanWithMultiValues(
1765+
Some(dataToKeyRowWithRangeScan(0L, "a")),
1766+
Some(dataToKeyRowWithRangeScan(1001L, "a")), cfName)
1767+
val boundedResults = boundedIter.map { pair =>
19501768
(pair.key.getLong(0), pair.value.getInt(0))
19511769
}.toList
1952-
iter.close()
1770+
boundedIter.close()
19531771

19541772
val expectedTimestamps = diverseTimestamps.filter(ts => ts >= 0 && ts <= 1000).sorted
1955-
val resultTimestamps = results.map(_._1).distinct
1956-
assert(resultTimestamps === expectedTimestamps)
1957-
assert(results.length === expectedTimestamps.length * 2)
1773+
assert(boundedResults.map(_._1).distinct === expectedTimestamps)
1774+
val expectedValues = diverseTimestamps.zipWithIndex
1775+
.filter { case (ts, _) => ts >= 0 && ts <= 1000 }
1776+
.sortBy(_._1)
1777+
.flatMap { case (_, idx) => Seq(idx * 10, idx * 10 + 1) }
1778+
assert(boundedResults.map(_._2) === expectedValues)
1779+
1780+
// None startKey scans from beginning to 0
1781+
val noneStartIter = store.scanWithMultiValues(
1782+
None, Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
1783+
val noneStartResults = noneStartIter.map(_.key.getLong(0)).toList
1784+
noneStartIter.close()
1785+
1786+
assert(noneStartResults.distinct === diverseTimestamps.filter(_ < 0).sorted)
19581787
} finally {
19591788
if (!store.hasCommitted) store.abort()
19601789
}

0 commit comments

Comments
 (0)