
2PC Mechanisms, r.eval() and r.rql() proposal #1863

@nviennot

While we don't support ACID transactions, it would still be desirable to offer some easy ways to perform two-phase commits (2PC).

A simple example is a game application with players. Players are stored in a single table, and each player has a money attribute. I wish to implement a money-transfer feature with the following requirements:

  • A player can send money to another player as long as his available balance remains positive.
  • After a money transfer of amount X, the sender's balance should eventually be decreased by X, and the receiver's increased by X.

These requirements must hold despite any application server crash, network failure, slow network, or slow application server.

Intuitively, we want to perform atomically:

  • Check that the sender's balance exceeds X, so that it remains positive after the transfer.
  • If so, decrement his balance by X, and increment the receiver's balance by X.

Using a separate lock server like ZooKeeper would not work: my application server could be arbitrarily slow or lose its connection to the lock server, so it could lose the lock while performing the decrement/increment.

Okay, so that leaves us with the atomic (single-document) operations of RethinkDB.

We can decrement the sender's balance atomically so that the positive-balance invariant is always respected, but if the app server dies right after that, the receiver's account will never be incremented.

That's fixable if we record, in a pending_sends set on the sender, that an increment of the receiver's account is still owed. Let's pick a random xid to identify the money transaction. The update is now "if balance > X, decrement balance by X, and add the triple [xid, receiver_id, X] to the pending_sends set". This way, if the app server dies, we remember that we still have to increase receiver_id's balance by X.

We still have the problem of incrementing the receiver's account by X and removing the xid from the sender's pending_sends: these are two separate documents, so a crash between the two writes could lead a retry to give the receiver too much money. That's fixable too: we add a pending_receives set on the receiver side and add [xid, X] to it. Once that is done, we can safely remove the xid from the sender's pending_sends. Finally, we atomically increment the receiver's balance and remove the xid from his pending_receives.
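To make the ordering of those steps concrete, here is a minimal in-memory sketch of the protocol. It is not RethinkDB code: Bank, Player and the method names are hypothetical, and each method stands in for one atomic single-document update. Every step is idempotent, so a recovery pass can simply replay whatever was left pending; note that the final credit (step 4) must only run once the sender's pending send has been cleared, otherwise a replayed step 2 could credit the receiver twice.

```ruby
# Hypothetical in-memory model of the pending_sends / pending_receives
# protocol. Each method stands in for ONE atomic single-document update;
# none of this is RethinkDB API.
Player = Struct.new(:id, :balance, :pending_sends, :pending_receives)

class Bank
  attr_reader :players

  def initialize(balances)
    @players = balances.to_h { |id, bal| [id, Player.new(id, bal, {}, {})] }
  end

  # Step 1, on the sender: check the invariant, debit, remember the send.
  def debit(xid, from, to, amount)
    sender = @players.fetch(from)
    return false unless sender.balance > amount
    sender.balance -= amount
    sender.pending_sends[xid] = [to, amount]
    true
  end

  # Step 2, on the receiver: stage the credit. Keyed by xid, so a replay is a no-op.
  def stage_credit(xid, to, amount)
    @players.fetch(to).pending_receives[xid] ||= amount
  end

  # Step 3, on the sender: the credit is now durably staged on the receiver.
  def clear_send(xid, from)
    @players.fetch(from).pending_sends.delete(xid)
  end

  # Step 4, on the receiver: apply the credit and forget the xid together.
  def apply_credit(xid, to)
    receiver = @players.fetch(to)
    amount = receiver.pending_receives.delete(xid)
    receiver.balance += amount if amount
  end

  # The normal, crash-free path runs the four steps in order.
  def transfer(xid, from, to, amount)
    return false unless debit(xid, from, to, amount)
    stage_credit(xid, to, amount)
    clear_send(xid, from)
    apply_credit(xid, to)
    true
  end

  # Recovery after an app-server crash: finish every pending send
  # (steps 2 and 3) before applying any staged credit (step 4).
  def recover
    @players.each_value do |player|
      player.pending_sends.dup.each do |xid, (to, amount)|
        stage_credit(xid, to, amount)
        clear_send(xid, player.id)
      end
    end
    @players.each_value do |player|
      player.pending_receives.keys.each { |xid| apply_credit(xid, player.id) }
    end
  end
end
```

For example, if the app server dies right after step 1, `bank.recover` completes the transfer, and running it again changes nothing.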

That's pretty complicated. What happens if we want to do a transaction with N players? And what if we also want to create and delete documents along the way?

We need a way to do a 2PC in an elegant manner. Considering only update operations does not lose generality, since creates and deletes can be emulated by shadowing documents on the app side with a "visible" attribute that gets updated.
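A minimal sketch of that emulation, assuming a plain Ruby Hash stands in for the table (insert_shadow, create_update, delete_update and visible_docs are hypothetical helpers, not RethinkDB commands): the future document is inserted hidden before the transaction starts, and inside the transaction, creating or deleting it is just an update of visible.

```ruby
# Hypothetical sketch: a plain Hash stands in for a table, and create/delete
# inside a transaction become ordinary updates of a "visible" attribute.

# Before the transaction: insert the future document as a hidden shadow.
def insert_shadow(table, id, fields)
  table[id] = fields.merge('id' => id, 'visible' => false)
end

# Inside the transaction, "create" is just an update...
def create_update(doc)
  doc.merge('visible' => true)
end

# ...and so is "delete".
def delete_update(doc)
  doc.merge('visible' => false)
end

# Readers only ever see visible documents.
def visible_docs(table)
  table.values.select { |doc| doc['visible'] }
end
```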


Ideally, we want something like r.transaction(write_operations) to enqueue all of our writes. If one of them cannot be performed, none of them is performed; if they can all be performed, they are all done. We explicitly don't care about isolation (readers can see some of the writes already done, and some not yet done).

With the players example we would do something like:

# r.transaction is the hypothetical all-or-nothing command described above.
r.transaction(
  r.table('players').get('sender_id').update { |doc|
    r.branch(doc[:balance] > X,
             {:balance => doc[:balance] - X},
             r.error('not enough money'))
  },
  r.table('players').get('receiver_id').update { |doc|
    {:balance => doc[:balance] + X}
  }
)

Well, we can't have this: checking an invariant such as doc[:balance] > X while guaranteeing that both writes happen (or neither does) would require the server to take a lock on both documents, which would be a bad idea.


We could get away with it by providing r.rql() and r.eval() commands, letting the application implement its own mechanism:

  • r.rql(rql_expr) leaves the RQL expression as is, without evaluating it. At the end of the day it's just a wrapper for rql_expr.to_pb, but performed server side. For example:
r.table('players').get('sender_id').update(:todo =>
  r.rql(r.table('players').get('receiver_id').update(:stuff => '123')))

This would store the serialized (protobuf) form of the inner query in the todo attribute instead of running it. It is equivalent to calling .to_pb, just a bit nicer.

  • r.eval(rql_expr) evaluates a stored RQL expression on the server. For example, either of these would run the query saved in todo:
r.table('players').get('sender_id').eval(:todo)
r.do(r.table('players').get('sender_id')) { |doc| r.eval(doc[:todo]) }

r.eval would have security implications, so it is probably better to only allow eval on a special type. Similar to how dates are stored with $reql_type$, we could do the same for the type produced by r.rql(). This mostly solves the problem, provided only the database can write values of this type to documents (so that a user cannot pass a crafted hash tagged with the RQL type as a value).
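A minimal sketch of that guard, assuming the tag value 'RQL' and a server that validates every client-written value before storing it. wrap_rql, sanitize_client_value! and eval_rql are hypothetical names, and the interpreter lambda stands in for the server's query evaluator.

```ruby
# Hypothetical sketch of the proposed "$reql_type$" guard for r.rql/r.eval.
# The tag value 'RQL' and every method here are assumptions, not RethinkDB API.
RQL_TAG = 'RQL'

# Server-side constructor: only the database produces tagged values.
def wrap_rql(serialized_query)
  { '$reql_type$' => RQL_TAG, 'query' => serialized_query }
end

# Reject client-supplied values that try to smuggle in a tagged RQL value,
# at any nesting depth.
def sanitize_client_value!(value)
  case value
  when Hash
    if value['$reql_type$'] == RQL_TAG
      raise ArgumentError, 'clients may not write values of type RQL'
    end
    value.each_value { |v| sanitize_client_value!(v) }
  when Array
    value.each { |v| sanitize_client_value!(v) }
  end
  value
end

# eval only accepts the tagged type; anything else is an error.
def eval_rql(value, interpreter)
  unless value.is_a?(Hash) && value['$reql_type$'] == RQL_TAG
    raise TypeError, 'r.eval only accepts values of type RQL'
  end
  interpreter.call(value['query'])
end
```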

Here is how a 2PC could be built on top of these primitives:

require 'securerandom'

# r.rql() and r.eval() are the proposed commands; `unbound` is the
# placeholder for a lambda argument discussed below.

# Errors raised with r.error inside an update function are reported in the
# write result ("errors" / "first_error") rather than as an exception,
# so every write that relies on them goes through this check.
def run_checked(query)
  result = query.run
  raise result['first_error'] if result['errors'].to_i > 0
  result
end

def prepare_2pc(doc_updates)
  # doc_updates is a hash of rql selectors to rql updates.
  # e.g. { r.table('players').get('123') => r.do(unbound) { |doc| {:money => doc[:money] + 1} } }

  # A transaction is identified with a xid.
  xid = SecureRandom.uuid

  # We save all the document selectors involved in the 2pc first, because
  # as soon as we write to the database, we need to be able to roll back
  # the 2pc. This also gives us a table of transactions and the ability to
  # roll back pending transactions. We save all the updates to be performed
  # so we can commit the transaction later.
  run_checked r.table('transactions').insert(
    :id => xid,
    :state => 'prepare',
    :created_at => Time.now,
    :doc_selectors => doc_updates.keys.map { |s| r.rql(s) },
    :doc_updates => doc_updates.values.map { |u| r.rql(u) }
  )

  # Lock the documents in place, so concurrent transactions will fail.
  # current_xid can be namespaced to allow different transactions that
  # operate on different fields to proceed; it could be
  # current_balance_xid, for example.
  doc_updates.keys.each do |selector|
    run_checked selector.update { |doc|
      r.branch(doc[:current_xid].default(nil),
               r.error('document is already in a transaction'),
               {:current_xid => xid})
    }
  end

  return xid
rescue
  # Some document is already involved in another transaction, we are out.
  rollback_2pc(xid) if xid
  return nil
end

def rollback_2pc(xid)
  run_checked r.table('transactions').get(xid).update { |transaction|
    r.branch(transaction[:state].ne('commit'),
             {:state => 'rollback'},
             r.error('Cannot roll back, the transaction is committed'))
  }

  # Remove current_xid from the documents we locked to unlock them.
  run_checked r.do(r.table('transactions').get(xid)) { |transaction|
    transaction[:doc_selectors].for_each { |selector|
      r.eval(selector).update { |doc|
        r.branch(doc[:current_xid].default(nil).eq(xid),
                 {:current_xid => nil},
                 {})
      }
    }
  }

  # And finally remove the transaction.
  r.table('transactions').get(xid).delete.run
end

def commit_2pc(xid)
  run_checked r.table('transactions').get(xid).update { |transaction|
    r.branch(transaction[:state].ne('rollback'),
             {:state => 'commit'},
             r.error('Cannot commit, the transaction was rolled back'))
  }

  # Apply each stored update and unlock its document in the same atomic
  # write; documents no longer locked by xid were already committed.
  run_checked r.do(r.table('transactions').get(xid)) { |transaction|
    r.range(transaction[:doc_selectors].count).for_each { |i|
      r.eval(transaction[:doc_selectors][i]).update { |doc|
        r.branch(doc[:current_xid].default(nil).eq(xid),
                 r.eval(transaction[:doc_updates][i], doc).merge(:current_xid => nil),
                 {})
      }
    }
  }

  r.table('transactions').get(xid).delete.run
end

xid = prepare_2pc(
  r.table('players').get('sender_id')   => r.do(unbound) { |doc| {:money => doc[:money] - X} },
  r.table('players').get('receiver_id') => r.do(unbound) { |doc| {:money => doc[:money] + X} }
)

# Both documents are now locked against other 2PC transactions, so the
# balance check below cannot race with another transfer.
if xid.nil?
  # Another transaction holds one of the documents; retry later.
elsif r.table('players').get('sender_id').run['money'] > X
  commit_2pc(xid)
else
  rollback_2pc(xid)
end

One wrinkle is that we need to write r.do(unbound) { |doc| ... } to pass a lambda whose argument is left unbound, so that it can be serialized and stored in the database. That's because we need to be able to call r.eval(serialized_rql, the_bindings) at some point, supplying the document as the argument only when the update is finally applied.
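To illustrate what a serializable lambda with an unbound argument amounts to, here is a toy sketch in which an update is stored as plain JSON-like data and its free variable is filled in only at evaluation time. The node names ('var', 'field', 'add') and evaluate are invented for illustration; this is not RethinkDB's wire format.

```ruby
require 'json'

# Toy evaluator for updates stored as JSON-like data. An Array node is an
# operation, a Hash node is an object literal, anything else is a literal.
# The node names are invented for illustration only.
def evaluate(node, bindings)
  case node
  when Hash
    node.transform_values { |value| evaluate(value, bindings) }
  when Array
    op, *args = node
    case op
    when 'var'   then bindings.fetch(args[0]) # the unbound argument
    when 'field' then evaluate(args[0], bindings).fetch(args[1])
    when 'add'   then evaluate(args[0], bindings) + evaluate(args[1], bindings)
    else raise ArgumentError, "unknown op #{op.inspect}"
    end
  else
    node
  end
end

# Stored form of: r.do(unbound) { |doc| {:money => doc[:money] + 1} }
INCREMENT_MONEY = { 'money' => ['add', ['field', ['var', 'doc'], 'money'], 1] }

# The update survives a round trip through a JSON document...
stored = JSON.generate(INCREMENT_MONEY)
# ...and the argument is supplied only when it is finally evaluated,
# in the spirit of r.eval(serialized_rql, the_bindings).
result = evaluate(JSON.parse(stored), { 'doc' => { 'money' => 41 } })
```

Here `result` is the partial object to merge into the document, just as an RQL update function returns.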

In short, r.eval() is pure awesomeness, because it lets us abstract the 2PC mechanism away completely. As a sysadmin, I can list the transactions in progress in the transactions table. I can also always finish a transaction safely, by committing it if it is already in the commit state and rolling it back otherwise (there is a small race when preparing the transaction, but an additional state could easily avoid it). And I can do this without knowing what the actual commit operations are, because they are stored as RQL in the transaction object.
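The sysadmin's recovery rule can be sketched in memory as follows. This is a hypothetical model, not RethinkDB code: Hashes stand in for the documents and the transactions table, and Ruby lambdas stand in for the r.rql()-wrapped updates. finish_pending commits every transaction whose state is already commit and rolls back the rest, without inspecting what the updates do.

```ruby
require 'securerandom'

# Hypothetical in-memory model of the transactions table and the
# "finish any pending transaction" rule; none of this is RethinkDB API.
class TwoPC
  attr_reader :docs, :transactions

  def initialize(docs)
    @docs = docs          # doc id => Hash document
    @transactions = {}    # xid => { state:, updates: { doc id => lambda } }
  end

  def prepare(updates)
    xid = SecureRandom.uuid
    @transactions[xid] = { state: 'prepare', updates: updates } # record first
    updates.each_key do |id|
      raise "#{id} is already in a transaction" if @docs[id]['current_xid']
      @docs[id]['current_xid'] = xid # lock
    end
    xid
  rescue RuntimeError
    rollback(xid)
    nil
  end

  def commit(xid)
    tx = @transactions.fetch(xid)
    raise 'cannot commit, the transaction was rolled back' if tx[:state] == 'rollback'
    tx[:state] = 'commit'
    tx[:updates].each do |id, update|
      doc = @docs[id]
      next unless doc['current_xid'] == xid # already applied earlier
      @docs[id] = doc.merge(update.call(doc)).merge('current_xid' => nil)
    end
    @transactions.delete(xid)
  end

  def rollback(xid)
    tx = @transactions.fetch(xid)
    raise 'cannot roll back, the transaction is committed' if tx[:state] == 'commit'
    tx[:state] = 'rollback'
    tx[:updates].each_key do |id|
      @docs[id]['current_xid'] = nil if @docs[id]['current_xid'] == xid
    end
    @transactions.delete(xid)
  end

  # Finish every pending transaction without knowing what its updates do.
  def finish_pending
    @transactions.keys.each do |xid|
      @transactions[xid][:state] == 'commit' ? commit(xid) : rollback(xid)
    end
  end
end
```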

It would not be impossible to call .to_pb manually, then parse the stored RQL back, put some glue everywhere and recompile it into some sane RQL, but it feels much more natural to have this as part of RQL.
The 2PC implementation demonstrates the power of r.eval(), but I'm sure other use cases could leverage r.eval() to do interesting things.
