diff --git a/config/spark2kafka.properties b/config/spark2kafka.properties index 7e6d195533..330c9d0d59 100644 --- a/config/spark2kafka.properties +++ b/config/spark2kafka.properties @@ -1,9 +1,9 @@ #Parameters you must configure #============================================================== -#The data source path should be a directory. +#The data source should be a directory. source.path = -#The CSV Column Name.For example:sA=string,sB=integer,sC=boolean... +#The CSV column names. For example: sA=string,sB=integer,sC=boolean... column.names = #Primary keys.For example:sA=string,sB=integer,sC=boolean... @@ -15,8 +15,8 @@ kafka.bootstrap.servers = #Set your topic name. topic.name = -#Spark checkpoint path -checkpoint = +#Spark checkpoint +checkpoint.path = #Parameters that can be selected for configuration #============================================================== diff --git a/docker/start_etl.sh b/docker/start_etl.sh index 3c829ff388..2b57cd5b81 100755 --- a/docker/start_etl.sh +++ b/docker/start_etl.sh @@ -24,7 +24,7 @@ declare -r SPARK_VERSION=${SPARK_VERSION:-3.3.1} declare -r LOCAL_PATH=$(cd -- "$(dirname -- "${DOCKER_FOLDER}")" &>/dev/null && pwd) # ===============================[properties keys]================================= declare -r SOURCE_KEY="source.path" -declare -r CHECKPOINT_KEY="checkpoint" +declare -r CHECKPOINT_KEY="checkpoint.path" # ===============================[spark driver/executor resource]================== declare -r RESOURCES_CONFIGS="${RESOURCES_CONFIGS:-"--conf spark.driver.memory=4g --conf spark.executor.memory=4g"}" # ===================================[functions]=================================== diff --git a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala index 18d85ccf2d..9ae17e1315 100644 --- a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala +++ b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala @@ -18,6 +18,7 @@ package 
org.astraea.etl import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions._ +import org.apache.spark.sql.sources.{IsNotNull, IsNull} import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.astraea.common.json.JsonConverter @@ -70,7 +71,17 @@ class DataFrameProcessor(dataFrame: DataFrame) { .withColumn( "value", defaultConverter( - map(cols.flatMap(c => List(lit(c.name), col(c.name))): _*) + map( + cols + .map(c => + ( + lit(c.name), + when(col(c.name).isNotNull, col(c.name)) + ) + ) + .filter(_._2 != null) + .flatMap(c => List(c._1, c._2)): _* + ) ) ) .withColumn( @@ -171,10 +182,6 @@ object DataFrameProcessor { private def schema(columns: Seq[DataColumn]): StructType = StructType(columns.map { col => - if (col.dataType != DataType.StringType) - throw new IllegalArgumentException( - "Sorry, only string type is currently supported.Because a problem(astraea #1286) has led to the need to wrap the non-nullable type." 
- ) StructField(col.name, col.dataType.sparkType) }) } diff --git a/etl/src/main/scala/org/astraea/etl/Metadata.scala b/etl/src/main/scala/org/astraea/etl/Metadata.scala index 9823824ab0..c5fc9cf8cb 100644 --- a/etl/src/main/scala/org/astraea/etl/Metadata.scala +++ b/etl/src/main/scala/org/astraea/etl/Metadata.scala @@ -57,7 +57,7 @@ case class Metadata private ( object Metadata { private[etl] val ARCHIVE_PATH = "archive.path" private[etl] val SOURCE_PATH_KEY = "source.path" - private[etl] val CHECKPOINT_KEY = "checkpoint" + private[etl] val CHECKPOINT_KEY = "checkpoint.path" private[etl] val COLUMN_NAME_KEY = "column.names" private[etl] val COLUMN_TYPES_KEY = "column.types" private[etl] val CLEAN_SOURCE = "clean.source" @@ -71,7 +71,7 @@ object Metadata { private[etl] val DEFAULT_PARTITIONS = 15 private[etl] val DEFAULT_REPLICAS = 1.toShort - private[etl] val DEFAULT_RECURSIVE = "ture" + private[etl] val DEFAULT_RECURSIVE = "true" private[etl] val DEFAULT_CLEAN_SOURCE = "delete" // Parameters needed to configure ETL. diff --git a/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala b/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala index 974f48a185..6e516a213e 100644 --- a/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala +++ b/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala @@ -28,7 +28,7 @@ import java.io._ import java.nio.file.Files import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration -import scala.jdk.CollectionConverters._ +import scala.jdk.CollectionConverters.CollectionHasAsScala class DataFrameProcessorTest { @Test