Reactive Programming - Project Reactor - Flux

Flux

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

A Flux is a Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

  1. Emits 0 to N items
  2. Stream of messages
  3. Followed by an onComplete or onError
  4. Backpressure (the consumer signals how much data it can handle, so that a fast producer does not overwhelm it)
  5. Many additional operators specific to stream processing
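
The points above can be sketched in a few lines. This is a minimal illustration, assuming reactor-core is on the classpath; the class name FluxBasicsDemo and its method names are made up for this example. collectSignals records every signal of a three-item Flux, and requestOnlyTwo uses a BaseSubscriber that asks for two items only, so the producer never emits the rest.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class FluxBasicsDemo {

    // Records each signal: 0..N onNext calls followed by onComplete or onError.
    static List<String> collectSignals() {
        List<String> signals = new ArrayList<>();
        Flux.just("a", "b", "c")
            .subscribe(
                item -> signals.add("onNext:" + item),
                error -> signals.add("onError:" + error.getMessage()),
                () -> signals.add("onComplete"));
        return signals;
    }

    // Backpressure: the subscriber requests two items, so only two are emitted.
    static List<Integer> requestOnlyTwo() {
        List<Integer> received = new ArrayList<>();
        Flux.range(1, 10).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2); // demand for two items only
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collectSignals());
        System.out.println(requestOnlyTwo());
    }
}
```

Because the second subscriber never requests more than two items, it also never sees onComplete; the range simply waits for further demand.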

Working with a Flux

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

We can apply various transformations to this stream, including mapping it to a 0..N stream of an entirely different item type.

  1. In the marble diagram, the numbers along the top are the N items that the Publisher publishes to the Subscriber. They form the source Flux, and the line at the end of the row marks the completion of that Flux. The operator box represents the transformation applied to it.
  2. The transformation is applied to the items one by one, as each is delivered through onNext(). The numbers along the bottom are the Flux that results from applying the transformation.
  3. The red X marks an error raised while transforming a particular item. It is delivered through onError() and terminates the stream.
  4. If no error occurs, the transformation is applied to every element and the stream then ends with onComplete().
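
The signal flow described above can be reproduced with the map operator. The sketch below assumes reactor-core is on the classpath; MapSignalsDemo and divideTen are hypothetical names chosen for illustration. Each item is transformed in turn, and a division by zero stands in for the red X.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class MapSignalsDemo {

    // Applies a transformation item by item and records the resulting signals.
    static List<String> divideTen(Flux<Integer> source) {
        List<String> signals = new ArrayList<>();
        source.map(i -> 10 / i) // throws ArithmeticException when i == 0
              .subscribe(
                  value -> signals.add("onNext:" + value),
                  error -> signals.add("onError:" + error.getClass().getSimpleName()),
                  () -> signals.add("onComplete"));
        return signals;
    }

    public static void main(String[] args) {
        // No error: every item is transformed, then onComplete.
        System.out.println(divideTen(Flux.just(1, 2, 5)));
        // Error on the second item: onError ends the stream, 2 is never processed.
        System.out.println(divideTen(Flux.just(5, 0, 2)));
    }
}
```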

Creating Flux publisher using the Factory methods

(not an exhaustive list)

Factory methods to create Flux from existing data    Usage
just                                                 Data already present in memory
fromIterable                                         Data already present in memory
fromArray                                            Data already present in memory
fromStream                                           Data already present in memory
empty                                                No item to emit
error                                                Emit an error
range(start, count)                                  Range/Count
interval(duration)                                   Periodic
from(mono)                                           Mono -> Flux
Mono.from(flux)                                      Flux -> Mono
Flux.defer(() -> …)                                  To delay the execution
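
A quick tour of these factory methods might look like the sketch below. It assumes reactor-core is on the classpath; FluxFactoryDemo, run, and deferTwice are hypothetical names, and block()/collectList() are used only to pull the results into plain collections for inspection. onErrorReturn is not in the table; it is used here only to turn the error signal into a visible value.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of Reactor.
final class FluxFactoryDemo {

    static Map<String, Object> run() {
        Map<String, Object> results = new LinkedHashMap<>();
        results.put("just", Flux.just(1, 2, 3).collectList().block());
        results.put("fromIterable", Flux.fromIterable(List.of("a", "b")).collectList().block());
        results.put("fromArray", Flux.fromArray(new Integer[] {7, 8}).collectList().block());
        results.put("fromStream", Flux.fromStream(Stream.of("x", "y")).collectList().block());
        results.put("empty", Flux.empty().collectList().block());
        results.put("error", Flux.<String>error(new IllegalStateException("boom"))
                .onErrorReturn("fallback").collectList().block());
        results.put("range", Flux.range(3, 4).collectList().block());
        results.put("interval", Flux.interval(Duration.ofMillis(10)).take(3).collectList().block());
        results.put("from(mono)", Flux.from(Mono.just("solo")).collectList().block());
        // Mono.from takes the first item of the Flux and cancels the rest.
        results.put("Mono.from(flux)", Mono.from(Flux.just(1, 2, 3)).block());
        return results;
    }

    // defer runs its supplier once per subscription, not at assembly time.
    static List<Integer> deferTwice() {
        AtomicInteger counter = new AtomicInteger();
        Flux<Integer> deferred = Flux.defer(() -> Flux.just(counter.incrementAndGet()));
        return List.of(deferred.blockFirst(), deferred.blockFirst());
    }

    public static void main(String[] args) {
        run().forEach((name, value) -> System.out.println(name + " -> " + value));
        System.out.println("defer -> " + deferTwice());
    }
}
```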

Flux.create -> FluxSink

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
    return create(emitter, OverflowStrategy.BUFFER);
}
  1. It is designed to be used when we have a single subscriber
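  2. FluxSink is thread-safe, so we can share it with multiple threads.
  3. We can keep emitting data into the sink without worrying about downstream demand. With the default OverflowStrategy.BUFFER shown above, items that exceed the demand are buffered.
  4. FluxSink delivers the emitted items to the Subscriber safely, one at a time.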
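
As an illustration of the thread-safety point, the sketch below shares one FluxSink between two producer threads. It assumes reactor-core is on the classpath; FluxCreateDemo, emitFromTwoThreads, and the producer counter are hypothetical and exist only for this example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class FluxCreateDemo {

    // Two threads push 100 items each into the same sink.
    static List<Integer> emitFromTwoThreads() {
        Flux<Integer> flux = Flux.create(sink -> {
            AtomicInteger runningProducers = new AtomicInteger(2);
            Runnable producer = () -> {
                for (int i = 0; i < 100; i++) {
                    sink.next(i); // safe to call concurrently on a Flux.create sink
                }
                // The last producer to finish completes the stream.
                if (runningProducers.decrementAndGet() == 0) {
                    sink.complete();
                }
            };
            new Thread(producer).start();
            new Thread(producer).start();
        });
        // block() waits until complete() has been signalled.
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("items received: " + emitFromTwoThreads().size());
    }
}
```

The order in which the two threads' items interleave is not deterministic, but none of them should be lost.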

Flux.generate -> SynchronousSink

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {
    Objects.requireNonNull(generator, "generator");
    return onAssembly((Flux)(new FluxGenerate(generator)));
}
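  1. This overload is stateless. Other generate overloads accept a state supplier and pass the state from one invocation to the next.
  2. It takes a lambda expression and invokes it repeatedly, emitting one item per invocation, until the lambda signals complete or error, or the subscriber cancels. If none of these happens, it runs indefinitely.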
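
Both flavours of generate can be sketched as follows, assuming reactor-core is on the classpath. FluxGenerateDemo and its method names are hypothetical. The first method uses the stateless overload shown above and relies on take(3) to stop it; the second uses the overload that takes a state supplier and completes on its own.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class FluxGenerateDemo {

    // Stateless: the same lambda runs once per requested item until cancelled.
    static List<String> statelessTicks() {
        return Flux.<String>generate(sink -> sink.next("tick"))
                   .take(3)
                   .collectList()
                   .block();
    }

    // Stateful overload: the returned value becomes the state of the next invocation.
    static List<Integer> squaresUpToFive() {
        Flux<Integer> squares = Flux.generate(
            () -> 1,
            (state, sink) -> {
                sink.next(state * state); // exactly one next() per invocation
                if (state == 5) {
                    sink.complete();      // ends the otherwise infinite sequence
                }
                return state + 1;
            });
        return squares.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(statelessTicks());
        System.out.println(squaresUpToFive());
    }
}
```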

FluxSink vs SynchronousSink

  1. Flux.create accepts Consumer<? super FluxSink<T>> emitter

  2. Flux.generate accepts Consumer<SynchronousSink<T>> generator

  3. With FluxSink, we can call next() any number of times. With Flux.create(), the Consumer is invoked only once.

  4. With SynchronousSink, we can call next() at most once per invocation. With Flux.generate(), the Consumer is invoked repeatedly, based on downstream demand.

  5. With Flux.create(), Consumer can emit 0..N items immediately without worrying about downstream demand.

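  6. With Flux.generate(), the Consumer can emit only 1 item per invocation. Downstream demand is respected.

  7. The FluxSink of Flux.create() is thread-safe. Thread safety is not applicable to Flux.generate(), because the SynchronousSink is only used synchronously inside the generator.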

  8. How is SynchronousSink different from Mono?

    1. With SynchronousSink, the generator is invoked again and again, emitting one item per invocation, until it is stopped (complete, error, or cancellation).
    2. With Mono, after it emits one item, it calls onComplete().

    private static void synchronousSink() {
        Flux.generate(synchronousSink -> {
                    log.info("invoked");
                    synchronousSink.next(1);
                    // synchronousSink.next(2); // cannot emit more than one item.
    
                    // We can emit complete or error signals
                    // synchronousSink.complete();
                    // synchronousSink.error(new RuntimeException("oops"));
                })
                .take(4)
                .subscribe(Util.subscriber());
    }
    17:33:34.251 INFO  [           main] r.p.e.f.Lec06FluxGenerate      : invoked
    17:33:34.254 INFO  [           main] r.p.e.util.DefaultSubscriber   :  received: 1
    17:33:34.255 INFO  [           main] r.p.e.f.Lec06FluxGenerate      : invoked
    17:33:34.255 INFO  [           main] r.p.e.util.DefaultSubscriber   :  received: 1
    17:33:34.255 INFO  [           main] r.p.e.f.Lec06FluxGenerate      : invoked
    17:33:34.255 INFO  [           main] r.p.e.util.DefaultSubscriber   :  received: 1
    17:33:34.255 INFO  [           main] r.p.e.f.Lec06FluxGenerate      : invoked
    17:33:34.255 INFO  [           main] r.p.e.util.DefaultSubscriber   :  received: 1
    17:33:34.257 INFO  [           main] r.p.e.util.DefaultSubscriber   :  received complete
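
The create versus generate comparison above can also be checked by counting. The sketch below assumes reactor-core is on the classpath; SinkComparisonDemo and its methods are hypothetical names. Each method returns a pair: how many items the subscriber received, and how many times the producer code tried to emit.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class SinkComparisonDemo {

    // Flux.create: the Consumer runs once and emits all 10 items regardless of demand.
    static int[] createCounts() {
        AtomicInteger attempts = new AtomicInteger();
        List<Integer> received = Flux.<Integer>create(sink -> {
                for (int i = 1; i <= 10; i++) {
                    attempts.incrementAndGet();
                    sink.next(i); // items emitted after the cancellation are dropped
                }
                sink.complete();
            })
            .take(4)
            .collectList()
            .block();
        return new int[] {received.size(), attempts.get()};
    }

    // Flux.generate: the Consumer runs once per item and stops when take(4) is satisfied.
    static int[] generateCounts() {
        AtomicInteger invocations = new AtomicInteger();
        List<Integer> received = Flux.<Integer>generate(sink -> {
                invocations.incrementAndGet();
                sink.next(1);
            })
            .take(4)
            .collectList()
            .block();
        return new int[] {received.size(), invocations.get()};
    }

    public static void main(String[] args) {
        int[] c = createCounts();
        int[] g = generateCounts();
        System.out.println("create:   received=" + c[0] + ", emit attempts=" + c[1]);
        System.out.println("generate: received=" + g[0] + ", invocations=" + g[1]);
    }
}
```

In both cases the subscriber receives four items, but only the generate version stops producing once the four items have been delivered, which matches the four "invoked" lines in the log above.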