MaterializedPostgreSQL: allow dynamically adding/deleting tables, altering settings#28301
Conversation
…alized-postgresql
alesapin
left a comment
There was a problem hiding this comment.
Need to fix potential race conditions.
src/Databases/DatabaseAtomic.h
Outdated
| String path_to_table_symlinks; | ||
| String path_to_metadata_symlink; | ||
| const UUID db_uuid; | ||
| ASTPtr storage_def; |
There was a problem hiding this comment.
Why database atomic now have storage? What is it?
There was a problem hiding this comment.
Hm, no, it is already deleted..
src/Databases/IDatabase.h
Outdated
|
|
||
| virtual void modifySettingsMetadata(const SettingsChanges &, ContextPtr) | ||
| { | ||
| throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database engine {} does not support settings", getEngineName()); |
There was a problem hiding this comment.
Doesn't support settings alter?
src/Databases/IDatabase.h
Outdated
| throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database engine {} does not support settings", getEngineName()); | ||
| } | ||
|
|
||
| virtual void tryApplySettings(const SettingsChanges &, ContextPtr) |
There was a problem hiding this comment.
What is the difference between this method and previous? how modifySettingsMetadata differs from tryApplySettings? BTW, tryXXX usually returns bool.
| const String & metadata_path_, | ||
| UUID uuid_, | ||
| const ASTStorage * database_engine_define_, | ||
| ASTPtr storage_def_, |
There was a problem hiding this comment.
database_engine_define_ is much more clear, than storage_def.
| } | ||
|
|
||
|
|
||
| void DatabaseMaterializedPostgreSQL::tryApplySettings(const SettingsChanges & settings_changes, ContextPtr local_context) |
There was a problem hiding this comment.
local_context -> query_context?
| } | ||
|
|
||
|
|
||
| String DatabaseMaterializedPostgreSQL::getTablesList(const String & except) const |
There was a problem hiding this comment.
Unclear method interface. Method return string, not list. What is except -- it's also comma-separated string?
| if (!alter_commands.empty()) | ||
| { | ||
| database->checkAlterIsPossible(alter_commands, getContext()); | ||
| alter_commands.apply(database, getContext()); |
There was a problem hiding this comment.
It's better to leave AlterCommands as lightweight class which operates on AST's or some simple structures like StorageInMemoryMetadata. Taking IDatabase as argument is not the best solution.
src/Storages/AlterCommands.cpp
Outdated
| { | ||
| if (command.type == AlterCommand::MODIFY_DATABASE_SETTING) | ||
| { | ||
| database->tryApplySettings(command.settings_changes, context); |
There was a problem hiding this comment.
What if database will be dropped here?
There was a problem hiding this comment.
Changed the order of writing settings changes to disk and writing to postgresql replication flow.
Then if we crash in between, it will be Ok for this database engine because it always checks metadata on startup and makes according modifications. Though cannot say the same for other engines if they will also support settings changes..
| : log(&Poco::Logger::get("PostgreSQLReaplicaConsumer")) | ||
| Storages storages_, | ||
| const String & name_for_logger) | ||
| : log(&Poco::Logger::get("PostgreSQLReplicaConsumer("+ name_for_logger +")")) |
There was a problem hiding this comment.
| : log(&Poco::Logger::get("PostgreSQLReplicaConsumer("+ name_for_logger +")")) | |
| : log(&Poco::Logger::get("PostgreSQLReplicaConsumer(" + name_for_logger + ")")) |
alesapin
left a comment
There was a problem hiding this comment.
Alter part is almost ok (expect overwrite), locking also ok. Barely understand postgres replication part, but looks like we create too many connections. Maybe we can have one shared connection for handler?
src/Databases/DatabaseAtomic.cpp
Outdated
| #include <Databases/DatabaseOnDisk.h> | ||
| #include <IO/ReadHelpers.h> | ||
| #include <IO/WriteHelpers.h> | ||
| #include <IO/WriteBufferFromFile.h> |
| if (!query_context->isInternalQuery()) | ||
| throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Changing setting `{}` is not allowed", change.name); | ||
|
|
||
| DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context); |
There was a problem hiding this comment.
What if we changed 10 settings at one query? We will override whole metadata on disk 10 times?
| { | ||
| /// If there is query context then we need to detach materialized storage. | ||
| /// If there is no query context then we need to detach internal storage from atomic database. | ||
| if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext()) |
There was a problem hiding this comment.
Maybe move this to a separate method?)
There was a problem hiding this comment.
No, I think it is better here. It is rather compact.
| try | ||
| { | ||
| nested_storages[table_name] = loadFromSnapshot(snapshot_name, table_name, storage->as <StorageMaterializedPostgreSQL>()); | ||
| nested_storages[table_name] = loadFromSnapshot(tmp_connection, snapshot_name, table_name, storage->as <StorageMaterializedPostgreSQL>()); |
There was a problem hiding this comment.
| nested_storages[table_name] = loadFromSnapshot(tmp_connection, snapshot_name, table_name, storage->as <StorageMaterializedPostgreSQL>()); | |
| nested_storages[table_name] = loadFromSnapshot(tmp_connection, snapshot_name, table_name, storage->as<StorageMaterializedPostgreSQL>()); |
| StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(String & snapshot_name, const String & table_name, | ||
| ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializedPostgreSQL * storage, const String & table_name) | ||
| { | ||
| postgres::Connection connection(connection_info); |
There was a problem hiding this comment.
I see pqlib (and pqxx) for the first time. All connections will be correctly closed in destructors?
There was a problem hiding this comment.
BTW, is it ok to create new connection each time?
There was a problem hiding this comment.
I see pqlib (and pqxx) for the first time. All connections will be correctly closed in destructors?
yes, they will be closed.
BTW, is it ok to create new connection each time?
Yes, quite bad.
I cannot make one global connection for all methods in this replication handler becuase it is dangerous as they can be called concurrently.
A solution could be to add a connection pool here. But I am not sure it is for the better because all these connections are opened only while initial tables dump or when some non ordinary rare actions are made.
Overall in normal lifestyle of this engine (after initial synchronization is completed) there will be only 1 active connection at a time.
| auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>(); | ||
| try | ||
| { | ||
| /// FIXME: Looks like it is possible we might get here if there is no nested storage or at least nested storage id field might be empty. |
There was a problem hiding this comment.
Not very scary because "nested storage id might be empty" means not that there is no StorageID in some storage, but that a field nested_table_id in MaterializedPostgreSQL storage is std::nullopt.
Is is set in 2 cases:
- At constructor -- if MaterializedPostgreSQL storage is created with a ready fully made nested storage passed into its constrcutor.
- in storage->prepare() call -- right after materialized storage has created and saved nested storage inside itself.
I'll add this to a comment.
No I changed my mind..
Very scary.
Yes, it is scary.
(if it will happen, but I (want to) believe its not possible)
|
|
||
| auto nested = table_to_delete->as<StorageMaterializedPostgreSQL>()->getNested(); | ||
| if (!nested) | ||
| throw Exception(ErrorCodes::UNKNOWN_TABLE, "Inner table `{}` does not exist", table_name); |
There was a problem hiding this comment.
Looks like logical error? Or it can be dropped manually?
There was a problem hiding this comment.
BTW, is it ok to create new connection each time?
yes, fixed.
Or it can be dropped manually?
no.
| { | ||
| auto current_context = Context::createCopy(getContext()->getGlobalContext()); | ||
| current_context->makeQueryContext(); | ||
| DatabaseAtomic::dropTable(current_context, table_name, true); |
There was a problem hiding this comment.
But why we drop it in detach method?
There was a problem hiding this comment.
Because postgres table can be removed from replication via detach only -- for consistency with attach (because we cannot add table to replication via create table because we do not know table structure).
So we call detach for materialized table and drop for inner table because this is the behaviour I need.
|
|
||
| try | ||
| { | ||
| auto tables_to_replicate = settings->materialized_postgresql_tables_list.value; |
There was a problem hiding this comment.
I don't understand where the source of truth? Which tables we have in materialized_postgresql_tables_list which tables we have in materialized_tables?
There was a problem hiding this comment.
I don't understand where the source of truth?
Sounds so philosophically...
p.s. discussed in dm that I need to add more comments.
src/Databases/IDatabase.h
Outdated
| /// Delete data and metadata stored inside the database, if exists. | ||
| virtual void drop(ContextPtr /*context*/) {} | ||
|
|
||
| virtual void applyNewSettings(const SettingsChanges &, ContextPtr) |
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Support adding / deleting tables to replication from PostgreSQL dynamically in database engine MaterializedPostgreSQL. Support alter for database settings. Closes #27573.