@@ -46,6 +46,7 @@ namespace ErrorCodes
4646 extern const int BAD_ARGUMENTS;
4747 extern const int UNSUPPORTED_METHOD;
4848 extern const int TOO_SMALL_BUFFER_SIZE;
49+ extern const int TIMEOUT_EXCEEDED;
4950}
5051
5152
@@ -63,20 +64,24 @@ CacheDictionary::CacheDictionary(
6364 const DictionaryStructure & dict_struct_,
6465 DictionarySourcePtr source_ptr_,
6566 DictionaryLifetime dict_lifetime_,
67+ size_t strict_max_lifetime_seconds_,
6668 size_t size_,
6769 bool allow_read_expired_keys_,
6870 size_t max_update_queue_size_,
6971 size_t update_queue_push_timeout_milliseconds_,
72+ size_t query_wait_timeout_milliseconds_,
7073 size_t max_threads_for_updates_)
7174 : database(database_)
7275 , name(name_)
7376 , full_name{database_.empty () ? name_ : (database_ + " ." + name_)}
7477 , dict_struct(dict_struct_)
7578 , source_ptr{std::move (source_ptr_)}
7679 , dict_lifetime(dict_lifetime_)
80+ , strict_max_lifetime_seconds(strict_max_lifetime_seconds_)
7781 , allow_read_expired_keys(allow_read_expired_keys_)
7882 , max_update_queue_size(max_update_queue_size_)
7983 , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
84+ , query_wait_timeout_milliseconds(query_wait_timeout_milliseconds_)
8085 , max_threads_for_updates(max_threads_for_updates_)
8186 , log(&Logger::get (" ExternalDictionaries" ))
8287 , size{roundUpToPowerOfTwoOrZero (std::max (size_, size_t (max_collision_length)))}
@@ -332,6 +337,13 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
332337 {
333338 if (find_result.outdated )
334339 {
340+ // / Protection of reading very expired keys.
341+ if (now > cells[find_result.cell_idx ].strict_max )
342+ {
343+ cache_not_found_ids[id].push_back (row);
344+ continue ;
345+ }
346+
335347 cache_expired_ids[id].push_back (row);
336348
337349 if (allow_read_expired_keys)
@@ -693,6 +705,9 @@ void registerDictionaryCache(DictionaryFactory & factory)
693705 const String name = config.getString (config_prefix + " .name" );
694706 const DictionaryLifetime dict_lifetime{config, config_prefix + " .lifetime" };
695707
708+ const size_t strict_max_lifetime_seconds =
709+ config.getUInt64 (layout_prefix + " .cache.strict_max_lifetime_seconds" , static_cast <size_t >(dict_lifetime.max_sec ));
710+
696711 const size_t max_update_queue_size =
697712 config.getUInt64 (layout_prefix + " .cache.max_update_queue_size" , 100000 );
698713 if (max_update_queue_size == 0 )
@@ -708,15 +723,27 @@ void registerDictionaryCache(DictionaryFactory & factory)
708723 throw Exception{name + " : dictionary of layout 'cache' have too little update_queue_push_timeout" ,
709724 ErrorCodes::BAD_ARGUMENTS};
710725
726+ const size_t query_wait_timeout_milliseconds =
727+ config.getUInt64 (layout_prefix + " .cache.query_wait_timeout_milliseconds" , 60000 );
728+
711729 const size_t max_threads_for_updates =
712730 config.getUInt64 (layout_prefix + " .max_threads_for_updates" , 4 );
713731 if (max_threads_for_updates == 0 )
714732 throw Exception{name + " : dictionary of layout 'cache' cannot have zero threads for updates." ,
715733 ErrorCodes::BAD_ARGUMENTS};
716734
717735 return std::make_unique<CacheDictionary>(
718- database, name, dict_struct, std::move (source_ptr), dict_lifetime, size,
719- allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds,
736+ database,
737+ name,
738+ dict_struct,
739+ std::move (source_ptr),
740+ dict_lifetime,
741+ strict_max_lifetime_seconds,
742+ size,
743+ allow_read_expired_keys,
744+ max_update_queue_size,
745+ update_queue_push_timeout_milliseconds,
746+ query_wait_timeout_milliseconds,
720747 max_threads_for_updates);
721748 };
722749 factory.registerLayout (" cache" , create_layout, false );
@@ -782,20 +809,32 @@ void CacheDictionary::updateThreadFunction()
782809
783810void CacheDictionary::waitForCurrentUpdateFinish (UpdateUnitPtr & update_unit_ptr) const
784811{
785- std::unique_lock<std::mutex> lock (update_mutex);
786-
787- /*
788- * We wait here without any timeout to avoid SEGFAULT's.
789- * Consider timeout for wait had expired and main query's thread ended with exception
790- * or some other error. But the UpdateUnit with callbacks is left in the queue.
791- * It has these callback that capture god knows what from the current thread
792- * (most of the variables lies on the stack of finished thread) that
793- * intended to do a synchronous update. AsyncUpdate thread can touch deallocated memory and explode.
794- * */
795- is_update_finished.wait (
796- lock,
812+ std::unique_lock<std::mutex> update_lock (update_mutex);
813+
814+ size_t timeout_for_wait = 100000 ;
815+ bool result = is_update_finished.wait_for (
816+ update_lock,
817+ std::chrono::milliseconds (timeout_for_wait),
797818 [&] {return update_unit_ptr->is_done || update_unit_ptr->current_exception ; });
798819
820+ if (!result)
821+ {
822+ std::lock_guard<std::mutex> callback_lock (update_unit_ptr->callback_mutex );
823+ /*
824+ * We acquire a lock here and store false to special variable to avoid SEGFAULT's.
825+ * Consider timeout for wait had expired and main query's thread ended with exception
826+ * or some other error. But the UpdateUnit with callbacks is left in the queue.
827+ * It has these callback that capture god knows what from the current thread
828+ * (most of the variables lies on the stack of finished thread) that
829+ * intended to do a synchronous update. AsyncUpdate thread can touch deallocated memory and explode.
830+ * */
831+ update_unit_ptr->can_use_callback = false ;
832+ throw DB::Exception (
833+ " Dictionary " + getName () + " source seems unavailable, because " +
834+ toString (timeout_for_wait) + " timeout exceeded." , ErrorCodes::TIMEOUT_EXCEEDED);
835+ }
836+
837+
799838 if (update_unit_ptr->current_exception )
800839 std::rethrow_exception (update_unit_ptr->current_exception );
801840}
@@ -968,9 +1007,14 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
9681007 {
9691008 std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec , dict_lifetime.max_sec };
9701009 cell.setExpiresAt (now + std::chrono::seconds{distribution (rnd_engine)});
1010+ cell.strict_max = now + std::chrono::seconds{strict_max_lifetime_seconds};
9711011 }
9721012 else
1013+ {
9731014 cell.setExpiresAt (std::chrono::time_point<std::chrono::system_clock>::max ());
1015+ cell.strict_max = now + std::chrono::seconds{strict_max_lifetime_seconds};
1016+ }
1017+
9741018
9751019 // / Set null_value for each attribute
9761020 cell.setDefault ();
0 commit comments