[RFC] Limited support for transactions in MergeTree tables #22086

@tavplubix

Use case
Main use cases:

  • Ability to execute multiple INSERTs (possibly into multiple MergeTree tables, including materialized views; see "Atomic inserts (on parts level) to main table and all dependant Materialized views", #11909) in a single "transaction". All INSERT queries in a "transaction" must either all succeed or all fail. All inserted data parts become visible only when the "transaction" is committed.
  • Ability to execute multiple SELECTs in a single "transaction", so all of them will read from one consistent snapshot of all MergeTree tables.

It may also be useful to support transactional ALTER ... PARTITION and ALTER ... UPDATE/DELETE, but the first implementation will probably support only SELECT and INSERT queries. An exception will be thrown on an attempt to execute a non-transactional query inside a transaction.

Describe the solution you'd like
MVCC-based transactions at the data part level. This easily provides Snapshot Isolation (a bit stronger than Repeatable Read) for reading queries. For writing queries, Snapshot Isolation requires detecting conflicts and rolling back a transaction if it tries to modify an object that was modified by a concurrent transaction. This is not a problem for concurrent INSERT queries: they only append new data to a table by creating new data parts, so two INSERTs cannot modify the same object and cannot cause a write-write conflict (but there is a nuance with Replacing, Collapsing and other special *MergeTree engines). To support transactional ALTERs it should be enough to forbid concurrent merging/mutating operations with overlapping sets of source parts and to not assign such operations to future parts (because the creation of a future part can be rolled back).

Implementation details for simple MergeTree

Data parts versioning
Each writing transaction has a unique identifier, let's call it tid. We use tids to understand which transaction has modified or is going to modify each part. We will obtain the tid at the beginning of the transaction and pass it through the query/session Context wherever it is needed. If a data part is involved in some transaction, it must contain the following metadata:

  • mintid - tid of transaction that has created the part
  • maxtid - NULL or tid of transaction that has removed the part (it's mostly for ALTER ... PARTITIONs, but it also may be useful for ALTER UPDATE/DELETE and background merges, see below)
  • mincsn - NULL or "cached" CSN(mintid), described below
  • maxcsn - NULL or "cached" CSN(maxtid), described below

When a query in a transaction creates or removes a data part, it writes the corresponding mintid or maxtid. This is the only difference between a transactional and a non-transactional writing query.

Commit
CSN (Commit Sequence Number) indicates the point in time when a transaction was committed and its changes to part sets became visible to other transactions. We will maintain a tid to csn mapping to understand when each part was committed.
Note: If some data part is in Committed or Outdated state, it does not mean that the part is actually created or removed in terms of transactions. It only means that the corresponding changes are applied to the parts set of some table and are ready to become visible once the whole transaction is committed. If a query reads data without creating a transaction, it will just see parts in Committed state.

When a transaction is committing, it allocates a new CSN from a global monotonic counter (for the non-replicated case it can be just a static variable, for the replicated case we can use sequential nodes in ZooKeeper, see below), writes its tid and the allocated csn into the transaction log (a WAL for the non-replicated case, for the replicated case see below) and updates the in-memory tid to csn mapping. After that it writes the corresponding mincsn/maxcsn into the data part (this step is optional, because we can always get the csn by tid from the mapping or the transaction log).
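
The commit steps above can be sketched for the non-replicated case as follows. This is only an illustration in Python, not ClickHouse code: `Part`, `TransactionLog` and the in-memory `wal` list are hypothetical stand-ins, and integer tids are assumed for brevity.

```python
import itertools
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Part:
    """Hypothetical data part carrying the version metadata described above."""
    name: str
    mintid: Optional[int] = None
    maxtid: Optional[int] = None
    mincsn: Optional[int] = None  # cached CSN(mintid)
    maxcsn: Optional[int] = None  # cached CSN(maxtid)


class TransactionLog:
    """Hypothetical stand-in for the non-replicated transaction log."""

    def __init__(self) -> None:
        self._csn_counter = itertools.count(1)   # global monotonic counter
        self.wal: List[Tuple[int, int]] = []     # stands in for an on-disk WAL
        self.tid_to_csn: Dict[int, int] = {}     # in-memory mapping

    def commit(self, tid: int, touched_parts: List[Part]) -> int:
        csn = next(self._csn_counter)            # allocate a new CSN
        self.wal.append((tid, csn))              # record (tid, csn) in the log
        self.tid_to_csn[tid] = csn               # update the in-memory mapping
        for part in touched_parts:               # optional: cache CSNs in parts
            if part.mintid == tid:
                part.mincsn = csn
            if part.maxtid == tid:
                part.maxcsn = csn
        return csn
```

Because the last loop only caches values that are already recoverable from `tid_to_csn`, skipping it would not change which parts are visible.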

Rollback
A transaction is rolled back automatically on any uncaught exception. On rollback it should change the state of created parts to Outdated and reset maxtid of removed parts to NULL.
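
A minimal sketch of the rollback rule, assuming a hypothetical `Part` record with a string `state` field and integer tids (neither is taken from the actual code base):

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Part:
    """Hypothetical data part; only the fields needed for rollback."""
    name: str
    state: str = "Committed"
    mintid: Optional[int] = None
    maxtid: Optional[int] = None


def rollback(tid: int, parts: List[Part]) -> None:
    """Undo the effects of transaction `tid` on the given parts."""
    for part in parts:
        if part.mintid == tid:
            part.state = "Outdated"   # part created by the transaction
        if part.maxtid == tid:
            part.maxtid = None        # part removed by the transaction
```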

Snapshots and data parts visibility
A snapshot is just a CSN. When a transaction starts, it gets the current value of the CSN counter and uses it as its current version, let's call it snapshotVersion. A data part is visible to a transaction if:

  • Part does not have version metadata at all. It means that part was created before transactions were enabled or part was created by non-transactional query. Note: isolation level falls to Read Uncommitted if there are concurrent non-transactional writing queries.
  • Part was created by current transaction, i.e. mintid == current_tid
  • Part creation was committed before we took the snapshot and part removal was not and part was not removed by current transaction, i.e. CSN(mintid) && CSN(mintid) <= snapshotVersion && (!CSN(maxtid) || snapshotVersion < CSN(maxtid)) && (!maxtid || maxtid != current_tid)
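
The three rules can be transcribed almost literally into a predicate. The sketch below assumes integer tids, a plain dict as the tid to csn mapping, and `None` for NULL; the `Part` class and the function name are hypothetical. Cases the rules do not spell out (for example a part without mintid that has acquired a maxtid) are not handled specially.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Part:
    """Hypothetical data part; only the tids matter for visibility."""
    mintid: Optional[int] = None
    maxtid: Optional[int] = None


def is_visible(part: Part, current_tid: int, snapshot_version: int,
               tid_to_csn: Dict[int, int]) -> bool:
    # Rule 1: no version metadata at all.
    if part.mintid is None and part.maxtid is None:
        return True
    # Rule 2: created by the current transaction.
    if part.mintid == current_tid:
        return True
    # Rule 3: creation committed before the snapshot, removal not committed
    # before the snapshot, and not removed by the current transaction.
    min_csn = tid_to_csn.get(part.mintid)   # None if not committed
    max_csn = tid_to_csn.get(part.maxtid)   # None if not committed or no maxtid
    created_in_snapshot = min_csn is not None and min_csn <= snapshot_version
    removal_not_in_snapshot = max_csn is None or snapshot_version < max_csn
    not_removed_by_us = part.maxtid is None or part.maxtid != current_tid
    return created_in_snapshot and removal_not_in_snapshot and not_removed_by_us
```

For example, with `tid_to_csn = {1: 10, 2: 20}`, a part with `mintid=1, maxtid=2` is visible at snapshot version 15 but not at 5 or 25.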

Note: This is the first reason why we need separate transaction ids and CSNs with a mapping between them. We could use only tids to define part visibility, but then we would have to save a list of concurrent transactions when taking a snapshot and check whether each of them is already committed to determine part visibility.

Background merges
Multiple parts can be merged if all of them are visible with the current snapshot version. We can also merge parts if all of them have the same mintid and do not have maxtid, but I'm not sure if we need to merge such parts.

Since a merge does not actually modify data (at least for ordinary MergeTree), we can just choose the maximum of the CSNs of the source parts as the CSN of the result part and write the corresponding maxtid to each source part, so transactions will not know anything about background merges. Of course, in this case the merge should modify version metadata only when the result part is ready. It means that the merge will fail if maxtid appeared in some source part due to a concurrent ALTER ... PARTITION or ALTER ... UPDATE/DELETE, so such queries should cancel merges of the corresponding parts (if any). It may look like a significant change, but we already have removePartProducingOpsInRange(...) which does a similar thing.
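
One possible reading of this scheme is sketched below. It assumes that "the corresponding maxtid" means the tid of the source part with the largest CSN, that all source parts are already committed, and that tids are integers; `Part`, `MergeAborted` and `finish_merge` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Part:
    """Hypothetical data part; only the tids matter here."""
    name: str
    mintid: Optional[int] = None
    maxtid: Optional[int] = None


class MergeAborted(Exception):
    """Raised when a source part got a maxtid while the merge was running."""


def finish_merge(result_name: str, sources: List[Part],
                 tid_to_csn: Dict[int, int]) -> Part:
    """Publish a finished merge; version metadata is touched only here."""
    if any(p.maxtid is not None for p in sources):
        raise MergeAborted("a source part was removed concurrently")
    # Assumption: reuse the tid whose CSN is the maximum among the sources,
    # so the result part inherits CSN = max(CSN of source parts).
    newest_tid = max((p.mintid for p in sources), key=lambda tid: tid_to_csn[tid])
    result = Part(result_name, mintid=newest_tid)
    for p in sources:
        p.maxtid = newest_tid
    return result
```

Under this reading, any snapshot that saw all source parts sees exactly the result part after the merge, and older snapshots keep seeing the same subset of source parts as before.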

On the other hand, it may be useful to allocate tids and csns for merges as for other transactions (and seems like we have to do it for special *MergeTree engines such as ReplacingMergeTree).

Background cleanup
As before, a background thread looks for Outdated parts with a reference counter equal to 1. But it should not remove a part if there is no maxcsn or if mincsn is greater than the minimum snapshotVersion of running transactions.

ALTER PARTITION, ALTER UPDATE/DELETE and OPTIMIZE
Unlike INSERTs, such queries may try to concurrently remove the same part and replace it with another part, causing a write-write conflict. We can use maxtid to detect such conflicts. A query writes its tid into maxtid before making any changes. If maxtid is already not NULL, i.e. another transaction is currently trying to remove the part, the query throws a serialization error.
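
A minimal sketch of this check, assuming a per-part mutex and integer tids. `Part`, `SerializationError`, `lock_for_removal` and `unlock_removal` are hypothetical names, and letting the same transaction take the lock twice is an assumption that the text does not cover.

```python
import threading
from typing import Optional


class SerializationError(Exception):
    """Raised on a write-write conflict between two transactions."""


class Part:
    """Hypothetical data part with an atomically updated maxtid."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.maxtid: Optional[int] = None
        self._mutex = threading.Lock()

    def lock_for_removal(self, tid: int) -> None:
        """Claim the part for removal before making any changes."""
        with self._mutex:
            if self.maxtid is not None and self.maxtid != tid:
                raise SerializationError(
                    f"part {self.name} is being removed by transaction {self.maxtid}")
            self.maxtid = tid

    def unlock_removal(self, tid: int) -> None:
        """Reset maxtid to NULL, as a rollback of `tid` would."""
        with self._mutex:
            if self.maxtid == tid:
                self.maxtid = None
```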

Durability and server startup
We have two options here:

  1. Write version metadata into data part on disk (see the note about fsync below). Transaction log will contain only (tid, csn) pairs of committed transactions. On restart we will know tids of every data part and will know if the corresponding transactions were committed or not. It will be easier to rotate the log in this case.
  2. Maintain version metadata in memory and write its modifications into the transaction log. We will restore the state of data parts from the log on server startup. To rotate the log we have to either write CSNs of old transactions into data parts or write version metadata of all parts into the new log.

On server startup we will remove all parts which do not have CSN(mintid) or which have CSN(maxtid).
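
The startup rule can be sketched as a filter over the parts found on disk, here for option 1, where the log holds only (tid, csn) pairs of committed transactions. The sketch assumes that parts without any version metadata are kept, which the rule above does not state explicitly; `Part` and `parts_to_remove_on_startup` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Part:
    """Hypothetical data part with the tids read from disk."""
    name: str
    mintid: Optional[int] = None
    maxtid: Optional[int] = None


def parts_to_remove_on_startup(parts: List[Part],
                               tid_to_csn: Dict[int, int]) -> List[str]:
    """Return names of parts whose creation was never committed
    or whose removal was committed."""
    doomed = []
    for part in parts:
        creation_uncommitted = (part.mintid is not None
                                and part.mintid not in tid_to_csn)
        removal_committed = (part.maxtid is not None
                             and part.maxtid in tid_to_csn)
        if creation_uncommitted or removal_committed:
            doomed.append(part.name)
    return doomed
```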

Note: We do not fsync data parts (at least by default), so any data part may become broken in case of hard restart. Committed transaction can be partially rolled back in this case even if all version metadata were reliably written somewhere.

Making it work with ReplicatedMergeTree

For ReplicatedMergeTree we have to allocate CSNs through sequential nodes in ZooKeeper and write tids into these nodes. But tids can still be obtained locally; we only need to guarantee that a tid is unique across all replicas. So we can use a (snapshotVersion, local_tid_counter, host_id) tuple as the tid (we don't really need snapshotVersion in the tid, but it can be useful for introspection, because it's impossible to relate local tids of different replicas). A replica can commit multiple transactions into one CSN to reduce the load on ZooKeeper (this is the second reason why we need separate tids and CSNs). It's possible to ensure that other replicas did not make commits concurrently by checking the cversion of the parent node.
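
A sketch of locally allocated tids of this shape, assuming host_id is a string that is unique per replica. `Tid` and `TidAllocator` are hypothetical names, and no ZooKeeper interaction is modelled.

```python
import itertools
from typing import NamedTuple


class Tid(NamedTuple):
    """Transaction id that is unique across replicas without coordination."""
    snapshot_version: int   # kept only for introspection
    local_tid: int          # per-replica counter value
    host_id: str            # assumed unique per replica


class TidAllocator:
    """Hypothetical per-replica allocator; needs no round trip to ZooKeeper."""

    def __init__(self, host_id: str) -> None:
        self._host_id = host_id
        self._counter = itertools.count(1)

    def allocate(self, snapshot_version: int) -> Tid:
        return Tid(snapshot_version, next(self._counter), self._host_id)
```

Two replicas may hand out the same counter value at the same snapshot version, and the resulting tids still differ in `host_id`.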

To make snapshots and part visibility work with replication, we have to add the tid to ReplicatedMergeTreeLogEntryData. The snapshot version on a replica is the maximal CSN such that all corresponding log entries are executed (so the replica has all visible parts locally).

We can use separate CSN logs in ZooKeeper for different shards.
Distributed transactions across multiple shards are out of scope of this RFC.
