Skip to content

Commit 7357347

Browse files
Merge pull request #15299 from CurtizJ/fix-mysql-hung
Fix hang of queries with a lot of subqueries to the same MySQL table
2 parents b4f1eb0 + 085f63a commit 7357347

File tree

4 files changed

+107
-14
lines changed

4 files changed

+107
-14
lines changed

src/Formats/MySQLBlockInputStream.cpp

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,27 @@ namespace ErrorCodes
2323
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
2424
}
2525

26+
MySQLBlockInputStream::Connection::Connection(
27+
const mysqlxx::PoolWithFailover::Entry & entry_,
28+
const std::string & query_str)
29+
: entry(entry_)
30+
, query{entry->query(query_str)}
31+
, result{query.use()}
32+
{
33+
}
2634

2735
MySQLBlockInputStream::MySQLBlockInputStream(
28-
const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, const bool auto_close_)
29-
: entry{entry_}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size_}, auto_close{auto_close_}
36+
const mysqlxx::PoolWithFailover::Entry & entry,
37+
const std::string & query_str,
38+
const Block & sample_block,
39+
const UInt64 max_block_size_,
40+
const bool auto_close_)
41+
: connection{std::make_unique<Connection>(entry, query_str)}
42+
, max_block_size{max_block_size_}
43+
, auto_close{auto_close_}
3044
{
31-
if (sample_block.columns() != result.getNumFields())
32-
throw Exception{"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while "
45+
if (sample_block.columns() != connection->result.getNumFields())
46+
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
3347
+ toString(sample_block.columns()) + " expected",
3448
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
3549

@@ -106,11 +120,11 @@ namespace
106120

107121
Block MySQLBlockInputStream::readImpl()
108122
{
109-
auto row = result.fetch();
123+
auto row = connection->result.fetch();
110124
if (!row)
111125
{
112126
if (auto_close)
113-
entry.disconnect();
127+
connection->entry.disconnect();
114128
return {};
115129
}
116130

@@ -145,11 +159,42 @@ Block MySQLBlockInputStream::readImpl()
145159
if (num_rows == max_block_size)
146160
break;
147161

148-
row = result.fetch();
162+
row = connection->result.fetch();
149163
}
150164
return description.sample_block.cloneWithColumns(std::move(columns));
151165
}
152166

167+
MySQLBlockInputStream::MySQLBlockInputStream(
168+
const Block & sample_block_,
169+
UInt64 max_block_size_,
170+
bool auto_close_)
171+
: max_block_size(max_block_size_)
172+
, auto_close(auto_close_)
173+
{
174+
description.init(sample_block_);
175+
}
176+
177+
MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
178+
mysqlxx::Pool & pool_,
179+
const std::string & query_str_,
180+
const Block & sample_block_,
181+
const UInt64 max_block_size_,
182+
const bool auto_close_)
183+
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_)
184+
, pool(pool_)
185+
, query_str(query_str_)
186+
{
187+
}
188+
189+
void MySQLLazyBlockInputStream::readPrefix()
190+
{
191+
connection = std::make_unique<Connection>(pool.get(), query_str);
192+
if (description.sample_block.columns() != connection->result.getNumFields())
193+
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
194+
+ toString(description.sample_block.columns()) + " expected",
195+
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
196+
}
197+
153198
}
154199

155200
#endif

src/Formats/MySQLBlockInputStream.h

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010

1111
namespace DB
1212
{
13+
1314
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
14-
class MySQLBlockInputStream final : public IBlockInputStream
15+
class MySQLBlockInputStream : public IBlockInputStream
1516
{
1617
public:
1718
MySQLBlockInputStream(
18-
const mysqlxx::PoolWithFailover::Entry & entry_,
19+
const mysqlxx::PoolWithFailover::Entry & entry,
1920
const std::string & query_str,
2021
const Block & sample_block,
2122
const UInt64 max_block_size_,
@@ -25,15 +26,43 @@ class MySQLBlockInputStream final : public IBlockInputStream
2526

2627
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
2728

28-
private:
29+
protected:
30+
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_);
2931
Block readImpl() override;
3032

31-
mysqlxx::PoolWithFailover::Entry entry;
32-
mysqlxx::Query query;
33-
mysqlxx::UseQueryResult result;
33+
struct Connection
34+
{
35+
Connection(const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str);
36+
37+
mysqlxx::PoolWithFailover::Entry entry;
38+
mysqlxx::Query query;
39+
mysqlxx::UseQueryResult result;
40+
};
41+
42+
std::unique_ptr<Connection> connection;
43+
3444
const UInt64 max_block_size;
3545
const bool auto_close;
3646
ExternalResultDescription description;
3747
};
3848

49+
/// Like MySQLBlockInputStream, but allocates a connection only when reading starts.
50+
/// This allows creating many stream objects without occupying the whole connection pool.
51+
class MySQLLazyBlockInputStream final : public MySQLBlockInputStream
52+
{
53+
public:
54+
MySQLLazyBlockInputStream(
55+
mysqlxx::Pool & pool_,
56+
const std::string & query_str_,
57+
const Block & sample_block_,
58+
const UInt64 max_block_size_,
59+
const bool auto_close_ = false);
60+
61+
private:
62+
void readPrefix() override;
63+
64+
mysqlxx::Pool & pool;
65+
std::string query_str;
66+
};
67+
3968
}

src/Storages/StorageMySQL.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ Pipe StorageMySQL::read(
9696

9797
/// TODO: rewrite MySQLBlockInputStream
9898
return Pipe(std::make_shared<SourceFromInputStream>(
99-
std::make_shared<MySQLBlockInputStream>(pool.get(), query, sample_block, max_block_size_)));
99+
std::make_shared<MySQLLazyBlockInputStream>(pool, query, sample_block, max_block_size_, /* auto_close = */ true)));
100100
}
101101

102102

tests/integration/test_storage_mysql/test.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,25 @@ def started_cluster():
3333
cluster.shutdown()
3434

3535

36+
def test_many_connections(started_cluster):
37+
table_name = 'test_many_connections'
38+
conn = get_mysql_conn()
39+
create_mysql_table(conn, table_name)
40+
41+
node1.query('''
42+
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
43+
'''.format(table_name, table_name))
44+
45+
node1.query("INSERT INTO {} (id, name) SELECT number, concat('name_', toString(number)) from numbers(10) ".format(table_name))
46+
47+
query = "SELECT count() FROM ("
48+
for i in range (24):
49+
query += "SELECT id FROM {t} UNION ALL "
50+
query += "SELECT id FROM {t})"
51+
52+
assert node1.query(query.format(t=table_name)) == '250\n'
53+
conn.close()
54+
3655
def test_insert_select(started_cluster):
3756
table_name = 'test_insert_select'
3857
conn = get_mysql_conn()

0 commit comments

Comments
 (0)