Skip to content

Commit 9c85143

Browse files
spershin authored and facebook-github-bot committed
Add pipeline-level driver timing stats. (facebookincubator#17237)
Summary: Operator stats have gaps and are not suitable for determining where time went in a task. Adding driver timing stats per pipeline to assist in understanding a query's bottlenecks. The current version does not handle grouped execution well. Work to support grouped execution as well is in progress. Differential Revision: D101395498
1 parent 0187d5b commit 9c85143

5 files changed

Lines changed: 283 additions & 5 deletions

File tree

velox/exec/Driver.cpp

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ Driver::~Driver() = default;
3232

3333
namespace {
3434

35+
/// Returns current time in microseconds using high_resolution_clock.
36+
/// Used for driver-level lifecycle timing to match BlockingState::sinceUs_.
37+
inline uint64_t currentTimeMicrosHires() {
38+
return std::chrono::duration_cast<std::chrono::microseconds>(
39+
std::chrono::high_resolution_clock::now().time_since_epoch())
40+
.count();
41+
}
42+
3543
// Checks if output channel is produced using identity projection and returns
3644
// input channel if so.
3745
std::optional<column_index_t> getIdentityProjection(
@@ -228,6 +236,10 @@ void BlockingState::setResume(std::shared_ptr<BlockingState> state) {
228236
std::lock_guard<std::timed_mutex> l(task->mutex());
229237
if (!driver->state().isTerminated) {
230238
state->operator_->recordBlockingTime(state->sinceUs_, state->reason_);
239+
// Accumulate driver-level blocked time using high_resolution_clock,
240+
// matching sinceUs_ and all other driver lifecycle timing.
241+
driver->addDriverBlockedTime(
242+
(currentTimeMicrosHires() - state->sinceUs_) * 1'000);
231243
}
232244
VELOX_CHECK(!driver->state().suspended());
233245
VELOX_CHECK(driver->state().hasBlockingFuture);
@@ -358,7 +370,7 @@ void Driver::enqueueInternal() {
358370
VELOX_CHECK(!state_.isEnqueued);
359371
state_.isEnqueued = true;
360372
// When enqueuing, starting timing the queue time.
361-
queueTimeStartUs_ = getCurrentTimeMicro();
373+
queueTimeStartUs_ = currentTimeMicrosHires();
362374
}
363375

364376
// Call an Operator method. record silenced throws, but not a query
@@ -504,9 +516,24 @@ StopReason Driver::runInternal(
504516
std::shared_ptr<Driver>& self,
505517
std::shared_ptr<BlockingState>& blockingState,
506518
RowVectorPtr& result) {
507-
const auto now = getCurrentTimeMicro();
519+
// All driver timing uses high_resolution_clock consistently
520+
// (matching BlockingState::sinceUs_ used for blocked time).
521+
const auto now = currentTimeMicrosHires();
508522
const auto queuedTimeUs = now - queueTimeStartUs_;
509523

524+
totalDriverQueuedNanos_ += queuedTimeUs * 1'000;
525+
onThreadStartUs_ = now;
526+
// For the normal close path, closeOperators() finalizes and clears
527+
// onThreadStartUs_ before reporting. This guard handles early returns
528+
// (e.g. Task::enter() failure) and non-close exit paths.
529+
auto onThreadTimeGuard = folly::makeGuard([this]() {
530+
if (onThreadStartUs_ > 0) {
531+
totalDriverOnThreadNanos_ +=
532+
(currentTimeMicrosHires() - onThreadStartUs_) * 1'000;
533+
onThreadStartUs_ = 0;
534+
}
535+
});
536+
510537
// Update the next operator's queueTime.
511538
StopReason stop =
512539
closed_ ? StopReason::kTerminate : task()->enter(state_, now);
@@ -870,6 +897,23 @@ void Driver::closeOperators() {
870897
op->close();
871898
}
872899

900+
// Report driver-level lifecycle timing to the Task accumulator.
901+
// Use partitionId (0..numDrivers-1) so same-index drivers across split
902+
// groups in grouped execution are summed together.
903+
// Finalize on-thread time here (the onThreadTimeGuard in runInternal
904+
// hasn't fired yet since CancelGuard destructs before it).
905+
if (onThreadStartUs_ > 0) {
906+
totalDriverOnThreadNanos_ +=
907+
(currentTimeMicrosHires() - onThreadStartUs_) * 1'000;
908+
onThreadStartUs_ = 0; // Prevent double-counting in the guard.
909+
}
910+
task()->addDriverLifecycleStats(
911+
static_cast<uint32_t>(ctx_->pipelineId),
912+
ctx_->partitionId,
913+
totalDriverQueuedNanos_,
914+
totalDriverOnThreadNanos_,
915+
totalDriverBlockedNanos_);
916+
873917
// Add operator stats to the task.
874918
for (auto& op : operators_) {
875919
auto stats = op->stats(true);
@@ -904,15 +948,22 @@ void Driver::updateStats() {
904948
1'000'000 * state_.totalOffThreadTimeMs,
905949
RuntimeCounter::Unit::kNanos);
906950
}
951+
907952
task()->addDriverStats(ctx_->pipelineId, std::move(stats));
908953
}
909954

910955
void Driver::updateOperatorBlockingStats() {
911956
// Record blocked time if the driver was blocked when terminated.
912957
// This ensures we don't lose blocked time metrics when a query is aborted.
913-
if (state_.hasBlockingFuture && blockedOperatorId_ < operators_.size()) {
914-
operators_[blockedOperatorId_]->recordBlockingTime(
915-
state_.blockingStartUs, blockingReason_);
958+
if (state_.hasBlockingFuture) {
959+
// Accumulate driver-level blocked time unconditionally.
960+
totalDriverBlockedNanos_ +=
961+
(currentTimeMicrosHires() - state_.blockingStartUs) * 1'000;
962+
// Record per-operator blocked time if operator is available.
963+
if (blockedOperatorId_ < operators_.size()) {
964+
operators_[blockedOperatorId_]->recordBlockingTime(
965+
state_.blockingStartUs, blockingReason_);
966+
}
916967
}
917968
}
918969

velox/exec/Driver.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,12 @@ class Driver : public std::enable_shared_from_this<Driver> {
416416
/// memory arbitration finishes.
417417
bool checkUnderArbitration(ContinueFuture* future);
418418

419+
/// Accumulates blocked time for driver-level lifecycle tracking.
420+
/// Called from BlockingState::setResume() when the blocking future resolves.
421+
void addDriverBlockedTime(uint64_t nanos) {
422+
totalDriverBlockedNanos_ += nanos;
423+
}
424+
419425
void initializeOperatorStats(std::vector<OperatorStats>& stats);
420426

421427
/// Close operators and add operator stats to the task.
@@ -700,6 +706,22 @@ class Driver : public std::enable_shared_from_this<Driver> {
700706
// Timer used to track down the time we are sitting in the driver queue.
701707
size_t queueTimeStartUs_{0};
702708

709+
// Driver-level lifecycle timing: independently tracks the three states
710+
// a driver can be in (queued, on-thread, blocked) to enable gap analysis.
711+
// Reported as RuntimeStats on the source operator at close time.
712+
// All three use high_resolution_clock for consistency, matching
713+
// BlockingState::sinceUs_ and enabling accurate gap analysis
714+
// (queued + on-thread + blocked ≈ elapsed time).
715+
// Atomic because closeByTask() may read these from a different thread
716+
// than the one running the onThreadTimeGuard scope guard.
717+
std::atomic<uint64_t> totalDriverQueuedNanos_{0};
718+
std::atomic<uint64_t> totalDriverOnThreadNanos_{0};
719+
std::atomic<uint64_t> totalDriverBlockedNanos_{0};
720+
// Timestamp (micros, high_resolution_clock) when the current on-thread
721+
// period started. Set at the beginning of runInternal, used by
722+
// closeOperators to snapshot on-thread time before the scope guard fires.
723+
uint64_t onThreadStartUs_{0};
724+
703725
// Id (index in the vector) of the current operator to run (or the 1st one if
704726
// we haven't started yet). Used to determine which operator's queueTime we
705727
// should update.

velox/exec/Task.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,8 @@ void Task::init(std::optional<common::SpillDiskOptions>&& spillDiskOpts) {
540540
numDriversPerLeafNode_[factory->leafNodeId()] = factory->numDrivers;
541541
}
542542

543+
initDriverLifecycleStatsLocked();
544+
543545
// Create drivers.
544546
createSplitGroupStateLocked(kUngroupedGroupId);
545547
std::vector<std::shared_ptr<Driver>> drivers =
@@ -1060,6 +1062,8 @@ void Task::createDriverFactoriesLocked(uint32_t maxDrivers) {
10601062
factory->inputDriver, factory->outputDriver);
10611063
}
10621064

1065+
initDriverLifecycleStatsLocked();
1066+
10631067
validateGroupedExecutionLeafNodes();
10641068
}
10651069

@@ -2730,6 +2734,37 @@ void Task::addDriverStats(int pipelineId, DriverStats stats) {
27302734
taskStats_.pipelineStats[pipelineId].driverStats.push_back(std::move(stats));
27312735
}
27322736

2737+
void Task::initDriverLifecycleStatsLocked() {
2738+
pipelineLifecycleStats_.resize(driverFactories_.size());
2739+
for (size_t i = 0; i < driverFactories_.size(); ++i) {
2740+
auto& pls = pipelineLifecycleStats_[i];
2741+
const auto& leafNode = driverFactories_[i]->planNodes.front();
2742+
pls.sourceOperatorType = leafNode->name();
2743+
pls.sourcePlanNodeId = leafNode->id();
2744+
pls.driverTimes.resize(driverFactories_[i]->numDrivers);
2745+
}
2746+
}
2747+
2748+
void Task::addDriverLifecycleStats(
2749+
uint32_t pipelineId,
2750+
uint32_t driverIndex,
2751+
uint64_t queuedNanos,
2752+
uint64_t onThreadNanos,
2753+
uint64_t blockedNanos) {
2754+
std::lock_guard<std::timed_mutex> l(mutex_);
2755+
if (pipelineId >= pipelineLifecycleStats_.size()) {
2756+
return;
2757+
}
2758+
auto& pls = pipelineLifecycleStats_[pipelineId];
2759+
if (pls.driverTimes.empty()) {
2760+
return;
2761+
}
2762+
const auto idx = driverIndex % pls.driverTimes.size();
2763+
pls.driverTimes[idx].queuedNanos += queuedNanos;
2764+
pls.driverTimes[idx].onThreadNanos += onThreadNanos;
2765+
pls.driverTimes[idx].blockedNanos += blockedNanos;
2766+
}
2767+
27332768
TaskStats Task::taskStats() const {
27342769
std::lock_guard<std::timed_mutex> l(mutex_);
27352770

@@ -2785,6 +2820,41 @@ TaskStats Task::taskStats() const {
27852820
taskStats.longestRunningOpCallMs = 0;
27862821
}
27872822

2823+
// Emit per-pipeline driver lifecycle timing. Merged into the first
2824+
// existing DriverStats entry for each pipeline to avoid inflating the
2825+
// driverStats vector. Each logical driver index contributes one sample,
2826+
// giving proper sum/count/min/max aggregation.
2827+
for (size_t i = 0; i < pipelineLifecycleStats_.size(); ++i) {
2828+
const auto& pls = pipelineLifecycleStats_[i];
2829+
if (pls.driverTimes.empty()) {
2830+
continue;
2831+
}
2832+
auto& pipeDriverStats = taskStats.pipelineStats[i].driverStats;
2833+
if (pipeDriverStats.empty()) {
2834+
pipeDriverStats.emplace_back();
2835+
}
2836+
auto& targetStats = pipeDriverStats[0].runtimeStats;
2837+
const auto prefix = fmt::format(
2838+
"P{}-{}.{}", i, pls.sourceOperatorType, pls.sourcePlanNodeId);
2839+
const auto queuedKey = fmt::format("{}.driverQueuedWallNanos", prefix);
2840+
const auto onThreadKey = fmt::format("{}.driverOnThreadWallNanos", prefix);
2841+
const auto blockedKey = fmt::format("{}.driverBlockedWallNanos", prefix);
2842+
for (const auto& timing : pls.driverTimes) {
2843+
auto addOrMerge = [&targetStats](const std::string& key, uint64_t nanos) {
2844+
const auto value = saturateCast(nanos);
2845+
auto it = targetStats.find(key);
2846+
if (it != targetStats.end()) {
2847+
it->second.merge(RuntimeMetric(value, RuntimeCounter::Unit::kNanos));
2848+
} else {
2849+
targetStats[key] = RuntimeMetric(value, RuntimeCounter::Unit::kNanos);
2850+
}
2851+
};
2852+
addOrMerge(queuedKey, timing.queuedNanos);
2853+
addOrMerge(onThreadKey, timing.onThreadNanos);
2854+
addOrMerge(blockedKey, timing.blockedNanos);
2855+
}
2856+
}
2857+
27882858
auto bufferManager = bufferManager_.lock();
27892859
taskStats.outputBufferUtilization = bufferManager->getUtilization(taskId_);
27902860
taskStats.outputBufferOverutilized = bufferManager->isOverutilized(taskId_);

velox/exec/Task.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,16 @@ class Task : public std::enable_shared_from_this<Task> {
615615
/// Adds per driver statistics. Called from Drivers upon their closure.
616616
void addDriverStats(int pipelineId, DriverStats stats);
617617

618+
/// Accumulates driver lifecycle timing (queued, on-thread, blocked) for gap
619+
/// analysis. Same-index drivers across split groups are summed together.
620+
/// Called from Driver::closeOperators() upon driver closure.
621+
void addDriverLifecycleStats(
622+
uint32_t pipelineId,
623+
uint32_t driverIndex,
624+
uint64_t queuedNanos,
625+
uint64_t onThreadNanos,
626+
uint64_t blockedNanos);
627+
618628
/// Returns kNone if no pause or terminate is requested. The thread count is
619629
/// incremented if kNone is returned. If something else is returned the
620630
/// calling thread should unwind and return itself to its pool. If 'this' goes
@@ -1362,6 +1372,27 @@ class Task : public std::enable_shared_from_this<Task> {
13621372

13631373
TaskStats taskStats_;
13641374

1375+
// Per-pipeline driver lifecycle timing accumulator. For each pipeline, holds
1376+
// a vector of per-driver timing indexed by driver index within the pipeline.
1377+
// In grouped execution, same-index drivers across groups accumulate into the
1378+
// same entry, giving the total time for a logical driver across all groups.
1379+
// Reported as RuntimeMetrics at task close via taskStats().
1380+
struct DriverLifecycleTiming {
1381+
uint64_t queuedNanos{0};
1382+
uint64_t onThreadNanos{0};
1383+
uint64_t blockedNanos{0};
1384+
};
1385+
1386+
struct PipelineLifecycleStats {
1387+
std::string sourceOperatorType;
1388+
core::PlanNodeId sourcePlanNodeId;
1389+
std::vector<DriverLifecycleTiming> driverTimes;
1390+
};
1391+
std::vector<PipelineLifecycleStats> pipelineLifecycleStats_;
1392+
1393+
/// Initializes pipelineLifecycleStats_ from driverFactories_.
1394+
void initDriverLifecycleStatsLocked();
1395+
13651396
// Stores inter-operator state (exchange, bridges) per split group. During
13661397
// ungrouped execution we use the [0] entry in this vector.
13671398
std::unordered_map<uint32_t, SplitGroupState> splitGroupStates_;

0 commit comments

Comments
 (0)