Skip to content

Commit 1a63c3c

Browse files
committed
Support cancel on ActiveSegmentReadTaskQueue
Signed-off-by: JaySon-Huang <tshent@qq.com>
1 parent 946d0bd commit 1a63c3c

4 files changed

Lines changed: 44 additions & 4 deletions

File tree

dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,13 +1380,15 @@ void DAGStorageInterpreter::buildLocalExec(
13801380
assert(!table_query_infos.empty()); // check at start of this function
13811381
auto read_queue = table_query_infos.begin()->second.read_queue;
13821382
read_queue->finishQueueIfEmpty();
1383+
exec_context.addStorageTaskQueue(read_queue);
13831384
}
13841385
else
13851386
{
13861387
for (const auto & table_query_info : table_query_infos)
13871388
{
13881389
const SelectQueryInfo & query_info = table_query_info.second;
13891390
query_info.read_queue->finishQueueIfEmpty();
1391+
exec_context.addStorageTaskQueue(query_info.read_queue);
13901392
}
13911393
}
13921394

dbms/src/Flash/Executor/PipelineExecutorContext.cpp

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <Flash/Pipeline/Schedule/Tasks/OneTimeNotifyFuture.h>
2323
#include <Operators/CTE.h>
2424
#include <Operators/SharedQueue.h>
25+
#include <Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h>
2526

2627
#include <exception>
2728

@@ -178,7 +179,7 @@ void PipelineExecutorContext::cancel()
178179
bool origin_value = false;
179180
if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release))
180181
{
181-
// TODO: Add ActiveSegmentReadTaskQueue here to cancel storage layer read tasks
182+
cancelStorageTaskQueues();
182183
cancelSharedQueues();
183184
cancelOneTimeFutures();
184185
if (likely(dag_context))
@@ -235,6 +236,25 @@ void PipelineExecutorContext::cancelSharedQueues()
235236
shared_queue->cancel();
236237
}
237238

239+
void PipelineExecutorContext::addStorageTaskQueue(const DM::ActiveSegmentReadTaskQueuePtr & storage_task_queue)
240+
{
241+
std::lock_guard lock(mu);
242+
RUNTIME_CHECK_MSG(!isCancelled(), "query has been cancelled.");
243+
assert(storage_task_queue);
244+
storage_task_queues.push_back(storage_task_queue);
245+
}
246+
247+
void PipelineExecutorContext::cancelStorageTaskQueues()
248+
{
249+
std::vector<DM::ActiveSegmentReadTaskQueuePtr> tmp;
250+
{
251+
std::lock_guard lock(mu);
252+
std::swap(tmp, storage_task_queues);
253+
}
254+
for (const auto & storage_task_queue : tmp)
255+
storage_task_queue->finishQueue();
256+
}
257+
238258
void PipelineExecutorContext::addOneTimeFuture(const OneTimeNotifyFuturePtr & future)
239259
{
240260
std::lock_guard lock(mu);

dbms/src/Flash/Executor/PipelineExecutorContext.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ class OneTimeNotifyFuture;
4040
using OneTimeNotifyFuturePtr = std::shared_ptr<OneTimeNotifyFuture>;
4141
class DAGContext;
4242

43+
namespace DM
44+
{
45+
class ActiveSegmentReadTaskQueue;
46+
using ActiveSegmentReadTaskQueuePtr = std::shared_ptr<ActiveSegmentReadTaskQueue>;
47+
} // namespace DM
48+
49+
4350
class PipelineExecutorContext : private boost::noncopyable
4451
{
4552
public:
@@ -146,6 +153,8 @@ class PipelineExecutorContext : private boost::noncopyable
146153

147154
void addOneTimeFuture(const OneTimeNotifyFuturePtr & future);
148155

156+
void addStorageTaskQueue(const DM::ActiveSegmentReadTaskQueuePtr & storage_task_queue);
157+
149158
private:
150159
bool setExceptionPtr(const std::exception_ptr & exception_ptr_);
151160

@@ -160,6 +169,8 @@ class PipelineExecutorContext : private boost::noncopyable
160169

161170
void cancelOneTimeFutures();
162171

172+
void cancelStorageTaskQueues();
173+
163174
void cancelResultQueueIfNeed();
164175

165176
private:
@@ -196,5 +207,7 @@ class PipelineExecutorContext : private boost::noncopyable
196207
std::vector<SharedQueuePtr> shared_queues;
197208

198209
std::vector<OneTimeNotifyFuturePtr> one_time_futures;
210+
211+
std::vector<DM::ActiveSegmentReadTaskQueuePtr> storage_task_queues;
199212
};
200213
} // namespace DB

dbms/src/Storages/StorageDisaggregatedRemote.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,9 @@ std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisagg
620620

621621
if (enable_read_thread)
622622
{
623-
TableID table_id = table_scan.getLogicalTableID(); // FIXME:
623+
// Now for StorageDisaggregated, we create only one SegmentReadTaskPool for segments from all partitions.
624+
// Use the logical table id as a workaround.
625+
TableID table_id = table_scan.getLogicalTableID();
624626
return std::make_shared<DM::SegmentReadTaskPool>(
625627
read_queue,
626628
extra_table_id_index,
@@ -690,7 +692,8 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
690692
size_t num_streams,
691693
DAGPipeline & pipeline)
692694
{
693-
// Share the read queue among all inputstreams // TODO: share for partition tables under disagg
695+
// Share the read queue among all inputstreams. Note that for StorageDisaggregated,
696+
// now we create only one SegmentReadTaskPool for segment from all partitions.
694697
auto read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(num_streams, log);
695698
// Build the input streams to read blocks from remote segments
696699
auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
@@ -763,8 +766,10 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(
763766
DM::SegmentReadTasks && read_tasks,
764767
size_t num_streams)
765768
{
766-
// Share the read queue among all source ops // TODO: share for partition tables under disagg
769+
// Share the read queue among all source ops. Note that for StorageDisaggregated,
770+
// now we create only one SegmentReadTaskPool for segment from all partitions.
767771
auto read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(num_streams, log);
772+
exec_context.addStorageTaskQueue(read_queue);
768773
// Build the input streams to read blocks from remote segments
769774
auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
770775
auto packed_read_tasks = packSegmentReadTasks(

0 commit comments

Comments
 (0)