Skip to content

Commit 60df8c8

Browse files
committed
fix ut
Signed-off-by: JaySon-Huang <tshent@qq.com>
1 parent 3df6b91 commit 60df8c8

8 files changed

Lines changed: 74 additions & 5 deletions

File tree

dbms/src/Operators/tests/gtest_concat_source.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ try
387387
}
388388

389389
std::atomic<size_t> received_blocks = 0;
390-
received_blocks.fetch_add(1, std::memory_order_relaxed);
390+
ResultHandler h([&](const Block & /*block*/) { received_blocks.fetch_add(1, std::memory_order_relaxed); });
391391

392392
PipelineExecGroupBuilder result_builder;
393393
builder_pool.generate(result_builder, exec_context, "test");

dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,18 +200,21 @@ class DeltaMergeStoreVectorBase : public VectorIndexTestUtils
200200

201201
void read(const RowKeyRange & range, const PushDownExecutorPtr & executor, const ColumnWithTypeAndName & out)
202202
{
203+
auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get("read"));
203204
auto in = store->read(
204205
*db_context,
205206
db_context->getSettingsRef(),
206207
{cdVec()},
207208
{range},
209+
read_queue,
208210
/* num_streams= */ 1,
209211
/* start_ts= */ std::numeric_limits<UInt64>::max(),
210212
executor,
211213
std::vector<RuntimeFilterPtr>{},
212214
0,
213215
TRACING_NAME,
214216
DMReadOptions{})[0];
217+
read_queue->finishQueueIfEmpty();
215218
ASSERT_INPUTSTREAM_COLS_UR(
216219
in,
217220
Strings({vec_column_name}),

dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,18 +153,21 @@ bool checkMatch(
153153
store->mergeDeltaAll(context);
154154

155155
const ColumnDefine & col_to_read = check_pk ? getExtraHandleColumnDefine(is_common_handle) : cd;
156+
auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get("read"));
156157
auto streams = store->read(
157158
context,
158159
context.getSettingsRef(),
159160
{col_to_read},
160161
{all_range},
162+
read_queue,
161163
1,
162164
std::numeric_limits<UInt64>::max(),
163165
std::make_shared<PushDownExecutor>(filter),
164166
std::vector<RuntimeFilterPtr>{},
165167
0,
166168
name,
167169
DMReadOptions{});
170+
read_queue->finishQueueIfEmpty();
168171
auto rows = getInputStreamNRows(streams[0]);
169172
store->drop();
170173

dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,15 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic
126126
// Submit to pending_pools
127127
scheduler.add(pool);
128128
{
129-
std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnnings
129+
std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnings
130130
ASSERT_EQ(scheduler.pending_pools.size(), 1);
131131
}
132132
ASSERT_EQ(scheduler.read_pools.size(), 0);
133133

134134
// Reap the pending_pools
135135
scheduler.reapPendingPools();
136136
{
137-
std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnnings
137+
std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnings
138138
ASSERT_EQ(scheduler.pending_pools.size(), 0);
139139
}
140140
ASSERT_EQ(scheduler.read_pools.size(), 1);
@@ -199,9 +199,12 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic
199199
pool->finishSegment(merged_task->units.front().task);
200200
}
201201

202-
ASSERT_EQ(pool->q.size(), 0);
203202
Block blk;
204-
ASSERT_FALSE(pool->q.pop(blk));
203+
// popBlock can only return empty block
204+
pool->shared_q->popBlock(blk);
205+
ASSERT_FALSE(blk);
206+
pool->shared_q->tryPopBlock(blk);
207+
ASSERT_FALSE(blk);
205208

206209
pool->decreaseUnorderedInputStreamRefCount();
207210
ASSERT_FALSE(pool->valid());

dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,11 +254,13 @@ class DeltaMergeStoreTestFastAddPeer
254254
void verifyRows(const RowKeyRange & range, size_t rows)
255255
{
256256
const auto & columns = store->getTableColumns();
257+
auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get("verifyRows"));
257258
BlockInputStreamPtr in = store->read(
258259
*db_context,
259260
db_context->getSettingsRef(),
260261
columns,
261262
{range},
263+
read_queue,
262264
/* num_streams= */ 1,
263265
/* start_ts= */ std::numeric_limits<UInt64>::max(),
264266
EMPTY_FILTER,
@@ -267,6 +269,7 @@ class DeltaMergeStoreTestFastAddPeer
267269
TRACING_NAME,
268270
DMReadOptions{},
269271
/* expected_block_size= */ 1024)[0];
272+
read_queue->finishQueueIfEmpty();
270273
ASSERT_INPUTSTREAM_NROWS(in, rows);
271274
}
272275

0 commit comments

Comments
 (0)