Skip to content

Commit e5c4eb3

Browse files
committed
Merge remote-tracking branch 'u/master' into build/llvm-16
* u/master: enable used flags's reinit only when the hash talbe rehash Fix build of libfiu on clang-16 fix flaky test 02504_regexp_dictionary_ua_parser fix convertation Fix test that expected CH to apply a wrong optimization ActionsDAG: remove wrong optimization
2 parents 00fdfa1 + 9bf6175 commit e5c4eb3

15 files changed

+894
-859
lines changed

programs/server/Server.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,8 +1395,8 @@ try
13951395
{
13961396
Poco::Net::ServerSocket socket;
13971397
auto address = socketBindListen(config(), socket, listen_host, port);
1398-
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
1399-
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
1398+
socket.setReceiveTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0));
1399+
socket.setSendTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
14001400
return ProtocolServerAdapter(
14011401
listen_host,
14021402
port_name,
@@ -1418,8 +1418,8 @@ try
14181418
#if USE_SSL
14191419
Poco::Net::SecureServerSocket socket;
14201420
auto address = socketBindListen(config(), socket, listen_host, port, /* secure = */ true);
1421-
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
1422-
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
1421+
socket.setReceiveTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0));
1422+
socket.setSendTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
14231423
return ProtocolServerAdapter(
14241424
listen_host,
14251425
secure_port_name,

src/Bridge/IBridge.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,14 +214,14 @@ int IBridge::main(const std::vector<std::string> & /*args*/)
214214

215215
Poco::Net::ServerSocket socket;
216216
auto address = socketBindListen(socket, hostname, port, log);
217-
socket.setReceiveTimeout(http_timeout);
218-
socket.setSendTimeout(http_timeout);
217+
socket.setReceiveTimeout(Poco::Timespan(http_timeout, 0));
218+
socket.setSendTimeout(Poco::Timespan(http_timeout, 0));
219219

220220
Poco::ThreadPool server_pool(3, max_server_connections);
221221

222222
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
223-
http_params->setTimeout(http_timeout);
224-
http_params->setKeepAliveTimeout(keep_alive_timeout);
223+
http_params->setTimeout(Poco::Timespan(http_timeout, 0));
224+
http_params->setKeepAliveTimeout(Poco::Timespan(keep_alive_timeout, 0));
225225

226226
auto shared_context = Context::createShared();
227227
auto context = Context::createGlobal(shared_context.get());

src/Interpreters/ActionsDAG.cpp

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2140,8 +2140,12 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
21402140
}
21412141

21422142
auto conjunction = getConjunctionNodes(predicate, allowed_nodes);
2143-
if (conjunction.rejected.size() == 1 && WhichDataType{removeNullable(conjunction.rejected.front()->result_type)}.isFloat())
2143+
if (conjunction.rejected.size() == 1 && !conjunction.rejected.front()->result_type->equals(*predicate->result_type)
2144+
&& conjunction.allowed.front()->type == ActionType::COLUMN)
2145+
{
2146+
// No further optimization can be done
21442147
return nullptr;
2148+
}
21452149

21462150
auto actions = cloneActionsForConjunction(conjunction.allowed, all_inputs);
21472151
if (!actions)
@@ -2191,64 +2195,36 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
21912195
else
21922196
{
21932197
/// Predicate is conjunction, where both allowed and rejected sets are not empty.
2194-
/// Replace this node to conjunction of rejected predicates.
21952198

21962199
NodeRawConstPtrs new_children = std::move(conjunction.rejected);
21972200

2198-
if (new_children.size() == 1)
2201+
if (new_children.size() == 1 && new_children.front()->result_type->equals(*predicate->result_type))
21992202
{
2200-
/// Rejected set has only one predicate.
2201-
if (new_children.front()->result_type->equals(*predicate->result_type))
2202-
{
2203-
/// If it's type is same, just add alias.
2204-
Node node;
2205-
node.type = ActionType::ALIAS;
2206-
node.result_name = predicate->result_name;
2207-
node.result_type = predicate->result_type;
2208-
node.children.swap(new_children);
2209-
*predicate = std::move(node);
2210-
}
2211-
else if (!WhichDataType{removeNullable(new_children.front()->result_type)}.isFloat())
2212-
{
2213-
/// If type is different, cast column.
2214-
/// This case is possible, cause AND can use any numeric type as argument.
2215-
/// But casting floats to UInt8 or Bool produces different results.
2216-
/// so we can't apply this optimization to them.
2217-
Node node;
2218-
node.type = ActionType::COLUMN;
2219-
node.result_name = predicate->result_type->getName();
2220-
node.column = DataTypeString().createColumnConst(0, node.result_name);
2221-
node.result_type = std::make_shared<DataTypeString>();
2222-
2223-
const auto * right_arg = &nodes.emplace_back(std::move(node));
2224-
const auto * left_arg = new_children.front();
2225-
2226-
predicate->children = {left_arg, right_arg};
2227-
auto arguments = prepareFunctionArguments(predicate->children);
2228-
2229-
FunctionOverloadResolverPtr func_builder_cast = CastInternalOverloadResolver<CastType::nonAccurate>::createImpl();
2230-
2231-
predicate->function_base = func_builder_cast->build(arguments);
2232-
predicate->function = predicate->function_base->prepare(arguments);
2233-
}
2203+
/// Rejected set has only one predicate. And the type is the same as the result_type.
2204+
/// Just add alias.
2205+
Node node;
2206+
node.type = ActionType::ALIAS;
2207+
node.result_name = predicate->result_name;
2208+
node.result_type = predicate->result_type;
2209+
node.children.swap(new_children);
2210+
*predicate = std::move(node);
22342211
}
22352212
else
22362213
{
2237-
/// Predicate is function AND, which still have more then one argument.
2238-
/// Or there is only one argument that is a float and we can't just
2239-
/// remove the AND.
2214+
/// Predicate is function AND, which still have more then one argument
2215+
/// or it has one argument of the wrong type.
22402216
/// Just update children and rebuild it.
2241-
predicate->children.swap(new_children);
2242-
if (WhichDataType{removeNullable(predicate->children.front()->result_type)}.isFloat())
2217+
if (new_children.size() == 1)
22432218
{
22442219
Node node;
22452220
node.type = ActionType::COLUMN;
22462221
node.result_name = "1";
22472222
node.column = DataTypeUInt8().createColumnConst(0, 1u);
22482223
node.result_type = std::make_shared<DataTypeUInt8>();
22492224
const auto * const_col = &nodes.emplace_back(std::move(node));
2250-
predicate->children.emplace_back(const_col);
2225+
new_children.emplace_back(const_col);
22512226
}
2227+
predicate->children.swap(new_children);
22522228
auto arguments = prepareFunctionArguments(predicate->children);
22532229

22542230
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());

src/Interpreters/HashJoin.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,13 @@ namespace JoinStuff
7979
{
8080
assert(flags[nullptr].size() <= size);
8181
need_flags = true;
82-
flags[nullptr] = std::vector<std::atomic_bool>(size);
82+
// For one disjunct clause case, we don't need to reinit each time we call addJoinedBlock.
83+
// and there is no value inserted in this JoinUsedFlags before addJoinedBlock finish.
84+
// So we reinit only when the hash table is rehashed to a larger size.
85+
if (flags.empty() || flags[nullptr].size() < size) [[unlikely]]
86+
{
87+
flags[nullptr] = std::vector<std::atomic_bool>(size);
88+
}
8389
}
8490
}
8591

src/Server/KeeperTCPHandler.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length)
293293
if (handshake_length == Coordination::CLIENT_HANDSHAKE_LENGTH_WITH_READONLY)
294294
Coordination::read(readonly, *in);
295295

296-
return Poco::Timespan(0, timeout_ms * 1000);
296+
return Poco::Timespan(timeout_ms * 1000);
297297
}
298298

299299

@@ -342,8 +342,8 @@ void KeeperTCPHandler::runImpl()
342342
int32_t handshake_length = header;
343343
auto client_timeout = receiveHandshake(handshake_length);
344344

345-
if (client_timeout == 0)
346-
client_timeout = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
345+
if (client_timeout.totalMilliseconds() == 0)
346+
client_timeout = Poco::Timespan(Coordination::DEFAULT_SESSION_TIMEOUT_MS * Poco::Timespan::MILLISECONDS);
347347
session_timeout = std::max(client_timeout, min_session_timeout);
348348
session_timeout = std::min(session_timeout, max_session_timeout);
349349
}

src/Server/TCPHandler.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ class TCPHandler : public Poco::Net::TCPServerConnection
173173

174174
/// Connection settings, which are extracted from a context.
175175
bool send_exception_with_stack_trace = true;
176-
Poco::Timespan send_timeout = DBMS_DEFAULT_SEND_TIMEOUT_SEC;
177-
Poco::Timespan receive_timeout = DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC;
176+
Poco::Timespan send_timeout = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0);
177+
Poco::Timespan receive_timeout = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0);
178178
UInt64 poll_interval = DBMS_DEFAULT_POLL_INTERVAL;
179179
UInt64 idle_connection_timeout = 3600;
180180
UInt64 interactive_delay = 100000;

src/Storages/RabbitMQ/StorageRabbitMQ.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ void StorageRabbitMQ::read(
724724

725725
uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
726726
? rabbitmq_settings->rabbitmq_flush_interval_ms
727-
: static_cast<UInt64>(Poco::Timespan(getContext()->getSettingsRef().stream_flush_interval_ms).milliseconds());
727+
: static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms.totalMilliseconds());
728728

729729
for (size_t i = 0; i < num_created_consumers; ++i)
730730
{
@@ -1053,7 +1053,7 @@ bool StorageRabbitMQ::tryStreamToViews()
10531053

10541054
uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
10551055
? rabbitmq_settings->rabbitmq_flush_interval_ms
1056-
: static_cast<UInt64>(Poco::Timespan(getContext()->getSettingsRef().stream_flush_interval_ms).milliseconds());
1056+
: static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms.totalMilliseconds());
10571057

10581058
for (size_t i = 0; i < num_created_consumers; ++i)
10591059
{

src/Storages/StorageDistributed.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1986,9 +1986,9 @@ void registerStorageDistributed(StorageFactory & factory)
19861986
if (!distributed_settings.monitor_split_batch_on_failure.changed)
19871987
distributed_settings.monitor_split_batch_on_failure = context->getSettingsRef().distributed_directory_monitor_split_batch_on_failure;
19881988
if (!distributed_settings.monitor_sleep_time_ms.changed)
1989-
distributed_settings.monitor_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_sleep_time_ms);
1989+
distributed_settings.monitor_sleep_time_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
19901990
if (!distributed_settings.monitor_max_sleep_time_ms.changed)
1991-
distributed_settings.monitor_max_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms);
1991+
distributed_settings.monitor_max_sleep_time_ms = context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms;
19921992

19931993
return std::make_shared<StorageDistributed>(
19941994
args.table_id,
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<test>
2+
<create_query>CREATE TABLE test_join_used_flags (i64 Int64, i32 Int32) ENGINE = Memory</create_query>
3+
<fill_query>INSERT INTO test_join_used_flags SELECT number AS i64, rand32() AS i32 FROM numbers(20000000)</fill_query>
4+
<query>SELECT l.i64, r.i64, l.i32, r.i32 FROM test_join_used_flags l RIGHT JOIN test_join_used_flags r USING i64 format Null</query>
5+
<drop_query>DROP TABLE IF EXISTS test_join_used_flags</drop_query>
6+
</test>

tests/queries/0_stateless/01655_plan_optimizations.reference

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ Filter column: notEquals(y, 0)
5353
9 10
5454
> one condition of filter should be pushed down after aggregating, other condition is casted
5555
Filter column
56-
FUNCTION _CAST(minus(s, 4) :: 1, UInt8 :: 3) -> and(notEquals(y, 0), minus(s, 4))
56+
FUNCTION and(minus(s, 4) :: 1, 1 :: 3) -> and(notEquals(y, 0), minus(s, 4)) UInt8 : 2
5757
Aggregating
5858
Filter column: notEquals(y, 0)
5959
0 1

0 commit comments

Comments
 (0)