Showing posts with label distribution. Show all posts

Monday, 19 February 2018

Distributed iteration improvements

Infinispan hasn't always provided a way to iterate over entries in a distributed cache. In fact, the first implementation didn't arrive until Infinispan 7. Then in Infinispan 8, with the move to Java 8, we fully integrated iteration into distributed streams, which brought some minor performance improvements.

We are proud to announce that with Infinispan 9.2 there are even more improvements. This contains no API changes, although those will surely come in the future. This one is purely for performance and utilization.

New implementation details

 

A few different aspects have changed. A lot of these revolve around the number of entries retrieved at once, which, if you are familiar with DistributedStreams, can be configured via the distributedBatchSize method. Note that if this is not specified, it defaults to the chunk size used in state transfer.

Entry retrieval is now pull based instead of push

Infinispan core (embedded) has added rxjava2 and reactive streams as dependencies, and all of the old push-style iterator code has been rewritten in a pull style to fully utilize the Publisher and Subscriber interfaces.

With this we only pull up to batchSize entries at a time from any set of nodes. The old style utilized push with call stack blocking, which could return up to two times that number of entries. Also, since we aren't performing call stack blocking, we don't have to waste threads: these calls to retrieve entries are done asynchronously and finish very quickly irrespective of user interaction. The old method required multiple threads to be reserved for this purpose.
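The pull model can be illustrated with the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the reactive streams Publisher and Subscriber. The following is a minimal, synchronous sketch and not Infinispan's implementation: IteratorPublisher, BatchSubscriber and pullInBatches are hypothetical names, and the publisher skips parts of the reactive streams contract (for example, rejecting non-positive requests). It shows a subscriber asking for at most batchSize entries at a time, so the publisher never emits more than was requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

public class PullBatchSketch {

    /** Hypothetical synchronous publisher over in-memory data; emits only what was requested. */
    static final class IteratorPublisher<T> implements Flow.Publisher<T> {
        private final Iterable<T> source;

        IteratorPublisher(Iterable<T> source) { this.source = source; }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            Iterator<T> it = source.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;       // entries requested but not yet emitted
                private boolean draining;  // guards against re-entrant request() calls
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (draining) return;  // called from onNext: the outer loop picks up the new demand
                    draining = true;
                    while (!done && demand > 0 && it.hasNext()) {
                        demand--;
                        subscriber.onNext(it.next());
                    }
                    draining = false;
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Hypothetical subscriber that pulls one batch at a time and records each batch. */
    static final class BatchSubscriber<T> implements Flow.Subscriber<T> {
        private final int batchSize;
        private final List<List<T>> batches = new ArrayList<>();
        private List<T> current = new ArrayList<>();
        private Flow.Subscription subscription;

        BatchSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(batchSize);              // pull the first batch
        }

        @Override
        public void onNext(T item) {
            current.add(item);
            if (current.size() == batchSize) { // batch complete: store it, then pull the next one
                batches.add(current);
                current = new ArrayList<>();
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) { throw new IllegalStateException(t); }

        @Override
        public void onComplete() {
            if (!current.isEmpty()) batches.add(current); // trailing partial batch
        }

        List<List<T>> batches() { return batches; }
    }

    public static <T> List<List<T>> pullInBatches(List<T> entries, int batchSize) {
        BatchSubscriber<T> subscriber = new BatchSubscriber<>(batchSize);
        new IteratorPublisher<>(entries).subscribe(subscriber);
        return subscriber.batches();
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(pullInBatches(List.of(1, 2, 3, 4, 5, 6, 7), 3));
    }
}
```

Because the subscriber drives the flow, no thread has to block while waiting for the consumer; the publisher simply stops emitting once the outstanding demand reaches zero.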

Streamed batches

The responses from a remote node are written directly to the output stream so there are no intermediate collections allocated. This means we only have to iterate upon the data once as we retain the iterator between requests. On the originator we still have to store the batches in a collection to be enqueued for the user to pull.

Rewritten Parallel Distribution

Great care was taken to implement parallel distribution in a way to vastly reduce contention and ensure that we properly follow the batchSize configuration.

When parallel distribution is in use, the new implementation will start 4 remote node requests sharing the batch size (so each one gets 1/4). This way we can guarantee that we only have the desired size irrespective of the number of nodes in the cluster. The old implementation would request batchSize from all nodes at the same time. So not only did it reserve a thread per node, but it could easily swamp your JVM memory, causing OutOfMemoryErrors (which no one likes). The latter alone made us force the default to be sequential distribution when using an iterator.
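To make the difference concrete, here is a small arithmetic sketch. The class and method names are hypothetical, the batch size of 2048 is an arbitrary example value, and plain integer division is an assumption (the post does not say how a remainder is handled).

```java
public class BatchShareSketch {

    // The post states that the new implementation starts 4 remote node requests.
    public static final int PARALLEL_REQUESTS = 4;

    /** Entries requested from each remote node under the new scheme (assumes integer division). */
    public static int perRequestShare(int batchSize) {
        return batchSize / PARALLEL_REQUESTS;
    }

    /** Entries that can be requested at once under the new scheme. */
    public static int newOutstanding(int batchSize) {
        return perRequestShare(batchSize) * PARALLEL_REQUESTS;
    }

    /** Entries that could be requested at once under the old scheme: batchSize from every node. */
    public static long oldOutstanding(int batchSize, int nodes) {
        return (long) batchSize * nodes;
    }

    public static void main(String[] args) {
        int batchSize = 2048; // arbitrary example value
        int nodes = 6;        // example cluster size
        System.out.println("old scheme: " + oldOutstanding(batchSize, nodes) + " entries requested at once");
        System.out.println("new scheme: " + newOutstanding(batchSize) + " entries requested at once");
    }
}
```

The old figure grows with the cluster size, while the new one stays at the configured batch size no matter how many nodes there are.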

The old implementation would write entries from all nodes (including local) to the same shared queue. The new implementation has a different queue for each request, which allows for faster queues with no locking to be used.

Due to these changes and other isolations between threads, we can now make parallel distribution the default setting for the iterator method. And as you will see this has improved performance nicely.

Performance


We have written a JMH test harness specifically for this blog post, testing 9.1.5.Final build against latest 9.2.0.SNAPSHOT. The test runs by default with 4GB of heap with 6 nodes in a distributed cache with 2 owners. It has varying entry count, entry sizes and distributed batch sizes.

Due to the variance in each test, a large number of tests were run with different permutations to make sure a large number of test cases were covered. The JMH test that was run can be found at github. All the default settings were used for the run, except that -t4 (runs with 4 worker threads) was provided. This was all run on my measly laptop (i7-4810MQ and 16 GB) - maxing out the CPU was not a hard task.

CAVEAT: The tests don't do anything with the iterator and just try to pull them as fast as they can. Obviously if you have a lot of processing done between iterations you will likely not see as good of a performance increase.

The full results can be found here. They show each permutation with its operations per second and the difference between the two versions (green shows 5% or more and red shows -5% or less).


Operation                   | Average Gain | Code
Specified Distribution Mode | 3.5%         | .entrySet().stream().sequentialDistribution().iterator()
Default                     | 11%          | .entrySet().iterator()
No Rehash                   | 14%          | .entrySet().stream().disableRehashAware().iterator()

The above 3 rows show a few different ways you could have been invoking the iterator method. The second row is probably by far the most used case. In this case you should see around an 11% increase in performance (results will vary). This is due to the new pulling method as well as parallel distribution becoming the new default running mode. It is unlikely a user was using the other 2 methods, but they are provided for a more complete view.

If you were specifying a distribution mode manually, either sequential or parallel, you will only see a run that is a few percent faster (3.5%), but every little bit helps! Also, if you can switch to parallel, you may want to think about doing so.

You can also see that if you were already running with rehash disabled, the gains are even larger (14%). That doesn't even include the fact that no rehash was already 28% faster than rehash-aware iteration before (which means it is about 32% faster in general now). So if you can get away with an at-most-once guarantee, disabling rehash will provide the best throughput.

What's next?


As was mentioned this is not exposed to the user directly. You still interact with the iterator as you would normally. We should remedy this at some point.

Expose new method

We would love to eventually expose a method to return a Publisher directly to the user so that they can get the full benefits of having a pull based implementation underneath.

This way any intermediate operations applied to the stream before would be distributed and anything applied to the Publisher would be done locally. And just like the iterator method this publisher would be fully rehash aware if you have it configured to do so and would make sure you get all entries delivered in an exactly once fashion (rehash disabled guarantees at most once).

Another side benefit is that the Subscriber methods could be called on different threads so there is no overhead required on the ISPN side for coordinating these into queue(s). Thus the Subscriber should be able to retrieve all entries faster than just doing an iterator.

Java 9 Flow

Many of you may also be wondering why we aren't using the new Flow API introduced in Java 9. Luckily, the Flow API is a 1:1 conversion of reactive streams. So whenever Infinispan starts supporting Java 9 interfaces/classes, we hope to properly expose these as the JDK classes.

Segment Based Iteration 

With Infinispan 9.3, we hope to introduce data container and cache store segment aware iteration. This means when iterating over either we would only have to process entries that map to a given segment. This should reduce the time and processing for iteration substantially, especially for cache stores. Keep your eyes out for a future blog post detailing these as 9.3 development commences.
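The idea can be sketched with plain JDK collections. Everything here is hypothetical and only illustrates the principle: segmentOf is a made-up key-to-segment mapping, and the two layouts stand in for a segment-unaware and a segment-aware container. The visited counter shows how many entries each approach has to touch to return the keys of one segment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class SegmentIterationSketch {

    /** Hypothetical key-to-segment mapping, for illustration only. */
    public static int segmentOf(String key, int numSegments) {
        return Math.floorMod(key.hashCode(), numSegments);
    }

    /** Segment-unaware layout: every entry is visited and filtered; visited[0] counts entries touched. */
    public static TreeSet<String> keysByFiltering(Map<String, String> flat, int segment,
                                                  int numSegments, int[] visited) {
        TreeSet<String> result = new TreeSet<>();
        for (String key : flat.keySet()) {
            visited[0]++;
            if (segmentOf(key, numSegments) == segment) result.add(key);
        }
        return result;
    }

    /** Segment-aware layout: one map per segment, so only the requested segment is visited. */
    public static TreeSet<String> keysBySegment(List<Map<String, String>> segmented, int segment,
                                                int[] visited) {
        TreeSet<String> result = new TreeSet<>();
        for (String key : segmented.get(segment).keySet()) {
            visited[0]++;
            result.add(key);
        }
        return result;
    }

    /** Builds the per-segment layout from a flat map. */
    public static List<Map<String, String>> segment(Map<String, String> flat, int numSegments) {
        List<Map<String, String>> segmented = new ArrayList<>();
        for (int i = 0; i < numSegments; i++) segmented.add(new HashMap<>());
        for (Map.Entry<String, String> e : flat.entrySet()) {
            segmented.get(segmentOf(e.getKey(), numSegments)).put(e.getKey(), e.getValue());
        }
        return segmented;
    }

    /** Generates entries k0..k(n-1) for the demo. */
    public static Map<String, String> sampleData(int n) {
        Map<String, String> data = new HashMap<>();
        for (int i = 0; i < n; i++) data.put("k" + i, "v" + i);
        return data;
    }

    public static void main(String[] args) {
        Map<String, String> flat = sampleData(100);
        int[] flatVisited = new int[1];
        int[] segmentVisited = new int[1];
        keysByFiltering(flat, 2, 4, flatVisited);
        keysBySegment(segment(flat, 4), 2, segmentVisited);
        System.out.println("filtering visited " + flatVisited[0]
                + " entries, segment-aware visited " + segmentVisited[0]);
    }
}
```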

Give us Feedback

We hope you find a bit more performance when working with distributed iteration. We also value any feedback on what you would like our APIs to look like, as well as reports of any bugs you find. As always, let us know at any of the places listed here.

Thursday, 22 March 2012

Infinispan 5.1.3.CR1 out now!

The feedback keeps coming, particularly from AS7 users, so we've decided to do another point release in the 5.1 'Brahma' series. Apart from fixing several issues, including a critical L1 cache memory leak in active/passive set ups, this version enables the JBoss Marshaller class resolver to be configured via both the old and new programmatic configuration API. This enables Infinispan to provide a better solution for marshalling/unmarshalling classes in modular environments.

Full details of what has been fixed can be found here, and if you have feedback, please visit our forums. Finally, as always, you can download the release from here.

Cheers,
Galder

Tuesday, 19 April 2011

5.0.0.BETA2 released with better distribution!

A brand new Infinispan 5.0 "Pagoa" beta, 5.0.0.BETA2, is out now, bringing even more goodies for Infinispan users:
  • Initial implementation of virtual nodes for consistent hash algorithm based distribution is included. This means that each Infinispan node can now pick multiple nodes in the hash wheel reducing the standard deviation and so improving the distribution of data. The configuration is done via the numVirtualNodes attribute in hash element.
  • The externalizer configuration has been revamped in order to make it more user-friendly! You only need the @SerializeWith annotation and an Externalizer implementation in its most basic form, but more advanced externalizer configuration is still available for particular use cases. The wiki on plugging externalizers has been rewritten to show these changes.
  • lazyDeserialization XML element has been renamed to storeAsBinary in order to better represent its function. The previous programmatic configuration for this option has been deprecated to help ease migration but your XML will need changing.
  • All references to JOPR, including the maven module name, have been renamed to RHQ. So make sure you plug your RHQ server with infinispan-rhq-plugin.jar instead of infinispan-jopr-plugin.jar.
There are some other minor API changes and fixes, as shown in the release notes. As always, please use the user forums to report back, grab the release here, enjoy and keep the feedback coming.
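For readers who want to see why virtual nodes help, here is a minimal sketch of a hash wheel built on java.util.TreeMap. It is not Infinispan's consistent hash implementation: the class, the mix function (the MurmurHash3 32-bit finalizer applied to String.hashCode()) and the "node#index" naming of wheel positions are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class VirtualNodesSketch {

    /** Bit mixer (MurmurHash3 32-bit finalizer) used to spread hash codes around the wheel. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Places each node at numVirtualNodes positions on the hash wheel. */
    public static TreeMap<Integer, String> buildWheel(List<String> nodes, int numVirtualNodes) {
        TreeMap<Integer, String> wheel = new TreeMap<>();
        for (String node : nodes) {
            for (int v = 0; v < numVirtualNodes; v++) {
                wheel.put(mix((node + "#" + v).hashCode()), node);
            }
        }
        return wheel;
    }

    /** A key belongs to the first wheel position at or after its hash, wrapping around at the end. */
    public static String ownerOf(String key, TreeMap<Integer, String> wheel) {
        Map.Entry<Integer, String> e = wheel.ceilingEntry(mix(key.hashCode()));
        return (e != null ? e : wheel.firstEntry()).getValue();
    }

    /** Counts how many of the keys key0..key(n-1) each node owns. */
    public static Map<String, Integer> ownership(TreeMap<Integer, String> wheel, int keyCount) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < keyCount; i++) {
            counts.merge(ownerOf("key" + i, wheel), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("A", "B", "C", "D");
        System.out.println("1 virtual node each:   " + ownership(buildWheel(nodes, 1), 10_000));
        System.out.println("64 virtual nodes each: " + ownership(buildWheel(nodes, 64), 10_000));
    }
}
```

With a single position per node, the arc each node owns depends entirely on where a few hashes happen to fall. With many positions per node, each node owns many small arcs, so the per-node counts usually end up closer together.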

Cheers,
Galder

Wednesday, 19 January 2011

Introducing distributed execution and MapReduce framework

In case you did not pay attention to the area of large scale distributed computing – there is a revolution going on! It is becoming increasingly evident that the software ecosystems built around so called Big Data are at the forefront of cloud computing innovation. Unfortunately, there has been more debate around determining how big Big Data actually is than around defining a common set of requirements for large scale Big Data computational platforms.
Stephen O'Grady of RedMonk summarized this phenomenon succinctly: “Big Data, like NoSQL, has become a liability in most contexts. Setting aside the lack of a consistent definition, the term is of little utility because it is single-dimensional. Larger dataset sizes present unique computational challenges. But the structure, workload, accessibility and even location of the data may prove equally challenging.”
Zack Urlocker, an advisor and board member to several startup companies in the area of SaaS, was equally vocal in his criticism regarding the complexity of existing systems: “You pretty much gotta be near genius level to build systems on top of Cassandra, Hadoop and the like today. These are powerful tools, but very low-level, equivalent to programming client server applications in assembly language. When it works its [sic] great, but the effort is significant and it’s probably beyond the scope of mainstream IT organizations.”
This is exactly where we are positioning Infinispan's roadmap as we announce our initial steps into the area of distributed execution and a MapReduce framework built on top of Infinispan. Infinispan's distributed data grid is a most natural fit for such a platform. We have already built an infrastructure for essentially unlimited linear in-memory data scaling. However, having such a data grid without an ability to execute large scale computation on it is like having a Ferrari without a driver's licence. Listening to the criticism regarding the lack of direction in the Big Data field and the complexity of the existing distributed execution frameworks, our focus was primarily on simplicity, without sacrificing the power and rich feature set such a framework should have.

Simple distributed execution model

The main interfaces for simple distributed task execution are DistributedCallable and DistributedExecutorService. DistributedCallable is essentially a version of the existing Callable from the java.util.concurrent package, except that a DistributedCallable can be executed in a remote JVM and receive its input from an Infinispan cache. The task's main algorithm is essentially unchanged; only the input source changes. An existing Callable implementation most likely gets its input in the form of some Java object/primitive, while a DistributedCallable gets its input from an Infinispan cache. Therefore, users who have already implemented the Callable interface to describe their task units would simply implement DistributedCallable instead and use keys from the Infinispan execution environment as input for the task. An implementation of DistributedCallable can in fact continue to support an already existing Callable while simultaneously being ready for distributed execution.
public interface DistributedCallable<K, V, T> extends Callable<T> {

   /**
    * Invoked by the execution environment after this DistributedCallable
    * has been migrated for execution to a specific Infinispan node.
    *
    * @param cache
    *           cache whose keys are used as input data for
    *           this DistributedCallable task
    * @param inputKeys
    *           keys used as input for this DistributedCallable task
    */
   void setEnvironment(Cache<K, V> cache, Set<K> inputKeys);
}
DistributedExecutorService is a simple extension of the familiar ExecutorService from the java.util.concurrent package. However, the advantages of DistributedExecutorService are not to be overlooked. Existing Callable tasks that users would otherwise submit to an ExecutorService can instead be submitted for execution on an Infinispan cluster. The Infinispan execution environment would migrate the task to an execution node, run it and return the results to the calling node. Of course, not all Callable tasks would benefit from this feature. Excellent candidates are long running and computationally intensive tasks.
The second advantage of the DistributedExecutorService is that it allows a quick and simple implementation of tasks that take input from Infinispan cache nodes, execute a certain computation and return results to the caller. Users would specify which keys to use as input for a given DistributedCallable and submit that callable for execution on the Infinispan cluster. The Infinispan runtime would locate the appropriate keys, migrate the DistributedCallable to the target execution node(s) and finally return a list of results, one for each executed Callable. Of course, users can omit the input keys, in which case Infinispan would execute the DistributedCallable on all keys of the specified cache.
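The flow described above can be imitated with plain JDK types. This is a sketch under stated assumptions, not the Infinispan API: the nested DistributedCallable uses java.util.Map as a stand-in for Cache, ValueLengthSum is a made-up task, and runOnNodes is a hypothetical replacement for the runtime that hands each simulated node only the input keys it holds.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

public class DistributedCallableSketch {

    /** Stand-in for the interface shown above, with Map in place of Infinispan's Cache. */
    interface DistributedCallable<K, V, T> extends Callable<T> {
        void setEnvironment(Map<K, V> cache, Set<K> inputKeys);
    }

    /** Example task: sums the lengths of the values stored under the input keys. */
    static final class ValueLengthSum implements DistributedCallable<String, String, Integer> {
        private Map<String, String> cache;
        private Set<String> inputKeys;

        @Override
        public void setEnvironment(Map<String, String> cache, Set<String> inputKeys) {
            this.cache = cache;
            this.inputKeys = inputKeys;
        }

        @Override
        public Integer call() {
            int total = 0;
            for (String key : inputKeys) {
                String value = cache.get(key);
                if (value != null) total += value.length();
            }
            return total;
        }
    }

    /** Hypothetical runtime: runs one task per simulated node, giving it only the keys that node holds. */
    public static List<Integer> runOnNodes(List<Map<String, String>> nodes, Set<String> inputKeys) {
        List<Integer> results = new ArrayList<>();
        for (Map<String, String> nodeData : nodes) {
            Set<String> localKeys = new HashSet<>(inputKeys);
            localKeys.retainAll(nodeData.keySet());   // "locate the appropriate keys"
            ValueLengthSum task = new ValueLengthSum();
            task.setEnvironment(nodeData, localKeys); // "migrate" the task to the node
            results.add(task.call());                 // one result per executed callable
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map<String, String>> nodes = List.of(Map.of("a", "xx", "b", "yyy"), Map.of("c", "z"));
        // prints [2, 1]
        System.out.println(runOnNodes(nodes, Set.of("a", "c")));
    }
}
```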

MapReduce model

Infinispan's own MapReduce model is an adaptation of Google's original MapReduce. There are four main components in each map reduce task: Mapper, Reducer, Collator and MapReduceTask.
A Mapper implementation is the component of a MapReduceTask that is invoked once for each input entry K,V. Every Mapper instance, once migrated to an Infinispan node, transforms a given cache entry K,V input pair into a result T. The intermediate result T is further reduced using a Reducer.
public interface Mapper<K, V, T> {

   /**
    * Invoked once for each input cache entry
    * K,V; transforms that input into a result T.
    *
    * @param key
    *           the key
    * @param value
    *           the value
    * @return result T
    */
   T map(K key, V value);

}
Reducer, as its name implies, reduces a list of results T from map phase of MapReduceTask. Infinispan distributed execution environment creates one instance of Reducer per execution node.
public interface Reducer<T, R> {

   /**
    * Reduces a result T from the map phase and returns R.
    * Assume that on Infinispan node N, an instance
    * of Mapper was mapped and invoked on k many
    * key/value pairs. Each T(i) in the list of all
    * T's returned from the map phase executed on
    * Infinispan node N is passed to the reducer along
    * with the previously computed R(i-1). Finally, after
    * the last invocation of the reducer on T(k), R is
    * returned to the distributed task that originated
    * the map/reduce request.
    *
    * @param mapResult
    *           result T of map phase
    * @param previouslyReduced
    *           previously accumulated reduced result
    * @return result R
    */
   R reduce(T mapResult, R previouslyReduced);

}
Collator coordinates results from Reducers executed on Infinispan cluster and assembles a final result returned to an invoker of MapReduceTask.
public interface Collator<R> {

   /**
    * Collates all results added so far and
    * returns result R to the invoker of the distributed task.
    *
    * @return final result of distributed task computation
    */
   R collate();

   /**
    * Invoked by the runtime every time a reduced result
    * R is received from a Reducer executed on a remote
    * node.
    *
    * @param remoteNode
    *           address of the node where reduce phase occurred
    * @param remoteResult
    *           the result R of reduce phase
    */
   void reducedResultReceived(Address remoteNode, R remoteResult);
}
Finally, MapReduceTask is a distributed task uniting Mapper, Reducer and Collator into a cohesive large scale computation to be transparently parallelized across Infinispan cluster nodes. Users of MapReduceTask need to provide a cache whose data is used as input for this task. The Infinispan execution environment will instantiate and migrate instances of the provided mappers and reducers seamlessly across Infinispan nodes. Unless otherwise specified using the onKeys method as an input keys filter, all available key/value pairs of the specified cache will be used as input data for this task.
MapReduceTask implements a slightly different execution model from the original MapReduce proposed by Google. Here is the pseudocode of the MapReduceTask.
On each Infinispan node executing the task:
   mapped = list()
   for entry in cache.entries:
      t = mapper.map(entry.key, entry.value)
      mapped.add(t)

   r = null
   for t in mapped:
      r = reducer.reduce(t, r)
   return r to Infinispan node that invoked the task

On Infinispan node invoking this task:
   reduced_results = invoke map reduce task on all nodes, retrieve map{address:result}
   for r in reduced_results.entries:
      remote_address = r.key
      remote_reduced_result = r.value
      collator.reducedResultReceived(remote_address, remote_reduced_result)

   return collator.collate()
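The same execution model can be tried out locally with a small simulation. This is only a sketch, assuming the generic shapes Mapper<K, V, T>, Reducer<T, R> and Collator<R>: the interfaces below are local stand-ins rather than the Infinispan types, a String replaces Address, and the "cluster" is just a map from node name to that node's local entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapReduceSketch {

    // Local stand-ins for the interfaces described above (String replaces Address).
    interface Mapper<K, V, T> { T map(K key, V value); }
    interface Reducer<T, R> { R reduce(T mapResult, R previouslyReduced); }
    interface Collator<R> {
        R collate();
        void reducedResultReceived(String remoteNode, R remoteResult);
    }

    /** What each node does: map every local entry, then fold the mapped results with the reducer. */
    static <K, V, T, R> R runOnNode(Map<K, V> localEntries, Mapper<K, V, T> mapper, Reducer<T, R> reducer) {
        List<T> mapped = new ArrayList<>();
        for (Map.Entry<K, V> entry : localEntries.entrySet()) {
            mapped.add(mapper.map(entry.getKey(), entry.getValue()));
        }
        R reduced = null;
        for (T t : mapped) {
            reduced = reducer.reduce(t, reduced);
        }
        return reduced;
    }

    /** What the invoking node does: collect one reduced result per node and collate them. */
    static <K, V, T, R> R execute(Map<String, Map<K, V>> cluster, Mapper<K, V, T> mapper,
                                  Reducer<T, R> reducer, Collator<R> collator) {
        for (Map.Entry<String, Map<K, V>> node : cluster.entrySet()) {
            R reduced = runOnNode(node.getValue(), mapper, reducer);
            if (reduced != null) {            // a node without entries contributes nothing
                collator.reducedResultReceived(node.getKey(), reduced);
            }
        }
        return collator.collate();
    }

    /** Example task: total length of all values stored across the simulated cluster. */
    public static long totalLength(Map<String, Map<String, String>> cluster) {
        Mapper<String, String, Long> mapper = (key, value) -> (long) value.length();
        Reducer<Long, Long> reducer = (t, previous) -> previous == null ? t : t + previous;
        Collator<Long> collator = new Collator<Long>() {
            private long total;

            @Override
            public Long collate() { return total; }

            @Override
            public void reducedResultReceived(String remoteNode, Long remoteResult) {
                total += remoteResult;
            }
        };
        return execute(cluster, mapper, reducer, collator);
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> cluster = Map.of(
                "nodeA", Map.of("f1", "abc", "f2", "de"),
                "nodeB", Map.of("f3", "x"));
        // prints 6
        System.out.println(totalLength(cluster));
    }
}
```

Skipping null results from nodes without entries is a choice made in this sketch; the pseudocode above does not say how that case is handled.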

Examples

In order to get a better feel for the MapReduce framework, let's have a look at an example related to Infinispan's grid file system. How would we calculate the total size of all files in the system using the MapReduce framework? Easy! Have a look at GridFileSizeExample.
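public class GridFileSizeExample {

   public static void main(String[] args) throws Exception {

      // Placeholder: obtain the grid file system's metadata cache here
      Cache<String, GridFile.Metadata> cache = null;
      MapReduceTask<String, GridFile.Metadata, Long, Long> task =
            new MapReduceTask<String, GridFile.Metadata, Long, Long>(cache);

      Long result = task.mappedWith(new Mapper<String, GridFile.Metadata, Long>() {

         // Map phase: each file's metadata entry is mapped to its length
         @Override
         public Long map(String key, GridFile.Metadata value) {
            return (long) value.getLength();
         }

      }).reducedWith(new Reducer<Long, Long>() {

         // Reduce phase: accumulate the lengths seen on one node
         @Override
         public Long reduce(Long mapResult, Long previouslyReduced) {
            return previouslyReduced == null ? mapResult : mapResult + previouslyReduced;
         }

      }).collate(new Collator<Long>() {

         private Long result = 0L;

         @Override
         public Long collate() {
            return result;
         }

         // Collate phase: add up the per-node totals
         @Override
         public void reducedResultReceived(Address remoteNode, Long remoteResult) {
            result += remoteResult;
         }
      });

      System.out.println("Total filesystem size is " + result + " bytes");
   }
}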


In conclusion, this is not a perfect and final distributed execution and MapReduce API that can satisfy requirements of all users but it is a good start. As we push forward and make it more feature rich while keeping it simple we are continuously looking for your feedback. Together we can reach the ambitious goals set out in the beginning of this article.

Friday, 21 August 2009

Distribution instead of Buddy Replication

People have often commented on Buddy Replication (from JBoss Cache) not being available in Infinispan, and have asked how Infinispan's far superior distribution mode works. I've decided to write this article to discuss the main differences from a high level. For deeper technical details, please visit the Infinispan wiki.

Scalability versus high availability
These two concepts are often at odds with one another, even though they are commonly lumped together. What is usually good for scalability isn't always good for high availability, and vice versa. When it comes to clustering servers, high availability often means simply maintaining more copies, so that if nodes fail - and with commodity hardware, this is expected - state is not lost. An extreme case of this is replicated mode, available in both JBoss Cache and Infinispan, where each node is a clone of its neighbour. This provides very high availability, but unfortunately, this does not scale well. Assume you have 2GB per node. Discounting overhead, with replicated mode, you can only address 2GB of space, regardless of how large the cluster is. Even if you had 100 nodes - seemingly 200GB of space! - you'd still only be able to address 2GB since each node maintains a redundant copy. Further, since every node needs a copy, a lot of network traffic is generated as the cluster size grows.

Enter Buddy Replication
Buddy Replication (BR) was originally devised as a solution to this scalability problem. BR does not replicate state to every other node in the cluster. Instead, it chooses a fixed number of 'backup' nodes and only replicates to these backups. The number of backups is configurable, but it stays fixed regardless of how large the cluster grows. BR improved scalability significantly and showed near-linear scalability with increasing cluster size. This means that as more nodes are added to a cluster, the space available grows linearly, as does the available computing power if measured in transactions per second.
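The arithmetic behind the last two sections can be written down in a few lines. This is an illustration only: the class and method names are hypothetical, overhead is ignored as in the text, and copies counts the original plus its backups (2 assumes a single buddy).

```java
public class CapacitySketch {

    /** Replicated mode: every node holds everything, so capacity equals one node's memory. */
    public static double replicatedGb(int nodes, double perNodeGb) {
        return perNodeGb;
    }

    /** Fixed number of copies per entry: capacity grows linearly with the number of nodes. */
    public static double fixedCopiesGb(int nodes, double perNodeGb, int copies) {
        return nodes * perNodeGb / copies;
    }

    public static void main(String[] args) {
        // 100 nodes with 2GB each, as in the example above
        System.out.println("replicated:          " + replicatedGb(100, 2.0) + " GB");
        System.out.println("2 copies per entry:  " + fixedCopiesGb(100, 2.0, 2) + " GB");
    }
}
```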

But Buddy Replication doesn't help everybody!
BR was specifically designed around the HTTP session caching use-case for the JBoss Application Server, and heavily optimised accordingly. As a result, session affinity is mandated, and applications that do not use session affinity can be prone to a lot of data gravitation and 'thrashing' - data is moved back and forth across a cluster as different nodes attempt to claim 'ownership' of state. Of course this is not a problem with JBoss AS and HTTP session caching - session affinity is recommended, available on most load balancer hardware and/or software, is taken for granted, and is a well-understood and employed paradigm for web-based applications.

So we had to get better
Just solving the HTTP session caching use-case wasn't enough. A well-performing data grid needs to do better, and crucially, session affinity cannot be taken for granted. And this was the primary reason for not porting BR to Infinispan. As such, Infinispan does not and will not support BR as it is too restrictive.

Distribution
Distribution is a new cache mode in Infinispan. It is also the default clustered mode - as opposed to replication, which isn't scalable. Distribution makes use of familiar concepts in data grids, such as consistent hashing, call proxying and local caching of remote lookups. What this leads to is a design that does scale well - fixed number of replicas for each cache entry, just like BR - but no requirement for session affinity.

What about co-locating state?
Co-location of state - moving entries about as a single block - was automatic and implicit with BR. Since each node always picked a backup node for all its state, one could visualize all of the state on a given node as a single block. Thus, colocation was trivial and automatic: whatever you put in Node1 will always be together, even if Node1 eventually dies and the state is accessed on Node2. However, this meant that state cannot be evenly balanced across a cluster since the data blocks are very coarse grained.
With distribution, colocation is not implicit. In part due to the use of consistent hashing to determine where each cached entry resides, and also in part due to the finer-grained cache structure of Infinispan - key/value pairs instead of a tree-structure - this leads to individual entries as the granularity of state blocks. This means nodes can be far better balanced across a cluster. However, it does mean that certain optimizations which rely on co-location - such as keeping related entries close together - are a little more tricky.

One approach to co-locate state would be to use containers as values. For example, put all entries that should be colocated together into a HashMap. Then store the HashMap in the cache. But that is coarse-grained and ugly as an approach, and will mean that the entire HashMap would need to be locked and serialized as a single atomic unit, which can be expensive if this map is large.

Another approach is to use Infinispan's AtomicMap API. This powerful API lets you group entries together, so they will always be colocated and locked together, but replication will be much finer-grained, allowing only deltas to the map to be replicated. So that makes replication fast and performant, but it still means everything is locked as a single atomic unit. While this is necessary for certain applications, it isn't always desirable.

One more solution is to implement your own ConsistentHash algorithm - perhaps extending DefaultConsistentHash. This implementation would have knowledge of your object model, and would hash related instances such that they are located together in the hash space. This is by far the most complex mechanism, but if performance and co-location really are hard requirements then you cannot get better than this approach.
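The core trick behind such a hash can be shown without touching the ConsistentHash interface itself. The sketch below is hypothetical throughout: it assumes keys of the form "group:rest", and it uses simple modulo placement rather than a real consistent hash, purely to show that hashing on the group part sends related keys to the same node.

```java
import java.util.List;

public class ColocationSketch {

    /** Assumed key convention: everything before the first ':' names the group. */
    static String groupOf(String key) {
        int separator = key.indexOf(':');
        return separator < 0 ? key : key.substring(0, separator);
    }

    /** Default behaviour: placement depends on the whole key, so related keys may be scattered. */
    public static String ownerByKey(String key, List<String> nodes) {
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    /** Model-aware behaviour: placement depends only on the group, so related keys stay together. */
    public static String ownerByGroup(String key, List<String> nodes) {
        return nodes.get(Math.floorMod(groupOf(key).hashCode(), nodes.size()));
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("Node1", "Node2", "Node3");
        for (String key : List.of("user42:address", "user42:orders", "user42:profile")) {
            System.out.println(key + " -> by key: " + ownerByKey(key, nodes)
                    + ", by group: " + ownerByGroup(key, nodes));
        }
    }
}
```

The trade-off is the one described above for BR: the coarser the groups, the harder it becomes to balance state evenly across the cluster.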

In summary:

Buddy Replication
  • Near-linear scalability
  • Session affinity mandatory
  • Co-location automatic
  • Applicable to a specific set of use cases due to the session affinity requirement
Distribution
  • Near-linear scalability
  • No session affinity needed
  • Co-location requires special treatment, ranging in complexity based on performance and locking requirements. By default, no co-location is provided
  • Applicable to a far wider range of use cases, and hence the default highly scalable clustered mode in Infinispan
Hopefully this article has sufficiently interested you in distribution, and has whetted your appetite for more. I would recommend the Infinispan wiki which has a wealth of information including interactive tutorials and GUI demos, design documents and API documentation. And of course you can't beat downloading Infinispan and trying it out, or grabbing the source code and looking through the implementation details.

Cheers
Manik