Rxjavafx Guide
Rxjavafx Guide
Introduction 1.1
Preface 1.2
1. Getting Started 1.3
2. RxJava Fundamentals 1.4
3. Events and Value Changes 1.5
4. Collections 1.6
5. Combining Observables 1.7
6. Bindings 1.8
7. Dialogs and Multicasting 1.9
8. Concurrency 1.10
9. Switching, Throttling, and Buffering 1.11
10. Decoupling 1.12
1
Introduction
2
Preface
Preface
Over the past year or so, I have discovered so much can be achieved leveraging
reactive programming in JavaFX applications. To this day, I still discover amazing
patterns that RxJava allows in JavaFX applications. RxJavaFX is merely a layer
between these two technologies to make them talk to each other, just like
RxAndroid bridges RxJava and the Android UI.
I guess the best way to introduce RxJavaFX is to share how it came about. In
2014, I had already developed some Swing applications that were used internally
at my company. These applications were quite involved with lots of interactivity,
data processing, and complex user inputs. Like most UI applications, these were a
beat-down to build. Naturally, I was drawn to JavaFX and was particularly
intrigued by the ObservableValue , ObservableList , and other data
structures that notified the UI automatically of their changes. No more
[Link]() ! Although I briefly considered HTML5 as my
next platform, JavaFX showed more promise in the environment I worked in.
I originally set out to use reactive programming as a way to handle UI events, and
ReactFX was perfect for this. But I began to suspect I was missing the bigger
picture. I researched reactive programming further and discovered RxJava, a
3
Preface
reactive API with a rich ecosystem of libraries built around it, including [RxAndroid]
([Link] and [RxJava-JDBC]
([Link] RxJava rapidly became a core
technology in the Android stack, and I wondered if it had the same potential in
JavaFX. As I studied RxJava, I was immediately drawn to the RxJava-JDBC
library. Effectively, I could leverage bindings that were bound to database queries.
It soon became clear that with reactive programming, events are data, and data
are events!
During my struggle, I stumbled on the RxJavaFX project. It was a small library that
converted Node and ObservableValue events into RxJava Observables. It
also contained a Scheduler for the JavaFX thread. I knew immediately this was
the alternative to ReactFX I needed, but some folks at Netflix were having some
build issues with it. Ben Christensen was eager to give it away to someone who
knew JavaFX, as nobody at Netflix used JavaFX. After a period of no activity, I
reluctantly volunteered to take ownership of it. After hours of Googling, trawling
GitHub projects with similar issues, and making a few tweaks, the build and tests
finally succeeded. I was able to get it released on Maven Central and RxJavaFX
was now live.
4
Preface
5
1. Getting Started
1. Getting Started
Reactive programming is about composing events and data together, and treating
them identically. This idea of "events are data, and data are events" is powerful,
and because UI's often have to coordinate both it is the perfect place to learn and
apply reactive programming. For this reason, this book will teach RxJava from a
JavaFX perspective and assume no prior RxJava knowledge. If you already have
experience with RxJava, you are welcome to skip the next chapter.
I would highly recommend being familiar with JavaFX (or TornadoFX) before
starting this book. [Mastering JavaFX 8 Controls (Hendrik Ebbers) and Pro
JavaFX 8 (James Weaver and Weiqi Gao) are excellent books to learn JavaFX. If
you are interested in leveraging JavaFX with the Kotlin language, check out the
TornadoFX Guide written by Edvin Syse and Thomas Nield (yours truly). I will
explain why this book shows examples in both the Java and Kotlin languages
shortly. For now, let us explore the benefits of using RxJava with JavaFX.
6
1. Getting Started
RxJava has a rapidly growing and active community. The creators and
maintainers of RxJava do an awesome job of answering questions and being
responsive to developers of all skill levels. Reactive programming has enabled an
exciting new domain filled with new ideas, practical applications, and constant
discovery. RxJava is one of the many ReactiveX API's standardized across many
programming languages. Speaking of other languages, let us talk about Kotlin.
If you have never checked out Kotlin, I would higly recommend giving it a look. It
is an intuitive language that only takes a few hours for a Java developer to learn.
The reason I present Kotlin in this book is because it created a unique opportunity
on the JavaFX front. Towards the end of Kotlin's beta, Edvin Syse released
TornadoFX, a lightweight Kotlin library that significantly streamlines development
of JavaFX applications.
For instance, with TornadoFX you can create an entire TableView using just the
Kotlin code below:
tableview<Person> {
column("ID", Person::id)
column("Name", Person::name)
column("Birthday", Person::birthday)
column("Age", Person::age)
}
7
1. Getting Started
I had the privilege of joining Edvin's project not long after TornadoFX's release,
and the core team has created a phenomenal JavaFX suite of features enabled by
the Kotlin language. I would highly recommend giving the TornadoFX Guide a look
to learn more.
If you are not interested in Kotlin, no worries! The Java version of code samples
will be always be presented first and you can ignore the Kotlin ones.
Setting Up
Currently, RxJavaFX and RxKotlinFX support both RxJava 1.x and RxJava 2.x
(and aligned with their own respectiveX 1.x and 2.x versions). RxJava 2.x brings a
number of large changes to RxJava, and this is the version that the guide will
cover.
You should prefer RxJava 2.x because RxJava 1.x support will discontinue early
2018.
Java
To setup RxJavaFX 2.x for Java, use the Gradle or Maven configuration below
where x.y.z is the version number you want to specify.
Gradle
compile '[Link].rxjava2:x.y.z'
Maven
8
1. Getting Started
<dependency>
<groupId>[Link].rxjava2</groupId>
<artifactId>rxjavafx</artifactId>
<version>x.y.z</version>
</dependency>
Kotlin
If you are using Kotlin, you will want to use RxKotlinFX instead of RxJavaFX.
Make sure you have configured Maven or Gradle to use a Kotlin configuration,
and include the dependencies below. Note the x.y.z is where you put the
targeted version number, and I included TornadoFX and RxKotlin as
dependencies since the examples will use them.
Gradle
compile '[Link]:rxkotlinfx:x.y.z'
compile '[Link]:tornadofx:x.y.z`
compile '[Link].rxjava2:rxkotlin:x.y.z'
Maven
<dependency>
<groupId>[Link]</groupId>
<artifactId>rxkotlinfx</artifactId>
<version>x.y.z</version>
</dependency>
<dependency>
<groupId>[Link]</groupId>
<artifactId>tornadofx</artifactId>
<version>x.y.z</version>
</dependency>
<dependency>
<groupId>[Link]</groupId>
<artifactId>rxkotlin</artifactId>
<version>x.y.z</version>
</dependency>
9
1. Getting Started
Figure 1.1. shows a Venn diagram showing the stack of technologies typically
used to built a reactive JavaFX application with Kotlin. The overlaps indicate that
library is used to interoperate between the 3 domains: JavaFX, RxJava and Kotlin
Figure 1.1
Summary
In this chapter we got a high level overview of reactive programming and the role
RxJavaFX plays in connecting JavaFX and RxJava together. There was also an
explanation why Kotlin is presented alongside Java in this book, and why both
RxKotlinFX and TornadoFX are compelling options when building JavaFX
applications. You can go through this book completely ignoring the Kotlin
examples if you like.
10
1. Getting Started
In the next chapter we will cover the fundamentals of RxJava, and do it from a
JavaFX perspective. If you are already experienced with RxJava, you are
welcome to skip this chapter. But if you have been looking for a practical domain
to apply RxJava, read on!
11
2. RxJava Fundamentals
2. RxJava Fundamentals
RxJava has two core types: the Observable and the Observer . In the
simplest definition, an Observable pushes things. A given Observable<T>
will push items of type T through a series of operators that form other
Observables, and finally the terminal Observer is what consumes the items at
the end of the chain.
You will need to create a source Observable where emissions originate from,
and there are many factories to do this. To create a source Observable that
pushes items 1 through 5, declare the following:
Java
12
2. RxJava Fundamentals
import [Link];
Kotlin
import [Link]
Java
import [Link];
[Link]([Link]::println);
}
}
13
2. RxJava Fundamentals
Kotlin
import [Link]
A lambda is a special type of argument specifying an action. This one will take
each emission and print it, and this subscribe() operation creates a
Observer for us based on this lambda argument.
Java 8 and Kotlin have their own ways of expressing lambdas. If you need to
learn more about Java 8 lambdas, I would recommend reading at least the
first two chapters of Java 8 Lambdas by Richard Warburton before
proceeding. You can read the Kotlin Reference to learn about lambdas in
Kotlin. Lambdas are a critical syntax feature that we will use constantly in this
book.
Go ahead and run the code above, and you should get the following:
OUTPUT:
1
2
3
4
5
Understandings Observers
14
2. RxJava Fundamentals
Java
import [Link];
[Link]([Link]::println,
Throwable::printStackTrace,
() -> [Link]("Done!")
);
}
}
Kotlin
import [Link]
import [Link]
[Link](
onNext = ::println,
onError = { [Link]() },
onComplete = { println("Done!") }
)
}
OUTPUT:
15
2. RxJava Fundamentals
1
2
3
4
5
Done!
Let's briefly break down the Observer to understand it better. The lambdas are
just a shortcut to allow the subscribe() method to quickly create an
Observer for you. You can create your own Observer object explicitly by
extending ResourceObserver and implementing its three abstract methods:
onNext() , onError() , and onComplete() . You can then pass this
Observer to the subscribe() method.
Java
16
2. RxJava Fundamentals
import [Link];
import [Link];
import [Link];
@Override
public void onError(Throwable e) {
[Link]();
}
@Override
public void onNext(Integer integer) {
[Link](integer);
}
};
[Link](subscriber);
}
}
Kotlin
17
2. RxJava Fundamentals
import [Link]
import [Link]
[Link](subscriber)
The Observer interface defines these three methods. The onNext() is what
is called to pass an emission. The onError() is called when there is an error,
and onComplete() is called when there are no more emissions. Logically with
infinite Observables, the onComplete() is never called.
Although this example is helpful for understanding the Observer , it also shows
implementing Observer objects can be pretty verbose. Therefore, it is helpful to
use lambdas instead for conciseness. These three methods on the Observer
are critical for understanding RxJava, and we will revisit them several times in this
chapter.
18
2. RxJava Fundamentals
It is critical to note that the onNext() can only be called by one thread at a time.
There should never be multiple threads calling onNext() concurrently, and we
will learn more about this later when we cover concurrency. For now just note
RxJava has no notion of parallelization, and when you subscribe to a factory like
[Link](1,2,3,4,5) , you will always get those emissions serially, in
that exact order, and on a single thread.
Java
import [Link];
Kotlin
19
2. RxJava Fundamentals
import [Link]
You can also turn any Iterable<T> into an Observable<T> quickly using
[Link]() . It will emit all items in that Iterable<T> and
then call onComplete() when it is done.
Java
import [Link];
import [Link];
import [Link];
Kotlin
import [Link]
Using Operators
20
2. RxJava Fundamentals
map()
Say you have an Observable<String> that pushes String values.
Java
import [Link];
Observable<String> source =
[Link]("Alpha", "Beta", "Gamma", "Delta"
, "Epsilon");
}
}
Kotlin
import [Link]
In RxJava, you can use hundreds of operators to transform emissions and create
new Observables with those transformations. For instance, you can create an
Observable<Integer> off an Observable<String> by using the map()
21
2. RxJava Fundamentals
Java
import [Link];
Observable<String> source =
[Link]("Alpha","Beta","Gamma","Delta",
"Epsilon");
[Link]([Link]::println);
}
}
Kotlin
import [Link]
[Link](::println)
}
OUTPUT:
22
2. RxJava Fundamentals
5
4
5
5
7
The source Observable pushes each String to the map() operator where
it is mapped to its length . That length is then pushed from the map()
operator to the Observer where it is printed.
You can do all of this without any intermediary variables holding each
Observable , and instead do everything in a single "chain" call. This can be
done in one line or broken up into multiple lines.
Java
import [Link];
Kotlin
23
2. RxJava Fundamentals
import [Link]
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.map { [Link] }
.subscribe(::println)
}
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon") // ca
lls onNext() on map()
.map(s -> [Link]()) // calls onNext() on Observer
.subscribe(i -> [Link](i));
filter()
Another common operator is filter() , which suppresses emissions that fail to
meet a certain criteria, and pushes the ones that do forward. For instance, you
can emit only Strings where the length() is at least 5. In this case, the
filter() will stop "Beta" from proceeding since it is 4 characters.
Java
24
2. RxJava Fundamentals
import [Link];
Kotlin
import [Link]
OUTPUT:
Alpha
Gamma
Delta
Epsilon
distinct()
There are also operators like distinct() , which will suppress emissions that
have previously been emitted to prevent duplicate emissions (based on each
emission's hashcode() / equals() implementation).
25
2. RxJava Fundamentals
Java
import [Link];
Kotlin
import [Link]
OUTPUT:
5
4
7
You can also provide a lambda specifying an attribute of each emitted item to
distinct on, rather than the item itself.
26
2. RxJava Fundamentals
Java
import [Link];
Kotlin
import [Link]
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.distinct { [Link] }
.subscribe(::println)
}
OUTPUT:
Alpha
Beta
Epsilon
take()
The take() operator will cut off at a fixed number of emissions and then
unsubscribe from the source. Afterwards, it will call onComplete() downstream
to the final Observer .
27
2. RxJava Fundamentals
Java
import [Link];
Kotlin
import [Link]
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.take(3)
.subscribe(::println)
}
OUTPUT:
Alpha
Beta
Gamma
Java
28
2. RxJava Fundamentals
import [Link];
Kotlin
import [Link]
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.takeUntil { [Link]("D" )}
.subscribe(::println)
}
OUTPUT:
Alpha
Beta
Gamma
Delta
count()
Some operators will aggregate the emissions in some form (in a classic
MapReduce fashion), and then push that aggregation as a single emission to the
Observer . Obviously, this requires the onComplete() to be called so that the
aggregation can be finalized and pushed to the Observer .
29
2. RxJava Fundamentals
One of these aggregation operators is count() . It will simply count the number
of emissions and when its onComplete() is called, and push the count up to the
Observer as a single emission. Then it will call onComplete() up to the
Observer .
Java
import [Link];
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.count()
.subscribe([Link]::println);
}
}));
Kotlin
import [Link]
import [Link]
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.count()
.subscribeBy { println(it) }
}
OUTPUT:
30
2. RxJava Fundamentals
toList()
The toList() is similar to the count() , and it also will yield a Single
rather than an Observable . It will collect the emissions until its onComplete()
is called. After that it will push an entire List containing all the emissions to the
Observer .
Java
import [Link];
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.toList()
.subscribe([Link]::println);
}
}
Kotlin
31
2. RxJava Fundamentals
import [Link]
import [Link]
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.toList()
.subscribeBy { println(it) }
}
OUTPUT:
reduce()
When you need to do a custom aggregation or reduction, you can use
reduce() to achieve this in most cases (to aggregate into collections and other
mutable structures, you can use its cousin collect() ). This will return a
Single (if a "seed" value is provided) or a Maybe (if no "seed" value is
provided). But say we wanted the sum of all lengths for all emissions. Starting with
a seed value of zero, we can use a lambda specifying how to "fold" the emissions
into a single value.
Java
32
2. RxJava Fundamentals
import [Link];
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.map(String::length)
.reduce(0,(current,next) -> current + next)
.subscribe([Link]::println);
}
}
Kotlin
import [Link]
import [Link]
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.map { [Link] }
.reduce(0) { current,next -> current + next }
.subscribeBy { println(it) }
}
OUTPUT:
26
The lambda in reduce() will keep adding two Integer values, where one of
them is the "rolling total" ( current ) or seed 0 value, and the other is the new
value ( next ) to be added. As soon as onComplete() is called, it will push the
result to the Observer .
33
2. RxJava Fundamentals
scan()
The reduce() will push a single aggregated value derived from all the
emissions. If you want to push the "running total" for each emission, you can use
scan() instead. This can work with infinite Observables since it will push each
accumulation for each emission, rather than waiting for all emissions to be
accumulated.
Java
import [Link];
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
)
.map(String::length)
.reduce(0,(current,next) -> current + next)
.subscribe([Link]::println);
}
}
Kotlin
import [Link]
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon")
.map { [Link] }
.reduce(0) { current,next -> current + next }
.subscribe(::println)
}
OUTPUT:
34
2. RxJava Fundamentals
0
5
9
14
19
26
flatMap()
There are hundreds of operators in RxJava, but we will only cover one more for
now. Throughout the book we will learn more as we go, and the most effective
way to learn operators is to seek them out of need.
The flatMap() is similar to map() , but will map the emission to another set of
emissions via another Observable . This is one of the most powerful operators
in RxJava and is full of use cases, but for now we will just stick with a simple
example.
Say we have some String emissions where each one contains concatenated
numbers separated by a slash / . We want to break up these numbers into
separate emissions (and omit the slashes). You can call split() on each
String and specify splitting on the slashes / , and this will return an array of
the separated String values. Then you can turn that array into an
Observable inside the flatMap() .
Java
35
2. RxJava Fundamentals
import [Link];
[Link]("123/52/6345","23421/534","758/2341/7493
2")
.flatMap(s -> [Link]([Link]("/"))
)
.subscribe([Link]::println);
}
}
Kotlin
import [Link]
import [Link]
[Link]("123/52/6345","23421/534","758/2341/74932")
.flatMap { [Link]("/").toObservable() }
.subscribe(::println)
}
OUTPUT:
123
52
6345
23421
534
758
2341
74932
36
2. RxJava Fundamentals
If you observe this closely, hopefully you will find the flatMap() is pretty
straightforward. You are taking each emission and replacing it with another set of
emissions, by providing another Observable derived off that emission. There is
a lot of very powerful ways to leverage the flatMap() , especially when used
with infinite, concurrent, and hot Observables which we will cover later.
Java
import [Link];
[Link]("123/52/6345","23421/534","758/2341/7493
2")
.flatMapSingle(s ->
[Link]([Link]("/"))
.map(Integer::valueOf)
.reduce(0, (curr,next) -> curr + next)
)
.subscribe([Link]::println);
}
}
Kotlin
37
2. RxJava Fundamentals
import [Link]
import [Link]
[Link]("123/52/6345","23421/534","758/2341/74932")
.flatMapSingle {
[Link]("/").toObservable()
.map { [Link]() }
.reduce(0) { curr, next -> curr + next }
}
.subscribe(::println)
}
OUTPUT:
6520
23955
78031
But the fundamental benefit of pushing is it allows a notion of emissions over time.
Our previous examples do not exactly show this, but now we will dive into some
examples that do.
38
2. RxJava Fundamentals
So far we just pushed data out of Observables. But did you know you can push
events too? As stated earlier, data and events are basically the same thing in
RxJava. Let's take a simple JavaFX Application with a single Button .
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
import tornadofx.*
39
2. RxJava Fundamentals
Rendered UI:
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](button)
.subscribe([Link]::println);
[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
40
2. RxJava Fundamentals
import [Link]
import tornadofx.*
If you click the Button a couple times your console should look something like
this:
OUTPUT:
[Link][source=Button@751b917f[styleClass=butto
n]'Press Me']
[Link][source=Button@751b917f[styleClass=butto
n]'Press Me']
[Link][source=Button@751b917f[styleClass=butto
n]'Press Me']
Wait, did we just treat the ActionEvent like any other emission and push it
through the Observable ? Yes we did! As said earlier, this is the powerful part of
RxJava. It treats events and data the same way, and you can use all the operators
we covered earlier. For example, we can use scan() to push how many times
the Button was pressed, and push that into a Label . Just map() each
ActionEvent to a 1 to drive increments.
Java
41
2. RxJava Fundamentals
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](button)
.map(ae -> 1)
.scan(0,(x,y) -> x + y)
.subscribe(clickCount -> [Link](clic
[Link]()));
[Link]().add(countLabel);
[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
42
2. RxJava Fundamentals
import [Link]
import tornadofx.*
button("Press Me")
.actionEvents()
.map { 1 }
.scan(0) {x,y -> x + y }
.subscribe { [Link] = [Link]() }
}
}
So how does all this work? The Observable<ActionEvent> we created off this
Button is emitting an ActionEvent item every time the Button is pressed.
Every time that Button is clicked, it pushes an ActionEvent emission
through the Observable . There is no notion of completion either as this
Observable is always alive during the life of the Button .
Of course you could use operators that make the operation finite, like take() . If
you only take 5 ActionEvent emissions from the Button , it will stop pushing
on emission 4 . Then it will unsubscribe from the source and call
onComplete() down the chain to the Observer .
Java
43
2. RxJava Fundamentals
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](button)
.map(ae -> 1)
.scan(0,(x,y) -> x + y)
.take(5)
.subscribe(
clickCount -> [Link](clickCo
[Link]()),
Throwable::printStackTrace,
() -> [Link]("Done!")
);
[Link]().addAll(countLabel, doneLabel,button);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
44
2. RxJava Fundamentals
import [Link]
import [Link]
import tornadofx.*
button("Press Me")
.actionEvents()
.map { 1 }
.scan(0) {x,y -> x + y }
.take(5)
.subscribeBy(
onNext = { [Link] = [Link]()
},
onError = { [Link]() },
onComplete = { [Link] = "Done!" }
)
}
}
45
2. RxJava Fundamentals
Java
Observable<String> source =
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon");
Kotlin
Java
Observable<String> source =
[Link]("Alpha","Beta","Gamma","Delta", "Epsilon"
);
You will find the emissions are replayed for each Observer .
OUTPUT:
46
2. RxJava Fundamentals
Observer 1: Alpha
Observer 1: Beta
Observer 1: Gamma
Observer 1: Delta
Observer 1: Epsilon
Observer 2: Alpha
Observer 2: Beta
Observer 2: Gamma
Observer 2: Delta
Observer 2: Epsilon
While data and events are the same in RxJava, Hot Observables are often used
to represent events, such as an Observable<ActionEvent> built off a
Button .
Java
import [Link];
import [Link];
47
2. RxJava Fundamentals
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
Observable<ActionEvent> clicks =
[Link](button);
//Observer 1
[Link](ae ->
[Link]("Observer 1 Received Click!"));
[Link]().addAll(button,secondSubButton);
[Link](new Scene(vBox));
[Link]();
}
48
2. RxJava Fundamentals
Kotlin
import [Link]
import tornadofx.*
//Observer 1
[Link] { println("Observer 1 Received Click!")
}
RENDERED UI:
49
2. RxJava Fundamentals
Click the "Press Me" Button 3 times, then click the "Subscribe Observer 2"
Button . Finally click "Press Me" 2 more times, and you should get this output in
your console.
Notice that Observer 1 received those first three clicks, and then we
subscribed Observer 2 . But notice that Observer 2 has missed those first
three clicks. It will never get them because it subscribed too late to a hot
Observable. The only emissions Observer 2 receives are the ones that happen
after it subscribes.
After Observer 2 is subscribed, you can see the last two emissions were
pushed simultaneously to both Observer 1 and Observer 2 .
ConnectableObservable
We will learn several ways to create hot Observables in this book for different
tasks, but one that is worth mentioning now is the ConnectableObservable .
Among a few other subtle behaviors it creates, it can turn a cold Observable
into a hot one by forcing its emissions to become hot. To create one, you can take
any Observable and call its publish() method. You can then set up the
Observers and then call connect() to start firing the emissions.
50
2. RxJava Fundamentals
One reason you may do this is because it might be expensive to replay emissions
for each Observer , especially if it is emitting items from a slow database query
or some other intensive operation. Notice too that each emission interleaves and
goes to each Observer simultaneously.
Java
import [Link];
import [Link];
ConnectableObservable<String> source =
[Link]("Alpha","Beta","Gamma","Delta",
"Epsilon").publish();
[Link]();
}
}
Kotlin
51
2. RxJava Fundamentals
import [Link]
[Link]()
}
OUTPUT:
Observer 1: Alpha
Observer 2: Alpha
Observer 1: Beta
Observer 2: Beta
Observer 1: Gamma
Observer 2: Gamma
Observer 1: Delta
Observer 2: Delta
Observer 1: Epsilon
Observer 2: Epsilon
Disposing
There is one last operation we need to cover: unsubscribing. Unsubscription
should happen automatically for finite Observables once onComplete() is
called. But for infinite or long-running Observables, there will be times you want to
52
2. RxJava Fundamentals
stop the emissions and cancel the entire operation. This will also free up
resources in the Observable chain and clean up any resources it was using.
For instance, let's take our incrementing Button example from earlier and add
another Button that will unsubscribe the emissions. We need to save the
Disposable returned from the subscribe() method, and then we can refer
to it later to call dispose() and stop emissions.
Java
53
2. RxJava Fundamentals
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]().addAll(button,unsubscribeButton,count
Label);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
54
2. RxJava Fundamentals
import [Link]
import tornadofx.*
button("Unsubscribe").setOnAction {
[Link]()
}
}
}
Note that when you press the "Unsubscribe" Button , the increments stop
because the Observer was disposed, and it instructed the Observable to
stop sending emissions. Disposal automatically happens with finite Observables
once onComplete() is called. But with infinite or long-running Observables, you
need to manage their disposal if you intend to terminate them at some point.
When you have infinite Observables that need to be disposed, it is very critical to
call dispose() on any Disposables when you are done with them. If you do not
do this, you will run into memory leak problems and the garbage collector will not
be able to free those resources.
When you have a lot of Disposables to manage and you want to dispose them all
at once, you can use a CompositeDisposable which acts as a collection of
Disposables. You can add any number of Disposables to it, and when you want to
55
2. RxJava Fundamentals
Java
Disposable disposable1 =
[Link](ae -> [Link]("Clicked!"))
;
[Link](subscription1);
Disposable disposable1 =
[Link](ae -> [Link]("Clicked Her
e Too!"));
[Link](disposable1);
Kotlin
[Link] { println("Clicked!") }
.addto(disposables)
56
2. RxJava Fundamentals
Summary
In this chapter we covered some RxJava fundamentals. The Observable treats
data and events in the same way, and this is a powerful idea that applies really
well with JavaFX. Cold Observables replay emissions to each Observer
independently. Hot Observables will broadcast emissions live to all Observers
simultaneously, and not replay missed emissions to tardy Observers.
57
2. RxJava Fundamentals
This book will continue to cover RxJava and apply it in a JavaFX context. There
are hundreds of operators and unfortunately we will not be able to cover them all,
but we will focus on the ones that are especially helpful for building JavaFX
applications. If you want to learn more about RxJava and its operators
comprehensively, please check out my Packt book Learning RxJava).
In the next chapter, we are going to dive a little deeper into JavaFX events, and
turn Node and ObservableValue events into Observables.
58
3. Events and Value Changes
Java
59
3. Events and Value Changes
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](listView, KeyEvent.KEY_TYPED)
.map(KeyEvent::getCharacter)
.filter(s -> [Link]("[0-9]"))
.subscribe(s -> [Link]().sel
ect(s));
[Link]().add(listView);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
60
3. Events and Value Changes
import [Link]
import [Link]
import tornadofx.*
listview<String> {
(0..9).asSequence().map { [Link]() }.forEach {
[Link](it) }
events(KeyEvent.KEY_TYPED)
.map { [Link] }
.filter { [Link](Regex("[0-9]")) }
.subscribe { [Link](it)}
}
}
}
61
3. Events and Value Changes
If you want to combine keystrokes to form entire Strings rather than a series
of single characters, you will want to use throttling, buffering, and switching
operators to combine them based on timing windows. We will cover these
later in Chapter 9.
Java
62
3. Events and Value Changes
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](rectangle, MouseEvent.MOUSE_MO
VED)
.map(me -> [Link]() + "-" + [Link]())
.subscribe(positionLabel::setText);
[Link]().addAll(positionLabel,rectangle);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
63
3. Events and Value Changes
import [Link]
import [Link]
import [Link]
import tornadofx.*
fill = [Link]
events(MouseEvent.MOUSE_MOVED)
.map { "${it.x}-${it.y}" }
.subscribe { [Link] = it }
}
}
}
Figure 3.2 - A red rectangle that pushes the cursor coordinates when its hovered
over.
64
3. Events and Value Changes
JavaFX is packed with events everywhere, and you will need to know which
events you are targeting on a given Node control. Be sure to look at the
JavaDocs for the control you are using to see which event types you want to
target.
Currently you can target events on Node , Window , and Scene types and
there should be factories to support each one.
ActionEvents
In the previous chapter we were exposed to the simple ActionEvent . You can
actually target the ActionEvent using the events factory and emit them through
an Observable<ActionEvent> .
Java
65
3. Events and Value Changes
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](button, [Link])
.subscribe(ae -> [Link]("Pressed!"))
;
[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
66
3. Events and Value Changes
import [Link]
import [Link]
import tornadofx.*
Java
67
3. Events and Value Changes
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](button)
.subscribe(ae -> [Link]("Pressed!"))
;
[Link]().add(button);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
68
3. Events and Value Changes
import [Link]
import tornadofx.*
ObservableValue Changes
This is where reactive JavaFX starts to get interesting. Up to this point we only
have worked with events. There is some metadata on event emissions that can be
useful, but we are not quite working with data in the traditional sense.
Java
69
3. Events and Value Changes
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]([Link]())
.subscribe(v -> [Link](v + " was sel
ected"));
[Link]().add(comboBox);
[Link](new Scene(hBox));
[Link]();
}
}
Kotlin
70
3. Events and Value Changes
import [Link]
import tornadofx.*
[Link]("Alpha","Beta","Gamma","Delta","Epsilon"
)
valueProperty().toObservable()
.subscribe { println("$it was selected") }
}
}
}
When you select different items in the ComboBox , you should get a console
output that looks something like this:
For the next few examples, let's just focus on the Observable chain. Notice that
the [Link]() (or toObservable() for Kotlin) does not
push the initial null value, because RxJava 2 does not emit null values. However,
you can provide a second argument to put a sentinel value for a null.
Java
71
3. Events and Value Changes
[Link]([Link](), "N/A")
.subscribe(v -> [Link](v + " was selected")
);
Kotlin
valueProperty().toObservable("N/A")
.subscribe { println("$it was selected") }
Java
[Link]([Link]())
.subscribe(v -> [Link](v + " was selected")
);
Kotlin
valueProperty().toNullableObservable()
.subscribe { println("$it was selected") }
Remember that we can use any RxJava operators. We can map() each String's
length() and push that to the Observer .
Java
[Link]([Link]())
.map(String::length)
.subscribe(i ->
[Link]("A String with length " + i + " w
as selected")
);
Kotlin
72
3. Events and Value Changes
valueProperty().toObservable()
.map { [Link] }
.subscribe { println("A String with length $it was selec
ted") }
OUTPUT:
Let's get a little more creative, and use scan() to do a rolling sum of the lengths
with each emission.
Java
[Link]([Link]())
.map(String::length)
.scan(0,(x,y) -> x + y)
.subscribe(i -> [Link]("Rolling length total
: " + i));
Kotlin
valueProperty().toObservable()
.map { [Link] }
.scan(0,(x,y) -> x + y)
.subscribe { println("Rolling length total: $it") }
When you make a few selections to the ComboBox , your output should look
something like this depending on which Strings you selected.
OUTPUT:
73
3. Events and Value Changes
This example may be a bit contrived, but hopefully you are starting to see some of
the possibilities when you have a chain of operators "reacting" to a change in a
ComboBox . Pushing each value every time it is selected in a ComboBox allows
you to quickly tell other parts of the UI to update accordingly.
Again, you can use this factory on any ObservableValue . This means you can
hook into any JavaFX component property and track its changes reactively. The
possibilities are quite vast. For instance, for every selection event in a
ComboBox , you can query a database for items of that selection, and populate
them into a TableView . Then that TableView may have Observables built off
its events and properties to trigger other controls to update.
You also have the option of pushing the old and new value in a Change item
through the changesOf() factory. This can be helpful for validation, and you can
restore that old value back into the control if the new value fails to meet a
condition.
For example, you can emit the old value and new value together on each typed
character in a TextField . If at any moment the text is not numeric (or is an
empty String ), the previous value can be restored immediately using the
Observer .
Java
74
3. Events and Value Changes
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]([Link]())
.map(s -> [Link]().matches("[0-9]+") ? [Link]
tNewVal() : [Link]())
.subscribe(textField::setText);
[Link]().add(textField);
[Link](new Scene(hBox));
[Link]();
}
}
Kotlin
75
3. Events and Value Changes
import [Link]
import tornadofx.*
If you study the Observable operation above, you can see that each Change
item is emitted holding the old and new value for each text input. Using a regular
expression, we validated for text inputs that are not numeric or are empty. We
then map() it back to the old value and set it to the TextField in the
Observer .
Error Recovery
When working with Observables built off UI events, sometimes an error can occur
which will be communicated up the chain via onError() . In production, you
should always have the Observer handle an onError() so the error does not
just quietly disappear. But when you are dealing with UI input events, there is
likely one other error handling issue to consider.
Say you have this simple JavaFX Application with a Button that adds a
numeric input from a TextField , and adds it to a total in a Label (Figure
3.3).
Java
76
3. Events and Value Changes
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](button)
.map(ae -> [Link]([Link]()))
.scan(0,(x,y) -> x + y)
.subscribe(i -> {
[Link]([Link]());
[Link]();
}, e -> new Alert([Link], [Link]
ssage()).show());
77
3. Events and Value Changes
Kotlin
import [Link]
import [Link]
import [Link]
import tornadofx.*
label("Input Number")
val input = textfield()
val totalLabel = label("")
button("Add to Total").actionEvents()
.map { [Link]() }
.scan(0) {x,y -> x + y }
.subscribeBy(
onNext = {
[Link] = [Link]()
[Link]()
},
onError = { Alert([Link], it
.message).show() }
)
}
}
Figure 3.3
78
3. Events and Value Changes
That TextField should only have numeric inputs, but nothing is stopping non-
numeric inputs from being emitted. Therefore, if you type a non-numeric value in
that TextField and click the Button , you will get an Alert as specified in
the Observers's onError() (Figure 3.4).
Figure 3.4
Despite the error being handled, there is one problem here. The Observable is
now dead. It called onError() and closed the stream, assuming nothing could
be done to recover from it. You will find the Button is no longer sending
emissions. You can fix this by adding the retry() operator right before the
Observer . When its onError() is called, it will intercept and swallow the
error, then resubscribe again.
Java
[Link](button)
.map(ae -> [Link]([Link]()))
.scan(0,(x,y) -> x + y)
.retry()
.subscribe(i -> {
[Link]([Link]());
[Link]();
}, e -> new Alert([Link], [Link]()).s
how());
79
3. Events and Value Changes
Kotlin
button("Add to Total").actionEvents()
.map { [Link]() }
.scan(0) {x,y -> x + y }
.retry()
.subscribeBy(
onNext = {
[Link] = [Link]()
[Link]()
},
onError = { Alert([Link], [Link]
).show() }
)
If you type in a non-numeric input, it will resubscribe and start all over. The
scan() operator will send another initial emission of 0 and result in
everything being reset. You also have the option of moving the retry() before
the scan() operation (so it intercepts the error before the scan() ), and that
would maintain the current rolling total rather than canceling it and starting over at
0.
But notice that the onError() in the Observer is never called, and we never
get an Alert . This is because the retry() intercepted the onError() call
and kept it from going to the Observer . To get the Alert , you may want to
move it to a doOnError() operator before the retry() . The error will flow
through it to trigger the Alert before the retry() intercepts it.
Java
80
3. Events and Value Changes
[Link](button)
.map(ae -> [Link]([Link]()))
.scan(0,(x,y) -> x + y)
.doOnError( e -> new Alert([Link], [Link]
sage()).show())
.retry()
.subscribe(i -> {
[Link]([Link]());
[Link]();
});
Kotlin
button("Add to Total").actionEvents()
.map { [Link]() }
.scan(0) {x,y -> x + y }
.doOnError { Alert([Link], [Link]).sh
ow() }
.retry()
.subscribe {
[Link] = [Link]()
[Link]()
}
There are a couple of error-handling operators in RxJava that are worth being
familiar with. But for UI input controls, you will likely want to leverage retry()
so Observables built off UI controls do not remain dead after an error. This is
especially critical if you are kicking off complex reactive processes.
81
3. Events and Value Changes
It is also worth noting that the best way to handle errors is to handle them
proactively. In this example, it would have been good to forbid numbers from
being entered in the TextField in the first place (like our previous exercise).
Another valid check would be to filter() out non-numeric values so they are
suppressed before being turned into an Integer .
Java
[Link](button)
.map(ae -> [Link]())
.filter(s -> [Link]("[0-9]+"))
.map(Integer::valueOf)
.scan(0,(x,y) -> x + y)
.subscribe(i -> {
[Link]([Link]());
[Link]();
});
Kotlin
button("Add to Total").actionEvents()
.map { [Link] }
.filter { [Link](Regex("[0-6]+")) }
.map { [Link]() }
.scan(0) {x,y -> x + y }
.subscribeBy(
onNext = {
[Link] = [Link]()
[Link]()
},
onError = { Alert([Link], [Link]).sh
ow() }
)
Summary
82
3. Events and Value Changes
But there are a few more facilities we need to be truly productive with reative
JavaFX, starting next with JavaFX Collections. This is where the line between
data and events truly become blurred in surpringly useful ways.
83
4. Collections
You should get a UI that looks like Figure 4.1 when you run the code below.
Java
import [Link];
import [Link];
import [Link];
84
4. Collections
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
ObservableList<String> values =
[Link]("Alpha","Beta",
"Gamma");
[Link](values)
.flatMapSingle(list ->
[Link](list).map(String
::length).distinct().toList()
).subscribe(lengths -> distinctLengthsListView.g
etItems().setAll(lengths));
85
4. Collections
[Link](addButton)
.map(ae -> [Link]())
.filter(s -> s != null && ![Link]().isEmpty())
.subscribe(s -> {
[Link](s);
[Link]();
});
[Link]().addAll(valuesLabel,valuesListView,dis
tinctLengthsLabel,
distinctLengthsListView,inputField,addButton);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
label("VALUES")
listview(values)
label("DISTINCT LENGTHS")
86
4. Collections
listview<Int> {
[Link]()
.flatMapSingle {
[Link]().map { [Link] }.dist
inct().toList()
}.subscribe {
[Link](it)
}
}
label("INPUT")
val inputField = textfield()
button("ADD").actionEvents()
.map { [Link] }
.filter { ![Link]().isEmpty() }
.subscribe {
[Link](it)
[Link]()
}
}
}
Figure 4.1
87
4. Collections
Go ahead and type in "Delta", then click "ADD". Then do the same for "Epsilon".
You should now see Figure 4.2.
Figure 4.2
88
4. Collections
See that? Not only did "Delta" and "Epsilon" get added to the top ListView , but
the distinct length of 7 was added to the bottom one. So how exactly was this
made possible?
This is a useful pattern because as we have just seen, we can transform this
ObservableList emission inside a flatMap() any way we want. In this
example, we effectively created a new ObservableList<Integer> that
89
4. Collections
Let's leverage this idea in another way. Instead of putting the distinct lengths in
another ObservableList<Integer> , let's concatenate them as a String and
push it into a Label 's text.
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
ObservableList<String> values =
[Link]("Alpha","Beta",
"Gamma");
90
4. Collections
[Link](values)
.flatMapSingle(list ->
[Link](list)
.map(String::length)
.distinct().reduce("",(x,y) -> x
+ ([Link]("") ? "" : "|") + y)
).subscribe(distinctLengthsConcatLabel::setText)
;
[Link](addButton)
.map(ae -> [Link]())
.filter(s -> s != null && ![Link]().isEmpty())
.subscribe(s -> {
[Link](s);
[Link]();
});
[Link]().addAll(valuesLabel,valuesListView,dis
tinctLengthsLabel,
distinctLengthsConcatLabel,inputField,addButton)
;
[Link](new Scene(root));
[Link]();
}
}
Kotlin
91
4. Collections
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
label("VALUES")
listview(values)
label("DISTINCT LENGTHS")
label {
textFill = [Link]
[Link]()
.flatMapSingle {
[Link]()
.map { [Link] }
.distinct()
.reduce("") { x,y -> x + (if (x
== "") "" else "|") + y }
}.subscribe {
text = it
}
}
label("INPUT")
val inputField = textfield()
92
4. Collections
button("ADD").actionEvents()
.map { [Link] }
.filter { ![Link]().isEmpty()}
.subscribe {
[Link](it)
[Link]()
}
}
}
Figure 4.3
93
4. Collections
Java
@Override
public void start(Stage stage) throws Exception {
ObservableSet<String> values =
[Link]("Alpha","Beta","Gamm
a");
[Link](values)
.subscribe([Link]::println);
[Link]("Delta");
[Link]("Alpha"); //no effect
[Link]("Beta");
[Link](0); //quit
}
}
Kotlin
94
4. Collections
import [Link]
import [Link]
import tornadofx.*
[Link]()
.subscribe { println(it) }
[Link]("Delta")
[Link]("Alpha") //no effect
[Link]("Beta")
[Link](0) //quit
}
}
OUTPUT:
Java
95
4. Collections
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
ObservableMap<Integer,String> values =
[Link]();
[Link](values)
.subscribe([Link]::println);
[Link](1,"Alpha");
[Link](2,"Beta");
[Link](3,"Gamma");
[Link](1,"Alpha"); //no effect
[Link](3,"Delta");
[Link](2);
[Link](0);
}
}
Kotlin
96
4. Collections
import [Link]
import [Link]
import tornadofx.*
[Link](values)
.subscribe { println(it) }
[Link](1, "Alpha")
[Link](2, "Beta")
[Link](3, "Gamma")
[Link](1, "Alpha") //no effect
[Link](3, "Delta")
[Link](2)
[Link](0);
}
}
OUTPUT:
{}
{1=Alpha}
{1=Alpha, 2=Beta}
{1=Alpha, 2=Beta, 3=Gamma}
{1=Alpha, 2=Beta, 3=Delta}
{1=Alpha, 3=Delta}
97
4. Collections
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
ObservableList<String> values =
[Link]("Alpha","Beta",
"Gamma");
[Link](values)
.subscribe([Link]::println);
[Link]("Delta");
[Link]("Epsilon");
[Link]("Alpha");
[Link](2,"Eta");
[Link](0);
}
}
Kotlin
98
4. Collections
import [Link]
import [Link]
import tornadofx.*
[Link]().subscribe { println(it) }
[Link]("Delta")
[Link]("Epsilon")
[Link]("Alpha")
values[2] = "Eta"
[Link](0)
}
}
OUTPUT:
ADDED Delta
ADDED Epsilon
REMOVED Alpha
ADDED Eta
REMOVED Delta
Note that this factory has no initial emission. It will only emit changes going
forward after subscription. A ListChange is emitted with the affected value and
whether it was ADDED , REMOVED , or UPDATED . Interestingly, note that calling
set() on the ObservableList will replace an element at a given index, and
99
4. Collections
result in two emissions: one for the REMOVED item, and another for the ADDED
item. When we set the item at index 2 to "Eta", it replaced "Delta" which was
REMOVED , and then "Eta" was ADDED .
Java
class User {
private final int id;
private final Property<String> name =
new SimpleStringProperty();
Kotlin
100
4. Collections
Whenever this name property for any User changes, this change will be
pushed as an emission. It will be categorized in a ListChange as UPDATED .
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
ObservableList<User> values =
[Link](user -> new Ob
servableValue[]{ [Link]() });
[Link](values)
.subscribe([Link]::println);
[Link](0).nameProperty().setValue("Thomas Nield");
[Link](0);
101
4. Collections
Kotlin
102
4. Collections
import [Link]
import [Link]
import tornadofx.*
import [Link]
[Link](values)
.subscribe { println(it) }
[Link](0)
}
}
OUTPUT:
103
4. Collections
This can be helpful to react not just to items in the list being added or removed,
but also when their properties are modified. You can then use this behavior to, for
example, to drive updates to concatenations.
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
ObservableList<User> values =
[Link](user -> new Ob
servableValue[]{[Link]()});
[Link](values)
.flatMapSingle(list ->
[Link](list)
.map(User::getName)
.reduce("",(u1,u2) -> u1 + (u1.e
quals("") ? "" : ", ") + u2)
)
.subscribe([Link]::println);
104
4. Collections
[Link](0).nameProperty().setValue("Thomas Nield");
[Link](0);
}
static final class User {
private final int id;
private final Property<String> name = new SimpleStringPr
operty();
Kotlin
105
4. Collections
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
[Link](values)
.flatMapSingle {
[Link]().map {[Link] }
.reduce("") { u1,u2 -> u1 + (if (u1
== "") "" else ", ") + u2 }
}
.subscribe { println(it) }
[Link](0)
}
}
106
4. Collections
OUTPUT:
Tom Nield
Tom Nield, Jason Shwartz
Thomas Nield, Jason Shwartz
Note also there are factories that target only ADDED , REMOVED , and UPDATED
events. These will only emit items corresponding to those event types, and also
are available under the JavaFxObservable utility class. Here is a complete list
of these additional factories as well as the others we covered so far.
107
4. Collections
There may be times you want to emit only distinct changes to a JavaFX
ObservableList . What this means is you want to ignore duplicates added or
removed to the collection and not emit them as a change. This can be helpful to
synchronize two different ObservableLists, where one has duplicates and the
other does not.
Take this application that will hold two ListView<String> instances each
backed by an ObservableList<String> . The top ListView<String> will
hold duplicate values, but the bottom ListView<String> will hold only distinct
values from the top ListView (Figure 4.5).
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]([Link]())
.subscribe(c -> {
if ([Link]().equals([Link]))
108
4. Collections
[Link]().add([Link]
ue());
else
[Link]().remove([Link]
Value());
});
[Link]().addAll(header1,listView,header2,
distinctListView,inputField,addButton,removeButt
on);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
import [Link]
import [Link]
import [Link]
109
4. Collections
import tornadofx.*
label("Values")
val listView = listview<String>()
label("Distinct Values")
val distinctListView = listview<String>()
label("Input/Remove Value")
val inputField = textfield()
[Link]()
.subscribe {
if ([Link] == [Link])
[Link]([Link])
else
[Link]([Link])
}
button("Add").actionEvents()
.map { [Link] }
.subscribe {
[Link](it)
[Link]()
}
button("Remove").actionEvents()
.map { [Link] }
.subscribe {
[Link](it)
[Link]()
}
}
}
110
4. Collections
Figure 4.5
You also have the option of choosing an attribute of the item to distinct on rather
than the item itself. If you wanted to only emit distinct values based on the first
character, you can pass a lambda argument to the factory that substrings out the
first character.
111
4. Collections
Java
[Link]([Link](), s -> s.
substring(0,1))
.subscribe(c -> {
if ([Link]().equals([Link]))
[Link]().add([Link]());
else
[Link]().remove([Link]());
});
Kotlin
[Link] { [Link](0,1) }
.subscribe {
if ([Link] == [Link])
[Link]([Link])
else
[Link]([Link])
}
Figure 4.6
112
4. Collections
As you may see, this might be helpful to sample only one item with a distinct
property. If you add "Alpha" and then "Apple", only "Alpha" will be emitted to the
bottom ListView since it was the first to start with "A". The "Alpha" will only be
removed from the bottom ListView when both "Alpha" and "Apple" are
removed, when there are no longer any "A" samples.
If you want to push the mapped value itself rather than the item it was derived
from, you can use the distinctMappingsOf() factory (or
distinctMappingChanges() for Kotlin) (Figure 4.7).
Java
113
4. Collections
[Link]([Link](), s -> s.
substring(0,1))
.subscribe(c -> {
if ([Link]().equals([Link]))
[Link]().add([Link]
ue());
else
[Link]().remove([Link]
Value());
});
Kotlin
[Link] { [Link](0,1) }
.subscribe {
if ([Link] == [Link])
[Link]([Link])
else
[Link]([Link])
}
Figure 4.7
114
4. Collections
If you input "Alpha", an "A" will show up in the bottom ListView . Adding "Apple"
will have no effect as "A" (its first character) has already been distincly ADDED .
When you remove both "Alpha" and "Apple", the "A" will then be REMOVED from
the bottom.
Summary
In this chapter we covered how to reactivly use JavaFX ObservableCollections.
When you emit an entire collection every time it changes, or emit the elements
that changed, you can get a lot of functionality that simply does not exist with
115
4. Collections
JavaFX alone. We also covered distinct additions and removals, which can be
helpful to create an ObservableList that distincts off of another
ObservableList .
Hopefully by now, RxJava is slowly starting to look useful. But we have only just
gotten started. The power of Rx really starts to unfold when we combine
Observables, leverage concurrency, and use other features that traditionally take
a lot of effort. Next, we will cover combining Observables.
116
5. Combining Observables
5. Combining Observables
So far in this book, we have merely set the stage to make Rx useful. We learned
how to emit JavaFX Events, ObservableValues, and ObservableCollections
through RxJava Observables. But there is only so much you can do when a
reactive stream is built off one source. When you have emissions from multiple
Observables being joined together in some form, this is truly where things
transition from merely being useful to definitely game-changing.
There are several ways to combine emissions from multiple Observables, and we
will cover many of these combine operators. What makes these operators
especially powerful is they are not only threadsafe, but also non-blocking. They
can merge concurrent sources from different threads, and we will see this in
action later in Chapter 7.
Concatenation
One of the simplest ways to combine Observables is to use the concat()
operators. You can specify two or more Observables emitting the same type T
and it will fire emissions from each one in order.
Java
import [Link];
[Link](source1,source2)
.map(String::length)
.toList()
.subscribe([Link]::println);
}
117
5. Combining Observables
Kotlin
import [Link]
import [Link]
[Link](source1,source2)
.map { [Link] }
.toList()
.subscribeBy { println(it) }
}
OUTPUT:
[5, 4, 5, 5, 7]
Java
118
5. Combining Observables
import [Link];
[Link](source2)
.map(String::length)
.toList()
.subscribe([Link]::println);
}
Kotlin
import [Link]
import [Link]
[Link](source2)
.map { [Link] }
.toList()
.subscribeBy { println(it) }
}
OUTPUT:
[5, 4, 5, 5, 7]
If you want to do a concenation but put another Observable in front rather than
after, you can use startWith() instead.
Java
119
5. Combining Observables
import [Link];
[Link](source2)
.subscribe([Link]::println);
}
Kotlin
import [Link]
import [Link]
[Link](source2)
.subscribe { println(it) }
}
OUTPUT:
Delta
Epsilon
Alpha
Beta
Gamma
Again, this operator is likely not one you would use with infinite Observables. You
are more likely to use this for data-driven Observables rather than UI events.
Technically, you can specify an infinite Observable to be the last Observable
120
5. Combining Observables
When you want to simultaneously combine all emissions from all Observables,
you might want to consider using merge() , which we will cover next.
Merging
Merging is almost like concatenation but with one important difference: it will
combine all Observables of a given emission type T simultaneously. This
means all emissions from all Observables are merged together at once into a
single stream without any regard for order or completion.
This is pretty helpful to merge multiple UI event sources since they are infinite. For
instance, you can consolidate the ActionEvent emissions of two buttons into a
single Observable<ActionEvent> using [Link]() . (Figure 5.1).
Java
121
5. Combining Observables
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
VBox root = new VBox();
[Link](
[Link](firstButton),
[Link](secondButton)
).subscribe(i -> [Link]("You pressed one of
the buttons!"));
[Link]().addAll(firstButton,secondButton);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
122
5. Combining Observables
import [Link]
import [Link]
import tornadofx.*
[Link](
[Link](),
[Link]()
)
.subscribe {
println("You pressed one of the buttons!")
}
}
}
Figure 5.1
When you press either Button , it will consolidate the emissions into a single
Observable<ActionEvent> which goes to a single Observer . But let's make
this more interesting. Change these two Buttons so they are labeled "UP" and
"DOWN", and map their ActionEvent to either a 1 or -1 respectively.
Using a scan() we can create a rolling sum of these emissions and push the
incrementing/decrementing number to a Label (Figure 5.2).
Java
123
5. Combining Observables
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
VBox root = new VBox();
[Link](
[Link](buttonUp).map(ae
-> 1),
[Link](buttonDown).map(
ae -> -1)
).scan(0,(x,y) -> x + y)
.subscribe(i -> [Link]([Link]()));
[Link]().addAll(label,buttonUp,buttonDown);
[Link](new Scene(root));
[Link]();
}
}
Kotlins
124
5. Combining Observables
import [Link]
import [Link]
import tornadofx.*
[Link](
[Link]().map { 1 },
[Link]().map { -1 }
).scan(0) { x,y -> x + y }
.subscribe {
[Link] = [Link]()
}
}
}
Figure 5.2
When you press the "UP" Button , it will increment the integer in the Label .
When you press the "DOWN" Button , it will decrement it. This was
accomplished by merging the two infinite Observables returned from the map()
operator. The 1 or -1 is then pushed to the scan() operation where it is
emitted as a rolling total.
Like concatenation, there is also an operator version you can use instead of the
factory to merge an Observable<T> with another Observable<T>
125
5. Combining Observables
Java
[Link](buttonUp).map(ae -> 1)
.mergeWith(
[Link](buttonDown).map(ae -> -1
)
).scan(0,(x,y) -> x + y)
.subscribe(i -> [Link]([Link]()));
Kotlin
[Link]().map { 1 }
.mergeWith(
[Link]().map { -1 }
).scan(0) { x,y -> x + y }
.subscribe {
[Link] = [Link]()
}
With both concatentation and merging, you can combine as many Observables as
you want (up to 9 before you have to pass an Iterable<Observable> instead).
But these two operators work with Observables emitting the same type T . There
are ways to combine emissions of different types which we will see next.
Zip
One way you can combine multiple Observables, even if they are different types,
is by "zipping" their emissions together. Think of a zipper on a jacket and how the
teeth pair up. From a reactive perspective, this means taking one emission from
the first Observable , and one from a second Observable , and combining
both emissions together in some way.
Take these two Observables, one emitting Strings and the other emitting Integers.
For each String that is emitted, you can pair it with an emitted Integer and
join them together somehow.
Java
126
5. Combining Observables
import [Link];
Kotlin
import [Link]
import [Link]
import [Link]
127
5. Combining Observables
OUTPUT:
A-1
B-2
C-3
D-4
E-5
Done!
Notice that "A" paired with the "1", and "B" paired with the "2", and so on. Again,
you are "zipping" them just like a jacket zipper. But take careful note of something:
there are 6 letters emissions and 5 numbers emissions. What happened to
that sixth letter "F" since it had no number to zip with? Since the two zipped
sources do not have the same number of emissions, it was ignored the moment
onComplete() was called by numbers . Logically, it will never have anything to
pair with so it gave up and proceeded to skip it and call onComplete() down to
the Subscriber .
There is also an operator equivalent called zipWith() you can use. This should
yield the exact same output.
Java
128
5. Combining Observables
import [Link];
Kotlin
import [Link]
import [Link]
import [Link]
129
5. Combining Observables
Zipping can be helpful when you need to sequentially pair things from two or more
sources, but from my experience this rarely works well with UI events. Let's adapt
this example to see why.
Suppose you create two ComboBox controls holding these letters and numbers
respectively. You want to create an Observable off each one that emits the
selected values. Then you want to zip the values together, concatentate them into
a single String , and print them in an Observer . You are looking to combine
two different user inputs together (Figure 5.3).
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
Observable<String> letterSelections =
[Link]([Link]
rty());
Observable<Integer> numberSelections =
[Link]([Link]
rty());
130
5. Combining Observables
[Link](new Scene(root));
[Link]();
}
}
Kotlin
131
5. Combining Observables
import [Link]
import [Link]
import [Link]
import tornadofx.*
[Link](letterSelections,numberSelections) { l,
n -> "$l-$n"}
.subscribeBy(
onNext = { println(it) },
onError = { [Link]() },
onComplete = { println("Done!") }
)
}
}
Figure 5.3
This seems like a good idea, right? When I select a letter, and I select a number,
the two are zipped together and sent to the Observer ! But there is something
subtle and problematic with this. Select multiple letters without selecting any
numbers, then select multiple numbers. Notice how the letters are backlogged and
132
5. Combining Observables
each one is waiting for a number to be paired with? This is problematic and
probably not what you want. If you select "A", then "B", then "C" followed by "1",
then "2", then "3", you are going to get "A-1", "B-2", and "C-3" printed to the
console.
Here is another way of looking at it. The problem with our zipping example is for
every selected "letter", you need to select a "number" to evenly pair with it. If you
make several selections to one combo box and neglect to make selections on the
other, you are going to have a backlog of emissions waiting to be paired. If you
select eight different letters (shown below), and only four numbers, the next
number you select is going to pair with the "D", not "F" which is currently selected.
If you select another letter its only going to worsen the backlog and make it more
confusing as to what the next number will pair with.
A 1
B 5
A 3
A 6
D
C
A
F
If you want to only combine the latest values from each Observable and ignore
previous ones, you might want to use combineLatest() which we will cover
next.
Note you can make zip more than two Observables. If you have three
Observables, it will zip three emissions together before consolidating them into a
single emission.
Combine Latest
With our zipping example earlier, it might be more expected if we combine values
by chasing after the latest values. Using combineLatest() instead of zip() ,
we can select a value in either ComboBox . Then it will emit with the latest value
from the other ComboBox .
133
5. Combining Observables
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
Observable<String> letterSelections =
[Link]([Link]
rty());
Observable<Integer> numberSelections =
[Link]([Link]
rty());
[Link](letterSelections, numberSelecti
ons, (l, n) -> l + "-" + n)
.subscribe([Link]::println,
Throwable::printStackTrace,
() -> [Link]("Done!")
);
134
5. Combining Observables
[Link](new Scene(root));
[Link]();
}
}
Kotlin
import [Link]
import [Link]
import [Link]
import tornadofx.*
[Link](letterSelections,numberSelecti
ons) {l,n -> "$l-$n"}
.subscribeBy(
onNext = { println(it) },
onError = { [Link]() },
onComplete = { println("Done!") }
)
}
}
If you select "A","4", "E", and then "1", you should get this output.
135
5. Combining Observables
OUTPUT:
A-4
E-4
E-1
Selecting "4" with "A" will emit with the latest letter "A". Then selecting "E" will emit
with the latest number "4", and finally selecting "1" will emit with "E".
Simply put, a change in value for either ComboBox will result in the latest value
for both being pushed forward. For combining UI input events, we often are only
concerned with the latest user inputs and do not care about previous ones.
Therefore, combineLatest() is often useful for JavaFX.
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
ObservableList<String> endLocations =
136
5. Combining Observables
[Link](
[Link](startLocations),
[Link](endLocations),
(l1, l2) -> {
ArrayList<String> combined = new ArrayList<>
();
[Link](l1);
[Link](l2);
return combined;
}
).subscribe(allLocations::setAll);
[Link](0);
}
}
Kotlin
import [Link]
137
5. Combining Observables
import [Link]
import [Link]
import tornadofx.*
val endLocations =
[Link]("San Diego", "
Salt Lake City", "Seattle")
[Link]([Link]
able(),
[Link]()) {l1,l2 ->
ArrayList<String>().apply {
addAll(l1)
addAll(l2)
}
}.subscribe {
[Link](it)
}
138
5. Combining Observables
t) }
[Link](0)
}
}
OUTPUT:
If you want to go a step further, you can easily modify this operation so that the
combined ObservableList only contains distinct items from both
ObservableLists. Simply add a flatMapSingle() before the Observer that
intercepts the ArrayList , turns it into an Observable , distincts it, and
collects it back into a List . Notice when you run it, the duplicate "Dallas"
emission is held back.
139
5. Combining Observables
Java
[Link](
[Link](startLocations),
[Link](endLocations),
(l1,l2) -> {
ArrayList<String> combined = new ArrayList<>();
[Link](l1);
[Link](l2);
return combined;
}
).flatMapSingle(l -> [Link](l).distinct().toLis
t())
.subscribe(allLocations::setAll);
Kotlin
[Link]([Link](),
[Link]()) {l1,l2 ->
mutableListOf<String>().apply {
addAll(l1)
addAll(l2)
}
}.flatMapSingle {
[Link]().distinct().toList()
}.subscribe {
[Link](it)
}
OUTPUT:
140
5. Combining Observables
A more advanced but elegant way to accomplish either task above is to return an
Observable<Observable<String>> from the combineLatest() , and then
flatten it with a flatMap() afterwards. This avoids creating an intermediary
ArrayList and is a bit leaner.
Java
[Link](
[Link](startLocations),
[Link](endLocations),
(l1,l2) -> [Link](l1).concatWith(Observ
[Link](l2))
).flatMapSingle(obs -> [Link]().toList())
.subscribe(allLocations::setAll);
Kotlin
141
5. Combining Observables
[Link]([Link](),
[Link]()) { l1,l2 ->
[Link]().concatWith([Link]())
}.flatMapSingle {
[Link]().toList()
}.subscribe {
[Link](it)
}
This is somewhat more advanced, so do not worry if you find the code above
challenging to grasp. It is a pattern where an Observable is emitting
Observables, and you can feel free to move on and study it again later as you get
more comfortable with Rx.
Summary
In this chapter, we covered combining Observables and which combine operators
are helpful to use with UI events vs simply merging data. Hopefully by now, you
are excited that you can achieve tasks beyond what the JavaFX API provides.
Tasks like synchronizing an ObservableList to the contents of two other
ObservableLists become almost trival with reactive programming. Soon we will get
to the most anticipated feature of RxJava: concurrency with observeOn() and
subscribeOn() . But first, we will cover a few final topics before we hit that.
142
6. Bindings
6. Bindings
There are situations where JavaFX will want a Binding rather than an RxJava
Observable or Observer , and we will cover some streamlined utilities to
meet this need. We will also cover JavaFX Dialogs and how to use them
reactively.
Java
143
6. Bindings
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
VBox root = new VBox();
[Link]().bind(
[Link]().length().isNotEqualTo(6)
);
[Link]().addAll(label,input,button);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
144
6. Bindings
import tornadofx.*
[Link]().bind(
[Link]().length().isNotEqualTo(6)
)
}
}
 of the
Button .
Java
145
6. Bindings
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
VBox root = new VBox();
[Link]([Link]())
.map(s -> [Link]() != 6)
.subscribe(b -> [Link]().setValu
e(b));
[Link]().addAll(label,input,button);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
146
6. Bindings
import [Link]
import tornadofx.*
[Link]().toObservable()
.map { [Link] != 6 }
.subscribe { [Link]().set(it) }
}
}
If you are fluent in Rx, this is more intuitive than native JavaFX Bindings. It is also
much more flexible as a given ObservableValue remains openly mutable rather
than being strictly bound to another ObservableValue . But there are times you
will need to use Bindings to fully work with the JavaFX API. If you need to create a
Binding off an RxJava Observable , there is a factory/extension function to
turn an RxJava Observable<T> into a JavaFX Binding<T> . Let's take a look
at one place where Bindings are needed: TableViews.
Say you have the given domain type Person . It has a birthday property that
holds a LocalDate . The getAge() is an Observable<Long> driven off the
birthday and is converted to a Binding<Long> . When you change the
birthday , it will push a new Long value to the Binding (Figure 6.2).
Java
import [Link];
import [Link];
147
6. Bindings
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
[Link] = [Link](
[Link](birthdayProperty())
.map(dt -> [Link](dt,L
[Link]()))
);
}
148
6. Bindings
Kotlin
import [Link]
import [Link]
import tornadofx.*
import [Link]
import [Link]
In Java, you can also fluently use the to() operator to map the Observable
to any arbitrary type. We can use it to streamline turning it into a Binding .
[Link] = [Link](birthdayProperty())
.map(dt -> [Link](dt,LocalDate
.now()))
.to(JavaFxObserver::toBinding);
);
Now if you put a few instances of Person in a TableView , each row will then
come to life (Figure 6.2).
Java
import [Link];
import [Link];
import [Link];
149
6. Bindings
import [Link].*;
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]().setAll(
new Person("Thomas Nield", [Link](1989,1,18
)),
new Person("Sam Tulsa",[Link](1980,5,12)),
new Person("Ron Johnson",[Link](1975,3,8))
);
[Link]().addAll(nameCol,birthdayCol,ageCol);
150
6. Bindings
[Link](new Scene(table));
[Link]();
}
Kotlin
import [Link]
import tornadofx.*
import [Link]
column("Name",Person::nameProperty)
column("Birthday",Person::birthdayProperty).useTextField
(LocalDateStringConverter())
column("Age",Person::age)
}
}
Figure 6.2
151
6. Bindings
When you edit the "Birthday" field for a given row, you will see the "Age" field
update automatically. This is because the age Binding is subscribed to the
RxJava Observable derived from the birthday Property .
Java
Kotlin
152
6. Bindings
Disposing Bindings
If we are going to remove records from a TableView . we will need to dispose
any Bindings that exist on each item. This will dispose() the Binding from
the RxJava Observable to prevent any memory leaks and free resources.
It is good practice to put a method on your domain type that will dispose all
Bindings on that item. For our Person , we will want to dispose() the age
Binding when that Person is no longer needed.
Java
// existing code
Kotlin
//existing code
Whever you remove items from the TableView , call dispose() on each
Person so all Observables are unsubscribed. If your domain type has several
Bindings, you can add them all to a CompositeBinding . This is basically a
153
6. Bindings
collection of Bindings that you can dispose() all at once. Say we added
another Binding to Person called isAdult (which is conveniently built off
age by turning it into an Observable ). It may be convenient to add both
Bindings to a CompositeBinding in the constructor, so dispose() will
dispose them both.
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
[Link] = [Link](birthdayProperty())
.map(dt -> [Link](dt,LocalDate
.now()))
.to(JavaFxObserver::toBinding);
[Link] = [Link](age)
154
6. Bindings
[Link](age);
[Link](isAdult);
}
Kotlin
155
6. Bindings
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
import [Link]
Lazy Bindings
When you create a Binding<T> off an Observable<T> , it will subscribe
eagerly and request emissions immediately. There may be situtations you would
rather a Binding<T> be lazy and not subscribe to the Observable<T> until a
value is first needed (specifically, when getValue() is called). This is
156
6. Bindings
particularly helpful for data controls like TableView where only visible records in
view will request values. If you scroll quickly, it will only request values when you
slow down on a set of records. This way, the TableView does not have to
calculate all values for all records, but rather just the ones you see.
If we wanted to make our two reactive Bindings on Person lazy, so they only
subscribe when that Person is in view, call toLazyBinding() instead of
toBinding() .
Java
[Link] = [Link](birthdayProperty())
.map(dt -> [Link](dt,LocalDate
.now()))
.to(JavaFxObserver::toLazyBinding);
Kotlin
In some situations, you may have a Binding that is driven off an Observable
that queries a database (using RxJava-JDBC) or some other service. Because
these requests can be expensive, toLazyBinding() can be valuable to
initiatlize the TableView more quickly. Of course, this lazy loading can
sometimes cause laggy scrolling by holding up the JavaFX thread, and we will
learn about concurrency later in this book to mitigate this.
Summary
In this chapter we learned about turning Observables into JavaFX Bindings, which
helps interop RxJava with JavaFX more thoroughly. Typically you do not need to
use Bindings often as RxJava provides a robust means to synchronize properties
and events, but some parts of the JavaFX API expect a Binding which you now
have the means to provide.
157
6. Bindings
158
7. Dialogs and Multicasting
Dialogs
JavaFX Dialogs are popups to quickly show a message to the user or solicit an
input. They can be helpful in reactive applications, so they also have a factory to
turn their response into an Observable .
You can pass an Alert or Dialog to the fromDialog() factory, and it will
return an Observable that emits the response as a single emission. Then it will
call onCompleted() .
Java
159
7. Dialogs and Multicasting
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](
new Alert([Link], "Are you
sure you want to do this?")
).subscribe(response -> [Link]("You pressed "
+ [Link]()));
[Link](0);
}
}
Kotlin
160
7. Dialogs and Multicasting
import [Link]
import [Link]
import tornadofx.*
[Link](0)
}
}
For example, say you have a "Run Process" Button that will kick of a simple
process emitting the integers 1 through 10, and then collects them into a List .
Pretend this process was something more intensive, and you want the user to
confirm on pressing the Button if they want to run it. You can use a Dialog to
intercept ActionEvent emissions in a flatMapMaybe() , map to the
Dialog 's response, and allow only emissions that are [Link] . Then
you can flatMap() that emission to kick off the process (Figure 6.3), which
actually yields a Single so we will use flatMapSingle() .
Java
161
7. Dialogs and Multicasting
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](runButton)
.flatMapMaybe(ae ->
[Link](new Alert(Al
[Link], "Are you sure you want to run the pr
ocess?"))
.filter(response -> [Link]
als([Link]))
).flatMapSingle(response -> [Link](1,10
).toList())
.subscribe(i -> [Link]("Processed in
teger list: " + i));
[Link](new Scene(root));
[Link]();
}
}
162
7. Dialogs and Multicasting
Kotlin
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
Figure 6.3
163
7. Dialogs and Multicasting
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](runButton)
.flatMap(ae ->
[Link](1,10)
.flatMapMaybe(i ->
[Link]
164
7. Dialogs and Multicasting
log(
new Alert(Alert.
[Link],
"Are you
sure you want to process integer " + i + "?",
ButtonTy
[Link], [Link])
).filter(response -> res
[Link]([Link]))
.map(response -> i)
)
[Link](new Scene(root));
[Link]();
}
}
Kotlin
165
7. Dialogs and Multicasting
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
Figure 6.4
166
7. Dialogs and Multicasting
The map(response -> i) is a simple trick you can do to take a response after
it has been filtered, and map it back to the integer. If you say "YES" to 1, 3, 6 and
"NO" to everything else, you should get the output above. 2,4,5,7,9, and 10 never
made it to the Observer because "NO" was selected and filtered them out.
That is how you can reactively leverage Dialogs and Alerts, and any control that
implements Dialog to return a single result can be reactively emitted in this
way.
Multicasting
For the sake of keeping the previous chapters accessible, I might have mislead
you when I said UI events are hot Observables. The truth is by default, they are a
gray area between a hot and cold Observable (or should I say "warm"?).
Remember, a "hot" Observable will emit to all Observers at once, while a "cold"
Observable will replay emissions to each Observer individually. This is a
pragmatic way to separate the two, but UI event factories in RxJavaFX (as well as
RxBindings for Android) awkwardly operate as both hot and cold unless you
multicast, or force an emission to hotly be emitted to all Observers.
To understand this subtle impact, here is a trick question. Say you have a Maybe
driven off a Dialog or Alert , and it has two Observers. Do you think the
response is going to go to both Subscribers?
Java
167
7. Dialogs and Multicasting
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](0);
}
}
Kotlin
168
7. Dialogs and Multicasting
import [Link]
import [Link]
import [Link]
import tornadofx.*
[Link](0)
}
}
Try running it and you will see the Alert popup twice, once for each
Observer . This is almost like it's a cold Observable and it is "replaying" the
Dialog procedure for each Observer . As a matter of fact, that is exactly what
is happening. Both Observer are receiving their own, independent streams. You
can actually say OK on one Observer and CANCEL to the other. The two
Subscribers are, in fact, not receiving the same emission as you would expect in a
hot Observable .
This behavior is not a problem when you have one Observer . But when you
have multiple Observers, you will start to realize this is not a 100% hot
Observable . It is "hot" in that previous emissions are missed by tardy
169
7. Dialogs and Multicasting
Observers, but it is not "hot" in that a single set of emissions are going to all
Observers. To force the latter to happen, you can multicast, and that will force this
Observable to be 100% hot.
Java
170
7. Dialogs and Multicasting
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]();
[Link](0);
}
}
Kotlin
171
7. Dialogs and Multicasting
import [Link]
import [Link]
import [Link]
import tornadofx.*
[Link]()
[Link](0)
}
}
When you run this program, you will now see the Alert only pop up once, and
the single response will go to both Observers simultaneously. Every operator
before the publish() will be a single stream of emissions. But take note that
everything after the publish() is subject to be on separate streams from that
point.
The Maybe as well as the Single are not able to multicast, so just turn
them into an Observable via toObservable() when you need to.
172
7. Dialogs and Multicasting
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](0);
}
}
Kotlin
173
7. Dialogs and Multicasting
import [Link]
import [Link]
import [Link]
import tornadofx.*
[Link](0)
}
}
174
7. Dialogs and Multicasting
Observable is going to fire emissions the moment it is subscribed, you may just
want to manually set up a ConnectableObservable , subscribe the Observers,
and call connect() yourself.
Again, use multicasting for UI event Observables when there is more than one
Observer. Even though most of the time this makes no functional difference, it is
more efficient. It also will prevent subtle misbehaviors like we saw in cases like the
Dialog , where we want to force a single emission stream to go to all Observers
rather than each Observer getting its own emissions in a cold-like manner. If
you have only one Observer , the additional overhead of
ConnectableObservable is not necessary.
Replaying Observables
Another kind of ConnectableObservable is the one returned by the
replay() operator. This will hold on to the last x number of emissions and
"replay" them to each new Observer .
Java
175
7. Dialogs and Multicasting
Kotlin
OUTPUT:
1
2
3
4
5
5
The replay() operator can be helpful to replay the last emitted value for a UI
input (e.g a ComboBox ) so new Observers immediately get that value rather than
missing it. There are other argument overloads for replay() to replay
emissions for other scopes, like time windows. But simply holding on to the last
emission is a common helpful pattern for reactive UI's.
You can also use the cache() operator to horde and replay ALL emissions,
but keep in mind this can increase memory usage and cause data to go stale.
Summary
176
7. Dialogs and Multicasting
If you got this far, congrats! We have covered a lot. We ran through reactive
usage of Dialogs, which you can use to intercept emissions and get a user input
for each one, as well as multicasting. The topic of multicasting is critical to
understand because UI Observables do not always act hot when multiple
Observers are subscribed. Creating a ConnectableObservable is an effective
way to force an Observable to become hot and ensure each emission goes to
all Observers at once, and remove redundant listeners
Make sure you are somewhat comfortable with the material we covered so far,
because next we are going to cover concurrency. This is the topic that everything
leads up to, as RxJava revolutionizes how we multithread safely.
177
8. Concurrency
8. Concurrency
Concurrency has become a critical skill in software development. Most computers
and smart phones now have multiple core processors, and this means the most
effective way to scale performance is to leverage all of them. For this full utilization
to happen, code must explicitly be coded for multiple threads that can be worked
by multiple cores.
Even if you have less threads than tasks (such as two threads and three tasks),
the two threads can tackle two of the tasks immediately. The first one to get done
can then move on to the third task. This is essentially what a thread pool does. It
has a fixed number of threads and is given a "queue" of tasks to do. Each thread
will take a task, execute it, and then take another. "Reusing" threads and giving
them a queue of tasks, rather than creating a thread for each task, is usually more
efficient since threads are expensive to create and dispose.
178
8. Concurrency
Using subscribeOn()
By default, for a given Observable chain, the thread that calls the
subscribe() method is the thread the Observable sends emissions on. For
instance, a simple subscription to an Observable inside a main() method will
fire the emissions on the main daemon thread.
Java
import [Link];
Kotlin
179
8. Concurrency
import [Link]
OUTPUT:
Java
180
8. Concurrency
import [Link];
import [Link];
import [Link];
try {
[Link](3);
} catch (InterruptedException e) {
[Link]();
}
}
}
Kotlin
import [Link]
import [Link]
import [Link]
[Link](3)
}
181
8. Concurrency
OUTPUT:
This way we can declare our Observable chain and an Observer , but then
immediately move on without waiting for the emissions to finish. Those are now
happening on a new thread named RxNewThreadScheduler-1 . Notice too we
have to call [Link](3) afterwards to make the main thread
sleep for 3 seconds. This gives our Observable a chance to fire all emissions
before the program exits. You should not have to do this sleep() with a JavaFX
application since its own non-daemon threads will keep the session alive.
A critical behavior to note here is that all emissions are happening sequentially on
a single RxNewThreadScheduler-1 thread. Emissions are strictly happening
one-at-a-time on a single thread. There is no parallelization or racing to call
onNext() throughout the chain. If this did occur, it would break the
Observable contract. It may surprise some folks to hear that RxJava is not
parallel, but we will cover some concurrency tricks with flatMap() later to get
parallelization without breaking the Observable contract.
182
8. Concurrency
Java
import [Link];
import [Link];
import [Link];
//Observer 1
[Link](i ->
[Link]("Observer 1 receiving " + i +
" on thread "
+ [Link]().getName())
);
//Observer 2
[Link](i ->
[Link]("Observer 2 receiving " + i +
" on thread "
+ [Link]().getName())
);
try {
[Link](3);
} catch (InterruptedException e) {
[Link]();
}
}
}
Kotlin
183
8. Concurrency
import [Link]
import [Link]
import [Link]
//Observer 1
[Link] { println("Observer 1 receiving $it on thre
ad ${[Link]().name}") }
//Observer 2
[Link] { println("Observer 2 receiving $it on thre
ad ${[Link]().name}") }
[Link](3)
}
OUTPUT:
184
8. Concurrency
The most effective way to keep thread creation under control is to "reuse" threads.
You can do this with the different Schedulers . A Scheduler is RxJava's
equivalent to Java's standard Executor . You can create your own Scheduler
by passing an Executor to the [Link]() factory. But for most
cases, it is better to use RxJava's standard Schedulers as they are optimized to
be conservative and efficient for most cases.
Computation
If you are doing computation-intensive operations, you will likely want to use
[Link]() which will maintain a conservative number of
threads to keep the CPU from being taxed.
Observable operations that are doing calculation and algorithm-heavy work are
optimal to use with the computation Scheduler . If you are not sure how many
threads will be created by a process, you might want to make this one your go-to.
IO
If you are doing a lot of IO-related tasks, like sending web requests or database
queries, these are much less taxing on the CPU and threads can be created more
liberally. [Link]() is suited for this kind of work. It will add and remove
threads depending on how much work is being thrown at it at a given time, and
reuse threads as much as possible.
But be careful as it will not limit how many threads it creates! As a rule-of-thumb,
assume it will create a new thread for each task.
Immediate
185
8. Concurrency
You will likely not use this Scheduler very often since it is the default. The code
above is no different than declaring an Observable with no subscribeOn() .
Trampoline
An interesting Scheduler is the [Link]() . It will schedule
the emissions to happen on the immediate thread, but allow the immediate thread
to finish its current task first. In other words, this defers execution of the emissions
but will fire them the moment the current thread declaring the subscription is no
longer busy.
You will likely not use the Trampoline Scheduler unless you encounter nuanced
situations where you have to manage complex operations on a single thread and
starvation can occur. The JavaFX Scheduler uses a trampoline mechanism, which
we will cover next.
JavaFX Scheduler
Finally, the JavaFxScheduler is packaged with the RxJavaFX library. It
executes the emissions on the JavaFX thread so they can safely make
modifications to a UI. It uses a trampoline policy against the JavaFX thread,
making it highly resilient against recursive hangups and thread starvation.
The JavaFX Scheduler is not in the Schedulers class, but rather is stored as a
singleton in its own class. You can call it like below:
186
8. Concurrency
In Kotlin, The RxKotlinFX library can save you some boilerplate by using an
extension function instead.
Java
[Link](button)
.subscribeOn([Link]()) // has no effect
.subscribe(ae -> [Link]("You clicked me!"));
Kotlin
[Link]()
.subscribeOn([Link]()) // has no effect
.subscribe { println("You clicked me!") }
Also note that the JavaFX Scheduler is already used when declaring UI code, and
will be the default subscribeOn() Scheduler since it is the immediate thread.
Therefore, you will rarely call subscribeOn() to specify the JavaFxScheduler.
You are more likely to use it with observeOn() .
187
8. Concurrency
Intervals
While we are talking about concurrency, it is worth mentioning there are other
factories that already emit on a specific Scheduler . For instance, there are
factories in both RxJava and RxJavaFX to emit at a specified time interval.
Here is an application that will increment a Label every second (Figure 8.1).
Java
188
8. Concurrency
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]().add(label);
[Link](new Scene(root));
[Link](60);
[Link]();
}
}
Kotlin
189
8. Concurrency
import [Link]
import [Link]
import tornadofx.*
import [Link]
minWidth = 60.0
label {
[Link](1, [Link], JavaFxSched
[Link]())
.map { [Link]() }
.subscribe { text = it }
}
}
}
OUTPUT:
0
1
2
3
4
Figure 8.1
190
8. Concurrency
Using observeOn()
A lot of folks get confused by the difference between subscribeOn() and
observeOn() , but the distinction is quite simple. A subsribeOn() instructs
the source Observable what thread to emit items on. However, the
observeOn() switches emissions to a different thread at that point in the chain.
Java
191
8. Concurrency
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]("Alpha","Beta","Gamma","Delta","Epsilon"
)
.subscribeOn([Link]())
.toList()
.observeOn([Link]())
.subscribe(list -> [Link]().setAll(li
st));
[Link]().add(listView);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
192
8. Concurrency
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
listview<String> {
[Link]("Alpha","Beta","Gamma","Delta","Epsi
lon")
.subscribeOn([Link]())
.toList()
.observeOnFx()
.subscribeBy { [Link](it) }
}
}
}
Figure 8.2
The five Strings are emitted and collected into a List on a [Link]()
thread. But immediately after the toList() is an observeOn() that takes that
List and emits it on the JavaFX Scheduler. Unlike the subscribeOn() where
193
8. Concurrency
placement does not matter, the placement of the observeOn() does. It switches
to a different thread at that point in the Observable chain.
This all happens a bit too fast to see this occuring, so let's exaggerate this
example and emulate a long-running database query or request. Use the
delay() operator to delay the emissions by 3 seconds. Note that delay()
subscribes on the [Link]() by default, so having a
subscribeOn() no longer has any effect. But we can pass the
[Link]() as a third argument to make it use an IO thread instead
(Figure 8.3).
Java
194
8. Concurrency
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]().add(listView);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
195
8. Concurrency
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
listview<String> {
[Link]("Alpha","Beta","Gamma","Delta","Epsi
lon")
.delay(3, [Link], [Link]())
.toList()
.observeOnFx()
.subscribeBy { [Link](it) }
}
}
}
Figure 8.3
 calls. Here is a more real-
life example: let's say you want to create an application that displays a text
response (such as JSON) from a URL. This has the potential to create an
unrespsonsive application that freezes while it is fetching the request. But using
an observeOn() we can switch this work from the FX thread to an IO therad,
then call another observeOn() afterwards to put it back on the FX thread.
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
[Link](true);
[Link](button)
.map(ae -> [Link]())
.observeOn([Link]())
.map(MyApp::getResponse)
197
8. Concurrency
.observeOn([Link]())
.subscribe(output::setText);
[Link]().setAll(label,input,output,button);
[Link](new Scene(vBox));
[Link]();
}
private static String getResponse(String path) {
try {
return new Scanner(new URL(path).openStream(), "UTF-
8").useDelimiter("\\A").next();
} catch (Exception e) {
return [Link]();
}
}
}
Kotlin
198
8. Concurrency
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
label("Input URL")
val input = textfield()
val output = textarea {
isWrapText = true
}
button("Submit").actionEvents()
.map { [Link] }
.observeOn([Link]())
.map {
URL([Link]).readText()
}.observeOnFx()
.subscribe {
[Link] = it
}
}
}
Figure 8.4
199
8. Concurrency
Figure 8.5
Of course, you can click the "Submit" Button multiple times and that could
queue up the requests in an undesirable way. But at least the work is kept off the
UI thread. In the next chapter we will learn about the switchMap() to mitigate
excessive user inputs and kill previous requests, so only the latest emission is
chased after.
But we will take a stateful strategy for now to prevent this from happening.
200
8. Concurrency
doOnXXXFx() Operators
Remember the doOnXXX() operators like doOnNext() , doOnComplete() ,
etc? RxKotlinFX has JavaFX equivalents that will perform on the FX thread,
regardless of which Scheduler is being used. This can be helpful to modify UI
elements in the middle of an Observable chain.
For example, you might want to disable the Button and change its text during
processing. Your first instinct might be to use the doOnNext() to achieve this.
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
[Link](true);
[Link](button)
.map(ae -> [Link]())
201
8. Concurrency
.observeOn([Link]())
.doOnNext(path -> {
[Link]("BUSY");
[Link](true);
})
.map(MyApp::getResponse)
.observeOn([Link]())
.subscribe(r -> {
[Link](r);
[Link]("Submit");
[Link](false);
});
[Link]().setAll(label,input,output,button);
[Link](new Scene(vBox));
[Link]();
}
private static String getResponse(String path) {
try {
return new Scanner(new URL(path).openStream(), "UTF-
8").useDelimiter("\\A").next();
} catch (Exception e) {
return [Link]();
}
}
}
Kotlin
202
8. Concurrency
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
label("Input URL")
val input = textfield()
val output = textarea {
isWrapText = true
}
[Link]()
.map { [Link] }
.observeOn([Link]())
.doOnNext {
[Link] = "BUSY"
[Link] = true
}
.map {
URL([Link]).readText()
}.observeOnFx()
.subscribe {
[Link] = it
[Link] = "Submit"
[Link] = false
}
}
}
203
8. Concurrency
But if you try to execute a request this way, you will get an error indicating that the
submitButton is not being modified on the FX thread. This is occuring because
an IO thread (not the FX thread) is trying to modify the Button . You could move
this doOnNext() operator before the observeOn([Link]()) so it
catches the FX thread, and that would address the issue. But there will be times
where you must call a doOnNext() deep in an Observable chain that is
already on another thread (such as updating a ProgressBar ).
Java
import [Link];
import [Link];
import [Link]
rs;
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
204
8. Concurrency
[Link](true);
[Link](button)
.map(ae -> [Link]())
.observeOn([Link]())
.compose([Link](t -
> {
[Link]("BUSY");
[Link](true);
}))
.map(MyApp::getResponse)
.observeOn([Link]())
.subscribe(r -> {
[Link](r);
[Link]("Submit");
[Link](true);
});
[Link]().setAll(label,input,output,button);
[Link](new Scene(vBox));
[Link]();
}
private static String getResponse(String path) {
try {
return new Scanner(new URL(path).openStream(), "UTF-
8").useDelimiter("\\A").next();
} catch (Exception e) {
return [Link]();
}
}
}
Kotlin
205
8. Concurrency
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
label("Input URL")
val input = textfield()
val output = textarea {
isWrapText = true
}
[Link]()
.map { [Link] }
.observeOn([Link]())
.doOnNextFx {
[Link] = "BUSY"
[Link] = true
}
.map {
URL([Link]).readText()
}.observeOnFx()
.subscribe {
[Link] = it
[Link] = "Submit"
[Link] = true
}
}
}
206
8. Concurrency
Java does not have extension functions like Kotlin. But RxJava does have a
compose() operator that you can pass a Transformer to, as well as a
lift() operator to accept custom operators. Between these two methods,
it is possible to create your own operators for RxJava. However, these are
beyond the scope of this book. You can read about creating your own
operators in my book _Learning RxJava.
Here are all the doOnXXXFx() operators availabe in RxKotlin. These behave
exactly the same way as the doOnXXX() operators introduced in Chapter 2, but
the action specified in the lambda will execute on the FX thread.
doOnNextFx()
doOnErrorFx()
doOnCompletedFx()
doOnSubscribeFx()
doOnUnsubscribeFx()
doOnTerminateFx()
doOnNextCountFx()
doOnCompletedCountFx()
doOnErrorCountFx()
Parallelization
Did you know the flatMap() (as well as flatMapSingle() and
flatMapMaybe() ) is actually a concurrency tool? RxJava by default does not do
parallelization, so effectively there is no way to parallelize an Observable . As
we have seen, subscribeOn() and observeOn() merely move emissions
from one thread to another thread, not one thread to many threads. However, you
can leverage flatMap() to create several Observables parallelizing emissions
on different threads.
207
8. Concurrency
Java
import [Link];
import [Link];
try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
public static int runLongProcess(int i) {
try {
[Link]((long) ([Link]() * 1000));
} catch (Exception e) {
[Link]();
}
return i;
}
}
Kotlin
208
8. Concurrency
import [Link]
import [Link]
[Link](15000)
}
OUTPUT:
Received 1 on RxComputationScheduler-1
Received 3 on RxComputationScheduler-3
Received 5 on RxComputationScheduler-1
Received 9 on RxComputationScheduler-1
Received 4 on RxComputationScheduler-4
Received 8 on RxComputationScheduler-4
Received 2 on RxComputationScheduler-3
Received 6 on RxComputationScheduler-3
Received 7 on RxComputationScheduler-3
Received 10 on RxComputationScheduler-3
Your output may look different from what is above, and that is okay since nothing
is deterministic when we do this sort of parallelized concurrency. But notice we
have processing happening on at least three threads (RxComputationScheduler-
209
8. Concurrency
It is critical to note that the flatMap() can fire emissions from multiple
Observables inside it, all of which may be running on different threads. But to
respect the Observable contract, it must make sure that emissions leaving the
flatMap() towards the Observer are serialized in a single Observable . If
one thread is busy pushing items out of the flatMap() , the other threads will
leave their emissions for that occupying thread to take ownership of in a queue.
This allows the benefit of concurrency without any blocking or synchronization of
threads.
You can learn more about achieving RxJava parallelization in two articles
written by yours truly: Acheiving Parallelization and [Maximizing
Parallelization]([Link]
[Link]).
Summary
In ths chapter we have learned one of the main selling points of Rxjava: concise,
flexible, and composable concurrency. You can compose Observables to change
concurrency policies at any time with the subscribeOn() and observeOn() .
This makes applications adaptable, scalable, and evolvable over long periods of
time. You do not have to mess with synchronizers, semaphores, or any other low-
level concurrency tools as RxJava takes care of these complexities for you.
But we are not quite done yet. As we will see in the next chapter, we can leverage
concurrency to create features you might have thought impractical to put in your
applications.
210
9. Switching, Throttling, and Buffering
In UI development, users will inevitably click things that kick off long-running
processes. Even if you have concurrency in place, users that rapidly select UI
inputs can kick of expensive processes, and those processes will start to queue
up undesirably. Other times, we may want to group up rapid emissions to make
them a single unit, such as typing keystrokes. There are tools to effectively
overcome all these problems, and we will cover them in this chapter.
Flowables are highly critical if you are working with large amounts of data
concurrently. However, you cannot effectively use Flowables against user
interface events because a user cannot programmatically be told to "slow dow'
and respect a backpressure request. You can, however, use reactive operators to
"knock down" emissions from a rapidly-firing source which this chapter will cover.
You can also compose Flowables and Observables together using conversion
operators like Observable#toFlowable() and Flowable#toObservable() ,
as well as Observable#flatMapPublisher() and
Flowable#flatMapObservable() .
211
9. Switching, Throttling, and Buffering
But backpressure and Flowables are beyond the scope of this book. Please read
Chapter 8 of my Packt book Learning RxJava to get thorough examples,
explanations, and use cases for Flowables and backpressure.
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]().setAll("Alpha","Beta","Gamma",
"Delta","Epsilon","Zeta","Eta");
[Link]([Link]
l().getSelectedItems())
212
9. Switching, Throttling, and Buffering
[Link]().addAll(listView, itemsView);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
213
9. Switching, Throttling, and Buffering
import [Link]
import [Link]
import tornadofx.*
listview<String> {
[Link]
vable()
.flatMapSingle { [Link]()
.flatMap { [Link]().map(Char
::toString).toObservable() }
.toList()
}.subscribe { [Link](it) }
}
}
}
Figure 9.1
214
9. Switching, Throttling, and Buffering
This is a pretty quick computation which hardly keeps the JavaFX thread busy.
But in the real world, running database queries or HTTP requests can take awhile.
The last thing we want is for these rapid inputs to create a queue of requests that
will quickly make the application unusable as it works through the queue. Let's
emulate this by using the delay() operator. Remember that the delay()
operator already specifies a subscribeOn() internally, but we can specify an
argument which Scheduler it uses. Let's put it in the IO Scheduler. The
Observer must receive each emission on the JavaFX thread, so be sure to
observeOn() the JavaFX Scheduler before the emission goes to the
Observer .
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
215
9. Switching, Throttling, and Buffering
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]().setAll("Alpha","Beta","Gamma",
"Delta","Epsilon","Zeta","Eta");
[Link]([Link]
l().getSelectedItems())
.flatMapSingle( list -> [Link](
list)
.delay(3, [Link], Schedulers.i
o())
.flatMap (s -> [Link]([Link]
lit("(?!^)")))
.toList()
).observeOn([Link]())
.subscribe(l -> [Link]().setAll(l));
[Link]().addAll(listView, itemsView);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
216
9. Switching, Throttling, and Buffering
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
listview<String> {
[Link]
vable()
.flatMapSingle { [Link]()
.delay(3,
[Link], Schedulers
.io())
.flatMap { [Link]().map(Char
::toString).toObservable() }
.toList()
}.observeOnFx().subscribe { [Link](it)
}
}
}
}
Now if we click several items on the top ListView , you will notice a 3-second
lag before the letters show up on the bottom ListView . This emulates long-
running requests for each click, and now we have these requests queuing up and
causing the bottom ListView to go berserk, trying to display each previous
217
9. Switching, Throttling, and Buffering
request before it gets to the current one. Obviously, this is undesirable. We likely
want to kill previous requests when a new one comes in, and this is simple to do.
Just change the flatMapSingle() that emits the List<String> of
characters to a switchMap() . Since there is no switchMapSingle() , just
convert that resulting Single from toList() to an Observable .
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link]().setAll("Alpha","Beta","Gamma",
"Delta","Epsilon","Zeta","Eta");
[Link]([Link]
l().getSelectedItems())
.switchMap ( list -> [Link](lis
t)
.delay(3, [Link], Schedulers.i
o())
218
9. Switching, Throttling, and Buffering
[Link]().addAll(listView, itemsView);
[Link](new Scene(root));
[Link]();
}
}
Kotlin
219
9. Switching, Throttling, and Buffering
import [Link]
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
listview<String> {
[Link]
vable()
.switchMap { [Link]()
.delay(3, [Link], Schedule
[Link]())
.flatMap { [Link]().map(Char
::toString).toObservable() }
.toList()
.toObservable()
}.observeOnFx().subscribe { [Link](it)
}
}
}
}
This makes the application much more responsive. The switchMap() works
identically to any variant of flatMap() , but it will only chase after the latest user
input and kill any previous requests. In other words, it is only chasing after the
220
9. Switching, Throttling, and Buffering
latest Observable derived from the latest emission, and unsubscribing any
previous requests. The switchMap() is a powerful utility to create responsive
and resilient UI's, and is the perfect way to handle click-happy users!
You can also use the switchMap() to cancel long-running or infinite processes
using a neat little trick with [Link]() . For instance, a
ToggleButton has a true/false state depending on whether it is selected. When
you emit its false state, you can return an empty Observable to kill the
previous processing Observable , as shown below. When the ToggleButton
is selected, it will kick off an [Link]() that emits a consecutive
integer every 10 milliseconds. But unselecting the ToggleButton will cause the
flatMap() to switch to an [Link]() , killing and unsubscribing
from the [Link]() (Figure 9.2).
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
[Link]([Link](
))
.switchMap(selected -> {
221
9. Switching, Throttling, and Buffering
if (selected) {
[Link]("STOP");
return [Link](10, TimeUnit.
MILLISECONDS);
} else {
[Link]("START");
return [Link]();
}
})
.observeOn([Link]())
.subscribe(i -> [Link]([Link]())
);
[Link]().setAll(toggleButton,timerLabel);
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
222
9. Switching, Throttling, and Buffering
import [Link]
import [Link]
import [Link]
import tornadofx.*
import [Link]
[Link]().toObservable()
.switchMap { selected ->
if (selected) {
[Link] = "STOP"
[Link](10, [Link]
CONDS)
} else {
[Link] = "START"
[Link]()
}
}.observeOnFx()
.subscribe {
[Link] = [Link]()
}
}
}
Figure 9.2
223
9. Switching, Throttling, and Buffering
The switchMap() can come in handy for any situation where you want to switch
from one Observable source to another.
Buffering
We may want to collect emissions into a List , but doing so on a batching
condition so several lists are emitted. The buffer() operators help accomplish
this, and they have several overload flavors.
The simplest buffer() specifies the number of emissions that will be collected
into a List before that List is pushed forward, and then it will start a new
one. In this example, emissions will be grouped up in batches of 10 .
Java
import [Link];
Kotlin
import [Link]
OUTPUT:
224
9. Switching, Throttling, and Buffering
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
There are other flavors of buffer() . Another will collect emissions based on a
specified time cutoff. If you have an Observable emitting at an interval of 300
milliseconds, you can buffer them into a List at every second. This is what the
output would look like:
Java
import [Link];
import [Link];
[Link](300, [Link])
.buffer(1, [Link])
.subscribe([Link]::println);
try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}
Kotlin
225
9. Switching, Throttling, and Buffering
import [Link]
import [Link]
[Link](10000)
}
OUTPUT:
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11, 12]
[13, 14, 15]
[16, 17, 18]
[19, 20, 21, 22]
[23, 24, 25]
[26, 27, 28]
[29, 30, 31, 32]
Java
226
9. Switching, Throttling, and Buffering
import [Link];
import [Link];
[Link](300, [Link])
.buffer(oneSecondInterval)
.subscribe([Link]::println);
try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}
Kotlin
227
9. Switching, Throttling, and Buffering
import [Link]
import [Link]
[Link](300, [Link])
.buffer(oneSecondInterval)
.subscribe { println(it) }
[Link](10000)
}
OUTPUT:
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11, 12]
[13, 14, 15]
[16, 17, 18]
[19, 20, 21, 22]
[23, 24, 25]
[26, 27, 28]
[29, 30, 31, 32]
This is a helpful way to buffer() lists because you can use another
Observable to control when the Lists are emitted. We will see an example of
this at the end of this chapter when we group up keystrokes.
228
9. Switching, Throttling, and Buffering
Note that there are also window() operators that are similar to buffer() , but
they will return an Observable<Observable<T>> instead of an
Observable<List<T>> . In other words, they will return an Observable
emitting Observables rather than Lists. These might be more desirable in some
situations where you do not want to collect Lists and want to efficiently do further
operations on the groupings.
You can read more about buffer() and window() on the RxJava Wiki.
Throttling
When you have a rapidly firing Observable , you may just want to emit the first
or last emission within a specified scope. For example, you can use
throttleLast() (which is also aliased as sample() ) to emit the last
emission for each fixed time interval.
Java
import [Link];
import [Link];
[Link](300, [Link])
.throttleLast(1, [Link])
.subscribe([Link]::println);
try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}
Kotlin
229
9. Switching, Throttling, and Buffering
import [Link]
import [Link]
[Link](300, [Link])
.throttleLast(1, [Link])
.subscribe { println(it) }
[Link](10000)
}
OUTPUT:
2
5
8
12
15
18
22
25
28
32
throttleFirst() will do the opposite and emit the first emission within each
time interval. It will not emit again until the next time interval starts and another
emission occurs in it.
Java
230
9. Switching, Throttling, and Buffering
import [Link];
import [Link];
[Link](300, [Link])
.throttleFirst(1, [Link])
.subscribe([Link]::println);
try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}
Kotlin
import [Link]
import [Link]
[Link](300, [Link])
.throttleFirst(1, [Link])
.subscribe { println(it) }
[Link](10000)
}
OUTPUT:
231
9. Switching, Throttling, and Buffering
0
4
8
12
16
20
24
28
32
Java
232
9. Switching, Throttling, and Buffering
import [Link];
import [Link];
[Link](1, [Link])
.subscribe([Link]::println);
try {
[Link](10000);
} catch (InterruptedException e) {
[Link]();
}
}
}
Kotlin
233
9. Switching, Throttling, and Buffering
import [Link]
import [Link]
[Link](1, [Link])
.subscribe { println(it) }
[Link](10000)
}
OUTPUT:
A9
B0
B1
C3
I labeled each source as "A", "B", or "C" and concatenated that with the index of
the emission that was throttled. You will notice that the 10 rapid emissions
resulted in the last emission "A9" getting fired after the 1-second interval of "B"
resulted in that inactivity. Then "B0" and "B1" had 1 second breaks between them
resulting in them being emitted. But "B3" did not go forward because "C" started
firing at 500 millisecond intervals and gave no inactivity interval for it to fire. Then
"C3" was the last emission to fire at the final respite.
If you want to see more examples and marble diagrams of these operators, check
out the RxJava Wiki article.
234
9. Switching, Throttling, and Buffering
Grouping Up Keystrokes
Now we will move on to a real-world example that puts everything in this chapter
in action. Say we have a ListView<String> containing all 50 states of the
United States (I saved them to a plain text file on GitHub Gist. When we have the
ListView selected, we want users to be able to start typing a state and it will
immediately jump to the first state that starts with that inputted String .
Achieving this can be a bit tricky. As a user is typing rapidly, we want to collect
those emissions into a single String to turn individual characters into words.
When the user stops typing, we want to stop collecting characters and push that
String forward so it is selected in a ListView . Here is how we can do that:
Java
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
235
9. Switching, Throttling, and Buffering
[Link]().add(listView);
[Link](new Scene(root));
[Link]();
}
236
9. Switching, Throttling, and Buffering
}
}
}
Kotlin
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
import [Link]
[Link](200, [Link]).startWith(
"")
237
9. Switching, Throttling, and Buffering
.switchMap {
[Link] { x,y -> x + y }
.switchMap { input ->
[Link]()
.filter { [Link]
().startsWith([Link]()) }
.take(1)
}
}.observeOnFx()
.subscribe {
[Link](it)
}
}
}
Figure 9.3 - A ListView that will select states that are being typed
238
9. Switching, Throttling, and Buffering
two separate tasks: 1) Signal the user has stopped typing after 200ms of inactivity
via debounce() 2) Receive that signal emission within a switchMap() , where
typedKeys is used again to infinitely scan() typed characters and
concatentate them together as the user types. Then each concatenation of
characters is compared to all the states and finds the first one that matches. That
state is then put back on the FX thread and to the Observer to be selected.
This is probably the most complex task I have found in using RxJava with JavaFX,
but it is achieving an incredible amount of complex concurrent work with little
code. Take some time to study the code above. Although it may take a few
moments (or perhaps days) to sink in, try to look at what each part is doing in
isolation. An infinite Observable is doing a rolling concatenation of user
keystrokes to form Strings (and using a switchMap() to kill off previous
searches). That inifinite Observable is killed after 200 ms of inactivity and
replaced with a new inifinte Observable , effectively "resetting" it.
Once you get a hang of this, you will be unstoppable in creating high-quality
JavaFX applications that can not only cope, but also leverage rapid user inputs.
Summary
In this chapter, we learned how to handle a high volume of emissions effectively
through various strategies. When Observers cannot keep up with a hot
Observable , you can use switching, throttling, and buffering to make the
volume manageable. We also learned some powerful patterns to group up
emissions based on timing mechanisms, and make tasks like processing
keystrokes fairly trivial.
We are almost done with our RxJava journey. In the final chapter, we will cover a
question probably on many readers' minds: decoupling UI's when using RxJava.
239
10. Decoupling
RxJava has a special reactive type called a Subject , which comes in a number
of implementations. A Subject operates as both an Observable and an
Observer . However, you need to be selective when to use Subjects as they can
introduce antipatterns. They can come in handy to decouple reactive streams by
having multiple "sources" subscribe a Subject to their emissions, and then that
Subject will pass those emissions downstream to any receiving Observers.
Subjects automatically multicast as well.
Below, we do a simple separation between the source of text input values and an
Observer that consumes them by putting them in a Label . The Subject
will act as a proxy between them.
Java
240
10. Decoupling
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
@Override
public void start(Stage stage) throws Exception {
[Link](new Scene(vBox));
[Link]();
}
}
Kotlin
241
10. Decoupling
import [Link]
import [Link]
import tornadofx.*
label {
[Link](String::reversed)
.subscribe { text = it }
}
}
}
Java
242
10. Decoupling
import [Link];
import [Link];
import [Link];
import [Link];
private MyEventModel() {}
Kotlin
import [Link]
import [Link]
object MyEventModel {
val refreshRequests = [Link]<ActionEvent>()
}
243
10. Decoupling
Java
[Link](tableView, KeyEvent.KEY_PRESSED)
.filter(ke -> [Link]() && [Link]().equals(
KeyCode.R))
.map(ke -> new ActionEvent());
.subscribe([Link]().getRefreshRequests
());
Kotlin
244
10. Decoupling
These three event sources are now proxied through one Subject . You can then
have one or more Observers subscribe() to this Subject , and they will
respond to any of these three sources requesting a refresh.
Java
Kotlin
[Link]
.subscribe { refresh() }
You can set up as many models as you like with as many Subjects as you like to
pass different data and events back-and-forth throughout your application.
245
10. Decoupling
You can learn more about these subjects in Rx documentation as well as the
Learning RxJava Packt Book.
Summary
In this chapter we covered how to separate reactive streams between UI
components with the Subject , which can serve as a proxy between Observable
sources and downstream Observers. You can put Subject instances in a
backing class to serve as an Rx-flavored event bus to relay data and events. Use
the Subject to consolidate mutliple event sources that drive the same action, or
to cleanly separate your Observable sources and terminal Observers.
Closing
You have reached the end of this book. Congrats! Keep researching RxJava and
learn what it can do inside and outside of JavaFX. You will find it is used on
Android via the RxAndroid and RxBindings libraries, as well as on backend
development with RxNetty and other frameworks. I encourage you to keep
learning the various operators and check out books and online resources to grow
your proficiency.
I highly encourage reading my Packt book Learning RxJava to get more thorough
knowledge of RxJava beyond JavaFX applications. I wrote it in the same style and
spirit as this book, with the intent of helping the largest number of people possible.
The book can be purchased on Packt, Amazon, and other book retailers. It is also
available via subscription to O'Reilly Safari and Packt Mapt.
246
10. Decoupling
[Link]
Thomas Nield
247