Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@

public class RssSparkConfig {

public static final ConfigOption<Boolean> RSS_EAGER_SHUFFLE_DELETION_ENABLED =
ConfigOptions.key("rss.client.eagerShuffleDeletion.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable eager shuffle data deletion after the Spark stage ends.");

public static final ConfigOption<Integer> RSS_EAGER_SHUFFLE_DELETION_DELAYED_MINUTES =
ConfigOptions.key("rss.client.eagerShuffleDeletion.delayedMinutes")
.intType()
.defaultValue(20)
.withDescription(
"The delayed minutes to perform eager shuffle data deletion after the shuffle reference count = 0.");

public static final ConfigOption<Boolean> RSS_CLIENT_INTEGRITY_VALIDATION_ENABLED =
ConfigOptions.key("rss.client.integrityValidation.enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@

import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_EAGER_SHUFFLE_DELETION_ENABLED;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_READ_SHUFFLE_HANDLE_CACHE_ENABLED;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
Expand Down Expand Up @@ -192,6 +193,8 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac

private boolean isDriver = false;

private Optional<StageDependencyTracker> stageDependencyTracker = Optional.empty();

public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
LOG.info(
"Uniffle {} version: {}", this.getClass().getName(), Constants.VERSION_AND_REVISION_SHORT);
Expand Down Expand Up @@ -327,6 +330,13 @@ public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
LOG.error("Failed to start shuffle manager server", e);
throw new RssException(e);
}

if (rssConf.get(RSS_EAGER_SHUFFLE_DELETION_ENABLED)) {
LOG.info("Eager shuffle deletion is enabled, initializing stage dependency tracker...");
this.stageDependencyTracker =
Optional.of(
new StageDependencyTracker(rssConf, shuffleId -> unregisterShuffle(shuffleId)));
}
}
}
if (shuffleManagerRpcServiceEnabled) {
Expand Down Expand Up @@ -467,6 +477,7 @@ public boolean unregisterShuffle(int shuffleId) {
shuffleWriteClient.unregisterShuffle(getAppId(), shuffleId);
shuffleIdToPartitionNum.remove(shuffleId);
shuffleIdToNumMapTasks.remove(shuffleId);
shuffleHandleInfoManager.remove(shuffleId);
if (service != null) {
service.unregisterShuffle(shuffleId);
}
Expand Down Expand Up @@ -1636,4 +1647,8 @@ public ShuffleHandleInfo getOrFetchShuffleHandle(
}
return handle;
}

public Optional<StageDependencyTracker> getStageDependencyTracker() {
return stageDependencyTracker;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle.manager;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class ShuffleDeletionItem implements Delayed {
private final int shuffleId;
private final long expireAtMs;

public ShuffleDeletionItem(int shuffleId, long delayMs) {
this.shuffleId = shuffleId;
this.expireAtMs = System.currentTimeMillis() + delayMs;
}

@Override
public long getDelay(TimeUnit unit) {
long remainingMs = expireAtMs - System.currentTimeMillis();
return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
if (o == this) {
return 0;
}
long otherDelayMs = o.getDelay(TimeUnit.MILLISECONDS);
long thisDelayMs = this.getDelay(TimeUnit.MILLISECONDS);
return Long.compare(thisDelayMs, otherDelayMs);
}

public int getShuffleId() {
return shuffleId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle.manager;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.JavaUtils;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_EAGER_SHUFFLE_DELETION_DELAYED_MINUTES;

/**
* This class tracks the dependencies between stages. It maintains a mapping of stage IDs to their
* parent stage IDs and reference counts to manage the lifecycle of stages and their associated
* shuffle data. But for the shuffle-reuse scenario, we have to introduce the delayed deletion
* mechanism to avoid the too-early deletion issue.
*
* <p>ATTENTION: This is still an experimental feature and may not cover all edge cases.
*/
public class StageDependencyTracker {
private static final Logger LOG = LoggerFactory.getLogger(StageDependencyTracker.class);

// key: stageId, value: shuffleId of writer
private Map<Integer, Integer> stageIdToShuffleIdOfWriters = JavaUtils.newConcurrentMap();

// key: shuffleId, value: stageIds of readers
private Map<Integer, Set<Integer>> shuffleIdToStageIdsOfReaders = JavaUtils.newConcurrentMap();
// reverse link by the stageId
private Map<Integer, Set<Integer>> stageIdToShuffleIdOfReaders = JavaUtils.newConcurrentMap();

private final ExecutorService deletionExecutor;
private final DelayQueue<ShuffleDeletionItem> deletionDelayQueue = new DelayQueue<>();

private long deletionDelayMs = -1L;

// for the test cases
private Set<Integer> cleanedShuffles = ConcurrentHashMap.newKeySet();

public StageDependencyTracker(RssConf rssConf, Consumer<Integer> deletionFunc) {
this.deletionExecutor = Executors.newFixedThreadPool(1);
deletionExecutor.execute(
() -> {
while (true) {
try {
ShuffleDeletionItem item = deletionDelayQueue.take();
int shuffleId = item.getShuffleId();
// to check references again
Set<Integer> readers = shuffleIdToStageIdsOfReaders.get(shuffleId);
if (readers == null || readers.isEmpty()) {
LOG.info("Deleting shuffle data for shuffleId: {}", shuffleId);
deletionFunc.accept(shuffleId);
cleanedShuffles.add(shuffleId);
shuffleIdToStageIdsOfReaders.remove(shuffleId);
} else {
LOG.info("Skipping deletion for shuffleId: {} as it has new readers", shuffleId);
}
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for deletion delay queue", e);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
LOG.error("Errors on deleting", e);
}
}
});
if (rssConf != null) {
int delayMinutes = rssConf.get(RSS_EAGER_SHUFFLE_DELETION_DELAYED_MINUTES);
if (delayMinutes > 0) {
this.deletionDelayMs = delayMinutes * 60L * 1000L;
LOG.info("Set deletion delay to {} ms", this.deletionDelayMs);
}
}
}

public StageDependencyTracker(Consumer<Integer> deletionFunc) {
this(null, deletionFunc);
}

public int getShuffleIdByStageIdOfWriter(int stageId) {
Integer shuffleId = stageIdToShuffleIdOfWriters.get(stageId);
if (shuffleId == null) {
// ignore this.
return -1;
}
return shuffleId;
}

public void linkWriter(int shuffleId, int writerStageId) {
stageIdToShuffleIdOfWriters.put(writerStageId, shuffleId);
}

public void linkReader(int shuffleId, int readerStageId) {
shuffleIdToStageIdsOfReaders
.computeIfAbsent(shuffleId, k -> ConcurrentHashMap.newKeySet())
.add(readerStageId);
stageIdToShuffleIdOfReaders
.computeIfAbsent(readerStageId, k -> ConcurrentHashMap.newKeySet())
.add(shuffleId);
}

public void removeStage(int stageId) {
Set<Integer> allUpstreamShuffleIdsOfRead = stageIdToShuffleIdOfReaders.get(stageId);
if (allUpstreamShuffleIdsOfRead != null) {
for (int shuffleId : allUpstreamShuffleIdsOfRead) {
Set<Integer> readers = shuffleIdToStageIdsOfReaders.get(shuffleId);
if (readers != null) {
readers.remove(stageId);
if (readers.isEmpty()) {
// add into the delayed deletion queue
deletionDelayQueue.offer(new ShuffleDeletionItem(shuffleId, deletionDelayMs));
}
}
}
}
}

public int getActiveShuffleCount() {
return shuffleIdToStageIdsOfReaders.size();
}

public int getCleanedShuffleCount() {
return cleanedShuffles.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle.manager;

import org.junit.jupiter.api.Test;

public class ShuffleDeletionItemTest {

@Test
public void testNegativeDelay() {
ShuffleDeletionItem item = new ShuffleDeletionItem(1, -1000L);
long delay = item.getDelay(java.util.concurrent.TimeUnit.MILLISECONDS);
assert delay <= 0 : "Delay should be non-positive for negative delay input";
}
}
Loading
Loading