Understanding reactor-kafka

Sample implementation

See https://github.com/explorer436/programming-playground/tree/main/java-playground/kafka-examples/reactive-kafka-playground

Understanding reactor-kafka

The io.projectreactor.kafka:reactor-kafka artifact provides reactive interfaces for producing to and consuming from Kafka.

Sender and receiver are terms specific to this library, not to Kafka. The library uses them instead of Kafka's standard names (producer and consumer) to avoid confusion.

This library has two packages:

  1. receiver
    1. io/projectreactor/kafka/reactor-kafka/1.3.17/reactor-kafka-1.3.17.jar!/reactor/kafka/receiver/KafkaReceiver.class
  2. sender
    1. io/projectreactor/kafka/reactor-kafka/1.3.17/reactor-kafka-1.3.17.jar!/reactor/kafka/sender/KafkaSender.class

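These interfaces delegate the calls to the Apache Kafka client (org.apache.kafka:kafka-clients): KafkaSender wraps a KafkaProducer and KafkaReceiver wraps a KafkaConsumer. reactor-kafka itself does not depend on spring-kafka; the dependency runs the other way, from spring-kafka's reactive templates to reactor-kafka.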

Flux

Within reactor-kafka, the preferred abstraction for the Kafka consumer (KafkaReceiver) is an inbound Flux, on which the framework publishes every record received from Kafka.

/**
 * Starts a Kafka consumer that consumes records from the subscriptions or partition
 * assignments configured for this receiver. Records are consumed from Kafka and delivered
 * on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
 * when the returned Flux terminates.
 * <p>
 * Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
 * to commit the offset corresponding to the record. Acknowledged records are committed
 * based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
 * Records may also be committed manually using {@link ReceiverOffset#commit()}.
 *
 * @param prefetch amount of prefetched batches
 * @return Flux of inbound receiver records that are committed only after acknowledgement
 */
Flux<ReceiverRecord<K, V>> receive(Integer prefetch);
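The Javadoc above says that acknowledged records are committed according to a configured commit batch size, and that a manual commit is also possible. The following is a minimal JDK-only model of that idea, not reactor-kafka code: the class AckBatcher and its methods are hypothetical names invented for illustration, and the commit interval (the time-based trigger) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model (not part of reactor-kafka): groups acknowledgements into batched commits. */
class AckBatcher {
    private final int commitBatchSize;
    private final List<Long> committed = new ArrayList<>();
    private long lastAcked = -1;
    private int pending = 0;

    AckBatcher(int commitBatchSize) {
        this.commitBatchSize = commitBatchSize;
    }

    /** Plays the role of ReceiverOffset#acknowledge(): marks a record as processed. */
    void acknowledge(long offset) {
        lastAcked = Math.max(lastAcked, offset);
        pending++;
        if (pending >= commitBatchSize) {
            commit(); // batch is full, so commit now
        }
    }

    /** Plays the role of ReceiverOffset#commit(): commits whatever has been acknowledged so far. */
    void commit() {
        if (pending == 0) {
            return; // nothing new to commit
        }
        committed.add(lastAcked + 1); // Kafka stores the offset of the next record to read
        pending = 0;
    }

    /** Returns a snapshot of every offset committed so far, in commit order. */
    List<Long> committedOffsets() {
        return List.copyOf(committed);
    }

    public static void main(String[] args) {
        AckBatcher batcher = new AckBatcher(3);
        for (long offset = 0; offset <= 6; offset++) {
            batcher.acknowledge(offset);
        }
        System.out.println(batcher.committedOffsets()); // prints [3, 6]
        batcher.commit();                               // manual commit of the leftover ack
        System.out.println(batcher.committedOffsets()); // prints [3, 6, 7]
    }
}
```

In this model, a record that is acknowledged but not yet committed (offset 6 before the manual commit) would be delivered again after a restart. This is why acknowledging only after processing gives at-least-once behaviour.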

Within reactor-kafka, the preferred abstraction for the Kafka producer (KafkaSender) is an outbound Flux: the application supplies a Publisher of records, the framework sends them to Kafka, and the result of each send is published on the returned Flux.

/**
 * Sends a sequence of records to Kafka and returns a {@link Flux} of response record metadata including
 * partition and offset of each record. Responses are ordered for each partition in the absence of retries,
 * but responses from different partitions may be interleaved in a different order from the requests.
 * Additional correlation metadata may be passed through in the {@link SenderRecord} that is not sent
 * to Kafka, but is included in the response {@link Flux} to match responses to requests.
 * <p>
 * Results are published when the send is acknowledged based on the acknowledgement mode
 * configured using the option {@link ProducerConfig#ACKS_CONFIG}. If acks=0, records are acknowledged
 * after the requests are buffered without waiting for any server acknowledgements. In this case the
 * requests are not retried and the offset returned in {@link SenderResult} will be -1. For other ack
 * modes, requests are retried up to the configured {@link ProducerConfig#RETRIES_CONFIG} times. If
 * the request does not succeed after these attempts, the request fails and an exception indicating
 * the reason for failure is returned in {@link SenderResult#exception()}.
 * {@link SenderOptions#stopOnError(boolean)} can be configured to stop the send sequence on first failure
 * or to attempt all sends even if one or more records could not be delivered.
 *
 * <p>
 * Example usage:
 * <pre>
 * {@code
 *     source = Flux.range(1, count)
 *                  .map(i -> SenderRecord.create(topic, partition, null, key(i), message(i), i));
 *     sender.send(source, true)
 *           .doOnNext(r -> System.out.println("Message #" + r.correlationMetadata() + " metadata=" + r.recordMetadata()));
 * }
 * </pre>
 *
 * @param records Outbound records along with additional correlation metadata to be included in response
 * @return Flux of Kafka producer response record metadata along with the corresponding request correlation metadata.
 *         For records that could not be sent, the response contains an exception that indicates reason for failure.
 */
<T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records);
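The Javadoc above notes that responses are ordered within a partition but may interleave across partitions, and that correlation metadata is what ties a response back to its request. The following is a rough JDK-only simulation of that behaviour, not reactor-kafka code. Request, Result, and the send method are hypothetical stand-ins for SenderRecord, SenderResult, and KafkaSender#send, and the "broker" simply acknowledges one partition at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation (not reactor-kafka): shows why correlation metadata is needed. */
class CorrelationSketch {
    /** Stand-in for SenderRecord: the correlation metadata is never sent to Kafka. */
    record Request<T>(int partition, String value, T correlationMetadata) {}

    /** Stand-in for SenderResult: assigned offset plus the echoed correlation metadata. */
    record Result<T>(int partition, long offset, T correlationMetadata) {}

    /** Fake broker: acknowledges partition by partition, keeping request order inside each partition. */
    static <T> List<Result<T>> send(List<Request<T>> requests) {
        Map<Integer, List<Request<T>>> byPartition = new TreeMap<>();
        for (Request<T> request : requests) {
            byPartition.computeIfAbsent(request.partition(), p -> new ArrayList<>()).add(request);
        }
        List<Result<T>> results = new ArrayList<>();
        for (Map.Entry<Integer, List<Request<T>>> entry : byPartition.entrySet()) {
            long offset = 0; // each partition has its own offset sequence
            for (Request<T> request : entry.getValue()) {
                results.add(new Result<>(entry.getKey(), offset++, request.correlationMetadata()));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Request<Integer>> requests = List.of(
                new Request<Integer>(1, "a", 1),
                new Request<Integer>(0, "b", 2),
                new Request<Integer>(1, "c", 3),
                new Request<Integer>(0, "d", 4));
        // Results arrive as requests #2, #4, #1, #3: different from the send order,
        // but still ordered within each partition.
        for (Result<Integer> result : send(requests)) {
            System.out.println("request #" + result.correlationMetadata()
                    + " -> partition " + result.partition() + ", offset " + result.offset());
        }
    }
}
```

Because the response order cannot be relied on across partitions, the caller matches each result to its request through the correlation metadata, not through its position in the stream.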

Spring support for reactor-kafka

reactor-kafka is already largely self-sufficient. Combined with the standard Reactor operators, it covers retries, backpressure, and most of the other resilience features developers are likely to need.

Only two classes act as the bridge between the spring-kafka library and the reactor-kafka library. Beyond these two templates, spring-kafka does not provide any additional support for reactor-kafka:

  1. ReactiveKafkaProducerTemplate
    1. https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.html
    2. Uses KafkaSender from reactor-kafka
  2. ReactiveKafkaConsumerTemplate
    1. https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.html
    2. Uses KafkaReceiver from reactor-kafka
    3. The inbound Flux is created by invoking one of the following methods on the ReactiveKafkaConsumerTemplate:
      1. receive,
      2. receiveAtmostOnce,
      3. receiveAutoAck,
      4. receiveBatch or
      5. receiveExactlyOnce
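The receive variants listed above differ mainly in when offsets are committed relative to processing. The at-most-once variant commits before a record is handed to the application, whereas plain receive with an acknowledgement after processing gives at-least-once delivery. The following is a small JDK-only simulation of those two orderings, not the template API: the class DeliverySemanticsSketch, its run method, and the single simulated crash are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation (not spring-kafka or reactor-kafka): commit order versus one crash. */
class DeliverySemanticsSketch {
    /**
     * Consumes the log, crashing once at index crashAt between the commit step and the
     * processing step, then restarting from the last committed offset.
     * Returns everything the application processed across both runs.
     */
    static List<String> run(List<String> log, boolean commitBeforeProcessing, int crashAt) {
        List<String> processed = new ArrayList<>();
        int committed = 0;
        boolean crashed = false;
        while (committed < log.size()) {
            int i = committed; // a (re)start resumes from the last committed offset
            while (i < log.size()) {
                boolean crashNow = !crashed && i == crashAt;
                if (commitBeforeProcessing) {
                    committed = i + 1; // offset committed first
                    if (crashNow) { crashed = true; break; } // record i is never processed
                    processed.add(log.get(i));
                } else {
                    processed.add(log.get(i));
                    if (crashNow) { crashed = true; break; } // record i is redelivered
                    committed = i + 1; // offset committed only after processing
                }
                i++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c");
        System.out.println(run(log, true, 1));  // at-most-once: prints [a, c]
        System.out.println(run(log, false, 1)); // at-least-once: prints [a, b, b, c]
    }
}
```

With commit-first ordering the record in flight during the crash is lost. With process-first ordering it is processed twice, so the handler should be idempotent.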

Spring Boot uses org/springframework/boot/spring-boot-autoconfigure/3.3.2/spring-boot-autoconfigure-3.3.2.jar!/org/springframework/boot/autoconfigure/kafka/KafkaProperties.class to bind the spring.kafka.* properties from the application.yaml file.

org.springframework.kafka.support.serializer.JsonSerializer and org.springframework.kafka.support.serializer.JsonDeserializer come from spring-kafka.
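To make the configuration path more concrete, here is a sketch of the plain Kafka consumer properties that a typical spring.kafka block in application.yaml ends up as. The property keys and deserializer class names are the standard Kafka and spring-kafka ones. The class ConsumerConfigSketch, the method name, and the values (including the trusted package com.example.events) are hypothetical. The map is built by hand with the JDK only, so it shows the target of the binding, not how KafkaProperties performs it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: the raw consumer properties behind a typical spring.kafka configuration. */
class ConsumerConfigSketch {
    static Map<String, Object> consumerProperties(String bootstrapServers, String groupId) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // spring.kafka.bootstrap-servers
        props.put("group.id", groupId);                   // spring.kafka.consumer.group-id
        props.put("auto.offset.reset", "earliest");       // spring.kafka.consumer.auto-offset-reset
        props.put("key.deserializer",                     // spring.kafka.consumer.key-deserializer
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",                   // spring.kafka.consumer.value-deserializer
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // JsonDeserializer only instantiates classes from trusted packages; this one is made up.
        props.put("spring.json.trusted.packages", "com.example.events");
        return props;
    }

    public static void main(String[] args) {
        consumerProperties("localhost:9092", "demo-group")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A map like this is what a reactor-kafka ReceiverOptions instance is ultimately created from, whether it is assembled by hand or produced by Spring Boot from application.yaml.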

