Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,16 @@ public void testWriteAndReadTemporalAndStructColumns() throws IOException {
.selectExpr(
"cast(id as int) as id",
"CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END AS event_date",
"CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP) "
+ "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END AS event_ts",
"named_struct("
+ "'event_date', CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END, "
+ "'event_ts', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP) "
+ "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END, "
+ "'label', CASE WHEN id = 0 THEN 'alpha' ELSE 'beta' END"
+ ") AS payload");
"""
CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP)
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END AS event_ts""",
"""
named_struct(
'event_date', CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END,
'event_ts', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP)
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END,
'label', CASE WHEN id = 0 THEN 'alpha' ELSE 'beta' END
) AS payload""");

Path outputPath = tempDir.resolve("temporal_struct_output");
originalDf
Expand Down Expand Up @@ -371,15 +373,13 @@ public void testWriteAndReadTemporalAndStructColumns() throws IOException {
@Test
@DisplayName("Write TimestampNTZ columns and nested structs")
public void testWriteTimestampNtzColumns() throws IOException {
Dataset<Row> timestampNtzDf = spark.range(0, 2)
.selectExpr(
"cast(id as int) as id",
"CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ) "
+ "ELSE CAST(NULL AS TIMESTAMP_NTZ) END AS event_ntz",
"named_struct("
+ "'event_ntz', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ) "
+ "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP_NTZ) END"
+ ") AS payload");
Dataset<Row> timestampNtzDf = spark.range(0, 2).selectExpr("cast(id as int) as id", """
CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ)
ELSE CAST(NULL AS TIMESTAMP_NTZ) END AS event_ntz""", """
named_struct(
'event_ntz', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ)
ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP_NTZ) END
) AS payload""");

Path outputPath = tempDir.resolve("timestamp_ntz_output");
assertDoesNotThrow(() -> timestampNtzDf
Expand All @@ -406,18 +406,15 @@ private Dataset<Row> createTestDataFrame(int numRows) {
}

private List<String> projectTemporalAndStructRows(Dataset<Row> df) {
return df
.orderBy("id")
.selectExpr("to_json(named_struct("
+ "'id', id, "
+ "'event_date', cast(event_date as string), "
+ "'event_ts', date_format(event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), "
+ "'payload_event_date', cast(payload.event_date as string), "
+ "'payload_event_ts', date_format(payload.event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), "
+ "'payload_label', payload.label"
+ ")) as json")
.collectAsList()
.stream()
return df.orderBy("id").selectExpr("""
to_json(named_struct(
'id', id,
'event_date', cast(event_date as string),
'event_ts', date_format(event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'),
'payload_event_date', cast(payload.event_date as string),
'payload_event_ts', date_format(payload.event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'),
'payload_label', payload.label
)) as json""").collectAsList().stream()
.map(row -> row.getString(0))
.collect(Collectors.toList());
}
Expand Down
Loading