Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions dbms/src/Storages/Distributed/DirectoryMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ namespace


StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
/// It's important to initialize members before `thread` to avoid race.
: storage(storage_)
, pool(std::move(pool_))
, name(std::move(name_))
, path{storage.path + name + '/'}
, path{path_ + '/'}
, should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts)
, min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes)
Expand Down Expand Up @@ -692,10 +691,10 @@ std::string StorageDistributedDirectoryMonitor::getLoggerName() const
return storage.getStorageID().getFullTableName() + ".DirectoryMonitor";
}

void StorageDistributedDirectoryMonitor::updatePath()
void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_path)
{
std::lock_guard lock{mutex};
path = storage.path + name + '/';
path = new_path;
current_batch_file_path = path + "current_batch.txt";
}

Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/Distributed/DirectoryMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);

~StorageDistributedDirectoryMonitor();

static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);

void updatePath();
void updatePath(const std::string & new_path);

void flushAllData();

Expand All @@ -47,7 +47,6 @@ class StorageDistributedDirectoryMonitor

StorageDistributed & storage;
const ConnectionPoolPtr pool;
const std::string name;
std::string path;

const bool should_batch_inserts = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>
#include <Disks/DiskSpaceMonitor.h>

#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
Expand Down Expand Up @@ -563,11 +564,12 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// write first file, hardlink the others
for (const auto & dir_name : dir_names)
{
const auto & path = storage.getPath() + dir_name + '/';
const auto & [disk, data_path] = storage.getPath();
const std::string path(disk + data_path + dir_name + '/');

/// ensure shard subdirectory creation and notify storage
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(dir_name);
storage.requireDirectoryMonitor(disk, dir_name);

const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
const auto & block_file_path = path + file_name;
Expand Down
156 changes: 104 additions & 52 deletions dbms/src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <DataStreams/OneBlockInputStream.h>

#include <Databases/IDatabase.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/DiskLocal.h>

#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypesNumber.h>
Expand Down Expand Up @@ -146,12 +148,6 @@ UInt64 getMaximumFileNumber(const std::string & dir_path)
return res;
}

void initializeFileNamesIncrement(const std::string & path, SimpleIncrement & increment)
{
if (!path.empty())
increment.set(getMaximumFileNumber(path));
}

/// the same as DistributedBlockOutputStream::createSelector, should it be static?
IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
{
Expand Down Expand Up @@ -204,6 +200,7 @@ static ExpressionActionsPtr buildShardingKeyExpression(const ASTPtr & sharding_k
return ExpressionAnalyzer(query, syntax_result, context).getActions(project);
}


StorageDistributed::StorageDistributed(
const StorageID & id_,
const ColumnsDescription & columns_,
Expand All @@ -213,6 +210,7 @@ StorageDistributed::StorageDistributed(
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & relative_data_path_,
bool attach_)
: IStorage(id_,
Expand All @@ -226,7 +224,8 @@ StorageDistributed::StorageDistributed(
, global_context(context_)
, cluster_name(global_context.getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, path(relative_data_path_.empty() ? "" : (context_.getPath() + relative_data_path_))
, storage_policy(storage_policy_)
, relative_data_path(relative_data_path_)
{
setColumns(columns_);
setConstraints(constraints_);
Expand All @@ -237,6 +236,9 @@ StorageDistributed::StorageDistributed(
sharding_key_column_name = sharding_key_->getColumnName();
}

if (!relative_data_path.empty())
createStorage();

/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_ && !cluster_name.empty())
{
Expand All @@ -255,13 +257,34 @@ StorageDistributed::StorageDistributed(
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & relative_data_path_,
bool attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, relative_data_path_, attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_, relative_data_path_, attach)
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}

void StorageDistributed::createStorage()
{
/// Create default policy with the relative_data_path_
if (storage_policy.empty())
{
std::string path(global_context.getPath());
/// Disk must ends with '/'
if (!path.ends_with('/'))
path += '/';
auto disk = std::make_shared<DiskLocal>("default", path, 0);
volume = std::make_shared<Volume>("default", std::vector<DiskPtr>{disk}, 0);
}
else
{
auto policy = global_context.getStoragePolicySelector()[storage_policy];
if (policy->getVolumes().size() != 1)
throw Exception("Policy for Distributed table, should have exactly one volume", ErrorCodes::BAD_ARGUMENTS);
volume = policy->getVolume(0);
}
}

StoragePtr StorageDistributed::createWithOwnCluster(
const StorageID & table_id_,
Expand All @@ -271,7 +294,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
ClusterPtr owned_cluster_,
const Context & context_)
{
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), String(), false);
res->owned_cluster = std::move(owned_cluster_);
return res;
}
Expand All @@ -284,7 +307,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
ClusterPtr & owned_cluster_,
const Context & context_)
{
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), String(), false);
res->owned_cluster = owned_cluster_;
return res;
}
Expand Down Expand Up @@ -373,7 +396,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
const auto & settings = context.getSettingsRef();

/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
if (path.empty() && !owned_cluster && !settings.insert_distributed_sync)
if (!volume && !owned_cluster && !settings.insert_distributed_sync)
{
throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
ErrorCodes::BAD_ARGUMENTS);
Expand Down Expand Up @@ -426,8 +449,19 @@ void StorageDistributed::alter(const AlterCommands & params, const Context & con

void StorageDistributed::startup()
{
createDirectoryMonitors();
initializeFileNamesIncrement(path, file_names_increment);
if (!volume)
return;

for (const DiskPtr & disk : volume->disks)
createDirectoryMonitors(disk->getPath());

for (const String & path : getDataPaths())
{
UInt64 inc = getMaximumFileNumber(path);
if (inc > file_names_increment.value)
file_names_increment.value.store(inc);
}
LOG_DEBUG(log, "Auto-increment is " << file_names_increment.value);
}


Expand All @@ -436,6 +470,18 @@ void StorageDistributed::shutdown()
cluster_nodes_data.clear();
}

Strings StorageDistributed::getDataPaths() const
{
Strings paths;

if (relative_data_path.empty())
return paths;

for (const DiskPtr & disk : volume->disks)
paths.push_back(disk->getPath() + relative_data_path);

return paths;
}

void StorageDistributed::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
Expand Down Expand Up @@ -481,57 +527,43 @@ bool StorageDistributed::hasColumn(const String & column_name) const
return virtual_columns.count(column_name) || getColumns().hasPhysical(column_name);
}

void StorageDistributed::createDirectoryMonitors()
void StorageDistributed::createDirectoryMonitors(const std::string & disk)
{
if (path.empty())
return;

const std::string path(disk + relative_data_path);
Poco::File{path}.createDirectories();

std::filesystem::directory_iterator begin(path);
std::filesystem::directory_iterator end;
for (auto it = begin; it != end; ++it)
if (std::filesystem::is_directory(*it))
requireDirectoryMonitor(it->path().filename().string());
requireDirectoryMonitor(disk, it->path().filename().string());
}


void StorageDistributed::requireDirectoryMonitor(const std::string & name)
void StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name)
{
std::lock_guard lock(cluster_nodes_mutex);
cluster_nodes_data[name].requireDirectoryMonitor(name, *this, monitors_blocker);
}
const std::string path(disk + relative_data_path + name);
const std::string key(disk + name);

ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name)
{
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[name];
node_data.requireConnectionPool(name, *this);
return node_data.conneciton_pool;
auto & node_data = cluster_nodes_data[key];
node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(*this, path, node_data.conneciton_pool, monitors_blocker);
}

size_t StorageDistributed::getShardCount() const
{
return getCluster()->getShardCount();
}

ClusterPtr StorageDistributed::getCluster() const
{
return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
}

void StorageDistributed::ClusterNodeData::requireConnectionPool(const std::string & name, const StorageDistributed & storage)
std::pair<const std::string &, const std::string &> StorageDistributed::getPath()
{
if (!conneciton_pool)
conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, storage);
return {volume->getNextDisk()->getPath(), relative_data_path};
}

void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(
const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker)
ClusterPtr StorageDistributed::getCluster() const
{
requireConnectionPool(name, storage);
if (!directory_monitor)
directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool, monitor_blocker);
return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
}

void StorageDistributed::ClusterNodeData::flushAllData()
Expand Down Expand Up @@ -613,16 +645,26 @@ void StorageDistributed::flushClusterNodesAllData()
void StorageDistributed::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name,
TableStructureWriteLockHolder &)
{
if (!path.empty())
if (!relative_data_path.empty())
renameOnDisk(new_path_to_table_data);
renameInMemory(new_database_name, new_table_name);
}
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
for (const DiskPtr & disk : volume->disks)
{
auto new_path = global_context.getPath() + new_path_to_table_data;
Poco::File(path).renameTo(new_path);
path = new_path;
const String path(disk->getPath());
auto new_path = path + new_path_to_table_data;
Poco::File(path + relative_data_path).renameTo(new_path);

LOG_DEBUG(log, "Updating path to " << new_path);

std::lock_guard lock(cluster_nodes_mutex);
for (auto & node : cluster_nodes_data)
node.second.directory_monitor->updatePath();
node.second.directory_monitor->updatePath(new_path);
}
renameInMemory(new_database_name, new_table_name);

relative_data_path = new_path_to_table_data;
}


Expand All @@ -634,6 +676,7 @@ void registerStorageDistributed(StorageFactory & factory)
* - name of cluster in configuration;
* - name of remote database;
* - name of remote table;
* - policy to store data in;
*
* Remote database may be specified in following form:
* - identifier;
Expand All @@ -644,10 +687,15 @@ void registerStorageDistributed(StorageFactory & factory)

ASTs & engine_args = args.engine_args;

if (!(engine_args.size() == 3 || engine_args.size() == 4))
throw Exception("Storage Distributed requires 3 or 4 parameters"
" - name of configuration section with list of remote servers, name of remote database, name of remote table,"
" sharding key expression (optional).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (engine_args.size() < 3 || engine_args.size() > 5)
throw Exception(
"Storage Distributed requires from 3 to 5 parameters - "
"name of configuration section with list of remote servers, "
"name of remote database, "
"name of remote table, "
"sharding key expression (optional), "
"policy to store data in (optional).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

String cluster_name = getClusterName(*engine_args[0]);

Expand All @@ -657,7 +705,8 @@ void registerStorageDistributed(StorageFactory & factory)
String remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
String remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();

const auto & sharding_key = engine_args.size() == 4 ? engine_args[3] : nullptr;
const auto & sharding_key = engine_args.size() >= 4 ? engine_args[3] : nullptr;
const auto & storage_policy = engine_args.size() >= 5 ? engine_args[4]->as<ASTLiteral &>().value.safeGet<String>() : "";

/// Check that sharding_key exists in the table and has numeric type.
if (sharding_key)
Expand All @@ -678,7 +727,10 @@ void registerStorageDistributed(StorageFactory & factory)
return StorageDistributed::create(
args.table_id, args.columns, args.constraints,
remote_database, remote_table, cluster_name,
args.context, sharding_key, args.relative_data_path,
args.context,
sharding_key,
storage_policy,
args.relative_data_path,
args.attach);
});
}
Expand Down
Loading