Reactive Programming - Operators
Table of Contents
Reactive Programming - Operators
Reading material
flatMap
vs concatMap
FlatMap merges the emissions of these Observables, so that they may interleave.
concatMap

flatMap

- The main difference between
flatMap
andconcatMap
is, withconcatMap
, the records from each Flux are processed sequentially.- Flat map uses merge operator while concatMap uses concat operator.
- https://github.com/ReactiveX/RxJava/wiki/Combining-Observables#merge
- https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#concat
- With
concatMap
, the final combined Flux will have all the records from Flux1 in order, Flux2 in order, Flux3 in order, Flux4 in order. - With
flatMap
, the final combined Flux will NOT have all the records from Flux1 in order, Flux2 in order, Flux3 in order, Flux4 in order. - If the ordering of messages is important (e.g. banking scenarios), concapMap is the better option.
- For a mix of both flatMap and concatMap scenarios, we will have to use
groupBy
The concatMap
output sequence is ordered - all of the items emitted by the first Observable being emitted before any of the items emitted by the second Observable, while flatMap
output sequence is merged - the items emitted by the merged Observable may appear in any order, regardless of which source Observable they came from.
Operator | Used for |
---|---|
concatMap | Sequential Batch Processing |
flatMap | Parallel Batch Processing |
groupBy | Parallel Batch Processing with message ordering |
Internal Implementations
Reactive Programming - Project Reactor - Operators
Reading material
- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Operators.html
- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/MonoOperator.html
- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxOperator.html
Do
From ReactorX specification:
- doAction
- doAfterTerminate
- doOnComplete
- doOnCompleted
- doOnDispose
- doOnEach
- doOnError
- doOnLifecycle
- doOnNext
- doOnRequest
- doOnSubscribe
- doOnTerminate
- doOnUnsubscribe
- doseq
- doWhile
- drop
- dropRight
- dropUntil
- dropWhile
From Project Reactor specification:
- doFinally() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doFinally-java.util.function.Consumer- - bottom to top (see logs below)
- doFirst() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doFirst-java.lang.Runnable- - bottom to top (see logs below)
- doOnCancel() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnCancel-java.lang.Runnable-
- doOnComplete() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnComplete-java.lang.Runnable- - top to bottom (see logs below)
- doOnDiscard() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnDiscard-java.lang.Class-java.util.function.Consumer-
- doOnEach() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnEach-java.util.function.Consumer-
- doOnError() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnError-java.lang.Class-java.util.function.Consumer- - top to bottom (see logs below)
- doOnNext() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnNext-java.util.function.Consumer-
- Functional programming prefers pure functions (with no side effects)
- Prefer pure functions
- Immutability is not all that bad. You need to know when to use it and use it properly.
- doOnRequest() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnRequest-java.util.function.LongConsumer- bottom to top (see logs below)
- doOnSubscribe() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnSubscribe-java.util.function.Consumer- - top to bottom (see logs below)
- doOnTerminate() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnTerminate-java.lang.Runnable- - top to bottom (see logs below)
12:00:37.076 INFO [ main] r.p.e.o.Lec03DoCallbacks : doFirst-2
12:00:37.081 INFO [ main] r.p.e.o.Lec03DoCallbacks : doFirst-1
12:00:37.086 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnSubscribe-1: reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber@54d9d12d
12:00:37.087 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnSubscribe-2: reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber@6a6cb05c
12:00:37.087 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnRequest-2: 9223372036854775807
12:00:37.087 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnRequest-1: 2
12:00:37.089 INFO [ main] r.p.e.o.Lec03DoCallbacks : producer begins
12:00:37.089 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnNext-1: 0
12:00:37.089 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnNext-2: 0
12:00:37.089 INFO [ main] r.p.e.util.DefaultSubscriber : subscriber received: 0
12:00:37.089 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnNext-1: 1
12:00:37.089 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnNext-2: 1
12:00:37.089 INFO [ main] r.p.e.util.DefaultSubscriber : subscriber received: 1
12:00:37.090 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnCancel-1
12:00:37.090 INFO [ main] r.p.e.o.Lec03DoCallbacks : doFinally-1: cancel
12:00:37.090 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnComplete-2
12:00:37.090 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnTerminate-2
12:00:37.092 INFO [ main] r.p.e.util.DefaultSubscriber : subscriber received complete
12:00:37.092 INFO [ main] r.p.e.o.Lec03DoCallbacks : doFinally-2: onComplete
12:00:37.092 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnDiscard-1: 2
12:00:37.092 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnDiscard-2: 2
12:00:37.092 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnDiscard-1: 3
12:00:37.093 INFO [ main] r.p.e.o.Lec03DoCallbacks : doOnDiscard-2: 3
12:00:37.093 INFO [ main] r.p.e.o.Lec03DoCallbacks : producer ends