diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 1818e16ef3..c11c15e591 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -335,6 +335,28 @@ public class ConfigOptions { + "The default value is 10.") .withDeprecatedKeys("coordinator.io-pool.size"); + public static final ConfigOption SERVER_DATA_DISK_WRITE_LIMIT_RATIO = + key("server.data-disk.write-limit-ratio") + .doubleType() + .defaultValue(0.85) + .withDescription( + "Reject writes when the tablet server data disk usage exceeds this ratio. " + + "Writes resume after the usage drops below (ratio - 0.10). " + + "Set to 1.0 to disable the disk-usage protection entirely. " + + "The valid range is (0.0, 1.0]."); + + public static final ConfigOption SERVER_DATA_DISK_CHECK_INTERVAL = + key("server.data-disk.check-interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "The interval at which the tablet server samples the local data disk " + + "usage for the write-protection state machine. A shorter interval " + + "narrows the time window during which writes can still flow in " + + "after the disk crosses the limit ratio, at the cost of slightly " + + "more frequent statvfs calls (which are in-memory and cheap). " + + "The default 30s is suitable for typical write workloads."); + // ------------------------------------------------------------------------ // ConfigOptions for Coordinator Server // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/DiskWriteLockedException.java b/fluss-common/src/main/java/org/apache/fluss/exception/DiskWriteLockedException.java new file mode 100644 index 0000000000..cce9c3a74a --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/DiskWriteLockedException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown by a tablet server to reject writes when its local data disk usage has reached the + * configured write-limit ratio. The exception is retriable so that clients can retry once the + * server frees up enough disk space and resumes accepting writes. + */ +@PublicEvolving +public class DiskWriteLockedException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public DiskWriteLockedException(String message) { + super(message); + } + + public DiskWriteLockedException(int serverId, double usageRatio, double limit) { + super( + String.format( + "TabletServer %d has rejected writes because the data disk usage " + + "reached %.2f%% (limit: %.2f%%). Free up space or scale the cluster.", + serverId, usageRatio * 100, limit * 100)); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 8f48ccfd33..07ebf57b6d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -94,6 +94,10 @@ public class MetricNames { public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize"; public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize"; + // for tablet server data disk write protection + public static final String DISK_USAGE_RATIO = "diskUsageRatio"; + public static final String DISK_WRITE_LOCKED = "diskWriteLocked"; + // -------------------------------------------------------------------------------------------- // metrics for user // -------------------------------------------------------------------------------------------- diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index c72012a0ae..0307932ecd 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.DatabaseNotEmptyException; import org.apache.fluss.exception.DatabaseNotExistException; import org.apache.fluss.exception.DeletionDisabledException; +import org.apache.fluss.exception.DiskWriteLockedException; import org.apache.fluss.exception.DuplicateSequenceException; import org.apache.fluss.exception.FencedLeaderEpochException; import org.apache.fluss.exception.FencedTieringEpochException; @@ -265,7 +266,11 @@ public enum Errors { TOO_MANY_SCANNERS( 69, "The per-bucket or per-server scanner session limit has been reached.", - TooManyScannersException::new); + TooManyScannersException::new), + DISK_WRITE_LOCKED( + 70, + "The tablet server has rejected writes because its data disk usage reached the configured write-limit ratio.", + DiskWriteLockedException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java index dcec31b463..e130c392d5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -45,6 +45,7 @@ import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC; import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL; import static org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER; +import static org.apache.fluss.config.ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO; import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; @@ -64,7 +65,8 @@ class DynamicServerConfig { DATALAKE_FORMAT.key(), LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(), KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), - KV_SNAPSHOT_INTERVAL.key())); + KV_SNAPSHOT_INTERVAL.key(), + SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key())); private static final Set ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake."); private final ReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 71fd233b68..0aaea033a4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -106,6 +106,7 @@ import org.apache.fluss.server.replica.delay.DelayedWrite; import org.apache.fluss.server.replica.fetcher.InitialFetchStatus; import org.apache.fluss.server.replica.fetcher.ReplicaFetcherManager; +import org.apache.fluss.server.storage.DiskUsageMonitor; import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.utils.FatalErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; @@ -332,6 +333,7 @@ public ReplicaManager( this.ioExecutor = ioExecutor; this.minInSyncReplicas = conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER); this.scannerManager = checkNotNull(scannerManager, "scannerManager"); + registerMetrics(); } @@ -344,6 +346,9 @@ public void startup() { this::maybeShrinkIsr, 0L, conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis() / 2); + + // Start periodic disk usage monitoring (initial + periodic sampling) + localDiskManager.startDiskUsageMonitor(scheduler); } public RemoteLogManager getRemoteLogManager() { @@ -423,6 +428,11 @@ private void registerMetrics() { physicalStorage.gauge( MetricNames.SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE, this::physicalStorageRemoteLogSize); + + serverMetricGroup.gauge( + MetricNames.DISK_USAGE_RATIO, localDiskManager::getLastDiskUsageRatio); + serverMetricGroup.gauge( + MetricNames.DISK_WRITE_LOCKED, () -> localDiskManager.isDiskWriteLocked() ? 1 : 0); } @VisibleForTesting @@ -592,6 +602,16 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) { } } + @VisibleForTesting + public boolean isDiskWriteLocked() { + return localDiskManager.isDiskWriteLocked(); + } + + @VisibleForTesting + public DiskUsageMonitor getDiskUsageMonitor() { + return localDiskManager.getDiskUsageMonitor(); + } + /** * Append log records to leader replicas of the buckets, and wait for them to be replicated to * other replicas. @@ -609,6 +629,7 @@ public void appendRecordsToLog( if (isRequiredAcksInvalid(requiredAcks)) { throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks); } + localDiskManager.ensureWritable(); long startTime = System.currentTimeMillis(); Map appendResult = @@ -662,6 +683,7 @@ public void putRecordsToKv( if (isRequiredAcksInvalid(requiredAcks)) { throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks); } + localDiskManager.ensureWritable(); long startTime = System.currentTimeMillis(); Map kvPutResult = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/storage/DiskUsageCollector.java b/fluss-server/src/main/java/org/apache/fluss/server/storage/DiskUsageCollector.java new file mode 100644 index 0000000000..9bc7523acd --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/storage/DiskUsageCollector.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.storage; + +import org.apache.fluss.annotation.Internal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Collects the local data disk usage ratio for the tablet server. The reported ratio is the + * maximum usage across all distinct {@link FileStore}s backing the configured data + * directories. A per-disk maximum (rather than a weighted average over total/used bytes) is used so + * that a single nearly-full disk cannot be masked by other low-usage disks in a multi-disk + * deployment: any single disk crossing the limit ratio must trip the write protection, because + * partitions pinned to that disk would otherwise fail to write. Multiple data directories sharing + * the same physical {@link FileStore} are still counted only once. + */ +@Internal +public final class DiskUsageCollector { + + private static final Logger LOG = LoggerFactory.getLogger(DiskUsageCollector.class); + + private final List dataDirs; + + public DiskUsageCollector(List dataDirs) { + checkNotNull(dataDirs, "dataDirs"); + this.dataDirs = Collections.unmodifiableList(dataDirs); + } + + /** + * Collects the current disk usage ratio in the range {@code [0.0, 1.0]}, defined as the maximum + * usage across all distinct {@link FileStore}s. Returns {@code 0.0} when no data directory is + * configured or every reachable file store reports a non-positive total space. + * + *

Individual directories that fail (e.g. deleted at runtime) are skipped with a warning so + * that one unhealthy directory does not prevent monitoring of the remaining disks. An {@link + * IOException} is thrown only when all directories fail. + */ + public double collect() throws IOException { + double maxRatio = 0.0; + Set counted = new HashSet<>(); + int failures = 0; + for (File dir : dataDirs) { + FileStore fs; + try { + fs = Files.getFileStore(dir.toPath()); + } catch (IOException e) { + LOG.warn("Failed to get FileStore for data directory {}; skipping.", dir, e); + failures++; + continue; + } + if (counted.add(fs)) { + long total = fs.getTotalSpace(); + if (total <= 0L) { + continue; + } + double ratio = (double) (total - fs.getUsableSpace()) / total; + if (ratio > maxRatio) { + maxRatio = ratio; + } + } + } + if (failures > 0 && failures == dataDirs.size()) { + throw new IOException("All " + failures + " data directories failed FileStore lookup."); + } + return maxRatio; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/storage/DiskUsageMonitor.java b/fluss-server/src/main/java/org/apache/fluss/server/storage/DiskUsageMonitor.java new file mode 100644 index 0000000000..8a27c1ef32 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/storage/DiskUsageMonitor.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.storage; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.annotation.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Periodically samples the local data disk usage ratio and toggles the tablet server write-lock + * state with a fixed 10% hysteresis: writes are locked when the usage reaches the configured + * write-limit ratio and resume only after the usage drops below {@code (limit - 0.10)}. The monitor + * is single-state and intended to be driven by a scheduler thread; it never blocks. + */ +@Internal +public final class DiskUsageMonitor { + + /** Fixed hysteresis between the lock and unlock thresholds. */ + public static final double RECOVER_GAP = 0.10; + + private static final Logger LOG = LoggerFactory.getLogger(DiskUsageMonitor.class); + + private final int serverId; + private final DiskUsageCollector collector; + private volatile double writeLimitRatio; + private volatile double recoverThreshold; + private final Listener listener; + + private volatile boolean locked; + private volatile double lastUsageRatio; + + public DiskUsageMonitor( + int serverId, DiskUsageCollector collector, double writeLimitRatio, Listener listener) { + checkArgument( + writeLimitRatio > 0.0 && writeLimitRatio <= 1.0, + "%s must be within (0.0, 1.0], but was %s", + "server.data-disk.write-limit-ratio", + writeLimitRatio); + this.serverId = serverId; + this.collector = checkNotNull(collector, "collector"); + this.writeLimitRatio = writeLimitRatio; + this.recoverThreshold = Math.max(0.0, writeLimitRatio - RECOVER_GAP); + this.listener = checkNotNull(listener, "listener"); + } + + /** Samples the disk usage once and updates the lock state. Never throws. */ + public void runOnce() { + double usage; + try { + usage = collector.collect(); + } catch (IOException e) { + LOG.warn( + "Failed to collect disk usage for tablet server {}; " + + "keep the previous write-lock state {} (last usage {}%).", + serverId, locked, String.format("%.2f", lastUsageRatio * 100), e); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "[DISK-MONITOR-DEBUG] TabletServer {} disk usage: {}% | limit: {}% | " + + "recover: {}% | locked: {}", + serverId, + String.format("%.4f", usage * 100), + String.format("%.2f", writeLimitRatio * 100), + String.format("%.2f", recoverThreshold * 100), + locked); + } + update(usage); + } + + @VisibleForTesting + public void update(double usage) { + lastUsageRatio = usage; + // writeLimitRatio == 1.0 means the protection is disabled; skip state transitions + // but still keep sampling for metrics. + if (Double.compare(writeLimitRatio, 1.0) >= 0) { + if (locked) { + locked = false; + } + listener.onSample(lastUsageRatio, false); + return; + } + boolean wasLocked = locked; + if (!wasLocked && usage >= writeLimitRatio) { + locked = true; + LOG.warn( + "TabletServer {} disk usage reached {}% (limit {}%); rejecting writes " + + "until usage drops below {}%.", + serverId, + String.format("%.2f", usage * 100), + String.format("%.2f", writeLimitRatio * 100), + String.format("%.2f", recoverThreshold * 100)); + } else if (wasLocked && usage <= recoverThreshold) { + locked = false; + LOG.info( + "TabletServer {} disk usage dropped to {}% (recover threshold {}%); " + + "resuming writes.", + serverId, + String.format("%.2f", usage * 100), + String.format("%.2f", recoverThreshold * 100)); + } + listener.onSample(lastUsageRatio, locked); + } + + public boolean isLocked() { + return locked; + } + + public double getLastUsageRatio() { + return lastUsageRatio; + } + + public double getWriteLimitRatio() { + return writeLimitRatio; + } + + public double getRecoverThreshold() { + return recoverThreshold; + } + + /** + * Dynamically updates the write-limit ratio and the derived recover threshold. The new ratio + * takes effect on the next {@link #runOnce()} invocation or {@link #update(double)} call. + * + * @param newRatio the new write-limit ratio, must be within (0.0, 1.0] + */ + public void updateWriteLimitRatio(double newRatio) { + checkArgument( + newRatio > 0.0 && newRatio <= 1.0, + "server.data-disk.write-limit-ratio must be within (0.0, 1.0], but was %s", + newRatio); + this.writeLimitRatio = newRatio; + this.recoverThreshold = Math.max(0.0, newRatio - RECOVER_GAP); + } + + /** Receives every sample for downstream state synchronization (e.g. metrics gauges). */ + @FunctionalInterface + public interface Listener { + void onSample(double usageRatio, boolean locked); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java b/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java index f7a8d6c444..466af0a6cf 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java @@ -20,10 +20,14 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.cluster.ServerReconfigurable; +import org.apache.fluss.exception.ConfigException; +import org.apache.fluss.exception.DiskWriteLockedException; import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.IOUtils; +import org.apache.fluss.utils.concurrent.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,8 +71,12 @@ * After that, the mutable state is limited to in-memory bucket counters and shutdown cleanup. The * public runtime APIs synchronize on the manager instance so directory lookups, counter updates, * and {@link #close()} observe a consistent view. + * + *

The disk usage write-protection state ({@link #isDiskWriteLocked()} / {@link + * #getLastDiskUsageRatio()}) is updated by the scheduler thread started via {@link + * #startDiskUsageMonitor(Scheduler)} and read by request handlers via {@code volatile} fields. */ -public final class LocalDiskManager implements Closeable { +public final class LocalDiskManager implements Closeable, ServerReconfigurable { private static final Logger LOG = LoggerFactory.getLogger(LocalDiskManager.class); @@ -84,6 +92,32 @@ public final class LocalDiskManager implements Closeable { private final Map diskInfosByDir = new LinkedHashMap<>(); private final Map directoryLocks = new LinkedHashMap<>(); + // ------------------------------------------------------------------------ + // Disk usage write protection + // ------------------------------------------------------------------------ + + private final long diskCheckIntervalMs; + private final DiskUsageMonitor diskUsageMonitor; + + /** + * Configured high-water-mark ratio: writes are rejected once any single backing {@link + * java.nio.file.FileStore}'s usage ratio reaches this value. Writes resume after the usage + * drops below {@code diskWriteLimitRatio - 0.10} (the recover gap is owned by {@link + * DiskUsageMonitor}). This field is volatile because it can be changed at runtime via dynamic + * reconfiguration. + */ + private volatile double diskWriteLimitRatio; + + /** + * Whether the local tablet server is currently rejecting writes because the data disk usage has + * reached {@link #diskWriteLimitRatio}. Updated by the {@link DiskUsageMonitor} scheduler + * thread, read by append/put handlers. + */ + private volatile boolean diskWriteLocked; + + /** Last observed disk usage ratio in range {@code [0.0, 1.0]}. */ + private volatile double lastDiskUsageRatio; + // ------------------------------------------------------------------------ // Initialization // ------------------------------------------------------------------------ @@ -99,6 +133,33 @@ private LocalDiskManager(Configuration conf) throws IOException { throw throwable; } this.dataDirs = Collections.unmodifiableList(initializedDataDirs); + + this.diskWriteLimitRatio = conf.get(ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO); + if (diskWriteLimitRatio <= 0.0 || diskWriteLimitRatio > 1.0) { + throw new IllegalConfigurationException( + String.format( + "%s must be within (0.0, 1.0], but was %s", + ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key(), + diskWriteLimitRatio)); + } + this.diskCheckIntervalMs = + conf.get(ConfigOptions.SERVER_DATA_DISK_CHECK_INTERVAL).toMillis(); + if (diskCheckIntervalMs <= 0L) { + throw new IllegalConfigurationException( + String.format( + "%s must be a positive duration, but was %s ms", + ConfigOptions.SERVER_DATA_DISK_CHECK_INTERVAL.key(), + diskCheckIntervalMs)); + } + this.diskUsageMonitor = + new DiskUsageMonitor( + serverId, + new DiskUsageCollector(this.dataDirs), + diskWriteLimitRatio, + (usage, locked) -> { + this.lastDiskUsageRatio = usage; + this.diskWriteLocked = locked; + }); } public static LocalDiskManager create(Configuration conf) throws IOException { @@ -402,6 +463,101 @@ public List dataDirs() { return dataDirs; } + // ------------------------------------------------------------------------ + // Disk usage write protection + // ------------------------------------------------------------------------ + + /** + * Schedules the periodic disk usage sampling on the given {@link Scheduler}. An immediate + * synchronous sample is performed first so that the write-lock state is meaningful before the + * first scheduled tick. + */ + public void startDiskUsageMonitor(Scheduler scheduler) { + // initial sample to avoid an open window before the first scheduled run + diskUsageMonitor.runOnce(); + scheduler.schedule( + "disk-usage-monitor", + diskUsageMonitor::runOnce, + diskCheckIntervalMs, + diskCheckIntervalMs); + } + + /** + * @return whether write requests are currently being rejected due to disk pressure. + */ + public boolean isDiskWriteLocked() { + return diskWriteLocked; + } + + /** + * @return the most recent observed disk usage ratio in {@code [0.0, 1.0]}. + */ + public double getLastDiskUsageRatio() { + return lastDiskUsageRatio; + } + + /** + * @return the configured high-water-mark ratio that triggers the write lock. + */ + public double getDiskWriteLimitRatio() { + return diskWriteLimitRatio; + } + + /** + * Throws {@link DiskWriteLockedException} when the local data disk usage has crossed the + * configured write-limit ratio. Only client-driven writes ({@code appendLog} / {@code putKv}) + * should call this; follower replication paths must bypass this check to preserve replica + * consistency. + */ + public void ensureWritable() { + if (diskWriteLocked) { + throw new DiskWriteLockedException(serverId, lastDiskUsageRatio, diskWriteLimitRatio); + } + } + + @VisibleForTesting + public DiskUsageMonitor getDiskUsageMonitor() { + return diskUsageMonitor; + } + + // ------------------------------------------------------------------------ + // ServerReconfigurable: dynamic write-limit-ratio update + // ------------------------------------------------------------------------ + + @Override + public void validate(Configuration newConfig) throws ConfigException { + double newRatio = newConfig.get(ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO); + if (newRatio <= 0.0 || newRatio > 1.0) { + throw new ConfigException( + String.format( + "Invalid %s: must be within (0.0, 1.0], but was %s", + ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key(), newRatio)); + } + } + + @Override + public void reconfigure(Configuration newConfig) { + double newRatio = newConfig.get(ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO); + if (Double.compare(newRatio, diskWriteLimitRatio) == 0) { + LOG.debug( + "{} unchanged: {}", + ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key(), + newRatio); + return; + } + double oldRatio = diskWriteLimitRatio; + diskWriteLimitRatio = newRatio; + diskUsageMonitor.updateWriteLimitRatio(newRatio); + // Trigger an immediate check so the new threshold takes effect without waiting + // for the next scheduled tick. + diskUsageMonitor.runOnce(); + LOG.info( + "{} reconfigured: {} -> {} (immediate check triggered)", + ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key(), + oldRatio, + newRatio); + } + /** * Resolves a local path back to the configured data directory that owns it. * diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 4ec7df46ee..74afa83378 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -299,6 +299,8 @@ protected void startServices() throws Exception { dynamicConfigManager.register(replicaManager.getKvSnapshotContext()); // Register replicaManager to dynamicConfigManager for dynamic config dynamicConfigManager.register(replicaManager); + // Register localDiskManager for dynamic server.data-disk.write-limit-ratio + dynamicConfigManager.register(localDiskManager); // Start dynamicConfigManager after all reconfigurable components are registered dynamicConfigManager.startup(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index 3ef221ecab..c9378bad28 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -24,6 +24,7 @@ import org.apache.fluss.config.cluster.ServerReconfigurable; import org.apache.fluss.exception.ConfigException; import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -36,7 +37,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -53,6 +56,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.within; /** Test for {@link DynamicConfigManager}. */ public class DynamicConfigChangeTest { @@ -552,4 +556,90 @@ void testExplicitDataLakeEnabledRequiresDataLakeFormat() throws Exception { "'datalake.format' must be configured when 'datalake.enabled' is explicitly set to true."); } } + + @Test + void testDynamicDiskWriteLimitRatioChange(@TempDir File tempDir) throws Exception { + File dataDir = new File(tempDir, "data-0"); + assertThat(dataDir.mkdirs()).isTrue(); + + Configuration configuration = new Configuration(); + configuration.setInt(ConfigOptions.TABLET_SERVER_ID, 0); + configuration.setString(ConfigOptions.DATA_DIR, dataDir.getAbsolutePath()); + configuration.set(ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO, 0.85); + + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + + // Create LocalDiskManager and register it + try (LocalDiskManager localDiskManager = LocalDiskManager.create(configuration)) { + dynamicConfigManager.register(localDiskManager); + dynamicConfigManager.startup(); + + // Verify initial state + assertThat(localDiskManager.getDiskWriteLimitRatio()).isEqualTo(0.85); + assertThat(localDiskManager.getDiskUsageMonitor().getWriteLimitRatio()).isEqualTo(0.85); + assertThat(localDiskManager.getDiskUsageMonitor().getRecoverThreshold()) + .isEqualTo(0.75); + + // Lower the limit to 0.70 via dynamic config + assertThatCode( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions + .SERVER_DATA_DISK_WRITE_LIMIT_RATIO + .key(), + "0.70", + AlterConfigOpType.SET)))) + .doesNotThrowAnyException(); + + // Verify the new ratio took effect immediately (reconfigure triggers runOnce) + assertThat(localDiskManager.getDiskWriteLimitRatio()).isEqualTo(0.70); + assertThat(localDiskManager.getDiskUsageMonitor().getWriteLimitRatio()).isEqualTo(0.70); + assertThat(localDiskManager.getDiskUsageMonitor().getRecoverThreshold()) + .isCloseTo(0.60, within(1e-9)); + + // Verify config was persisted to ZK + Map zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key())) + .isEqualTo("0.70"); + } + } + + @Test + void testPreventInvalidDiskWriteLimitRatio(@TempDir File tempDir) throws Exception { + File dataDir = new File(tempDir, "data-0"); + assertThat(dataDir.mkdirs()).isTrue(); + + Configuration configuration = new Configuration(); + configuration.setInt(ConfigOptions.TABLET_SERVER_ID, 0); + configuration.setString(ConfigOptions.DATA_DIR, dataDir.getAbsolutePath()); + configuration.set(ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO, 0.85); + + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(configuration)) { + dynamicConfigManager.register(localDiskManager); + dynamicConfigManager.startup(); + + // Try to set to invalid value 0.0 - should be rejected + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions + .SERVER_DATA_DISK_WRITE_LIMIT_RATIO + .key(), + "0.0", + AlterConfigOpType.SET)))) + .isInstanceOf(ConfigException.class) + .hasMessageContaining("must be within (0.0, 1.0]"); + + // Verify the ratio was NOT changed + assertThat(localDiskManager.getDiskWriteLimitRatio()).isEqualTo(0.85); + } + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index 14e24c18cf..eea0d5eb12 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -21,6 +21,7 @@ import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.DiskWriteLockedException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidRequiredAcksException; import org.apache.fluss.exception.NotLeaderOrFollowerException; @@ -491,6 +492,66 @@ void testFetchLogWithMaxBytesLimitForMultiTableBucket() throws Exception { } } + @Test + void testAppendRejectedWhenDiskLocked() throws Exception { + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1); + makeLogTableAsLeader(tb.getBucket()); + + // simulate disk usage breaching the write-limit ratio (default 0.85) + replicaManager.getDiskUsageMonitor().update(0.95); + assertThat(replicaManager.isDiskWriteLocked()).isTrue(); + + assertThatThrownBy( + () -> + replicaManager.appendRecordsToLog( + 20000, + 1, + Collections.singletonMap( + tb, genMemoryLogRecordsByObject(DATA1)), + null, + (result) -> {})) + .isInstanceOf(DiskWriteLockedException.class) + .hasMessageContaining("data disk usage"); + + // recover when usage drops below (limit - 0.10) -> 0.75 + replicaManager.getDiskUsageMonitor().update(0.50); + assertThat(replicaManager.isDiskWriteLocked()).isFalse(); + + CompletableFuture> future = new CompletableFuture<>(); + replicaManager.appendRecordsToLog( + 20000, + 1, + Collections.singletonMap(tb, genMemoryLogRecordsByObject(DATA1)), + null, + future::complete); + assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 0, 10L)); + } + + @Test + void testPutKvRejectedWhenDiskLocked() { + TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1); + makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket()); + + replicaManager.getDiskUsageMonitor().update(0.99); + assertThat(replicaManager.isDiskWriteLocked()).isTrue(); + + assertThatThrownBy( + () -> + replicaManager.putRecordsToKv( + 20000, + 1, + Collections.singletonMap( + tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)), + null, + MergeMode.DEFAULT, + PUT_KV_VERSION, + (result) -> {})) + .isInstanceOf(DiskWriteLockedException.class); + + // unlock for any subsequent tests on the shared replicaManager instance + replicaManager.getDiskUsageMonitor().update(0.10); + } + @Test void testPutKv() throws Exception { TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/storage/DiskUsageCollectorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/storage/DiskUsageCollectorTest.java new file mode 100644 index 0000000000..86b09c6fa1 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/storage/DiskUsageCollectorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.storage; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link DiskUsageCollector}. */ +class DiskUsageCollectorTest { + + @TempDir private File tempDir; + + @Test + void testCollectReturnsRatioInRange() throws Exception { + File dataDir = new File(tempDir, "data-0"); + assertThat(dataDir.mkdirs()).isTrue(); + + DiskUsageCollector collector = new DiskUsageCollector(Collections.singletonList(dataDir)); + + double ratio = collector.collect(); + assertThat(ratio).isBetween(0.0, 1.0); + } + + @Test + void testMultipleDirsOnSameFileStoreDeduplicated() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + assertThat(dataDir1.mkdirs()).isTrue(); + assertThat(dataDir2.mkdirs()).isTrue(); + + DiskUsageCollector twoDirs = new DiskUsageCollector(Arrays.asList(dataDir1, dataDir2)); + DiskUsageCollector oneDir = new DiskUsageCollector(Collections.singletonList(dataDir1)); + + // both directories share the same FileStore -> result should match a single-dir collector + assertThat(twoDirs.collect()).isEqualTo(oneDir.collect()); + } + + @Test + void testEmptyDataDirsReturnsZero() throws Exception { + DiskUsageCollector collector = new DiskUsageCollector(Collections.emptyList()); + assertThat(collector.collect()).isEqualTo(0.0); + } + + @Test + void testPartialFailureSkipsBadDirAndReportsRemaining() throws Exception { + File goodDir = new File(tempDir, "good"); + assertThat(goodDir.mkdirs()).isTrue(); + File badDir = new File(tempDir, "does-not-exist/nested"); + + DiskUsageCollector collector = new DiskUsageCollector(Arrays.asList(badDir, goodDir)); + double ratio = collector.collect(); + assertThat(ratio).isBetween(0.0, 1.0); + } + + @Test + void testAllDirsFailThrowsIOException() { + File bad1 = new File("/__fluss_non_existent_1__/x"); + File bad2 = new File("/__fluss_non_existent_2__/y"); + + DiskUsageCollector collector = new DiskUsageCollector(Arrays.asList(bad1, bad2)); + assertThatThrownBy(collector::collect) + .isInstanceOf(IOException.class) + .hasMessageContaining("All 2 data directories failed"); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/storage/DiskUsageMonitorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/storage/DiskUsageMonitorTest.java new file mode 100644 index 0000000000..dc1291d08c --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/storage/DiskUsageMonitorTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.storage; + +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.within; + +/** Test for {@link DiskUsageMonitor}. */ +class DiskUsageMonitorTest { + + private static final int SERVER_ID = 7; + + @Test + void testInvalidLimitRatioRejected() { + DiskUsageCollector collector = new DiskUsageCollector(Collections.emptyList()); + assertThatThrownBy( + () -> + new DiskUsageMonitor( + SERVER_ID, collector, 0.0, (usage, locked) -> {})) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + new DiskUsageMonitor( + SERVER_ID, collector, 1.5, (usage, locked) -> {})) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testStaysUnlockedBelowLimit() { + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = newMonitor(0.85, recorder); + + monitor.update(0.50); + assertThat(monitor.isLocked()).isFalse(); + assertThat(monitor.getLastUsageRatio()).isEqualTo(0.50); + assertThat(recorder.lastLocked.get()).isFalse(); + assertThat(recorder.lastUsage.get()).isEqualTo(0.50); + } + + @Test + void testLockedWhenReachingLimit() { + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = newMonitor(0.85, recorder); + + monitor.update(0.85); + assertThat(monitor.isLocked()).isTrue(); + assertThat(recorder.lastLocked.get()).isTrue(); + + monitor.update(0.90); + assertThat(monitor.isLocked()).isTrue(); + } + + @Test + void testStaysLockedAboveRecoverThreshold() { + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = newMonitor(0.85, recorder); + + monitor.update(0.86); + assertThat(monitor.isLocked()).isTrue(); + + // recover threshold is 0.75, 0.80 still above it -> stay locked + monitor.update(0.80); + assertThat(monitor.isLocked()).isTrue(); + } + + @Test + void testUnlockedAtRecoverThreshold() { + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = newMonitor(0.85, recorder); + + monitor.update(0.90); + assertThat(monitor.isLocked()).isTrue(); + + monitor.update(0.75); + assertThat(monitor.isLocked()).isFalse(); + assertThat(recorder.lastLocked.get()).isFalse(); + } + + @Test + void testRecoverThresholdNeverNegative() { + DiskUsageMonitor monitor = newMonitor(0.05, new Recorder()); + assertThat(monitor.getRecoverThreshold()).isEqualTo(0.0); + assertThat(monitor.getWriteLimitRatio()).isEqualTo(0.05); + } + + @Test + void testRunOncePreservesStateWhenCollectorThrows() { + // Pointing the collector at a non-existent path makes Files.getFileStore raise + // NoSuchFileException -> IOException, which runOnce must swallow without flipping state. + DiskUsageCollector failing = + new DiskUsageCollector( + Collections.singletonList( + new File("/__fluss_disk_monitor_does_not_exist__/x"))); + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = new DiskUsageMonitor(SERVER_ID, failing, 0.85, recorder); + + // First put the monitor into the locked state via update(). + monitor.update(0.95); + assertThat(monitor.isLocked()).isTrue(); + double usageBefore = monitor.getLastUsageRatio(); + recorder.lastUsage.set(Double.NaN); + recorder.lastLocked.set(false); + + // runOnce should swallow the IOException, keep locked=true and not invoke the listener. + monitor.runOnce(); + assertThat(monitor.isLocked()).isTrue(); + assertThat(monitor.getLastUsageRatio()).isEqualTo(usageBefore); + assertThat(Double.isNaN(recorder.lastUsage.get())).isTrue(); + assertThat(recorder.lastLocked.get()).isFalse(); + } + + @Test + void testRunOnceUpdatesUsageWhenCollectorReturnsValue() { + // Reuse a real collector backed by an empty data dirs list -> always returns 0.0. + DiskUsageCollector collector = new DiskUsageCollector(Collections.emptyList()); + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = new DiskUsageMonitor(SERVER_ID, collector, 0.85, recorder); + + monitor.runOnce(); + assertThat(monitor.isLocked()).isFalse(); + assertThat(monitor.getLastUsageRatio()).isEqualTo(0.0); + assertThat(recorder.lastUsage.get()).isEqualTo(0.0); + assertThat(recorder.lastLocked.get()).isFalse(); + } + + private DiskUsageMonitor newMonitor(double limit, DiskUsageMonitor.Listener listener) { + return new DiskUsageMonitor( + SERVER_ID, new DiskUsageCollector(Collections.emptyList()), limit, listener); + } + + @Test + void testUpdateWriteLimitRatioChangesThresholds() { + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = newMonitor(0.85, recorder); + + // Initially ratio=0.85, recover=0.75 + assertThat(monitor.getWriteLimitRatio()).isEqualTo(0.85); + assertThat(monitor.getRecoverThreshold()).isEqualTo(0.75); + + // Simulate usage at 0.82 — should NOT lock (below 0.85) + monitor.update(0.82); + assertThat(monitor.isLocked()).isFalse(); + + // Lower the limit to 0.80 — now 0.82 exceeds the new limit + monitor.updateWriteLimitRatio(0.80); + assertThat(monitor.getWriteLimitRatio()).isEqualTo(0.80); + assertThat(monitor.getRecoverThreshold()).isCloseTo(0.70, within(1e-9)); + + // Re-evaluate with same usage — should lock + monitor.update(0.82); + assertThat(monitor.isLocked()).isTrue(); + assertThat(recorder.lastLocked.get()).isTrue(); + + // Raise the limit to 0.90 and recover threshold becomes 0.80 + // 0.82 > 0.80 so should remain locked + monitor.updateWriteLimitRatio(0.90); + monitor.update(0.82); + assertThat(monitor.isLocked()).isTrue(); + + // Drop usage to 0.79 — now below new recover threshold 0.80, should unlock + monitor.update(0.79); + assertThat(monitor.isLocked()).isFalse(); + } + + @Test + void testDisabledWhenRatioIsOne() { + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = newMonitor(1.0, recorder); + + // Even at 100% disk usage, should NOT lock when ratio = 1.0 + monitor.update(1.0); + assertThat(monitor.isLocked()).isFalse(); + assertThat(recorder.lastLocked.get()).isFalse(); + assertThat(recorder.lastUsage.get()).isEqualTo(1.0); + + // Any usage below 1.0 should also remain unlocked + monitor.update(0.99); + assertThat(monitor.isLocked()).isFalse(); + + // If previously locked (via dynamic update), switching to 1.0 should unlock immediately + DiskUsageMonitor monitor2 = newMonitor(0.80, recorder); + monitor2.update(0.85); // lock it + assertThat(monitor2.isLocked()).isTrue(); + + monitor2.updateWriteLimitRatio(1.0); + monitor2.update(0.85); // same usage, but ratio=1.0 -> should unlock + assertThat(monitor2.isLocked()).isFalse(); + assertThat(recorder.lastLocked.get()).isFalse(); + } + + @Test + void testUpdateWriteLimitRatioRejectsInvalidValues() { + Recorder recorder = new Recorder(); + DiskUsageMonitor monitor = newMonitor(0.85, recorder); + + assertThatThrownBy(() -> monitor.updateWriteLimitRatio(0.0)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> monitor.updateWriteLimitRatio(1.1)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> monitor.updateWriteLimitRatio(-0.5)) + .isInstanceOf(IllegalArgumentException.class); + } + + /** Captures the latest sample observed by the listener. */ + private static final class Recorder implements DiskUsageMonitor.Listener { + final AtomicReference lastUsage = new AtomicReference<>(Double.NaN); + final AtomicBoolean lastLocked = new AtomicBoolean(false); + + @Override + public void onSample(double usageRatio, boolean locked) { + lastUsage.set(usageRatio); + lastLocked.set(locked); + } + } +} diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 2f77fb5676..1691ba7de4 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -64,20 +64,22 @@ during the Fluss cluster working. ## TabletServer -| Option | Type | Default | Description | -|--------------------------------------------------|------------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| tablet-server.id | Integer | (None) | The id for the tablet server. | -| tablet-server.rack | String | (None) | The rack for the TabletServer. This will be used in rack aware bucket assignment for fault tolerance. Examples: `RACK1`, `cn-hangzhou-server10` | -| data.dir | String | /tmp/fluss-data | This configuration controls the directory where Fluss will store its data. The default value is /tmp/fluss-data | -| server.writer-id.expiration-time | Duration | 7d | The time that the tablet server will wait without receiving any write request from a client before expiring the related status. The default value is 7 days. | -| server.writer-id.expiration-check-interval | Duration | 10min | The interval at which to remove writer ids that have expired due to `server.writer-id.expiration-time passing. The default value is 10 minutes. | -| server.background.threads | Integer | 10 | The number of threads to use for various background processing tasks. The default value is 10. | -| server.buffer.memory-size | MemorySize | 256mb | The total bytes of memory the server can use, e.g, buffer write-ahead-log rows. | -| server.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers (`server.buffer.memory-size`). | -| server.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of server.buffer.page-size. It must be greater than or equal to server.buffer.page-size. This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments. | | -| server.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the buffer pool will block when waiting for segments. | -| tablet-server.controlled-shutdown.max-retries | Integer | 3 | Maximum number of attempts to transfer leadership before proceeding with an unclean shutdown during a controlled shutdown procedure. | -| tablet-server.controlled-shutdown.retry-interval | Duration | 1000ms | Time interval between retry attempts when trying to transfer leadership during controlled shutdown. | +| Option | Type | Default | Description | +|--------------------------------------------------|------------|-----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| tablet-server.id | Integer | (None) | The id for the tablet server. | +| tablet-server.rack | String | (None) | The rack for the TabletServer. This will be used in rack aware bucket assignment for fault tolerance. Examples: `RACK1`, `cn-hangzhou-server10` | +| data.dir | String | /tmp/fluss-data | This configuration controls the directory where Fluss will store its data. The default value is /tmp/fluss-data | +| server.writer-id.expiration-time | Duration | 7d | The time that the tablet server will wait without receiving any write request from a client before expiring the related status. The default value is 7 days. | +| server.writer-id.expiration-check-interval | Duration | 10min | The interval at which to remove writer ids that have expired due to `server.writer-id.expiration-time passing. The default value is 10 minutes. | +| server.background.threads | Integer | 10 | The number of threads to use for various background processing tasks. The default value is 10. | +| server.buffer.memory-size | MemorySize | 256mb | The total bytes of memory the server can use, e.g, buffer write-ahead-log rows. | +| server.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers (`server.buffer.memory-size`). | +| server.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of server.buffer.page-size. It must be greater than or equal to server.buffer.page-size. This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments. | +| server.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the buffer pool will block when waiting for segments. | +| tablet-server.controlled-shutdown.max-retries | Integer | 3 | Maximum number of attempts to transfer leadership before proceeding with an unclean shutdown during a controlled shutdown procedure. | +| tablet-server.controlled-shutdown.retry-interval | Duration | 1000ms | Time interval between retry attempts when trying to transfer leadership during controlled shutdown. | +| server.data-disk.write-limit-ratio | Double | 0.85 | Reject writes when the tablet server data disk usage exceeds this ratio. Writes resume after the usage drops below `(ratio - 0.10)`. The monitor reports the maximum usage across all distinct file stores so that a single nearly-full disk is never masked by other low-usage disks. Set to `1.0` to disable the disk-usage protection entirely. The valid range is `(0.0, 1.0]`. This configuration can be updated dynamically without server restart. | +| server.data-disk.check-interval | Duration | 30s | The interval at which the tablet server samples the local data disk usage for the write-protection state machine. A shorter interval narrows the time window during which writes can still flow in after the disk crosses the limit ratio, at the cost of slightly more frequent `statvfs` calls (which are in-memory and cheap). The default 30s is suitable for typical write workloads. | ## Zookeeper