Skip to content

Commit 391165d

Browse files
Merge pull request ClickHouse#10337 from nikitamikhaylov/proper_dictionary_timeouts
Proper dictionary timeouts.
2 parents b5f8efe + c603acd commit 391165d

File tree

3 files changed

+130
-28
lines changed

3 files changed

+130
-28
lines changed

src/Dictionaries/CacheDictionary.cpp

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ namespace ErrorCodes
4646
extern const int BAD_ARGUMENTS;
4747
extern const int UNSUPPORTED_METHOD;
4848
extern const int TOO_SMALL_BUFFER_SIZE;
49+
extern const int TIMEOUT_EXCEEDED;
4950
}
5051

5152

@@ -63,20 +64,24 @@ CacheDictionary::CacheDictionary(
6364
const DictionaryStructure & dict_struct_,
6465
DictionarySourcePtr source_ptr_,
6566
DictionaryLifetime dict_lifetime_,
67+
size_t strict_max_lifetime_seconds_,
6668
size_t size_,
6769
bool allow_read_expired_keys_,
6870
size_t max_update_queue_size_,
6971
size_t update_queue_push_timeout_milliseconds_,
72+
size_t query_wait_timeout_milliseconds_,
7073
size_t max_threads_for_updates_)
7174
: database(database_)
7275
, name(name_)
7376
, full_name{database_.empty() ? name_ : (database_ + "." + name_)}
7477
, dict_struct(dict_struct_)
7578
, source_ptr{std::move(source_ptr_)}
7679
, dict_lifetime(dict_lifetime_)
80+
, strict_max_lifetime_seconds(strict_max_lifetime_seconds_)
7781
, allow_read_expired_keys(allow_read_expired_keys_)
7882
, max_update_queue_size(max_update_queue_size_)
7983
, update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
84+
, query_wait_timeout_milliseconds(query_wait_timeout_milliseconds_)
8085
, max_threads_for_updates(max_threads_for_updates_)
8186
, log(&Logger::get("ExternalDictionaries"))
8287
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
@@ -332,6 +337,13 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
332337
{
333338
if (find_result.outdated)
334339
{
340+
/// Protection of reading very expired keys.
341+
if (now > cells[find_result.cell_idx].strict_max)
342+
{
343+
cache_not_found_ids[id].push_back(row);
344+
continue;
345+
}
346+
335347
cache_expired_ids[id].push_back(row);
336348

337349
if (allow_read_expired_keys)
@@ -693,6 +705,9 @@ void registerDictionaryCache(DictionaryFactory & factory)
693705
const String name = config.getString(config_prefix + ".name");
694706
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
695707

708+
const size_t strict_max_lifetime_seconds =
709+
config.getUInt64(layout_prefix + ".cache.strict_max_lifetime_seconds", static_cast<size_t>(dict_lifetime.max_sec));
710+
696711
const size_t max_update_queue_size =
697712
config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
698713
if (max_update_queue_size == 0)
@@ -708,15 +723,27 @@ void registerDictionaryCache(DictionaryFactory & factory)
708723
throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
709724
ErrorCodes::BAD_ARGUMENTS};
710725

726+
const size_t query_wait_timeout_milliseconds =
727+
config.getUInt64(layout_prefix + ".cache.query_wait_timeout_milliseconds", 60000);
728+
711729
const size_t max_threads_for_updates =
712730
config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
713731
if (max_threads_for_updates == 0)
714732
throw Exception{name + ": dictionary of layout 'cache' cannot have zero threads for updates.",
715733
ErrorCodes::BAD_ARGUMENTS};
716734

717735
return std::make_unique<CacheDictionary>(
718-
database, name, dict_struct, std::move(source_ptr), dict_lifetime, size,
719-
allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds,
736+
database,
737+
name,
738+
dict_struct,
739+
std::move(source_ptr),
740+
dict_lifetime,
741+
strict_max_lifetime_seconds,
742+
size,
743+
allow_read_expired_keys,
744+
max_update_queue_size,
745+
update_queue_push_timeout_milliseconds,
746+
query_wait_timeout_milliseconds,
720747
max_threads_for_updates);
721748
};
722749
factory.registerLayout("cache", create_layout, false);
@@ -782,20 +809,32 @@ void CacheDictionary::updateThreadFunction()
782809

783810
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const
784811
{
785-
std::unique_lock<std::mutex> lock(update_mutex);
786-
787-
/*
788-
* We wait here without any timeout to avoid SEGFAULT's.
789-
* Consider timeout for wait had expired and main query's thread ended with exception
790-
* or some other error. But the UpdateUnit with callbacks is left in the queue.
791-
* It has these callback that capture god knows what from the current thread
792-
* (most of the variables lies on the stack of finished thread) that
793-
* intended to do a synchronous update. AsyncUpdate thread can touch deallocated memory and explode.
794-
* */
795-
is_update_finished.wait(
796-
lock,
812+
std::unique_lock<std::mutex> update_lock(update_mutex);
813+
814+
size_t timeout_for_wait = 100000;
815+
bool result = is_update_finished.wait_for(
816+
update_lock,
817+
std::chrono::milliseconds(timeout_for_wait),
797818
[&] {return update_unit_ptr->is_done || update_unit_ptr->current_exception; });
798819

820+
if (!result)
821+
{
822+
std::lock_guard<std::mutex> callback_lock(update_unit_ptr->callback_mutex);
823+
/*
824+
* We acquire a lock here and store false to special variable to avoid SEGFAULT's.
825+
* Consider timeout for wait had expired and main query's thread ended with exception
826+
* or some other error. But the UpdateUnit with callbacks is left in the queue.
827+
* It has these callback that capture god knows what from the current thread
828+
* (most of the variables lies on the stack of finished thread) that
829+
* intended to do a synchronous update. AsyncUpdate thread can touch deallocated memory and explode.
830+
* */
831+
update_unit_ptr->can_use_callback = false;
832+
throw DB::Exception(
833+
"Dictionary " + getName() + " source seems unavailable, because " +
834+
toString(timeout_for_wait) + " timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
835+
}
836+
837+
799838
if (update_unit_ptr->current_exception)
800839
std::rethrow_exception(update_unit_ptr->current_exception);
801840
}
@@ -968,9 +1007,14 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
9681007
{
9691008
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
9701009
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
1010+
cell.strict_max = now + std::chrono::seconds{strict_max_lifetime_seconds};
9711011
}
9721012
else
1013+
{
9731014
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
1015+
cell.strict_max = now + std::chrono::seconds{strict_max_lifetime_seconds};
1016+
}
1017+
9741018

9751019
/// Set null_value for each attribute
9761020
cell.setDefault();

src/Dictionaries/CacheDictionary.h

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,12 @@ class CacheDictionary final : public IDictionary
5555
const DictionaryStructure & dict_struct_,
5656
DictionarySourcePtr source_ptr_,
5757
DictionaryLifetime dict_lifetime_,
58+
size_t strict_max_lifetime_seconds,
5859
size_t size_,
5960
bool allow_read_expired_keys_,
6061
size_t max_update_queue_size_,
6162
size_t update_queue_push_timeout_milliseconds_,
63+
size_t query_wait_timeout_milliseconds,
6264
size_t max_threads_for_updates);
6365

6466
~CacheDictionary() override;
@@ -87,9 +89,18 @@ class CacheDictionary final : public IDictionary
8789
std::shared_ptr<const IExternalLoadable> clone() const override
8890
{
8991
return std::make_shared<CacheDictionary>(
90-
database, name, dict_struct, source_ptr->clone(), dict_lifetime, size,
91-
allow_read_expired_keys, max_update_queue_size,
92-
update_queue_push_timeout_milliseconds, max_threads_for_updates);
92+
database,
93+
name,
94+
dict_struct,
95+
source_ptr->clone(),
96+
dict_lifetime,
97+
strict_max_lifetime_seconds,
98+
size,
99+
allow_read_expired_keys,
100+
max_update_queue_size,
101+
update_queue_push_timeout_milliseconds,
102+
query_wait_timeout_milliseconds,
103+
max_threads_for_updates);
93104
}
94105

95106
const IDictionarySource * getSource() const override { return source_ptr.get(); }
@@ -206,6 +217,8 @@ class CacheDictionary final : public IDictionary
206217
/// Stores both expiration time and `is_default` flag in the most significant bit
207218
time_point_urep_t data;
208219

220+
time_point_t strict_max;
221+
209222
/// Sets expiration time, resets `is_default` flag to false
210223
time_point_t expiresAt() const { return ext::safe_bit_cast<time_point_t>(data & EXPIRES_AT_MASK); }
211224
void setExpiresAt(const time_point_t & t) { data = ext::safe_bit_cast<time_point_urep_t>(t); }
@@ -294,9 +307,11 @@ class CacheDictionary final : public IDictionary
294307
const DictionaryStructure dict_struct;
295308
mutable DictionarySourcePtr source_ptr;
296309
const DictionaryLifetime dict_lifetime;
310+
const size_t strict_max_lifetime_seconds;
297311
const bool allow_read_expired_keys;
298312
const size_t max_update_queue_size;
299313
const size_t update_queue_push_timeout_milliseconds;
314+
const size_t query_wait_timeout_milliseconds;
300315
const size_t max_threads_for_updates;
301316

302317
Logger * const log;
@@ -366,6 +381,12 @@ class CacheDictionary final : public IDictionary
366381
alive_keys(CurrentMetrics::CacheDictionaryUpdateQueueKeys, requested_ids.size()){}
367382

368383
std::vector<Key> requested_ids;
384+
385+
/// It might seem that it is a leak of performance.
386+
/// But aquiring a mutex without contention is rather cheap.
387+
std::mutex callback_mutex;
388+
bool can_use_callback{true};
389+
369390
PresentIdHandler present_id_handler;
370391
AbsentIdHandler absent_id_handler;
371392

@@ -412,6 +433,7 @@ class CacheDictionary final : public IDictionary
412433
helper.push_back(unit_ptr->requested_ids.size() + helper.back());
413434
present_id_handlers.emplace_back(unit_ptr->present_id_handler);
414435
absent_id_handlers.emplace_back(unit_ptr->absent_id_handler);
436+
update_units.emplace_back(unit_ptr);
415437
}
416438

417439
concatenated_requested_ids.reserve(total_requested_keys_count);
@@ -428,31 +450,51 @@ class CacheDictionary final : public IDictionary
428450

429451
void informCallersAboutPresentId(Key id, size_t cell_idx)
430452
{
431-
for (size_t i = 0; i < concatenated_requested_ids.size(); ++i)
453+
for (size_t position = 0; position < concatenated_requested_ids.size(); ++position)
432454
{
433-
auto & curr = concatenated_requested_ids[i];
434-
if (curr == id)
435-
getPresentIdHandlerForPosition(i)(id, cell_idx);
455+
if (concatenated_requested_ids[position] == id)
456+
{
457+
auto unit_number = getUpdateUnitNumberForRequestedIdPosition(position);
458+
auto lock = getLockToCurrentUnit(unit_number);
459+
if (canUseCallback(unit_number))
460+
getPresentIdHandlerForPosition(unit_number)(id, cell_idx);
461+
}
436462
}
437463
}
438464

439465
void informCallersAboutAbsentId(Key id, size_t cell_idx)
440466
{
441-
for (size_t i = 0; i < concatenated_requested_ids.size(); ++i)
442-
if (concatenated_requested_ids[i] == id)
443-
getAbsentIdHandlerForPosition(i)(id, cell_idx);
467+
for (size_t position = 0; position < concatenated_requested_ids.size(); ++position)
468+
if (concatenated_requested_ids[position] == id)
469+
{
470+
auto unit_number = getUpdateUnitNumberForRequestedIdPosition(position);
471+
auto lock = getLockToCurrentUnit(unit_number);
472+
if (canUseCallback(unit_number))
473+
getAbsentIdHandlerForPosition(unit_number)(id, cell_idx);
474+
}
444475
}
445476

446477

447478
private:
448-
PresentIdHandler & getPresentIdHandlerForPosition(size_t position)
479+
/// Needed for control the usage of callback to avoid SEGFAULTs.
480+
bool canUseCallback(size_t unit_number)
481+
{
482+
return update_units[unit_number].get()->can_use_callback;
483+
}
484+
485+
std::unique_lock<std::mutex> getLockToCurrentUnit(size_t unit_number)
449486
{
450-
return present_id_handlers[getUpdateUnitNumberForRequestedIdPosition(position)];
487+
return std::unique_lock<std::mutex>(update_units[unit_number].get()->callback_mutex);
451488
}
452489

453-
AbsentIdHandler & getAbsentIdHandlerForPosition(size_t position)
490+
PresentIdHandler & getPresentIdHandlerForPosition(size_t unit_number)
454491
{
455-
return absent_id_handlers[getUpdateUnitNumberForRequestedIdPosition((position))];
492+
return update_units[unit_number].get()->present_id_handler;
493+
}
494+
495+
AbsentIdHandler & getAbsentIdHandlerForPosition(size_t unit_number)
496+
{
497+
return update_units[unit_number].get()->absent_id_handler;
456498
}
457499

458500
size_t getUpdateUnitNumberForRequestedIdPosition(size_t position)
@@ -464,6 +506,8 @@ class CacheDictionary final : public IDictionary
464506
std::vector<PresentIdHandler> present_id_handlers;
465507
std::vector<AbsentIdHandler> absent_id_handlers;
466508

509+
std::vector<std::reference_wrapper<UpdateUnitPtr>> update_units;
510+
467511
std::vector<size_t> helper;
468512
};
469513

src/Dictionaries/CacheDictionary.inc.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ void CacheDictionary::getItemsNumberImpl(
7575

7676
if (find_result.outdated)
7777
{
78+
/// Protection of reading very expired keys.
79+
if (now > cells[find_result.cell_idx].strict_max)
80+
{
81+
cache_not_found_ids[id].push_back(row);
82+
continue;
83+
}
84+
7885
cache_expired_ids[id].push_back(row);
7986
if (allow_read_expired_keys)
8087
update_routine();
@@ -249,6 +256,13 @@ void CacheDictionary::getItemsString(
249256
{
250257
if (find_result.outdated)
251258
{
259+
/// Protection of reading very expired keys.
260+
if (now > cells[find_result.cell_idx].strict_max)
261+
{
262+
cache_not_found_ids[id].push_back(row);
263+
continue;
264+
}
265+
252266
cache_expired_ids[id].push_back(row);
253267

254268
if (allow_read_expired_keys)

0 commit comments

Comments
 (0)