Skip to content

Commit 28e47db

Browse files
io: Fix checksum seek at end (#10341)
close #10340 1. In `FramedChecksumReadBuffer::doSeek`, add guard check for `working_buffer` size and `target_offset`. If the working_buffer is cleared, then re-seek the working buffer according to the offset and load data from the underlying file again. 2. In order to support the behavior above, `S3RandomAccessFile::seekImpl` support seek backward, which is implemented by reopening the file again 3. Add retry when reading from S3 meet errno=115, EINPROGRESS Signed-off-by: JaySon-Huang <tshent@qq.com> Co-authored-by: JaySon <tshent@qq.com> Co-authored-by: JaySon-Huang <tshent@qq.com>
1 parent 76a4ec4 commit 28e47db

6 files changed

Lines changed: 400 additions & 81 deletions

File tree

dbms/src/Common/ProfileEvents.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
M(S3PutObjectRetry) \
139139
M(S3IORead) \
140140
M(S3IOSeek) \
141+
M(S3IOSeekBackward) \
141142
M(FileCacheHit) \
142143
M(FileCacheMiss) \
143144
M(FileCacheEvict) \

dbms/src/IO/Checksum/ChecksumBuffer.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,80 @@ namespace DB
1818
{
1919
using namespace DB::Digest;
2020

21+
template <typename Backend>
22+
off_t FramedChecksumReadBuffer<Backend>::doSeek(off_t offset, int whence)
23+
{
24+
auto & frame = reinterpret_cast<ChecksumFrame<Backend> &>(
25+
*(this->working_buffer.begin() - sizeof(ChecksumFrame<Backend>))); // align should not fail
26+
27+
if (whence == SEEK_CUR)
28+
{
29+
offset = getPositionInFile() + offset;
30+
}
31+
else if (whence != SEEK_SET)
32+
{
33+
throw TiFlashException(
34+
"FramedChecksumReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence",
35+
Errors::Checksum::Internal);
36+
}
37+
auto target_frame = offset / frame_size;
38+
auto target_offset = offset % frame_size;
39+
40+
// If we have already seek to EOF, then working_buffer was cleared
41+
if (target_frame == current_frame && working_buffer.size() > 0)
42+
{
43+
if (unlikely(target_offset > working_buffer.size()))
44+
pos = working_buffer.end();
45+
else
46+
pos = working_buffer.begin() + target_offset;
47+
return offset;
48+
}
49+
50+
// Seek according to `target_frame` and `target_offset`
51+
// read the header and the body
52+
auto header_offset = target_frame * (sizeof(ChecksumFrame<Backend>) + frame_size);
53+
auto result = in->seek(static_cast<off_t>(header_offset), SEEK_SET);
54+
if (result == -1)
55+
{
56+
throw TiFlashException(
57+
Errors::Checksum::IOFailure,
58+
"checksum framed file {} is not seekable",
59+
in->getFileName());
60+
}
61+
auto length = expectRead(
62+
working_buffer.begin() - sizeof(ChecksumFrame<Backend>),
63+
sizeof(ChecksumFrame<Backend>) + frame_size);
64+
if (length == 0)
65+
{
66+
current_frame = target_frame;
67+
pos = working_buffer.begin();
68+
working_buffer.resize(0);
69+
return offset; // EOF
70+
}
71+
if (unlikely(length != sizeof(ChecksumFrame<Backend>) + frame.bytes))
72+
{
73+
throw TiFlashException(
74+
Errors::Checksum::DataCorruption,
75+
"frame length (header = {}, body = {}, read = {}) mismatch for {}",
76+
sizeof(ChecksumFrame<Backend>),
77+
frame.bytes,
78+
length,
79+
in->getFileName());
80+
}
81+
82+
// body checksum examination
83+
checkBody();
84+
85+
// update statistics
86+
current_frame = target_frame;
87+
if (unlikely(target_offset > working_buffer.size()))
88+
pos = working_buffer.end();
89+
else
90+
pos = working_buffer.begin() + target_offset;
91+
92+
return offset;
93+
}
94+
2195
template class FramedChecksumReadBuffer<None>;
2296
template class FramedChecksumReadBuffer<CRC32>;
2397
template class FramedChecksumReadBuffer<CRC64>;

dbms/src/IO/Checksum/ChecksumBuffer.h

Lines changed: 1 addition & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -423,72 +423,7 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor
423423
return true;
424424
}
425425

426-
off_t doSeek(off_t offset, int whence) override
427-
{
428-
auto & frame = reinterpret_cast<ChecksumFrame<Backend> &>(
429-
*(this->working_buffer.begin() - sizeof(ChecksumFrame<Backend>))); // align should not fail
430-
431-
if (whence == SEEK_CUR)
432-
{
433-
offset = getPositionInFile() + offset;
434-
}
435-
else if (whence != SEEK_SET)
436-
{
437-
throw TiFlashException(
438-
"FramedChecksumReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence",
439-
Errors::Checksum::Internal);
440-
}
441-
auto target_frame = offset / frame_size;
442-
auto target_offset = offset % frame_size;
443-
444-
if (target_frame == current_frame)
445-
{
446-
pos = working_buffer.begin() + target_offset;
447-
return offset;
448-
}
449-
else
450-
{
451-
// read the header and the body
452-
auto header_offset = target_frame * (sizeof(ChecksumFrame<Backend>) + frame_size);
453-
auto result = in->seek(static_cast<off_t>(header_offset), SEEK_SET);
454-
if (result == -1)
455-
{
456-
throw TiFlashException(
457-
"checksum framed file " + in->getFileName() + " is not seekable",
458-
Errors::Checksum::IOFailure);
459-
}
460-
auto length = expectRead(
461-
working_buffer.begin() - sizeof(ChecksumFrame<Backend>),
462-
sizeof(ChecksumFrame<Backend>) + frame_size);
463-
if (length == 0)
464-
{
465-
current_frame = target_frame;
466-
pos = working_buffer.begin();
467-
working_buffer.resize(0);
468-
return offset; // EOF
469-
}
470-
if (unlikely(length != sizeof(ChecksumFrame<Backend>) + frame.bytes))
471-
{
472-
throw TiFlashException(
473-
fmt::format(
474-
"frame length (header = {}, body = {}, read = {}) mismatch for {}",
475-
sizeof(ChecksumFrame<Backend>),
476-
frame.bytes,
477-
length,
478-
in->getFileName()),
479-
Errors::Checksum::DataCorruption);
480-
}
481-
482-
// body checksum examination
483-
checkBody();
484-
485-
// update statistics
486-
current_frame = target_frame;
487-
pos = working_buffer.begin() + target_offset;
488-
}
489-
490-
return offset;
491-
}
426+
off_t doSeek(off_t offset, int whence) override;
492427
};
493428

494429
} // namespace DB

dbms/src/IO/Checksum/tests/gtest_dm_checksum_buffer.cpp

Lines changed: 131 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#pragma GCC diagnostic push
16-
#pragma GCC diagnostic ignored "-Wsign-compare"
17-
#include <gtest/gtest.h>
18-
#pragma GCC diagnostic pop
15+
#include <Common/Logger.h>
1916
#include <IO/BaseFile/PosixRandomAccessFile.h>
2017
#include <IO/BaseFile/PosixWritableFile.h>
2118
#include <IO/BaseFile/RateLimiter.h>
@@ -28,14 +25,15 @@
2825
#include <Poco/File.h>
2926
#include <Storages/DeltaMerge/DMChecksumConfig.h>
3027
#include <Storages/Page/PageUtil.h>
28+
#include <TestUtils/TiFlashTestBasic.h>
3129
#include <fmt/format.h>
3230

31+
#include <ext/scope_guard.h>
3332
#include <random>
3433

35-
namespace DB
36-
{
37-
namespace tests
34+
namespace DB::tests
3835
{
36+
3937
namespace
4038
{
4139
std::random_device dev; // NOLINT(cert-err58-cpp)
@@ -372,5 +370,129 @@ TEST_STACKED_SEEKING(CRC32)
372370
TEST_STACKED_SEEKING(CRC64)
373371
TEST_STACKED_SEEKING(City128)
374372
TEST_STACKED_SEEKING(XXH3)
375-
} // namespace tests
376-
} // namespace DB
373+
374+
template <ChecksumAlgo D>
375+
void runCompressedSeekableReaderBufferTest()
376+
try
377+
{
378+
auto log = Logger::get();
379+
// Create a temporary file for testing
380+
const std::string temp_file_path = "/tmp/tiflash_compressed_seek_test.dat";
381+
SCOPE_EXIT({
382+
Poco::File file(temp_file_path);
383+
if (file.exists())
384+
file.remove();
385+
});
386+
// Test data - create multiple blocks with different patterns
387+
std::vector<std::string> test_blocks;
388+
389+
test_blocks = {
390+
std::string(1500, 'A') + "BLOCK0_END",
391+
std::string(800, 'B') + "BLOCK1_END",
392+
"", // Block 2 is empty
393+
"", // Block 3 is empty
394+
};
395+
396+
std::vector<size_t> block_compressed_offsets;
397+
std::vector<size_t> block_decompressed_sizes;
398+
399+
auto [limiter, provider] = prepareIO();
400+
auto config = DM::DMChecksumConfig{{}, TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE, D};
401+
402+
// Write compressed data to file
403+
{
404+
auto plain_file = ChecksumWriteBufferBuilder::build(
405+
true,
406+
provider,
407+
temp_file_path,
408+
EncryptionPath(temp_file_path, temp_file_path),
409+
false,
410+
limiter->getWriteLimiter(),
411+
config.getChecksumAlgorithm(),
412+
config.getChecksumFrameLength(),
413+
/*flags*/
414+
-1,
415+
/*mode*/ 0666,
416+
1048576);
417+
auto compressed_buf
418+
= CompressedWriteBuffer<>::build(*plain_file, CompressionSettings(CompressionMethod::LZ4), false);
419+
420+
for (const auto & block_data : test_blocks)
421+
{
422+
// Record the compressed file offset before writing this block
423+
block_compressed_offsets.push_back(plain_file->count());
424+
block_decompressed_sizes.push_back(block_data.size());
425+
426+
// Write the block data
427+
compressed_buf->write(block_data.data(), block_data.size());
428+
compressed_buf->next(); // Force compression of this block
429+
}
430+
}
431+
432+
LOG_INFO(log, "Created compressed file with {} blocks", test_blocks.size());
433+
for (size_t i = 0; i < block_compressed_offsets.size(); ++i)
434+
{
435+
LOG_INFO(
436+
log,
437+
"Block {}: compressed_offset={}, decompressed_size={}",
438+
i,
439+
block_compressed_offsets[i],
440+
block_decompressed_sizes[i]);
441+
}
442+
443+
444+
auto compressed_in = CompressedReadBufferFromFileBuilder::build(
445+
provider,
446+
temp_file_path,
447+
EncryptionPath(temp_file_path, temp_file_path),
448+
config.getChecksumFrameLength(),
449+
limiter->getReadLimiter(),
450+
config.getChecksumAlgorithm(),
451+
config.getChecksumFrameLength());
452+
453+
// 1. Check seek + read
454+
for (size_t i = 0; i < test_blocks.size(); ++i)
455+
{
456+
// Seek to the start of each block
457+
LOG_INFO(log, "Seeking to block {} at offset {}", i, block_compressed_offsets[i]);
458+
compressed_in->seek(block_compressed_offsets[i], 0);
459+
460+
// Read the data
461+
std::string read_data;
462+
read_data.resize(block_decompressed_sizes[i]);
463+
compressed_in->readBig(read_data.data(), block_decompressed_sizes[i]);
464+
465+
// Verify the data matches
466+
ASSERT_EQ(read_data, test_blocks[i]) << "Block " << i << " data mismatch";
467+
}
468+
469+
// Seek in inverse order to test seek again
470+
for (size_t i = 0; i < test_blocks.size(); ++i)
471+
{
472+
assert(i + 1 <= test_blocks.size());
473+
const size_t target_block = test_blocks.size() - i - 1;
474+
compressed_in->seek(block_compressed_offsets[target_block], 0);
475+
std::string read_data;
476+
read_data.resize(block_decompressed_sizes[target_block]);
477+
size_t num_read = compressed_in->readBig(read_data.data(), test_blocks[target_block].size());
478+
ASSERT_EQ(num_read, test_blocks[target_block].size());
479+
read_data.resize(num_read);
480+
ASSERT_EQ(read_data, test_blocks[target_block])
481+
<< "Block " << target_block << " data mismatch after seek again";
482+
}
483+
}
484+
CATCH
485+
486+
#define TEST_COMPRESSEDSEEKABLE(ALGO) \
487+
TEST(DMChecksumBuffer##ALGO, CompressedSeekable) \
488+
{ \
489+
runCompressedSeekableReaderBufferTest<ChecksumAlgo::ALGO>(); \
490+
} // NOLINT(cert-err58-cpp)
491+
492+
TEST_COMPRESSEDSEEKABLE(None)
493+
TEST_COMPRESSEDSEEKABLE(CRC32)
494+
TEST_COMPRESSEDSEEKABLE(CRC64)
495+
TEST_COMPRESSEDSEEKABLE(City128)
496+
TEST_COMPRESSEDSEEKABLE(XXH3)
497+
498+
} // namespace DB::tests

0 commit comments

Comments
 (0)