Skip to content

Commit aece3ad

Browse files
authored
Make java spark tests more resilient (#7612)
Depending on your network, Spark's automatic detection of the address to bind might fail; therefore we fix it to localhost for tests. Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent b6ee27b commit aece3ad

2 files changed

Lines changed: 22 additions & 22 deletions

File tree

java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceS3MockTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public void setUp() {
4545
.master("local[2]")
4646
.config("spark.sql.shuffle.partitions", "2")
4747
.config("spark.sql.adaptive.enabled", "false")
48+
.config("spark.driver.host", "127.0.0.1")
4849
.config("spark.ui.enabled", "false")
4950
// S3A configuration for S3Mock.
5051
// This should be propagated into our reader

java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceWriteTest.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33

44
package dev.vortex.spark;
55

6-
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
7-
import static org.junit.jupiter.api.Assertions.assertEquals;
8-
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static org.junit.jupiter.api.Assertions.*;
97

108
import java.io.IOException;
119
import java.nio.file.Files;
@@ -45,6 +43,7 @@ public void setUp() {
4543
spark = SparkSession.builder()
4644
.appName("VortexWriteTest")
4745
.master("local[2]") // Use 2 threads
46+
.config("spark.driver.host", "127.0.0.1")
4847
.config("spark.sql.shuffle.partitions", "2")
4948
.config("spark.sql.adaptive.enabled", "false") // Disable AQE for predictable partitioning
5049
.config("spark.ui.enabled", "false") // Disable UI for tests
@@ -213,8 +212,8 @@ public void testPartitionedWrite() throws IOException {
213212
// Verify vortex files inside partition directories
214213
List<Path> filesA = findVortexFiles(outputPath.resolve("group=A"));
215214
List<Path> filesB = findVortexFiles(outputPath.resolve("group=B"));
216-
assertTrue(!filesA.isEmpty(), "Partition A should have vortex files");
217-
assertTrue(!filesB.isEmpty(), "Partition B should have vortex files");
215+
assertFalse(filesA.isEmpty(), "Partition A should have vortex files");
216+
assertFalse(filesB.isEmpty(), "Partition B should have vortex files");
218217

219218
// When: read back
220219
Dataset<Row> readDf = spark.read()
@@ -333,15 +332,15 @@ public void testWriteAndReadTemporalAndStructColumns() throws IOException {
333332
"cast(id as int) as id",
334333
"CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END AS event_date",
335334
"""
336-
CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP)
337-
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END AS event_ts""",
335+
CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP)
336+
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END AS event_ts""",
338337
"""
339-
named_struct(
340-
'event_date', CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END,
341-
'event_ts', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP)
342-
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END,
343-
'label', CASE WHEN id = 0 THEN 'alpha' ELSE 'beta' END
344-
) AS payload""");
338+
named_struct(
339+
'event_date', CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END,
340+
'event_ts', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP)
341+
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END,
342+
'label', CASE WHEN id = 0 THEN 'alpha' ELSE 'beta' END
343+
) AS payload""");
345344

346345
Path outputPath = tempDir.resolve("temporal_struct_output");
347346
originalDf
@@ -366,20 +365,20 @@ ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END,
366365

367366
assertEquals(DataTypes.DateType, readDf.schema().fields()[1].dataType());
368367
assertEquals(DataTypes.TimestampType, readDf.schema().fields()[2].dataType());
369-
assertTrue(readDf.schema().fields()[3].dataType() instanceof StructType);
368+
assertInstanceOf(StructType.class, readDf.schema().fields()[3].dataType());
370369
assertEquals(expectedRows, projectTemporalAndStructRows(readDf));
371370
}
372371

373372
@Test
374373
@DisplayName("Write TimestampNTZ columns and nested structs")
375374
public void testWriteTimestampNtzColumns() throws IOException {
376375
Dataset<Row> timestampNtzDf = spark.range(0, 2).selectExpr("cast(id as int) as id", """
377-
CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ)
378-
ELSE CAST(NULL AS TIMESTAMP_NTZ) END AS event_ntz""", """
379-
named_struct(
380-
'event_ntz', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ)
381-
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP_NTZ) END
382-
) AS payload""");
376+
CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ)
377+
ELSE CAST(NULL AS TIMESTAMP_NTZ) END AS event_ntz""", """
378+
named_struct(
379+
'event_ntz', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ)
380+
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP_NTZ) END
381+
) AS payload""");
383382

384383
Path outputPath = tempDir.resolve("timestamp_ntz_output");
385384
assertDoesNotThrow(() -> timestampNtzDf
@@ -389,7 +388,7 @@ ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP_NTZ) END
389388
.mode(SaveMode.Overwrite)
390389
.save());
391390

392-
assertTrue(!findVortexFiles(outputPath).isEmpty(), "TimestampNTZ write should create Vortex files");
391+
assertFalse(findVortexFiles(outputPath).isEmpty(), "TimestampNTZ write should create Vortex files");
393392
}
394393

395394
/**
@@ -424,7 +423,7 @@ private List<String> projectTemporalAndStructRows(Dataset<Row> df) {
424423
*/
425424
private List<Path> findVortexFiles(Path directory) throws IOException {
426425
if (!Files.exists(directory)) {
427-
return Arrays.asList();
426+
return List.of();
428427
}
429428

430429
try (Stream<Path> paths = Files.walk(directory)) {

0 commit comments

Comments
 (0)