-
Notifications
You must be signed in to change notification settings - Fork 413
Expand file tree
/
Copy pathDAGContext.h
More file actions
560 lines (484 loc) · 21.5 KB
/
DAGContext.h
File metadata and controls
560 lines (484 loc) · 21.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <cstddef>
#include <mutex>
#include <unordered_map>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#ifdef __clang__
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <kvproto/mpp.pb.h>
#pragma GCC diagnostic pop
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Core/QueryOperatorSpillContexts.h>
#include <Core/TaskOperatorSpillContexts.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/DAGRequest.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/RuntimeFilterMgr.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Executor/toRU.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
#include <Operators/IOProfileInfo.h>
#include <Operators/OperatorProfileInfo.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/Remote/DisaggTaskId.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
namespace DB
{
class Context;
class MPPTunnelSet;
class ExchangeReceiver;
using ExchangeReceiverPtr = std::shared_ptr<ExchangeReceiver>;
/// key: executor_id of ExchangeReceiver nodes in dag.
using ExchangeReceiverMap = std::unordered_map<String, ExchangeReceiverPtr>;
class MPPReceiverSet;
using MPPReceiverSetPtr = std::shared_ptr<MPPReceiverSet>;
class CoprocessorReader;
using CoprocessorReaderPtr = std::shared_ptr<CoprocessorReader>;
class AutoSpillTrigger;
class CTE;
struct JoinProfileInfo
{
UInt64 peak_build_bytes_usage = 0;
bool is_spill_enabled = false;
bool is_spilled = false;
};
using JoinProfileInfoPtr = std::shared_ptr<JoinProfileInfo>;
struct JoinExecuteInfo
{
String build_side_root_executor_id;
JoinProfileInfoPtr join_profile_info;
BlockInputStreams join_build_streams;
OperatorProfileInfos join_build_profile_infos;
};
using MPPTunnelSetPtr = std::shared_ptr<MPPTunnelSet>;
class ProcessListEntry;
UInt64 inline getMaxErrorCount(const tipb::DAGRequest &)
{
/// todo max_error_count is a system variable in mysql, TiDB should put it into dag request, now use the default value instead
return 1024;
}
namespace TiDBSQLFlags
{
constexpr UInt64 IGNORE_TRUNCATE = 1;
constexpr UInt64 TRUNCATE_AS_WARNING = 1u << 1u;
constexpr UInt64 PAD_CHAR_TO_FULL_LENGTH = 1u << 2u;
constexpr UInt64 IN_INSERT_STMT = 1u << 3u;
constexpr UInt64 IN_UPDATE_OR_DELETE_STMT = 1u << 4u;
constexpr UInt64 IN_SELECT_STMT = 1u << 5u;
// TiDB removed OverflowAsWarning flag in tidb/pull/49122.
// constexpr UInt64 OVERFLOW_AS_WARNING = 1u << 6u;
constexpr UInt64 IGNORE_ZERO_IN_DATE = 1u << 7u;
constexpr UInt64 DIVIDED_BY_ZERO_AS_WARNING = 1u << 8u;
constexpr UInt64 IN_LOAD_DATA_STMT = 1u << 10u;
} // namespace TiDBSQLFlags
namespace TiDBSQLMode
{
constexpr UInt64 REAL_AS_FLOAT = 1ul;
constexpr UInt64 PIPES_AS_CONCAT = 1ul << 1ul;
constexpr UInt64 ANSI_QUOTES = 1ul << 2ul;
constexpr UInt64 IGNORE_SPACE = 1ul << 3ul;
constexpr UInt64 NOT_USED = 1ul << 4ul;
constexpr UInt64 ONLY_FULL_GROUP_BY = 1ul << 5ul;
constexpr UInt64 NO_UNSIGNED_SUBTRACTION = 1ul << 6ul;
constexpr UInt64 NO_DIR_IN_CREATE = 1ul << 7ul;
constexpr UInt64 POSTGRESQL = 1ul << 8ul;
constexpr UInt64 ORACLE = 1ul << 9ul;
constexpr UInt64 MSSQL = 1ul << 10ul;
constexpr UInt64 DB2 = 1ul << 11ul;
constexpr UInt64 MAXDB = 1ul << 12ul;
constexpr UInt64 NO_KEY_OPTIONS = 1ul << 13ul;
constexpr UInt64 NO_TABLE_OPTIONS = 1ul << 14ul;
constexpr UInt64 NO_FIELD_OPTIONS = 1ul << 15ul;
constexpr UInt64 MYSQL323 = 1ul << 16ul;
constexpr UInt64 MYSQL40 = 1ul << 17ul;
constexpr UInt64 ANSI = 1ul << 18ul;
constexpr UInt64 NO_AUTO_VALUE_ON_ZERO = 1ul << 19ul;
constexpr UInt64 NO_BACK_SLASH_ESCAPES = 1ul << 20ul;
constexpr UInt64 STRICT_TRANS_TABLES = 1ul << 21ul;
constexpr UInt64 STRICT_ALL_TABLES = 1ul << 22ul;
constexpr UInt64 NO_ZERO_IN_DATE = 1ul << 23ul;
constexpr UInt64 NO_ZERO_DATE = 1ul << 24ul;
constexpr UInt64 INVALID_DATES = 1ul << 25ul;
constexpr UInt64 ERROR_FOR_DIVISION_BY_ZERO = 1ul << 26ul;
constexpr UInt64 TRADITIONAL = 1ul << 27ul;
constexpr UInt64 NO_AUTO_CREATE_USER = 1ul << 28ul;
constexpr UInt64 HIGH_NOT_PRECEDENCE = 1ul << 29ul;
constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul;
// Duplicated with Flag::PAD_CHAR_TO_FULL_LENGTH
// PAD_CHAR_TO_FULL_LENGTH = 1ul << 31ul;
constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
} // namespace TiDBSQLMode
constexpr Int32 DEFAULT_DIV_PRECISION_INCREMENT = 4;
enum class ExecutionMode
{
None,
Stream,
Pipeline,
};
enum class DAGRequestKind
{
Cop,
CopStream,
BatchCop,
MPP,
};
/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
public:
// for non-mpp(Cop/CopStream/BatchCop)
DAGContext(
tipb::DAGRequest & dag_request_,
TablesRegionsInfo && tables_regions_info_,
KeyspaceID keyspace_id_,
const String & tidb_host_,
DAGRequestKind cop_kind_,
const String & resource_group_name,
UInt64 connection_id_,
const String & connection_alias_,
LoggerPtr log_);
// for mpp
DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);
// for disaggregated task on write node
DAGContext(
tipb::DAGRequest & dag_request_,
const disaggregated::DisaggTaskMeta & task_meta_,
TablesRegionsInfo && tables_regions_info_,
const String & compute_node_host_,
LoggerPtr log_);
// for test
explicit DAGContext(UInt64 max_error_count_);
// for tests need to run query tasks.
DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency);
~DAGContext();
std::unordered_map<String, BlockInputStreams> & getProfileStreamsMap();
std::unordered_map<String, OperatorProfileInfos> & getOperatorProfileInfosMap();
void addOperatorProfileInfos(
const String & executor_id,
OperatorProfileInfos && profile_infos,
bool is_append = false);
std::unordered_map<String, std::vector<String>> & getExecutorIdToJoinIdMap();
std::unordered_map<String, JoinExecuteInfo> & getJoinExecuteInfoMap();
std::unordered_map<String, BlockInputStreams> & getInBoundIOInputStreamsMap();
std::unordered_map<String, IOProfileInfos> & getInboundIOProfileInfosMap();
void addInboundIOProfileInfos(
const String & executor_id,
IOProfileInfos && io_profile_infos,
bool is_append = false);
void handleTruncateError(const String & msg);
void handleOverflowError(const String & msg);
void handleDivisionByZero();
void handleInvalidTime(const String & msg, const TiFlashError & error);
void appendWarning(const String & msg, int32_t code = 0);
bool allowZeroInDate() const;
bool allowInvalidDate() const;
bool shouldClipToZero() const;
/// This method is thread-safe.
void appendWarning(const tipb::Error & warning)
{
if (warning_count.fetch_add(1, std::memory_order_acq_rel) < max_recorded_error_count)
{
warnings.tryPush(warning);
}
}
/// Consume all warnings. Once this method called, every warning will be cleared.
/// This method is not thread-safe.
void consumeWarnings(std::vector<tipb::Error> & warnings_)
{
const size_t warnings_size = warnings.size();
warnings_.reserve(warnings_size);
for (size_t i = 0; i < warnings_size; ++i)
{
tipb::Error error;
warnings.pop(error);
warnings_.push_back(error);
}
}
void fillWarnings(tipb::SelectResponse & response)
{
std::vector<tipb::Error> warnings_vec;
consumeWarnings(warnings_vec);
for (auto & warning : warnings_vec)
{
auto * warn = response.add_warnings();
// TODO: consider using allocated warnings to prevent copy?
warn->CopyFrom(warning);
}
response.set_warning_count(getWarningCount());
}
void clearWarnings()
{
warnings.clear();
warning_count = 0;
}
UInt64 getWarningCount() { return warning_count; }
const mpp::TaskMeta & getMPPTaskMeta() const { return mpp_task_meta; }
bool isCop() const { return kind == DAGRequestKind::Cop; }
bool isCopStream() const { return kind == DAGRequestKind::CopStream; }
bool isBatchCop() const { return kind == DAGRequestKind::BatchCop; }
bool isMPPTask() const { return kind == DAGRequestKind::MPP; }
/// root mpp task means mpp task that send data back to TiDB
bool isRootMPPTask() const { return is_root_mpp_task; }
const MPPTaskId & getMPPTaskId() const { return mpp_task_id; }
const std::unique_ptr<DM::DisaggTaskId> & getDisaggTaskId() const { return disaggregated_id; }
std::pair<bool, double> getTableScanThroughput();
const SingleTableRegions & getTableRegionsInfoByTableID(Int64 table_id) const;
bool containsRegionsInfoForTable(Int64 table_id) const;
UInt64 getFlags() const { return flags; }
void setFlags(UInt64 f) { flags = f; }
void addFlag(UInt64 f) { flags |= f; }
void delFlag(UInt64 f) { flags &= (~f); }
bool hasFlag(UInt64 f) const { return (flags & f); }
UInt64 getSQLMode() const { return sql_mode; }
void setSQLMode(UInt64 f) { sql_mode = f; }
void addSQLMode(UInt64 f) { sql_mode |= f; }
void delSQLMode(UInt64 f) { sql_mode &= (~f); }
bool hasSQLMode(UInt64 f) const { return sql_mode & f; }
Int32 getDivPrecisionIncrement() const { return div_precision_increment; }
// for test usage only
void setDivPrecisionIncrement(Int32 new_value) { div_precision_increment = new_value; }
void updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit);
ExchangeReceiverPtr getMPPExchangeReceiver(const String & executor_id) const;
void setMPPReceiverSet(const MPPReceiverSetPtr & receiver_set) { mpp_receiver_set = receiver_set; }
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
std::vector<CoprocessorReaderPtr> & getCoprocessorReaders();
void addSubquery(const String & subquery_id, SubqueryForSet && subquery);
bool hasSubquery() const { return !subqueries.empty(); }
std::vector<SubqueriesForSets> && moveSubqueries() { return std::move(subqueries); }
void setProcessListEntry(const std::shared_ptr<ProcessListEntry> & entry) { process_list_entry = entry; }
std::shared_ptr<ProcessListEntry> getProcessListEntry() const { return process_list_entry; }
void setQueryOperatorSpillContexts(
const std::shared_ptr<QueryOperatorSpillContexts> & query_operator_spill_contexts_)
{
query_operator_spill_contexts = query_operator_spill_contexts_;
}
std::shared_ptr<QueryOperatorSpillContexts> & getQueryOperatorSpillContexts()
{
return query_operator_spill_contexts;
}
void setAutoSpillTrigger(const std::shared_ptr<AutoSpillTrigger> & auto_spill_trigger_)
{
auto_spill_trigger = auto_spill_trigger_;
}
AutoSpillTrigger * getAutoSpillTrigger()
{
return auto_spill_trigger == nullptr ? nullptr : auto_spill_trigger.get();
}
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getResourceGroupName() { return resource_group_name; }
// For now, only called for BlockIO execution engine to disable report RU of storage layer.
void clearResourceGroupName() { resource_group_name = ""; }
UInt64 getReadBytes() const;
void switchToStreamMode()
{
RUNTIME_CHECK(execution_mode == ExecutionMode::None);
execution_mode = ExecutionMode::Stream;
}
void switchToPipelineMode()
{
RUNTIME_CHECK(execution_mode == ExecutionMode::None);
execution_mode = ExecutionMode::Pipeline;
}
ExecutionMode getExecutionMode() const { return execution_mode; }
void registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context)
{
if (in_auto_spill_mode)
operator_spill_contexts->registerOperatorSpillContext(operator_spill_context);
}
void registerTaskOperatorSpillContexts()
{
query_operator_spill_contexts->registerTaskOperatorSpillContexts(operator_spill_contexts);
}
void setAutoSpillMode() { in_auto_spill_mode = true; }
bool isInAutoSpillMode() const { return in_auto_spill_mode; }
UInt64 getConnectionID() const { return connection_id; }
const String & getConnectionAlias() const { return connection_alias; }
const String & getSQLDigest() const { return sql_digest; }
const String & getPlanDigest() const { return plan_digest; }
MPPReceiverSetPtr getMPPReceiverSet() const { return mpp_receiver_set; }
String getQueryIDAndCTEIDForSink()
{
std::lock_guard<std::mutex> lock(this->cte_mu);
return this->query_id_and_cte_id_for_sink;
}
String getQueryIDAndCTEIDForSource(size_t cte_id)
{
std::lock_guard<std::mutex> lock(this->cte_mu);
return this->query_id_and_cte_id_for_sources[cte_id];
}
void setQueryIDAndCTEIDForSink(const String & query_id_and_cte_id)
{
std::lock_guard<std::mutex> lock(this->cte_mu);
// MPP Task has only one CTESink, it's impossible to set query_id_and_cte_id_for_sink twice
RUNTIME_CHECK(this->query_id_and_cte_id_for_sink.empty(), this->query_id_and_cte_id_for_sink);
this->query_id_and_cte_id_for_sink = query_id_and_cte_id;
}
void addQueryIDAndCTEIDForSource(size_t cte_id, const String & query_id_and_cte_id)
{
std::lock_guard<std::mutex> lock(this->cte_mu);
auto iter = this->query_id_and_cte_id_for_sources.find(cte_id);
if (iter != this->query_id_and_cte_id_for_sources.end())
{
RUNTIME_CHECK_MSG(iter->second == query_id_and_cte_id, "{} vs {}", iter->second, query_id_and_cte_id);
return;
}
this->query_id_and_cte_id_for_sources.insert(std::make_pair(cte_id, query_id_and_cte_id));
}
std::shared_ptr<CTE> getCTESink()
{
std::lock_guard<std::mutex> lock(this->cte_mu);
return this->sink_cte;
}
std::unordered_map<size_t, std::shared_ptr<CTE>> getCTESource()
{
std::lock_guard<std::mutex> lock(this->cte_mu);
return this->source_ctes;
}
void setCTESink(std::shared_ptr<CTE> & cte)
{
std::lock_guard<std::mutex> lock(this->cte_mu);
RUNTIME_CHECK(!this->sink_cte);
this->sink_cte = cte;
}
void addCTESource(size_t cte_id, std::shared_ptr<CTE> & cte)
{
std::lock_guard<std::mutex> lock(this->cte_mu);
auto iter = this->source_ctes.find(cte_id);
if (iter != this->source_ctes.end())
{
RUNTIME_CHECK(iter->second.get() == cte.get());
return;
}
this->source_ctes.insert(std::make_pair(cte_id, cte));
}
public:
DAGRequest dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
/// dummy_query_string and dummy_ast is used for that
String dummy_query_string;
ASTPtr dummy_ast;
Int64 compile_time_ns = 0;
Int64 minTSO_wait_time_ns = 0;
size_t final_concurrency = 1;
size_t initialize_concurrency = 1;
bool has_read_wait_index = false;
Clock::time_point read_wait_index_start_timestamp{Clock::duration::zero()};
Clock::time_point read_wait_index_end_timestamp{Clock::duration::zero()};
String table_scan_executor_id;
// For mpp/cop/batchcop this is the host of tidb
// For disaggregated read, this is the host of compute node
String tidb_host = "Unknown";
bool collect_execution_summaries{};
/* const */ DAGRequestKind kind;
/* const */ bool is_root_mpp_task = false;
/* const */ bool is_disaggregated_task = false; // a disagg task handling by the write node
// `tunnel_set` is always set by `MPPTask` and is used later.
MPPTunnelSetPtr tunnel_set;
TablesRegionsInfo tables_regions_info;
// part of regions_for_local_read + regions_for_remote_read, only used for batch-cop
RegionInfoList retry_regions;
LoggerPtr log;
// initialized in `initOutputInfo`.
std::vector<tipb::FieldType> result_field_types;
tipb::EncodeType encode_type = tipb::EncodeType::TypeDefault;
// only meaningful in final projection.
bool keep_session_timezone_info = false;
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;
/// executor_id, ScanContextPtr
/// Currently, max(scan_context_map.size()) == 1, because one mpp task only have do one table scan
/// While when we support collcate join later, scan_context_map.size() may > 1,
/// thus we need to pay attention to scan_context_map usage that time.
std::unordered_map<String, DM::ScanContextPtr> scan_context_map;
RuntimeFilterMgr runtime_filter_mgr;
private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();
tipb::EncodeType analyzeDAGEncodeType() const;
void handleTruncateErrorInternal(const String & msg);
private:
std::shared_ptr<ProcessListEntry> process_list_entry;
bool in_auto_spill_mode = false;
std::shared_ptr<TaskOperatorSpillContexts> operator_spill_contexts;
std::shared_ptr<QueryOperatorSpillContexts> query_operator_spill_contexts;
std::shared_ptr<AutoSpillTrigger> auto_spill_trigger;
/// Holding the table lock to make sure that the table wouldn't be dropped during the lifetime of this query, even if there are no local regions.
/// TableLockHolders need to be released after the BlockInputStream is destroyed to prevent data read exceptions.
TableLockHolders table_locks;
/// operator profile related
/// operator_profile_infos will be added to map concurrently at runtime, so a lock is needed to prevent data race.
std::mutex operator_profile_infos_map_mu;
/// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams.
std::unordered_map<String, BlockInputStreams> profile_streams_map;
/// operator_profile_infos_map is a map that maps from executor_id to OperatorProfileInfos.
std::unordered_map<String, OperatorProfileInfos> operator_profile_infos_map;
/// executor_id_to_join_id_map is a map that maps executor id to all the join executor id of itself and all its children.
std::unordered_map<String, std::vector<String>> executor_id_to_join_id_map;
/// join_execute_info_map is a map that maps from join_probe_executor_id to JoinExecuteInfo
/// DAGResponseWriter / JoinStatistics gets JoinExecuteInfo through it.
std::unordered_map<std::string, JoinExecuteInfo> join_execute_info_map;
/// inbound_io_input_streams_map is a map that maps from executor_id (table_scan / exchange_receiver) to BlockInputStreams.
/// BlockInputStreams contains ExchangeReceiverInputStream, CoprocessorBlockInputStream and local_read_input_stream etc.
std::unordered_map<String, BlockInputStreams> inbound_io_input_streams_map;
/// inbound_io_profile_infos_map is a map that maps from executor_id (table_scan / exchange_receiver) to IOProfileInfos.
/// IOProfileInfos are from ExchangeReceiverSourceOp, CoprocessorSourceOp and local_read_source etc.
std::unordered_map<String, IOProfileInfos> inbound_io_profile_infos_map;
UInt64 flags;
UInt64 sql_mode;
Int32 div_precision_increment = DEFAULT_DIV_PRECISION_INCREMENT;
mpp::TaskMeta mpp_task_meta;
const MPPTaskId mpp_task_id = MPPTaskId::unknown_mpp_task_id;
// The task id for disaggregated read
const std::unique_ptr<DM::DisaggTaskId> disaggregated_id;
/// max_recorded_error_count is the max error/warning need to be recorded in warnings
UInt64 max_recorded_error_count;
ConcurrentBoundedQueue<tipb::Error> warnings;
/// warning_count is the actual warning count during the entire execution
std::atomic<UInt64> warning_count;
// `mpp_receiver_set` is always set by `MPPTask` and is used later.
MPPReceiverSetPtr mpp_receiver_set;
std::vector<CoprocessorReaderPtr> coprocessor_readers;
/// vector of SubqueriesForSets(such as join build subquery).
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;
// The keyspace that the DAG request from
const KeyspaceID keyspace_id = NullspaceID;
String resource_group_name;
// Used to determine the execution mode
// - None: request has not been executed yet
// - Stream: execute with block input stream
// - Pipeline: execute with pipeline model
ExecutionMode execution_mode = ExecutionMode::None;
// It's the session id between mysql client and tidb
UInt64 connection_id;
// It's the session alias between mysql client and tidb
String connection_alias;
String sql_digest;
String plan_digest;
String query_id_and_cte_id_for_sink;
std::unordered_map<size_t, String> query_id_and_cte_id_for_sources;
std::mutex cte_mu;
std::shared_ptr<CTE> sink_cte;
std::unordered_map<size_t, std::shared_ptr<CTE>> source_ctes;
};
} // namespace DB