Understanding reactor-kafka
Sample implementation
Understanding reactor-kafka
The io.projectreactor.kafka:reactor-kafka artifact provides the core interfaces.
"Sender" and "receiver" are terms specific to this library, not to Kafka itself. The library deliberately departs from Kafka's standard "producer" and "consumer" names to avoid confusion with the underlying clients.
This library has two packages:
- receiver
io/projectreactor/kafka/reactor-kafka/1.3.17/reactor-kafka-1.3.17.jar!/reactor/kafka/receiver/KafkaReceiver.class
- sender
io/projectreactor/kafka/reactor-kafka/1.3.17/reactor-kafka-1.3.17.jar!/reactor/kafka/sender/KafkaSender.class
Under the hood, these interfaces delegate to the Apache Kafka Java client (org.apache.kafka:kafka-clients) classes KafkaConsumer and KafkaProducer; reactor-kafka does not depend on spring-kafka.
Flux
Within reactor-kafka, the preferred abstraction for the Kafka Consumer (KafkaReceiver) is an inbound Flux, on which the framework publishes every event received from Kafka.
/**
* Starts a Kafka consumer that consumes records from the subscriptions or partition
* assignments configured for this receiver. Records are consumed from Kafka and delivered
* on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
* when the returned Flux terminates.
* <p>
* Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
* to commit the offset corresponding to the record. Acknowledged records are committed
* based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
* Records may also be committed manually using {@link ReceiverOffset#commit()}.
*
* @param prefetch amount of prefetched batches
* @return Flux of inbound receiver records that are committed only after acknowledgement
*/
Flux<ReceiverRecord<K, V>> receive(Integer prefetch);
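A minimal consumer sketch against this API might look like the following. The broker address, group id, and topic name are placeholder assumptions, not values from the original text:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class ReceiverSketch {

    // Plain consumer properties, exactly as the Apache Kafka client expects them.
    static ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",   // assumed broker
                ConsumerConfig.GROUP_ID_CONFIG, "demo-group",                // assumed group id
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton("demo-topic"));          // assumed topic
    }

    public static void main(String[] args) {
        // receive() returns the inbound Flux; each ReceiverRecord carries a
        // ReceiverOffset that must be acknowledged so its offset gets committed.
        KafkaReceiver.create(receiverOptions())
                .receive()
                .doOnNext(record -> {
                    System.out.println("value=" + record.value());
                    record.receiverOffset().acknowledge();                   // mark for commit
                })
                .subscribe();
    }
}
```

Note that nothing connects to Kafka until the Flux is subscribed; the consumer is created lazily and closed when the Flux terminates, as the Javadoc above states.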
Within reactor-kafka, the preferred abstraction for the Kafka Producer (KafkaSender) is an outbound Flux: the application publishes events to it, and the framework sends them to Kafka.
/**
* Sends a sequence of records to Kafka and returns a {@link Flux} of response record metadata including
* partition and offset of each record. Responses are ordered for each partition in the absence of retries,
* but responses from different partitions may be interleaved in a different order from the requests.
* Additional correlation metadata may be passed through in the {@link SenderRecord} that is not sent
* to Kafka, but is included in the response {@link Flux} to match responses to requests.
* <p>
* Results are published when the send is acknowledged based on the acknowledgement mode
* configured using the option {@link ProducerConfig#ACKS_CONFIG}. If acks=0, records are acknowledged
* after the requests are buffered without waiting for any server acknowledgements. In this case the
* requests are not retried and the offset returned in {@link SenderResult} will be -1. For other ack
* modes, requests are retried up to the configured {@link ProducerConfig#RETRIES_CONFIG} times. If
* the request does not succeed after these attempts, the request fails and an exception indicating
* the reason for failure is returned in {@link SenderResult#exception()}.
* {@link SenderOptions#stopOnError(boolean)} can be configured to stop the send sequence on first failure
* or to attempt all sends even if one or more records could not be delivered.
*
* <p>
* Example usage:
* <pre>
* {@code
* source = Flux.range(1, count)
* .map(i -> SenderRecord.create(topic, partition, null, key(i), message(i), i));
sender.send(source)
* .doOnNext(r -> System.out.println("Message #" + r.correlationMetadata() + " metadata=" + r.recordMetadata()));
* }
* </pre>
*
* @param records Outbound records along with additional correlation metadata to be included in response
* @return Flux of Kafka producer response record metadata along with the corresponding request correlation metadata.
* For records that could not be sent, the response contains an exception that indicates reason for failure.
*/
<T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records);
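A runnable sketch of that contract could look like this. The broker address and topic are assumptions; the integer passed to SenderRecord.create is the correlation metadata described in the Javadoc:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class SenderSketch {

    public static void main(String[] args) {
        Map<String, Object> props = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",  // assumed broker
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaSender<String, String> sender = KafkaSender.create(SenderOptions.create(props));

        // The integer i is correlation metadata: it is not sent to Kafka but
        // comes back in the SenderResult so responses can be matched to requests.
        Flux<SenderRecord<String, String, Integer>> outbound = Flux.range(1, 10)
                .map(i -> SenderRecord.create(
                        new ProducerRecord<>("demo-topic", "key-" + i, "message-" + i), i));

        sender.send(outbound)
                .doOnNext(result -> System.out.println(
                        "#" + result.correlationMetadata() + " offset=" + result.recordMetadata().offset()))
                .doFinally(signal -> sender.close())
                .subscribe();
    }
}
```

The doFinally callback closes the sender (and the underlying producer) once the outbound Flux completes or errors.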
Spring support for reactor-kafka
There are only two classes that act as the bridge between the spring-kafka library and the reactor-kafka library. Beyond these two templates, spring-kafka provides no additional support for reactor-kafka.
This is largely because reactor-kafka is already self-sufficient: through Reactor's operators it provides retry patterns, resilience features, and most of the other functionality developers need.
ReactiveKafkaProducerTemplate
- https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.html
- Uses KafkaSender from reactor-kafka
ReactiveKafkaConsumerTemplate
- https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.html
- Uses KafkaReceiver from reactor-kafka
- The inbound Flux is established by invoking one of the following methods on the ReactiveKafkaConsumerTemplate:
  - receive
  - receiveAtmostOnce
  - receiveAutoAck
  - receiveBatch
  - receiveExactlyOnce
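A sketch of wiring one of those methods, assuming the ReceiverOptions are built elsewhere (for instance from Spring Boot properties) and already subscribed to a topic:

```java
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverOptions;

public class ConsumerTemplateSketch {

    private final ReactiveKafkaConsumerTemplate<String, String> template;

    public ConsumerTemplateSketch(ReceiverOptions<String, String> receiverOptions) {
        // The template is just a thin wrapper over reactor-kafka's KafkaReceiver.
        this.template = new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    public Flux<String> values() {
        // receiveAutoAck() acknowledges records automatically as the downstream
        // consumes each batch, and emits plain ConsumerRecord instances.
        return template.receiveAutoAck()
                .map(record -> record.value());
    }
}
```

As with KafkaReceiver directly, no consumer is started until the returned Flux is subscribed.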
Spring Boot uses org/springframework/boot/spring-boot-autoconfigure/3.3.2/spring-boot-autoconfigure-3.3.2.jar!/org/springframework/boot/autoconfigure/kafka/KafkaProperties.class
to bind the spring.kafka.* properties from the application.yaml file.
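One way to reuse that binding for reactor-kafka is to build ReceiverOptions from KafkaProperties in a configuration class. This is a sketch, not the only wiring; the bean name and topic are assumptions:

```java
import java.util.Collections;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.kafka.receiver.ReceiverOptions;

@Configuration
public class ReactiveKafkaConfig {

    // KafkaProperties is bound by Spring Boot from the spring.kafka.* keys in
    // application.yaml; its consumer properties seed reactor-kafka's options.
    @Bean
    public ReceiverOptions<String, String> receiverOptions(KafkaProperties kafkaProperties) {
        return ReceiverOptions.<String, String>create(kafkaProperties.buildConsumerProperties(null))
                .subscription(Collections.singleton("demo-topic")); // assumed topic
    }
}
```

The resulting bean can then be injected into a ReactiveKafkaConsumerTemplate.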
org.springframework.kafka.support.serializer.JsonSerializer
and org.springframework.kafka.support.serializer.JsonDeserializer
come from spring-kafka.
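Because reactor-kafka accepts any org.apache.kafka.common.serialization.Serializer, spring-kafka's JsonSerializer can be plugged straight into SenderOptions. A hedged sketch, where the Order payload type and broker address are made-up examples:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import reactor.kafka.sender.SenderOptions;

public class JsonSenderOptionsSketch {

    // Hypothetical payload type; any POJO Jackson can serialize works here.
    public record Order(String id, double amount) {}

    public static SenderOptions<String, Order> jsonSenderOptions() {
        Map<String, Object> props = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",           // assumed broker
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // spring-kafka's JSON serializer
        return SenderOptions.create(props);
    }
}
```

The consumer side is symmetric: configure JsonDeserializer as the value deserializer in ReceiverOptions.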