Showing posts with label Parallel Streams. Show all posts

Sunday, 27 September 2020

Let the Stream flow

Java Streams is one of the most powerful features of the JDK, and it is built on lambdas.

This post is a quick refresher on Stream concepts, using learning tests.



Streams are made up of 

Source |> map | filter |> reduce

Stream basics

A stream is a computation pipeline that starts with a source, continues through a series of intermediate operations, and ends with a terminal operation.

A stream is expressed as a pipeline of functional transformations, which enables optimized execution strategies such as lazy execution, short-circuiting, and fusion of operations.

These execution strategies avoid unnecessary materialization of data, because much of the work is done in a single pass or by multiplexing.

Streams can also be seen as SIMD at the application layer. A stream is made of stateless and stateful operations; consecutive stateless operations belong to a single stage of the pipeline.

Stateless operations such as map, flatMap, and filter are fused for optimal execution; only stateful operations such as sorted, takeWhile, dropWhile, limit, and distinct can add a barrier or a new stage to the pipeline.

Since a stream is a computation pipeline, it takes advantage of CPU caches by performing all the transformations on a single element while it is hot in the cache. This execution strategy is sometimes called depth first: go to the leaf and process it.
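
The depth-first behaviour described above can be observed with a small learning test. This is only a sketch: the class name DepthFirstDemo and the logging lambdas are made up for illustration, and it assumes a sequential stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class DepthFirstDemo {
    // Records which stage touched which element, in the order it happened.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        List<Integer> result = Arrays.asList(1, 2, 3).stream()
                .map(x -> { log.add("map " + x); return x * 10; })
                .filter(x -> { log.add("filter " + x); return x > 10; })
                .collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        // Each element goes through map and filter before the next one starts.
        trace().forEach(System.out::println);
    }
}
```

The log interleaves the two stages (map 1, filter 10, map 2, filter 20, ...) instead of running all the maps first, which is the fusion of stateless operations in action.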

Accessing data while it is hot in the CPU cache makes a big difference in performance; you can read more about it in the post cpu-cache-access-pattern.

A stream ends with a terminal operation. Terminal operations are either short-circuiting (allMatch, anyMatch, findFirst) or non-short-circuiting (reduce, collect, forEach).

Short-circuiting causes early termination and is very useful for search-style operations.
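
A small sketch of early termination, assuming a sequential stream over a five-element list. The class and method names are hypothetical; peek is used only to count how many elements were pulled from the source.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

class ShortCircuitDemo {
    // Returns how many source elements were visited before findFirst stopped.
    static int visitedBeforeMatch() {
        AtomicInteger visited = new AtomicInteger();
        Optional<Integer> first = Arrays.asList(1, 2, 3, 4, 5).stream()
                .peek(x -> visited.incrementAndGet())
                .filter(x -> x > 2)
                .findFirst();
        if (!first.isPresent() || first.get() != 3) {
            throw new IllegalStateException("expected the first match to be 3");
        }
        return visited.get();
    }

    public static void main(String[] args) {
        System.out.println("visited = " + visitedBeforeMatch());
    }
}
```

Only the elements 1, 2 and 3 are visited; 4 and 5 are never pulled from the source.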

A non-short-circuiting operation touches every element of the stream; reduce and collect are examples of such operations, and they can solve very complex problems.

Streams favor reduction (folding) over imperative accumulation. Reductions are easy to parallelize and simple to understand, and they open up embarrassingly parallel opportunities.

Parallel reduction relies on the associativity of the operator, for example

(+(+ (+ a b) c ) d) = (+ (+ a b) (+ c d))

The example above is a reduction using the plus (+) operator. The left side is a sequential reduction and the right side is a parallel one, but the output is the same for both execution paths.

Associative operators are embarrassingly parallel.
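
As a quick check of this property, here is a sketch that reduces the same range sequentially and in parallel with Integer::sum. The class name is hypothetical.

```java
import java.util.stream.IntStream;

class AssociativeReduceDemo {
    // Left-to-right fold: (((0 + 1) + 2) + 3) ...
    static int sequentialSum(int n) {
        return IntStream.rangeClosed(1, n).reduce(0, Integer::sum);
    }

    // The range is split into chunks, each chunk is folded, partial sums are combined.
    static int parallelSum(int n) {
        return IntStream.rangeClosed(1, n).parallel().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sequentialSum(1000) + " == " + parallelSum(1000));
    }
}
```

Both paths agree because addition is associative and 0 is its identity. An operator such as subtraction is not associative, so it carries no such guarantee under parallel execution.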

The power of streams lies in advanced reduction patterns, and the Collectors class has plenty of examples.

A collector takes a supplier, an accumulator, and a combiner. These three pieces are composed to build very complex reductions.

Let's look at a String-based reduction by looking at the world-famous string joiner.

values
.stream()
.collect(StringBuilder::new, (sb, value) -> sb.append(value), (sb1, sb2) -> sb1.append(sb2))
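
The snippet above can be wrapped into a runnable sketch. The JoinDemo class and the parallel switch are assumptions added for illustration; the three arguments to collect are the ones from the snippet.

```java
import java.util.Arrays;
import java.util.List;

class JoinDemo {
    static String join(List<String> values, boolean parallel) {
        StringBuilder joined = (parallel ? values.parallelStream() : values.stream())
                .collect(StringBuilder::new,                  // supplier: one builder per chunk
                        (sb, value) -> sb.append(value),      // accumulator: add one element
                        (sb1, sb2) -> sb1.append(sb2));       // combiner: merge two chunks
        return joined.toString();
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b", "c");
        System.out.println(join(values, false) + " " + join(values, true));
    }
}
```

The combiner is only needed on the parallel path, where each chunk gets its own StringBuilder from the supplier. Because a List has an encounter order, both calls return the same string.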


If a stream ends with a non-short-circuiting operation, elements are processed in bulk (forEachRemaining); with a short-circuiting operation they are processed one at a time (tryAdvance).
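
One way to see this is a source that counts how it gets traversed. This is a sketch: CountingSource is a made-up spliterator, and the exact call counts are an implementation detail of the OpenJDK pipeline rather than a documented contract.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class TraversalDemo {
    // A tiny source over 1..n that counts how the pipeline traverses it.
    static final class CountingSource extends Spliterators.AbstractSpliterator<Integer> {
        final int n;
        int next = 1;
        int bulkCalls = 0;    // calls to forEachRemaining
        int singleCalls = 0;  // calls to tryAdvance

        CountingSource(int n) {
            super(n, Spliterator.ORDERED | Spliterator.SIZED);
            this.n = n;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Integer> action) {
            singleCalls++;
            if (next > n) return false;
            action.accept(next++);
            return true;
        }

        @Override
        public void forEachRemaining(Consumer<? super Integer> action) {
            bulkCalls++;
            while (next <= n) action.accept(next++);
        }
    }

    static Stream<Integer> streamOf(CountingSource source) {
        return StreamSupport.stream(source, false); // false = sequential
    }

    public static void main(String[] args) {
        CountingSource a = new CountingSource(5);
        streamOf(a).map(x -> x * 2).forEach(x -> { });
        System.out.println("forEach:   bulk=" + a.bulkCalls + " single=" + a.singleCalls);

        CountingSource b = new CountingSource(5);
        streamOf(b).map(x -> x * 2).findFirst();
        System.out.println("findFirst: bulk=" + b.bulkCalls + " single=" + b.singleCalls);
    }
}
```

With forEach the whole source is drained through a single forEachRemaining call, while findFirst pulls elements through tryAdvance and stops as soon as it has an answer.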


Stream Operation Flags

Every stream has characteristics that the stream framework uses for optimization.
As application programmers we are not exposed to stream operation flags directly, but knowing them helps in understanding the optimization techniques used by streams.
  
A stream is defined with characteristics at its source, and each stream operation (map, filter, limit, sorted, etc.) may preserve, clear, or inject characteristics.
The terminal operation inspects all the characteristics and selects an optimized code path for execution.

A very simple stream characteristic is Parallel: the stream framework takes it into account to decide between single-threaded and multi-threaded execution.
Some of the other stream flags are
  • Distinct - The stream has distinct values.
  • Sorted - Elements are sorted by natural order.
  • Ordered - Elements have an encounter order.
  • Sized - The size is finite and known, which is important for splitting.
  • Short Circuit - The stream can be short-circuited, for example because of find or limit.
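
The internal flags are not public API, but the source-level characteristics they are seeded from are visible through Spliterator. A small sketch with a hypothetical describe helper, checking only four of the public Spliterator constants:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Spliterator;
import java.util.TreeSet;

class CharacteristicsDemo {
    // Lists which of four well-known characteristics a collection's spliterator reports.
    static String describe(Collection<?> source) {
        Spliterator<?> s = source.spliterator();
        StringBuilder out = new StringBuilder();
        if (s.hasCharacteristics(Spliterator.DISTINCT)) out.append("DISTINCT ");
        if (s.hasCharacteristics(Spliterator.SORTED)) out.append("SORTED ");
        if (s.hasCharacteristics(Spliterator.ORDERED)) out.append("ORDERED ");
        if (s.hasCharacteristics(Spliterator.SIZED)) out.append("SIZED ");
        return out.toString().trim();
    }

    public static void main(String[] args) {
        Collection<Integer> data = Arrays.asList(3, 1, 2);
        System.out.println("ArrayList: " + describe(new ArrayList<>(data)));
        System.out.println("HashSet:   " + describe(new HashSet<>(data)));
        System.out.println("TreeSet:   " + describe(new TreeSet<>(data)));
    }
}
```

An ArrayList reports ORDERED and SIZED, a HashSet reports DISTINCT and SIZED, and a TreeSet reports all four.
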
Let's take the distinct stream operation to understand how it gets optimized.
Once we have distinct elements, we can count them using the code snippet below.

values
.stream()
.distinct()
.count()

A distinct count of elements can be implemented in many ways, depending on the underlying collection of the stream.

  • List/LinkedList 

 If the underlying collection is a list, only the brute-force way can be used for the distinct count: allocate a Set, keep adding elements to it, and then return its size. This causes memory pressure on the system when the collection is large.
 

  • SortedSet
If the underlying collection is a sorted collection such as TreeSet, the distinct count does not need any memory allocation; it can be computed with a simple loop that compares the current and previous values. A code snippet doing the distinct count:


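import java.util.Iterator;
import java.util.SortedSet;

// Counts distinct values in sorted input by comparing each value with the previous one.
static int distinct(SortedSet<String> values) {
    Iterator<String> itr = values.iterator();
    if (!itr.hasNext()) return 0;

    String previous = itr.next();
    int itemCount = 1;
    while (itr.hasNext()) {
        String next = itr.next();
        // Equal values are adjacent in sorted order, so a change means a new distinct value.
        if (!previous.equals(next)) {
            itemCount++;
            previous = next;
        }
    }
    return itemCount;
}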

  • Set
If the underlying collection is a Set, it is just a call to its size function!

This is a simple example of how a stream pipeline can take advantage of stream operation flags to plug in optimal code. It resembles the way a database optimizer works and shows the power of declarative programming.
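
The selection idea can be sketched at the application level. This is not how the JDK implements distinct; distinctCount is a hypothetical helper that picks a strategy from the DISTINCT characteristic of the source.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Spliterator;

class DistinctCountDemo {
    static long distinctCount(Collection<?> values) {
        Spliterator<?> s = values.spliterator();
        if (s.hasCharacteristics(Spliterator.DISTINCT)) {
            // The source already guarantees distinct elements: no allocation needed.
            return values.size();
        }
        // Brute force: copy everything into a Set and count it.
        return new HashSet<Object>(values).size();
    }

    public static void main(String[] args) {
        System.out.println(distinctCount(Arrays.asList("a", "b", "a")));
        System.out.println(distinctCount(new HashSet<>(Arrays.asList("a", "b", "a"))));
    }
}
```

The caller states what is wanted (a distinct count) and the helper decides how to get it, which is the same division of labour as in a query optimizer.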
  
 We can take this further by adding a new stream operation such as approximate distinct, based on the HyperLogLog probabilistic data structure. It can handle any type of collection with very little memory overhead by trading off a little accuracy. I shared some probabilistic data structures in the data-structure-for-big-data post.

Other flags also do a lot of magic to make code fast.

Conclusion

Stream is a very powerful abstraction for solving problems in a declarative way.
Enjoy the various examples of streams in the github project streams. The examples are organized in chapters and cover simple to advanced usage patterns.


Saturday, 10 November 2018

SQL is Stream


A Stream API in any language looks like writing SQL.

map is Select Columns
filter is Where
count is Count(1)
limit is LIMIT X
collect is fetching all results on the client side

So it is very easy to map all the functions of the Streams API to some part of SQL.

Object-relational mapping frameworks (Hibernate, MyBatis, JPA, TopLink, ActiveRecord, etc.) give a good abstraction over SQL, but they add a lot of overhead and do not give much control over how the SQL is built, so many times you have to write native SQL.


ORMs never made writing SQL easy, and if you don't trust me, take a quick look at how the code ends up looking.

Sometimes I feel that engineers are writing more annotations than real algorithms!

To implement any feature we have to keep switching between the SQL API and the non-SQL API; this makes code hard to maintain, and many times it is not optimal either.

This problem can be solved by a library that is based on the Streams API and generates SQL. Then we don't have to switch, and it becomes a unified programming experience.

With such a library, testing becomes easy because the source of the stream can be changed as needed: in a real environment it is a database, and in tests it is an in-memory data structure.

In this post I will share a toy example of how such a library could look.
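
The testing argument can be sketched with plain JDK streams. StockPrice and countLiquid are hypothetical names, and the thresholds are borrowed from the snippets in this post. The query logic depends only on a Stream, so a test can feed it an in-memory list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class InMemoryStocks {
    // Hypothetical row type; field names loosely mirror the columns used in this post.
    static final class StockPrice {
        final String symbol;
        final long volume;
        final double openPrice;

        StockPrice(String symbol, long volume, double openPrice) {
            this.symbol = symbol;
            this.volume = volume;
            this.openPrice = openPrice;
        }
    }

    // Business logic written once against Stream, independent of where rows come from.
    static long countLiquid(Stream<StockPrice> rows) {
        return rows.filter(r -> r.volume > 1_467_200)
                   .filter(r -> r.openPrice > 1108d)
                   .count();
    }

    public static void main(String[] args) {
        List<StockPrice> testRows = Arrays.asList(
                new StockPrice("AAA", 2_000_000L, 1200.0),
                new StockPrice("BBB", 1_000_000L, 1300.0),
                new StockPrice("CCC", 3_000_000L, 900.0));
        System.out.println(countLiquid(testRows.stream()));
    }
}
```

In production the same method would receive a stream backed by the database instead of the list.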

Code Snippet

Stream<StocksPrice> rows = stocksTable.stream();
long count = rows
                .filter(Where.GT("volume", 1467200))
                .filter(Where.GT("open_price", 1108d))
                .count();

Above code generates
Select Count(1) From stocks_price where volume > 1467200 AND open_price > 1108
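
To give a feel for how filters could be turned into SQL, here is a minimal sketch. It is not the code from the streams repo: SqlSketch, filterGt and countSql are made-up names, and the keyword casing and number formatting are my own choice.

```java
import java.util.ArrayList;
import java.util.List;

class SqlSketch {
    private final String table;
    private final List<String> predicates = new ArrayList<>();

    SqlSketch(String table) {
        this.table = table;
    }

    // Intermediate operation: only records the predicate, nothing is executed yet.
    SqlSketch filterGt(String column, Number value) {
        predicates.add(column + " > " + value);
        return this;
    }

    // Terminal operation: all recorded filters are merged into one WHERE clause.
    String countSql() {
        String where = predicates.isEmpty() ? "" : " WHERE " + String.join(" AND ", predicates);
        return "SELECT COUNT(1) FROM " + table + where;
    }

    public static void main(String[] args) {
        System.out.println(new SqlSketch("stocks_price")
                .filterGt("volume", 1467200)
                .filterGt("open_price", 1108d)
                .countSql());
    }
}
```

The filters are lazy in the same sense as stream operations: they are collected first and fused into a single statement by the terminal call. String concatenation is used only to keep the sketch short; a real library would use bind parameters.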

Look at another example with Limit

stocksTable.stream()
                .filter(Where.GT("volume", 1467200))
                .filter(Where.GT("open_price", 1108d))
                .limit(2)
                .collect(Collectors.toList());

Select stock_symbol,open_price,high_price,trade_date FROM stocks_price WHERE volume > 1467200 AND open_price > 1108.0 LIMIT 2

These APIs can also use code generation to give compile-time safety, such as checking column names, types, etc.

Benefits 

The Streams API also brings some other benefits, like
 - Parallel execution.
 - Joins between database data and non-database data can easily be done using map.
 - Allows a pure streaming approach, which is good when dealing with huge data.
 - Opens up options for generating natively optimized queries, because multiple phases of the pipeline can be merged.

This programming model is not new; it is very common in distributed computing frameworks like Spark, Kafka, Flink, etc.

Spark Dataset is based on this approach: it generates optimized queries by pushing filters down to storage, reducing reads by looking at partitions, reading only selected columns, etc.

Conclusion

Database drivers should offer a stream-based API; this would help reduce the dependency on ORM frameworks.
This is a very powerful programming model and opens up lots of options.

Code used in this post is available @ streams github repo.

Wednesday, 14 January 2015

Java8 Parallel Streams


I recently found a great article on when to use parallel streams by Doug Lea and team. In this blog I will share some performance results for different data structures.

Splittable data structure
This is one of the most important factors to consider. All parallel stream operations are executed on the default (common) fork/join pool, which uses a divide-and-conquer algorithm to split the stream into small chunks and apply the function to each chunk. Splitting is therefore an important factor: if splitting takes too much time, the whole computation is going to choke!

Types of data structure

-Array
Array-based data structures are the most efficient from a splitting perspective because each element can be accessed randomly, so splitting is a very quick operation.
Some examples of array-based collections are ArrayList and open-addressing-based Set/Map.

-Tree
Balanced tree-based collections like TreeMap or TreeSet. It is easy to chop the collection into 2 parts, but if the tree is not balanced then splitting will not be as efficient, because the task load will not be equal for each thread.
Another thing to consider is that the memory access pattern of a tree-based data structure is random, so there is a memory latency cost when elements are accessed.

-LinkedList
This type of data structure gives the worst splitting performance because each element must be visited to split it. Some examples from the JDK are LinkedList and Queues.

- Others
This covers all other types of data structure, e.g. I/O-based sources such as BufferedReader.lines, which returns a stream but whose splitting operation is very expensive.
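
Splitting can be observed directly through Spliterator.trySplit, which is what the fork/join tasks call. A small sketch with a hypothetical splitOnce helper, using an 8-element collection:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Spliterator;

class SplitDemo {
    // Returns {size of the split-off prefix, size left in the original spliterator}.
    static long[] splitOnce(Collection<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        Collection<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println("ArrayList:  " + Arrays.toString(splitOnce(new ArrayList<>(data))));
        System.out.println("LinkedList: " + Arrays.toString(splitOnce(new LinkedList<>(data))));
    }
}
```

The ArrayList splits into two halves of 4 just by computing a midpoint index. The LinkedList has to walk its nodes and copy elements into an array to produce a chunk, so how evenly it splits depends on the JDK's batching rather than on the data.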

Some performance numbers
Every performance tuning guess must be backed by an experiment, so let's have a look at some numbers.

Some code snippet
Array Stream
ArrayList Stream
Set Stream
LinkedList Stream


Measurement Code
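
A minimal sketch of how such a comparison could be set up, assuming a simple parallel sum as the workload. It is not the benchmark that produced the numbers below: the class name, element count and timing approach are all assumptions, and a harness such as JMH would be a better fit for serious measurements.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelTiming {
    // The workload: sum all values using a parallel stream over the given collection.
    static long parallelSum(Collection<Integer> values) {
        return values.parallelStream().mapToLong(Integer::longValue).sum();
    }

    // Crude wall-clock timing in milliseconds; no warm-up, so treat results as rough.
    static long timeMillis(Collection<Integer> values) {
        long start = System.nanoTime();
        parallelSum(values);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<Integer> base = IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
        System.out.println("ArrayList  " + timeMillis(new ArrayList<>(base)) + " ms");
        System.out.println("HashSet    " + timeMillis(new HashSet<>(base)) + " ms");
        System.out.println("LinkedList " + timeMillis(new LinkedList<>(base)) + " ms");
    }
}
```

All three collections hold the same elements, so any difference in time comes from splitting and memory access rather than from the computation itself.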


Result 

Array 31 sec
ArrayList 31 sec
set 52 sec
linkedlist 106 sec

The result confirms the guess that array-based collections are the fastest, so choose the right data structure before you start using parallel streams.

Code used in blog is available @ github