-
Notifications
You must be signed in to change notification settings - Fork 330
chore: Add envvars to override writer configs and cometConf minor clean up #3540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7b341d9
4e8a773
7e04d58
e954bc9
0202a09
cdcf5f7
08e5914
d4595b6
ffe088f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,11 +78,11 @@ object CometConf extends ShimCometConf { | |
|
|
||
| val COMET_PREFIX = "spark.comet"; | ||
|
|
||
| val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec"; | ||
| val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec" | ||
|
|
||
| val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression"; | ||
| val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression" | ||
|
|
||
| val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator"; | ||
| val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator" | ||
|
|
||
| val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled") | ||
| .category(CATEGORY_EXEC) | ||
|
|
@@ -112,7 +112,7 @@ object CometConf extends ShimCometConf { | |
| "feature is highly experimental and only partially implemented. It should not " + | ||
| "be used in production.") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
| .createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false) | ||
|
|
||
| // Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices | ||
| // and does not support complex types. Use native_iceberg_compat or auto instead. | ||
|
|
@@ -490,13 +490,23 @@ object CometConf extends ShimCometConf { | |
| "Ensure that Comet shuffle memory overhead factor is a double greater than 0") | ||
| .createWithDefault(1.0) | ||
|
|
||
| val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize") | ||
| .category(CATEGORY_TUNING) | ||
| .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.") | ||
| .intConf | ||
| .checkValue(v => v > 0, "Batch size must be positive") | ||
| .createWithDefault(8192) | ||
|
|
||
| val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] = | ||
| conf("spark.comet.columnar.shuffle.batch.size") | ||
| .category(CATEGORY_SHUFFLE) | ||
| .doc("Batch size when writing out sorted spill files on the native side. Note that " + | ||
| "this should not be larger than batch size (i.e., `spark.comet.batchSize`). Otherwise " + | ||
| "it will produce larger batches than expected in the native operator after shuffle.") | ||
| .intConf | ||
| .checkValue( | ||
| v => v <= COMET_BATCH_SIZE.get(), | ||
| "Should not be larger than batch size `spark.comet.batchSize`") | ||
| .createWithDefault(8192) | ||
|
|
||
| val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] = | ||
|
|
@@ -552,6 +562,7 @@ object CometConf extends ShimCometConf { | |
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| // Used on native side. Check spark_config.rs how the config is used | ||
| val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] = | ||
| conf(s"$COMET_PREFIX.debug.memory") | ||
| .category(CATEGORY_TESTING) | ||
|
|
@@ -610,12 +621,6 @@ object CometConf extends ShimCometConf { | |
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize") | ||
| .category(CATEGORY_TUNING) | ||
| .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.") | ||
| .intConf | ||
| .createWithDefault(8192) | ||
|
|
||
| val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = | ||
| conf("spark.comet.parquet.enable.directBuffer") | ||
| .category(CATEGORY_PARQUET) | ||
|
|
@@ -795,14 +800,6 @@ object CometConf extends ShimCometConf { | |
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = | ||
| conf("spark.comet.regexp.allowIncompatible") | ||
| .category(CATEGORY_EXEC) | ||
| .doc("Comet is not currently fully compatible with Spark for all regular expressions. " + | ||
| s"Set this config to true to allow them anyway. $COMPAT_GUIDE.") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] = | ||
| conf("spark.comet.metrics.updateInterval") | ||
| .category(CATEGORY_EXEC) | ||
|
|
@@ -821,6 +818,7 @@ object CometConf extends ShimCometConf { | |
| .stringConf | ||
| .createOptional | ||
|
|
||
| // Used on native side. Check spark_config.rs how the config is used | ||
| val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] = | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some of these configs are used directly from native code. We add them here so that they get documented.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ic, I'll add comments! |
||
| conf("spark.comet.maxTempDirectorySize") | ||
| .category(CATEGORY_EXEC) | ||
|
|
@@ -845,6 +843,9 @@ object CometConf extends ShimCometConf { | |
| .booleanConf | ||
| .createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false) | ||
|
|
||
| val COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT: ConfigEntry[Boolean] = | ||
| createOperatorIncompatConfig("DataWritingCommandExec") | ||
|
|
||
| /** Create a config to enable a specific operator */ | ||
| private def createExecEnabledConfig( | ||
| exec: String, | ||
|
|
@@ -860,6 +861,25 @@ object CometConf extends ShimCometConf { | |
| .createWithDefault(defaultValue) | ||
| } | ||
|
|
||
| /** | ||
| * Converts a config key to a valid environment variable name. Example: | ||
| * "spark.comet.operator.DataWritingCommandExec.allowIncompatible" -> | ||
| * "SPARK_COMET_OPERATOR_DATAWRITINGCOMMANDEXEC_ALLOWINCOMPATIBLE" | ||
| */ | ||
| private def configKeyToEnvVar(configKey: String): String = | ||
| configKey.toUpperCase(Locale.ROOT).replace('.', '_') | ||
|
|
||
| private def createOperatorIncompatConfig(name: String): ConfigEntry[Boolean] = { | ||
| val configKey = getOperatorAllowIncompatConfigKey(name) | ||
| val envVar = configKeyToEnvVar(configKey) | ||
| conf(configKey) | ||
| .category(CATEGORY_EXEC) | ||
| .doc(s"Whether to allow incompatibility for operator: $name. " + | ||
| s"False by default. Can be overridden with $envVar env variable") | ||
| .booleanConf | ||
| .createWithEnvVarOrDefault(envVar, false) | ||
| } | ||
|
|
||
| def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = { | ||
| getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf) | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing the config here will remove it from the configuration guide, but maybe that is fine, so long as we are explaining it in the compatibility guide.