Skip to content

Commit 3e1b2f5

Browse files
authored
Merge pull request #17311 from ClickHouse/fix_integration_tests
Fix some flaky tests
2 parents 08b3707 + 7e01496 commit 3e1b2f5

File tree

10 files changed

+78
-66
lines changed

10 files changed

+78
-66
lines changed

src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp

Lines changed: 3 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -16,7 +16,7 @@ namespace DB
1616
RabbitMQBlockInputStream::RabbitMQBlockInputStream(
1717
StorageRabbitMQ & storage_,
1818
const StorageMetadataPtr & metadata_snapshot_,
19-
const Context & context_,
19+
std::shared_ptr<Context> context_,
2020
const Names & columns,
2121
size_t max_block_size_,
2222
bool ack_in_suffix_)
@@ -54,7 +54,7 @@ Block RabbitMQBlockInputStream::getHeader() const
5454

5555
void RabbitMQBlockInputStream::readPrefixImpl()
5656
{
57-
auto timeout = std::chrono::milliseconds(context.getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
57+
auto timeout = std::chrono::milliseconds(context->getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
5858
buffer = storage.popReadBuffer(timeout);
5959
}
6060

@@ -96,7 +96,7 @@ Block RabbitMQBlockInputStream::readImpl()
9696
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
9797

9898
auto input_format = FormatFactory::instance().getInputFormat(
99-
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
99+
storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size);
100100

101101
InputPort port(input_format->getPort().getHeader(), input_format.get());
102102
connect(input_format->getPort(), port);

src/Storages/RabbitMQ/RabbitMQBlockInputStream.h

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -15,7 +15,7 @@ class RabbitMQBlockInputStream : public IBlockInputStream
1515
RabbitMQBlockInputStream(
1616
StorageRabbitMQ & storage_,
1717
const StorageMetadataPtr & metadata_snapshot_,
18-
const Context & context_,
18+
std::shared_ptr<Context> context_,
1919
const Names & columns,
2020
size_t max_block_size_,
2121
bool ack_in_suffix = true);
@@ -37,7 +37,7 @@ class RabbitMQBlockInputStream : public IBlockInputStream
3737
private:
3838
StorageRabbitMQ & storage;
3939
StorageMetadataPtr metadata_snapshot;
40-
const Context & context;
40+
std::shared_ptr<Context> context;
4141
Names column_names;
4242
const size_t max_block_size;
4343
bool ack_in_suffix;

src/Storages/RabbitMQ/StorageRabbitMQ.cpp

Lines changed: 14 additions & 11 deletions
Original file line number · Diff line number · Diff line change
@@ -74,7 +74,6 @@ StorageRabbitMQ::StorageRabbitMQ(
7474
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
7575
: IStorage(table_id_)
7676
, global_context(context_.getGlobalContext())
77-
, rabbitmq_context(Context(global_context))
7877
, rabbitmq_settings(std::move(rabbitmq_settings_))
7978
, exchange_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
8079
, format_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
@@ -114,8 +113,8 @@ StorageRabbitMQ::StorageRabbitMQ(
114113
storage_metadata.setColumns(columns_);
115114
setInMemoryMetadata(storage_metadata);
116115

117-
rabbitmq_context.makeQueryContext();
118-
rabbitmq_context = addSettings(rabbitmq_context);
116+
rabbitmq_context = addSettings(global_context);
117+
rabbitmq_context->makeQueryContext();
119118

120119
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
121120
event_handler->updateLoopState(Loop::STOP);
@@ -193,16 +192,17 @@ String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_i
193192
}
194193

195194

196-
Context StorageRabbitMQ::addSettings(Context context) const
195+
std::shared_ptr<Context> StorageRabbitMQ::addSettings(const Context & context) const
197196
{
198-
context.setSetting("input_format_skip_unknown_fields", true);
199-
context.setSetting("input_format_allow_errors_ratio", 0.);
200-
context.setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
197+
auto modified_context = std::make_shared<Context>(context);
198+
modified_context->setSetting("input_format_skip_unknown_fields", true);
199+
modified_context->setSetting("input_format_allow_errors_ratio", 0.);
200+
modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
201201

202202
if (!schema_name.empty())
203-
context.setSetting("format_schema", schema_name);
203+
modified_context->setSetting("format_schema", schema_name);
204204

205-
return context;
205+
return modified_context;
206206
}
207207

208208

@@ -538,6 +538,7 @@ Pipe StorageRabbitMQ::read(
538538
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
539539

540540
auto modified_context = addSettings(context);
541+
541542
auto block_size = getMaxBlockSize();
542543

543544
bool update_channels = false;
@@ -581,7 +582,9 @@ Pipe StorageRabbitMQ::read(
581582
looping_task->activateAndSchedule();
582583

583584
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
584-
return Pipe::unitePipes(std::move(pipes));
585+
auto united_pipe = Pipe::unitePipes(std::move(pipes));
586+
united_pipe.addInterpreterContext(modified_context);
587+
return united_pipe;
585588
}
586589

587590

@@ -785,7 +788,7 @@ bool StorageRabbitMQ::streamToViews()
785788
insert->table_id = table_id;
786789

787790
// Only insert into dependent views and expect that input blocks contain virtual columns
788-
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
791+
InterpreterInsertQuery interpreter(insert, *rabbitmq_context, false, true, true);
789792
auto block_io = interpreter.execute();
790793

791794
auto metadata_snapshot = getInMemoryMetadataPtr();

src/Storages/RabbitMQ/StorageRabbitMQ.h

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -73,7 +73,7 @@ class StorageRabbitMQ final: public ext::shared_ptr_helper<StorageRabbitMQ>, pub
7373

7474
private:
7575
const Context & global_context;
76-
Context rabbitmq_context;
76+
std::shared_ptr<Context> rabbitmq_context;
7777
std::unique_ptr<RabbitMQSettings> rabbitmq_settings;
7878

7979
const String exchange_name;
@@ -135,7 +135,7 @@ class StorageRabbitMQ final: public ext::shared_ptr_helper<StorageRabbitMQ>, pub
135135
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
136136
static String getTableBasedName(String name, const StorageID & table_id);
137137

138-
Context addSettings(Context context) const;
138+
std::shared_ptr<Context> addSettings(const Context & context) const;
139139
size_t getMaxBlockSize() const;
140140
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
141141

tests/integration/test_default_compression_codec/test.py

Lines changed: 0 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -228,10 +228,3 @@ def test_default_codec_version_update(start_cluster):
228228
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_1'") == "LZ4HC(5)\n"
229229
assert node3.query(
230230
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_1'") == "LZ4\n"
231-
assert get_compression_codec_byte(node1, "compression_table_multiple", "2_0_0_1") == CODECS_MAPPING['Multiple']
232-
assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "2_0_0_1") == CODECS_MAPPING['LZ4HC']
233-
assert get_compression_codec_byte(node1, "compression_table_multiple", "3_0_0_1") == CODECS_MAPPING['Multiple']
234-
assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "3_0_0_1") == CODECS_MAPPING['LZ4']
235-
236-
assert node1.query("SELECT COUNT() FROM compression_table_multiple") == "3\n"
237-
assert node2.query("SELECT COUNT() FROM compression_table_multiple") == "3\n"

tests/integration/test_odbc_interaction/test.py

Lines changed: 35 additions & 19 deletions
Original file line number · Diff line number · Diff line change
@@ -4,6 +4,7 @@
44
import pymysql.cursors
55
import pytest
66
from helpers.cluster import ClickHouseCluster
7+
from helpers.test_tools import assert_eq_with_retry
78
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
89

910
cluster = ClickHouseCluster(__file__)
@@ -200,26 +201,43 @@ def test_sqlite_odbc_hashed_dictionary(started_cluster):
200201
node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t2 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)],
201202
privileged=True, user='root')
202203

203-
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "3\n"
204-
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "1\n" # default
204+
node1.query("SYSTEM RELOAD DICTIONARY sqlite3_odbc_hashed")
205+
first_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
206+
print("First update time", first_update_time)
207+
208+
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "3")
209+
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "1") # default
210+
211+
second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
212+
# Reloaded with new data
213+
print("Second update time", second_update_time)
214+
while first_update_time == second_update_time:
215+
second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
216+
print("Waiting dictionary to update for the second time")
217+
time.sleep(0.1)
205218

206-
time.sleep(5) # first reload
207219
node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t2 values(200, 2, 7);' | sqlite3 {}".format(sqlite_db)],
208220
privileged=True, user='root')
209221

210222
# No reload because of invalidate query
211-
time.sleep(5)
212-
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "3\n"
213-
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "1\n" # still default
223+
third_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
224+
print("Third update time", second_update_time)
225+
counter = 0
226+
while third_update_time == second_update_time:
227+
third_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
228+
time.sleep(0.1)
229+
if counter > 50:
230+
break
231+
counter += 1
232+
233+
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "3")
234+
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "1") # still default
214235

215236
node1.exec_in_container(["bash", "-c", "echo 'REPLACE INTO t2 values(1, 2, 5);' | sqlite3 {}".format(sqlite_db)],
216237
privileged=True, user='root')
217238

218-
# waiting for reload
219-
time.sleep(5)
220-
221-
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "5\n"
222-
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "7\n" # new value
239+
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "5")
240+
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "7")
223241

224242

225243
def test_sqlite_odbc_cached_dictionary(started_cluster):
@@ -241,18 +259,16 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
241259
node1.exec_in_container(["bash", "-c", "echo 'REPLACE INTO t3 values(1, 2, 12);' | sqlite3 {}".format(sqlite_db)],
242260
privileged=True, user='root')
243261

244-
time.sleep(5)
245-
246-
assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "12\n"
262+
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))", "12")
247263

248264

249265
def test_postgres_odbc_hached_dictionary_with_schema(started_cluster):
250266
conn = get_postgres_conn()
251267
cursor = conn.cursor()
252268
cursor.execute("insert into clickhouse.test_table values(1, 'hello'),(2, 'world')")
253-
time.sleep(5)
254-
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))") == "hello\n"
255-
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))") == "world\n"
269+
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")
270+
assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))", "hello")
271+
assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))", "world")
256272

257273

258274
def test_postgres_odbc_hached_dictionary_no_tty_pipe_overflow(started_cluster):
@@ -265,7 +281,7 @@ def test_postgres_odbc_hached_dictionary_no_tty_pipe_overflow(started_cluster):
265281
except Exception as ex:
266282
assert False, "Exception occured -- odbc-bridge hangs: " + str(ex)
267283

268-
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))") == "xxx\n"
284+
assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))", "xxx")
269285

270286

271287
def test_postgres_insert(started_cluster):
@@ -310,7 +326,7 @@ def test_bridge_dies_with_parent(started_cluster):
310326
clickhouse_pid = node1.get_process_pid("clickhouse server")
311327
time.sleep(1)
312328

313-
for i in range(5):
329+
for i in range(30):
314330
time.sleep(1) # just for sure, that odbc-bridge caught signal
315331
bridge_pid = node1.get_process_pid("odbc-bridge")
316332
if bridge_pid is None:

tests/integration/test_storage_rabbitmq/test.py

Lines changed: 9 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -123,7 +123,7 @@ def rabbitmq_setup_teardown():
123123

124124
# Tests
125125

126-
@pytest.mark.timeout(180)
126+
@pytest.mark.timeout(240)
127127
def test_rabbitmq_select(rabbitmq_cluster):
128128
instance.query('''
129129
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@@ -159,7 +159,7 @@ def test_rabbitmq_select(rabbitmq_cluster):
159159
rabbitmq_check_result(result, True)
160160

161161

162-
@pytest.mark.timeout(180)
162+
@pytest.mark.timeout(240)
163163
def test_rabbitmq_select_empty(rabbitmq_cluster):
164164
instance.query('''
165165
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@@ -173,7 +173,7 @@ def test_rabbitmq_select_empty(rabbitmq_cluster):
173173
assert int(instance.query('SELECT count() FROM test.rabbitmq')) == 0
174174

175175

176-
@pytest.mark.timeout(180)
176+
@pytest.mark.timeout(240)
177177
def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
178178
instance.query('''
179179
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@@ -215,7 +215,7 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
215215
rabbitmq_check_result(result, True)
216216

217217

218-
@pytest.mark.timeout(180)
218+
@pytest.mark.timeout(240)
219219
def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
220220
instance.query('''
221221
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@@ -250,7 +250,7 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
250250
rabbitmq_check_result(result, True)
251251

252252

253-
@pytest.mark.timeout(180)
253+
@pytest.mark.timeout(240)
254254
def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
255255
instance.query('''
256256
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@@ -285,7 +285,7 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
285285
rabbitmq_check_result(result, True)
286286

287287

288-
@pytest.mark.timeout(180)
288+
@pytest.mark.timeout(240)
289289
def test_rabbitmq_materialized_view(rabbitmq_cluster):
290290
instance.query('''
291291
DROP TABLE IF EXISTS test.view;
@@ -328,7 +328,7 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
328328
rabbitmq_check_result(result, True)
329329

330330

331-
@pytest.mark.timeout(180)
331+
@pytest.mark.timeout(240)
332332
def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
333333
instance.query('''
334334
DROP TABLE IF EXISTS test.view;
@@ -371,7 +371,7 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
371371
rabbitmq_check_result(result, True)
372372

373373

374-
@pytest.mark.timeout(180)
374+
@pytest.mark.timeout(240)
375375
def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
376376
instance.query('''
377377
DROP TABLE IF EXISTS test.view1;
@@ -426,7 +426,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
426426

427427

428428
@pytest.mark.skip(reason="clichouse_path with rabbitmq.proto fails to be exported")
429-
@pytest.mark.timeout(180)
429+
@pytest.mark.timeout(240)
430430
def test_rabbitmq_protobuf(rabbitmq_cluster):
431431
instance.query('''
432432
DROP TABLE IF EXISTS test.view;

tests/integration/test_ttl_replicated/test.py

Lines changed: 5 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -35,7 +35,7 @@ def test_ttl_columns(started_cluster):
3535
node.query(
3636
'''
3737
CREATE TABLE test_ttl(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH)
38-
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
38+
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_columns', '{replica}')
3939
ORDER BY id PARTITION BY toDayOfMonth(date) SETTINGS merge_with_ttl_timeout=0, min_bytes_for_wide_part=0;
4040
'''.format(replica=node.name))
4141

@@ -155,7 +155,7 @@ def test_modify_ttl(started_cluster):
155155
node.query(
156156
'''
157157
CREATE TABLE test_ttl(d DateTime, id UInt32)
158-
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
158+
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_modify', '{replica}')
159159
ORDER BY id
160160
'''.format(replica=node.name))
161161

@@ -179,7 +179,7 @@ def test_modify_column_ttl(started_cluster):
179179
node.query(
180180
'''
181181
CREATE TABLE test_ttl(d DateTime, id UInt32 DEFAULT 42)
182-
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
182+
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_column', '{replica}')
183183
ORDER BY d
184184
'''.format(replica=node.name))
185185

@@ -202,7 +202,7 @@ def test_ttl_double_delete_rule_returns_error(started_cluster):
202202
try:
203203
node1.query('''
204204
CREATE TABLE test_ttl(date DateTime, id UInt32)
205-
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
205+
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_double_delete', '{replica}')
206206
ORDER BY id PARTITION BY toDayOfMonth(date)
207207
TTL date + INTERVAL 1 DAY, date + INTERVAL 2 DAY SETTINGS merge_with_ttl_timeout=0
208208
'''.format(replica=node1.name))
@@ -288,7 +288,7 @@ def test_ttl_empty_parts(started_cluster):
288288
node.query(
289289
'''
290290
CREATE TABLE test_ttl_empty_parts(date Date, id UInt32)
291-
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
291+
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_empty_parts', '{replica}')
292292
ORDER BY id
293293
SETTINGS max_bytes_to_merge_at_min_space_in_pool = 1, max_bytes_to_merge_at_max_space_in_pool = 1,
294294
cleanup_delay_period = 1, cleanup_delay_period_random_add = 0

0 commit comments

Comments (0)