
Support system replicas queries for distributed#4935

Merged
alesapin merged 4 commits into ClickHouse:master from zhang2014:feature/support_system_replicas
Jun 17, 2019

Conversation

zhang2014 (Contributor) commented Apr 8, 2019:

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Category :

  • Improvement

Short description :

Support SYSTEM SYNC REPLICA for distributed storage
Support SYSTEM START|STOP REPLICATED SENDS for distributed storage

@zhang2014 zhang2014 force-pushed the feature/support_system_replicas branch 2 times, most recently from 39006fe to c13b82f Compare April 8, 2019 07:19
@zhang2014 zhang2014 changed the title support system replicas queries for distributed Support system replicas queries for distributed Apr 8, 2019
target_table->checkPartitionCanBeDropped(partition);
}

ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)
zhang2014 (Contributor, Author) commented:

For the materialized view, maybe we should push down the ActionLock?

KochetovNicolai added the can be tested and pr-improvement (Pull request with some product improvements) labels Apr 16, 2019
alesapin (Member) left a comment:

Using the same queries for Replicated and Distributed tables is confusing. It would be better to add separate queries, for example SYSTEM SYNC DISTRIBUTED, SYSTEM STOP DISTRIBUTED SENDS, etc.

Also, I'm not sure these queries are useful for Distributed tables. They only trigger an additional findFiles and fail if that function throws an exception. But StorageDistributedDirectoryMonitor already triggers this function as frequently as possible in a background thread, and sleeps only when findFiles throws. Which problem do these queries solve? Maybe the exponent in the backoff calculation is too big?

std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{max_sleep_time});
tryLogCurrentException(getLoggerName().data());
}
Member:

Need to write something to log about it.

void StorageDistributedDirectoryMonitor::syncReplicaSends()
{
if (quit || monitor_blocker.isCancelled())
throw Exception("Cancelled sync distributed sync replica sends.", ErrorCodes::ABORTED);
Member:

Unclear message.

zhang2014 (Contributor, Author):

> Using the same queries for Replicated and Distributed tables is confusing. It would be better to add separate queries, for example SYSTEM SYNC DISTRIBUTED, SYSTEM STOP DISTRIBUTED SENDS, etc.

Done

> Also, I'm not sure these queries are useful for Distributed tables. They only trigger an additional findFiles and fail if that function throws an exception. But StorageDistributedDirectoryMonitor already triggers this function as frequently as possible in a background thread, and sleeps only when findFiles throws. Which problem do these queries solve? Maybe the exponent in the backoff calculation is too big?

SYNC DISTRIBUTED is more convenient for writing tests, as well as for some special scenarios, such as ensuring that no distributed data remains to be sent before a data migration.

alesapin (Member):

> SYNC DISTRIBUTED is more convenient for writing tests, as well as for some special scenarios, such as ensuring that no distributed data remains to be sent before a data migration.

We don't wait here https://github.com/yandex/ClickHouse/pull/4935/files#diff-8890e3b1de70b013b79201d37463a0d4R98. findFiles (https://github.com/yandex/ClickHouse/blob/master/dbms/src/Storages/Distributed/DirectoryMonitor.cpp#L182) just looks at the files in the directory at the current moment. If an exception is thrown from findFiles, or a concurrent insert happens while SYNC DISTRIBUTED runs, the logic of the query will be broken.

zhang2014 (Contributor, Author):

> We don't wait here https://github.com/yandex/ClickHouse/pull/4935/files#diff-8890e3b1de70b013b79201d37463a0d4R98. findFiles (https://github.com/yandex/ClickHouse/blob/master/dbms/src/Storages/Distributed/DirectoryMonitor.cpp#L182) just looks at the files in the directory at the current moment.

We will call SYNC DISTRIBUTED after writes have stopped; its more general semantics should ensure that the cluster is consistent at the moment of the call, right? For example, when I execute SYNC DISTRIBUTED after INSERT INTO distributed_xxx VALUES(1)(2)(3)(4), I must then be able to read 1, 2, 3, 4 from the cluster. In fact, I can even use SYNC DISTRIBUTED directly to synchronize the cluster data, like this:

```
SYSTEM STOP DISTRIBUTED SENDS;
INSERT INTO distributed_xxx VALUES(1)(2)(3);
SYSTEM SYNC DISTRIBUTED;
INSERT INTO distributed_xxx VALUES(4)(5)(6);
SYSTEM SYNC DISTRIBUTED;
```

> If an exception is thrown from findFiles, or a concurrent insert happens while SYNC DISTRIBUTED runs, the logic of the query will be broken.

In my understanding this cannot happen, as ClickHouse uses a hard link to synchronize blocks of replica data (https://github.com/yandex/ClickHouse/blob/master/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp#L563). At the same time, the DirectoryMonitor lock is always acquired when SYNC DISTRIBUTED is executed (https://github.com/yandex/ClickHouse/pull/4935/files#diff-8890e3b1de70b013b79201d37463a0d4R97).

alexey-milovidov added the pr-feature (Pull request with new product feature) label and removed the pr-improvement (Pull request with some product improvements) label May 9, 2019

static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);

void syncReplicaSends();
Member:

Misleading method name, because it's not about replicas.
(A Distributed table may look at shards without replicas at all.)


static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);

void syncReplicaSends();
Member:

Double whitespace.

throw Exception("Cancelled sync distributed sends.", ErrorCodes::ABORTED);

std::unique_lock lock{mutex};
findFiles();
Member:

The method findFiles must be renamed.

alexey-milovidov (Member):

Do you really need this command? In my opinion, it's much better to use synchronous distributed inserts (the setting insert_distributed_sync = 1).
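For reference, a minimal sketch of that alternative. The table name distributed_xxx is taken from the example earlier in this thread, and this assumes writes go through the Distributed table:

```sql
-- With insert_distributed_sync = 1, the INSERT itself blocks until the data
-- has been sent to the remote shards, so there is nothing left for the
-- background sends to flush afterwards.
SET insert_distributed_sync = 1;
INSERT INTO distributed_xxx VALUES (1)(2)(3);
```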

alexey-milovidov (Member) left a comment:

.

@zhang2014 zhang2014 force-pushed the feature/support_system_replicas branch from ce78f90 to 80788cd Compare May 29, 2019 02:44
zhang2014 (Contributor, Author):

> Do you really need this command? In my opinion, it's much better to use synchronous distributed inserts (the setting insert_distributed_sync = 1).

@alexey-milovidov

Motivation (from my friend):

Currently his ClickHouse cluster is deployed in Japan, the US, and China, and he needs ClickHouse to provide a regular way to synchronize data between the different nodes.

For now I recommend using the Distributed engine's replicas together with distributed_directory_monitor_batch_inserts, but this has some limitations (the synchronization interval cannot be controlled), and this PR solves that problem, just like what I said here.

zhang2014 (Contributor, Author):

This is the plan I gave him (translated from Chinese via Google Translate):

For example, allowing a delay of up to 20s between different regions:
    1. Configure distributed_directory_monitor_batch_inserts = 1, load_balancing = IN_ORDER (users.xml).
    2. Configure the cluster for the China, Japan, and US datacenters, for example:
    ```
        China datacenter configuration:
        <cluster_name_xxx>
            <shard>
                <replica>China</replica>
                <replica>Japan</replica>
                <replica>United States</replica>
            </shard>
        </cluster_name_xxx>
        Japan datacenter configuration:
        <cluster_name_xxx>
            <shard>
                <replica>Japan</replica>
                <replica>China</replica>
                <replica>United States</replica>
            </shard>
        </cluster_name_xxx>
        US datacenter configuration:
        <cluster_name_xxx>
            <shard>
                <replica>United States</replica>
                <replica>Japan</replica>
                <replica>China</replica>
            </shard>
        </cluster_name_xxx>
    ```
    3. Create the corresponding Distributed and MergeTree tables, for example:
    ```
        CREATE TABLE local(...) ENGINE = MergeTree(...)
        CREATE TABLE distributed(...) ENGINE = Distributed(cluster_name_xxx, default, local)
    ```
    4. Each region writes into the distributed table of its own datacenter.
    5. If https://github.com/yandex/ClickHouse/pull/4935 is available, background sends can be turned off on each instance in the datacenter:
    ```
      SYSTEM STOP DISTRIBUTED SENDS distributed
      --- At the same time, start a timed task based on the tolerable delay between zones, for example every 20 seconds:
      SYSTEM FLUSH DISTRIBUTED distributed;
    ```
Plan description:
    Each datacenter can query its own data immediately (within milliseconds); data from the other datacenters can be queried after 20 seconds (ignoring transmission time and cost). About high availability: according to each datacenter's configuration, available nodes are selected in order. For example, in the China datacenter (assuming the distributed and local tables are placed on different nodes, and some nodes in the datacenter are available): if the China replica is available, queries go to the local table of the China datacenter first; if the China datacenter is unavailable, they go to the local table of the Japan datacenter; and if both the China and Japan datacenters are unavailable, to the local table of the US datacenter.


Labels

pr-feature Pull request with new product feature


4 participants