From 1bbab103bc609bf5b709ce2c63b2a6508e2da556 Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Wed, 1 Feb 2023 03:31:28 +0800 Subject: [PATCH 1/4] Graceful handling of null field --- config/spark2kafka.properties | 8 +++---- docker/start_etl.sh | 4 ++-- .../org/astraea/etl/DataFrameProcessor.scala | 22 ++++++++++++++----- .../main/scala/org/astraea/etl/Metadata.scala | 2 +- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/config/spark2kafka.properties b/config/spark2kafka.properties index 7e6d195533..330c9d0d59 100644 --- a/config/spark2kafka.properties +++ b/config/spark2kafka.properties @@ -1,9 +1,9 @@ #Parameters you must configure #============================================================== -#The data source path should be a directory. +#The data source should be a directory. source.path = -#The CSV Column Name.For example:sA=string,sB=integer,sC=boolean... +#The csv col names.For example:sA=string,sB=integer,sC=boolean... column.names = #Primary keys.For example:sA=string,sB=integer,sC=boolean... @@ -15,8 +15,8 @@ kafka.bootstrap.servers = #Set your topic name. 
topic.name = -#Spark checkpoint path -checkpoint = +#Spark checkpoint +checkpoint.path = #Parameters that can be selected for configuration #============================================================== diff --git a/docker/start_etl.sh b/docker/start_etl.sh index 3c829ff388..406a21a543 100755 --- a/docker/start_etl.sh +++ b/docker/start_etl.sh @@ -24,7 +24,7 @@ declare -r SPARK_VERSION=${SPARK_VERSION:-3.3.1} declare -r LOCAL_PATH=$(cd -- "$(dirname -- "${DOCKER_FOLDER}")" &>/dev/null && pwd) # ===============================[properties keys]================================= declare -r SOURCE_KEY="source.path" -declare -r CHECKPOINT_KEY="checkpoint" +declare -r CHECKPOINT_KEY="checkpoint.path" # ===============================[spark driver/executor resource]================== declare -r RESOURCES_CONFIGS="${RESOURCES_CONFIGS:-"--conf spark.driver.memory=4g --conf spark.executor.memory=4g"}" # ===================================[functions]=================================== @@ -89,7 +89,7 @@ function runContainer() { if [[ "$master" == "spark:"* ]] || [[ "$master" == "local"* ]]; then docker run -d --init \ - --name "csv-kafka-${source_name}" \ + --name "csv-kafka${source_name}" \ $network_config \ -v "$propertiesPath":"$propertiesPath":ro \ -v "$jar_path":/tmp/astraea-etl.jar:ro \ diff --git a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala index 18d85ccf2d..3582108b87 100644 --- a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala +++ b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala @@ -64,13 +64,29 @@ class DataFrameProcessor(dataFrame: DataFrame) { * @return * json df */ +// def csvToJSON(cols: Seq[DataColumn]): DataFrameProcessor = { +// new DataFrameProcessor( +// dataFrame +// .withColumn( +// "value", +// defaultConverter( +// map(cols.flatMap(c => List(lit(c.name), col(c.name))): _*) +// ) +// ) def csvToJSON(cols: Seq[DataColumn]): DataFrameProcessor = { new 
DataFrameProcessor( dataFrame .withColumn( "value", defaultConverter( - map(cols.flatMap(c => List(lit(c.name), col(c.name))): _*) + map( + cols.flatMap(c => + List( + lit(c.name), + when(col(c.name).isNotNull, col(c.name)).otherwise(lit(null)) + ) + ): _* + ) ) ) .withColumn( @@ -171,10 +187,6 @@ object DataFrameProcessor { private def schema(columns: Seq[DataColumn]): StructType = StructType(columns.map { col => - if (col.dataType != DataType.StringType) - throw new IllegalArgumentException( - "Sorry, only string type is currently supported.Because a problem(astraea #1286) has led to the need to wrap the non-nullable type." - ) StructField(col.name, col.dataType.sparkType) }) } diff --git a/etl/src/main/scala/org/astraea/etl/Metadata.scala b/etl/src/main/scala/org/astraea/etl/Metadata.scala index 9823824ab0..6ed5999cee 100644 --- a/etl/src/main/scala/org/astraea/etl/Metadata.scala +++ b/etl/src/main/scala/org/astraea/etl/Metadata.scala @@ -71,7 +71,7 @@ object Metadata { private[etl] val DEFAULT_PARTITIONS = 15 private[etl] val DEFAULT_REPLICAS = 1.toShort - private[etl] val DEFAULT_RECURSIVE = "ture" + private[etl] val DEFAULT_RECURSIVE = "true" private[etl] val DEFAULT_CLEAN_SOURCE = "delete" // Parameters needed to configure ETL. 
From 3aec0ab2469fced4058ae9db93957b0efdbf03c3 Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Wed, 1 Feb 2023 03:41:30 +0800 Subject: [PATCH 2/4] spotless --- .../main/scala/org/astraea/etl/DataFrameProcessor.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala index 3582108b87..18a0d03080 100644 --- a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala +++ b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala @@ -64,15 +64,6 @@ class DataFrameProcessor(dataFrame: DataFrame) { * @return * json df */ -// def csvToJSON(cols: Seq[DataColumn]): DataFrameProcessor = { -// new DataFrameProcessor( -// dataFrame -// .withColumn( -// "value", -// defaultConverter( -// map(cols.flatMap(c => List(lit(c.name), col(c.name))): _*) -// ) -// ) def csvToJSON(cols: Seq[DataColumn]): DataFrameProcessor = { new DataFrameProcessor( dataFrame From 74516b1e3e9b91826e0d4149f3cafa9cb9094ee3 Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Thu, 23 Feb 2023 02:01:53 +0800 Subject: [PATCH 3/4] filter null value --- docker/start_etl.sh | 2 +- .../org/astraea/etl/DataFrameProcessor.scala | 15 ++++++++++----- etl/src/main/scala/org/astraea/etl/Metadata.scala | 2 +- .../org/astraea/etl/DataFrameProcessorTest.scala | 2 +- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/docker/start_etl.sh b/docker/start_etl.sh index 406a21a543..2b57cd5b81 100755 --- a/docker/start_etl.sh +++ b/docker/start_etl.sh @@ -89,7 +89,7 @@ function runContainer() { if [[ "$master" == "spark:"* ]] || [[ "$master" == "local"* ]]; then docker run -d --init \ - --name "csv-kafka${source_name}" \ + --name "csv-kafka-${source_name}" \ $network_config \ -v "$propertiesPath":"$propertiesPath":ro \ -v "$jar_path":/tmp/astraea-etl.jar:ro \ diff --git a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala 
b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala index 18a0d03080..fdff4fc7aa 100644 --- a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala +++ b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala @@ -18,6 +18,7 @@ package org.astraea.etl import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions._ +import org.apache.spark.sql.sources.{IsNotNull, IsNull} import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.astraea.common.json.JsonConverter @@ -71,12 +72,16 @@ class DataFrameProcessor(dataFrame: DataFrame) { "value", defaultConverter( map( - cols.flatMap(c => - List( - lit(c.name), - when(col(c.name).isNotNull, col(c.name)).otherwise(lit(null)) + cols + .map(c => + ( + lit(c.name), + when(col(c.name).isNotNull, col(c.name)) + .otherwise(lit(null)) + ) ) - ): _* + .filter(_._2 != null) + .flatMap(c => List(c._1, c._2)): _* ) ) ) diff --git a/etl/src/main/scala/org/astraea/etl/Metadata.scala b/etl/src/main/scala/org/astraea/etl/Metadata.scala index 6ed5999cee..c5fc9cf8cb 100644 --- a/etl/src/main/scala/org/astraea/etl/Metadata.scala +++ b/etl/src/main/scala/org/astraea/etl/Metadata.scala @@ -57,7 +57,7 @@ case class Metadata private ( object Metadata { private[etl] val ARCHIVE_PATH = "archive.path" private[etl] val SOURCE_PATH_KEY = "source.path" - private[etl] val CHECKPOINT_KEY = "checkpoint" + private[etl] val CHECKPOINT_KEY = "checkpoint.path" private[etl] val COLUMN_NAME_KEY = "column.names" private[etl] val COLUMN_TYPES_KEY = "column.types" private[etl] val CLEAN_SOURCE = "clean.source" diff --git a/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala b/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala index 974f48a185..6e516a213e 100644 --- a/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala +++ b/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala @@ -28,7 +28,7 
@@ import java.io._ import java.nio.file.Files import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration -import scala.jdk.CollectionConverters._ +import scala.jdk.CollectionConverters.CollectionHasAsScala class DataFrameProcessorTest { @Test From 7dca5ba7b49a497fb64a17a4bde158b1e2e7e117 Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Thu, 23 Feb 2023 02:23:50 +0800 Subject: [PATCH 4/4] remove otherwise --- etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala index fdff4fc7aa..9ae17e1315 100644 --- a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala +++ b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala @@ -77,7 +77,6 @@ class DataFrameProcessor(dataFrame: DataFrame) { ( lit(c.name), when(col(c.name).isNotNull, col(c.name)) - .otherwise(lit(null)) ) ) .filter(_._2 != null)