rxjs: has to deal with data that comes over time
- wrapper around data source(stream of values)
- async data (not limited to this)
- is push-based(the observer pushes data into the observer)
- can start emitting values before any observer subscribes to it
- starts emitting values when it has at least one observer
- does something whenever
- a value/error occurs
- the observable reports the
completestatus
- 3 methods
next()error()complete()The observable knows when to call these methods throughout the Subscription
subscribe()- tells the observable that someone wants to know about its values
- connects observer with observable
- what starts a stream execution
- sequence of data elements which are available over time
- give shape to the emitted data so that the subs can only see the final result and not the variable collection of emitted values
-
mergeMap/flatMap,switchMap,concatMap,exhaustMap -
maps outer observable to inner observable and automatically subscribes to the inner one
-
is unicast: each subscribed observer owns an independent execution of the observable
Example
const src = new Observable(obs => { obs.next(Math.random().toFixed(2)) }); src.subscribe(v => console.log('subscription 1', v)); src.subscribe(v => console.log('subscription 2', v));
-
frequently called
Observable-of-Observables -
emits events that are Observable themselves
By default, a high-order observable will emit values are that the streams resulted from the inner observable.
After flattening, the outer observable will emit the inner observable's events, instead of emitting the inner observable itself.
- 0-N values
- sync or async
- lazy(
cold) or eager(hot) - cancellable or non cancellable
- unicast or multicast
- 1 value
- eager
- async
- multicast only
-
is a hot observable: the new observers won't be able to get the previous values if they do not subscribe at the right time
-
is multicast: the observable execution is shared among multiple subscribers
Example
const subj = new Subject(); subj.subscribe(v => console.log('subscription 1', v)); // Same number subj.subscribe(v => console.log('subscription 2', v)); // Same number subj.next(Math.random());
-
when subscribing to a subject, it registers the given Observer in a list of Observers
-
can be used as data producer and as data consumer: using subjects as a data consumer, you can convert Observables from unicast to multicast
Example
const observable = new Observable(obs => obs.next(Math.random())); const subject = new Subject(); subject.subscribe(v => console.log('subscription 1', v)); // Same number subject.subscribe(v => console.log('subscription 2', v)); // Same number /* * All the subscribers of the subject will receive the value emitted * by the initial observable */ observable.subscribe(subject);
- map into a single observable
- run subscriptions/reqs in parallel
- post, put, delete requests when order is NOT important
- cancels the current inner observable if another value is emitted by the stream
- use it for search requests or cancelable requests (type ahead etc..)
- runs subscriptions/requests in order
- will wait for the current inner observable to complete before subscribing to the next one
- does buffer emitted values
- the opposite of
switchMap - will not subscribe to the next observable until the current one completes
- use for login(don't want more requests until the initial one is complete)
-
keeps accumulating values until observable emits values or completes
Example
const s = new Subject(); interval(300) .pipe( take(5) ) .subscribe( v => { s.next(v); }, ); // Emits value if 1s has passed without something happening const debouncedSubj$ = s.pipe(debounceTime(1000)); // Accumulate the emitted values const addVal$ = s.pipe(buffer(debouncedSubj$)); addVal$.subscribe(res => { console.log('res', res); // --> [0, 1, 2, 3, 4] })
- collects values emitted from the source Observable and stores them as an array;
when it starts collecting values, it calls the provided function(the function returns an observable); after the returned observable emits, the buffer will be closed(thus, the collected items will be sent to the consumer) and then it will restart collecting value;
Example
const clicks = fromEvent(document, 'click');
clicks
.pipe(
bufferWhen(() => timer(1000))
)
.subscribe(console.log)- starts collecting values when
openingsemits and close the buffer whenclosingSelectoremits
Example
const clicks = fromEvent(document, 'click');
const openings = of(1, 2);
const buffered = clicks.pipe(bufferToggle(openings, i =>
i % 2 === 0 ? timer(2000) : EMPTY
));
buffered.subscribe(x => console.log(x));-
collect values until
bufferSizeis reached -
if
startBufferEveryis specified, a new buffer will start eachstartBufferEveryvalues
Example
of(1, 2, 3, 4, 5, 6, 7, 8)
.pipe(
bufferCount(3)
)
// .subscribe(console.log)
/*
--->
[1, 2, 3]
[4, 5, 6]
[7, 8]
*/
of(1, 2, 3, 4, 5, 6, 7, 8)
.pipe(
bufferCount(3, 2)
)
.subscribe(console.log)
/*
--->
[1, 2, 3]
[3, 4, 5]
[5, 6, 7]
[7, 8]
*/-
the
$obsdetermines the time span of emission silence -
if no other value is emitted while awaiting
X time, the crt value will be emitted, otherwise discarded
Example
const example = Observable.create(subs => {
subs.next('123')
subs.next('222')
subs.next('333')
setTimeout(() => {
subs.next('444')
}, 700)
setTimeout(() => {
subs.next('555')
}, 1201)
});
example.pipe(debounce(() => timer(500)))
.subscribe(console.log)
// => 333, 444, 555-
emits when the operation inside(i.e:
promise) is readyExample
const p = () => new Promise((resolve, reject) => { resolve(); }); of({}) .pipe( flatMap(() => defer(() => p())), ) .subscribe(() => {});
-
each subscriber gets a new subscription
Example
const getObs = () => { let cnt = 1; return interval(1000) .pipe(map(() => cnt++)); }; const observable = getObs(); // observable // .subscribe(v => console.log('SUBSCRIBER 1: ', v)) // observable // .subscribe(v => console.log('SUBSCRIBER 2: ', v)) /* SUBSCRIBER 1: 7 SUBSCRIBER 2: 8 SUBSCRIBER 1: 9 SUBSCRIBER 2: 10 SUBSCRIBER 1: 11 SUBSCRIBER 2: 12 ... */ const getObsWithDefer = () => { return defer(() => { let cnt = 1; return interval(1000) .pipe(map(() => cnt++)) }); } const observableWithDefer = getObsWithDefer(); observableWithDefer .subscribe(v => console.log('SUBSCRIBER 1: ', v)) observableWithDefer .subscribe(v => console.log('SUBSCRIBER 2: ', v)) /* SUBSCRIBER 1: 1 SUBSCRIBER 2: 1 SUBSCRIBER 1: 2 SUBSCRIBER 2: 2 SUBSCRIBER 1: 3 SUBSCRIBER 2: 3 SUBSCRIBER 1: 4 SUBSCRIBER 2: 4 */
-
searches for the first item in the Observable that matches the condition, then completes
-
does not emit an error if a valid value is not found
Example
of(3, 1, 5, 9, 15, 14, 75, 30)
.pipe(
find(v => v % 15 === 0)
)
.subscribe(console.log) // 15-
like
first, but if multiple values that match the condition are emitted, it will emit with error notification -
if the source Observable emits items but none match the specified predicate, then
undefinedis emitted
Example
of(3, 1, 5, 9, 15, 14, 75, 30)
.pipe(
single(v => v % 15 === 0)
)
.subscribe(
console.log, // 15, if /* 14, 75, 30 */ - commented
console.warn // Sequence contains more than one element, if /* 14, 75, 30 */ - uncommented
) - flattens an Observable-of-Observables by using the
combineLateststrategy when the source Observable-of-Observables completes
Example
// Observable-of-Observables
const highOrder = of(3, 4, 2)
.pipe(
map(v => interval(v * 1000).pipe(take(3)))
)
highOrder
.pipe(
combineAll()
)
.subscribe(
console.log,
null,
() => console.log('completed')
)
/*
3 ------0------1------2
4 --------0--------1--------2
2 ----0----1----2-------------------------
combineAll()
3 4 2
0 0 0
0 0 1
1 0 1
1 0 2
1 1 2
2 1 2
2 2 2
*/- first observable to emit is used, the others are being ignored
Example
race(
timer(1500).pipe(mapTo('1500!')),
timer(2000).pipe(mapTo('2000!')),
timer(1000).pipe(mapTo('1000!')),
timer(1200).pipe(mapTo('1200!')),
)
.subscribe(console.log) // 1000!-
generates a sequence of values based on an
initialValue,conditionanditeratate -
very similar to a traditional for loop
Example
generate(
2, // Initial value
x => x < 300, // Condition
x => x ** 2 // Iterate
).subscribe(console.log)
/*
--->
2
4
16
256
*/-
similar to
mergeMap, but it applies the project function to every source values as well as every output value - it is recursive -
it receives a project function that will return an Observable; this function will be applied to each emitted value by the source Observable, but also to the result of the function, which will get merged
Example
const clicks = fromEvent(document, 'click');
clicks
.pipe(
mapTo(1),
expand(v => of(v + 1).pipe(delay(1000))),
take(10)
)
.subscribe(console.log)-
similar to
throttleTime, but it will get the last silenced value -
when a value is emitted, it is ignored and ignores the next ones for
durationsms and then, when it is the case, it emits the most recent ignored value
Example
merge(
of(1), // Ignored, Printed if `throttleTime` was used
of(2), // Last Ignored
of(3).pipe(delay(400)), // Printed if `throttleTime` was used
of(4).pipe(delay(901)), // Printed if `throttleTime` was used
of(5).pipe(delay(600)), // Last ignored (400(referring to `3`) + 300 > 600)
of(6).pipe(delay(1200)), // Ignored (901 + 300 > 1200)
)
.pipe(/* throttleTime */auditTime(300))
.subscribe(console.log) // 2 5
// ===============================================
merge(
of(1), // Ignored;
of(2), // Last ignored
of(3).pipe(delay(400)), // New value, ignored 3 and any value that will be emitted in the range [400, 400 + 300]!;
of(4).pipe(delay(800)), // New value!(outside of [400, 400 + 300]); will be that last ignored as `6` will appear outside of [800, 800 + 300]
of(5).pipe(delay(600)), // Last ignored in the range [400, 400 + 300]
of(6).pipe(delay(1200)), // If this was missing, `4` wouldn't be printed
)
.pipe(/* throttleTime */auditTime(300))
.subscribe(console.log) // 2 5 4- you can get the source observable(second parameter) and you can return it in order to re-subscribe
Example
let cnt = 0;
concat(of('OK!'), throwError('foo'))
.pipe(
tap(v => console.log('emitted value!', v)),
catchError((err , $src) => {
console.log('performing logic...', err)
// Beware! This is a closure...
return (++cnt) <= 3 ? $src.pipe(map((v) => 'hmm!' + '*'.repeat(cnt) + v)) : of(`err caught: ${err}`);
})
)
.subscribe(console.warn)Example
// ----------------4------------>
// 0---------------------------->
// ----1------------------------>
// mergeAll()
// 0---1-----------4------------>
function promiseDelay(ms) {
return new Promise(resolve => {
setTimeout(() => resolve('done' + ms), ms);
});
}
of(promiseDelay(4000), promiseDelay(0), promiseDelay(1000))
.pipe(
mergeAll()
)
.subscribe(console.log)
/*
--->
done0
done1000
done4000
*/Example
const customFilter = (isEven: boolean) =>
filter((v: number) => v % 2 === 0 && isEven ? true : !isEven && v % 2 ? true: false)
const customFilterTwo = isEven => obs => obs.pipe(
filter((v: number) => v % 2 === 0 && isEven ? true : !isEven && v % 2 ? true: false)
)
// The same behavior as `customFilter`
const customFilterThree = isEven => customFilterTwo(isEven);
of(1, 2, 3, 4, 5, 6, 7, 8)
.pipe(
// customFilter(false),
// customFilterTwo(true),
customFilterThree(false),
)
.subscribe(console.log)