Reactive Programming - Project Reactor - Flux
Flux
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
A Flux is a Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
- Emits 0 to N items
- Stream of messages
- Followed by an
onComplete
oronError
- Backpressure (Producer emits too much data when the consumer cannot handle it)
- Many additional methods specific to handle stream processing
Working with a Flux
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
We can do various transformations to this stream, including transforming it to an entirely different type of 0-N item stream.

- The top side numbers are the N items which are published by the Publisher to the Subscriber. It is a type of Flux item stream and the last line represents the completion of that flux stream. The operator box represents the transform operation on that flux stream.
- That transformation operation will be applied to the items of flux stream one by one and that is the onNext() method. The bottom set of numbers represents the Flux stream that was emitted after applying the transformation.
- The red X icon represents some error that happened when applying the transformation to that particular item in that Flux stream and that is the onError() method.
- If there were no errors, the transformation will be applied to all the elements with the onComplete() method.
Creating Flux publisher using the Factory methods
(not an exhaustive list)
Factory methods to create Flux from existing data | Usage |
---|---|
just | Data already present in memory |
fromIterable | Data already present in memory |
fromArray | Data already present in memory |
fromStream | Data already present in memory |
empty | no item to emit |
error | emit error |
range(start, count) | Range/Count |
interval(duration) | Periodic |
from(mono) | Mono -> Flux |
Mono.from(flux) | Flux -> Mono |
Flux.defer(() -> …) | To delay the execution |
Flux.create -> FluxSink
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
return create(emitter, OverflowStrategy.BUFFER);
}
- It is designed to be used when we have a single subscriber
- FluxSink is threadsafe. We can share it with multiple threads.
- We can keep on emitting data into the sink without worrying about downstream demand.
- FluxSink will deliver everything to Subscriber safely.
Flux.generate -> SynchronousSink
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {
Objects.requireNonNull(generator, "generator");
return onAssembly((Flux)(new FluxGenerate(generator)));
}
- It is stateless.
- It will take a lambda expression and it will emit an item based on that lambda expression until infinity.
FluxSink vs SynchronousSink
-
Flux.create
acceptsConsumer<? super FluxSink<T>> emitter
-
Flux.generate
acceptsConsumer<SynchronousSink<T>> generator
-
With
FluxSink
, we can callnext()
any number of times. WithFlux.create()
, Consumer is invoked only once. -
With
FluxSink
, we can callnext()
any number of times. WithFlux.create()
, Consumer is invoked only once. -
With
Flux.create()
, Consumer can emit 0..N items immediately without worrying about downstream demand. -
With
Flux.generate()
, Consumer can emit only 1 item. Downstream demand is respected. -
With
Flux.create()
, is thread-safe. Thread-safety is not applicable toFlux.generate()
-
How is
SynchronousSink
different fromMono
?SynchronousSink
will keep emitting that one item until it is stopped.- With
Mono
, after it emits one item, it callsonComplete()~
private static void synchronousSink() { Flux.generate(synchronousSink -> { log.info("invoked"); synchronousSink.next(1); // synchronousSink.next(2); // cannot emit more than one item. // We can emit complete or error signals // synchronousSink.complete(); // synchronousSink.error(new RuntimeException("oops")); }) .take(4) .subscribe(Util.subscriber()); } 17:33:34.251 INFO [ main] r.p.e.f.Lec06FluxGenerate : invoked 17:33:34.254 INFO [ main] r.p.e.util.DefaultSubscriber : received: 1 17:33:34.255 INFO [ main] r.p.e.f.Lec06FluxGenerate : invoked 17:33:34.255 INFO [ main] r.p.e.util.DefaultSubscriber : received: 1 17:33:34.255 INFO [ main] r.p.e.f.Lec06FluxGenerate : invoked 17:33:34.255 INFO [ main] r.p.e.util.DefaultSubscriber : received: 1 17:33:34.255 INFO [ main] r.p.e.f.Lec06FluxGenerate : invoked 17:33:34.255 INFO [ main] r.p.e.util.DefaultSubscriber : received: 1 17:33:34.257 INFO [ main] r.p.e.util.DefaultSubscriber : received complete