Skip to content

Simplify OMSSparkConf implementation #19

@himanishk

Description

@himanishk

You could simplify this a bit by introducing an internal case class that maps each Spark conf key to its OMS config field; the same mapping can also mark which fields are required. Something like the code below. Alternatively, you can avoid maintaining this separate mapping altogether by refactoring your original `OMSConfig` case class to use this pattern directly.

/*
 * Copyright (2021) Databricks, Inc.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE
 * AND NONINFRINGEMENT.
 *
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
 * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * See the Full License for the specific language governing permissions and
 * limitations under the License.
 */

package com.databricks.labs.deltaoms.common

import com.databricks.labs.deltaoms.common.OMSSparkConfUtils.{buildConfKey, getSparkConf}
import com.databricks.labs.deltaoms.configuration.{OMSConfig, SparkSettings}

/** Internal helpers for reading Delta OMS settings from the Spark session configuration. */
private object OMSSparkConfUtils extends SparkSettings {

  /**
   * Qualifies a short setting name with the Delta OMS namespace.
   *
   * @param key short setting name, e.g. `"db.name"`
   * @return the full key, e.g. `"databricks.labs.deltaoms.db.name"`
   */
  def buildConfKey(key: String): String = "databricks.labs.deltaoms." + key

  /**
   * Looks up `confKey` in `spark.conf` (presumably the session provided by `SparkSettings`).
   *
   * @param confKey fully qualified Spark conf key
   * @tparam T unused; kept only for source compatibility with existing callers
   * @return the configured value, or `None` when the key is not set
   */
  def getSparkConf[T](confKey: String): Option[String] = spark.conf.getOption(confKey)
}

/**
 * Pairs a setting's default value with the Spark conf key it can be supplied under.
 *
 * @param value           default value of the setting
 * @param sparkConfigName fully qualified Spark conf key for the setting
 * @param is_required     marks the setting as mandatory; defaults to `false`.
 *                        NOTE(review): no code in this file reads this flag yet.
 * @tparam T              type of the setting's value
 */
case class WithSparkConf[T](
  value: T,
  sparkConfigName: String,
  is_required: Boolean = false
)


/**
 * Catalogue of the Delta OMS settings that can be supplied through the Spark session conf.
 *
 * Each field pairs a default value with the fully qualified Spark conf key it is read from.
 * Keys are built with `buildConfKey`, i.e. prefixed with `databricks.labs.deltaoms.`.
 * `baseLocation`, `dbName`, `checkpointBase` and `checkpointSuffix` are flagged
 * `is_required = true`; every other setting keeps the default `is_required = false`.
 */
case class OMSSparkConfig(
  // Settings flagged as required.
  baseLocation: WithSparkConf[Option[String]] =
    WithSparkConf(None, buildConfKey("base.location"), is_required = true),
  dbName: WithSparkConf[Option[String]] =
    WithSparkConf(None, buildConfKey("db.name"), is_required = true),
  checkpointBase: WithSparkConf[Option[String]] =
    WithSparkConf(None, buildConfKey("checkpoint.base"), is_required = true),
  checkpointSuffix: WithSparkConf[Option[String]] =
    WithSparkConf(None, buildConfKey("checkpoint.suffix"), is_required = true),

  // Table-name settings.
  rawActionTable: WithSparkConf[String] =
    WithSparkConf("rawactions", buildConfKey("raw.action.table")),
  sourceConfigTable: WithSparkConf[String] =
    WithSparkConf("sourceconfig", buildConfKey("source.config.table")),
  pathConfigTable: WithSparkConf[String] =
    WithSparkConf("pathconfig", buildConfKey("path.config.table")),
  processedHistoryTable: WithSparkConf[String] =
    WithSparkConf("processedhistory", buildConfKey("processed.history.table")),
  commitInfoSnapshotTable: WithSparkConf[String] =
    WithSparkConf("commitinfosnapshots", buildConfKey("commitinfo.snapshot.table")),
  actionSnapshotTable: WithSparkConf[String] =
    WithSparkConf("actionsnapshots", buildConfKey("action.snapshot.table")),

  // Boolean switches.
  consolidateWildcardPaths: WithSparkConf[Boolean] =
    WithSparkConf(true, buildConfKey("consolidate.wildcard.paths")),
  truncatePathConfig: WithSparkConf[Boolean] =
    WithSparkConf(false, buildConfKey("truncate.path.config")),
  skipPathConfig: WithSparkConf[Boolean] =
    WithSparkConf(false, buildConfKey("skip.path.config")),
  skipInitializeOMS: WithSparkConf[Boolean] =
    WithSparkConf(false, buildConfKey("skip.initialize")),

  // Optional free-form settings.
  srcDatabases: WithSparkConf[Option[String]] =
    WithSparkConf(None, buildConfKey("src.databases")),
  tablePattern: WithSparkConf[Option[String]] =
    WithSparkConf(None, buildConfKey("table.pattern")),
  triggerInterval: WithSparkConf[Option[String]] =
    WithSparkConf(None, buildConfKey("trigger.interval")),

  // Integer settings.
  startingStream: WithSparkConf[Int] =
    WithSparkConf(1, buildConfKey("starting.stream")),
  endingStream: WithSparkConf[Int] =
    WithSparkConf(50, buildConfKey("ending.stream"))
)

trait OMSSparkConf extends Serializable with SparkSettings {

  /**
   * Overlays Spark session configuration onto an [[OMSConfig]].
   *
   * For every setting declared in [[OMSSparkConfig]], the value stored under its Spark conf key
   * (for example `databricks.labs.deltaoms.db.name`) replaces the matching field of `config`.
   * Settings whose key is unset keep the value already present in `config`.
   *
   * @param config base configuration supplying every value not overridden by the Spark conf
   * @return `config` with all Spark conf overrides applied
   * @throws IllegalArgumentException if a boolean setting is not `true`/`false`
   *                                  (case-insensitive)
   * @throws NumberFormatException    if an integer setting cannot be parsed as an `Int`
   */
  def consolidateOMSConfigFromSparkConf(config: OMSConfig): OMSConfig = {
    val settings = OMSSparkConfig()
    // `copy` rather than a fresh OMSConfig(...): any field of OMSConfig that has no Spark conf
    // key is carried over from `config` instead of being silently reset to its default.
    config.copy(
      baseLocation = optionalStringSetting(settings.baseLocation, config.baseLocation),
      dbName = optionalStringSetting(settings.dbName, config.dbName),
      checkpointBase = optionalStringSetting(settings.checkpointBase, config.checkpointBase),
      checkpointSuffix =
        optionalStringSetting(settings.checkpointSuffix, config.checkpointSuffix),
      rawActionTable = stringSetting(settings.rawActionTable, config.rawActionTable),
      sourceConfigTable = stringSetting(settings.sourceConfigTable, config.sourceConfigTable),
      pathConfigTable = stringSetting(settings.pathConfigTable, config.pathConfigTable),
      processedHistoryTable =
        stringSetting(settings.processedHistoryTable, config.processedHistoryTable),
      commitInfoSnapshotTable =
        stringSetting(settings.commitInfoSnapshotTable, config.commitInfoSnapshotTable),
      actionSnapshotTable =
        stringSetting(settings.actionSnapshotTable, config.actionSnapshotTable),
      consolidateWildcardPaths =
        booleanSetting(settings.consolidateWildcardPaths, config.consolidateWildcardPaths),
      truncatePathConfig = booleanSetting(settings.truncatePathConfig, config.truncatePathConfig),
      skipPathConfig = booleanSetting(settings.skipPathConfig, config.skipPathConfig),
      skipInitializeOMS = booleanSetting(settings.skipInitializeOMS, config.skipInitializeOMS),
      srcDatabases = optionalStringSetting(settings.srcDatabases, config.srcDatabases),
      tablePattern = optionalStringSetting(settings.tablePattern, config.tablePattern),
      triggerInterval = optionalStringSetting(settings.triggerInterval, config.triggerInterval),
      startingStream = intSetting(settings.startingStream, config.startingStream),
      endingStream = intSetting(settings.endingStream, config.endingStream)
    )
  }

  /** Spark conf value for `setting`, or `fallback` when its key is unset. */
  private def stringSetting(setting: WithSparkConf[_], fallback: String): String =
    getSparkConf(setting.sparkConfigName).getOrElse(fallback)

  /** Spark conf value for `setting` wrapped in `Some`, or `fallback` when its key is unset. */
  private def optionalStringSetting(
      setting: WithSparkConf[_],
      fallback: Option[String]): Option[String] =
    getSparkConf(setting.sparkConfigName).orElse(fallback)

  /**
   * Spark conf value for `setting` parsed as a Boolean, or `fallback` when its key is unset.
   * Parse failures are rethrown with the offending key so misconfiguration is easy to locate.
   */
  private def booleanSetting(setting: WithSparkConf[_], fallback: Boolean): Boolean =
    getSparkConf(setting.sparkConfigName).fold(fallback) { raw =>
      try raw.toBoolean
      catch {
        case e: IllegalArgumentException =>
          throw new IllegalArgumentException(
            s"Spark conf ${setting.sparkConfigName} must be true or false, got '$raw'", e)
      }
    }

  /**
   * Spark conf value for `setting` parsed as an Int, or `fallback` when its key is unset.
   * Parse failures are rethrown (same exception type) with the offending key in the message.
   */
  private def intSetting(setting: WithSparkConf[_], fallback: Int): Int =
    getSparkConf(setting.sparkConfigName).fold(fallback) { raw =>
      try raw.toInt
      catch {
        case _: NumberFormatException =>
          throw new NumberFormatException(
            s"Spark conf ${setting.sparkConfigName} must be an integer, got '$raw'")
      }
    }
}

/** Ready-to-use instance of [[OMSSparkConf]]. */
object OMSSparkConf extends OMSSparkConf

Originally posted by @bali0019 in #17 (comment)

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancement — New feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions