Reactive Programming - Operators

Reading material

  1. https://reactivex.io/documentation/operators.html

flatMap vs concatMap

  1. https://reactivex.io/documentation/operators/flatmap.html

flatMap merges the emissions of the inner Observables, so their items may interleave.

concatMap

flatMap

  1. The main difference between flatMap and concatMap is that, with concatMap, the records from each inner Flux are processed sequentially, one Flux at a time.
    1. flatMap uses the merge operator, while concatMap uses the concat operator.
    2. https://github.com/ReactiveX/RxJava/wiki/Combining-Observables#merge
    3. https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#concat
  2. With concatMap, the final combined Flux has all the records from Flux1 in order, then Flux2 in order, then Flux3 in order, then Flux4 in order.
  3. With flatMap, the final combined Flux is NOT guaranteed to keep that arrangement: records from Flux1, Flux2, Flux3 and Flux4 may interleave.
  4. If the ordering of messages is important (e.g. banking scenarios), concatMap is the better option.
  5. For a mix of both flatMap and concatMap scenarios, use groupBy: split the stream by key, process the groups in parallel, and process the records within each group in order.

The concatMap output sequence is ordered: all of the items emitted by the first Observable are emitted before any of the items emitted by the second Observable. The flatMap output sequence is merged: items from different source Observables may appear in any order relative to each other, although items from the same source Observable keep their relative order.
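
The ordering difference described above can be seen in a small experiment. This is only a sketch, assuming Project Reactor (reactor-core) is on the classpath; the class name, the `records` helper and the delays are made up for illustration. Source 1 is made slower than source 2 so that merging becomes visible.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class FlatMapVsConcatMap {

    // Hypothetical inner source: emits "<id>-0", "<id>-1", "<id>-2".
    // Source 1 is deliberately slower than source 2 (illustrative delays only).
    static Flux<String> records(int id) {
        Duration delay = Duration.ofMillis(id == 1 ? 70 : 30);
        return Flux.range(0, 3)
                .delayElements(delay)
                .map(i -> id + "-" + i);
    }

    // concatMap subscribes to the next inner Flux only after the previous one completes.
    static List<String> withConcatMap() {
        return Flux.just(1, 2)
                .concatMap(FlatMapVsConcatMap::records)
                .collectList()
                .block();
    }

    // flatMap subscribes to the inner Fluxes eagerly and merges items as they arrive.
    static List<String> withFlatMap() {
        return Flux.just(1, 2)
                .flatMap(FlatMapVsConcatMap::records)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("concatMap: " + withConcatMap());
        System.out.println("flatMap:   " + withFlatMap());
    }
}
```

With concatMap the list is always 1-0, 1-1, 1-2, 2-0, 2-1, 2-2. With flatMap the same six items arrive, but records from source 2 will usually show up between (or before) records from source 1, and the exact arrangement depends on timing.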

Operator    Used for
concatMap   Sequential batch processing
flatMap     Parallel batch processing
groupBy     Parallel batch processing with message ordering
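
The groupBy row can be sketched as follows: group by a key, merge the groups with flatMap, and keep each group ordered with concatMap. This is a rough illustration rather than the linked implementation; it assumes reactor-core is available, and the message format ("key-sequence"), `keyOf` and `handle` are hypothetical names invented for the example.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GroupByOrdering {

    // Hypothetical handler: simulates slow, asynchronous work for one message.
    static Mono<String> handle(String message) {
        return Mono.just(message).delayElement(Duration.ofMillis(20));
    }

    // Hypothetical key extractor: "A-1" belongs to key "A".
    static String keyOf(String message) {
        return message.substring(0, message.indexOf('-'));
    }

    static List<String> process(Flux<String> messages) {
        return messages
                .groupBy(GroupByOrdering::keyOf)
                // Groups are merged, so different keys can be handled concurrently...
                .flatMap(group ->
                        // ...while messages within one key are handled one after another.
                        group.concatMap(GroupByOrdering::handle))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(process(Flux.just("A-1", "B-1", "A-2", "B-2", "A-3")));
    }
}
```

Messages for key A always come out as A-1, A-2, A-3, and messages for key B as B-1, B-2, but how the two keys interleave is not fixed. groupBy works best with a modest number of distinct keys, since every group has to be consumed downstream.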

Internal Implementations

https://github.com/explorer436/programming-playground/tree/main/java-playground/kafka-examples/reactive-kafka-playground/reactive-kafka-standalone-examples/src/main/java/kafka/examples/reactive/kafka/standalone/examples/batch_or_parallel_processing

Reactive Programming - Project Reactor - Operators

Reading material

  1. https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Operators.html
  2. https://projectreactor.io/docs/core/release/api/reactor/core/publisher/MonoOperator.html
  3. https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxOperator.html

Do

From the ReactiveX specification:

  1. doAction
  2. doAfterTerminate
  3. doOnComplete
  4. doOnCompleted
  5. doOnDispose
  6. doOnEach
  7. doOnError
  8. doOnLifecycle
  9. doOnNext
  10. doOnRequest
  11. doOnSubscribe
  12. doOnTerminate
  13. doOnUnsubscribe
  14. doseq
  15. doWhile
  16. drop
  17. dropRight
  18. dropUntil
  19. dropWhile

From Project Reactor specification:

  1. doFinally() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doFinally-java.util.function.Consumer- - bottom to top (see logs below)
  2. doFirst() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doFirst-java.lang.Runnable- - bottom to top (see logs below)
  3. doOnCancel() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnCancel-java.lang.Runnable-
  4. doOnComplete() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnComplete-java.lang.Runnable- - top to bottom (see logs below)
  5. doOnDiscard() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnDiscard-java.lang.Class-java.util.function.Consumer-
  6. doOnEach() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnEach-java.util.function.Consumer-
  7. doOnError() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnError-java.lang.Class-java.util.function.Consumer- - top to bottom (see logs below)
  8. doOnNext() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnNext-java.util.function.Consumer-
    1. doOnNext exists for side effects (logging, metrics and so on), whereas functional programming prefers pure functions (with no side effects).
    2. Prefer pure functions where possible.
    3. Mutability is not all that bad. You need to know when to use it and use it properly.
  9. doOnRequest() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnRequest-java.util.function.LongConsumer- - bottom to top (see logs below)
  10. doOnSubscribe() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnSubscribe-java.util.function.Consumer- - top to bottom (see logs below)
  11. doOnTerminate() - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnTerminate-java.lang.Runnable- - top to bottom (see logs below)

Logs:

12:00:37.076 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doFirst-2
12:00:37.081 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doFirst-1
12:00:37.086 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnSubscribe-1: reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber@54d9d12d
12:00:37.087 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnSubscribe-2: reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber@6a6cb05c
12:00:37.087 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnRequest-2: 9223372036854775807
12:00:37.087 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnRequest-1: 2
12:00:37.089 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : producer begins
12:00:37.089 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnNext-1: 0
12:00:37.089 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnNext-2: 0
12:00:37.089 INFO  [           main] r.p.e.util.DefaultSubscriber   : subscriber received: 0
12:00:37.089 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnNext-1: 1
12:00:37.089 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnNext-2: 1
12:00:37.089 INFO  [           main] r.p.e.util.DefaultSubscriber   : subscriber received: 1
12:00:37.090 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnCancel-1
12:00:37.090 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doFinally-1: cancel
12:00:37.090 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnComplete-2
12:00:37.090 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnTerminate-2
12:00:37.092 INFO  [           main] r.p.e.util.DefaultSubscriber   : subscriber received complete
12:00:37.092 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doFinally-2: onComplete
12:00:37.092 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnDiscard-1: 2
12:00:37.092 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnDiscard-2: 2
12:00:37.092 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnDiscard-1: 3
12:00:37.093 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : doOnDiscard-2: 3
12:00:37.093 INFO  [           main] r.p.e.o.Lec03DoCallbacks       : producer ends
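
The "bottom to top" and "top to bottom" notes above refer to the order in which hooks of the same kind fire when several are chained. The sketch below is not the Lec03DoCallbacks class that produced the logs; it is a simplified pipeline, assuming reactor-core is available, that records hook invocations in a list instead of logging them. The "-1" hooks are declared first (closer to the source) and the "-2" hooks second (closer to the subscriber).

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class DoCallbackOrder {

    // Records the order in which the hooks fire.
    // Everything runs synchronously on the calling thread in this sketch.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.range(0, 2)
                // "-1" hooks: declared first, closer to the source
                .doFirst(() -> events.add("doFirst-1"))
                .doOnSubscribe(s -> events.add("doOnSubscribe-1"))
                .doOnRequest(n -> events.add("doOnRequest-1"))
                .doOnNext(i -> events.add("doOnNext-1: " + i))
                // "-2" hooks: declared second, closer to the subscriber
                .doFirst(() -> events.add("doFirst-2"))
                .doOnSubscribe(s -> events.add("doOnSubscribe-2"))
                .doOnRequest(n -> events.add("doOnRequest-2"))
                .doOnNext(i -> events.add("doOnNext-2: " + i))
                .subscribe();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Subscription and request signals travel from the subscriber up to the source, so doFirst-2 and doOnRequest-2 are recorded before their "-1" counterparts. The Subscription object and the data travel from the source down to the subscriber, so doOnSubscribe-1 and doOnNext-1 are recorded before their "-2" counterparts, matching the pattern in the logs.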

Internal implementation

https://github.com/explorer436/programming-playground/tree/main/java-playground/reactive-programming-examples/reactive-streams-using-project-reactor/src/main/java/reactive/programming/examples/operators