0% found this document useful (0 votes)
17 views247 pages

Rxjavafx Guide

This document serves as a guide for learning RxJava with JavaFX, detailing its integration and benefits. It introduces reactive programming concepts, explains the relationship between RxJava and JavaFX, and discusses the use of Kotlin alongside Java. The document also outlines the setup process for RxJavaFX and RxKotlinFX, providing code examples and emphasizing the importance of reactive programming in modern UI applications.

Uploaded by

dhguswns36
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
17 views247 pages

Rxjavafx Guide

This document serves as a guide for learning RxJava with JavaFX, detailing its integration and benefits. It introduces reactive programming concepts, explains the relationship between RxJava and JavaFX, and discusses the use of Kotlin alongside Java. The document also outlines the setup process for RxJavaFX and RxKotlinFX, providing code examples and emphasizing the importance of reactive programming in modern UI applications.

Uploaded by

dhguswns36
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd

Table of Contents

Introduction 1.1
Preface 1.2
1. Getting Started 1.3
2. RxJava Fundamentals 1.4
3. Events and Value Changes 1.5
4. Collections 1.6
5. Combining Observables 1.7
6. Bindings 1.8
7. Dialogs and Multicasting 1.9
8. Concurrency 1.10
9. Switching, Throttling, and Buffering 1.11
10. Decoupling 1.12

1
Introduction

Learning RxJava with JavaFX

With RxJavaFX and RxKotlinFX


Thomas Nield

NOTE: This covers RxJavaFX 2.x

This work is licensed under a Creative Commons Attribution 4.0 International


License.

2
Preface

Preface
Over the past year or so, I have discovered so much can be achieved leveraging
reactive programming in JavaFX applications. To this day, I still discover amazing
patterns that RxJava allows in JavaFX applications. RxJavaFX is merely a layer
between these two technologies to make them talk to each other, just like
RxAndroid bridges RxJava and the Android UI.

I guess the best way to introduce RxJavaFX is to share how it came about. In
2014, I had already developed some Swing applications that were used internally
at my company. These applications were quite involved with lots of interactivity,
data processing, and complex user inputs. Like most UI applications, these were a
beat-down to build. Naturally, I was drawn to JavaFX and was particularly
intrigued by the ObservableValue , ObservableList , and other data
structures that notified the UI automatically of their changes. No more
[Link]() ! Although I briefly considered HTML5 as my
next platform, JavaFX showed more promise in the environment I worked in.

Keeping things synchronized between different components in a UI is difficult, so I


liked JavaFX's idea of Bindings and ObservableValues. I became fascinated by
this idea of events triggering other events, and a value notifying another value of
its change. I started to believe there should be a way to express this functionally
much like Java 8 Streams, and I had a vague idea what I was looking for.

But as I studied JavaFX deeper, I became discontent. JavaFX does have


Binding functionality that can synchronize properties and events of different
controls, but the ability to express transformations was limited. One morning,
someone in an online community suggested I check out ReactFX, a reactive
JavaFX library built by Tomas Mikula. This opened up my world and I discovered
reactive programming for the first time. I played with the EventStream and was
composing events and data together. I knew at that moment, reactive
programming was the solution I was looking for.

I originally set out to use reactive programming as a way to handle UI events, and
ReactFX was perfect for this. But I began to suspect I was missing the bigger
picture. I researched reactive programming further and discovered RxJava, a

3
Preface

reactive API with a rich ecosystem of libraries built around it, including [RxAndroid]
([Link] and [RxJava-JDBC]
([Link] RxJava rapidly became a core
technology in the Android stack, and I wondered if it had the same potential in
JavaFX. As I studied RxJava, I was immediately drawn to the RxJava-JDBC
library. Effectively, I could leverage bindings that were bound to database queries.
It soon became clear that with reactive programming, events are data, and data
are events!

But how do I plug RxJava into ReactFX?

To create a fully effective reactive solution, I needed RxJava to talk to ReactFX. I


tried this task and it was wrought with problems. Technically, it was difficult turning
a ReactFX EventStream into an RxJava Observable and vice versa. I also
realized ReactFX encourages doing everything on the JavaFX thread, but I
wanted to switch between threads easily allowing concurrency. There is nothing
wrong with ReactFX. It is awesome library that simply had a different purpose and
goal.

During my struggle, I stumbled on the RxJavaFX project. It was a small library that
converted Node and ObservableValue events into RxJava Observables. It
also contained a Scheduler for the JavaFX thread. I knew immediately this was
the alternative to ReactFX I needed, but some folks at Netflix were having some
build issues with it. Ben Christensen was eager to give it away to someone who
knew JavaFX, as nobody at Netflix used JavaFX. After a period of no activity, I
reluctantly volunteered to take ownership of it. After hours of Googling, trawling
GitHub projects with similar issues, and making a few tweaks, the build and tests
finally succeeded. I was able to get it released on Maven Central and RxJavaFX
was now live.

When I took ownership of the RxJavaFX library, I doubted it would progress


beyond turning JavaFX Node events and ObservableValue changes into
RxJava Observables. But I quickly learned there was much more to be done.
JavaFX was built with event hooks everywhere, including collections like
ObservableList . This provided all the tools needed to make a fully reactive
API for JavaFX, and there was a lot of power yet to be exposed. With random
epiphanies as well as some guidance from the community, RxJavaFX has
become a robust solution to integrate JavaFX into the RxJava ecosystem.

4
Preface

So let's get started!

5
1. Getting Started

1. Getting Started
Reactive programming is about composing events and data together, and treating
them identically. This idea of "events are data, and data are events" is powerful,
and because UI's often have to coordinate both it is the perfect place to learn and
apply reactive programming. For this reason, this book will teach RxJava from a
JavaFX perspective and assume no prior RxJava knowledge. If you already have
experience with RxJava, you are welcome to skip the next chapter.

I would highly recommend being familiar with JavaFX (or TornadoFX) before
starting this book. [Mastering JavaFX 8 Controls (Hendrik Ebbers) and Pro
JavaFX 8 (James Weaver and Weiqi Gao) are excellent books to learn JavaFX. If
you are interested in leveraging JavaFX with the Kotlin language, check out the
TornadoFX Guide written by Edvin Syse and Thomas Nield (yours truly). I will
explain why this book shows examples in both the Java and Kotlin languages
shortly. For now, let us explore the benefits of using RxJava with JavaFX.

Why Use RxJava with JavaFX?


As stated earlier, reactive programming can equalize events and data by treating
them the same way. This is a powerful idea with seemingly endless practical use
cases. JavaFX provides many hooks that can easily be made reactive. There are
many reactive libraries, from Akka and Sodium to ReactFX. But RxJava really hit
on something, especially with its simple handling of concurrency, extensibility, and
rich ecosystem of third party libraries. It has taken the Android community by
storm and continues to make reactive programming a go-to tool to meet modern
user demands.

RxJavaFX is a lightweight but comprehensive library to plug JavaFX components


into RxJava, and vice versa. This is what this book will cover. Some folks reading
this may ask "Why not use ReactFX? Why do we need a second reactive
framework for JavaFX when that one is perfectly fine?" ReactFX is an excellent
reactive framework made by Tomas Mikula, and you can read more about my
experiences with it in the Preface. But the TL;DR is this: ReactFX encourages
keeping all operations on the JavaFX thread, while RxJava embraces full-

6
1. Getting Started

blown concurrency. On top of that, RxJava also has a rich ecosystem of


extensible libraries (e.g. RxJava-JDBC, while ReactFX focuses solely on
JavaFX events. ReactFX and RxJavaFX simply have different scopes and goals.

RxJava has a rapidly growing and active community. The creators and
maintainers of RxJava do an awesome job of answering questions and being
responsive to developers of all skill levels. Reactive programming has enabled an
exciting new domain filled with new ideas, practical applications, and constant
discovery. RxJava is one of the many ReactiveX API's standardized across many
programming languages. Speaking of other languages, let us talk about Kotlin.

Using Kotlin (Optional)


This book will present examples in two languages: Java and Kotlin. If you are not
familiar, Kotlin is a new JVM language created by JetBrains, the company behind
Intellij IDEA, PyCharm, CLion, and several other IDE's and tools. JetBrains
believed they could be more productive by creating a new language that
emphasized pragmatism and industry over convention. After 5 years of developing
and testing, Kotlin 1.0 was released in February 2016 to fulfill this need. A year
later Kotlin 1.1 was released with more practical (but tightly-scoped) features.

If you have never checked out Kotlin, I would higly recommend giving it a look. It
is an intuitive language that only takes a few hours for a Java developer to learn.
The reason I present Kotlin in this book is because it created a unique opportunity
on the JavaFX front. Towards the end of Kotlin's beta, Edvin Syse released
TornadoFX, a lightweight Kotlin library that significantly streamlines development
of JavaFX applications.

For instance, with TornadoFX you can create an entire TableView using just the
Kotlin code below:

tableview<Person> {
column("ID", Person::id)
column("Name", Person::name)
column("Birthday", Person::birthday)
column("Age", Person::age)
}

7
1. Getting Started

I had the privilege of joining Edvin's project not long after TornadoFX's release,
and the core team has created a phenomenal JavaFX suite of features enabled by
the Kotlin language. I would highly recommend giving the TornadoFX Guide a look
to learn more.

There is a Kotlin extension of the RxJavaFX library called RxKotlinFX. It wraps


Kotlin extension functions around RxJavaFX and includes some additional
operators. The Kotlin versions of examples will use this library, and will also use
TornadoFX. Using this stack may add a few more dependencies to your project,
but the amount of value it adds through abstraction and productivity may make it
worthwhile!

If you are not interested in Kotlin, no worries! The Java version of code samples
will be always be presented first and you can ignore the Kotlin ones.

Setting Up
Currently, RxJavaFX and RxKotlinFX support both RxJava 1.x and RxJava 2.x
(and aligned with their own respectiveX 1.x and 2.x versions). RxJava 2.x brings a
number of large changes to RxJava, and this is the version that the guide will
cover.

You should prefer RxJava 2.x because RxJava 1.x support will discontinue early
2018.

Java
To setup RxJavaFX 2.x for Java, use the Gradle or Maven configuration below
where x.y.z is the version number you want to specify.

Gradle

compile '[Link].rxjava2:x.y.z'

Maven

8
1. Getting Started

<dependency>
<groupId>[Link].rxjava2</groupId>
<artifactId>rxjavafx</artifactId>
<version>x.y.z</version>
</dependency>

Kotlin
If you are using Kotlin, you will want to use RxKotlinFX instead of RxJavaFX.
Make sure you have configured Maven or Gradle to use a Kotlin configuration,
and include the dependencies below. Note the x.y.z is where you put the
targeted version number, and I included TornadoFX and RxKotlin as
dependencies since the examples will use them.

Gradle

compile '[Link]:rxkotlinfx:x.y.z'
compile '[Link]:tornadofx:x.y.z`
compile '[Link].rxjava2:rxkotlin:x.y.z'

Maven

<dependency>
<groupId>[Link]</groupId>
<artifactId>rxkotlinfx</artifactId>
<version>x.y.z</version>
</dependency>
<dependency>
<groupId>[Link]</groupId>
<artifactId>tornadofx</artifactId>
<version>x.y.z</version>
</dependency>
<dependency>
<groupId>[Link]</groupId>
<artifactId>rxkotlin</artifactId>
<version>x.y.z</version>
</dependency>

9
1. Getting Started

Figure 1.1. shows a Venn diagram showing the stack of technologies typically
used to built a reactive JavaFX application with Kotlin. The overlaps indicate that
library is used to interoperate between the 3 domains: JavaFX, RxJava and Kotlin

Figure 1.1

Summary
In this chapter we got a high level overview of reactive programming and the role
RxJavaFX plays in connecting JavaFX and RxJava together. There was also an
explanation why Kotlin is presented alongside Java in this book, and why both
RxKotlinFX and TornadoFX are compelling options when building JavaFX
applications. You can go through this book completely ignoring the Kotlin
examples if you like.

10
1. Getting Started

In the next chapter we will cover the fundamentals of RxJava, and do it from a
JavaFX perspective. If you are already experienced with RxJava, you are
welcome to skip this chapter. But if you have been looking for a practical domain
to apply RxJava, read on!

11
2. RxJava Fundamentals

2. RxJava Fundamentals
RxJava has two core types: the Observable and the Observer . In the
simplest definition, an Observable pushes things. A given Observable<T>
will push items of type T through a series of operators that form other
Observables, and finally the terminal Observer is what consumes the items at
the end of the chain.

Each pushed T item is known as an emission. Usually there is a finite number


of emissions, but sometimes there can be infinite. An emission can represent
either data or an event (or both!). This is where the power of reactive
programming differentiates itself from Java 8 Streams and Kotlin Sequences. It
has a notion of emissions over time, and we will explore this concept in this
chapter.

The Observable and Observer


As stated earlier, an Observable pushes things. It pushes things of type T
through a series of operators forming other Observables . Each pushed item is
known as an emission. Those emissions are pushed all the way to a Observer
where they are finally consumed.

You will need to create a source Observable where emissions originate from,
and there are many factories to do this. To create a source Observable that
pushes items 1 through 5, declare the following:

Java

12
2. RxJava Fundamentals

import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<Integer> source = [Link](1,2,3,4,5);


}
}

Kotlin

import [Link]

fun main(args: Array<String>) {


val source = [Link](1,2,3,4,5)
}

This source Observable<Integer> is saved to a variable named source .


However, it has not pushed anything yet. In order to start pushing emissions, you
need to create an Observer . The quickest way to do this is call subscribe()
and pass a lambda specifying what to do with each emission.

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<Integer> source = [Link](1,2,3,4,5);

[Link]([Link]::println);
}
}

13
2. RxJava Fundamentals

Kotlin

import [Link]

fun main(args: Array<String>) {

val source = [Link](1,2,3,4,5)


[Link](::println)
}

A lambda is a special type of argument specifying an action. This one will take
each emission and print it, and this subscribe() operation creates a
Observer for us based on this lambda argument.

Java 8 and Kotlin have their own ways of expressing lambdas. If you need to
learn more about Java 8 lambdas, I would recommend reading at least the
first two chapters of Java 8 Lambdas by Richard Warburton before
proceeding. You can read the Kotlin Reference to learn about lambdas in
Kotlin. Lambdas are a critical syntax feature that we will use constantly in this
book.

Go ahead and run the code above, and you should get the following:

OUTPUT:

1
2
3
4
5

This effectively pushed the integers 1 through 5, one-at-a-time, to the Observer


defined by the lambda in the subscribe() method. The subscribe()
method does not have to print items. It could populate them in a JavaFX control,
write them to a database, or post it as a server response.

Understandings Observers

14
2. RxJava Fundamentals

You can specify up to three lambda arguments on the subscribe() method to


not only handle each emission, but also handle the event of an error as well as an
action on completion when there are no more emissions.

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<Integer> source = [Link](1,2,3,4,5);

[Link]([Link]::println,
Throwable::printStackTrace,
() -> [Link]("Done!")
);
}
}

Kotlin

import [Link]
import [Link]

fun main(args: Array<String>) {

val source = [Link](1, 2, 3, 4, 5)

[Link](
onNext = ::println,
onError = { [Link]() },
onComplete = { println("Done!") }
)
}

OUTPUT:

15
2. RxJava Fundamentals

1
2
3
4
5
Done!

Typically, you should always supply an onError lambda to your


subscribe() call so errors do not quietly go unhandled. We will not use
onError very much in this book for the sake of brevity, but be sure to use it
when putting reactive code in production.

Let's briefly break down the Observer to understand it better. The lambdas are
just a shortcut to allow the subscribe() method to quickly create an
Observer for you. You can create your own Observer object explicitly by
extending ResourceObserver and implementing its three abstract methods:
onNext() , onError() , and onComplete() . You can then pass this
Observer to the subscribe() method.

Java

16
2. RxJava Fundamentals

import [Link];
import [Link];
import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<Integer> source = [Link](1,2,3,4,5);

Observer<Integer> subscriber = new ResourceObserver<Inte


ger>() {
@Override
public void onComplete() {
[Link]("Done!");
}

@Override
public void onError(Throwable e) {
[Link]();
}

@Override
public void onNext(Integer integer) {
[Link](integer);
}
};

[Link](subscriber);
}
}

Kotlin

17
2. RxJava Fundamentals

import [Link]
import [Link]

fun main(args: Array<String>) {


val source = [Link](1, 2, 3, 4, 5)

val subscriber = object: ResourceObserver<Int>() {


override fun onComplete() = println("Done!")

override fun onNext(i: Int) = println(i)

override fun onError(e: Throwable) = [Link]()


}

[Link](subscriber)

The Observer interface defines these three methods. The onNext() is what
is called to pass an emission. The onError() is called when there is an error,
and onComplete() is called when there are no more emissions. Logically with
infinite Observables, the onComplete() is never called.

We extend a ResourceObserver because the Observer actually has a


fourth abstract method, onSubscribe() . This is something you will rarely
need to implement yourself, and ResourceObserver will take care of it for
you. Essentially, the onSubscribe() allows the Observer implementation
to have control of the Disposable which is beyond the scope of this book.
We will touch on the Dispsoable later, but you can learn much more about
it in Chapter 2 of my book Learning RxJava by Packt Publishing.

Although this example is helpful for understanding the Observer , it also shows
implementing Observer objects can be pretty verbose. Therefore, it is helpful to
use lambdas instead for conciseness. These three methods on the Observer
are critical for understanding RxJava, and we will revisit them several times in this
chapter.

18
2. RxJava Fundamentals

It is critical to note that the onNext() can only be called by one thread at a time.
There should never be multiple threads calling onNext() concurrently, and we
will learn more about this later when we cover concurrency. For now just note
RxJava has no notion of parallelization, and when you subscribe to a factory like
[Link](1,2,3,4,5) , you will always get those emissions serially, in
that exact order, and on a single thread.

Source Observable Factories


Going back to the Observable , there are other factories to create source
Observables. Above we emitted the integers 1 through 5. Since these are
consecutive, we can use [Link]() to accomplish the same thing. It
will emit the numbers 1 through 5 based on their range, and then call
onComplete() .

Note these examples have no subscribers, so there will be no output when


running them.

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<Integer> source = [Link](1,5);


}
}

Kotlin

19
2. RxJava Fundamentals

import [Link]

fun main(args: Array<String>) {

val source = [Link](1,5)


}

You can also turn any Iterable<T> into an Observable<T> quickly using
[Link]() . It will emit all items in that Iterable<T> and
then call onComplete() when it is done.

Java

import [Link];
import [Link];
import [Link];

public class Launcher {

public static void main(String[] args) {

List<Integer> list = [Link](1,2,3,4,5);


Observable<Integer> source = [Link](lis
t); }
}

Kotlin

import [Link]

fun main(args: Array<String>) {

val list = listOf(1,2,3,4,5)


val source = [Link]()
}

Using Operators
20
2. RxJava Fundamentals

Let us do something a little more useful than just connecting a source


Observable and an Observer . Let's put some operators between them to
actually transform emissions and do work.

map()
Say you have an Observable<String> that pushes String values.

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<String> source =
[Link]("Alpha", "Beta", "Gamma", "Delta"
, "Epsilon");
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {

val source = [Link]("Alpha","Beta","Gamma","Delta",


"Epsilon")

In RxJava, you can use hundreds of operators to transform emissions and create
new Observables with those transformations. For instance, you can create an
Observable<Integer> off an Observable<String> by using the map()

21
2. RxJava Fundamentals

operator, and use it to emit each String's length.

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<String> source =
[Link]("Alpha","Beta","Gamma","Delta",
"Epsilon");

Observable<Integer> lengths = [Link](String::length)


;

[Link]([Link]::println);
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {

val source = [Link]("Alpha","Beta","Gamma","Delta"


, "Epsilon")

val lengths = [Link] { [Link] }

[Link](::println)
}

OUTPUT:

22
2. RxJava Fundamentals

5
4
5
5
7

The source Observable pushes each String to the map() operator where
it is mapped to its length . That length is then pushed from the map()
operator to the Observer where it is printed.

You can do all of this without any intermediary variables holding each
Observable , and instead do everything in a single "chain" call. This can be
done in one line or broken up into multiple lines.

Java

import [Link];

public class Launcher {

public static void main(String[] args) {


[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.map(String::length)
.subscribe([Link]::println);
}
}

Kotlin

23
2. RxJava Fundamentals

import [Link]

fun main(args: Array<String>) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.map { [Link] }
.subscribe(::println)
}

Operators behave as both an intermediary Observer and an Observable ,


receiving emissions from the upstream source, transforming them, and passing
them downstream to the final Observer .

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon") // ca
lls onNext() on map()
.map(s -> [Link]()) // calls onNext() on Observer
.subscribe(i -> [Link](i));

filter()
Another common operator is filter() , which suppresses emissions that fail to
meet a certain criteria, and pushes the ones that do forward. For instance, you
can emit only Strings where the length() is at least 5. In this case, the
filter() will stop "Beta" from proceeding since it is 4 characters.

Java

24
2. RxJava Fundamentals

import [Link];

public class Launcher {

public static void main(String[] args) {


[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.filter(s -> [Link]() >= 5)
.subscribe([Link]::println);
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {

[Link]("Alpha", "Beta", "Gamma", "Delta", "Epsilon"


)
.filter { [Link] >= 5 }
.subscribe(::println)
}

OUTPUT:

Alpha
Gamma
Delta
Epsilon

distinct()
There are also operators like distinct() , which will suppress emissions that
have previously been emitted to prevent duplicate emissions (based on each
emission's hashcode() / equals() implementation).

25
2. RxJava Fundamentals

Java

import [Link];

public class Launcher {

public static void main(String[] args) {


[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.map(String::length)
.distinct()
.subscribe([Link]::println);
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {

[Link]("Alpha", "Beta", "Gamma", "Delta", "Epsilon"


)
.map { [Link] }
.distinct()
.subscribe(::println)
}

OUTPUT:

5
4
7

You can also provide a lambda specifying an attribute of each emitted item to
distinct on, rather than the item itself.

26
2. RxJava Fundamentals

Java

import [Link];

public class Launcher {

public static void main(String[] args) {


[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.distinct(String::length)
.subscribe([Link]::println);
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.distinct { [Link] }
.subscribe(::println)
}

OUTPUT:

Alpha
Beta
Epsilon

take()
The take() operator will cut off at a fixed number of emissions and then
unsubscribe from the source. Afterwards, it will call onComplete() downstream
to the final Observer .

27
2. RxJava Fundamentals

Java

import [Link];

public class Launcher {

public static void main(String[] args) {


[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.take(3)
.subscribe([Link]::println);
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.take(3)
.subscribe(::println)
}

OUTPUT:

Alpha
Beta
Gamma

takeWhile() will do something similar to take() , but specifies a lambda


condition to determine when to stop taking emissions rather than using a fixed
count.

Java

28
2. RxJava Fundamentals

import [Link];

public class Launcher {

public static void main(String[] args) {


[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.takeUntil((String s) -> [Link]("D.*"))
.subscribe([Link]::println);
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.takeUntil { [Link]("D" )}
.subscribe(::println)
}

OUTPUT:

Alpha
Beta
Gamma
Delta

count()
Some operators will aggregate the emissions in some form (in a classic
MapReduce fashion), and then push that aggregation as a single emission to the
Observer . Obviously, this requires the onComplete() to be called so that the
aggregation can be finalized and pushed to the Observer .

29
2. RxJava Fundamentals

One of these aggregation operators is count() . It will simply count the number
of emissions and when its onComplete() is called, and push the count up to the
Observer as a single emission. Then it will call onComplete() up to the
Observer .

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.count()
.subscribe([Link]::println);
}
}));

Kotlin

import [Link]
import [Link]

fun main(args: Array<String>) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.count()
.subscribeBy { println(it) }
}

OUTPUT:

30
2. RxJava Fundamentals

The count() actually returns a Single , which is a specialized Observable


type that only emits one item. The Single does not have an onNext or
onComplete , but rather an onSuccess event which passes the single item. If
you ever need to turn a Single back into an Observable (so it works with
certain Observable operators), just call its toObservable() method.

Another variant of Observable we will encounter is the Maybe , which


emits 0 or 1 values. There is also the Completable which ignores
emissions but we will not be using it in this book.

toList()
The toList() is similar to the count() , and it also will yield a Single
rather than an Observable . It will collect the emissions until its onComplete()
is called. After that it will push an entire List containing all the emissions to the
Observer .

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.toList()
.subscribe([Link]::println);
}
}

Kotlin

31
2. RxJava Fundamentals

import [Link]
import [Link]

fun main(args: Array<String>) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.toList()
.subscribeBy { println(it) }
}

OUTPUT:

[Alpha, Beta, Gamma, Delta, Epsilon]

Aggregate operators like toList() will misbehave on infinite Observables


because collections can only be finite, and it needs that onComplete() to be
called to push the List forward. Otherwise it will collect and work infinitely.

When using Singles in Kotlin, we use RxKotlin's subscribeBy() instead of


subscribe() because there is an inference bug with the Kotlin compiler
working with SAM types. Hopefully this will be fixed soon by JetBrains. You
can follow the filed issue here to track its status.

reduce()
When you need to do a custom aggregation or reduction, you can use
reduce() to achieve this in most cases (to aggregate into collections and other
mutable structures, you can use its cousin collect() ). This will return a
Single (if a "seed" value is provided) or a Maybe (if no "seed" value is
provided). But say we wanted the sum of all lengths for all emissions. Starting with
a seed value of zero, we can use a lambda specifying how to "fold" the emissions
into a single value.

Java

32
2. RxJava Fundamentals

import [Link];

public class Launcher {

public static void main(String[] args) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.map(String::length)
.reduce(0,(current,next) -> current + next)
.subscribe([Link]::println);
}
}

Kotlin

import [Link]
import [Link]

fun main(args: Array<String>) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.map { [Link] }
.reduce(0) { current,next -> current + next }
.subscribeBy { println(it) }
}

OUTPUT:

26

The lambda in reduce() will keep adding two Integer values, where one of
them is the "rolling total" ( current ) or seed 0 value, and the other is the new
value ( next ) to be added. As soon as onComplete() is called, it will push the
result to the Observer .

33
2. RxJava Fundamentals

scan()
The reduce() will push a single aggregated value derived from all the
emissions. If you want to push the "running total" for each emission, you can use
scan() instead. This can work with infinite Observables since it will push each
accumulation for each emission, rather than waiting for all emissions to be
accumulated.

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.map(String::length)
.reduce(0,(current,next) -> current + next)
.subscribe([Link]::println);
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {

[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.map { [Link] }
.reduce(0) { current,next -> current + next }
.subscribe(::println)
}

OUTPUT:

34
2. RxJava Fundamentals

0
5
9
14
19
26

flatMap()
There are hundreds of operators in RxJava, but we will only cover one more for
now. Throughout the book we will learn more as we go, and the most effective
way to learn operators is to seek them out of need.

The flatMap() is similar to map() , but will map the emission to another set of
emissions via another Observable . This is one of the most powerful operators
in RxJava and is full of use cases, but for now we will just stick with a simple
example.

Say we have some String emissions where each one contains concatenated
numbers separated by a slash / . We want to break up these numbers into
separate emissions (and omit the slashes). You can call split() on each
String and specify splitting on the slashes / , and this will return an array of
the separated String values. Then you can turn that array into an
Observable inside the flatMap() .

Java

35
2. RxJava Fundamentals

import [Link];

public class Launcher {

public static void main(String[] args) {

[Link]("123/52/6345","23421/534","758/2341/7493
2")
.flatMap(s -> [Link]([Link]("/"))
)
.subscribe([Link]::println);
}
}

Kotlin

import [Link]
import [Link]

fun main(args: Array<String>) {

[Link]("123/52/6345","23421/534","758/2341/74932")
.flatMap { [Link]("/").toObservable() }
.subscribe(::println)
}

OUTPUT:

123
52
6345
23421
534
758
2341
74932

36
2. RxJava Fundamentals

If you observe this closely, hopefully you will find the flatMap() is pretty
straightforward. You are taking each emission and replacing it with another set of
emissions, by providing another Observable derived off that emission. There is
a lot of very powerful ways to leverage the flatMap() , especially when used
with infinite, concurrent, and hot Observables which we will cover later.

Also note that flatMapSingle() can be used to flatMap() to a Single ,


and flatMapMaybe() to a Maybe . This saves you the step of having to call
toObservable() on each resulting Maybe or Single . If we wanted to sum
each set of numbers, we would do it like this since this reduce() will yield a
Single .

Java

import [Link];

public class Launcher {

public static void main(String[] args) {

[Link]("123/52/6345","23421/534","758/2341/7493
2")
.flatMapSingle(s ->
[Link]([Link]("/"))
.map(Integer::valueOf)
.reduce(0, (curr,next) -> curr + next)
)
.subscribe([Link]::println);
}
}

Kotlin

37
2. RxJava Fundamentals

import [Link]
import [Link]

fun main(args: Array<String>) {

[Link]("123/52/6345","23421/534","758/2341/74932")
.flatMapSingle {
[Link]("/").toObservable()
.map { [Link]() }
.reduce(0) { curr, next -> curr + next }
}
.subscribe(::println)
}

OUTPUT:

6520
23955
78031

Observables and Timing


If you are a somewhat experienced developer, you might be asking how is the
Observable any different than a Java 8 Stream or Sequences in Kotlin, C#, or
Scala. Up to this point you are correct, they do not seem much different. But recall
that Observables push, while Java 8 Streams and Sequences pull. This enables
RxJava to achieve much more and unleashes capabilities that these other
functional utilities do not offer.

But the fundamental benefit of pushing is it allows a notion of emissions over time.
Our previous examples do not exactly show this, but now we will dive into some
examples that do.

Making Button Click Events an Observable

38
2. RxJava Fundamentals

So far we just pushed data out of Observables. But did you know you can push
events too? As stated earlier, data and events are basically the same thing in
RxJava. Let's take a simple JavaFX Application with a single Button .

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();


Button button = new Button("Press Me");

[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
button("Press Me")
}
}

39
2. RxJava Fundamentals

Rendered UI:

We can use RxJavaFX or RxKotlinFX to create an Observable<ActionEvent>


that pushes an ActionEvent emission each time the Button is pressed.

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();


Button button = new Button("Press Me");

[Link](button)
.subscribe([Link]::println);

[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

40
2. RxJava Fundamentals

import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
button("Press Me")
.actionEvents()
.subscribe { println(it) }
}
}

If you click the Button a couple times your console should look something like
this:

OUTPUT:

[Link][source=Button@751b917f[styleClass=butto
n]'Press Me']
[Link][source=Button@751b917f[styleClass=butto
n]'Press Me']
[Link][source=Button@751b917f[styleClass=butto
n]'Press Me']

Wait, did we just treat the ActionEvent like any other emission and push it
through the Observable ? Yes we did! As said earlier, this is the powerful part of
RxJava. It treats events and data the same way, and you can use all the operators
we covered earlier. For example, we can use scan() to push how many times
the Button was pressed, and push that into a Label . Just map() each
ActionEvent to a 1 to drive increments.

Java

41
2. RxJava Fundamentals

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();


Button button = new Button("Press Me");
Label countLabel = new Label("0");

[Link](button)
.map(ae -> 1)
.scan(0,(x,y) -> x + y)
.subscribe(clickCount -> [Link](clic
[Link]()));

[Link]().add(countLabel);
[Link]().add(button);

[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

42
2. RxJava Fundamentals

import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

val countLabel = label("0")

button("Press Me")
.actionEvents()
.map { 1 }
.scan(0) {x,y -> x + y }
.subscribe { [Link] = [Link]() }
}
}

RENERED UI: After I clicked the button 4 times

So how does all this work? The Observable<ActionEvent> we created off this
Button is emitting an ActionEvent item every time the Button is pressed.
Every time that Button is clicked, it pushes an ActionEvent emission
through the Observable . There is no notion of completion either as this
Observable is always alive during the life of the Button .

Of course you could use operators that make the operation finite, like take() . If
you only take 5 ActionEvent emissions from the Button , it will stop pushing
on emission 4 . Then it will unsubscribe from the source and call
onComplete() down the chain to the Observer .

Java

43
2. RxJava Fundamentals

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();


Button button = new Button("Press Me");
Label countLabel = new Label("0");
Label doneLabel = new Label("");

[Link](button)
.map(ae -> 1)
.scan(0,(x,y) -> x + y)
.take(5)
.subscribe(
clickCount -> [Link](clickCo
[Link]()),
Throwable::printStackTrace,
() -> [Link]("Done!")
);

[Link]().addAll(countLabel, doneLabel,button);

[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

44
2. RxJava Fundamentals

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

val countLabel = label("0")


val doneLabel = label("")

button("Press Me")
.actionEvents()
.map { 1 }
.scan(0) {x,y -> x + y }
.take(5)
.subscribeBy(
onNext = { [Link] = [Link]()
},
onError = { [Link]() },
onComplete = { [Link] = "Done!" }
)
}
}

RENDERED UI: After 4 Button clicks (emits an initial 0 from scan() )

A Button emitting an ActionEvent item every time it is clicked is an example


of a hot Observable , as opposed to cold Observables which typically push
data. Let's dive into this discussion next.

Cold vs. Hot Observables

45
2. RxJava Fundamentals

The Observable<ActionEvent> we created off a Button is an example of a


hot Observable . Earlier in this chapter, all of our examples emitting Integer
and String items are cold Observables. So what is the difference?

Remember this source Observable that simply pushes five String


emissions?

Java

Observable<String> source =
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon");

Kotlin

val source = [Link]("Alpha","Beta","Gamma","Delta", "Ep


silon")

What do you think will happen if we subscribe() to it twice? Try it out.

Java

Observable<String> source =
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
);

[Link](s -> [Link]("Observer 1: " + s));


[Link](s -> [Link]("Observer 2: " +s));

val source = [Link]("Alpha","Beta","Gamma","Delta", "Ep


silon")

[Link] { println("Observer 1: $it") }


[Link] { println("Observer 2: $it") }

You will find the emissions are replayed for each Observer .

OUTPUT:

46
2. RxJava Fundamentals

Observer 1: Alpha
Observer 1: Beta
Observer 1: Gamma
Observer 1: Delta
Observer 1: Epsilon
Observer 2: Alpha
Observer 2: Beta
Observer 2: Gamma
Observer 2: Delta
Observer 2: Epsilon

With a Cold Observable, every Observer independently receives all the


emissions regardless of when they Subscribe . There is no notion of timing
making an impact to which emissions they receive. Cold Observables are often
used to "play" data independently to each Observer . This is like giving every
Observer a music CD to play, and they can independently play all the tracks.

Hot Observables, however, will simultaneously push emissions to all Observers


at the same time. Logically, an effect of this is Observers that come later and have
missed previous emissions will not receive them. They will only get emissions
going forward from the time they subscribe() . Instead of a music CD, Hot
Observables are more like radio stations. They will broadcast a given song
(emission) to all listeners (Observers) at the same time. If a listener misses a
song, they missed it.

While data and events are the same in RxJava, Hot Observables are often used
to represent events, such as an Observable<ActionEvent> built off a
Button .

Let's do an experiment to see if tardy Observers indeed miss previous emissions.


subscribe() immediately to a Button 's clicks to create the first Observer .
But have another Button that when clicked, will subscribe() a second
Observer .

Java

import [Link];
import [Link];

47
2. RxJava Fundamentals

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();


Button button = new Button("Press Me");
Button secondSubButton = new Button("Subscribe Observer
2");

Observable<ActionEvent> clicks =
[Link](button);

//Observer 1
[Link](ae ->
[Link]("Observer 1 Received Click!"));

//Subscribe Observer 2 when secondSubButton is clicked


[Link](event -> {
[Link]("Observer 2 subscribing!");
[Link]().set(true);
//Observer 2
[Link](ae ->
[Link]("Observer 2 Received Clic
k!")
);
});

[Link]().addAll(button,secondSubButton);

[Link](new Scene(vBox));
[Link]();
}

48
2. RxJava Fundamentals

Kotlin

import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

val clicks = button("Press Me").actionEvents()

//Observer 1
[Link] { println("Observer 1 Received Click!")
}

//Subscribe Observer 2 when this button is clicked


button("Subscribe Observer 2") {
setOnAction {
println("Observer 2 subscribing!")
isDisable = true
[Link] { println("Observer 2 Received
Click!") }
}
}
}
}

RENDERED UI:

49
2. RxJava Fundamentals

Click the "Press Me" Button 3 times, then click the "Subscribe Observer 2"
Button . Finally click "Press Me" 2 more times, and you should get this output in
your console.

Observer 1 Received Click!


Observer 1 Received Click!
Observer 1 Received Click!
Observer 2 subscribing!
Observer 1 Received Click!
Observer 2 Received Click!
Observer 1 Received Click!
Observer 2 Received Click!

Notice that Observer 1 received those first three clicks, and then we
subscribed Observer 2 . But notice that Observer 2 has missed those first
three clicks. It will never get them because it subscribed too late to a hot
Observable. The only emissions Observer 2 receives are the ones that happen
after it subscribes.

After Observer 2 is subscribed, you can see the last two emissions were
pushed simultaneously to both Observer 1 and Observer 2 .

Again, Cold Observables will replay emissions to each Observer independently.


Hot Observables play emissions all at once to whomever is subscribed, and it will
not replay missed emissions to tardy Observers.

ConnectableObservable
We will learn several ways to create hot Observables in this book for different
tasks, but one that is worth mentioning now is the ConnectableObservable .
Among a few other subtle behaviors it creates, it can turn a cold Observable
into a hot one by forcing its emissions to become hot. To create one, you can take
any Observable and call its publish() method. You can then set up the
Observers and then call connect() to start firing the emissions.

50
2. RxJava Fundamentals

One reason you may do this is because it might be expensive to replay emissions
for each Observer , especially if it is emitting items from a slow database query
or some other intensive operation. Notice too that each emission interleaves and
goes to each Observer simultaneously.

Java

import [Link];
import [Link];

public class Launcher {

public static void main(String[] args) {

ConnectableObservable<String> source =
[Link]("Alpha","Beta","Gamma","Delta",
"Epsilon").publish();

[Link](s -> [Link]("Observer 1: "


+ s));
[Link](s -> [Link]("Observer 2: "
+s));

[Link]();
}
}

Kotlin

51
2. RxJava Fundamentals

import [Link]

fun main(args: Array<String>) {


val source = [Link]("Alpha","Beta","Gamma","Delta",
"Epsilon").publish()

[Link] { println("Observer 1: $it") }


[Link] { println("Observer 2: $it") }

[Link]()
}

OUTPUT:

Observer 1: Alpha
Observer 2: Alpha
Observer 1: Beta
Observer 2: Beta
Observer 1: Gamma
Observer 2: Gamma
Observer 1: Delta
Observer 2: Delta
Observer 1: Epsilon
Observer 2: Epsilon

Remember though that the ConnectableObservable is a hot Observable


too, so you got to be careful when pushing data through it. If any Observer
comes in after the connect() is called, it will miss data that was emitted
previously.

Disposing
There is one last operation we need to cover: unsubscribing. Unsubscription
should happen automatically for finite Observables once onComplete() is
called. But for infinite or long-running Observables, there will be times you want to

52
2. RxJava Fundamentals

stop the emissions and cancel the entire operation. This will also free up
resources in the Observable chain and clean up any resources it was using.

If you want to disconnect an Observer from an Observable so it stops


receiving emissions, there are a couple ways to do this. The easiest way is to note
the subscribe() method returns a Disposable object. This represents the
connection between the Observable and the Observer , and you can call
dispose() on it at any time to dispose the connections so no more emissions
are pushed.

For instance, let's take our incrementing Button example from earlier and add
another Button that will unsubscribe the emissions. We need to save the
Disposable returned from the subscribe() method, and then we can refer
to it later to call dispose() and stop emissions.

Java

53
2. RxJava Fundamentals

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();


Button button = new Button("Press Me");
Button unsubscribeButton = new Button("Unsubscribe");

Label countLabel = new Label("0");

Disposable disposable = [Link](


button)
.map(ae -> 1)
.scan(0,(x,y) -> x + y)
.subscribe(clickCount -> [Link](clic
[Link]()));

[Link](e -> [Link]())


;

[Link]().addAll(button,unsubscribeButton,count
Label);
[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

54
2. RxJava Fundamentals

import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

val countLabel = label("0")

val subscription = button("Press Me")


.actionEvents()
.map { 1 }
.scan(0) {x,y -> x + y }
.subscribe { [Link] = [Link]() }

button("Unsubscribe").setOnAction {
[Link]()
}
}
}

Note that when you press the "Unsubscribe" Button , the increments stop
because the Observer was disposed, and it instructed the Observable to
stop sending emissions. Disposal automatically happens with finite Observables
once onComplete() is called. But with infinite or long-running Observables, you
need to manage their disposal if you intend to terminate them at some point.

When you have infinite Observables that need to be disposed, it is very critical to
call dispose() on any Disposables when you are done with them. If you do not
do this, you will run into memory leak problems and the garbage collector will not
be able to free those resources.

When you have a lot of Disposables to manage and you want to dispose them all
at once, you can use a CompositeDisposable which acts as a collection of
Disposables. You can add any number of Disposables to it, and when you want to

55
2. RxJava Fundamentals

unsubscribe all of them just call its dispose() method.

Java

Observable<ActionEvent> buttonClicks = ...

CompositeDisposable disposables = new CompositeDisposable();

Disposable disposable1 =
[Link](ae -> [Link]("Clicked!"))
;

[Link](subscription1);

Disposable disposable1 =
[Link](ae -> [Link]("Clicked Her
e Too!"));

[Link](disposable1);

//work with UI, then dispose when done


[Link]();

Kotlin

val buttonClicks: Observable<ActionEvent> = ...


val disposables = CompositeDisposable()

[Link] { println("Clicked!") }
.addto(disposables)

[Link] { println("Clicked Here Too!") }


.addto(disposables)

//work with UI, then dispose when done


[Link]()

Using doOnXXX() Operators

56
2. RxJava Fundamentals

It might be helpful to create a few "side effects" in the middle of an Observable


chain. In other words, we want to put Observers in the middle of the chain at
certain points. For instance, it might be helpful to change a "Submit" Button's text
to "WORKING" when a request is being processed, as well as disable it so no
more requests can be sent until the current one completes.

RxJava has doOnXXX() operators that allow you to "peek" into an


Observable at that point in the chain. For instance, you can use doOnNext()
and pass a lambda to do something with each emission, like print it.
doOnComplete() will fire a specified action when that point of the chain
received a completion notification, and doOnError() will do the same for an
error event. Here is a complete list of these doOnXXX() operators in RxJava.

Operator Example Description


doOnNext(i -> Performs an action
doOnNext()
[Link](i)) for each emission
doOnComplete(() -> Performs an action
doOnComplete()
[Link]("Done!")) on completion
doOnError(e -> Performs an action
doOnError()
[Link]()) on an error
doOnSubscribe(() -> Performs an action
doOnSubscribe()
[Link]("Subbing!")) on subscription
doOnDispose(() -> Performs an action
doOnDispose()
[Link]("Disposing!")) on unsubscription
doOnTerminated(() -> Performs an action
doOnTerminate() [Link]("I'm done or for completion or an
had an error")) error

Summary
In this chapter we covered some RxJava fundamentals. The Observable treats
data and events in the same way, and this is a powerful idea that applies really
well with JavaFX. Cold Observables replay emissions to each Observer
independently. Hot Observables will broadcast emissions live to all Observers
simultaneously, and not replay missed emissions to tardy Observers.

57
2. RxJava Fundamentals

This book will continue to cover RxJava and apply it in a JavaFX context. There
are hundreds of operators and unfortunately we will not be able to cover them all,
but we will focus on the ones that are especially helpful for building JavaFX
applications. If you want to learn more about RxJava and its operators
comprehensively, please check out my Packt book Learning RxJava).

In the next chapter, we are going to dive a little deeper into JavaFX events, and
turn Node and ObservableValue events into Observables.

58
3. Events and Value Changes

3. Events and Value Changes


In the previous chapter, we got a brief introduction to handling events reactively.
But RxJavaFX is equipped to handle almost any event type for various Node
controls. JavaFX also utilizes the ObservableValue , and its value changes can
be turned into Observables as well.

Turning JavaFX Events into Observables


To create an Observable for any event on any Node , you can target the
Node 's events using a [Link]() factory for Java, and
the Node extension function events() for Kotlin. You can pass the
EventType you are targeting as a parameter, and an Observable emitting
that EventType will be returned.

Here is an example with a ListView containing String items representing


the integers 0 through 9. Whenever a numeric key is pressed on your keyboard, it
will select that item in the ListView (Figure 3.1).

Java

59
3. Events and Value Changes

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();

ListView<String> listView = new ListView<>();

for (int i = 0; i <= 9; i++) {


[Link]().add([Link](i));
}

[Link](listView, KeyEvent.KEY_TYPED)
.map(KeyEvent::getCharacter)
.filter(s -> [Link]("[0-9]"))
.subscribe(s -> [Link]().sel
ect(s));

[Link]().add(listView);
[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

60
3. Events and Value Changes

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

listview<String> {
(0..9).asSequence().map { [Link]() }.forEach {
[Link](it) }

events(KeyEvent.KEY_TYPED)
.map { [Link] }
.filter { [Link](Regex("[0-9]")) }
.subscribe { [Link](it)}
}
}
}

Figure 3.1 - A ListView that "jumps" to the numeric key input

61
3. Events and Value Changes

Notice above we targeted KeyEvent.KEY_TYPED and the returned


Observable will emit a KeyEvent item every time a KEY_TYPED event
occurs. Some events like this one have helpful information on them, such as the
character String representing the value for that key. We used a regular
expression to validate the character String was a single numeric character,
and filter emissions that are not. Then we selected it in the ListView 's
SelectionModel .

If you want to combine keystrokes to form entire Strings rather than a series
of single characters, you will want to use throttling, buffering, and switching
operators to combine them based on timing windows. We will cover these
later in Chapter 9.

Here is another example that targets MouseEvent.MOVE_MOVED events on a


Rectangle . As you move your cursor over the Rectangle , the x and y
positions of the cursor will be concatenated and pushed into a Label .

Java

62
3. Events and Value Changes

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();

Label positionLabel = new Label();


Rectangle rectangle = new Rectangle(200,200);
[Link]([Link]);

[Link](rectangle, MouseEvent.MOUSE_MO
VED)
.map(me -> [Link]() + "-" + [Link]())
.subscribe(positionLabel::setText);

[Link]().addAll(positionLabel,rectangle);
[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

63
3. Events and Value Changes

import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

val positionLabel = label()

rectangle(height = 200.0, width = 200.0) {

fill = [Link]

events(MouseEvent.MOUSE_MOVED)
.map { "${it.x}-${it.y}" }
.subscribe { [Link] = it }
}
}
}

Figure 3.2 - A red rectangle that pushes the cursor coordinates when its hovered
over.

64
3. Events and Value Changes

JavaFX is packed with events everywhere, and you will need to know which
events you are targeting on a given Node control. Be sure to look at the
JavaDocs for the control you are using to see which event types you want to
target.

Currently you can target events on Node , Window , and Scene types and
there should be factories to support each one.

ActionEvents
In the previous chapter we were exposed to the simple ActionEvent . You can
actually target the ActionEvent using the events factory and emit them through
an Observable<ActionEvent> .

Java

65
3. Events and Value Changes

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp2 extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();


Button button = new Button("Press Me!");

[Link](button, [Link])
.subscribe(ae -> [Link]("Pressed!"))
;

[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

66
3. Events and Value Changes

import [Link]
import [Link]
import tornadofx.*

class MyView : View() {


override val root = hbox {
button("Press Me")
.events([Link])
.subscribe { println("Pressed!") }
}
}

ActionEvent is a pretty common event that indicates a simple action was


performed, like pressing a Button or MenuItem . It is so common that it is
given its own factory as shown below, which is what we used in the previous
chapter.

Java

67
3. Events and Value Changes

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox vBox = new VBox();


Button button = new Button("Press Me!");

[Link](button)
.subscribe(ae -> [Link]("Pressed!"))
;

[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

68
3. Events and Value Changes

import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {


override val root = hbox {
button("Press Me")
.actionEvents()
.subscribe { println("Pressed!")}
}
}

Currently, the ActionEvent factory supports Button , MenuItem , and


ContextMenu .

ObservableValue Changes
This is where reactive JavaFX starts to get interesting. Up to this point we only
have worked with events. There is some metadata on event emissions that can be
useful, but we are not quite working with data in the traditional sense.

JavaFX has many implementations of its ObservableValue<T> type. This is


essentially a wrapper around a mutable value of a type T , and it notifies any
listeners when the value changes. This provides a perfect opportunity to hook a
listener onto it and make a reactive stream of value changes.

Create a simple UI with a ComboBox<String> and use the


[Link]() factory to emit its value changes in a hot
Observable .

Java

69
3. Events and Value Changes

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

HBox hBox = new HBox();


ComboBox<String> comboBox = new ComboBox<>();
[Link]().setAll("Alpha","Beta","Gamma","Delta"
,"Epsilon");

[Link]([Link]())
.subscribe(v -> [Link](v + " was sel
ected"));

[Link]().add(comboBox);
[Link](new Scene(hBox));
[Link]();
}
}

Kotlin

70
3. Events and Value Changes

import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = hbox {


combobox<String> {

[Link]("Alpha","Beta","Gamma","Delta","Epsilon"
)

valueProperty().toObservable()
.subscribe { println("$it was selected") }
}
}
}

When you select different items in the ComboBox , you should get a console
output that looks something like this:

null was selected


Alpha was selected
Delta was selected
Epsilon was selected

For the next few examples, let's just focus on the Observable chain. Notice that
the [Link]() (or toObservable() for Kotlin) does not
push the initial null value, because RxJava 2 does not emit null values. However,
you can provide a second argument to put a sentinel value for a null.

Java

71
3. Events and Value Changes

[Link]([Link](), "N/A")
.subscribe(v -> [Link](v + " was selected")
);

Kotlin

valueProperty().toObservable("N/A")
.subscribe { println("$it was selected") }

If you want to emit an Optional<T> to handlue nullability, use the


[Link]() factory as shown below:

Java

[Link]([Link]())
.subscribe(v -> [Link](v + " was selected")
);

Kotlin

valueProperty().toNullableObservable()
.subscribe { println("$it was selected") }

Remember that we can use any RxJava operators. We can map() each String's
length() and push that to the Observer .

Java

[Link]([Link]())
.map(String::length)
.subscribe(i ->
[Link]("A String with length " + i + " w
as selected")
);

Kotlin

72
3. Events and Value Changes

valueProperty().toObservable()
.map { [Link] }
.subscribe { println("A String with length $it was selec
ted") }

OUTPUT:

A String with length 5 was selected


A String with length 4 was selected
A String with length 7 was selected
A String with length 4 was selected

Let's get a little more creative, and use scan() to do a rolling sum of the lengths
with each emission.

Java

[Link]([Link]())
.map(String::length)
.scan(0,(x,y) -> x + y)
.subscribe(i -> [Link]("Rolling length total
: " + i));

Kotlin

valueProperty().toObservable()
.map { [Link] }
.scan(0,(x,y) -> x + y)
.subscribe { println("Rolling length total: $it") }

When you make a few selections to the ComboBox , your output should look
something like this depending on which Strings you selected.

OUTPUT:

73
3. Events and Value Changes

Rolling length total: 0


Rolling length total: 5
Rolling length total: 10
Rolling length total: 17
Rolling length total: 22
Rolling length total: 26
Rolling length total: 31

This example may be a bit contrived, but hopefully you are starting to see some of
the possibilities when you have a chain of operators "reacting" to a change in a
ComboBox . Pushing each value every time it is selected in a ComboBox allows
you to quickly tell other parts of the UI to update accordingly.

Again, you can use this factory on any ObservableValue . This means you can
hook into any JavaFX component property and track its changes reactively. The
possibilities are quite vast. For instance, for every selection event in a
ComboBox , you can query a database for items of that selection, and populate
them into a TableView . Then that TableView may have Observables built off
its events and properties to trigger other controls to update.

You might be wondering if making lots of ComboBox selections resulting in


expensive queries could overwhelm the application. If the queries are that
expensive, yes that will happen. But in Chapter 9 we will learn how to switch,
throttle, and buffer which will resolve this issue effectively.

You also have the option of pushing the old and new value in a Change item
through the changesOf() factory. This can be helpful for validation, and you can
restore that old value back into the control if the new value fails to meet a
condition.

For example, you can emit the old value and new value together on each typed
character in a TextField . If at any moment the text is not numeric (or is an
empty String ), the previous value can be restored immediately using the
Observer .

Java

74
3. Events and Value Changes

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

HBox hBox = new HBox();


TextField textField = new TextField();

[Link]([Link]())
.map(s -> [Link]().matches("[0-9]+") ? [Link]
tNewVal() : [Link]())
.subscribe(textField::setText);

[Link]().add(textField);
[Link](new Scene(hBox));
[Link]();
}
}

Kotlin

75
3. Events and Value Changes

import [Link]
import tornadofx.*

class MyView : View() {

override val root = hbox {


textfield {
textProperty().toObservableChanges()
.filter { ![Link](Regex("[0-9]+")
) }
.map { [Link] }
.subscribe {
text = it
}
}
}
}

If you study the Observable operation above, you can see that each Change
item is emitted holding the old and new value for each text input. Using a regular
expression, we validated for text inputs that are not numeric or are empty. We
then map() it back to the old value and set it to the TextField in the
Observer .

Error Recovery
When working with Observables built off UI events, sometimes an error can occur
which will be communicated up the chain via onError() . In production, you
should always have the Observer handle an onError() so the error does not
just quietly disappear. But when you are dealing with UI input events, there is
likely one other error handling issue to consider.

Say you have this simple JavaFX Application with a Button that adds a
numeric input from a TextField , and adds it to a total in a Label (Figure
3.3).

Java

76
3. Events and Value Changes

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

Label label = new Label("Input Number");


TextField input = new TextField();
Label totalLabel = new Label();

Button button = new Button("Add to Total");

[Link](button)
.map(ae -> [Link]([Link]()))
.scan(0,(x,y) -> x + y)
.subscribe(i -> {
[Link]([Link]());
[Link]();
}, e -> new Alert([Link], [Link]
ssage()).show());

[Link]().setAll(label,input, totalLabel, butto


n);
[Link](new Scene(root));
[Link]();
}
}

77
3. Events and Value Changes

Kotlin

import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

label("Input Number")
val input = textfield()
val totalLabel = label("")

button("Add to Total").actionEvents()
.map { [Link]() }
.scan(0) {x,y -> x + y }
.subscribeBy(
onNext = {
[Link] = [Link]()
[Link]()
},
onError = { Alert([Link], it
.message).show() }
)
}
}

Figure 3.3

78
3. Events and Value Changes

That TextField should only have numeric inputs, but nothing is stopping non-
numeric inputs from being emitted. Therefore, if you type a non-numeric value in
that TextField and click the Button , you will get an Alert as specified in
the Observers's onError() (Figure 3.4).

Figure 3.4

Despite the error being handled, there is one problem here. The Observable is
now dead. It called onError() and closed the stream, assuming nothing could
be done to recover from it. You will find the Button is no longer sending
emissions. You can fix this by adding the retry() operator right before the
Observer . When its onError() is called, it will intercept and swallow the
error, then resubscribe again.

Java

[Link](button)
.map(ae -> [Link]([Link]()))
.scan(0,(x,y) -> x + y)
.retry()
.subscribe(i -> {
[Link]([Link]());
[Link]();
}, e -> new Alert([Link], [Link]()).s
how());

79
3. Events and Value Changes

Kotlin

button("Add to Total").actionEvents()
.map { [Link]() }
.scan(0) {x,y -> x + y }
.retry()
.subscribeBy(
onNext = {
[Link] = [Link]()
[Link]()
},
onError = { Alert([Link], [Link]
).show() }
)

If you type in a non-numeric input, it will resubscribe and start all over. The
scan() operator will send another initial emission of 0 and result in
everything being reset. You also have the option of moving the retry() before
the scan() operation (so it intercepts the error before the scan() ), and that
would maintain the current rolling total rather than canceling it and starting over at
0.

But notice that the onError() in the Observer is never called, and we never
get an Alert . This is because the retry() intercepted the onError() call
and kept it from going to the Observer . To get the Alert , you may want to
move it to a doOnError() operator before the retry() . The error will flow
through it to trigger the Alert before the retry() intercepts it.

Java

80
3. Events and Value Changes

[Link](button)
.map(ae -> [Link]([Link]()))
.scan(0,(x,y) -> x + y)
.doOnError( e -> new Alert([Link], [Link]
sage()).show())
.retry()
.subscribe(i -> {
[Link]([Link]());
[Link]();
});

Kotlin

button("Add to Total").actionEvents()
.map { [Link]() }
.scan(0) {x,y -> x + y }
.doOnError { Alert([Link], [Link]).sh
ow() }
.retry()
.subscribe {
[Link] = [Link]()
[Link]()
}

By default, retry() will resubscribe an unlimited number of times for an


unlimited number of errors. This means for cold data sources, this can spiral out of
control quickly by retrying an infinite number of times! You can pass an Integer
argument like retry(3) so that it will only retry three times and the fourth
onError() will go to the Observer . There is also a retryWhen() operator
that allows you to conditionally resubscribe based on some attribute of the error
(like its type).

There are a couple of error-handling operators in RxJava that are worth being
familiar with. But for UI input controls, you will likely want to leverage retry()
so Observables built off UI controls do not remain dead after an error. This is
especially critical if you are kicking off complex reactive processes.

81
3. Events and Value Changes

It is also worth noting that the best way to handle errors is to handle them
proactively. In this example, it would have been good to forbid numbers from
being entered in the TextField in the first place (like our previous exercise).
Another valid check would be to filter() out non-numeric values so they are
suppressed before being turned into an Integer .

Java

[Link](button)
.map(ae -> [Link]())
.filter(s -> [Link]("[0-9]+"))
.map(Integer::valueOf)
.scan(0,(x,y) -> x + y)
.subscribe(i -> {
[Link]([Link]());
[Link]();
});

Kotlin

button("Add to Total").actionEvents()
.map { [Link] }
.filter { [Link](Regex("[0-6]+")) }
.map { [Link]() }
.scan(0) {x,y -> x + y }
.subscribeBy(
onNext = {
[Link] = [Link]()
[Link]()
},
onError = { Alert([Link], [Link]).sh
ow() }
)

Summary

82
3. Events and Value Changes

In this chapter, we learned the basic RxJavaFX/RxKotlinFX factories to create


RxJava Observables off JavaFX Events and ObservableValues. Definitely spend
some time experimenting with this small but powerful set of factories that can be
applied almost anywhere in the JavaFX API. We also learned how to resubscribe
Observables built off UI events in the event an onError() occurs.

But there are a few more facilities we need to be truly productive with reative
JavaFX, starting next with JavaFX Collections. This is where the line between
data and events truly become blurred in surpringly useful ways.

83
4. Collections

4. Collections and Data


Any sizable application needs to work with data and collections of items. One of
the greatest utilities to come out of JavaFX are ObservableCollections such as
ObservableList , ObservableSet , and ObservableMap . These
implementations of List , Set , and Map are built specifically for JavaFX to
notify the UI when it has been modified, and any control built off it will visually
update accordingly.

However, these ObservableCollections can have custom listeners added to them.


This creates an opportunity to reactively work with data through collections. The
idea of emitting a collection every time it changes allows some surprisingly useful
reactive transformations, and we will see plenty of examples in this chapter.

Do not confuse the JavaFX ObservableValue , ObservableList ,


ObservableSet , and ObservableMap to somehow be related to the
RxJava Observable . This is not the case. Remember that JavaFX's
concept of an Observable is not the same as an RxJava Observable .
However, we will turn all of these into an RxJava Observable to fully utilize
their capabilities.

Emitting an Observable Collection


Let's create a simple application backed by an ObservableList of Strings.
There will be a ListView<String> to display these values, and another
ListView<Integer> that will hold their distinct lengths. We will use a
TextField and a Button to add Strings to the ObservableList , and both
ListViews should update accordingly with each addition.

You should get a UI that looks like Figure 4.1 when you run the code below.

Java

import [Link];
import [Link];
import [Link];

84
4. Collections

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

ObservableList<String> values =
[Link]("Alpha","Beta",
"Gamma");

Label valuesLabel = new Label("VALUES");


ListView<String> valuesListView = new ListView<>(values)
;

Label distinctLengthsLabel = new Label("DISTINCT LENGTHS"


);
ListView<Integer> distinctLengthsListView = new ListView
<>();

[Link](values)
.flatMapSingle(list ->
[Link](list).map(String
::length).distinct().toList()
).subscribe(lengths -> distinctLengthsListView.g
etItems().setAll(lengths));

TextField inputField = new TextField();


Button addButton = new Button("ADD");

85
4. Collections

[Link](addButton)
.map(ae -> [Link]())
.filter(s -> s != null && ![Link]().isEmpty())
.subscribe(s -> {
[Link](s);
[Link]();
});

[Link]().addAll(valuesLabel,valuesListView,dis
tinctLengthsLabel,
distinctLengthsListView,inputField,addButton);

[Link](new Scene(root));
[Link]();
}
}

Kotlin

import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

val values = [Link]("Alpha", "Bet


a", "Gamma")

override val root = vbox {

label("VALUES")
listview(values)

label("DISTINCT LENGTHS")

86
4. Collections

listview<Int> {
[Link]()
.flatMapSingle {
[Link]().map { [Link] }.dist
inct().toList()
}.subscribe {
[Link](it)
}
}

label("INPUT")
val inputField = textfield()

button("ADD").actionEvents()
.map { [Link] }
.filter { ![Link]().isEmpty() }
.subscribe {
[Link](it)
[Link]()
}
}
}

Figure 4.1

87
4. Collections

Go ahead and type in "Delta", then click "ADD". Then do the same for "Epsilon".
You should now see Figure 4.2.

Figure 4.2

88
4. Collections

See that? Not only did "Delta" and "Epsilon" get added to the top ListView , but
the distinct length of 7 was added to the bottom one. So how exactly was this
made possible?

Study the code above very closely. We declared an ObservableList<String>


called values . All the magic is built around it. We created an
Observable<ObservableList<String>> off it using
[Link]() (or onChangedObservable() for
Kotlin). While the type looks a little strange the idea is very simple: every time the
ObservableList<String> changes, it is pushed through the
Observable<ObservableList<String>> in its entirety as an emission. It will
also emit the ObservableList on subscription as the initial emission.

This is a useful pattern because as we have just seen, we can transform this
ObservableList emission inside a flatMap() any way we want. In this
example, we effectively created a new ObservableList<Integer> that

89
4. Collections

receives the distinct lengths of the ObservableList<String> .

Note the placement of operators is very critical! The toList() operator


occured inside the flatMapSingle() where it was working with a finite
Observable derived from an ObservableList . Putting that toList()
outside a flatMap() will cause it to work against an infinite Observable ,
and it will forever collect items and never emit.

Let's leverage this idea in another way. Instead of putting the distinct lengths in
another ObservableList<Integer> , let's concatenate them as a String and
push it into a Label 's text.

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

ObservableList<String> values =
[Link]("Alpha","Beta",
"Gamma");

Label valuesLabel = new Label("VALUES");

90
4. Collections

ListView<String> valuesListView = new ListView<>(values)


;

Label distinctLengthsLabel = new Label("DISTINCT LENGTHS"


);
Label distinctLengthsConcatLabel= new Label();
[Link]([Link]);

[Link](values)
.flatMapSingle(list ->
[Link](list)
.map(String::length)
.distinct().reduce("",(x,y) -> x
+ ([Link]("") ? "" : "|") + y)
).subscribe(distinctLengthsConcatLabel::setText)
;

TextField inputField = new TextField();


Button addButton = new Button("ADD");

[Link](addButton)
.map(ae -> [Link]())
.filter(s -> s != null && ![Link]().isEmpty())
.subscribe(s -> {
[Link](s);
[Link]();
});

[Link]().addAll(valuesLabel,valuesListView,dis
tinctLengthsLabel,
distinctLengthsConcatLabel,inputField,addButton)
;

[Link](new Scene(root));
[Link]();
}
}

Kotlin

91
4. Collections

import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

val values = [Link]("Alpha","Beta"


,"Gamma")

override val root = vbox {

label("VALUES")
listview(values)

label("DISTINCT LENGTHS")
label {
textFill = [Link]

[Link]()
.flatMapSingle {
[Link]()
.map { [Link] }
.distinct()
.reduce("") { x,y -> x + (if (x
== "") "" else "|") + y }
}.subscribe {
text = it
}
}

label("INPUT")
val inputField = textfield()

92
4. Collections

button("ADD").actionEvents()
.map { [Link] }
.filter { ![Link]().isEmpty()}
.subscribe {
[Link](it)
[Link]()
}
}
}

Figure 4.3

Awesome, right? We are pushing a transformation of the ObservableList


source and driving a Label 's text with it. Simply using an Observable and
Observer , we can easily do limitless transformations of data and events that
are almost impractical to do in standard JavaFX.

Note also there are factories for ObservableSet and ObservableMap to


accomplish the same behavior. [Link]() will emit
an ObservableSet every time it changes,

93
4. Collections

Java

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

ObservableSet<String> values =
[Link]("Alpha","Beta","Gamm
a");

[Link](values)
.subscribe([Link]::println);

[Link]("Delta");
[Link]("Alpha"); //no effect

[Link]("Beta");

[Link](0); //quit
}
}

Kotlin

94
4. Collections

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
val values = [Link]("Alpha","Beta",
"Gamma")

[Link]()
.subscribe { println(it) }

[Link]("Delta")
[Link]("Alpha") //no effect

[Link]("Beta")

[Link](0) //quit
}
}

OUTPUT:

[Alpha, Gamma, Beta]


[Alpha, Gamma, Delta, Beta]
[Alpha, Gamma, Delta]

[Link]() will also emit an ObservableMap


every time it changes.

Java

95
4. Collections

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

ObservableMap<Integer,String> values =
[Link]();

[Link](values)
.subscribe([Link]::println);

[Link](1,"Alpha");
[Link](2,"Beta");
[Link](3,"Gamma");
[Link](1,"Alpha"); //no effect
[Link](3,"Delta");
[Link](2);

[Link](0);
}
}

Kotlin

96
4. Collections

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
val values = [Link]<Int, String>
()

[Link](values)
.subscribe { println(it) }

[Link](1, "Alpha")
[Link](2, "Beta")
[Link](3, "Gamma")
[Link](1, "Alpha") //no effect
[Link](3, "Delta")
[Link](2)

[Link](0);
}
}

OUTPUT:

{}
{1=Alpha}
{1=Alpha, 2=Beta}
{1=Alpha, 2=Beta, 3=Gamma}
{1=Alpha, 2=Beta, 3=Delta}
{1=Alpha, 3=Delta}

Add, Remove, and Update Events

97
4. Collections

There are factories for ObservableList , ObservableSet , and


ObservableMap to emit specific change events against those collections. To get
an emission for each modification to an ObservableList , you can use
changesOf() . It will pair each affected element T with a Flag in a
ListChange emission. The Flag is an enum with possible values ADDED ,
REMOVED , or UPDATED .

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

ObservableList<String> values =
[Link]("Alpha","Beta",
"Gamma");

[Link](values)
.subscribe([Link]::println);

[Link]("Delta");
[Link]("Epsilon");
[Link]("Alpha");
[Link](2,"Eta");

[Link](0);
}
}

Kotlin

98
4. Collections

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
val values = [Link]("Alpha",
"Beta", "Gamma")

[Link]().subscribe { println(it) }

[Link]("Delta")
[Link]("Epsilon")
[Link]("Alpha")
values[2] = "Eta"

[Link](0)
}
}

OUTPUT:

ADDED Delta
ADDED Epsilon
REMOVED Alpha
ADDED Eta
REMOVED Delta

There are equivalent factories for ObservableMap and ObservableSet as


well, which are overloads for changesOf() .

Note that this factory has no initial emission. It will only emit changes going
forward after subscription. A ListChange is emitted with the affected value and
whether it was ADDED , REMOVED , or UPDATED . Interestingly, note that calling
set() on the ObservableList will replace an element at a given index, and

99
4. Collections

result in two emissions: one for the REMOVED item, and another for the ADDED
item. When we set the item at index 2 to "Eta", it replaced "Delta" which was
REMOVED , and then "Eta" was ADDED .

An UPDATED emission occurs when an ObservableValue property of a T


item in an ObservableList<T> changes. This is a lesser-known feature in
JavaFX but can be enormously helpful. Consider a User class with an
updateable Property called name .

Java

class User {
private final int id;
private final Property<String> name =
new SimpleStringProperty();

User(int id, String name) {


[Link] = id;
[Link](name);
}
public int getId() {
return id;
}
public Property<String> nameProperty() {
return name;
}
@Override
public String toString() {
return id + "-" + [Link]();
}
}

Kotlin

100
4. Collections

class User(val id: Int, name: String) {


var name: String by property(name)
fun nameProperty() = getProperty(User::name)

override fun toString() = "$id-$name"


}

Whenever this name property for any User changes, this change will be
pushed as an emission. It will be categorized in a ListChange as UPDATED .

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

ObservableList<User> values =
[Link](user -> new Ob
servableValue[]{ [Link]() });

[Link](values)
.subscribe([Link]::println);

[Link](new User(503,"Tom Nield"));


[Link](new User(504,"Jason Shwartz"));

[Link](0).nameProperty().setValue("Thomas Nield");

[Link](0);

101
4. Collections

static final class User {


private final int id;
private final Property<String> name = new SimpleStringPr
operty();

User(int id, String name) {


[Link] = id;
[Link](name);
}
public int getId() {
return id;
}
public Property<String> nameProperty() {
return name;
}
@Override
public String toString() {
return id + "-" + [Link]();
}
}
}

Kotlin

102
4. Collections

import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
val values = [Link]<User> { u
ser ->
arrayOf<ObservableValue<*>>([Link]())
}

[Link](values)
.subscribe { println(it) }

[Link](User(503, "Tom Nield"))


[Link](User(504, "Jason Shwartz"))

values[0].nameProperty().value = "Thomas Nield"

[Link](0)
}
}

class User(val id: Int, name: String) {


var name: String by property(name)
fun nameProperty() = getProperty(User::name)

override fun toString() = "$id-$name"


}

OUTPUT:

ADDED 503-Tom Nield


ADDED 504-Jason Shwartz
UPDATED 503-Thomas Nield

103
4. Collections

We declared a lambda specifying an array of ObservableValue properties we


are interested in listening to, which in this case is only the name property. When
the first element containing the User named "Tom Nield" had its name
property changed to Thomas Nield , it was emitted as a change. This will also
work with the emitOnChanged() factory we saw earlier, and the entire
ObservableList<T> will be pushed every time any specified property changes.

This can be helpful to react not just to items in the list being added or removed,
but also when their properties are modified. You can then use this behavior to, for
example, to drive updates to concatenations.

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

ObservableList<User> values =
[Link](user -> new Ob
servableValue[]{[Link]()});

[Link](values)
.flatMapSingle(list ->
[Link](list)
.map(User::getName)
.reduce("",(u1,u2) -> u1 + (u1.e
quals("") ? "" : ", ") + u2)
)
.subscribe([Link]::println);

[Link](new User(503,"Tom Nield"));

104
4. Collections

[Link](new User(504,"Jason Shwartz"));

[Link](0).nameProperty().setValue("Thomas Nield");

[Link](0);
}
static final class User {
private final int id;
private final Property<String> name = new SimpleStringPr
operty();

User(int id, String name) {


[Link] = id;
[Link](name);
}
public int getId() {
return id;
}
public Property<String> nameProperty() {
return name;
}
@Override
public String toString() {
return id + "-" + [Link]();
}
}
}

Kotlin

105
4. Collections

import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
val values = [Link]<User> { u
ser ->
arrayOf<ObservableValue<*>>([Link]())
}

[Link](values)
.flatMapSingle {
[Link]().map {[Link] }
.reduce("") { u1,u2 -> u1 + (if (u1
== "") "" else ", ") + u2 }
}
.subscribe { println(it) }

[Link](User(503, "Tom Nield"))


[Link](User(504, "Jason Shwartz"))

values[0].nameProperty().value = "Thomas Nield"

[Link](0)
}
}

class User(val id: Int, name: String) {


var name: String by property(name)
fun nameProperty() = getProperty(User::name)

override fun toString() = "$id-$name"


}

106
4. Collections

OUTPUT:

Tom Nield
Tom Nield, Jason Shwartz
Thomas Nield, Jason Shwartz

Note also there are factories that target only ADDED , REMOVED , and UPDATED
events. These will only emit items corresponding to those event types, and also
are available under the JavaFxObservable utility class. Here is a complete list
of these additional factories as well as the others we covered so far.

Figure 4.4 - JavaFX Collection Factories

Collection Type Java Factory Kotlin Extension

ObservableList<T> emitOnChanged() onChangedObservable()

ObservableList<T> additionsOf() additions()


ObservableList<T> removalsOf() removals()
ObservableList<T> updatesOf() updates()

ObservableSet<T> emitOnChanged() onChangedObservable()

ObservableSet<T> additionsOf() additions()


ObservableSet<T> removalsOf() removals()
ObservableSet<T> fromObservableSetUpdates() updates()

ObservableMap<T> emitOnChanged() onChangedObservable()

ObservableMap<<K,T> additionsOf() additions()


ObservableMap<K,T> removalsOf() removals()
ObservableMap<K,T> fromObservableMapUpdates() updates()

Distinct ObservableList Changes

107
4. Collections

There may be times you want to emit only distinct changes to a JavaFX
ObservableList . What this means is you want to ignore duplicates added or
removed to the collection and not emit them as a change. This can be helpful to
synchronize two different ObservableLists, where one has duplicates and the
other does not.

Take this application that will hold two ListView<String> instances each
backed by an ObservableList<String> . The top ListView<String> will
hold duplicate values, but the bottom ListView<String> will hold only distinct
values from the top ListView (Figure 4.5).

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

Label header1 = new Label("VALUES");


ListView<String> listView = new ListView<>();

Label header2 = new Label("DISTINCT VALUES");


ListView<String> distinctListView = new ListView<>();

[Link]([Link]())
.subscribe(c -> {
if ([Link]().equals([Link]))

108
4. Collections

[Link]().add([Link]
ue());
else
[Link]().remove([Link]
Value());
});

TextField inputField = new TextField();

Button addButton = new Button("Add");


[Link](addButton)
.map(ae -> [Link]())
.subscribe(s -> {
[Link]().add(s);
[Link]();
});

Button removeButton = new Button("Remove");


[Link](removeButton)
.map(ae -> [Link]())
.subscribe(s -> {
[Link]().remove(s);
[Link]();
});

[Link]().addAll(header1,listView,header2,
distinctListView,inputField,addButton,removeButt
on);

[Link](new Scene(root));
[Link]();
}
}

Kotlin

import [Link]
import [Link]
import [Link]

109
4. Collections

import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {

label("Values")
val listView = listview<String>()

label("Distinct Values")
val distinctListView = listview<String>()

label("Input/Remove Value")
val inputField = textfield()

[Link]()
.subscribe {
if ([Link] == [Link])
[Link]([Link])
else
[Link]([Link])
}

button("Add").actionEvents()
.map { [Link] }
.subscribe {
[Link](it)
[Link]()
}

button("Remove").actionEvents()
.map { [Link] }
.subscribe {
[Link](it)
[Link]()
}
}
}

110
4. Collections

Figure 4.5

The key factory here is the distinctChangesOf() (or distinctChanges()


for Kotlin). It pushes only distinct changes from the top ListView to the bottom
one. If you input "Alpha" twice, the top ListView will hold both instances, but
the bottom will only receive one. The second ADDED emission was suppressed.
If you remove one of the "Alpha" values, it will not fire the REMOVED emission
until you rid the other one too.

You also have the option of choosing an attribute of the item to distinct on rather
than the item itself. If you wanted to only emit distinct values based on the first
character, you can pass a lambda argument to the factory that substrings out the
first character.

111
4. Collections

Java

[Link]([Link](), s -> s.
substring(0,1))
.subscribe(c -> {
if ([Link]().equals([Link]))
[Link]().add([Link]());
else
[Link]().remove([Link]());
});

Kotlin

[Link] { [Link](0,1) }
.subscribe {
if ([Link] == [Link])
[Link]([Link])
else
[Link]([Link])
}

Figure 4.6

112
4. Collections

As you may see, this might be helpful to sample only one item with a distinct
property. If you add "Alpha" and then "Apple", only "Alpha" will be emitted to the
bottom ListView since it was the first to start with "A". The "Alpha" will only be
removed from the bottom ListView when both "Alpha" and "Apple" are
removed, when there are no longer any "A" samples.

If you want to push the mapped value itself rather than the item it was derived
from, you can use the distinctMappingsOf() factory (or
distinctMappingChanges() for Kotlin) (Figure 4.7).

Java

113
4. Collections

[Link]([Link](), s -> s.
substring(0,1))
.subscribe(c -> {
if ([Link]().equals([Link]))
[Link]().add([Link]
ue());
else
[Link]().remove([Link]
Value());
});

Kotlin

[Link] { [Link](0,1) }
.subscribe {
if ([Link] == [Link])
[Link]([Link])
else
[Link]([Link])
}

Figure 4.7

114
4. Collections

If you input "Alpha", an "A" will show up in the bottom ListView . Adding "Apple"
will have no effect as "A" (its first character) has already been distincly ADDED .
When you remove both "Alpha" and "Apple", the "A" will then be REMOVED from
the bottom.

Summary
In this chapter we covered how to reactivly use JavaFX ObservableCollections.
When you emit an entire collection every time it changes, or emit the elements
that changed, you can get a lot of functionality that simply does not exist with

115
4. Collections

JavaFX alone. We also covered distinct additions and removals, which can be
helpful to create an ObservableList that distincts off of another
ObservableList .

Hopefully by now, RxJava is slowly starting to look useful. But we have only just
gotten started. The power of Rx really starts to unfold when we combine
Observables, leverage concurrency, and use other features that traditionally take
a lot of effort. Next, we will cover combining Observables.

116
5. Combining Observables

5. Combining Observables
So far in this book, we have merely set the stage to make Rx useful. We learned
how to emit JavaFX Events, ObservableValues, and ObservableCollections
through RxJava Observables. But there is only so much you can do when a
reactive stream is built off one source. When you have emissions from multiple
Observables being joined together in some form, this is truly where things
transition from merely being useful to definitely game-changing.

There are several ways to combine emissions from multiple Observables, and we
will cover many of these combine operators. What makes these operators
especially powerful is they are not only threadsafe, but also non-blocking. They
can merge concurrent sources from different threads, and we will see this in
action later in Chapter 7.

Concatenation
One of the simplest ways to combine Observables is to use the concat()
operators. You can specify two or more Observables emitting the same type T
and it will fire emissions from each one in order.

Java

import [Link];

public static void main(String[] args) {


Observable<String> source1 = [Link]("Alpha","Beta",
"Gamma");
Observable<String> source2 = [Link]("Delta","Epsilo
n");

[Link](source1,source2)
.map(String::length)
.toList()
.subscribe([Link]::println);
}

117
5. Combining Observables

Kotlin

import [Link]
import [Link]

fun main(args: Array<String>) {

val source1 = [Link]("Alpha","Beta","Gamma")


val source2 = [Link]("Delta","Epsilon")

[Link](source1,source2)
.map { [Link] }
.toList()
.subscribeBy { println(it) }
}

OUTPUT:

[5, 4, 5, 5, 7]

It is very critical to note that onComplete() must be called by each


Observable so it moves on to the next one. If you have an infinite
Observable in a concatenated operation, it will hold up the line by infinitely
emitting items, forever keeping any Observables after it from getting fired.

Concatentation is also available as an operator and not just a factory, and it


should yield the same output.

Java

118
5. Combining Observables

import [Link];

public static void main(String[] args) {


Observable<String> source1 = [Link]("Alpha","Beta",
"Gamma");
Observable<String> source2 = [Link]("Delta","Epsilo
n");

[Link](source2)
.map(String::length)
.toList()
.subscribe([Link]::println);
}

Kotlin

import [Link]
import [Link]

fun main(args: Array<String>) {

val source1 = [Link]("Alpha","Beta","Gamma")


val source2 = [Link]("Delta","Epsilon")

[Link](source2)
.map { [Link] }
.toList()
.subscribeBy { println(it) }
}

OUTPUT:

[5, 4, 5, 5, 7]

If you want to do a concenation but put another Observable in front rather than
after, you can use startWith() instead.

Java

119
5. Combining Observables

import [Link];

public static void main(String[] args) {


Observable<String> source1 = [Link]("Alpha","Beta",
"Gamma");
Observable<String> source2 = [Link]("Delta","Epsilo
n");

[Link](source2)
.subscribe([Link]::println);
}

Kotlin

import [Link]
import [Link]

fun main(args: Array<String>) {

val source1 = [Link]("Alpha","Beta","Gamma")


val source2 = [Link]("Delta","Epsilon")

[Link](source2)
.subscribe { println(it) }
}

OUTPUT:

Delta
Epsilon
Alpha
Beta
Gamma

Again, this operator is likely not one you would use with infinite Observables. You
are more likely to use this for data-driven Observables rather than UI events.
Technically, you can specify an infinite Observable to be the last Observable

120
5. Combining Observables

to concatenate. That way it is not holding up any other Observables.

When you want to simultaneously combine all emissions from all Observables,
you might want to consider using merge() , which we will cover next.

Merging
Merging is almost like concatenation but with one important difference: it will
combine all Observables of a given emission type T simultaneously. This
means all emissions from all Observables are merged together at once into a
single stream without any regard for order or completion.

This is pretty helpful to merge multiple UI event sources since they are infinite. For
instance, you can consolidate the ActionEvent emissions of two buttons into a
single Observable<ActionEvent> using [Link]() . (Figure 5.1).

Java

121
5. Combining Observables

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {
VBox root = new VBox();

Button firstButton = new Button("Press Me");


Button secondButton = new Button("Press Me Too");

[Link](
[Link](firstButton),
[Link](secondButton)
).subscribe(i -> [Link]("You pressed one of
the buttons!"));

[Link]().addAll(firstButton,secondButton);
[Link](new Scene(root));
[Link]();
}
}

Kotlin

122
5. Combining Observables

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
val buttonUp = button("Press Me")
val buttonDown = button("Press Me Too")

[Link](
[Link](),
[Link]()
)
.subscribe {
println("You pressed one of the buttons!")
}
}
}

Figure 5.1

When you press either Button , it will consolidate the emissions into a single
Observable<ActionEvent> which goes to a single Observer . But let's make
this more interesting. Change these two Buttons so they are labeled "UP" and
"DOWN", and map their ActionEvent to either a 1 or -1 respectively.
Using a scan() we can create a rolling sum of these emissions and push the
incrementing/decrementing number to a Label (Figure 5.2).

Java

123
5. Combining Observables

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {
VBox root = new VBox();

Label label = new Label("0");

Button buttonUp = new Button("UP");


Button buttonDown = new Button("DOWN");

[Link](
[Link](buttonUp).map(ae
-> 1),
[Link](buttonDown).map(
ae -> -1)
).scan(0,(x,y) -> x + y)
.subscribe(i -> [Link]([Link]()));

[Link]().addAll(label,buttonUp,buttonDown);
[Link](new Scene(root));
[Link]();
}
}

Kotlins

124
5. Combining Observables

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {


override val root = vbox {
val label = label("0")
val buttonUp = button("UP")
val buttonDown = button("DOWN")

[Link](
[Link]().map { 1 },
[Link]().map { -1 }
).scan(0) { x,y -> x + y }
.subscribe {
[Link] = [Link]()
}
}
}

Figure 5.2

When you press the "UP" Button , it will increment the integer in the Label .
When you press the "DOWN" Button , it will decrement it. This was
accomplished by merging the two infinite Observables returned from the map()
operator. The 1 or -1 is then pushed to the scan() operation where it is
emitted as a rolling total.

Like concatenation, there is also an operator version you can use instead of the
factory to merge an Observable<T> with another Observable<T>

125
5. Combining Observables

Java

[Link](buttonUp).map(ae -> 1)
.mergeWith(
[Link](buttonDown).map(ae -> -1
)
).scan(0,(x,y) -> x + y)
.subscribe(i -> [Link]([Link]()));

Kotlin

[Link]().map { 1 }
.mergeWith(
[Link]().map { -1 }
).scan(0) { x,y -> x + y }
.subscribe {
[Link] = [Link]()
}

With both concatentation and merging, you can combine as many Observables as
you want (up to 9 before you have to pass an Iterable<Observable> instead).
But these two operators work with Observables emitting the same type T . There
are ways to combine emissions of different types which we will see next.

Zip
One way you can combine multiple Observables, even if they are different types,
is by "zipping" their emissions together. Think of a zipper on a jacket and how the
teeth pair up. From a reactive perspective, this means taking one emission from
the first Observable , and one from a second Observable , and combining
both emissions together in some way.

Take these two Observables, one emitting Strings and the other emitting Integers.
For each String that is emitted, you can pair it with an emitted Integer and
join them together somehow.

Java

126
5. Combining Observables

import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<String> letters = [Link]("A", "B", "


C", "D", "E", "F");
Observable<Integer> numbers = [Link](1, 2, 3, 4
, 5);

[Link](letters, numbers, (l, n) -> l + "-" + n)


.subscribe([Link]::println,
Throwable::printStackTrace,
() -> [Link]("Done!")
);
}
}

Kotlin

import [Link]
import [Link]
import [Link]

fun main(args: Array<String>) {

val letters = [Link]("A","B","C","D","E","F")


val numbers = [Link](1,2,3,4,5)

[Link](letters,numbers) { l, n -> "$l-$n"}


.subscribeBy(
onNext = { println(it) },
onError = { [Link]() },
onComplete = { println("Done!") }
)
}

127
5. Combining Observables

OUTPUT:

A-1
B-2
C-3
D-4
E-5
Done!

Notice that "A" paired with the "1", and "B" paired with the "2", and so on. Again,
you are "zipping" them just like a jacket zipper. But take careful note of something:
there are 6 letters emissions and 5 numbers emissions. What happened to
that sixth letter "F" since it had no number to zip with? Since the two zipped
sources do not have the same number of emissions, it was ignored the moment
onComplete() was called by numbers . Logically, it will never have anything to
pair with so it gave up and proceeded to skip it and call onComplete() down to
the Subscriber .

There is also an operator equivalent called zipWith() you can use. This should
yield the exact same output.

Java

128
5. Combining Observables

import [Link];

public class Launcher {

public static void main(String[] args) {

Observable<String> letters = [Link]("A", "B", "


C", "D", "E", "F");
Observable<Integer> numbers = [Link](1, 2, 3, 4
, 5);

[Link](numbers, (l, n) -> l + "-" + n)


.subscribe([Link]::println,
Throwable::printStackTrace,
() -> [Link]("Done!")
);
}
}

Kotlin

import [Link]
import [Link]
import [Link]

fun main(args: Array<String>) {

val letters = [Link]("A","B","C","D","E","F")


val numbers = [Link](1,2,3,4,5)

[Link](numbers) {l,n -> "$l-$n"}


.subscribeBy(
onNext = { println(it) },
onError = { [Link]() },
onComplete = { println("Done!") }
)
}

129
5. Combining Observables

Zipping can be helpful when you need to sequentially pair things from two or more
sources, but from my experience this rarely works well with UI events. Let's adapt
this example to see why.

Suppose you create two ComboBox controls holding these letters and numbers
respectively. You want to create an Observable off each one that emits the
selected values. Then you want to zip the values together, concatentate them into
a single String , and print them in an Observer . You are looking to combine
two different user inputs together (Figure 5.3).

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

ComboBox<String> letterCombo = new ComboBox<>();


[Link]().setAll("A", "B", "C", "D", "E", "
F");

ComboBox<Integer> numberCombo = new ComboBox<>();


[Link]().setAll(1,2,3,4,5);

Observable<String> letterSelections =
[Link]([Link]
rty());

Observable<Integer> numberSelections =
[Link]([Link]
rty());

130
5. Combining Observables

[Link](letterSelections, numberSelections, (l, n


) -> l + "-" + n)
.subscribe([Link]::println,
Throwable::printStackTrace,
() -> [Link]("Done!")
);

HBox root = new HBox();


[Link]().setAll(letterCombo,numberCombo);

[Link](new Scene(root));
[Link]();
}
}

Kotlin

131
5. Combining Observables

import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = hbox {

val letterSelections = combobox<String> {


[Link]("A","B","C","D","E","F")
}.valueProperty().toObservable()

val numberSelections = combobox<Int> {


[Link](1,2,3,4,5)
}.valueProperty().toObservable()

[Link](letterSelections,numberSelections) { l,
n -> "$l-$n"}
.subscribeBy(
onNext = { println(it) },
onError = { [Link]() },
onComplete = { println("Done!") }
)
}
}

Figure 5.3

This seems like a good idea, right? When I select a letter, and I select a number,
the two are zipped together and sent to the Observer ! But there is something
subtle and problematic with this. Select multiple letters without selecting any
numbers, then select multiple numbers. Notice how the letters are backlogged and

132
5. Combining Observables

each one is waiting for a number to be paired with? This is problematic and
probably not what you want. If you select "A", then "B", then "C" followed by "1",
then "2", then "3", you are going to get "A-1", "B-2", and "C-3" printed to the
console.

Here is another way of looking at it. The problem with our zipping example is for
every selected "letter", you need to select a "number" to evenly pair with it. If you
make several selections to one combo box and neglect to make selections on the
other, you are going to have a backlog of emissions waiting to be paired. If you
select eight different letters (shown below), and only four numbers, the next
number you select is going to pair with the "D", not "F" which is currently selected.
If you select another letter its only going to worsen the backlog and make it more
confusing as to what the next number will pair with.

A 1
B 5
A 3
A 6
D
C
A
F

If you want to only combine the latest values from each Observable and ignore
previous ones, you might want to use combineLatest() which we will cover
next.

Note you can make zip more than two Observables. If you have three
Observables, it will zip three emissions together before consolidating them into a
single emission.

Combine Latest
With our zipping example earlier, it might be more expected if we combine values
by chasing after the latest values. Using combineLatest() instead of zip() ,
we can select a value in either ComboBox . Then it will emit with the latest value
from the other ComboBox .

133
5. Combining Observables

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

ComboBox<String> letterCombo = new ComboBox<>();


[Link]().setAll("A", "B", "C", "D", "E", "
F");

ComboBox<Integer> numberCombo = new ComboBox<>();


[Link]().setAll(1,2,3,4,5);

Observable<String> letterSelections =
[Link]([Link]
rty());

Observable<Integer> numberSelections =
[Link]([Link]
rty());

[Link](letterSelections, numberSelecti
ons, (l, n) -> l + "-" + n)
.subscribe([Link]::println,
Throwable::printStackTrace,
() -> [Link]("Done!")
);

HBox root = new HBox();


[Link]().setAll(letterCombo,numberCombo);

134
5. Combining Observables

[Link](new Scene(root));
[Link]();
}
}

Kotlin

import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = hbox {

val letterSelections = combobox<String> {


[Link]("A","B","C","D","E","F")
}.valueProperty().toObservable()

val numberSelections = combobox<Int> {


[Link](1,2,3,4,5)
}.valueProperty().toObservable()

[Link](letterSelections,numberSelecti
ons) {l,n -> "$l-$n"}
.subscribeBy(
onNext = { println(it) },
onError = { [Link]() },
onComplete = { println("Done!") }
)
}
}

If you select "A","4", "E", and then "1", you should get this output.

135
5. Combining Observables

OUTPUT:

A-4
E-4
E-1

Selecting "4" with "A" will emit with the latest letter "A". Then selecting "E" will emit
with the latest number "4", and finally selecting "1" will emit with "E".

Simply put, a change in value for either ComboBox will result in the latest value
for both being pushed forward. For combining UI input events, we often are only
concerned with the latest user inputs and do not care about previous ones.
Therefore, combineLatest() is often useful for JavaFX.

Another powerful usage of combineLatest() is merging two ObservableLists


into one, and always keeping it synchronized when additions or removals happen
to the ObservableLists it was derived off of.

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

//declare two ObservableLists


ObservableList<String> startLocations =
[Link]("Dallas", "Hou
ston", "Chicago", "Boston");

ObservableList<String> endLocations =

136
5. Combining Observables

[Link]("San Diego", "


Salt Lake City", "Seattle");

//this ObservableList will hold contents of both


ObservableList<String> allLocations = [Link]
rvableArrayList();

//this will pump both ObservableLists into `allLocations`

[Link](
[Link](startLocations),
[Link](endLocations),
(l1, l2) -> {
ArrayList<String> combined = new ArrayList<>
();
[Link](l1);
[Link](l2);
return combined;
}
).subscribe(allLocations::setAll);

//print `allLocations` every time it changes, to prove i


ts working
[Link](allLocations).subscribe(S
[Link]::println);

//do modifications to trigger above operations


[Link]("Portland");
[Link]("Dallas");
[Link]("Phoenix");
[Link]("Boston");

[Link](0);
}
}

Kotlin

import [Link]

137
5. Combining Observables

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = hbox {

//declare two ObservableLists


val startLocations =
[Link]("Dallas","Hous
ton","Chicago","Boston")

val endLocations =
[Link]("San Diego", "
Salt Lake City", "Seattle")

//this ObservableList will hold contents of both


val allLocations = [Link]<Str
ing>()

//this will pump both ObservableLists into `allLocations`

[Link]([Link]
able(),
[Link]()) {l1,l2 ->
ArrayList<String>().apply {
addAll(l1)
addAll(l2)
}
}.subscribe {
[Link](it)
}

//print `allLocations` every time it changes, to prove i


ts working
[Link]().subscribe { println(i

138
5. Combining Observables

t) }

//do modifications to trigger above operations


[Link]("Portland")
[Link]("Dallas")
[Link]("Phoenix")
[Link]("Boston")

[Link](0)
}
}

OUTPUT:

[Dallas, Houston, Chicago, Boston, San Diego, Salt Lake City, Se


attle]
[Dallas, Houston, Chicago, Boston, Portland, San Diego, Salt Lak
e City, Seattle]
[Dallas, Houston, Chicago, Boston, Portland, San Diego, Salt Lak
e City, Seattle, Dallas]
[Dallas, Houston, Chicago, Boston, Portland, San Diego, Salt Lak
e City, Seattle, Dallas, Phoenix]
[Dallas, Houston, Chicago, Portland, San Diego, Salt Lake City,
Seattle, Dallas, Phoenix]

Whenever either ObservableList ( startLocations or endLocations ) is


modified, it will update the combined ObservableList ( allLocations ) so it
always reflects the contents of both. This is a powerful way to leverage JavaFX
ObservableCollections and combine them to drive the content of other
ObservableCollections.

If you want to go a step further, you can easily modify this operation so that the
combined ObservableList only contains distinct items from both
ObservableLists. Simply add a flatMapSingle() before the Observer that
intercepts the ArrayList , turns it into an Observable , distincts it, and
collects it back into a List . Notice when you run it, the duplicate "Dallas"
emission is held back.

139
5. Combining Observables

Java

[Link](
[Link](startLocations),
[Link](endLocations),
(l1,l2) -> {
ArrayList<String> combined = new ArrayList<>();
[Link](l1);
[Link](l2);
return combined;
}
).flatMapSingle(l -> [Link](l).distinct().toLis
t())
.subscribe(allLocations::setAll);

Kotlin

[Link]([Link](),
[Link]()) {l1,l2 ->
mutableListOf<String>().apply {
addAll(l1)
addAll(l2)
}
}.flatMapSingle {
[Link]().distinct().toList()
}.subscribe {
[Link](it)
}

OUTPUT:

140
5. Combining Observables

[Dallas, Houston, Chicago, Boston, San Diego, Salt Lake City, Se


attle]
[Dallas, Houston, Chicago, Boston, Portland, San Diego, Salt Lak
e City, Seattle]
[Dallas, Houston, Chicago, Boston, Portland, San Diego, Salt Lak
e City, Seattle]
[Dallas, Houston, Chicago, Boston, Portland, San Diego, Salt Lak
e City, Seattle, Phoenix]
[Dallas, Houston, Chicago, Portland, San Diego, Salt Lake City,
Seattle, Phoenix]

While this is a pretty procedural example, using combineLatest() with


ObservableLists has very powerful applications, especially with data controls.
Combining data from two different data controls (like TableViews), you can merge
the two data sets into some form of aggregation in a third control. All three data
controls will always be synchronized, and you can published the combined
ObservableList while it is driven by two or more ObservableCollections
backing it.

A more advanced but elegant way to accomplish either task above is to return an
Observable<Observable<String>> from the combineLatest() , and then
flatten it with a flatMap() afterwards. This avoids creating an intermediary
ArrayList and is a bit leaner.

Java

[Link](
[Link](startLocations),
[Link](endLocations),
(l1,l2) -> [Link](l1).concatWith(Observ
[Link](l2))
).flatMapSingle(obs -> [Link]().toList())
.subscribe(allLocations::setAll);

Kotlin

141
5. Combining Observables

[Link]([Link](),
[Link]()) { l1,l2 ->
[Link]().concatWith([Link]())
}.flatMapSingle {
[Link]().toList()
}.subscribe {
[Link](it)
}

This is somewhat more advanced, so do not worry if you find the code above
challenging to grasp. It is a pattern where an Observable is emitting
Observables, and you can feel free to move on and study it again later as you get
more comfortable with Rx.

Summary
In this chapter, we covered combining Observables and which combine operators
are helpful to use with UI events vs simply merging data. Hopefully by now, you
are excited that you can achieve tasks beyond what the JavaFX API provides.
Tasks like synchronizing an ObservableList to the contents of two other
ObservableLists become almost trival with reactive programming. Soon we will get
to the most anticipated feature of RxJava: concurrency with observeOn() and
subscribeOn() . But first, we will cover a few final topics before we hit that.

142
6. Bindings

6. Bindings
There are situations where JavaFX will want a Binding rather than an RxJava
Observable or Observer , and we will cover some streamlined utilities to
meet this need. We will also cover JavaFX Dialogs and how to use them
reactively.

Bindings and RxJava


In JavaFX, a Binding is an implementation of ObservableValue that is
derived off other ObservableValues in some way. Bindings also allow you to
synchronize JavaFX ObservableValue items through bind() and
bindBidirectional() methods. You can express transformations of an
ObservableValue and bind on those transformations, but RxJava expresses
this task much more easily. As you probably observed, RxJavaFX provides a
robust and expressive way to make controls communicate their changes.

For instance, you can leverage bindings to disable a Button when a


TextField does not contain six characters.

Java

143
6. Bindings

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {
VBox root = new VBox();

Label label = new Label("Input a 6-character String");

TextField input = new TextField();


Button button = new Button("Proceed");

[Link]().bind(
[Link]().length().isNotEqualTo(6)
);
[Link]().addAll(label,input,button);
[Link](new Scene(root));
[Link]();
}
}

Kotlin

144
6. Bindings

import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

label("Input a 6-character String")

val input = textfield()


val button = button("Proceed")

[Link]().bind(
[Link]().length().isNotEqualTo(6)
)
}
}

Figure 6.1 Using bindings to disable a Button unless a TextField is six


characters

![]([Link]

Of course, the need for Binding in this case is eliminated thanks to RxJava.
Knowing what you know now, RxJava creates a more streamlined and inuitive
way to "push" the input text values, map them to a boolean expression, and
finally sends them to an Observer that sets the disableProperty() of the
Button .

Java

145
6. Bindings

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {
VBox root = new VBox();

Label label = new Label("Input a 6-character String");

TextField input = new TextField();


Button button = new Button("Proceed");

[Link]([Link]())
.map(s -> [Link]() != 6)
.subscribe(b -> [Link]().setValu
e(b));

[Link]().addAll(label,input,button);
[Link](new Scene(root));
[Link]();
}
}

Kotlin

146
6. Bindings

import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = vbox {

label("Input a 6-character String")

val input = textfield()


val button = button("Proceed")

[Link]().toObservable()
.map { [Link] != 6 }
.subscribe { [Link]().set(it) }
}
}

If you are fluent in Rx, this is more intuitive than native JavaFX Bindings. It is also
much more flexible as a given ObservableValue remains openly mutable rather
than being strictly bound to another ObservableValue . But there are times you
will need to use Bindings to fully work with the JavaFX API. If you need to create a
Binding off an RxJava Observable , there is a factory/extension function to
turn an RxJava Observable<T> into a JavaFX Binding<T> . Let's take a look
at one place where Bindings are needed: TableViews.

Say you have the given domain type Person . It has a birthday property that
holds a LocalDate . The getAge() is an Observable<Long> driven off the
birthday and is converted to a Binding<Long> . When you change the
birthday , it will push a new Long value to the Binding (Figure 6.2).

Java

import [Link];
import [Link];

147
6. Bindings

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];
import [Link];

public final class Person {

private final StringProperty name;


private final ObjectProperty<LocalDate> birthday;
private final Binding<Long> age;

Person(String name, LocalDate birthday) {


[Link] = new SimpleStringProperty(name);
[Link] = new SimpleObjectProperty<>(birthday);

[Link] = [Link](
[Link](birthdayProperty())
.map(dt -> [Link](dt,L
[Link]()))
);
}

public StringProperty nameProperty() {


return name;
}

public ObjectProperty<LocalDate> birthdayProperty() {


return birthday;
}

public Binding<Long> getAge() {


return age;
}
}

148
6. Bindings

Kotlin

import [Link]
import [Link]
import tornadofx.*
import [Link]
import [Link]

class Person(name: String, birthday: LocalDate) {


var name by property(name)
fun nameProperty() = getProperty(Person::name)

var birthday by property(birthday)


fun birthdayProperty() = getProperty(Person::birthday)

val age = birthdayProperty().toObservable()


.map { [Link](it,[Link]())
}
.toBinding()
}

In Java, you can also fluently use the to() operator to map the Observable
to any arbitrary type. We can use it to streamline turning it into a Binding .

[Link] = [Link](birthdayProperty())
.map(dt -> [Link](dt,LocalDate
.now()))
.to(JavaFxObserver::toBinding);
);

Now if you put a few instances of Person in a TableView , each row will then
come to life (Figure 6.2).

Java

import [Link];
import [Link];
import [Link];

149
6. Bindings

import [Link].*;
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

TableView<Person> table = new TableView<>();


[Link](true);

[Link]().setAll(
new Person("Thomas Nield", [Link](1989,1,18
)),
new Person("Sam Tulsa",[Link](1980,5,12)),
new Person("Ron Johnson",[Link](1975,3,8))
);

TableColumn<Person,String> nameCol = new TableColumn<>("


Name");
[Link](v -> [Link]().nameProper
ty());

TableColumn<Person,LocalDate> birthdayCol = new TableCol


umn<>("Birthday");
[Link](v -> [Link]().birthd
ayProperty());
[Link]([Link]
lumn(new LocalDateStringConverter()));

TableColumn<Person,Long> ageCol = new TableColumn<>("Age"


);
[Link](v -> [Link]().getAge());

[Link]().addAll(nameCol,birthdayCol,ageCol);

150
6. Bindings

[Link](new Scene(table));
[Link]();
}

Kotlin

import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView: View() {

override val root = tableview<Person> {


isEditable = true
[Link](
Person("Thomas Nield",[Link](1989,1,18)),
Person("Sam Tulsa",[Link](1980,5,12)),
Person("Ron Johnson",[Link](1975,3,8))
)

column("Name",Person::nameProperty)
column("Birthday",Person::birthdayProperty).useTextField
(LocalDateStringConverter())
column("Age",Person::age)
}
}

Figure 6.2

151
6. Bindings

When you edit the "Birthday" field for a given row, you will see the "Age" field
update automatically. This is because the age Binding is subscribed to the
RxJava Observable derived from the birthday Property .

Handling Errors with Reactive Bindings


When you create a JavaFX Binding<T> off an Observable<T> , it usually is a
good idea to pass a lambda to handle the onError() event. Otherwise errors
may go unnoticed and unhandled. Try to make this part of your best practices,
even if we do not do this for the rest of the book (for sake of brevity).

Java

private final Binding<Long> age = [Link](birt


hdayProperty())
.map(dt -> [Link](dt,[Link]()
))
.to(obs -> [Link](obs, Throwable::pr
intStackTrace));

Kotlin

152
6. Bindings

val age = birthdayProperty().toObservable()


.map { [Link](it,[Link]()) }
.toBinding { [Link]() }

Disposing Bindings
If we are going to remove records from a TableView . we will need to dispose
any Bindings that exist on each item. This will dispose() the Binding from
the RxJava Observable to prevent any memory leaks and free resources.

It is good practice to put a method on your domain type that will dispose all
Bindings on that item. For our Person , we will want to dispose() the age
Binding when that Person is no longer needed.

Java

public final class Person {

// existing code

public void dispose() {


[Link]();
}
}

Kotlin

class Person(name: String, birthday: LocalDate) {

//existing code

fun dispose() = [Link]()


}

Whever you remove items from the TableView , call dispose() on each
Person so all Observables are unsubscribed. If your domain type has several
Bindings, you can add them all to a CompositeBinding . This is basically a

153
6. Bindings

collection of Bindings that you can dispose() all at once. Say we added
another Binding to Person called isAdult (which is conveniently built off
age by turning it into an Observable ). It may be convenient to add both
Bindings to a CompositeBinding in the constructor, so dispose() will
dispose them both.

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];
import [Link];

public final class Person {

private final StringProperty name;


private final ObjectProperty<LocalDate> birthday;
private final Binding<Long> age;
private final Binding<Boolean> isAdult;

private final CompositeBinding bindings = new CompositeBindi


ng();

Person(String name, LocalDate birthday) {


[Link] = new SimpleStringProperty(name);
[Link] = new SimpleObjectProperty<>(birthday);

[Link] = [Link](birthdayProperty())
.map(dt -> [Link](dt,LocalDate
.now()))
.to(JavaFxObserver::toBinding);

[Link] = [Link](age)

154
6. Bindings

.map(age -> age >= 18)


.to(JavaFxObserver::toBinding);

[Link](age);
[Link](isAdult);
}

public StringProperty nameProperty() {


return name;
}

public ObjectProperty<LocalDate> birthdayProperty() {


return birthday;
}

public Binding<Long> getAge() {


return age;
}

public void dispose() {


[Link]();
}
}

Kotlin

155
6. Bindings

import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
import [Link]

class Person(name: String, birthday: LocalDate) {


var name by property(name)
fun nameProperty() = getProperty(Person::name)

var birthday by property(birthday)


fun birthdayProperty() = getProperty(Person::birthday)

private val bindings = CompositeBinding()

val age = birthdayProperty().toObservable()


.map { [Link](it,[Link]())
}
.toBinding()
.addTo(bindings)

val isAdult = [Link]()


.map { it >= 18 }
.toBinding()
.addTo(bindings)

fun dispose() = [Link]()


}

Lazy Bindings
When you create a Binding<T> off an Observable<T> , it will subscribe
eagerly and request emissions immediately. There may be situtations you would
rather a Binding<T> be lazy and not subscribe to the Observable<T> until a
value is first needed (specifically, when getValue() is called). This is

156
6. Bindings

particularly helpful for data controls like TableView where only visible records in
view will request values. If you scroll quickly, it will only request values when you
slow down on a set of records. This way, the TableView does not have to
calculate all values for all records, but rather just the ones you see.

If we wanted to make our two reactive Bindings on Person lazy, so they only
subscribe when that Person is in view, call toLazyBinding() instead of
toBinding() .

Java

[Link] = [Link](birthdayProperty())
.map(dt -> [Link](dt,LocalDate
.now()))
.to(JavaFxObserver::toLazyBinding);

Kotlin

val age = birthdayProperty().toObservable()


.map { [Link](it,[Link]()) }
.toLazyBinding())

In some situations, you may have a Binding that is driven off an Observable
that queries a database (using RxJava-JDBC) or some other service. Because
these requests can be expensive, toLazyBinding() can be valuable to
initiatlize the TableView more quickly. Of course, this lazy loading can
sometimes cause laggy scrolling by holding up the JavaFX thread, and we will
learn about concurrency later in this book to mitigate this.

Summary
In this chapter we learned about turning Observables into JavaFX Bindings, which
helps interop RxJava with JavaFX more thoroughly. Typically you do not need to
use Bindings often as RxJava provides a robust means to synchronize properties
and events, but some parts of the JavaFX API expect a Binding which you now
have the means to provide.

157
6. Bindings

158
7. Dialogs and Multicasting

Dialogs and Multicasting


In this chapter we will cover using Dialogs as well as multicasting. Dialogs are
helpful for getting user inputs, and they can be helpful in an Rx context.
Multicasting is a way to force Observables to be hot, and we will learn why it is
critical to do this when multiple Subscribers to a UI event Observable are
present.

Dialogs
JavaFX Dialogs are popups to quickly show a message to the user or solicit an
input. They can be helpful in reactive applications, so they also have a factory to
turn their response into an Observable .

You can pass an Alert or Dialog to the fromDialog() factory, and it will
return an Observable that emits the response as a single emission. Then it will
call onCompleted() .

Java

159
7. Dialogs and Multicasting

import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

[Link](
new Alert([Link], "Are you
sure you want to do this?")
).subscribe(response -> [Link]("You pressed "
+ [Link]()));

[Link](0);
}
}

Kotlin

160
7. Dialogs and Multicasting

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView: View() {

override val root = pane {


Alert([Link], "Are you sure you wa
nt to do this?")
.toMaybe()
.subscribe { println("You pressed " + [Link]) }

[Link](0)
}
}

Dialogs can become more useful in a flatMap() , flatMapSingle() , or


flatMapMaybe() to intercept and manipulate emissions. If you
flatMapMaybe() a Button 's ActionEvents to a Dialog response, you can
use filter() on the response to conditionally allow an emission to go forward
or be suppressed.

For example, say you have a "Run Process" Button that will kick of a simple
process emitting the integers 1 through 10, and then collects them into a List .
Pretend this process was something more intensive, and you want the user to
confirm on pressing the Button if they want to run it. You can use a Dialog to
intercept ActionEvent emissions in a flatMapMaybe() , map to the
Dialog 's response, and allow only emissions that are [Link] . Then
you can flatMap() that emission to kick off the process (Figure 6.3), which
actually yields a Single so we will use flatMapSingle() .

Java

161
7. Dialogs and Multicasting

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

Button runButton = new Button("Run Process");

[Link](runButton)
.flatMapMaybe(ae ->
[Link](new Alert(Al
[Link], "Are you sure you want to run the pr
ocess?"))
.filter(response -> [Link]
als([Link]))
).flatMapSingle(response -> [Link](1,10
).toList())
.subscribe(i -> [Link]("Processed in
teger list: " + i));

VBox root = new VBox();


[Link]().add(runButton);

[Link](new Scene(root));

[Link]();
}
}

162
7. Dialogs and Multicasting

Kotlin

import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = vbox {


button("Run Process").actionEvents()
.flatMapMaybe {
Alert([Link], "Are you
sure you want to run the process?")
.toMaybe()
.filter { it == [Link] }
}.flatMapSingle { [Link](1,10).toList(
) }
.subscribe { println("Processed integer list: $i
t") }
}
}

Figure 6.3

163
7. Dialogs and Multicasting

That flatMapMaybe() to an Alert dialog will emit a [Link] ,


[Link] , or no emission at all depending on what the user
chooses. Filtering for only [Link] emissions, only those emissions will
result in a kickoff of the flatMapSingle { [Link](1,10).toList()
} process. Otherwise it will be empty and no List<Integer> will be emitted.
This shows we can use a Dialog or Alert inputs to intercept and manipulate
emissions in an Observable chain.

Here is another example. Let's say clicking a Button will emit an


ActionEvent . You will then have integers 0 through 10 emitted inside a
flatMap() for each ActionEvent , and you want the user to decide which
integers should proceed to the Observer . Using some creative flatmapping, this
is not terribly hard. You can use an Alert or Dialog for each integer
emission to control which ones will go forward (Figure 6.4).

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

Button runButton = new Button("Run Process");

[Link](runButton)
.flatMap(ae ->
[Link](1,10)
.flatMapMaybe(i ->
[Link]

164
7. Dialogs and Multicasting

log(
new Alert(Alert.
[Link],
"Are you
sure you want to process integer " + i + "?",
ButtonTy
[Link], [Link])
).filter(response -> res
[Link]([Link]))
.map(response -> i)
)

).subscribe(i -> [Link]("Processed i


nteger: " + i));

VBox root = new VBox();


[Link]().add(runButton);

[Link](new Scene(root));

[Link]();
}
}

Kotlin

165
7. Dialogs and Multicasting

import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = vbox {


button("Run Process").actionEvents()
.flatMapMaybe {
[Link](1, 10).flatMap { i ->
Alert([Link],
"Do you want to process integer
$i?",
[Link], [Link]
).toMaybe()
.filter { it == [Link] }
.map { response -> i }
}
}.subscribe { println("Processed integer: $it")
}
}
}

Figure 6.4

166
7. Dialogs and Multicasting

The map(response -> i) is a simple trick you can do to take a response after
it has been filtered, and map it back to the integer. If you say "YES" to 1, 3, 6 and
"NO" to everything else, you should get the output above. 2,4,5,7,9, and 10 never
made it to the Observer because "NO" was selected and filtered them out.

That is how you can reactively leverage Dialogs and Alerts, and any control that
implements Dialog to return a single result can be reactively emitted in this
way.

Multicasting
For the sake of keeping the previous chapters accessible, I might have mislead
you when I said UI events are hot Observables. The truth is by default, they are a
gray area between a hot and cold Observable (or should I say "warm"?).
Remember, a "hot" Observable will emit to all Observers at once, while a "cold"
Observable will replay emissions to each Observer individually. This is a
pragmatic way to separate the two, but UI event factories in RxJavaFX (as well as
RxBindings for Android) awkwardly operate as both hot and cold unless you
multicast, or force an emission to hotly be emitted to all Observers.

To understand this subtle impact, here is a trick question. Say you have a Maybe
driven off a Dialog or Alert , and it has two Observers. Do you think the
response is going to go to both Subscribers?

Java

167
7. Dialogs and Multicasting

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

Maybe<Boolean> response = [Link](


new Alert([Link],"Are you
sure you want to proceed?")
).map(r -> [Link]([Link]));

[Link](r -> [Link]("Subscriber 1


received: " + r));

[Link](r -> [Link]("Subscriber 2


received: " + r));

[Link](0);
}
}

Kotlin

168
7. Dialogs and Multicasting

import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = vbox {

val response = Alert([Link], "Are


you sure you want to proceed?")
.toMaybe()
.map { it == [Link] }

[Link] { println("Subscriber 1 received: $it"


) }

[Link] { println("Subscriber 2 received: $it"


) }

[Link](0)
}
}

Try running it and you will see the Alert popup twice, once for each
Observer . This is almost like it's a cold Observable and it is "replaying" the
Dialog procedure for each Observer . As a matter of fact, that is exactly what
is happening. Both Observer are receiving their own, independent streams. You
can actually say OK on one Observer and CANCEL to the other. The two
Subscribers are, in fact, not receiving the same emission as you would expect in a
hot Observable .

This behavior is not a problem when you have one Observer . But when you
have multiple Observers, you will start to realize this is not a 100% hot
Observable . It is "hot" in that previous emissions are missed by tardy

169
7. Dialogs and Multicasting

Observers, but it is not "hot" in that a single set of emissions are going to all
Observers. To force the latter to happen, you can multicast, and that will force this
Observable to be 100% hot.

One way to multicast is to use the ConnectableObservable we used in Chapter


2. We can publish() the Observable to get a ConnectableObservable ,
set up up the Observers , then call connect() to start firing the same
emissions to all Observers.

Java

170
7. Dialogs and Multicasting

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

ConnectableObservable<Boolean> response = JavaFxObservab


[Link](
new Alert([Link],"Are you
sure you want to proceed?")
).map(r -> [Link]([Link]))
.toObservable().publish(); //returns ConnectableObserva
ble

[Link](r -> [Link]("Subscriber 1


received: " + r));

[Link](r -> [Link]("Subscriber 2


received: " + r));

[Link]();

[Link](0);
}
}

Kotlin

171
7. Dialogs and Multicasting

import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = vbox {

val response = Alert([Link], "Are


you sure you want to proceed?")
.toObservable()
.map { it == [Link] }
.publish() //returns ConnectableObservable

[Link] { println("Subscriber 1 received: $it"


) }

[Link] { println("Subscriber 2 received: $it"


) }

[Link]()

[Link](0)
}
}

When you run this program, you will now see the Alert only pop up once, and
the single response will go to both Observers simultaneously. Every operator
before the publish() will be a single stream of emissions. But take note that
everything after the publish() is subject to be on separate streams from that
point.

The Maybe as well as the Single are not able to multicast, so just turn
them into an Observable via toObservable() when you need to.

172
7. Dialogs and Multicasting

If you want this ConnectableObservable to automatically connect() for you


when the first Observer is subscribed, you can call refCount() to turn it
back into a normal Observable .

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

Observable<Boolean> response = [Link]


og(
new Alert([Link],"Are you
sure you want to proceed?")
).map(r -> [Link]([Link]))
.toObservable()
.publish()
.refCount();

[Link](r -> [Link]("Subscriber 1


received: " + r));

[Link](r -> [Link]("Subscriber 2


received: " + r));

[Link](0);
}
}

Kotlin

173
7. Dialogs and Multicasting

import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

override val root = vbox {

val response = Alert([Link], "Are


you sure you want to proceed?")
.toMaybe()
.map { it == [Link] }
.toObservable()
.publish()
.refCount()

[Link] { println("Subscriber 1 received: $it"


) }

[Link] { println("Subscriber 2 received: $it"


) }

[Link](0)
}
}

refCount() is a convenient way to turn a ConnectableObservable back into


an automatic Observable . It is helpful to force emissions to be hot without
manually calling connect() . Just be aware it will start emitting on the first
subscription, and any following subscriptions may miss the first emissions as they
are subscribed after the firing starts. But for UI events waiting for a user input,
chances are all subscriptions will connect() in time before the user inputs
anything, so refCount() is usually acceptable for UI events. But if your

174
7. Dialogs and Multicasting

Observable is going to fire emissions the moment it is subscribed, you may just
want to manually set up a ConnectableObservable , subscribe the Observers,
and call connect() yourself.

So when should you multicast with a ConnectableObservable (or use its


refCount() )? The answer is when you have multiple Observers to a single UI
event Observable . When you broadcast something as simple as a Button 's
ActionEvents, it is more efficient to multicast it so it does not create a Listener
for each Observer , but rather consolidates to one Listener .

Again, use multicasting for UI event Observables when there is more than one
Observer. Even though most of the time this makes no functional difference, it is
more efficient. It also will prevent subtle misbehaviors like we saw in cases like the
Dialog , where we want to force a single emission stream to go to all Observers
rather than each Observer getting its own emissions in a cold-like manner. If
you have only one Observer , the additional overhead of
ConnectableObservable is not necessary.

Replaying Observables
Another kind of ConnectableObservable is the one returned by the
replay() operator. This will hold on to the last x number of emissions and
"replay" them to each new Observer .

For instance, an Observable can emit the numbers 1 through 5. If I call


replay(1) on it, it will return a ConnectableObservable that will emit the last
emission "5" to later Observers. However if I am going to multicast this, I may
want to use autoConnect() instead of refCount() . Here is why: the
refCount() will "shut down" when it has no more active Observers (particularly
Observers that call onComplete() ). This will reset everything and clear the "5"
from its cache. If another Observer comes in, it will be treated as the first
Observer and receive all 5 emissions rather than just the "5". The
autoConnect() , however, will always stay alive whether or not it has
Subscribers, and persist the cached value of "5" indefinitely until a new value
replaces it.

Java

175
7. Dialogs and Multicasting

Observable<Integer> source = [Link](1,5)


.replay(1).autoConnect();

[Link](i -> [Link](i)); //receives 1,2,3,4,5


[Link](3000); //sleep 3 seconds, try-catch this
[Link](i -> [Link](i)); //receives 5

Kotlin

val source = [Link](1,5)


.replay(1).autoConnect()

[Link] { println(it) } //receives 1,2,3,4,5


[Link](3000) //sleep 3 seconds, try-catch this
[Link] { println(it) } //receives 5

OUTPUT:

1
2
3
4
5
5

The replay() operator can be helpful to replay the last emitted value for a UI
input (e.g a ComboBox ) so new Observers immediately get that value rather than
missing it. There are other argument overloads for replay() to replay
emissions for other scopes, like time windows. But simply holding on to the last
emission is a common helpful pattern for reactive UI's.

You can also use the cache() operator to horde and replay ALL emissions,
but keep in mind this can increase memory usage and cause data to go stale.

Summary

176
7. Dialogs and Multicasting

If you got this far, congrats! We have covered a lot. We ran through reactive
usage of Dialogs, which you can use to intercept emissions and get a user input
for each one, as well as multicasting. The topic of multicasting is critical to
understand because UI Observables do not always act hot when multiple
Observers are subscribed. Creating a ConnectableObservable is an effective
way to force an Observable to become hot and ensure each emission goes to
all Observers at once, and remove redundant listeners

Make sure you are somewhat comfortable with the material we covered so far,
because next we are going to cover concurrency. This is the topic that everything
leads up to, as RxJava revolutionizes how we multithread safely.

177
8. Concurrency

8. Concurrency
Concurrency has become a critical skill in software development. Most computers
and smart phones now have multiple core processors, and this means the most
effective way to scale performance is to leverage all of them. For this full utilization
to happen, code must explicitly be coded for multiple threads that can be worked
by multiple cores.

The idea of concurrency is essentially multitasking. Multiple threads execute


multiple tasks at the same time. Suppose you had some yard work to do and had
three tasks: mow the lawn, trim the trees, and sweep the patio. If you are working
alone, there is no way you can do all three of these tasks at the same time. You
have to sequentially work on each task one-at-a-time. But if you had two friends to
help out, you can get done more quickly as all three of you can execute all three
tasks simultaneously. In essence, each person is a thread and each chore is a
task.

Even if you have less threads than tasks (such as two threads and three tasks),
the two threads can tackle two of the tasks immediately. The first one to get done
can then move on to the third task. This is essentially what a thread pool does. It
has a fixed number of threads and is given a "queue" of tasks to do. Each thread
will take a task, execute it, and then take another. "Reusing" threads and giving
them a queue of tasks, rather than creating a thread for each task, is usually more
efficient since threads are expensive to create and dispose.

Traditionally, Java concurrency is difficult to master. A lot can go wrong especially


with mutable variables being accessed by multiple threads. Thankfully, RxJava
makes concurrency easier and safer. When you stay within an Observable
chain, it does not matter what thread emissions get pushed on (except of course
Observers and operators affecting JavaFX UI's, which need to happen on the
JavaFX thread). A major selling point of RxJava is its ability to make concurrency
trivial to compose, and this is helpful to make JavaFX UI's responsive and
resilient.

178
8. Concurrency

It is recommended to study concurrency without RxJava, just so you are


aware of the "gotchas" that can happen with multithreading. Benjamin
Winterberg has an awesome online tutorial walking through Java 8
concurrency. If you want deep knowlege in Java concurrnecy, [Java
Concurrency in Practice]([Link] is an excellent book to gain low-level
knowledge.

Using subscribeOn()
By default, for a given Observable chain, the thread that calls the
subscribe() method is the thread the Observable sends emissions on. For
instance, a simple subscription to an Observable inside a main() method will
fire the emissions on the main daemon thread.

Java

import [Link];

public class Launcher {


public static void main(String[] args) {
[Link](1,5)
.subscribe(i ->
[Link]("Receiving " + i + "
on thread "
+ [Link]().getName
())
);
}
}

Kotlin

179
8. Concurrency

import [Link]

fun main(args: Array<String>) {


[Link](1,5)
.subscribe { println("Receiving $it on thread ${Thre
[Link]().name}") }
}

OUTPUT:

Receiving 1 on thread main


Receiving 2 on thread main
Receiving 3 on thread main
Receiving 4 on thread main
Receiving 5 on thread main

However, we can easily switch these emissions to happen on another thread


using subscribeOn() . We can pass a Scheduler as an argument, which
specifies where it gets a thread from. In this case we can pass subscribeOn()
an argument of [Link]() , so it will execute on a new thread
for each Observer .

Java

180
8. Concurrency

import [Link];
import [Link];
import [Link];

public class Launcher {


public static void main(String[] args) {
[Link](1,5)
.subscribeOn([Link]())
.subscribe(i ->
[Link]("Receiving " + i + "
on thread "
+ [Link]().getName
())
);

try {
[Link](3);
} catch (InterruptedException e) {
[Link]();
}
}
}

Kotlin

import [Link]
import [Link]
import [Link]

fun main(args: Array<String>) {


[Link](1,5)
.subscribeOn([Link]())
.subscribe { println("Receiving $it on thread ${Thre
[Link]().name}") }

[Link](3)
}

181
8. Concurrency

OUTPUT:

Receiving 1 on thread RxNewThreadScheduler-1


Receiving 2 on thread RxNewThreadScheduler-1
Receiving 3 on thread RxNewThreadScheduler-1
Receiving 4 on thread RxNewThreadScheduler-1
Receiving 5 on thread RxNewThreadScheduler-1

This way we can declare our Observable chain and an Observer , but then
immediately move on without waiting for the emissions to finish. Those are now
happening on a new thread named RxNewThreadScheduler-1 . Notice too we
have to call [Link](3) afterwards to make the main thread
sleep for 3 seconds. This gives our Observable a chance to fire all emissions
before the program exits. You should not have to do this sleep() with a JavaFX
application since its own non-daemon threads will keep the session alive.

A critical behavior to note here is that all emissions are happening sequentially on
a single RxNewThreadScheduler-1 thread. Emissions are strictly happening
one-at-a-time on a single thread. There is no parallelization or racing to call
onNext() throughout the chain. If this did occur, it would break the
Observable contract. It may surprise some folks to hear that RxJava is not
parallel, but we will cover some concurrency tricks with flatMap() later to get
parallelization without breaking the Observable contract.

subscribeOn() can be declared anywhere in the Observable chain, and it


will communicate all the way up to the source what thread to fire emissions on. If
you pointlessly declare multiple subscribeOn() operators in a chain, the left-
most one (closest to the source) will win. Later we will cover the observeOn()
which can switch emissions to a different thread in the middle of the chain.

Pooling Threads: Choosing a Scheduler


In reality, you should be conservative about using [Link]() as
it creates a new thread for each Observer . You will notice that if we attach
multiple Observers to this Observable , we are going to create a new thread for
each Observer .

182
8. Concurrency

Java

import [Link];
import [Link];
import [Link];

public class Launcher {


public static void main(String[] args) {
Observable<Integer> source = [Link](1,5)
.subscribeOn([Link]());

//Observer 1
[Link](i ->
[Link]("Observer 1 receiving " + i +
" on thread "
+ [Link]().getName())
);

//Observer 2
[Link](i ->
[Link]("Observer 2 receiving " + i +
" on thread "
+ [Link]().getName())
);

try {
[Link](3);
} catch (InterruptedException e) {
[Link]();
}
}
}

Kotlin

183
8. Concurrency

import [Link]
import [Link]
import [Link]

fun main(args: Array<String>) {


val source = [Link](1,5)
.subscribeOn([Link]())

//Observer 1
[Link] { println("Observer 1 receiving $it on thre
ad ${[Link]().name}") }

//Observer 2
[Link] { println("Observer 2 receiving $it on thre
ad ${[Link]().name}") }

[Link](3)
}

OUTPUT:

Observer 2 receiving 1 on thread RxNewThreadScheduler-2


Observer 1 receiving 1 on thread RxNewThreadScheduler-1
Observer 2 receiving 2 on thread RxNewThreadScheduler-2
Observer 1 receiving 2 on thread RxNewThreadScheduler-1
Observer 2 receiving 3 on thread RxNewThreadScheduler-2
Observer 1 receiving 3 on thread RxNewThreadScheduler-1
Observer 2 receiving 4 on thread RxNewThreadScheduler-2
Observer 1 receiving 4 on thread RxNewThreadScheduler-1
Observer 2 receiving 5 on thread RxNewThreadScheduler-2
Observer 1 receiving 5 on thread RxNewThreadScheduler-1

Now we have two threads, RxNewThreadScheduler-1 and


RxNewThreadScheduler-2 . What if we had 100, or even 1000 Observers? This
can easily happen if you are flatMapping to hundreds or thousands of
Observables each with their own subscribeOn([Link]()) .
Threads are very expensive and can tax your machine, so we want to constrain
the number of threads that can be used at a time.

184
8. Concurrency

The most effective way to keep thread creation under control is to "reuse" threads.
You can do this with the different Schedulers . A Scheduler is RxJava's
equivalent to Java's standard Executor . You can create your own Scheduler
by passing an Executor to the [Link]() factory. But for most
cases, it is better to use RxJava's standard Schedulers as they are optimized to
be conservative and efficient for most cases.

Computation
If you are doing computation-intensive operations, you will likely want to use
[Link]() which will maintain a conservative number of
threads to keep the CPU from being taxed.

Observable<Integer> source = [Link](1,5)


.subscribeOn([Link]());

Observable operations that are doing calculation and algorithm-heavy work are
optimal to use with the computation Scheduler . If you are not sure how many
threads will be created by a process, you might want to make this one your go-to.

IO
If you are doing a lot of IO-related tasks, like sending web requests or database
queries, these are much less taxing on the CPU and threads can be created more
liberally. [Link]() is suited for this kind of work. It will add and remove
threads depending on how much work is being thrown at it at a given time, and
reuse threads as much as possible.

Observable<Integer> source = [Link](1,5)


.subscribeOn([Link]());

But be careful as it will not limit how many threads it creates! As a rule-of-thumb,
assume it will create a new thread for each task.

Immediate

185
8. Concurrency

The [Link]() is the default Scheduler , and it will work


execute work on the immediate thread declaring the Observer .

Observable<Integer> source = [Link](1,5)


.subscribeOn([Link]());

You will likely not use this Scheduler very often since it is the default. The code
above is no different than declaring an Observable with no subscribeOn() .

Observable<Integer> source = [Link](1,5)

Trampoline
An interesting Scheduler is the [Link]() . It will schedule
the emissions to happen on the immediate thread, but allow the immediate thread
to finish its current task first. In other words, this defers execution of the emissions
but will fire them the moment the current thread declaring the subscription is no
longer busy.

Observable<Integer> source = [Link](1,5)


.subscribeOn([Link]());

You will likely not use the Trampoline Scheduler unless you encounter nuanced
situations where you have to manage complex operations on a single thread and
starvation can occur. The JavaFX Scheduler uses a trampoline mechanism, which
we will cover next.

JavaFX Scheduler
Finally, the JavaFxScheduler is packaged with the RxJavaFX library. It
executes the emissions on the JavaFX thread so they can safely make
modifications to a UI. It uses a trampoline policy against the JavaFX thread,
making it highly resilient against recursive hangups and thread starvation.

The JavaFX Scheduler is not in the Schedulers class, but rather is stored as a
singleton in its own class. You can call it like below:

186
8. Concurrency

Observable<Integer> source = [Link](1,5)


.subscribeOn([Link]());

In Kotlin, The RxKotlinFX library can save you some boilerplate by using an
extension function instead.

val source = [Link](1,5)


.subscribeOnFx()

At the time of writing, all RxJavaFX/RxKotlinFX factories already emit on the


JavaFxScheduler . Therefore, declaring a subscribeOn() against these
sources will have no affect. You will need to leverage observeOn() to switch to
another thread later in the chain, which we will cover shortly.

Java

Button button = new Button("Press me");

[Link](button)
.subscribeOn([Link]()) // has no effect
.subscribe(ae -> [Link]("You clicked me!"));

Kotlin

val button = Button("Press me")

[Link]()
.subscribeOn([Link]()) // has no effect
.subscribe { println("You clicked me!") }

Also note that the JavaFX Scheduler is already used when declaring UI code, and
will be the default subscribeOn() Scheduler since it is the immediate thread.
Therefore, you will rarely call subscribeOn() to specify the JavaFxScheduler.
You are more likely to use it with observeOn() .

187
8. Concurrency

Intervals
While we are talking about concurrency, it is worth mentioning there are other
factories that already emit on a specific Scheduler . For instance, there are
factories in both RxJava and RxJavaFX to emit at a specified time interval.

In RxJava, there is an [Link]() that will emit a consecutive


Long at every specified time interval. By default, this runs on the
[Link]() unless you specify a different one as a third
argument.

Here is an application that will increment a Label every second (Figure 8.1).

Java

188
8. Concurrency

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

Label label = new Label();

[Link](1, [Link], JavaFxScheduler


.platform())
.map(Object::toString)
.subscribe(label::setText);

[Link]().add(label);

[Link](new Scene(root));

[Link](60);
[Link]();
}
}

Kotlin

189
8. Concurrency

import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView : View("My View") {

override val root = vbox {

minWidth = 60.0

label {
[Link](1, [Link], JavaFxSched
[Link]())
.map { [Link]() }
.subscribe { text = it }
}
}
}

OUTPUT:

0
1
2
3
4

Figure 8.1

You can also use [Link]() to pass a Duration


instead of a TimeUnit , and not have to specify the JavaFX Scheduler.

190
8. Concurrency

Intervals are helpful to create timer-driven Observables, or perform tasks such as


scheduling jobs or periodically driving refreshes. If you want all Observers to not
receive separate interval streams, be sure to use publish().refCount() or
publish().autoConnect() to multicast the same interval timer to all Observers
downstream.

Using observeOn()
A lot of folks get confused by the difference between subscribeOn() and
observeOn() , but the distinction is quite simple. A subsribeOn() instructs
the source Observable what thread to emit items on. However, the
observeOn() switches emissions to a different thread at that point in the chain.

In JavaFX, the most common useage of observeOn() is to put items back on


the JavaFX thread after a compution or IO operation finishes from another thread.
Say you wanted to import some expensive data on [Link]() and
collect it in a List . Once it is ready, you want to move that List emission to
the JavaFX thread to feed a ListView . That is perfectly doable with an
observeOn() (Figure 8.2).

Java

191
8. Concurrency

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

ListView<String> listView = new ListView<>();

[Link]("Alpha","Beta","Gamma","Delta","Epsilon"
)
.subscribeOn([Link]())
.toList()
.observeOn([Link]())
.subscribe(list -> [Link]().setAll(li
st));

[Link]().add(listView);

[Link](new Scene(root));

[Link]();
}
}

Kotlin

192
8. Concurrency

import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View("My View") {

override val root = vbox {

listview<String> {
[Link]("Alpha","Beta","Gamma","Delta","Epsi
lon")
.subscribeOn([Link]())
.toList()
.observeOnFx()
.subscribeBy { [Link](it) }
}
}
}

Figure 8.2

The five Strings are emitted and collected into a List on a [Link]()
thread. But immediately after the toList() is an observeOn() that takes that
List and emits it on the JavaFX Scheduler. Unlike the subscribeOn() where

193
8. Concurrency

placement does not matter, the placement of the observeOn() does. It switches
to a different thread at that point in the Observable chain.

This all happens a bit too fast to see this occuring, so let's exaggerate this
example and emulate a long-running database query or request. Use the
delay() operator to delay the emissions by 3 seconds. Note that delay()
subscribes on the [Link]() by default, so having a
subscribeOn() no longer has any effect. But we can pass the
[Link]() as a third argument to make it use an IO thread instead
(Figure 8.3).

Java

194
8. Concurrency

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

ListView<String> listView = new ListView<>();

[Link]("Alpha", "Beta", "Gamma", "Delta", "Epsi


lon")
.delay(3, [Link], [Link]())
.toList()
.observeOn([Link]())
.subscribe(list -> [Link]().setAll(li
st));

[Link]().add(listView);

[Link](new Scene(root));

[Link]();
}
}

Kotlin

195
8. Concurrency

import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView : View("My View") {

override val root = vbox {

listview<String> {
[Link]("Alpha","Beta","Gamma","Delta","Epsi
lon")
.delay(3, [Link], [Link]())
.toList()
.observeOnFx()
.subscribeBy { [Link](it) }
}
}
}

Figure 8.3

![]([Link]

In Figure 8.3, notice that our UI is empty for 3 seconds before it is finally
populated. The data importing and collecting into a List happens on the IO
thread, and then it is safely emitted back on the JavaFX thread where it is
populated into the ListView . The JavaFX thread does not hold up the UI from
displaying due to this operation keeping it busy. If we had more controls we would
see the UI is completely interactive as well during this background operation.

Chaining Multiple observeOn() Calls

196
8. Concurrency

It is also not uncommon to use multiple observeOn() calls. Here is a more real-
life example: let's say you want to create an application that displays a text
response (such as JSON) from a URL. This has the potential to create an
unrespsonsive application that freezes while it is fetching the request. But using
an observeOn() we can switch this work from the FX thread to an IO therad,
then call another observeOn() afterwards to put it back on the FX thread.

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];
import [Link];

public class MyApp extends Application {


@Override
public void start(Stage stage) throws Exception {
VBox vBox = new VBox();
Label label = new Label("Input URL");
TextField input = new TextField();
TextArea output = new TextArea();
Button button = new Button("Submit");

[Link](true);

[Link](button)
.map(ae -> [Link]())
.observeOn([Link]())
.map(MyApp::getResponse)

197
8. Concurrency

.observeOn([Link]())
.subscribe(output::setText);

[Link]().setAll(label,input,output,button);
[Link](new Scene(vBox));
[Link]();

}
private static String getResponse(String path) {
try {
return new Scanner(new URL(path).openStream(), "UTF-
8").useDelimiter("\\A").next();
} catch (Exception e) {
return [Link]();
}
}
}

Kotlin

198
8. Concurrency

import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView : View("My View") {

override val root = vbox {

label("Input URL")
val input = textfield()
val output = textarea {
isWrapText = true
}

button("Submit").actionEvents()
.map { [Link] }
.observeOn([Link]())
.map {
URL([Link]).readText()
}.observeOnFx()
.subscribe {
[Link] = it
}
}
}

Figure 8.4

199
8. Concurrency

You can then put in a URL in the TextField (such as


"[Link] and then click the "Submit"
Button to process it. You will notice the UI stays interactive and after a few
seconds it will put the response in the TextArea (Figure 8.5).

Figure 8.5

Of course, you can click the "Submit" Button multiple times and that could
queue up the requests in an undesirable way. But at least the work is kept off the
UI thread. In the next chapter we will learn about the switchMap() to mitigate
excessive user inputs and kill previous requests, so only the latest emission is
chased after.

But we will take a stateful strategy for now to prevent this from happening.

200
8. Concurrency

doOnXXXFx() Operators
Remember the doOnXXX() operators like doOnNext() , doOnComplete() ,
etc? RxKotlinFX has JavaFX equivalents that will perform on the FX thread,
regardless of which Scheduler is being used. This can be helpful to modify UI
elements in the middle of an Observable chain.

For example, you might want to disable the Button and change its text during
processing. Your first instinct might be to use the doOnNext() to achieve this.

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];
import [Link];

public class MyApp extends Application {


@Override
public void start(Stage stage) throws Exception {
VBox vBox = new VBox();
Label label = new Label("Input URL");
TextField input = new TextField();
TextArea output = new TextArea();
Button button = new Button("Submit");

[Link](true);

[Link](button)
.map(ae -> [Link]())

201
8. Concurrency

.observeOn([Link]())
.doOnNext(path -> {
[Link]("BUSY");
[Link](true);
})
.map(MyApp::getResponse)
.observeOn([Link]())
.subscribe(r -> {
[Link](r);
[Link]("Submit");
[Link](false);
});

[Link]().setAll(label,input,output,button);
[Link](new Scene(vBox));
[Link]();

}
private static String getResponse(String path) {
try {
return new Scanner(new URL(path).openStream(), "UTF-
8").useDelimiter("\\A").next();
} catch (Exception e) {
return [Link]();
}
}
}

Kotlin

202
8. Concurrency

import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView : View("My View") {

override val root = vbox {

label("Input URL")
val input = textfield()
val output = textarea {
isWrapText = true
}

val submitButton = button("Submit")

[Link]()
.map { [Link] }
.observeOn([Link]())
.doOnNext {
[Link] = "BUSY"
[Link] = true
}
.map {
URL([Link]).readText()
}.observeOnFx()
.subscribe {
[Link] = it
[Link] = "Submit"
[Link] = false
}
}
}

203
8. Concurrency

But if you try to execute a request this way, you will get an error indicating that the
submitButton is not being modified on the FX thread. This is occuring because
an IO thread (not the FX thread) is trying to modify the Button . You could move
this doOnNext() operator before the observeOn([Link]()) so it
catches the FX thread, and that would address the issue. But there will be times
where you must call a doOnNext() deep in an Observable chain that is
already on another thread (such as updating a ProgressBar ).

In RxKotlinFX, there are doOnXXXFx() operator equivalents that run on the


JavaFX thread, regardless of which thread the operator is called on. You can
achieve this also with RxJavaFX using a Transformer , which is essentially a
custom operator you can pass to a compose() operator.

Java

import [Link];
import [Link];
import [Link]
rs;
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];
import [Link];

public class MyApp extends Application {


@Override
public void start(Stage stage) throws Exception {
VBox vBox = new VBox();
Label label = new Label("Input URL");
TextField input = new TextField();
TextArea output = new TextArea();

204
8. Concurrency

Button button = new Button("Submit");

[Link](true);

[Link](button)
.map(ae -> [Link]())
.observeOn([Link]())
.compose([Link](t -
> {
[Link]("BUSY");
[Link](true);
}))
.map(MyApp::getResponse)
.observeOn([Link]())
.subscribe(r -> {
[Link](r);
[Link]("Submit");
[Link](true);
});
[Link]().setAll(label,input,output,button);
[Link](new Scene(vBox));
[Link]();

}
private static String getResponse(String path) {
try {
return new Scanner(new URL(path).openStream(), "UTF-
8").useDelimiter("\\A").next();
} catch (Exception e) {
return [Link]();
}
}
}

Kotlin

205
8. Concurrency

import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView : View("My View") {

override val root = vbox {

label("Input URL")
val input = textfield()
val output = textarea {
isWrapText = true
}

val submitButton = button("Submit")

[Link]()
.map { [Link] }
.observeOn([Link]())
.doOnNextFx {
[Link] = "BUSY"
[Link] = true
}
.map {
URL([Link]).readText()
}.observeOnFx()
.subscribe {
[Link] = it
[Link] = "Submit"
[Link] = true
}
}
}

206
8. Concurrency

Java does not have extension functions like Kotlin. But RxJava does have a
compose() operator that you can pass a Transformer to, as well as a
lift() operator to accept custom operators. Between these two methods,
it is possible to create your own operators for RxJava. However, these are
beyond the scope of this book. You can read about creating your own
operators in my book _Learning RxJava.

Here are all the doOnXXXFx() operators availabe in RxKotlin. These behave
exactly the same way as the doOnXXX() operators introduced in Chapter 2, but
the action specified in the lambda will execute on the FX thread.

doOnNextFx()
doOnErrorFx()
doOnCompletedFx()
doOnSubscribeFx()
doOnUnsubscribeFx()
doOnTerminateFx()
doOnNextCountFx()
doOnCompletedCountFx()
doOnErrorCountFx()

The doOnXXXCountFx() operators will provide a count of emissions that


occurred before each of those events. They can be helpful for updating a
ProgressBar , an incrementing StatusBar , or other controls that track
progress.

Parallelization
Did you know the flatMap() (as well as flatMapSingle() and
flatMapMaybe() ) is actually a concurrency tool? RxJava by default does not do
parallelization, so effectively there is no way to parallelize an Observable . As
we have seen, subscribeOn() and observeOn() merely move emissions
from one thread to another thread, not one thread to many threads. However, you
can leverage flatMap() to create several Observables parallelizing emissions
on different threads.

207
8. Concurrency

For instance we can parallelize a (simulated) long-running process for 10


consecutive integers.

Java

import [Link];
import [Link];

public class Launcher {


public static void main(String[] args) {
[Link](1,10)
.flatMap(i -> [Link](i)
.subscribeOn([Link]())
.map(Launcher::runLongProcess)
).subscribe(i -> [Link]("Received "
+
i + " on " + [Link]().getName())
);

try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
public static int runLongProcess(int i) {
try {
[Link]((long) ([Link]() * 1000));
} catch (Exception e) {
[Link]();
}
return i;
}
}

Kotlin

208
8. Concurrency

import [Link]
import [Link]

fun main(args: Array<String>) {


[Link](1,10)
.flatMap {
[Link](it)
.subscribeOn([Link]())
.map { runLongProcess(it) }
}.subscribe {
println("Received $it on ${[Link](
).name}")
}

[Link](15000)
}

fun runLongProcess(i: Int): Int {


[Link]([Link]().toLong() * 1000)
return i
}

OUTPUT:

Received 1 on RxComputationScheduler-1
Received 3 on RxComputationScheduler-3
Received 5 on RxComputationScheduler-1
Received 9 on RxComputationScheduler-1
Received 4 on RxComputationScheduler-4
Received 8 on RxComputationScheduler-4
Received 2 on RxComputationScheduler-3
Received 6 on RxComputationScheduler-3
Received 7 on RxComputationScheduler-3
Received 10 on RxComputationScheduler-3

Your output may look different from what is above, and that is okay since nothing
is deterministic when we do this sort of parallelized concurrency. But notice we
have processing happening on at least three threads (RxComputationScheduler-

209
8. Concurrency

1, 3, and 4). Threads will be assigned at random. Since each Observable


created by an emission inside a flatMap() will take its own thread from the
given Scheduler , each resulting Observable will indepedently process
emisssions on a separate thread within the flatMap() .

It is critical to note that the flatMap() can fire emissions from multiple
Observables inside it, all of which may be running on different threads. But to
respect the Observable contract, it must make sure that emissions leaving the
flatMap() towards the Observer are serialized in a single Observable . If
one thread is busy pushing items out of the flatMap() , the other threads will
leave their emissions for that occupying thread to take ownership of in a queue.
This allows the benefit of concurrency without any blocking or synchronization of
threads.

You can learn more about achieving RxJava parallelization in two articles
written by yours truly: Acheiving Parallelization and [Maximizing
Parallelization]([Link]
[Link]).

Summary
In ths chapter we have learned one of the main selling points of Rxjava: concise,
flexible, and composable concurrency. You can compose Observables to change
concurrency policies at any time with the subscribeOn() and observeOn() .
This makes applications adaptable, scalable, and evolvable over long periods of
time. You do not have to mess with synchronizers, semaphores, or any other low-
level concurrency tools as RxJava takes care of these complexities for you.

But we are not quite done yet. As we will see in the next chapter, we can leverage
concurrency to create features you might have thought impractical to put in your
applications.

210
9. Switching, Throttling, and Buffering

9. Switching, Throttling, and Buffering


In the previous chapter, we learned that RxJava makes concurrency accessible
and fairly trivial to accomplish. But being able to compose concurrency easily
enables us to do much more with RxJava.

In UI development, users will inevitably click things that kick off long-running
processes. Even if you have concurrency in place, users that rapidly select UI
inputs can kick of expensive processes, and those processes will start to queue
up undesirably. Other times, we may want to group up rapid emissions to make
them a single unit, such as typing keystrokes. There are tools to effectively
overcome all these problems, and we will cover them in this chapter.

A Quick Note About Flowables and


Backpressure
One topic that I've decided is beyond the scope of this book is backpressure,
which involves using a Flowable instead of an Observable . A Flowable is
just like an Observable but it has a notion of "pushing back" on the source and
telling it to slow down. This way, highly concurrent chains of operations do not
create bottlenecks between consumers and producers, potentially causing the
JVM to run out of memory.

Flowables are highly critical if you are working with large amounts of data
concurrently. However, you cannot effectively use Flowables against user
interface events because a user cannot programmatically be told to "slow dow'
and respect a backpressure request. You can, however, use reactive operators to
"knock down" emissions from a rapidly-firing source which this chapter will cover.
You can also compose Flowables and Observables together using conversion
operators like Observable#toFlowable() and Flowable#toObservable() ,
as well as Observable#flatMapPublisher() and
Flowable#flatMapObservable() .

211
9. Switching, Throttling, and Buffering

But backpressure and Flowables are beyond the scope of this book. Please read
Chapter 8 of my Packt book Learning RxJava to get thorough examples,
explanations, and use cases for Flowables and backpressure.

Switching with switchMap()


Let's emulate a situation where rapid user inputs could overwhelm your
application with requests. Say you have two ListView<T> controls. The top one
has String values, and the bottom one will always display the individual
characters for each selected String . When you select "Alpha" on the top one,
the bottom one will contain items "A","l","p","h", and "a" (Figure 9.1).

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

ListView<String> listView = new ListView<>();

[Link]().setAll("Alpha","Beta","Gamma",
"Delta","Epsilon","Zeta","Eta");

ListView<String> itemsView = new ListView<>();

[Link]([Link]
l().getSelectedItems())

212
9. Switching, Throttling, and Buffering

.flatMapSingle ( list -> [Link]


(list)
.flatMap (s -> [Link]([Link]
lit("(?!^)")))
.toList()
).subscribe(l -> [Link]().setAll(l))
;

[Link]().addAll(listView, itemsView);

[Link](new Scene(root));

[Link]();
}
}

Kotlin

213
9. Switching, Throttling, and Buffering

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View() {

val items = listOf("Alpha","Beta","Gamma",


"Delta","Epsilon","Zeta","Eta").observable()

override val root = vbox {

val listView = listview(items)

listview<String> {
[Link]
vable()
.flatMapSingle { [Link]()
.flatMap { [Link]().map(Char
::toString).toObservable() }
.toList()
}.subscribe { [Link](it) }
}
}
}

Figure 9.1

214
9. Switching, Throttling, and Buffering

This is a pretty quick computation which hardly keeps the JavaFX thread busy.
But in the real world, running database queries or HTTP requests can take awhile.
The last thing we want is for these rapid inputs to create a queue of requests that
will quickly make the application unusable as it works through the queue. Let's
emulate this by using the delay() operator. Remember that the delay()
operator already specifies a subscribeOn() internally, but we can specify an
argument which Scheduler it uses. Let's put it in the IO Scheduler. The
Observer must receive each emission on the JavaFX thread, so be sure to
observeOn() the JavaFX Scheduler before the emission goes to the
Observer .

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

215
9. Switching, Throttling, and Buffering

import [Link];

import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

ListView<String> listView = new ListView<>();

[Link]().setAll("Alpha","Beta","Gamma",
"Delta","Epsilon","Zeta","Eta");

ListView<String> itemsView = new ListView<>();

[Link]([Link]
l().getSelectedItems())
.flatMapSingle( list -> [Link](
list)
.delay(3, [Link], Schedulers.i
o())
.flatMap (s -> [Link]([Link]
lit("(?!^)")))
.toList()
).observeOn([Link]())
.subscribe(l -> [Link]().setAll(l));

[Link]().addAll(listView, itemsView);

[Link](new Scene(root));

[Link]();
}
}

Kotlin

216
9. Switching, Throttling, and Buffering

import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView : View() {

val items = listOf("Alpha","Beta","Gamma",


"Delta","Epsilon","Zeta","Eta").observable()

override val root = vbox {

val listView = listview(items)

listview<String> {
[Link]
vable()
.flatMapSingle { [Link]()
.delay(3,
[Link], Schedulers
.io())
.flatMap { [Link]().map(Char
::toString).toObservable() }
.toList()
}.observeOnFx().subscribe { [Link](it)
}
}
}
}

Now if we click several items on the top ListView , you will notice a 3-second
lag before the letters show up on the bottom ListView . This emulates long-
running requests for each click, and now we have these requests queuing up and
causing the bottom ListView to go berserk, trying to display each previous

217
9. Switching, Throttling, and Buffering

request before it gets to the current one. Obviously, this is undesirable. We likely
want to kill previous requests when a new one comes in, and this is simple to do.
Just change the flatMapSingle() that emits the List<String> of
characters to a switchMap() . Since there is no switchMapSingle() , just
convert that resulting Single from toList() to an Observable .

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

ListView<String> listView = new ListView<>();

[Link]().setAll("Alpha","Beta","Gamma",
"Delta","Epsilon","Zeta","Eta");

ListView<String> itemsView = new ListView<>();

[Link]([Link]
l().getSelectedItems())
.switchMap ( list -> [Link](lis
t)
.delay(3, [Link], Schedulers.i
o())

218
9. Switching, Throttling, and Buffering

.flatMap (s -> [Link]([Link]


lit("(?!^)")))
.toList()
.toObservable()
).observeOn([Link]())
.subscribe(l -> [Link]().setAll(l));

[Link]().addAll(listView, itemsView);

[Link](new Scene(root));

[Link]();
}
}

Kotlin

219
9. Switching, Throttling, and Buffering

import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView : View() {

val items = listOf("Alpha","Beta","Gamma",


"Delta","Epsilon","Zeta","Eta").observable()

override val root = vbox {

val listView = listview(items)

listview<String> {
[Link]
vable()
.switchMap { [Link]()
.delay(3, [Link], Schedule
[Link]())
.flatMap { [Link]().map(Char
::toString).toObservable() }
.toList()
.toObservable()
}.observeOnFx().subscribe { [Link](it)
}
}
}
}

This makes the application much more responsive. The switchMap() works
identically to any variant of flatMap() , but it will only chase after the latest user
input and kill any previous requests. In other words, it is only chasing after the

220
9. Switching, Throttling, and Buffering

latest Observable derived from the latest emission, and unsubscribing any
previous requests. The switchMap() is a powerful utility to create responsive
and resilient UI's, and is the perfect way to handle click-happy users!

You can also use the switchMap() to cancel long-running or infinite processes
using a neat little trick with [Link]() . For instance, a
ToggleButton has a true/false state depending on whether it is selected. When
you emit its false state, you can return an empty Observable to kill the
previous processing Observable , as shown below. When the ToggleButton
is selected, it will kick off an [Link]() that emits a consecutive
integer every 10 milliseconds. But unselecting the ToggleButton will cause the
flatMap() to switch to an [Link]() , killing and unsubscribing
from the [Link]() (Figure 9.2).

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];

public class MyApp extends Application {


@Override
public void start(Stage stage) throws Exception {
VBox vBox = new VBox();

ToggleButton toggleButton = new ToggleButton("START");


Label timerLabel = new Label("0");

[Link]([Link](
))
.switchMap(selected -> {

221
9. Switching, Throttling, and Buffering

if (selected) {
[Link]("STOP");
return [Link](10, TimeUnit.
MILLISECONDS);
} else {
[Link]("START");
return [Link]();
}
})
.observeOn([Link]())
.subscribe(i -> [Link]([Link]())
);

[Link]().setAll(toggleButton,timerLabel);
[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

222
9. Switching, Throttling, and Buffering

import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]

class MyApp: App(MyView::class)

class MyView : View() {

override val root = vbox {

val toggleButton = togglebutton("START")


val timerLabel = label("0")

[Link]().toObservable()
.switchMap { selected ->
if (selected) {
[Link] = "STOP"
[Link](10, [Link]
CONDS)
} else {
[Link] = "START"
[Link]()
}
}.observeOnFx()
.subscribe {
[Link] = [Link]()
}
}
}

Figure 9.2

223
9. Switching, Throttling, and Buffering

The switchMap() can come in handy for any situation where you want to switch
from one Observable source to another.

Buffering
We may want to collect emissions into a List , but doing so on a batching
condition so several lists are emitted. The buffer() operators help accomplish
this, and they have several overload flavors.

The simplest buffer() specifies the number of emissions that will be collected
into a List before that List is pushed forward, and then it will start a new
one. In this example, emissions will be grouped up in batches of 10 .

Java

import [Link];

public class Launcher {


public static void main(String[] args) {
[Link](1,100)
.buffer(10)
.subscribe([Link]::print);
}
}

Kotlin

import [Link]

fun main(args: Array<String>) {


[Link](1,100)
.buffer(10)
.subscribe { println(it) }
}

OUTPUT:

224
9. Switching, Throttling, and Buffering

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]

There are other flavors of buffer() . Another will collect emissions based on a
specified time cutoff. If you have an Observable emitting at an interval of 300
milliseconds, you can buffer them into a List at every second. This is what the
output would look like:

Java

import [Link];

import [Link];

public class Launcher {


public static void main(String[] args) {

[Link](300, [Link])
.buffer(1, [Link])
.subscribe([Link]::println);

try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}

Kotlin

225
9. Switching, Throttling, and Buffering

import [Link]
import [Link]

fun main(args: Array<String>) {


[Link](300, [Link])
.buffer(1, [Link])
.subscribe { println(it) }

[Link](10000)
}

OUTPUT:

[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11, 12]
[13, 14, 15]
[16, 17, 18]
[19, 20, 21, 22]
[23, 24, 25]
[26, 27, 28]
[29, 30, 31, 32]

Another way to accomplish this is to pass another Observable to buffer()


as an argument, whose each emission (regardless of type) will "cut" and emit the
List at that moment.

Java

226
9. Switching, Throttling, and Buffering

import [Link];

import [Link];

public class Launcher {


public static void main(String[] args) {

Observable<Long> oneSecondInterval = [Link](


1, [Link]);

[Link](300, [Link])
.buffer(oneSecondInterval)
.subscribe([Link]::println);

try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}

Kotlin

227
9. Switching, Throttling, and Buffering

import [Link]
import [Link]

fun main(args: Array<String>) {

val oneSecondInterval = [Link](1, [Link]


NDS)

[Link](300, [Link])
.buffer(oneSecondInterval)
.subscribe { println(it) }

[Link](10000)
}

OUTPUT:

[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11, 12]
[13, 14, 15]
[16, 17, 18]
[19, 20, 21, 22]
[23, 24, 25]
[26, 27, 28]
[29, 30, 31, 32]

This is a helpful way to buffer() lists because you can use another
Observable to control when the Lists are emitted. We will see an example of
this at the end of this chapter when we group up keystrokes.

RxJava-Extras has some additional buffer-like operators, such as


toListWhile() which will group emissions into a List while a condition is
true, then it will emit the List\ and move on to the next one.

228
9. Switching, Throttling, and Buffering

Note that there are also window() operators that are similar to buffer() , but
they will return an Observable<Observable<T>> instead of an
Observable<List<T>> . In other words, they will return an Observable
emitting Observables rather than Lists. These might be more desirable in some
situations where you do not want to collect Lists and want to efficiently do further
operations on the groupings.

You can read more about buffer() and window() on the RxJava Wiki.

Throttling
When you have a rapidly firing Observable , you may just want to emit the first
or last emission within a specified scope. For example, you can use
throttleLast() (which is also aliased as sample() ) to emit the last
emission for each fixed time interval.

Java

import [Link];

import [Link];

public class Launcher {


public static void main(String[] args) {

[Link](300, [Link])
.throttleLast(1, [Link])
.subscribe([Link]::println);

try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}

Kotlin

229
9. Switching, Throttling, and Buffering

import [Link]
import [Link]

fun main(args: Array<String>) {

[Link](300, [Link])
.throttleLast(1, [Link])
.subscribe { println(it) }

[Link](10000)
}

OUTPUT:

2
5
8
12
15
18
22
25
28
32

throttleFirst() will do the opposite and emit the first emission within each
time interval. It will not emit again until the next time interval starts and another
emission occurs in it.

Java

230
9. Switching, Throttling, and Buffering

import [Link];

import [Link];

public class Launcher {


public static void main(String[] args) {

[Link](300, [Link])
.throttleFirst(1, [Link])
.subscribe([Link]::println);

try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}

Kotlin

import [Link]
import [Link]

fun main(args: Array<String>) {

[Link](300, [Link])
.throttleFirst(1, [Link])
.subscribe { println(it) }

[Link](10000)
}

OUTPUT:

231
9. Switching, Throttling, and Buffering

0
4
8
12
16
20
24
28
32

The debounce() operator (also aliased as throttleWithTimeOut() ) will hold


off emitting the latest emission until a specified amount of time has passed with no
emissions. Below, we have a debounce() operator that will push the latest
emission after 1 second of no activity. If we send 10 rapid emissions at 100
millisecond intervals, 3 emissions separated by 2 second intervals, and 4
emissions at 500 millisecond intervals, we will likely get this output below:

Java

232
9. Switching, Throttling, and Buffering

import [Link];

import [Link];

public class Launcher {


public static void main(String[] args) {

Observable<String> source = [Link](


[Link](100,[Link]).t
ake(10).map(i -> "A" + i),
[Link](2, [Link]).take(3)
.map(i -> "B" + i),
[Link](500, [Link]).
take(4).map(i -> "C" + i)
);

[Link](1, [Link])
.subscribe([Link]::println);

try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}

Kotlin

233
9. Switching, Throttling, and Buffering

import [Link]
import [Link]

fun main(args: Array<String>) {

val source = [Link](


[Link](100,[Link]).take(
10).map { "A-$it"},
[Link](2, [Link]).take(3).map
{ "B-$it"},
[Link](500, [Link]).take(
4).map { "C-$it"}
)

[Link](1, [Link])
.subscribe { println(it) }

[Link](10000)
}

OUTPUT:

A9
B0
B1
C3

I labeled each source as "A", "B", or "C" and concatenated that with the index of
the emission that was throttled. You will notice that the 10 rapid emissions
resulted in the last emission "A9" getting fired after the 1-second interval of "B"
resulted in that inactivity. Then "B0" and "B1" had 1 second breaks between them
resulting in them being emitted. But "B3" did not go forward because "C" started
firing at 500 millisecond intervals and gave no inactivity interval for it to fire. Then
"C3" was the last emission to fire at the final respite.

If you want to see more examples and marble diagrams of these operators, check
out the RxJava Wiki article.

234
9. Switching, Throttling, and Buffering

Grouping Up Keystrokes
Now we will move on to a real-world example that puts everything in this chapter
in action. Say we have a ListView<String> containing all 50 states of the
United States (I saved them to a plain text file on GitHub Gist. When we have the
ListView selected, we want users to be able to start typing a state and it will
immediately jump to the first state that starts with that inputted String .

Achieving this can be a bit tricky. As a user is typing rapidly, we want to collect
those emissions into a single String to turn individual characters into words.
When the user stops typing, we want to stop collecting characters and push that
String forward so it is selected in a ListView . Here is how we can do that:

Java

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public final class MyApp extends Application {

@Override
public void start(Stage stage) throws Exception {

VBox root = new VBox();

//Declare a ListView with all U.S. states


ListView<String> listView = new ListView<>();

235
9. Switching, Throttling, and Buffering

List<String> states = [Link](getResponse("https:/


/[Link]/S0xuOi").split("\\r?\\n"));
[Link]().setAll(states);

//broadcast typed keys


Observable<String> typedKeys = [Link]
(listView, KeyEvent.KEY_TYPED)
.map(KeyEvent::getCharacter)
.publish().refCount();

//immediately jump to state being typed


[Link](200, [Link]).startWith(
"")
.switchMap(s ->
[Link]((x,y) -> x + y)
.switchMap(input ->
[Link](
states)
.filter(st -> st
.toUpperCase().startsWith([Link]()))
.take(1)
)
).observeOn([Link]())
.subscribe(st ->
[Link]().select(st)
);

[Link]().add(listView);

[Link](new Scene(root));

[Link]();
}

private static String getResponse(String path) {


try {
return new Scanner(new URL(path).openStream(), "UTF-
8").useDelimiter("\\A").next();
} catch (Exception e) {
return [Link]();

236
9. Switching, Throttling, and Buffering

}
}
}

Kotlin

import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]

class MyApp: App(MyView::class)

class MyView : View("My View") {

val states = [Link](


URL("[Link]
"\\r?\\n"))
)

override val root = vbox {

val listView = listview<String> {


items = states
}

val typedKeys = [Link](KeyEvent.KEY_TYPED)


.map { [Link] }
.publish().refCount()

[Link](200, [Link]).startWith(
"")

237
9. Switching, Throttling, and Buffering

.switchMap {
[Link] { x,y -> x + y }
.switchMap { input ->
[Link]()
.filter { [Link]
().startsWith([Link]()) }
.take(1)
}
}.observeOnFx()
.subscribe {
[Link](it)
}
}
}

Figure 9.3 - A ListView that will select states that are being typed

There is a lot happening here, so let's break it down.

Obviously we set up our ObservableList<String> containing all the U.S.


states, and set that to back the ListView . Then we multicast the keystrokes
through the typedKeys Observable. We use this typedKeys Observable for

238
9. Switching, Throttling, and Buffering

two separate tasks: 1) Signal the user has stopped typing after 200ms of inactivity
via debounce() 2) Receive that signal emission within a switchMap() , where
typedKeys is used again to infinitely scan() typed characters and
concatentate them together as the user types. Then each concatenation of
characters is compared to all the states and finds the first one that matches. That
state is then put back on the FX thread and to the Observer to be selected.

This is probably the most complex task I have found in using RxJava with JavaFX,
but it is achieving an incredible amount of complex concurrent work with little
code. Take some time to study the code above. Although it may take a few
moments (or perhaps days) to sink in, try to look at what each part is doing in
isolation. An infinite Observable is doing a rolling concatenation of user
keystrokes to form Strings (and using a switchMap() to kill off previous
searches). That inifinite Observable is killed after 200 ms of inactivity and
replaced with a new inifinte Observable , effectively "resetting" it.

Once you get a hang of this, you will be unstoppable in creating high-quality
JavaFX applications that can not only cope, but also leverage rapid user inputs.

Summary
In this chapter, we learned how to handle a high volume of emissions effectively
through various strategies. When Observers cannot keep up with a hot
Observable , you can use switching, throttling, and buffering to make the
volume manageable. We also learned some powerful patterns to group up
emissions based on timing mechanisms, and make tasks like processing
keystrokes fairly trivial.

We are almost done with our RxJava journey. In the final chapter, we will cover a
question probably on many readers' minds: decoupling UI's when using RxJava.

239
10. Decoupling

10. Decoupling Reactive Streams


In this book, we kept our examples fairly coupled and did not bring any UI code
separation patterns. This was to keep the focus on Rx topics and not distract
away from them. But in this chapter, we will introduce how you can separate
Observables and Observers cleanly so they are not coupled with each other, even
if they are in different parts of the UI. This aids goals to create effective code
separation patterns and increase maintainability of complex applications.

RxJava has a special reactive type called a Subject , which comes in a number
of implementations. A Subject operates as both an Observable and an
Observer . However, you need to be selective when to use Subjects as they can
introduce antipatterns. They can come in handy to decouple reactive streams by
having multiple "sources" subscribe a Subject to their emissions, and then that
Subject will pass those emissions downstream to any receiving Observers.
Subjects automatically multicast as well.

Using the PublishSubject


A Subject cam act as a proxy between one or more source Observables and
one or more Observers. The most vanilla type of Subject is the
PublishSubject , which can be called using [Link]() . It
will simply relay emissions from one or more upstream sources to one or more
downstream Observers, without any additional behaviors.

Below, we do a simple separation between the source of text input values and an
Observer that consumes them by putting them in a Label . The Subject
will act as a proxy between them.

Java

240
10. Decoupling

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

public class JavaFxApp extends Application {

private final Subject<String> textInputs = [Link]


eate();

@Override
public void start(Stage stage) throws Exception {

TextField textField = new TextField();


Label label = new Label();

//pass emissions to the Subject


[Link]([Link]()).subs
cribe(textInputs);

//receive emissions from the Subject


[Link](s -> new StringBuilder(s).reverse().toStr
ing())
.subscribe(label::setText);

VBox vBox = new VBox(textField, label);

[Link](new Scene(vBox));
[Link]();
}
}

Kotlin

241
10. Decoupling

import [Link]
import [Link]
import tornadofx.*

class MyApp: App(MyView::class)

class MyView : View("My View") {

val textInputs = [Link]<String>()

override val root = vbox {


textfield {
textProperty()
.toObservable()
.subscribe(textInputs)
}

label {
[Link](String::reversed)
.subscribe { text = it }
}
}
}

Using a Subject in a Model


Typically, you will hold the Subject in a separate model class of some sort to
support your JavaFX applications, and relay emissions from one component to
another in an EventBus-like fashion. This is helpful to not only broadcast universal
events throughout your application, but also provide several sources to drive a
single event.

Java

242
10. Decoupling

import [Link];
import [Link];
import [Link];
import [Link];

public class MyEventModel {

private MyEventModel() {}

private static final MyEventModel instance = new MyEventMode


l();

public static MyEventModel getInstance() {


return instance;
}

private final Subject<ActionEvent> refreshRequests = Publish


[Link]();

public Observable<ActionEvent> getRefreshRequests() {


return refreshRequests;
}
}

Kotlin

import [Link]
import [Link]

object MyEventModel {
val refreshRequests = [Link]<ActionEvent>()
}

In this MyEventModel we have a Subject<ActionEvent> that handles


refreshRequests , Let's say we wanted three events to drive a refresh: a
Button , a MenuItem , and a key combination "CTRL + R" on a TableView .

243
10. Decoupling

If you declare these Observables in three separate places throughtout your UI


code, you can add each of them to this CompositeObservable .

Java

//make refresh Button


Button button = new Button("Refresh");
[Link](button)
.subscribe([Link]().getRefreshRequests());

//make refresh MenuItem


MenuItem menuItem = new MenuItem("Refresh");
[Link](menuItemClicks)
.subscribe([Link]().getRefreshRequests());

//CTRL + R hotkeys on a TableView


TableView<MyType> tableView = new TableView<>();

[Link](tableView, KeyEvent.KEY_PRESSED)
.filter(ke -> [Link]() && [Link]().equals(
KeyCode.R))
.map(ke -> new ActionEvent());
.subscribe([Link]().getRefreshRequests
());

Kotlin

244
10. Decoupling

//make refresh button


val button = Button("Refresh")
[Link]().subscribe([Link])

//make refresh MenuItem


val menuItem = MenuItem("Refresh")
[Link]().subscribe([Link])

//CTRL + R hotkeys on a TableView


val tableView = TableView<MyType>();
[Link](KeyEvent.KEY_PRESSED)
.filter { [Link] && [Link] == KeyCode.R }
.map { ActionEvent() }
.subscribe([Link])

These three event sources are now proxied through one Subject . You can then
have one or more Observers subscribe() to this Subject , and they will
respond to any of these three sources requesting a refresh.

Java

//subscribe to refresh events


[Link]()
.getRefreshRequests()
.subscribe(ae -> refresh());

Kotlin

[Link]
.subscribe { refresh() }

You can set up as many models as you like with as many Subjects as you like to
pass different data and events back-and-forth throughout your application.

Other Subject Types

245
10. Decoupling

There are a couple of other Subject implementations to be aware of.


BehaviorSubject will cache the last emission that will be replayed to every
new Observer , which can be helpful to always broadcast the latest value
selected in a control. ReplaySubject will replay all values and indefinitely
cache them. AsyncSubject will broadcast only the last value after the source
calls onComplete() , and UnicastSubject will cache emissions until it gets
the first downstream Observer , which it will emit all the items to and then flush
its cache.

You can learn more about these subjects in Rx documentation as well as the
Learning RxJava Packt Book.

Summary
In this chapter we covered how to separate reactive streams between UI
components with the Subject , which can serve as a proxy between Observable
sources and downstream Observers. You can put Subject instances in a
backing class to serve as an Rx-flavored event bus to relay data and events. Use
the Subject to consolidate mutliple event sources that drive the same action, or
to cleanly separate your Observable sources and terminal Observers.

Closing
You have reached the end of this book. Congrats! Keep researching RxJava and
learn what it can do inside and outside of JavaFX. You will find it is used on
Android via the RxAndroid and RxBindings libraries, as well as on backend
development with RxNetty and other frameworks. I encourage you to keep
learning the various operators and check out books and online resources to grow
your proficiency.

I highly encourage reading my Packt book Learning RxJava to get more thorough
knowledge of RxJava beyond JavaFX applications. I wrote it in the same style and
spirit as this book, with the intent of helping the largest number of people possible.
The book can be purchased on Packt, Amazon, and other book retailers. It is also
available via subscription to O'Reilly Safari and Packt Mapt.

246
10. Decoupling

[Link]

Please follow me on Twitter @thomasnield9727 for updates on all things Rx. If


you have any issues, questions, or concerns please feel free to file an issue or
email me at thomasnield@[Link].

Until next time!

Thomas Nield

247

You might also like