Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,28 @@ public class ConfigOptions {
+ "The default value is 10.")
.withDeprecatedKeys("coordinator.io-pool.size");

public static final ConfigOption<Double> SERVER_DATA_DISK_WRITE_LIMIT_RATIO =
key("server.data-disk.write-limit-ratio")
.doubleType()
.defaultValue(0.85)
.withDescription(
"Reject writes when the tablet server data disk usage exceeds this ratio. "
+ "Writes resume after the usage drops below (ratio - 0.10). "
+ "Set to 1.0 to disable the disk-usage protection entirely. "
+ "The valid range is (0.0, 1.0].");

public static final ConfigOption<Duration> SERVER_DATA_DISK_CHECK_INTERVAL =
key("server.data-disk.check-interval")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The interval at which the tablet server samples the local data disk "
+ "usage for the write-protection state machine. A shorter interval "
+ "narrows the time window during which writes can still flow in "
+ "after the disk crosses the limit ratio, at the cost of slightly "
+ "more frequent statvfs calls (which are in-memory and cheap). "
+ "The default 30s is suitable for typical write workloads.");

// ------------------------------------------------------------------------
// ConfigOptions for Coordinator Server
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.exception;

import org.apache.fluss.annotation.PublicEvolving;

/**
* Thrown by a tablet server to reject writes when its local data disk usage has reached the
* configured write-limit ratio. The exception is retriable so that clients can retry once the
* server frees up enough disk space and resumes accepting writes.
*/
@PublicEvolving
public class DiskWriteLockedException extends RetriableException {

private static final long serialVersionUID = 1L;

public DiskWriteLockedException(String message) {
super(message);
}

public DiskWriteLockedException(int serverId, double usageRatio, double limit) {
super(
String.format(
"TabletServer %d has rejected writes because the data disk usage "
+ "reached %.2f%% (limit: %.2f%%). Free up space or scale the cluster.",
serverId, usageRatio * 100, limit * 100));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ public class MetricNames {
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";

// for tablet server data disk write protection
public static final String DISK_USAGE_RATIO = "diskUsageRatio";
public static final String DISK_WRITE_LOCKED = "diskWriteLocked";

// --------------------------------------------------------------------------------------------
// metrics for user
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
import org.apache.fluss.exception.DeletionDisabledException;
import org.apache.fluss.exception.DiskWriteLockedException;
import org.apache.fluss.exception.DuplicateSequenceException;
import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.FencedTieringEpochException;
Expand Down Expand Up @@ -265,7 +266,11 @@ public enum Errors {
TOO_MANY_SCANNERS(
69,
"The per-bucket or per-server scanner session limit has been reached.",
TooManyScannersException::new);
TooManyScannersException::new),
DISK_WRITE_LOCKED(
70,
"The tablet server has rejected writes because its data disk usage reached the configured write-limit ratio.",
DiskWriteLockedException::new);

private static final Logger LOG = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL;
import static org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER;
import static org.apache.fluss.config.ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO;
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;

Expand All @@ -64,7 +65,8 @@ class DynamicServerConfig {
DATALAKE_FORMAT.key(),
LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(),
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
KV_SNAPSHOT_INTERVAL.key()));
KV_SNAPSHOT_INTERVAL.key(),
SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key()));
private static final Set<String> ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake.");

private final ReadWriteLock lock = new ReentrantReadWriteLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.fluss.server.replica.delay.DelayedWrite;
import org.apache.fluss.server.replica.fetcher.InitialFetchStatus;
import org.apache.fluss.server.replica.fetcher.ReplicaFetcherManager;
import org.apache.fluss.server.storage.DiskUsageMonitor;
import org.apache.fluss.server.storage.LocalDiskManager;
import org.apache.fluss.server.utils.FatalErrorHandler;
import org.apache.fluss.server.zk.ZooKeeperClient;
Expand Down Expand Up @@ -332,6 +333,7 @@ public ReplicaManager(
this.ioExecutor = ioExecutor;
this.minInSyncReplicas = conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER);
this.scannerManager = checkNotNull(scannerManager, "scannerManager");

registerMetrics();
}

Expand All @@ -344,6 +346,9 @@ public void startup() {
this::maybeShrinkIsr,
0L,
conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis() / 2);

// Start periodic disk usage monitoring (initial + periodic sampling)
localDiskManager.startDiskUsageMonitor(scheduler);
}

public RemoteLogManager getRemoteLogManager() {
Expand Down Expand Up @@ -423,6 +428,11 @@ private void registerMetrics() {
physicalStorage.gauge(
MetricNames.SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE,
this::physicalStorageRemoteLogSize);

serverMetricGroup.gauge(
MetricNames.DISK_USAGE_RATIO, localDiskManager::getLastDiskUsageRatio);
serverMetricGroup.gauge(
MetricNames.DISK_WRITE_LOCKED, () -> localDiskManager.isDiskWriteLocked() ? 1 : 0);
}

@VisibleForTesting
Expand Down Expand Up @@ -592,6 +602,16 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
}
}

@VisibleForTesting
public boolean isDiskWriteLocked() {
return localDiskManager.isDiskWriteLocked();
}

@VisibleForTesting
public DiskUsageMonitor getDiskUsageMonitor() {
return localDiskManager.getDiskUsageMonitor();
}

/**
* Append log records to leader replicas of the buckets, and wait for them to be replicated to
* other replicas.
Expand All @@ -609,6 +629,7 @@ public void appendRecordsToLog(
if (isRequiredAcksInvalid(requiredAcks)) {
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
}
localDiskManager.ensureWritable();

long startTime = System.currentTimeMillis();
Map<TableBucket, ProduceLogResultForBucket> appendResult =
Expand Down Expand Up @@ -662,6 +683,7 @@ public void putRecordsToKv(
if (isRequiredAcksInvalid(requiredAcks)) {
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
}
localDiskManager.ensureWritable();

long startTime = System.currentTimeMillis();
Map<TableBucket, PutKvResultForBucket> kvPutResult =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.storage;

import org.apache.fluss.annotation.Internal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

/**
* Collects the local data disk usage ratio for the tablet server. The reported ratio is the
* <b>maximum</b> usage across all distinct {@link FileStore}s backing the configured data
* directories. A per-disk maximum (rather than a weighted average over total/used bytes) is used so
* that a single nearly-full disk cannot be masked by other low-usage disks in a multi-disk
* deployment: any single disk crossing the limit ratio must trip the write protection, because
* partitions pinned to that disk would otherwise fail to write. Multiple data directories sharing
* the same physical {@link FileStore} are still counted only once.
*/
@Internal
public final class DiskUsageCollector {

private static final Logger LOG = LoggerFactory.getLogger(DiskUsageCollector.class);

private final List<File> dataDirs;

public DiskUsageCollector(List<File> dataDirs) {
checkNotNull(dataDirs, "dataDirs");
this.dataDirs = Collections.unmodifiableList(dataDirs);
}

/**
* Collects the current disk usage ratio in the range {@code [0.0, 1.0]}, defined as the maximum
* usage across all distinct {@link FileStore}s. Returns {@code 0.0} when no data directory is
* configured or every reachable file store reports a non-positive total space.
*
* <p>Individual directories that fail (e.g. deleted at runtime) are skipped with a warning so
* that one unhealthy directory does not prevent monitoring of the remaining disks. An {@link
* IOException} is thrown only when <b>all</b> directories fail.
*/
public double collect() throws IOException {
double maxRatio = 0.0;
Set<FileStore> counted = new HashSet<>();
int failures = 0;
for (File dir : dataDirs) {
FileStore fs;
try {
fs = Files.getFileStore(dir.toPath());
} catch (IOException e) {
LOG.warn("Failed to get FileStore for data directory {}; skipping.", dir, e);
failures++;
continue;
}
if (counted.add(fs)) {
long total = fs.getTotalSpace();
if (total <= 0L) {
continue;
}
double ratio = (double) (total - fs.getUsableSpace()) / total;
if (ratio > maxRatio) {
maxRatio = ratio;
}
}
}
if (failures > 0 && failures == dataDirs.size()) {
throw new IOException("All " + failures + " data directories failed FileStore lookup.");
}
return maxRatio;
}
}
Loading