Skip to content

Commit dc9e9f3

Browse files
zhichenxu-metameta-codesync[bot]
authored and committed
feat(rpc): Add AIMD congestion control and batch dispatch chunking for async RPC batch mode (#17227)
Summary: Pull Request resolved: #17227 Batch dispatch chunking and AIMD congestion control for async RPC operators in batch mode. Problem: When using batch mode, dispatch_batch_size was not actually splitting rows — all rows were sent as a single RPC call, potentially exceeding the server's concurrent request limit. Additionally, there was no backpressure mechanism to throttle dispatch when the server was overloaded. Changes: 1. Batch dispatch chunking: flushBatch(maxRows) drains only maxRows from pending rows instead of all. RPCOperator loops flushBatchRequests(dispatchBatchSize_) to flush in chunks. AsyncRPCFunction.h updated with maxRows parameter. 2. Backpressure check in addInput: while loop checks isUnderBackpressure() between flushes to prevent overshooting maxPendingBatches. 3. AIMD congestion control: RPCState tracks effectiveMaxPendingBatches_ (starts at maxPendingBatches_=2). On success: +1 (additive increase). On error (>50% real errors or _rpc_retried signal): /2 (multiplicative decrease, floor 1). Null input responses are excluded from error counting. Suppresses redundant "decreased from 1 to 1" log messages. Reviewed By: Yuhta Differential Revision: D101062260 fbshipit-source-id: cc0f809cabdbf8c9b0abe53305cabaa10ba3b645
1 parent 3eb73e9 commit dc9e9f3

5 files changed

Lines changed: 123 additions & 17 deletions

File tree

velox/exec/rpc/RPCOperator.cpp

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,17 @@ void RPCOperator::addInput(RowVectorPtr input) {
191191

192192
if (dispatchBatchSize_ > 0 &&
193193
function_->pendingBatchSize() >= dispatchBatchSize_) {
194-
flushBatchRequests();
194+
// Flush in chunks of dispatchBatchSize_ to avoid sending one
195+
// giant batch_predict call that overwhelms the server.
196+
while (function_->pendingBatchSize() >= dispatchBatchSize_ &&
197+
!state_->isUnderBackpressure()) {
198+
flushBatchRequests(dispatchBatchSize_);
199+
}
195200
}
196201
}
197202
}
198203

199-
void RPCOperator::flushBatchRequests() {
204+
void RPCOperator::flushBatchRequests(int32_t maxRows) {
200205
if (function_->pendingBatchSize() == 0) {
201206
VELOX_CHECK(
202207
batchRowLocations_.empty(),
@@ -207,13 +212,24 @@ void RPCOperator::flushBatchRequests() {
207212
return;
208213
}
209214

210-
RPC_OP_LOG(INFO) << "Flushing batch with " << function_->pendingBatchSize()
211-
<< " accumulated rows";
215+
// Determine how many rows to flush.
216+
auto flushCount = maxRows > 0
217+
? std::min(static_cast<int32_t>(batchRowLocations_.size()), maxRows)
218+
: static_cast<int32_t>(batchRowLocations_.size());
219+
220+
RPC_OP_LOG(INFO) << "Flushing batch with " << flushCount << " of "
221+
<< function_->pendingBatchSize() << " accumulated rows";
212222

213-
auto rowLocations = std::move(batchRowLocations_);
214-
auto rowIds = std::move(batchRowIds_);
223+
// Split off the rows to flush.
224+
std::vector<RPCState::RowLocation> rowLocations(
225+
batchRowLocations_.begin(), batchRowLocations_.begin() + flushCount);
226+
std::vector<int64_t> rowIds(
227+
batchRowIds_.begin(), batchRowIds_.begin() + flushCount);
228+
batchRowLocations_.erase(
229+
batchRowLocations_.begin(), batchRowLocations_.begin() + flushCount);
230+
batchRowIds_.erase(batchRowIds_.begin(), batchRowIds_.begin() + flushCount);
215231

216-
auto future = function_->flushBatch();
232+
auto future = function_->flushBatch(maxRows);
217233

218234
// Count each flushBatch() as 1 pending unit in the rate limiter.
219235
auto token = std::make_shared<RPCRateLimiter::Token>(
@@ -252,9 +268,9 @@ void RPCOperator::noMoreInput() {
252268
<< numRequestsDispatched_;
253269

254270
if (state_->streamingMode() == RPCStreamingMode::kBatch) {
255-
// Flush any remaining accumulated rows.
256-
if (function_->pendingBatchSize() > 0) {
257-
flushBatchRequests();
271+
// Flush any remaining accumulated rows in chunks.
272+
while (function_->pendingBatchSize() > 0) {
273+
flushBatchRequests(dispatchBatchSize_ > 0 ? dispatchBatchSize_ : 0);
258274
}
259275
}
260276

@@ -311,6 +327,16 @@ RowVectorPtr RPCOperator::getOutput() {
311327
numErrors_++;
312328
}
313329
}
330+
331+
// Delegate congestion evaluation to the function.
332+
// The function knows its domain-specific error semantics.
333+
auto signal = function_->evaluateCongestion(claimedBatch_->responses);
334+
if (signal == AsyncRPCFunction::CongestionSignal::kError) {
335+
state_->onBatchError();
336+
} else if (signal == AsyncRPCFunction::CongestionSignal::kSuccess) {
337+
state_->onBatchSuccess(function_->congestionRecoveryIncrement());
338+
}
339+
314340
auto output = buildOutputFromReadyBatch(*claimedBatch_);
315341
numResponsesCollected_ += numRows;
316342
claimedBatch_.reset();

velox/exec/rpc/RPCOperator.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ class RPCOperator : public exec::Operator {
108108
private:
109109
/// Flush accumulated batch rows via function_->flushBatch().
110110
/// Called when threshold is reached or at noMoreInput/drain time.
111-
void flushBatchRequests();
111+
/// @param maxRows Maximum rows to flush. 0 means flush all.
112+
void flushBatchRequests(int32_t maxRows = 0);
112113

113114
/// Build output RowVector from ready rows (PER_ROW mode).
114115
/// Supports multiple rows via batched drain for pipeline efficiency.
@@ -174,7 +175,7 @@ class RPCOperator : public exec::Operator {
174175
// This is a ceiling — the operator returns as soon as results are ready.
175176
// Batch LLM inference can take many minutes due to MetaGen queuing
176177
// and GPU scheduling, so the timeout needs generous headroom.
177-
static constexpr auto kBatchRpcTimeout = std::chrono::milliseconds(1'800'000);
178+
static constexpr auto kBatchRpcTimeout = std::chrono::milliseconds(3'600'000);
178179

179180
// Block wait time tracking for runtime stats.
180181
std::optional<uint64_t> blockWaitStartNs_;

velox/exec/rpc/RPCState.cpp

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ void RPCState::setMaxPendingRows(int64_t maxPendingRows) {
4444

4545
void RPCState::setMaxPendingBatches(int64_t maxPendingBatches) {
4646
maxPendingBatches_ = maxPendingBatches;
47+
effectiveMaxPendingBatches_ = maxPendingBatches;
4748
}
4849

4950
// ===== Input batch storage =====
@@ -340,11 +341,35 @@ bool RPCState::isFinished() {
340341
bool RPCState::isUnderBackpressure() {
341342
std::lock_guard<std::mutex> l(mutex_);
342343
if (streamingMode_ == RPCStreamingMode::kBatch) {
343-
return static_cast<int64_t>(pendingBatches_.size()) >= maxPendingBatches_;
344+
return static_cast<int64_t>(pendingBatches_.size()) >=
345+
effectiveMaxPendingBatches_;
344346
}
345347
return numPendingRows_ >= maxPendingRows_;
346348
}
347349

350+
void RPCState::onBatchSuccess(int64_t increment) {
351+
std::lock_guard<std::mutex> l(mutex_);
352+
if (effectiveMaxPendingBatches_ < maxPendingBatches_) {
353+
effectiveMaxPendingBatches_ =
354+
std::min(effectiveMaxPendingBatches_ + increment, maxPendingBatches_);
355+
RPC_STATE_LOG(INFO) << "RPC congestion: batch success, window increased to "
356+
<< effectiveMaxPendingBatches_ << "/"
357+
<< maxPendingBatches_;
358+
}
359+
}
360+
361+
void RPCState::onBatchError() {
362+
std::lock_guard<std::mutex> l(mutex_);
363+
auto prev = effectiveMaxPendingBatches_;
364+
effectiveMaxPendingBatches_ =
365+
std::max<int64_t>(effectiveMaxPendingBatches_ / 2, 1);
366+
if (effectiveMaxPendingBatches_ < prev) {
367+
RPC_STATE_LOG(WARNING)
368+
<< "RPC congestion: batch error, window decreased from " << prev
369+
<< " to " << effectiveMaxPendingBatches_;
370+
}
371+
}
372+
348373
void RPCState::notifyWaitersLocked() {
349374
// Fulfill all promises to wake up blocked drivers.
350375
// Called while mutex_ is held.

velox/exec/rpc/RPCState.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,20 @@ class RPCState {
224224

225225
/// Returns true if backpressure should be applied. Thread-safe.
226226
/// PER_ROW mode: pending rows >= maxPendingRows.
227-
/// BATCH mode: pending batches >= maxPendingBatches.
227+
/// BATCH mode: pending batches >= effectiveMaxPendingBatches
228+
/// (congestion-adjusted).
228229
bool isUnderBackpressure();
229230

231+
/// Signal that a batch completed successfully (all responses non-empty).
232+
/// Increases the effective concurrency window by increment (additive
233+
/// increase). Thread-safe.
234+
void onBatchSuccess(int64_t increment = 2);
235+
236+
/// Signal that a batch had errors (e.g., empty responses from overload).
237+
/// Halves the effective concurrency window (multiplicative decrease).
238+
/// Thread-safe.
239+
void onBatchError();
240+
230241
private:
231242
/// Move a completed row into readyRows_ and notify waiters.
232243
/// Called from the RPC completion callback (runs on executor thread).
@@ -253,7 +264,14 @@ class RPCState {
253264
bool noMoreInput_{false};
254265
RPCStreamingMode streamingMode_{RPCStreamingMode::kPerRow};
255266
int64_t maxPendingRows_{100};
256-
int64_t maxPendingBatches_{10};
267+
int64_t maxPendingBatches_{2};
268+
269+
// Congestion control for BATCH mode.
270+
// effectiveMaxPendingBatches_ starts at maxPendingBatches_ and adjusts:
271+
// - On success: min(effective + 1, maxPendingBatches_) (additive increase)
272+
// - On error: max(effective / 2, 1) (multiplicative
273+
// decrease)
274+
int64_t effectiveMaxPendingBatches_{2};
257275
};
258276

259277
} // namespace facebook::velox::exec::rpc

velox/expression/rpc/AsyncRPCFunction.h

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,20 @@ class AsyncRPCFunction {
141141
/// The function builds the typed batch request from its internal
142142
/// accumulated state and dispatches it.
143143
///
144-
/// Returns responses for ALL accumulated rows. Null rows get
144+
/// @param maxRows Maximum number of rows to flush. 0 means flush all.
145+
/// Returns responses for the flushed rows. Null rows get
145146
/// RPCResponse{.error = "null_input"}. This keeps the operator
146147
/// completely agnostic to null handling in batch mode.
147-
virtual folly::SemiFuture<std::vector<RPCResponse>> flushBatch() {
148+
virtual folly::SemiFuture<std::vector<RPCResponse>> flushBatch(
149+
int32_t /*maxRows*/) {
148150
VELOX_UNSUPPORTED("flushBatch() not implemented for function '{}'", name());
149151
}
150152

153+
/// Convenience overload: flush all accumulated rows.
154+
virtual folly::SemiFuture<std::vector<RPCResponse>> flushBatch() {
155+
return flushBatch(0);
156+
}
157+
151158
/// Number of rows accumulated so far (for threshold checks).
152159
/// Batch-capable functions MUST override this; the operator uses
153160
/// function_->pendingBatchSize() >= dispatchBatchSize_ to decide
@@ -177,6 +184,35 @@ class AsyncRPCFunction {
177184
}
178185
return result;
179186
}
187+
188+
// ── Congestion Control ───────────────────────────────────────
189+
190+
/// Signal returned by evaluateCongestion() to indicate batch health.
191+
enum class CongestionSignal {
192+
/// Batch completed successfully — increase concurrency window.
193+
kSuccess,
194+
/// Batch had errors — decrease concurrency window.
195+
kError,
196+
/// No congestion evaluation — skip window adjustment.
197+
kNone,
198+
};
199+
200+
/// Evaluate batch congestion from completed responses.
201+
/// Called by RPCOperator after a BATCH-mode batch completes.
202+
/// The function inspects responses and returns a signal that the
203+
/// operator maps to window adjustments (additive increase on
204+
/// kSuccess, multiplicative decrease on kError).
205+
/// Default: kNone (no congestion control).
206+
virtual CongestionSignal evaluateCongestion(
207+
const std::vector<RPCResponse>& /*responses*/) const {
208+
return CongestionSignal::kNone;
209+
}
210+
211+
/// How much to increase the concurrency window on kSuccess.
212+
/// Override to tune recovery speed per client. Default: 2.
213+
virtual int64_t congestionRecoveryIncrement() const {
214+
return 2;
215+
}
180216
};
181217

182218
} // namespace facebook::velox::exec::rpc

0 commit comments

Comments
 (0)