Kafka - Transactions

Reading material

  1. https://www.confluent.io/blog/transactions-apache-kafka/ (How Transactions Work)
  2. https://developer.confluent.io/courses/architecture/transactions/

What is needed to use transactions?

The producer has to explicitly set the transactional.id configuration to tell the Kafka brokers that it wants to produce messages transactionally.

ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer"
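
For context, a minimal sketch of the full producer setup. The broker address and the String serializers are illustrative assumptions, not requirements:

    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    Map<String, Object> props = Map.<String, Object>of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            // opts the producer into transactions; idempotence is enabled implicitly
            ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // must be called once, before the first beginTransaction()
    producer.initTransactions();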

The role of the transaction coordinator

The transaction coordinator is a module that runs inside every Kafka broker. It records the state of each transaction in the transaction log, an internal Kafka topic (__transaction_state).

  1. Consumers listening to the topic in read_uncommitted mode receive messages as soon as they are written, whether or not the transaction has been committed yet.
  2. Consumers listening to the topic in read_committed mode receive messages only after the producer commits the transaction. Until the commit, those messages are in a pending state.

If the producer writes a message to a topic but never commits it (for example, because of an error), will that message be deleted from the topic? No. Kafka topics are append-only and immutable; once messages are written, they cannot be removed. Instead, the transaction coordinator writes commit or abort markers into the topic, and consumers use those markers to tell whether a message was committed by the producer or not.

Should the consumers read messages from topics with “read committed” mode or “read uncommitted” mode?

Depends.

Is the producer writing messages in a transactional way? If not, just go with “read uncommitted” (which is the default isolation level).

If the producer is writing messages in a transactional way and the consumer only wants committed records, go with “read committed” mode.
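
On the consumer side, the isolation level is a single config entry. A minimal sketch; the group id and the String deserializers are assumptions for illustration:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Map<String, Object> props = Map.<String, Object>of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "transfer-consumer",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            // hides records from open or aborted transactions;
            // the default is "read_uncommitted"
            ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);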

Source: https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

To use the transactional producer and the attendant APIs, you must set the transactional.id configuration property. If the transactional.id is set, idempotence is automatically enabled along with the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured for durability. In particular, the replication.factor should be at least 3, and the min.insync.replicas for these topics should be set to 2. Finally, in order for transactional guarantees to be realized from end-to-end, the consumers must be configured to read only committed messages as well.
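
To illustrate the durability guidance above, a sketch that creates such a topic via the Admin client. The topic name and partition count are assumptions, and this needs a cluster with at least 3 brokers:

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    try (Admin admin = Admin.create(Map.<String, Object>of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
        // replication.factor 3 + min.insync.replicas 2: every committed write
        // lands on at least two replicas, so one broker can fail without data loss
        NewTopic topic = new NewTopic("transaction-events", 3, (short) 3)
                .configs(Map.of("min.insync.replicas", "2"));
        admin.createTopics(List.of(topic)).all().get();
    }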

Example scenario

We may run into scenarios where an application has to read messages from one topic, do some processing and write messages to a different topic.

This is called a read-process-write application.

We need all of these operations to happen in one single transaction: read a message from a topic, acknowledge it, process it, write a message to another topic, and receive the producer acknowledgement for it. A sketch of such a loop follows.
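
For reference, a minimal sketch of a read-process-write loop with the plain (non-reactive) Java clients, modeled on the example in the KafkaProducer javadoc. The broker address, group id, and the process() helper are assumptions for illustration; the reactive implementation actually used here is linked below:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ReadProcessWrite {

        public static void main(String[] args) {
            Map<String, Object> consumerProps = Map.<String, Object>of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ConsumerConfig.GROUP_ID_CONFIG, "transfer-processor",
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", // offsets commit via the transaction
                    ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed",
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            Map<String, Object> producerProps = Map.<String, Object>of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer",
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                producer.initTransactions();
                consumer.subscribe(List.of("transfer-requests"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                    if (records.isEmpty()) {
                        continue;
                    }
                    producer.beginTransaction();
                    try {
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        for (ConsumerRecord<String, String> record : records) {
                            producer.send(new ProducerRecord<>("transaction-events",
                                    record.key(), process(record.value())));
                            // offset + 1 = the next record to consume on this partition
                            offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
                        }
                        // commit the consumed offsets inside the same transaction,
                        // so the read and the write succeed or fail together
                        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                        producer.commitTransaction();
                    } catch (KafkaException e) {
                        // abort so read_committed consumers never see the partial output;
                        // fatal errors such as ProducerFencedException (see below)
                        // require closing the producer instead
                        producer.abortTransaction();
                    }
                }
            }
        }

        // hypothetical placeholder for the application's validation/processing step
        private static String process(String value) {
            return value;
        }
    }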

See https://github.com/explorer436/programming-playground/tree/main/java-playground/kafka-examples/reactive-kafka-playground/reactive-kafka-standalone-examples/src/main/java/kafka/examples/reactive/kafka/standalone/examples/transactions

Here, the scenario is:

  1. The transfer requests come in on the topic transfer-requests
  2. After they are picked up by the application, validated, and processed, the transaction events go out to the topic transaction-events
  3. Set-up to test transactions
    kafka-topics.sh --bootstrap-server localhost:9092 --topic transfer-requests --create
    
    kafka-topics.sh --bootstrap-server localhost:9092 --topic transaction-events --create
    
    # To put transfer request messages into the topic "transfer-requests"
    kafka-console-producer.sh \
        --bootstrap-server localhost:9092 \
        --topic transfer-requests \
        --property key.separator=: \
        --property parse.key=true
    
    # To read transaction events from the topic "transaction-events"
    # Read only the messages that were committed after the message was
    # successfully acknowledged
    kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 \
        --topic transaction-events \
        --property print.key=true \
        --isolation-level read_committed \
        --from-beginning
    
    # To read transaction events from the topic "transaction-events"
    # Read all messages, regardless of whether they were committed after
    # successful acknowledgement or not
    kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 \
        --topic transaction-events \
        --property print.key=true \
        --from-beginning
    
  4. Main code changes:
    1. In the Producer,
      private Mono<SenderResult<String>> sendTransaction(TransferEvent event) {
          Flux<SenderRecord<String, String, String>> senderRecords = toSenderRecords(event);
          TransactionManager manager = sender.transactionManager();

          // begin() starts the transaction; every record sent before commit()
          // or abort() belongs to it
          return manager
               .begin()
               .then(this.sender.send(senderRecords)
                       // delaying for demo; acknowledge() returns the Runnable that
                       // acknowledges the consumed record, so the acknowledgement
                       // happens inside the transaction
                       .concatWith(Mono.delay(Duration.ofSeconds(1)).then(Mono.fromRunnable(event.acknowledge())))
                       // commit() completes the transaction, making the records
                       // visible to read_committed consumers
                       .concatWith(manager.commit())
                       .last())
               .doOnError(ex -> log.error(ex.getMessage()))
               // on any error, abort so the records stay invisible to
               // read_committed consumers
               .onErrorResume(ex -> manager.abort());
      }
      
    2. In the ProducerConfig,
      ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer"
      

ProducerFencedException

This fatal exception indicates that another producer with the same transactional.id has been started. It is only possible to have one producer instance with a transactional.id at any given time, and the latest one to be started “fences” the previous instances so that they can no longer make transactional requests. When you encounter this exception, you must close the producer instance.

https://kafka.apache.org/24/javadoc/org/apache/kafka/common/errors/ProducerFencedException.html
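
The KafkaProducer javadoc recommends handling it like the other fatal producer exceptions. A condensed sketch of that pattern, assuming producer and record already exist in scope:

    try {
        producer.beginTransaction();
        producer.send(record);
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // fatal: e.g. another producer with the same transactional.id took over;
        // we cannot recover, so close this instance
        producer.close();
    } catch (KafkaException e) {
        // transient: abort, then retry the work in a new transaction
        producer.abortTransaction();
    }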

How to prevent it?

Always append something unique to the transactional.id, so that no two live producer instances end up with the same id. A sketch follows.
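
One possible sketch: derive the id from a stable per-instance identifier. The INSTANCE_ID environment variable is an assumption; it could equally be a pod name or host name:

    import java.util.Properties;
    import java.util.UUID;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    // prefer a stable identifier (pod/host name): a restarted instance then
    // reuses its id and "fences" its zombie predecessor; the random fallback
    // is only for local demos
    String instanceId = System.getenv().getOrDefault("INSTANCE_ID", UUID.randomUUID().toString());
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer-" + instanceId);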

