Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions programs/client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,11 @@ class Client : public Poco::Util::Application
if (std::string::npos != embedded_stack_trace_pos && !config().getBool("stacktrace", false))
text.resize(embedded_stack_trace_pos);

/// If we probably have progress bar, we should add additional newline,
/// otherwise exception may display concatenated with the progress bar.
if (need_render_progress)
std::cerr << '\n';

std::cerr << "Received exception from server (version " << server_version << "):" << std::endl
<< "Code: " << e.code() << ". " << text << std::endl;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_execution_speed, 0, "Maximum number of execution rows per second.", 0) \
M(SettingUInt64, min_execution_speed_bytes, 0, "Minimum number of execution bytes per second.", 0) \
M(SettingUInt64, max_execution_speed_bytes, 0, "Maximum number of execution bytes per second.", 0) \
M(SettingSeconds, timeout_before_checking_execution_speed, 0, "Check that the speed is not too low after the specified time has elapsed.", 0) \
M(SettingSeconds, timeout_before_checking_execution_speed, 10, "Check that the speed is not too low after the specified time has elapsed.", 0) \
\
M(SettingUInt64, max_columns_to_read, 0, "", 0) \
M(SettingUInt64, max_temporary_columns, 0, "", 0) \
Expand Down
9 changes: 5 additions & 4 deletions src/DataStreams/ExecutionSpeedLimits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_i
{
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;

/// Never sleep more than one second (it should be enough to limit speed for a reasonable amount, and otherwise it's too easy to make query hang).
/// Never sleep more than one second (it should be enough to limit speed for a reasonable amount,
/// and otherwise it's too easy to make query hang).
sleep_microseconds = std::min(UInt64(1000000), sleep_microseconds);

sleepForMicroseconds(sleep_microseconds);
Expand All @@ -45,14 +46,14 @@ void ExecutionSpeedLimits::throttle(
{
if ((min_execution_rps != 0 || max_execution_rps != 0
|| min_execution_bps != 0 || max_execution_bps != 0
|| (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0)) &&
(static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds()))
|| (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0))
&& (static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds()))
{
/// Do not count sleeps in throttlers
UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];

double elapsed_seconds = 0;
if (throttler_sleep_microseconds > total_elapsed_microseconds)
if (total_elapsed_microseconds > throttler_sleep_microseconds)
elapsed_seconds = static_cast<double>(total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0;

if (elapsed_seconds > 0)
Expand Down
9 changes: 6 additions & 3 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1498,16 +1498,19 @@ void InterpreterSelectQuery::executeFetchColumns(
* But limits on data size to read and maximum execution time are reasonable to check both on initiator and
* additionally on each remote server, because these limits are checked per block of data processed,
* and remote servers may process way more blocks of data than are received by initiator.
*
* The limits to throttle maximum execution speed is also checked on all servers.
*/
if (options.to_stage == QueryProcessingStage::Complete)
{
limits.speed_limits.min_execution_rps = settings.min_execution_speed;
limits.speed_limits.max_execution_rps = settings.max_execution_speed;
limits.speed_limits.min_execution_bps = settings.min_execution_speed_bytes;
limits.speed_limits.max_execution_bps = settings.max_execution_speed_bytes;
limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
}

limits.speed_limits.max_execution_rps = settings.max_execution_speed;
limits.speed_limits.max_execution_bps = settings.max_execution_speed_bytes;
limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;

auto quota = context->getQuota();

for (auto & stream : streams)
Expand Down
8 changes: 8 additions & 0 deletions tests/queries/0_stateless/01287_max_execution_speed.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Ok (1)
Ok (2)
2000000
1
Ok (3)
2000000
1
Ok (4)
44 changes: 44 additions & 0 deletions tests/queries/0_stateless/01287_max_execution_speed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
SET min_execution_speed = 100000000000, timeout_before_checking_execution_speed = 0.1;
SELECT count() FROM system.numbers; -- { serverError 160 }
SELECT 'Ok (1)';
SET min_execution_speed = 0;

SET min_execution_speed_bytes = 800000000000, timeout_before_checking_execution_speed = 0.1;
SELECT count() FROM system.numbers; -- { serverError 160 }
SELECT 'Ok (2)';
SET min_execution_speed_bytes = 0;

SET max_execution_speed = 1000000;
SET max_block_size = 100;

CREATE TEMPORARY TABLE times (t DateTime);

INSERT INTO times SELECT now();
SELECT count() FROM numbers(2000000);
INSERT INTO times SELECT now();

SELECT max(t) - min(t) >= 1 FROM times;
SELECT 'Ok (3)';
SET max_execution_speed = 0;

SET max_execution_speed_bytes = 8000000;
TRUNCATE TABLE times;

INSERT INTO times SELECT now();
SELECT count() FROM numbers(2000000);
INSERT INTO times SELECT now();

SELECT max(t) - min(t) >= 1 FROM times;
SELECT 'Ok (4)';
SET max_execution_speed_bytes = 0;

-- Note that 'min_execution_speed' does not count sleeping due to throttling
-- with 'max_execution_speed' and similar limits like 'priority' and 'max_network_bandwidth'

-- Note: I have to disable this part of the test because it actually can work slower under sanitizers,
-- with debug builds and in presense of random system hickups in our CI environment.

--SET max_execution_speed = 1000000, min_execution_speed = 2000000;
-- And this query will work despite the fact that the above settings look contradictory.
--SELECT count() FROM numbers(1000000);
--SELECT 'Ok (5)';
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
0
1
15 changes: 15 additions & 0 deletions tests/queries/0_stateless/01288_shard_max_network_bandwidth.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Limit to 10 MB/sec
SET max_network_bandwidth = 10000000;

-- Lower max_block_size, so we can start throttling sooner. Otherwise query will be executed too quickly.
SET max_block_size = 100;

CREATE TEMPORARY TABLE times (t DateTime);

-- rand64 is uncompressable data. Each number will take 8 bytes of bandwidth.
-- This query should execute in no less than 1.6 seconds if throttled.
INSERT INTO times SELECT now();
SELECT sum(ignore(*)) FROM (SELECT rand64() FROM remote('127.0.0.{2,3}', numbers(2000000)));
INSERT INTO times SELECT now();

SELECT max(t) - min(t) >= 1 FROM times;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
DROP TABLE IF EXISTS ES;

create table ES(A String) Engine=MergeTree order by tuple();
insert into ES select toString(number) from numbers(10000000);

SET max_execution_time = 100, max_execution_speed = 1000000;
SET max_threads = 1;
SET max_block_size = 1000000;

-- Exception about execution speed is not thrown from these queries.
SELECT * FROM ES LIMIT 1 format Null;
SELECT * FROM ES LIMIT 10 format Null;
SELECT * FROM ES LIMIT 100 format Null;
SELECT * FROM ES LIMIT 1000 format Null;
SELECT * FROM ES LIMIT 10000 format Null;
SELECT * FROM ES LIMIT 100000 format Null;
SELECT * FROM ES LIMIT 1000000 format Null;

DROP TABLE ES;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2000000
1
1
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
SET max_execution_speed = 1000000, timeout_before_checking_execution_speed = 0.001, max_block_size = 100;

CREATE TEMPORARY TABLE times (t DateTime);

INSERT INTO times SELECT now();
SELECT count('special query for 01290_max_execution_speed_distributed') FROM remote('127.0.0.{2,3}', numbers(1000000));
INSERT INTO times SELECT now();

SELECT max(t) - min(t) >= 1 FROM times;

-- Check that the query was also throttled on "remote" servers.
SYSTEM FLUSH LOGS;
SELECT DISTINCT query_duration_ms >= 500 FROM system.query_log WHERE event_date >= yesterday() AND query LIKE '%special query for 01290_max_execution_speed_distributed%' AND type = 2;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
4392010
1
4392010
1
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
SET max_execution_speed = 4000000, timeout_before_checking_execution_speed = 0.001;

CREATE TEMPORARY TABLE times (t DateTime);

INSERT INTO times SELECT now();
SELECT count() FROM test.hits SAMPLE 1 / 2;
INSERT INTO times SELECT now();

SELECT max(t) - min(t) >= 1 FROM times;
TRUNCATE TABLE times;

INSERT INTO times SELECT now();
SELECT count() FROM merge(test, '^hits$') SAMPLE 1 / 2;
INSERT INTO times SELECT now();

SELECT max(t) - min(t) >= 1 FROM times;