Skip to content

Commit 46a95cc

Browse files
spershin authored and facebook-github-bot committed
Add pipeline-level driver timing stats. (#17237)
Summary: Operator stats have gaps and are not suitable for determining where time went in a task. Adding driver timing stats per pipeline to assist in understanding a query's bottlenecks. The current version does not handle grouped execution well; work to support grouped execution properly is in progress. Differential Revision: D101395498
1 parent dc9e9f3 commit 46a95cc

5 files changed

Lines changed: 258 additions & 3 deletions

File tree

velox/exec/Driver.cpp

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,12 @@ void BlockingState::setResume(std::shared_ptr<BlockingState> state) {
228228
std::lock_guard<std::timed_mutex> l(task->mutex());
229229
if (!driver->state().isTerminated) {
230230
state->operator_->recordBlockingTime(state->sinceUs_, state->reason_);
231+
// Accumulate driver-level blocked time.
232+
const uint64_t blockedNow =
233+
std::chrono::duration_cast<std::chrono::microseconds>(
234+
std::chrono::high_resolution_clock::now().time_since_epoch())
235+
.count();
236+
driver->addDriverBlockedTime((blockedNow - state->sinceUs_) * 1'000);
231237
}
232238
VELOX_CHECK(!driver->state().suspended());
233239
VELOX_CHECK(driver->state().hasBlockingFuture);
@@ -507,6 +513,12 @@ StopReason Driver::runInternal(
507513
const auto now = getCurrentTimeMicro();
508514
const auto queuedTimeUs = now - queueTimeStartUs_;
509515

516+
// Record driver on-thread time from the very start of runInternal.
517+
// Captures everything including Task::enter() mutex wait and close().
518+
auto onThreadTimeGuard = folly::makeGuard([this, now]() {
519+
totalDriverOnThreadNanos_ += (getCurrentTimeMicro() - now) * 1'000;
520+
});
521+
510522
// Update the next operator's queueTime.
511523
StopReason stop =
512524
closed_ ? StopReason::kTerminate : task()->enter(state_, now);
@@ -531,6 +543,9 @@ StopReason Driver::runInternal(
531543
kMetricDriverQueueTimeMs, queuedTimeUs / 1'000);
532544
}
533545

546+
// Accumulate driver-level queued time.
547+
totalDriverQueuedNanos_ += queuedTimeUs * 1'000;
548+
534549
CancelGuard guard(self, task().get(), &state_, [&](StopReason reason) {
535550
// This is run on error or cancel exit.
536551
if (reason == StopReason::kTerminate) {
@@ -870,6 +885,16 @@ void Driver::closeOperators() {
870885
op->close();
871886
}
872887

888+
// Report driver-level lifecycle timing to the Task accumulator.
889+
// Use partitionId (0..numDrivers-1) so same-index drivers across split
890+
// groups in grouped execution are summed together.
891+
task()->addDriverLifecycleStats(
892+
static_cast<uint32_t>(ctx_->pipelineId),
893+
ctx_->partitionId,
894+
totalDriverQueuedNanos_,
895+
totalDriverOnThreadNanos_,
896+
totalDriverBlockedNanos_);
897+
873898
// Add operator stats to the task.
874899
for (auto& op : operators_) {
875900
auto stats = op->stats(true);
@@ -904,15 +929,25 @@ void Driver::updateStats() {
904929
1'000'000 * state_.totalOffThreadTimeMs,
905930
RuntimeCounter::Unit::kNanos);
906931
}
932+
907933
task()->addDriverStats(ctx_->pipelineId, std::move(stats));
908934
}
909935

910936
void Driver::updateOperatorBlockingStats() {
911937
// Record blocked time if the driver was blocked when terminated.
912938
// This ensures we don't lose blocked time metrics when a query is aborted.
913-
if (state_.hasBlockingFuture && blockedOperatorId_ < operators_.size()) {
914-
operators_[blockedOperatorId_]->recordBlockingTime(
915-
state_.blockingStartUs, blockingReason_);
939+
if (state_.hasBlockingFuture) {
940+
const uint64_t blockedNow =
941+
std::chrono::duration_cast<std::chrono::microseconds>(
942+
std::chrono::high_resolution_clock::now().time_since_epoch())
943+
.count();
944+
// Accumulate driver-level blocked time unconditionally.
945+
totalDriverBlockedNanos_ += (blockedNow - state_.blockingStartUs) * 1'000;
946+
// Record per-operator blocked time if operator is available.
947+
if (blockedOperatorId_ < operators_.size()) {
948+
operators_[blockedOperatorId_]->recordBlockingTime(
949+
state_.blockingStartUs, blockingReason_);
950+
}
916951
}
917952
}
918953

velox/exec/Driver.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,12 @@ class Driver : public std::enable_shared_from_this<Driver> {
416416
/// memory arbitration finishes.
417417
bool checkUnderArbitration(ContinueFuture* future);
418418

419+
/// Accumulates blocked time for driver-level lifecycle tracking.
420+
/// Called from BlockingState::setResume() when the blocking future resolves.
421+
void addDriverBlockedTime(uint64_t nanos) {
422+
totalDriverBlockedNanos_ += nanos;
423+
}
424+
419425
void initializeOperatorStats(std::vector<OperatorStats>& stats);
420426

421427
/// Close operators and add operator stats to the task.
@@ -700,6 +706,15 @@ class Driver : public std::enable_shared_from_this<Driver> {
700706
// Timer used to track down the time we are sitting in the driver queue.
701707
size_t queueTimeStartUs_{0};
702708

709+
// Driver-level lifecycle timing: independently tracks the three states
710+
// a driver can be in (queued, on-thread, blocked) to enable gap analysis.
711+
// Reported as RuntimeStats on the source operator at close time.
712+
// Atomic because closeByTask() may read these from a different thread
713+
// than the one running the onThreadTimeGuard scope guard.
714+
std::atomic<uint64_t> totalDriverQueuedNanos_{0};
715+
std::atomic<uint64_t> totalDriverOnThreadNanos_{0};
716+
std::atomic<uint64_t> totalDriverBlockedNanos_{0};
717+
703718
// Id (index in the vector) of the current operator to run (or the 1st one if
704719
// we haven't started yet). Used to determine which operator's queueTime we
705720
// should update.

velox/exec/Task.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,8 @@ void Task::init(std::optional<common::SpillDiskOptions>&& spillDiskOpts) {
540540
numDriversPerLeafNode_[factory->leafNodeId()] = factory->numDrivers;
541541
}
542542

543+
initDriverLifecycleStatsLocked();
544+
543545
// Create drivers.
544546
createSplitGroupStateLocked(kUngroupedGroupId);
545547
std::vector<std::shared_ptr<Driver>> drivers =
@@ -1060,6 +1062,8 @@ void Task::createDriverFactoriesLocked(uint32_t maxDrivers) {
10601062
factory->inputDriver, factory->outputDriver);
10611063
}
10621064

1065+
initDriverLifecycleStatsLocked();
1066+
10631067
validateGroupedExecutionLeafNodes();
10641068
}
10651069

@@ -2730,6 +2734,37 @@ void Task::addDriverStats(int pipelineId, DriverStats stats) {
27302734
taskStats_.pipelineStats[pipelineId].driverStats.push_back(std::move(stats));
27312735
}
27322736

2737+
// Sizes 'pipelineLifecycleStats_' to match 'driverFactories_' and records,
// per pipeline, the source (leaf) plan node's operator type and id plus one
// timing slot per driver. Caller must hold 'mutex_'.
void Task::initDriverLifecycleStatsLocked() {
  const auto numPipelines = driverFactories_.size();
  pipelineLifecycleStats_.resize(numPipelines);
  for (size_t pipeline = 0; pipeline < numPipelines; ++pipeline) {
    const auto& factory = driverFactories_[pipeline];
    auto& stats = pipelineLifecycleStats_[pipeline];
    // The first plan node of a factory is the pipeline's source.
    const auto& sourceNode = factory->planNodes.front();
    stats.sourceOperatorType = sourceNode->name();
    stats.sourcePlanNodeId = sourceNode->id();
    stats.driverTimes.resize(factory->numDrivers);
  }
}
2747+
2748+
void Task::addDriverLifecycleStats(
2749+
uint32_t pipelineId,
2750+
uint32_t driverIndex,
2751+
uint64_t queuedNanos,
2752+
uint64_t onThreadNanos,
2753+
uint64_t blockedNanos) {
2754+
std::lock_guard<std::timed_mutex> l(mutex_);
2755+
if (pipelineId >= pipelineLifecycleStats_.size()) {
2756+
return;
2757+
}
2758+
auto& pls = pipelineLifecycleStats_[pipelineId];
2759+
if (pls.driverTimes.empty()) {
2760+
return;
2761+
}
2762+
const auto idx = driverIndex % pls.driverTimes.size();
2763+
pls.driverTimes[idx].queuedNanos += queuedNanos;
2764+
pls.driverTimes[idx].onThreadNanos += onThreadNanos;
2765+
pls.driverTimes[idx].blockedNanos += blockedNanos;
2766+
}
2767+
27332768
TaskStats Task::taskStats() const {
27342769
std::lock_guard<std::timed_mutex> l(mutex_);
27352770

@@ -2785,6 +2820,41 @@ TaskStats Task::taskStats() const {
27852820
taskStats.longestRunningOpCallMs = 0;
27862821
}
27872822

2823+
// Emit per-pipeline driver lifecycle timing. Merged into the first
2824+
// existing DriverStats entry for each pipeline to avoid inflating the
2825+
// driverStats vector. Each logical driver index contributes one sample,
2826+
// giving proper sum/count/min/max aggregation.
2827+
for (size_t i = 0; i < pipelineLifecycleStats_.size(); ++i) {
2828+
const auto& pls = pipelineLifecycleStats_[i];
2829+
if (pls.driverTimes.empty()) {
2830+
continue;
2831+
}
2832+
auto& pipeDriverStats = taskStats.pipelineStats[i].driverStats;
2833+
if (pipeDriverStats.empty()) {
2834+
pipeDriverStats.emplace_back();
2835+
}
2836+
auto& targetStats = pipeDriverStats[0].runtimeStats;
2837+
const auto prefix = fmt::format(
2838+
"P{}-{}.{}", i, pls.sourceOperatorType, pls.sourcePlanNodeId);
2839+
const auto queuedKey = fmt::format("{}.driverQueuedWallNanos", prefix);
2840+
const auto onThreadKey = fmt::format("{}.driverOnThreadWallNanos", prefix);
2841+
const auto blockedKey = fmt::format("{}.driverBlockedWallNanos", prefix);
2842+
for (const auto& timing : pls.driverTimes) {
2843+
auto addOrMerge = [&targetStats](const std::string& key, uint64_t nanos) {
2844+
const auto value = saturateCast(nanos);
2845+
auto it = targetStats.find(key);
2846+
if (it != targetStats.end()) {
2847+
it->second.merge(RuntimeMetric(value, RuntimeCounter::Unit::kNanos));
2848+
} else {
2849+
targetStats[key] = RuntimeMetric(value, RuntimeCounter::Unit::kNanos);
2850+
}
2851+
};
2852+
addOrMerge(queuedKey, timing.queuedNanos);
2853+
addOrMerge(onThreadKey, timing.onThreadNanos);
2854+
addOrMerge(blockedKey, timing.blockedNanos);
2855+
}
2856+
}
2857+
27882858
auto bufferManager = bufferManager_.lock();
27892859
taskStats.outputBufferUtilization = bufferManager->getUtilization(taskId_);
27902860
taskStats.outputBufferOverutilized = bufferManager->isOverutilized(taskId_);

velox/exec/Task.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,16 @@ class Task : public std::enable_shared_from_this<Task> {
615615
/// Adds per driver statistics. Called from Drivers upon their closure.
616616
void addDriverStats(int pipelineId, DriverStats stats);
617617

618+
/// Accumulates driver lifecycle timing (queued, on-thread, blocked) for gap
619+
/// analysis. Same-index drivers across split groups are summed together.
620+
/// Called from Driver::closeOperators() upon driver closure.
621+
void addDriverLifecycleStats(
622+
uint32_t pipelineId,
623+
uint32_t driverIndex,
624+
uint64_t queuedNanos,
625+
uint64_t onThreadNanos,
626+
uint64_t blockedNanos);
627+
618628
/// Returns kNone if no pause or terminate is requested. The thread count is
619629
/// incremented if kNone is returned. If something else is returned the
620630
/// calling thread should unwind and return itself to its pool. If 'this' goes
@@ -1362,6 +1372,27 @@ class Task : public std::enable_shared_from_this<Task> {
13621372

13631373
TaskStats taskStats_;
13641374

1375+
// Per-pipeline driver lifecycle timing accumulator. For each pipeline, holds
1376+
// a vector of per-driver timing indexed by driver index within the pipeline.
1377+
// In grouped execution, same-index drivers across groups accumulate into the
1378+
// same entry, giving the total time for a logical driver across all groups.
1379+
// Reported as RuntimeMetrics at task close via taskStats().
1380+
struct DriverLifecycleTiming {
1381+
uint64_t queuedNanos{0};
1382+
uint64_t onThreadNanos{0};
1383+
uint64_t blockedNanos{0};
1384+
};
1385+
1386+
struct PipelineLifecycleStats {
1387+
std::string sourceOperatorType;
1388+
core::PlanNodeId sourcePlanNodeId;
1389+
std::vector<DriverLifecycleTiming> driverTimes;
1390+
};
1391+
std::vector<PipelineLifecycleStats> pipelineLifecycleStats_;
1392+
1393+
/// Initializes pipelineLifecycleStats_ from driverFactories_.
1394+
void initDriverLifecycleStatsLocked();
1395+
13651396
// Stores inter-operator state (exchange, bridges) per split group. During
13661397
// ungrouped execution we use the [0] entry in this vector.
13671398
std::unordered_map<uint32_t, SplitGroupState> splitGroupStates_;

velox/exec/tests/TaskTest.cpp

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2555,6 +2555,110 @@ TEST_F(TaskTest, updateStatsWhileCloseOffThreadDriver) {
25552555
ASSERT_GE(totalOffThreadTime.sum, 0);
25562556
}
25572557

2558+
// Verifies that driver-level lifecycle timing metrics
2559+
// (driverQueuedWallNanos, driverOnThreadWallNanos, driverBlockedWallNanos)
2560+
// are reported correctly for each pipeline.
2561+
TEST_F(TaskTest, driverLifecycleTimingStats) {
2562+
auto data = makeRowVector({
2563+
makeFlatVector<int64_t>(1'000, [](auto row) { return row % 10; }),
2564+
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
2565+
});
2566+
2567+
auto buildData = makeRowVector(
2568+
{"u0", "u1"},
2569+
{
2570+
makeFlatVector<int64_t>(100, [](auto row) { return row % 10; }),
2571+
makeFlatVector<int64_t>(100, [](auto row) { return row; }),
2572+
});
2573+
2574+
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
2575+
2576+
// Build a hash join plan to get multiple pipelines:
2577+
// Pipeline 0 (probe): Values -> HashProbe -> sink
2578+
// Pipeline 1 (build): Values -> HashBuild
2579+
core::PlanNodeId probeScanId;
2580+
auto plan =
2581+
PlanBuilder(planNodeIdGenerator)
2582+
.values({data})
2583+
.capturePlanNodeId(probeScanId)
2584+
.hashJoin(
2585+
{"c0"},
2586+
{"u0"},
2587+
PlanBuilder(planNodeIdGenerator).values({buildData}).planNode(),
2588+
"",
2589+
{"c0", "c1"})
2590+
.planNode();
2591+
2592+
auto result = AssertQueryBuilder(plan).copyResults(pool_.get());
2593+
ASSERT_GT(result->size(), 0);
2594+
2595+
// Get task stats from the most recently completed task.
2596+
auto tasks = Task::getRunningTasks();
2597+
// Tasks may already be cleaned up. Use a cursor-based approach instead.
2598+
2599+
// Re-run using CursorParameters to access the task directly.
2600+
CursorParameters params;
2601+
params.planNode = plan;
2602+
params.queryCtx = core::QueryCtx::create(driverExecutor_.get());
2603+
params.maxDrivers = 2;
2604+
2605+
auto cursor = TaskCursor::create(params);
2606+
while (cursor->moveNext()) {
2607+
}
2608+
auto task = cursor->task();
2609+
ASSERT_TRUE(waitForTaskCompletion(task.get()));
2610+
2611+
auto taskStats = task->taskStats();
2612+
// We should have at least 2 pipelines (probe and build).
2613+
ASSERT_GE(taskStats.pipelineStats.size(), 2);
2614+
2615+
// Helper to find a driver stat by substring match across all pipelines.
2616+
auto findDriverStat =
2617+
[&taskStats](
2618+
const std::string& statSubstr) -> std::optional<RuntimeMetric> {
2619+
for (const auto& pipeline : taskStats.pipelineStats) {
2620+
for (const auto& ds : pipeline.driverStats) {
2621+
for (const auto& [name, metric] : ds.runtimeStats) {
2622+
if (name.find(statSubstr) != std::string::npos) {
2623+
return metric;
2624+
}
2625+
}
2626+
}
2627+
}
2628+
return std::nullopt;
2629+
};
2630+
2631+
// Verify driver lifecycle metrics exist and have sensible values.
2632+
auto queued = findDriverStat("driverQueuedWallNanos");
2633+
auto onThread = findDriverStat("driverOnThreadWallNanos");
2634+
auto blocked = findDriverStat("driverBlockedWallNanos");
2635+
2636+
ASSERT_TRUE(queued.has_value()) << "driverQueuedWallNanos not found";
2637+
ASSERT_TRUE(onThread.has_value()) << "driverOnThreadWallNanos not found";
2638+
ASSERT_TRUE(blocked.has_value()) << "driverBlockedWallNanos not found";
2639+
2640+
// Each metric should have count >= 1 (at least one driver reported).
2641+
ASSERT_GE(queued->count, 1);
2642+
ASSERT_GE(onThread->count, 1);
2643+
ASSERT_GE(blocked->count, 1);
2644+
2645+
// On-thread time must be positive (drivers did real work).
2646+
ASSERT_GT(onThread->sum, 0);
2647+
ASSERT_GT(onThread->max, 0);
2648+
2649+
// Queued and blocked times must be non-negative.
2650+
ASSERT_GE(queued->sum, 0);
2651+
ASSERT_GE(blocked->sum, 0);
2652+
2653+
// Verify the stat names contain the pipeline prefix and source operator.
2654+
// Look for probe pipeline stats (source is Values with probeScanId).
2655+
auto probePrefix = fmt::format("P0-Values.{}", probeScanId);
2656+
auto probeOnThread = findDriverStat(probePrefix + ".driverOnThreadWallNanos");
2657+
ASSERT_TRUE(probeOnThread.has_value())
2658+
<< "Probe pipeline stat not found with prefix: " << probePrefix;
2659+
ASSERT_GT(probeOnThread->max, 0);
2660+
}
2661+
25582662
DEBUG_ONLY_TEST_F(TaskTest, driverEnqueAfterFailedAndPausedTask) {
25592663
const auto data = makeRowVector({
25602664
makeFlatVector<int64_t>(50, [](auto row) { return row; }),

0 commit comments

Comments
 (0)