Skip to content

Commit ec6b716

Browse files
Limit memory usage of exchange sender (#10387) (#10417)
close #10337 Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> Co-authored-by: xufei <xufeixw@mail.ustc.edu.cn> Co-authored-by: xufei <xufei@pingcap.com>
1 parent 7417e50 commit ec6b716

19 files changed

Lines changed: 336 additions & 66 deletions

dbms/src/Flash/Coprocessor/DAGDriver.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <DataStreams/copyData.h>
2121
#include <Flash/Coprocessor/DAGContext.h>
2222
#include <Flash/Coprocessor/DAGDriver.h>
23+
#include <Flash/Coprocessor/DAGUtils.h>
2324
#include <Flash/Coprocessor/StreamWriter.h>
2425
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
2526
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
@@ -171,6 +172,7 @@ try
171172
streaming_writer,
172173
context.getSettingsRef().dag_records_per_chunk,
173174
context.getSettingsRef().batch_send_min_limit,
175+
getMaxBufferedBytesInResponseWriter(context.getSettingsRef().max_buffered_bytes_in_executor, 1),
174176
dag_context);
175177
response_writer->prepare(query_executor->getSampleBlock());
176178
query_executor
@@ -225,6 +227,7 @@ try
225227
streaming_writer,
226228
context.getSettingsRef().dag_records_per_chunk,
227229
context.getSettingsRef().batch_send_min_limit,
230+
getMaxBufferedBytesInResponseWriter(context.getSettingsRef().max_buffered_bytes_in_executor, 1),
228231
dag_context);
229232
response_writer->prepare(query_executor->getSampleBlock());
230233
query_executor

dbms/src/Flash/Coprocessor/DAGResponseWriter.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,17 @@ class DAGResponseWriter
4949

5050
virtual ~DAGResponseWriter() = default;
5151

52+
bool needFlush() const { return buffered_bytes >= max_buffered_bytes || buffered_rows >= max_buffered_rows; }
53+
5254
protected:
5355
Int64 records_per_chunk;
5456
DAGContext & dag_context;
5557
bool has_pending_flush = false;
58+
59+
UInt64 max_buffered_bytes = 0;
60+
UInt64 max_buffered_rows = 0;
61+
UInt64 buffered_bytes = 0;
62+
UInt64 buffered_rows = 0;
5663
};
5764

5865
} // namespace DB

dbms/src/Flash/Coprocessor/DAGUtils.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include <Common/FmtUtils.h>
16+
#include <Common/ThresholdUtils.h>
1617
#include <Common/TiFlashException.h>
1718
#include <Core/Types.h>
1819
#include <DataTypes/DataTypeNullable.h>
@@ -1570,4 +1571,19 @@ tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name)
15701571
throw Exception(fmt::format("Unsupported function {}", name));
15711572
return func_name_sig_map[name];
15721573
}
1574+
1575+
constexpr ssize_t MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE
1576+
= 1024 * 1024 * 32; // 32MB (8192 Rows * 256 Byte/row * 16 partitions)
1577+
1578+
UInt64 getMaxBufferedBytesInResponseWriter(Int64 max_buffered_bytes_in_executor, size_t concurrency)
1579+
{
1580+
// max buffered bytes in ExchangeSender, we don't want to send too many small chunks so there is a minimum limit of MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE
1581+
UInt64 ret = MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE;
1582+
if (max_buffered_bytes_in_executor > 0)
1583+
{
1584+
ret = std::max(ret, getAverageThreshold(max_buffered_bytes_in_executor, concurrency));
1585+
}
1586+
return ret;
1587+
}
1588+
15731589
} // namespace DB

dbms/src/Flash/Coprocessor/DAGUtils.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ void assertBlockSchema(const DataTypes & expected_types, const Block & block, co
8585

8686
void assertBlockSchema(const Block & header, const Block & block, const String & context_description);
8787

88+
UInt64 getMaxBufferedBytesInResponseWriter(Int64 max_buffered_bytes_in_executor, size_t concurrency);
89+
8890
class UniqueNameGenerator
8991
{
9092
private:

dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@ StreamingDAGResponseWriter<StreamWriterPtr>::StreamingDAGResponseWriter(
3535
StreamWriterPtr writer_,
3636
Int64 records_per_chunk_,
3737
Int64 batch_send_min_limit_,
38+
UInt64 max_buffered_bytes_,
3839
DAGContext & dag_context_)
3940
: DAGResponseWriter(records_per_chunk_, dag_context_)
40-
, batch_send_min_limit(batch_send_min_limit_)
4141
, writer(writer_)
4242
{
43-
rows_in_blocks = 0;
4443
switch (dag_context.encode_type)
4544
{
4645
case tipb::EncodeType::TypeDefault:
@@ -56,15 +55,20 @@ StreamingDAGResponseWriter<StreamWriterPtr>::StreamingDAGResponseWriter(
5655
throw TiFlashException("Unsupported EncodeType", Errors::Coprocessor::Internal);
5756
}
5857
/// For other encode types, we will use records_per_chunk to control the batch size sent.
59-
batch_send_min_limit
60-
= dag_context.encode_type == tipb::EncodeType::TypeCHBlock ? batch_send_min_limit : (records_per_chunk - 1);
58+
auto batch_send_min_limit
59+
= dag_context.encode_type == tipb::EncodeType::TypeCHBlock ? batch_send_min_limit_ : (records_per_chunk - 1);
60+
if (batch_send_min_limit <= 0)
61+
max_buffered_rows = 1;
62+
else
63+
max_buffered_rows = static_cast<UInt64>(batch_send_min_limit);
64+
max_buffered_bytes = max_buffered_bytes_;
6165
}
6266

6367
template <class StreamWriterPtr>
6468
WriteResult StreamingDAGResponseWriter<StreamWriterPtr>::flush()
6569
{
6670
has_pending_flush = false;
67-
if (rows_in_blocks > 0)
71+
if (buffered_rows > 0)
6872
{
6973
auto wait_res = waitForWritable();
7074
if (wait_res == WaitResult::Ready)
@@ -96,11 +100,12 @@ WriteResult StreamingDAGResponseWriter<StreamWriterPtr>::write(const Block & blo
96100
size_t rows = block.rows();
97101
if (rows > 0)
98102
{
99-
rows_in_blocks += rows;
103+
buffered_rows += rows;
104+
buffered_bytes += block.allocatedBytes();
100105
blocks.push_back(block);
101106
}
102107

103-
if (static_cast<Int64>(rows_in_blocks) > batch_send_min_limit)
108+
if (needFlush())
104109
{
105110
return flush();
106111
}
@@ -157,7 +162,8 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::encodeThenWriteBlocks()
157162
}
158163

159164
assert(blocks.empty());
160-
rows_in_blocks = 0;
165+
buffered_rows = 0;
166+
buffered_bytes = 0;
161167
writer->write(response.getResponse());
162168
}
163169

dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
3535
StreamWriterPtr writer_,
3636
Int64 records_per_chunk_,
3737
Int64 batch_send_min_limit_,
38+
UInt64 max_buffered_bytes_,
3839
DAGContext & dag_context_);
3940
WaitResult waitForWritable() const override;
4041

@@ -45,10 +46,8 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
4546
void encodeThenWriteBlocks();
4647

4748
private:
48-
Int64 batch_send_min_limit;
4949
StreamWriterPtr writer;
5050
std::vector<Block> blocks;
51-
size_t rows_in_blocks;
5251
std::unique_ptr<ChunkCodecStream> chunk_codec_stream;
5352
};
5453

dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <gtest/gtest.h>
2525

2626
#include <Flash/Coprocessor/StreamingDAGResponseWriter.cpp>
27+
#include <cstddef>
2728

2829
namespace DB
2930
{
@@ -113,6 +114,7 @@ try
113114
const size_t block_rows = 64;
114115
const size_t block_num = 64;
115116
const size_t batch_send_min_limit = 108;
117+
const size_t max_buffered_bytes = 1024 * 1024 * 16;
116118

117119
// 1. Build Blocks.
118120
std::vector<Block> blocks;
@@ -135,6 +137,7 @@ try
135137
mock_writer,
136138
batch_send_min_limit,
137139
batch_send_min_limit,
140+
max_buffered_bytes,
138141
*dag_context_ptr);
139142
for (const auto & block : blocks)
140143
dag_writer->write(block);
@@ -169,5 +172,57 @@ try
169172
}
170173
CATCH
171174

175+
TEST_F(TestStreamingWriter, testMaxBufferedBytes)
176+
try
177+
{
178+
std::vector<tipb::EncodeType> encode_types{
179+
tipb::EncodeType::TypeDefault,
180+
tipb::EncodeType::TypeChunk,
181+
tipb::EncodeType::TypeCHBlock};
182+
for (const auto & encode_type : encode_types)
183+
{
184+
dag_context_ptr->encode_type = encode_type;
185+
186+
const size_t block_rows = 64;
187+
const size_t block_num = 64;
188+
const size_t batch_send_min_limit = block_rows * block_num / 2;
189+
const std::vector<UInt64> max_buffered_bytes_vec{1, 1024 * 1024 * 1024};
190+
const std::vector<UInt64> expected_response_num{block_num, 2};
191+
192+
for (size_t i = 0; i < max_buffered_bytes_vec.size(); ++i)
193+
{
194+
// 1. Build Blocks.
195+
std::vector<Block> blocks;
196+
for (size_t i = 0; i < block_num; ++i)
197+
{
198+
blocks.emplace_back(prepareBlock(block_rows));
199+
blocks.emplace_back(prepareBlock(0));
200+
}
201+
Block header = blocks.back();
202+
203+
// 2. Build MockStreamWriter.
204+
std::vector<tipb::SelectResponse> write_report;
205+
auto checker = [&write_report](tipb::SelectResponse & response) {
206+
write_report.emplace_back(std::move(response));
207+
};
208+
auto mock_writer = std::make_shared<MockStreamWriter>(checker);
209+
210+
// 3. Write blocks
211+
auto dag_writer = std::make_shared<StreamingDAGResponseWriter<std::shared_ptr<MockStreamWriter>>>(
212+
mock_writer,
213+
batch_send_min_limit,
214+
batch_send_min_limit,
215+
max_buffered_bytes_vec[i],
216+
*dag_context_ptr);
217+
for (const auto & block : blocks)
218+
dag_writer->write(block);
219+
dag_writer->flush();
220+
221+
ASSERT_EQ(write_report.size(), expected_response_num[i]);
222+
}
223+
}
224+
}
225+
CATCH
226+
172227
} // namespace tests
173228
} // namespace DB

dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ class TestTiRemoteBlockInputStream : public testing::Test
345345
auto dag_writer = std::make_shared<BroadcastOrPassThroughWriter<MockWriterPtr>>(
346346
writer,
347347
batch_send_min_limit,
348+
1024 * 1024 * 16,
348349
*dag_context_ptr,
349350
MPPDataPacketVersion::MPPDataPacketV1,
350351
tipb::CompressionMode::FAST,
@@ -374,6 +375,7 @@ class TestTiRemoteBlockInputStream : public testing::Test
374375
writer,
375376
0,
376377
batch_send_min_limit,
378+
1024 * 1024 * 16,
377379
*dag_context_ptr);
378380

379381
// 2. encode all blocks

dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,36 +25,35 @@ namespace DB
2525
template <class ExchangeWriterPtr>
2626
BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
2727
ExchangeWriterPtr writer_,
28-
Int64 batch_send_min_limit_,
28+
Int64 max_buffered_rows_,
29+
UInt64 max_buffered_bytes_,
2930
DAGContext & dag_context_,
3031
MPPDataPacketVersion data_codec_version_,
3132
tipb::CompressionMode compression_mode_,
3233
tipb::ExchangeType exchange_type_)
3334
: DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_)
34-
, batch_send_min_limit(batch_send_min_limit_)
3535
, writer(writer_)
3636
, exchange_type(exchange_type_)
3737
, data_codec_version(data_codec_version_)
3838
, compression_method(ToInternalCompressionMethod(compression_mode_))
3939
{
40-
rows_in_blocks = 0;
4140
RUNTIME_CHECK(dag_context.encode_type == tipb::EncodeType::TypeCHBlock);
4241
RUNTIME_CHECK(exchange_type == tipb::ExchangeType::Broadcast || exchange_type == tipb::ExchangeType::PassThrough);
4342

4443
switch (data_codec_version)
4544
{
4645
case MPPDataPacketV0:
47-
if (batch_send_min_limit <= 0)
48-
batch_send_min_limit = 1;
46+
if (max_buffered_rows_ <= 0)
47+
max_buffered_rows_ = 1;
4948
break;
5049
case MPPDataPacketV1:
5150
default:
5251
{
5352
// make `batch_send_min_limit` always GT 0
54-
if (batch_send_min_limit <= 0)
53+
if (max_buffered_rows_ <= 0)
5554
{
5655
// set upper limit if not specified
57-
batch_send_min_limit = 8 * 1024 /* 8K */;
56+
max_buffered_rows_ = 8 * 1024 /* 8K */;
5857
}
5958
for (const auto & field_type : dag_context.result_field_types)
6059
{
@@ -63,13 +62,15 @@ BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
6362
break;
6463
}
6564
}
65+
max_buffered_rows = static_cast<UInt64>(max_buffered_rows_);
66+
max_buffered_bytes = max_buffered_bytes_;
6667
}
6768

6869
template <class ExchangeWriterPtr>
6970
WriteResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
7071
{
7172
has_pending_flush = false;
72-
if (rows_in_blocks > 0)
73+
if (buffered_rows > 0)
7374
{
7475
auto wait_res = waitForWritable();
7576
if (wait_res == WaitResult::Ready)
@@ -102,11 +103,12 @@ WriteResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block &
102103
size_t rows = block.rows();
103104
if (rows > 0)
104105
{
105-
rows_in_blocks += rows;
106+
buffered_rows += rows;
107+
buffered_bytes += block.allocatedBytes();
106108
blocks.push_back(block);
107109
}
108110

109-
if (static_cast<Int64>(rows_in_blocks) >= batch_send_min_limit)
111+
if (needFlush())
110112
{
111113
return flush();
112114
}
@@ -130,7 +132,8 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::writeBlocks()
130132
else
131133
writer->passThroughWrite(blocks, data_codec_version, compression_method);
132134
blocks.clear();
133-
rows_in_blocks = 0;
135+
buffered_rows = 0;
136+
buffered_bytes = 0;
134137
}
135138

136139
template class BroadcastOrPassThroughWriter<SyncMPPTunnelSetWriterPtr>;

dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
3131
public:
3232
BroadcastOrPassThroughWriter(
3333
ExchangeWriterPtr writer_,
34-
Int64 batch_send_min_limit_,
34+
Int64 max_buffered_rows_,
35+
UInt64 max_buffered_bytes_,
3536
DAGContext & dag_context_,
3637
MPPDataPacketVersion data_codec_version_,
3738
tipb::CompressionMode compression_mode_,
@@ -44,10 +45,8 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
4445
void writeBlocks();
4546

4647
private:
47-
Int64 batch_send_min_limit;
4848
ExchangeWriterPtr writer;
4949
std::vector<Block> blocks;
50-
size_t rows_in_blocks;
5150
const tipb::ExchangeType exchange_type;
5251

5352
// support data compression

0 commit comments

Comments
 (0)