Skip to content

Commit 5949356

Browse files
committed
[server] Add disk-usage write protection to TabletServer
1 parent fcfeabd commit 5949356

15 files changed

Lines changed: 963 additions & 18 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,28 @@ public class ConfigOptions {
335335
+ "The default value is 10.")
336336
.withDeprecatedKeys("coordinator.io-pool.size");
337337

338+
/**
 * Data disk usage ratio at which the tablet server starts rejecting writes. Declared as a
 * double option under the key {@code server.data-disk.write-limit-ratio} with a default of
 * {@code 0.85}; per its description, {@code 1.0} disables the protection.
 *
 * <p>NOTE(review): the documented valid range {@code (0.0, 1.0]} is not enforced by this
 * declaration itself; confirm it is validated where the option is read.
 */
public static final ConfigOption<Double> SERVER_DATA_DISK_WRITE_LIMIT_RATIO =
339+
key("server.data-disk.write-limit-ratio")
340+
.doubleType()
341+
.defaultValue(0.85)
342+
.withDescription(
343+
"Reject writes when the tablet server data disk usage exceeds this ratio. "
344+
+ "Writes resume after the usage drops below (ratio - 0.10). "
345+
+ "Set to 1.0 to disable the disk-usage protection entirely. "
346+
+ "The valid range is (0.0, 1.0].");
347+
348+
/**
 * Interval between samples of the local data disk usage that feed the write-protection check.
 * Declared as a duration option under the key {@code server.data-disk.check-interval} with a
 * default of 30 seconds.
 */
public static final ConfigOption<Duration> SERVER_DATA_DISK_CHECK_INTERVAL =
349+
key("server.data-disk.check-interval")
350+
.durationType()
351+
.defaultValue(Duration.ofSeconds(30))
352+
.withDescription(
353+
"The interval at which the tablet server samples the local data disk "
354+
+ "usage for the write-protection state machine. A shorter interval "
355+
+ "narrows the time window during which writes can still flow in "
356+
+ "after the disk crosses the limit ratio, at the cost of slightly "
357+
+ "more frequent statvfs calls (which are in-memory and cheap). "
358+
+ "The default 30s is suitable for typical write workloads.");
359+
338360
// ------------------------------------------------------------------------
339361
// ConfigOptions for Coordinator Server
340362
// ------------------------------------------------------------------------
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
/**
23+
* Thrown by a tablet server to reject writes when its local data disk usage has reached the
24+
* configured write-limit ratio. The exception is retriable so that clients can retry once the
25+
* server frees up enough disk space and resumes accepting writes.
26+
*/
27+
@PublicEvolving
28+
public class DiskWriteLockedException extends RetriableException {
29+
30+
private static final long serialVersionUID = 1L;
31+
32+
public DiskWriteLockedException(String message) {
33+
super(message);
34+
}
35+
36+
public DiskWriteLockedException(int serverId, double usageRatio, double limit) {
37+
super(
38+
String.format(
39+
"TabletServer %d has rejected writes because the data disk usage "
40+
+ "reached %.2f%% (limit: %.2f%%). Free up space or scale the cluster.",
41+
serverId, usageRatio * 100, limit * 100));
42+
}
43+
}

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ public class MetricNames {
9494
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
9595
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
9696

97+
// for tablet server data disk write protection
98+
// Gauge name for the last sampled data disk usage ratio.
public static final String DISK_USAGE_RATIO = "diskUsageRatio";
99+
// Gauge name for the write-lock flag; the registered gauge reports 1 when locked, 0 otherwise.
public static final String DISK_WRITE_LOCKED = "diskWriteLocked";
100+
97101
// --------------------------------------------------------------------------------------------
98102
// metrics for user
99103
// --------------------------------------------------------------------------------------------

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.exception.DatabaseNotEmptyException;
2828
import org.apache.fluss.exception.DatabaseNotExistException;
2929
import org.apache.fluss.exception.DeletionDisabledException;
30+
import org.apache.fluss.exception.DiskWriteLockedException;
3031
import org.apache.fluss.exception.DuplicateSequenceException;
3132
import org.apache.fluss.exception.FencedLeaderEpochException;
3233
import org.apache.fluss.exception.FencedTieringEpochException;
@@ -265,7 +266,11 @@ public enum Errors {
265266
TOO_MANY_SCANNERS(
266267
69,
267268
"The per-bucket or per-server scanner session limit has been reached.",
268-
TooManyScannersException::new);
269+
TooManyScannersException::new),
270+
DISK_WRITE_LOCKED_EXCEPTION(
271+
70,
272+
"The tablet server has rejected writes because its data disk usage reached the configured write-limit ratio.",
273+
DiskWriteLockedException::new);
269274

270275
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
271276

fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
4646
import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL;
4747
import static org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER;
48+
import static org.apache.fluss.config.ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO;
4849
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
4950
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
5051

@@ -64,7 +65,8 @@ class DynamicServerConfig {
6465
DATALAKE_FORMAT.key(),
6566
LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(),
6667
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
67-
KV_SNAPSHOT_INTERVAL.key()));
68+
KV_SNAPSHOT_INTERVAL.key(),
69+
SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key()));
6870
private static final Set<String> ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake.");
6971

7072
private final ReadWriteLock lock = new ReentrantReadWriteLock();

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
import org.apache.fluss.server.replica.delay.DelayedWrite;
107107
import org.apache.fluss.server.replica.fetcher.InitialFetchStatus;
108108
import org.apache.fluss.server.replica.fetcher.ReplicaFetcherManager;
109+
import org.apache.fluss.server.storage.DiskUsageMonitor;
109110
import org.apache.fluss.server.storage.LocalDiskManager;
110111
import org.apache.fluss.server.utils.FatalErrorHandler;
111112
import org.apache.fluss.server.zk.ZooKeeperClient;
@@ -332,6 +333,7 @@ public ReplicaManager(
332333
this.ioExecutor = ioExecutor;
333334
this.minInSyncReplicas = conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER);
334335
this.scannerManager = checkNotNull(scannerManager, "scannerManager");
336+
335337
registerMetrics();
336338
}
337339

@@ -344,6 +346,9 @@ public void startup() {
344346
this::maybeShrinkIsr,
345347
0L,
346348
conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis() / 2);
349+
350+
// Start periodic disk usage monitoring (initial + periodic sampling)
351+
localDiskManager.startDiskUsageMonitor(scheduler);
347352
}
348353

349354
public RemoteLogManager getRemoteLogManager() {
@@ -423,6 +428,11 @@ private void registerMetrics() {
423428
physicalStorage.gauge(
424429
MetricNames.SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE,
425430
this::physicalStorageRemoteLogSize);
431+
432+
serverMetricGroup.gauge(
433+
MetricNames.DISK_USAGE_RATIO, localDiskManager::getLastDiskUsageRatio);
434+
serverMetricGroup.gauge(
435+
MetricNames.DISK_WRITE_LOCKED, () -> localDiskManager.isDiskWriteLocked() ? 1 : 0);
426436
}
427437

428438
@VisibleForTesting
@@ -592,6 +602,21 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
592602
}
593603
}
594604

605+
/**
 * Returns whether the local disk manager currently reports writes as locked. Delegates
 * directly to {@code localDiskManager.isDiskWriteLocked()}.
 */
@VisibleForTesting
606+
public boolean isDiskWriteLocked() {
607+
return localDiskManager.isDiskWriteLocked();
608+
}
609+
610+
/**
 * Returns the last disk usage ratio reported by the local disk manager. Delegates directly to
 * {@code localDiskManager.getLastDiskUsageRatio()}.
 */
@VisibleForTesting
611+
public double getLastDiskUsageRatio() {
612+
return localDiskManager.getLastDiskUsageRatio();
613+
}
614+
615+
/**
 * Returns the {@link DiskUsageMonitor} held by the local disk manager. Delegates directly to
 * {@code localDiskManager.getDiskUsageMonitor()}.
 */
@VisibleForTesting
616+
public DiskUsageMonitor getDiskUsageMonitor() {
617+
return localDiskManager.getDiskUsageMonitor();
618+
}
619+
595620
/**
596621
* Append log records to leader replicas of the buckets, and wait for them to be replicated to
597622
* other replicas.
@@ -609,6 +634,7 @@ public void appendRecordsToLog(
609634
if (isRequiredAcksInvalid(requiredAcks)) {
610635
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
611636
}
637+
localDiskManager.ensureWritable();
612638

613639
long startTime = System.currentTimeMillis();
614640
Map<TableBucket, ProduceLogResultForBucket> appendResult =
@@ -662,6 +688,7 @@ public void putRecordsToKv(
662688
if (isRequiredAcksInvalid(requiredAcks)) {
663689
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
664690
}
691+
localDiskManager.ensureWritable();
665692

666693
long startTime = System.currentTimeMillis();
667694
Map<TableBucket, PutKvResultForBucket> kvPutResult =
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.storage;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
22+
import java.io.File;
23+
import java.io.IOException;
24+
import java.nio.file.FileStore;
25+
import java.nio.file.Files;
26+
import java.util.Collections;
27+
import java.util.HashSet;
28+
import java.util.List;
29+
import java.util.Set;
30+
31+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
32+
33+
/**
34+
* Collects the local data disk usage ratio for the tablet server. The reported ratio is the
35+
* <b>maximum</b> usage across all distinct {@link FileStore}s backing the configured data
36+
* directories. A per-disk maximum (rather than a weighted average over total/used bytes) is used so
37+
* that a single nearly-full disk cannot be masked by other low-usage disks in a multi-disk
38+
* deployment: any single disk crossing the limit ratio must trip the write protection, because
39+
* partitions pinned to that disk would otherwise fail to write. Multiple data directories sharing
40+
* the same physical {@link FileStore} are still counted only once.
41+
*/
42+
@Internal
43+
public final class DiskUsageCollector {
44+
45+
private final List<File> dataDirs;
46+
47+
public DiskUsageCollector(List<File> dataDirs) {
48+
checkNotNull(dataDirs, "dataDirs");
49+
this.dataDirs = Collections.unmodifiableList(dataDirs);
50+
}
51+
52+
/**
53+
* Collects the current disk usage ratio in the range {@code [0.0, 1.0]}, defined as the maximum
54+
* usage across all distinct {@link FileStore}s. Returns {@code 0.0} when no data directory is
55+
* configured or every reachable file store reports a non-positive total space.
56+
*/
57+
public double collect() throws IOException {
58+
double maxRatio = 0.0;
59+
Set<FileStore> counted = new HashSet<>();
60+
for (File dir : dataDirs) {
61+
FileStore fs = Files.getFileStore(dir.toPath());
62+
if (counted.add(fs)) {
63+
long total = fs.getTotalSpace();
64+
if (total <= 0L) {
65+
continue;
66+
}
67+
double ratio = (double) (total - fs.getUsableSpace()) / total;
68+
if (ratio > maxRatio) {
69+
maxRatio = ratio;
70+
}
71+
}
72+
}
73+
return maxRatio;
74+
}
75+
}

0 commit comments

Comments
 (0)