@@ -23,13 +23,27 @@ namespace ErrorCodes
2323 extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
2424}
2525
26+ MySQLBlockInputStream::Connection::Connection (
27+ const mysqlxx::PoolWithFailover::Entry & entry_,
28+ const std::string & query_str)
29+ : entry(entry_)
30+ , query{entry->query (query_str)}
31+ , result{query.use ()}
32+ {
33+ }
2634
2735MySQLBlockInputStream::MySQLBlockInputStream (
28- const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, const bool auto_close_)
29- : entry{entry_}, query{this ->entry ->query (query_str)}, result{query.use ()}, max_block_size{max_block_size_}, auto_close{auto_close_}
36+ const mysqlxx::PoolWithFailover::Entry & entry,
37+ const std::string & query_str,
38+ const Block & sample_block,
39+ const UInt64 max_block_size_,
40+ const bool auto_close_)
41+ : connection{std::make_unique<Connection>(entry, query_str)}
42+ , max_block_size{max_block_size_}
43+ , auto_close{auto_close_}
3044{
31- if (sample_block.columns () != result.getNumFields ())
32- throw Exception{" mysqlxx::UseQueryResult contains " + toString (result.getNumFields ()) + " columns while "
45+ if (sample_block.columns () != connection-> result .getNumFields ())
46+ throw Exception{" mysqlxx::UseQueryResult contains " + toString (connection-> result .getNumFields ()) + " columns while "
3347 + toString (sample_block.columns ()) + " expected" ,
3448 ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
3549
@@ -106,11 +120,11 @@ namespace
106120
107121Block MySQLBlockInputStream::readImpl ()
108122{
109- auto row = result.fetch ();
123+ auto row = connection-> result .fetch ();
110124 if (!row)
111125 {
112126 if (auto_close)
113- entry.disconnect ();
127+ connection-> entry .disconnect ();
114128 return {};
115129 }
116130
@@ -145,11 +159,42 @@ Block MySQLBlockInputStream::readImpl()
145159 if (num_rows == max_block_size)
146160 break ;
147161
148- row = result.fetch ();
162+ row = connection-> result .fetch ();
149163 }
150164 return description.sample_block .cloneWithColumns (std::move (columns));
151165}
152166
167+ MySQLBlockInputStream::MySQLBlockInputStream (
168+ const Block & sample_block_,
169+ UInt64 max_block_size_,
170+ bool auto_close_)
171+ : max_block_size(max_block_size_)
172+ , auto_close(auto_close_)
173+ {
174+ description.init (sample_block_);
175+ }
176+
177+ MySQLLazyBlockInputStream::MySQLLazyBlockInputStream (
178+ mysqlxx::Pool & pool_,
179+ const std::string & query_str_,
180+ const Block & sample_block_,
181+ const UInt64 max_block_size_,
182+ const bool auto_close_)
183+ : MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_)
184+ , pool(pool_)
185+ , query_str(query_str_)
186+ {
187+ }
188+
189+ void MySQLLazyBlockInputStream::readPrefix ()
190+ {
191+ connection = std::make_unique<Connection>(pool.get (), query_str);
192+ if (description.sample_block .columns () != connection->result .getNumFields ())
193+ throw Exception{" mysqlxx::UseQueryResult contains " + toString (connection->result .getNumFields ()) + " columns while "
194+ + toString (description.sample_block .columns ()) + " expected" ,
195+ ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
196+ }
197+
153198}
154199
155200#endif
0 commit comments