Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ static inline bool atomicReadWrite(
std::tie(decoding_schema_snapshot, block_ptr)
= storage->getSchemaSnapshotAndBlockForDecoding(lock, true, should_handle_version_col);
block_decoding_schema_epoch = decoding_schema_snapshot->decoding_schema_epoch;
const auto & table_info = storage->getTableInfo();
LOG_INFO(
Logger::get("dddddddddd"),
"decode with schema snapshot, keyspace={} table_id={} region_id={} epoch={} update_ts={} "
"with_version_column={} columns={}",
rw_ctx.keyspace_id,
rw_ctx.table_id,
region->id(),
block_decoding_schema_epoch,
table_info.update_timestamp,
should_handle_version_col,
table_info.columns.size());

auto reader = RegionBlockReader(decoding_schema_snapshot);
if (!reader.read(*block_ptr, data_list_read, force_decode))
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Storages/KVStore/Decode/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ bool RegionBlockReader::read(Block & block, const ReadList & data_list, bool for
{
try
{
for (const auto & data : data_list)
{
LOG_INFO(Logger::get("dddddddddd"), "RegionBlockReader::read data.value={}", data.value->toDebugString());
}

switch (schema_snapshot->pk_type)
{
case TMTPKType::INT64:
Expand Down Expand Up @@ -177,8 +182,14 @@ struct VersionColResolver<RegionUncommittedDataList>
template <TMTPKType pk_type, typename ReadList>
bool RegionBlockReader::readImpl(Block & block, const ReadList & data_list, bool force_decode)
{
LOG_INFO(
Logger::get("dddddddddd"),
"RegionBlockReader::readImpl start, schema_snapshot.column_defines={}",
*schema_snapshot->column_defines);

VersionColResolver<ReadList> version_col_resolver;
version_col_resolver.check(block, schema_snapshot->column_defines->size());
// The column_ids to read according to schema_snapshot, each elem is (column_id, block_pos)
const auto & read_column_ids = schema_snapshot->getColId2BlockPosMap();
const auto & pk_column_ids = schema_snapshot->pk_column_ids;
const auto & pk_pos_map = schema_snapshot->pk_pos_map;
Expand Down Expand Up @@ -269,6 +280,8 @@ bool RegionBlockReader::readImpl(Block & block, const ReadList & data_list, bool
else
{
// Parse column value from encoded value
// Decode the column_ids from `column_ids_iter` to `read_column_ids.end()`
// and insert into `block` at position starting from `next_column_pos`
if (!appendRowToBlock(
*value_ptr,
column_ids_iter,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
{
auto tikv_key = TiKVKey(cmds.keys[i].data, cmds.keys[i].len);
auto tikv_value = TiKVValue(cmds.vals[i].data, cmds.vals[i].len);
LOG_INFO(Logger::get("dddddddddd"), "Region::handleWriteRaftCmd Put key={}, value={}", tikv_key.toDebugString(), tikv_value.toDebugString());
if (cf == ColumnFamilyType::Write)
{
write_put_key_count++;
Expand Down
141 changes: 141 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_region_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,5 +692,146 @@ try
}
CATCH

TEST_F(RegionBlockReaderTest, ReadFromRegionDefaultValue)
try
{
// With this table_info, column "c1" is "NOT NULL" and has no origin default
TableInfo table_info_c1_not_null_no_origin_default(
R"({"cols":[{"id":1,"name":{"L":"c0","O":"c0"},"offset":0,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":4,"Tp":1}},{"id":2,"name":{"L":"handle","O":"handle"},"offset":1,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":515,"Flen":11,"Tp":3}},{"default":"-56083770","id":7,"name":{"L":"c1","O":"c1"},"offset":2,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":1,"Flen":20,"Tp":8}},{"id":4,"name":{"L":"c2","O":"c2"},"offset":3,"origin_default":"0.07954397","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":-1,"Flag":4097,"Flen":12,"Tp":4}},{"id":5,"name":{"L":"c5","O":"c5"},"offset":4,"origin_default":"0","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}},{"default":"247262911","id":6,"name":{"L":"c4","O":"c4"},"offset":5,"origin_default":"247262911","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}}],"id":636,"index_info":[],"is_common_handle":false,"keyspace_id":4294967295,"name":{"L":"t0","O":"t0"},"pk_is_handle":true,"state":5,"tiflash_replica":{"Available":true,"Count":1},"update_timestamp":463845180343844895})",
NullspaceID);

// With this table_info, column "c1" is "NOT NULL" and has no default value
TableInfo table_info_c1_not_null_no_default_value(
R"({"cols":[{"id":1,"name":{"L":"c0","O":"c0"},"offset":0,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":4,"Tp":1}},{"id":2,"name":{"L":"handle","O":"handle"},"offset":1,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":515,"Flen":11,"Tp":3}},{"default":"-56083770","id":7,"name":{"L":"c1","O":"c1"},"offset":2,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":4097,"Flen":20,"Tp":8}},{"id":4,"name":{"L":"c2","O":"c2"},"offset":3,"origin_default":"0.07954397","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":-1,"Flag":4097,"Flen":12,"Tp":4}},{"id":5,"name":{"L":"c5","O":"c5"},"offset":4,"origin_default":"0","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}},{"default":"247262911","id":6,"name":{"L":"c4","O":"c4"},"offset":5,"origin_default":"247262911","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}}],"id":636,"index_info":[],"is_common_handle":false,"keyspace_id":4294967295,"name":{"L":"t0","O":"t0"},"pk_is_handle":true,"state":5,"tiflash_replica":{"Available":true,"Count":1},"update_timestamp":463845180343844895})",
NullspaceID);

// With this table_info, column "c1" has the "NOT NULL" flag and has origin default "-56083770"
TableInfo table_info_c1_not_null_with_origin_default(
R"({"cols":[{"id":1,"name":{"L":"c0","O":"c0"},"offset":0,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":4,"Tp":1}},{"id":2,"name":{"L":"handle","O":"handle"},"offset":1,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":515,"Flen":11,"Tp":3}},{"origin_default":"-56083770","id":7,"name":{"L":"c1","O":"c1"},"offset":2,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":1,"Flen":20,"Tp":8}},{"id":4,"name":{"L":"c2","O":"c2"},"offset":3,"origin_default":"0.07954397","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":-1,"Flag":4097,"Flen":12,"Tp":4}},{"id":5,"name":{"L":"c5","O":"c5"},"offset":4,"origin_default":"0","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}},{"default":"247262911","id":6,"name":{"L":"c4","O":"c4"},"offset":5,"origin_default":"247262911","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}}],"id":636,"index_info":[],"is_common_handle":false,"keyspace_id":4294967295,"name":{"L":"t0","O":"t0"},"pk_is_handle":true,"state":5,"tiflash_replica":{"Available":true,"Count":1},"update_timestamp":463845180343844895})",
NullspaceID);

// With this table_info, column "c1" does not have the "NOT NULL" flag
TableInfo table_info_c1_nullable(
R"({"cols":[{"id":1,"name":{"L":"c0","O":"c0"},"offset":0,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":4,"Tp":1}},{"id":2,"name":{"L":"handle","O":"handle"},"offset":1,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":515,"Flen":11,"Tp":3}},{"id":7,"name":{"L":"c1","O":"c1"},"offset":2,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":20,"Tp":8}},{"id":4,"name":{"L":"c2","O":"c2"},"offset":3,"origin_default":"0.07954397","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":-1,"Flag":4097,"Flen":12,"Tp":4}},{"id":5,"name":{"L":"c5","O":"c5"},"offset":4,"origin_default":"0","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}},{"default":"247262911","id":6,"name":{"L":"c4","O":"c4"},"offset":5,"origin_default":"247262911","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}}],"id":636,"index_info":[],"is_common_handle":false,"keyspace_id":4294967295,"name":{"L":"t0","O":"t0"},"pk_is_handle":true,"state":5,"tiflash_replica":{"Available":true,"Count":1},"update_timestamp":463844343842340870})",
NullspaceID);

RegionID region_id = 4;
// the start_key and end_key for table_id = 68
String region_start_key(bytesFromHexString("7480000000000002FF7C5F720000000000FA"));
String region_end_key(bytesFromHexString("7480000000000002FF7D00000000000000F8"));
auto region = RegionBench::makeRegionForRange(region_id, region_start_key, region_end_key);
// the hex kv dump from RaftLog
std::vector<std::tuple<std::string_view, std::string_view>> kvs = {
{
"7480000000000002FFA95F728000000000FF0000010000000000FAF9901806DEF7FFDA",
"50A380A08892FFF9B706762C8000040000000405060708000F0016001A00BFDC4011A00000000A0080000000000A008000003CA339"
"1ABC85",
},
{
"7480000000000002FFA95F728000000000FF0000010000000000FAF9901806DEF7FFD8",
"50A680A08892FFF9B706762C8000040000000405060708000F0016001A00BFDC4011A00000000A008033E04D600A008000003CA339"
"1ABC85",
},
{
"7480000000000002FFA95F728000000000FF0000020000000000FAF9901806DE33FFE8",
"509680B08E92FFF9B706762580000300000004050608000F001600BF720CDD400000000A0080000000010A00800000393C",
},
};
for (const auto & [k, v] : kvs)
{
region->insertDebug("write", TiKVKey(bytesFromHexString(k)), TiKVValue(bytesFromHexString(v)));
}

auto data_list_read = ReadRegionCommitCache(region, true);
ASSERT_TRUE(data_list_read.has_value());

// Test with `table_info_c1_not_null_no_origin_default`
auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info_c1_not_null_no_origin_default);
{
// force_decode=false can not decode because there are
// missing value for column with not null flag.
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_FALSE(reader.read(res_block, *data_list_read, false));
}
{
// force_decode=true can decode the block, and filling the default value for c1
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_TRUE(reader.read(res_block, *data_list_read, true));
LOG_INFO(
Logger::get(),
"Decoded block:\n{}",
DB::tests::getColumnsContent(res_block.getColumnsWithTypeAndName()));
ASSERT_EQ(res_block.getByName("c1").type->getName(), "Int64");
// verify the default value is filled correctly
ASSERT_COLUMN_EQ( //
res_block.getByName("c1"),
createColumn<Int64>({-2051270087, -2051270087, 0}));
}

// Test with `table_info_c1_not_null_no_default_value`
decoding_schema = getDecodingStorageSchemaSnapshot(table_info_c1_not_null_no_default_value);
{
// force_decode=false can not decode because there are
// missing value for column with not null flag.
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_FALSE(reader.read(res_block, *data_list_read, false));
}
{
// force_decode=true can decode the block, and filling the default value for c1
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_TRUE(reader.read(res_block, *data_list_read, true));
LOG_INFO(
Logger::get(),
"Decoded block:\n{}",
DB::tests::getColumnsContent(res_block.getColumnsWithTypeAndName()));
ASSERT_EQ(res_block.getByName("c1").type->getName(), "Int64");
// verify the default value is filled correctly
ASSERT_COLUMN_EQ( //
res_block.getByName("c1"),
createColumn<Int64>({-2051270087, -2051270087, 0}));
}

// Test with `table_info_c1_not_null_with_origin_default`
decoding_schema = getDecodingStorageSchemaSnapshot(table_info_c1_not_null_with_origin_default);
{
// force_decode=false can decode because origin_default exists and NoDefaultValue flag is not set
// so RegionBlockReader can use origin_default to fill the missing value`
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_TRUE(reader.read(res_block, *data_list_read, false));
LOG_INFO(
Logger::get(),
"Decoded block:\n{}",
DB::tests::getColumnsContent(res_block.getColumnsWithTypeAndName()));
ASSERT_EQ(res_block.getByName("c1").type->getName(), "Int64");
// verify the default value is filled correctly
ASSERT_COLUMN_EQ( //
res_block.getByName("c1"),
createColumn<Int64>({-2051270087, -2051270087, -56083770}));
}

// Test with `table_info_c1_nullable`
decoding_schema = getDecodingStorageSchemaSnapshot(table_info_c1_nullable);
{
// force_decode=false should be able to decode because c1 is nullable
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_TRUE(reader.read(res_block, *data_list_read, false));
LOG_INFO(
Logger::get(),
"Decoded block:\n{}",
DB::tests::getColumnsContent(res_block.getColumnsWithTypeAndName()));
ASSERT_EQ(res_block.getByName("c1").type->getName(), "Nullable(Int64)");
// verify the default value is filled with NULL correctly at the last row
ASSERT_COLUMN_EQ( //
res_block.getByName("c1"),
createNullableColumn<Int64>({-2051270087, -2051270087, 0}, {0, 0, 1}));
}
}
CATCH

} // namespace DB::tests
8 changes: 8 additions & 0 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,14 @@ std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerg
store->getHandle(),
decoding_schema_epoch++,
with_version_column);
LOG_INFO(
Logger::get("dddddddddd"),
"Refresh decoding schema snapshot, table_id={} epoch={} update_ts={} with_version_column={} table_info={}",
tidb_table_info.id,
decoding_schema_snapshot->decoding_schema_epoch,
tidb_table_info.update_timestamp,
with_version_column,
tidb_table_info.serialize());
cache_blocks.clear();
decoding_schema_changed = false;
}
Expand Down
Loading