forked from pingcap/tiflash
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathReadBlockInfo.cpp
More file actions
156 lines (143 loc) · 5.89 KB
/
ReadBlockInfo.cpp
File metadata and controls
156 lines (143 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Common/config.h> // For ENABLE_CLARA
#include <Storages/DeltaMerge/File/ReadBlockInfo.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Stream/IProvideVectorIndex.h>
#include <span>
#if ENABLE_CLARA
#include <Storages/DeltaMerge/Index/FullTextIndex/Stream/IProvideFullTextIndex.h>
#endif
namespace DB::DM
{
/// Groups consecutive usable packs into read blocks.
///
/// Packs are visited in order. A pack whose filter result is not "use" closes the
/// block being built and is itself excluded. A usable pack starts a new block when
/// the block being built
///   - already spans at least `read_pack_limit` packs, or
///   - already holds at least `rows_threshold_per_read` rows, or
///   - is entirely all-match, the current pack is not all-match, and the block
///     already holds at least half of `rows_threshold_per_read` rows.
/// Otherwise the pack is appended to the block being built.
///
/// @param pack_res                 per-pack filter results, indexed by pack id
/// @param pack_stats               per-pack statistics; only `rows` is read here
/// @param read_pack_limit          pack-count limit that triggers a new block
/// @param rows_threshold_per_read  row-count limit that triggers a new block
/// @return the blocks in ascending pack order; blocks with zero rows are never emitted
ReadBlockInfos ReadBlockInfo::create(
    const RSResults & pack_res,
    const DMFileMeta::PackStats & pack_stats,
    const size_t read_pack_limit,
    const size_t rows_threshold_per_read)
{
    ReadBlockInfos blocks;
    size_t block_begin = 0; // first pack id of the block being built
    size_t block_rows = 0; // rows accumulated in the block being built
    auto block_res = RSResult::All; // combined filter result of the block being built

    // Emit the block [block_begin, end_pack) unless it is empty.
    const auto flush_until = [&](size_t end_pack) {
        if (block_rows > 0)
            blocks.emplace_back(block_begin, end_pack - block_begin, block_res, block_rows);
    };

    const size_t pack_count = pack_res.size();
    for (size_t i = 0; i < pack_count; ++i)
    {
        const auto & cur_res = pack_res[i];

        if (!cur_res.isUse())
        {
            // The current pack is skipped, so the next block can start at i + 1 at the earliest.
            flush_until(i);
            block_begin = i + 1;
            block_rows = 0;
            block_res = RSResult::All;
            continue;
        }

        const bool too_many_packs = i - block_begin >= read_pack_limit;
        const bool too_many_rows = block_rows >= rows_threshold_per_read;
        // Keep runs of all-match packs in their own block, but do not split while the
        // block is still small: too-small blocks may hurt performance.
        const bool leaving_all_match
            = block_res.allMatch() && !cur_res.allMatch() && block_rows >= rows_threshold_per_read / 2;

        if (too_many_packs || too_many_rows || leaving_all_match)
        {
            // The current pack becomes the first pack of the next block.
            flush_until(i);
            block_begin = i;
            block_rows = pack_stats[i].rows;
            block_res = cur_res;
            continue;
        }

        block_res = block_res && cur_res;
        block_rows += pack_stats[i].rows;
    }

    // Emit whatever is left after the last pack.
    flush_until(pack_count);
    return blocks;
}
/// Builds read blocks that cover only the packs containing at least one of the given rows.
///
/// Pack `i` is treated as covering row ids in
/// `[pack_offset[i], pack_offset[i] + pack_stats[i].rows)`. A pack is used when
/// `row_ids` has an entry in that range. Unused packs close the block being built
/// and are excluded. A used pack starts a new block when the block being built
/// already holds at least `rows_threshold_per_read` rows, or when it is entirely
/// all-match, the current pack is not, and it already holds at least half of
/// `rows_threshold_per_read` rows.
///
/// @param row_ids                  search results, each exposing a `rowid` member; located with
///                                 binary search, so they are expected to be ordered by `rowid`
/// @param pack_offset              row id at which each pack starts, indexed by pack id
/// @param pack_res                 per-pack filter results, indexed by pack id
/// @param pack_stats               per-pack statistics; only `rows` is read here
/// @param rows_threshold_per_read  row-count limit that triggers a new block
/// @return the blocks in ascending pack order
/// @note A runtime check fails if some entries of `row_ids` remain after all packs are visited.
template <typename T>
ReadBlockInfos ReadBlockInfo::createWithRowIDs(
    std::span<T> row_ids,
    const std::vector<size_t> & pack_offset,
    const RSResults & pack_res,
    const DMFileMeta::PackStats & pack_stats,
    const size_t rows_threshold_per_read)
{
    ReadBlockInfos blocks;
    size_t block_begin = 0; // first pack id of the block being built
    size_t block_rows = 0; // rows accumulated in the block being built
    auto block_res = RSResult::All; // combined filter result of the block being built

    // Emit the block [block_begin, end_pack) unless it is empty.
    const auto flush_until = [&](size_t end_pack) {
        if (block_rows > 0)
            blocks.emplace_back(block_begin, end_pack - block_begin, block_res, block_rows);
    };
    // Orders a search result against a plain row id for the binary searches below.
    const auto row_before = [](const auto & lhs, const auto & rhs) {
        return lhs.rowid < rhs;
    };

    // Everything before `cursor` has already been assigned to an earlier pack.
    auto cursor = row_ids.begin();
    size_t pack_id = 0;
    // Stop as soon as every result is consumed; pack_id then marks the end of the last block.
    for (; pack_id < pack_stats.size() && cursor != row_ids.end(); ++pack_id)
    {
        const auto pack_rows = pack_stats[pack_id].rows;
        const auto first = std::lower_bound(cursor, row_ids.end(), pack_offset[pack_id], row_before);
        const auto last = std::lower_bound(first, row_ids.end(), pack_offset[pack_id] + pack_rows, row_before);
        cursor = last;

        if (first == last)
        {
            // No result falls into this pack: close the block and skip the pack.
            flush_until(pack_id);
            block_begin = pack_id + 1;
            block_rows = 0;
            block_res = RSResult::All;
            continue;
        }

        const bool too_many_rows = block_rows >= rows_threshold_per_read;
        const bool leaving_all_match
            = block_res.allMatch() && !pack_res[pack_id].allMatch() && block_rows >= rows_threshold_per_read / 2;

        if (too_many_rows || leaving_all_match)
        {
            // The current pack becomes the first pack of the next block.
            flush_until(pack_id);
            block_begin = pack_id;
            block_rows = pack_rows;
            block_res = pack_res[pack_id];
            continue;
        }

        block_res = block_res && pack_res[pack_id];
        block_rows += pack_rows;
    }

    // Emit whatever is left once the loop stops.
    flush_until(pack_id);
    RUNTIME_CHECK_MSG(cursor == row_ids.end(), "All results are not consumed");
    return blocks;
}
// Explicit instantiations of ReadBlockInfo::createWithRowIDs. The template body is
// defined in this file, so each supported search-result type is instantiated here.

// Instantiation for vector index search results.
template ReadBlockInfos ReadBlockInfo::createWithRowIDs<IProvideVectorIndex::SearchResult>(
std::span<IProvideVectorIndex::SearchResult> row_ids,
const std::vector<size_t> & pack_offset,
const RSResults & pack_res,
const DMFileMeta::PackStats & pack_stats,
size_t rows_threshold_per_read);
// Instantiation for full-text index search results; compiled only when ENABLE_CLARA is set,
// matching the conditional include of IProvideFullTextIndex.h at the top of this file.
#if ENABLE_CLARA
template ReadBlockInfos ReadBlockInfo::createWithRowIDs<IProvideFullTextIndex::SearchResult>(
std::span<IProvideFullTextIndex::SearchResult> row_ids,
const std::vector<size_t> & pack_offset,
const RSResults & pack_res,
const DMFileMeta::PackStats & pack_stats,
size_t rows_threshold_per_read);
#endif
} // namespace DB::DM