3232#include < Flash/Coprocessor/DAGStorageInterpreter.h>
3333#include < Flash/Coprocessor/InterpreterUtils.h>
3434#include < Flash/Coprocessor/RemoteRequest.h>
35+ #include < Flash/Coprocessor/TableScanPipelineExecBuilder.h>
3536#include < Flash/Coprocessor/collectOutputFieldTypes.h>
3637#include < Interpreters/Context.h>
3738#include < Interpreters/SharedContexts/Disagg.h>
4344#include < Operators/UnorderedSourceOp.h>
4445#include < Parsers/makeDummyQuery.h>
4546#include < Storages/DeltaMerge/Index/VectorIndex/Stream/Ctx.h>
47+ #include < Storages/DeltaMerge/ReadThread/SharedBlockQueue.h>
4648#include < Storages/DeltaMerge/Remote/DisaggSnapshot.h>
4749#include < Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h>
4850#include < Storages/DeltaMerge/ScanContext.h>
@@ -917,6 +919,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
917919std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSelectQueryInfos ()
918920{
919921 std::unordered_map<TableID, SelectQueryInfo> ret;
922+ auto read_queue = std::make_shared<DM::SharedBlockQueue>(log);
920923 auto create_query_info = [&](Int64 table_id) -> SelectQueryInfo {
921924 SelectQueryInfo query_info;
922925 /// to avoid null pointer exception
@@ -934,6 +937,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
934937 query_info.req_id = fmt::format (" {} table_id={}" , log->identifier (), table_id);
935938 query_info.keep_order = table_scan.keepOrder ();
936939 query_info.is_fast_scan = table_scan.isFastScan ();
940+ query_info.read_queue = read_queue;
937941 return query_info;
938942 };
939943 RUNTIME_CHECK_MSG (mvcc_query_info->scan_context != nullptr , " Unexpected null scan_context" );
@@ -1060,16 +1064,16 @@ Int32 getMaxAllowRetryForLocalRead(const SelectQueryInfo & query_info)
10601064}
10611065} // namespace
10621066
1063- DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalStreamsForPhysicalTable (
1067+ void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable (
10641068 const TableID & table_id,
10651069 const SelectQueryInfo & query_info,
10661070 DAGPipeline & pipeline,
1071+ DM::Remote::DisaggReadSnapshotPtr & disagg_snap,
10671072 size_t max_block_size)
10681073{
1069- DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
10701074 size_t region_num = query_info.mvcc_query_info ->regions_query_info .size ();
10711075 if (region_num == 0 )
1072- return table_snap ;
1076+ return ;
10731077
10741078 assert (storages_with_structure_lock.find (table_id) != storages_with_structure_lock.end ());
10751079 auto & storage = storages_with_structure_lock[table_id].storage ;
@@ -1087,6 +1091,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
10871091 {
10881092 try
10891093 {
1094+ DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
10901095 if (!dag_context.is_disaggregated_task )
10911096 {
10921097 // build local inputstreams
@@ -1114,6 +1119,12 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
11141119 // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams`
11151120 // may contain different data other than expected.
11161121 validateQueryInfo (*query_info.mvcc_query_info , learner_read_snapshot, tmt, log);
1122+
1123+ // Only after all streams are built and validated successfully, we add the table snapshot task to disagg_snap
1124+ if (table_snap)
1125+ {
1126+ disagg_snap->addTask (table_id, std::move (table_snap));
1127+ }
11171128 break ;
11181129 }
11191130 catch (RegionException & e)
@@ -1124,6 +1135,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
11241135 {
11251136 // clean all streams from local because we are not sure of the correctness of those streams
11261137 pipeline.streams .clear ();
1138+ // clean table task from read_queue
1139+ query_info.read_queue ->resetTableTask (table_id);
11271140 if (likely (checkRetriableForBatchCopOrMPP (table_id, query_info, e, num_allow_retry)))
11281141 continue ; // next retry to read from local storage
11291142 else
@@ -1143,20 +1156,18 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
11431156 throw ;
11441157 }
11451158 }
1146- return table_snap;
11471159}
11481160
1149- DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalExecForPhysicalTable (
1161+ void DAGStorageInterpreter::buildLocalExecForPhysicalTable (
11501162 PipelineExecutorContext & exec_context,
1151- PipelineExecGroupBuilder & group_builder,
1163+ TableScanPipelineExecGroupBuilder & group_builder,
11521164 const TableID & table_id,
11531165 const SelectQueryInfo & query_info,
11541166 size_t max_block_size)
11551167{
1156- DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
11571168 size_t region_num = query_info.mvcc_query_info ->regions_query_info .size ();
11581169 if (region_num == 0 )
1159- return table_snap ;
1170+ return ;
11601171
11611172 RUNTIME_CHECK (storages_with_structure_lock.find (table_id) != storages_with_structure_lock.end ());
11621173 auto & storage = storages_with_structure_lock[table_id].storage ;
@@ -1167,6 +1178,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
11671178 {
11681179 try
11691180 {
1181+ DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
11701182 if (!dag_context.is_disaggregated_task )
11711183 {
11721184 storage->read (
@@ -1197,6 +1209,9 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
11971209 // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `sourceOps`
11981210 // may contain different data other than expected.
11991211 validateQueryInfo (*query_info.mvcc_query_info , learner_read_snapshot, tmt, log);
1212+
1213+ // Only after all sourceOps are built and verified, we add the snapshot to group_builder
1214+ group_builder.addPhysicalTableTask (table_id, std::move (table_snap));
12001215 break ;
12011216 }
12021217 catch (RegionException & e)
@@ -1207,6 +1222,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
12071222 {
12081223 // clean all operators from local because we are not sure of the correctness of those operators
12091224 group_builder.reset ();
1225+ // clean table task from read_queue
1226+ query_info.read_queue ->resetTableTask (table_id);
12101227 if (likely (checkRetriableForBatchCopOrMPP (table_id, query_info, e, num_allow_retry)))
12111228 continue ;
12121229 else
@@ -1226,7 +1243,6 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
12261243 throw ;
12271244 }
12281245 }
1229- return table_snap;
12301246}
12311247
12321248void DAGStorageInterpreter::buildLocalStreams (DAGPipeline & pipeline, size_t max_block_size)
@@ -1237,6 +1253,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
12371253 return ;
12381254 mvcc_query_info->scan_context ->setRegionNumOfCurrentInstance (total_local_region_num);
12391255 const auto table_query_infos = generateSelectQueryInfos ();
1256+ RUNTIME_CHECK_MSG (!table_query_infos.empty (), " No table query info generated for local read" );
12401257 bool has_multiple_partitions = table_query_infos.size () > 1 ;
12411258 // MultiPartitionStreamPool will be disabled in no partition mode or single-partition case
12421259 std::shared_ptr<MultiPartitionStreamPool> stream_pool
@@ -1248,12 +1265,17 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
12481265 DAGPipeline current_pipeline;
12491266 const TableID physical_table_id = table_query_info.first ;
12501267 const SelectQueryInfo & query_info = table_query_info.second ;
1251- auto table_snap
1252- = buildLocalStreamsForPhysicalTable (physical_table_id, query_info, current_pipeline, max_block_size);
1253- if (table_snap)
1254- {
1255- disaggregated_snap->addTask (physical_table_id, std::move (table_snap));
1256- }
1268+ RUNTIME_CHECK_MSG (
1269+ query_info.read_queue != nullptr ,
1270+ " read_queue should not be null, table_id={}" ,
1271+ physical_table_id);
1272+ buildLocalStreamsForPhysicalTable (
1273+ physical_table_id,
1274+ query_info,
1275+ current_pipeline,
1276+ disaggregated_snap,
1277+ max_block_size);
1278+
12571279 if (has_multiple_partitions)
12581280 stream_pool->addPartitionStreams (current_pipeline.streams );
12591281 else
@@ -1263,6 +1285,10 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
12631285 current_pipeline.streams .end ());
12641286 }
12651287
1288+ assert (!table_query_infos.empty ()); // check at start of this function
1289+ auto read_queue = table_query_infos.begin ()->second .read_queue ;
1290+ read_queue->finishQueueIfEmpty ();
1291+
12661292 LOG_DEBUG (
12671293 log,
12681294 " local streams built, is_disaggregated_task={} snap_id={}" ,
@@ -1304,30 +1330,33 @@ void DAGStorageInterpreter::buildLocalExec(
13041330 return ;
13051331 mvcc_query_info->scan_context ->setRegionNumOfCurrentInstance (total_local_region_num);
13061332 const auto table_query_infos = generateSelectQueryInfos ();
1333+ RUNTIME_CHECK_MSG (!table_query_infos.empty (), " No table query info generated for local read" );
13071334 bool has_multiple_partitions = table_query_infos.size () > 1 ;
13081335 ConcatBuilderPool builder_pool{max_streams, context.getSettingsRef ().dt_enable_unordered_concat };
13091336
13101337 auto disaggregated_snap = std::make_shared<DM::Remote::DisaggReadSnapshot>();
13111338 for (const auto & table_query_info : table_query_infos)
13121339 {
1313- PipelineExecGroupBuilder builder;
13141340 const TableID physical_table_id = table_query_info.first ;
13151341 const SelectQueryInfo & query_info = table_query_info.second ;
1316- auto table_snap
1317- = buildLocalExecForPhysicalTable (exec_context, builder, physical_table_id, query_info, max_block_size);
1318- if (table_snap)
1319- {
1320- disaggregated_snap-> addTask (physical_table_id, std::move (table_snap) );
1321- }
1342+ RUNTIME_CHECK_MSG (
1343+ query_info. read_queue != nullptr ,
1344+ " read_queue should not be null, table_id={} " ,
1345+ physical_table_id);
1346+ TableScanPipelineExecGroupBuilder builder (query_info. read_queue , disaggregated_snap );
1347+ buildLocalExecForPhysicalTable (exec_context, builder, physical_table_id, query_info, max_block_size);
13221348
13231349 if (has_multiple_partitions)
13241350 builder_pool.add (builder);
13251351 else
13261352 group_builder.merge (std::move (builder));
13271353 }
13281354
1329- LOG_DEBUG (log, " local sourceOps built, is_disaggregated_task={}" , dag_context.is_disaggregated_task );
1355+ assert (!table_query_infos.empty ()); // check at start of this function
1356+ auto read_queue = table_query_infos.begin ()->second .read_queue ;
1357+ read_queue->finishQueueIfEmpty ();
13301358
1359+ LOG_DEBUG (log, " local sourceOps built, is_disaggregated_task={}" , dag_context.is_disaggregated_task );
13311360 if (dag_context.is_disaggregated_task )
13321361 {
13331362 // register the snapshot to manager
0 commit comments