core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -32,7 +32,8 @@ import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX}
import org.apache.spark.internal.config.{DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED,
DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX}
import org.apache.spark.internal.config.UI.UI_REVERSE_PROXY
import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
import org.apache.spark.resource.ResourceInformation
@@ -175,6 +176,13 @@ private[deploy] class DriverRunner(
localJarFile.getAbsolutePath
}

private[worker] def activeProcessorCountOpts(): Seq[String] =
if (conf.get(DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED)) {
Seq(s"-XX:ActiveProcessorCount=${driverDesc.cores}")
} else {
Seq.empty
}

private[worker] def prepareAndRunDriver(): Int = {
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
@@ -187,8 +195,9 @@ private[deploy] class DriverRunner(
}

// config resource file for driver, which would be used to load resources when driver starts up
val javaOpts = driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>
Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)
val javaOpts = activeProcessorCountOpts() ++
driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>
Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)
// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),
securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
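For context, -XX:ActiveProcessorCount=<n> caps the number of processors the HotSpot JVM reports, which in turn sizes its default thread pools and GC worker threads to the driver's allotted cores rather than the whole host. A minimal sketch for observing the effect outside Spark; the object name is made up for illustration:

object ProcessorCountCheck {
  def main(args: Array[String]): Unit = {
    // Launched with -XX:ActiveProcessorCount=2, HotSpot reports 2 here
    // no matter how many cores the machine actually has.
    println(s"availableProcessors = ${Runtime.getRuntime.availableProcessors()}")
  }
}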
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,6 +28,7 @@ import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.deploy.StandaloneResourceUtils.prepareResourcesFile
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED
import org.apache.spark.internal.config.SPARK_EXECUTOR_PREFIX
import org.apache.spark.internal.config.UI._
import org.apache.spark.resource.ResourceInformation
@@ -132,6 +133,13 @@ private[deploy] class ExecutorRunner(
}
}

private[worker] def activeProcessorCountOpts(): Seq[String] =
if (conf.get(EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED)) {
Seq(s"-XX:ActiveProcessorCount=$cores")
} else {
Seq.empty
}

/** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
private[worker] def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
@@ -152,7 +160,7 @@
// Launch the process
val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f =>
Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty)
val subsOpts = appDesc.command.javaOpts.map {
val subsOpts = activeProcessorCountOpts() ++ appDesc.command.javaOpts.map {
Utils.substituteAppNExecIds(_, appId, execId.toString)
}
val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = subsOpts)
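A minimal, self-contained sketch of the option composition this hunk performs; limitEnabled, cores, and the user opts below are stand-ins for the runner's real fields:

object ExecutorOptsSketch {
  def main(args: Array[String]): Unit = {
    val limitEnabled = true  // stands in for EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED
    val cores = 4
    val userJavaOpts = Seq("-verbose:gc", "-Dspark.app.id={{APP_ID}}")
    val activeProcessorCountOpts =
      if (limitEnabled) Seq(s"-XX:ActiveProcessorCount=$cores") else Seq.empty
    // The flag is prepended, so user-supplied javaOpts come after it.
    val finalOpts = activeProcessorCountOpts ++
      userJavaOpts.map(_.replace("{{APP_ID}}", "app-20260101000000-0001"))
    finalOpts.foreach(println)
  }
}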
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2946,15 +2946,15 @@ package object config {
private[spark] val DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED =
ConfigBuilder("spark.driver.limitActiveProcessorCount.enabled")
.doc("Whether to add -XX:ActiveProcessorCount=<spark.driver.cores> to the driver JVM " +
"options. Currently, this only takes effect in YARN cluster mode.")
"options. Currently, this takes effect in YARN cluster mode and standalone cluster mode.")
.version("4.2.0")
.booleanConf
.createWithDefault(false)

private[spark] val EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED =
ConfigBuilder("spark.executor.limitActiveProcessorCount.enabled")
.doc("Whether to add -XX:ActiveProcessorCount=<spark.executor.cores> to executor JVM " +
"options. Currently, this only takes effect in YARN mode.")
"options. Currently, this takes effect in YARN mode and standalone mode.")
.version("4.2.0")
.booleanConf
.createWithDefault(false)
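As a usage sketch, assuming a standalone cluster deployment, the new behaviour would be switched on with the two configs defined above; the object name and core counts are illustrative:

import org.apache.spark.SparkConf

object ActiveProcessorCountConfSketch {
  def main(args: Array[String]): Unit = {
    // With these set, the standalone DriverRunner/ExecutorRunner prepend
    // -XX:ActiveProcessorCount=<cores> to the JVM options they build.
    val conf = new SparkConf()
      .set("spark.driver.limitActiveProcessorCount.enabled", "true")
      .set("spark.executor.limitActiveProcessorCount.enabled", "true")
      .set("spark.driver.cores", "2")
      .set("spark.executor.cores", "4")
    conf.getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }
  }
}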
core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -27,8 +27,9 @@ import org.mockito.invocation.InvocationOnMock
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.deploy.{Command, DeployTestUtils, DriverDescription}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.internal.config.DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Clock

@@ -44,6 +45,14 @@ class DriverRunnerTest extends SparkFunSuite {
new SecurityManager(conf)))
}

private def createDriverRunnerWithCores(conf: SparkConf, cores: Int): DriverRunner = {
val driverDesc = new DriverDescription(
"hdfs://some-dir/some.jar", 100, cores, false, DeployTestUtils.createDriverCommand())
new DriverRunner(conf, "driver-1", new File("workDir"), new File("sparkHome"),
driverDesc, null, "spark://worker", "http://publicAddress:80",
new SecurityManager(conf))
}

private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
val processBuilder = mock(classOf[ProcessBuilderLike])
when(processBuilder.command).thenReturn(Seq("mocked", "command"))
@@ -208,4 +217,21 @@
assert(runner.finalException.get.isInstanceOf[RuntimeException])
}
}

test("SPARK-56157: APC flag not set by default") {
val runner = createDriverRunnerWithCores(new SparkConf(), cores = 2)
assert(runner.activeProcessorCountOpts() === Seq.empty)
}

test("SPARK-56157: APC flag set when enabled") {
val conf = new SparkConf().set(DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED, true)
val runner = createDriverRunnerWithCores(conf, cores = 2)
assert(runner.activeProcessorCountOpts() === Seq("-XX:ActiveProcessorCount=2"))
}

test("SPARK-56157: APC flag reflects core count") {
val conf = new SparkConf().set(DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED, true)
val runner = createDriverRunnerWithCores(conf, cores = 5)
assert(runner.activeProcessorCountOpts() === Seq("-XX:ActiveProcessorCount=5"))
}
}
core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -21,6 +21,7 @@ import java.io.File

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{ApplicationDescription, Command, DeployTestUtils, ExecutorState}
import org.apache.spark.internal.config.EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED
import org.apache.spark.resource.ResourceProfile

class ExecutorRunnerTest extends SparkFunSuite {
@@ -39,4 +40,29 @@ class ExecutorRunnerTest extends SparkFunSuite {
val builderCommand = builder.command()
assert(builderCommand.get(builderCommand.size() - 1) === appId)
}

test("SPARK-56157: APC flag not set by default") {
val runner = createExecutorRunner(new SparkConf(), cores = 2)
assert(runner.activeProcessorCountOpts() === Seq.empty)
}

test("SPARK-56157: APC flag set when enabled") {
val conf = new SparkConf().set(EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED, true)
val runner = createExecutorRunner(conf, cores = 2)
assert(runner.activeProcessorCountOpts() === Seq("-XX:ActiveProcessorCount=2"))
}

test("SPARK-56157: APC flag reflects core count") {
val conf = new SparkConf().set(EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED, true)
val runner = createExecutorRunner(conf, cores = 7)
assert(runner.activeProcessorCountOpts() === Seq("-XX:ActiveProcessorCount=7"))
}

private def createExecutorRunner(conf: SparkConf, cores: Int): ExecutorRunner = {
new ExecutorRunner(
"appId", 1, DeployTestUtils.createAppDesc(), cores, 1234, null,
"workerId", "http://", "host", 123, "publicAddress",
new File(sys.props.getOrElse("spark.test.home", ".")), new File("workDir"), "spark://worker",
conf, Seq("localDir"), ExecutorState.RUNNING, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
}
}