Kafka - Transactions
Reading material
- https://www.confluent.io/blog/transactions-apache-kafka/
How Transactions Work
- https://developer.confluent.io/courses/architecture/transactions/
What is needed to do this?
The producer has to explicitly set configuration to indicate to the Kafka brokers that it wants to produce transactionally:

```java
ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer"
```
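For context, a minimal sketch (not from this project) of a plain Java producer opting in; the bootstrap address and serializers are assumptions for a local setup:

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Setting transactional.id opts the producer into transactions and
// automatically enables idempotence as well.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Registers with the transaction coordinator and fences any earlier
// producer instance that used the same transactional.id.
producer.initTransactions();
```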
The role of the Transaction Coordinator
The transaction coordinator is a module running inside every Kafka broker. It tracks transaction state in the transaction log, which is an internal Kafka topic (`__transaction_state`).
- The transaction coordinator will instantaneously deliver messages from the topic to all consumers that are listening in read_uncommitted mode.
- However, it will not deliver messages to consumers listening in read_committed mode unless the producer commits them. A message that is not yet committed is still in a pending status.
If the producer writes a message to a topic but does not commit it (say there was an error), will that message be deleted from the topic?

No. Kafka topics are append-only and immutable: once messages are written, they cannot be removed. Instead, the transaction coordinator writes markers into the log to keep track of whether each transactional message was committed or aborted by the producer.
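To make the marker behaviour concrete, a hedged sketch reusing the plain producer from above: an aborted record stays in the log but is skipped by read_committed consumers.

```java
producer.beginTransaction();
producer.send(new ProducerRecord<>("transaction-events", "tx-1", "pending transfer"));
// Something goes wrong before commit, so the transaction is aborted.
producer.abortTransaction();
// The record is still physically in the partition, now followed by an
// abort marker. A read_uncommitted consumer will see it; a read_committed
// consumer will silently skip over it.
```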
Should the consumers read messages from topics in read_committed mode or in read_uncommitted mode?
It depends.
- Is the producer writing messages transactionally? If not, just go with read_uncommitted (which is the default).
- If the producer is writing messages transactionally and the consumer wants committed records only, go with read_committed mode.
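A minimal sketch of the consumer side of that choice (the group id and deserializers are assumptions):

```java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "transfer-app"); // hypothetical group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// The default is "read_uncommitted"; opt in explicitly when the
// producer side is transactional.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```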
Source: https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
To use the transactional producer and the attendant APIs, you must set the transactional.id configuration property. If the transactional.id is set, idempotence is automatically enabled along with the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured for durability. In particular, the replication.factor should be at least 3, and the min.insync.replicas for these topics should be set to 2. Finally, in order for transactional guarantees to be realized from end-to-end, the consumers must be configured to read only committed messages as well.
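For the durability part, a hedged sketch of creating a topic with those settings through the admin client (assumes a cluster with at least 3 brokers; partition count is an arbitrary choice):

```java
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // Replication factor 3 and min.insync.replicas=2 follow the
    // javadoc recommendation quoted above.
    NewTopic topic = new NewTopic("transaction-events", 3, (short) 3)
            .configs(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
    admin.createTopics(List.of(topic)).all().get();
}
```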
Example scenario
We may run into scenarios where an application has to read messages from one topic, do some processing, and write messages to a different topic.
This is called a read-process-write application.
We need all of these operations to happen in one single transaction: read a message from a topic, acknowledge it, process it, write a message to another topic, and receive the producer acknowledgement for it. (A sketch of this loop with the plain Java clients follows the scenario below.)
Here, the scenario is:
- The transfer requests come in on the topic `transfer-requests`.
- After they are picked up by the application, validated, and processed, the transaction events go out to the topic `transaction-events`.
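Here is the promised sketch of that loop with the plain Java clients; the reactive version this project actually uses appears under "Main code changes" below. The group id and the `process()` step are hypothetical.

```java
consumer.subscribe(List.of("transfer-requests"));
producer.initTransactions();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(250));
    if (records.isEmpty()) continue;
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            // process() stands in for the validation/processing step
            producer.send(new ProducerRecord<>("transaction-events", record.key(), process(record.value())));
        }
        // Commit the consumed offsets inside the same transaction, so the
        // read and the write succeed or fail together.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(partition);
            offsets.put(partition, new OffsetAndMetadata(batch.get(batch.size() - 1).offset() + 1));
        }
        producer.sendOffsetsToTransaction(offsets, "transfer-app"); // hypothetical group id
        producer.commitTransaction();
    } catch (KafkaException e) {
        // Nothing becomes visible and offsets are not committed, so the
        // batch will be re-read and re-processed.
        producer.abortTransaction();
    }
}
```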
- Set-up to test transactions
```sh
kafka-topics.sh --bootstrap-server localhost:9092 --topic transfer-requests --create

kafka-topics.sh --bootstrap-server localhost:9092 --topic transaction-events --create

# To put transfer request messages into the topic "transfer-requests"
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic transfer-requests \
  --property key.separator=: \
  --property parse.key=true

# To read transaction events from the topic "transaction-events"
# Reads only the messages that were committed after the message was successfully acknowledged
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic transaction-events \
  --property print.key=true \
  --isolation-level=read_committed \
  --from-beginning

# To read transaction events from the topic "transaction-events"
# Reads all messages, whether or not they were committed
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic transaction-events \
  --property print.key=true \
  --from-beginning
```
- Main code changes:
- In the Producer,
```java
private Mono<SenderResult<String>> sendTransaction(TransferEvent event) {
    Flux<SenderRecord<String, String, String>> senderRecords = toSenderRecords(event);
    TransactionManager manager = sender.transactionManager();
    return manager
            .begin()
            .then(this.sender.send(senderRecords)
                    // delaying for demo
                    .concatWith(Mono.delay(Duration.ofSeconds(1)).then(Mono.fromRunnable(event::acknowledge)))
                    .concatWith(manager.commit())
                    .last())
            .doOnError(ex -> log.error(ex.getMessage()))
            .onErrorResume(ex -> manager.abort());
}
```
- In the ProducerConfig,
```java
ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer"
```
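With reactor-kafka (which the sendTransaction() method above uses), the property is passed through SenderOptions when building the KafkaSender; a hedged sketch of how that sender might be constructed:

```java
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer");

SenderOptions<String, String> senderOptions = SenderOptions.create(props);
// sender.transactionManager() in the code above only works because
// transactional.id is set here.
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
```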
ProducerFencedException
This fatal exception indicates that another producer with the same transactional.id has been started. It is only possible to have one producer instance with a transactional.id at any given time, and the latest one to be started “fences” the previous instances so that they can no longer make transactional requests. When you encounter this exception, you must close the producer instance.
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/errors/ProducerFencedException.html
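A hedged sketch of the handling the javadoc prescribes for the plain client: fatal errors (fencing included) mean closing the producer, while other KafkaExceptions can be aborted and retried.

```java
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("transaction-events", "tx-1", "transfer"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: another instance with the same transactional.id has taken over.
    producer.close();
} catch (KafkaException e) {
    // Transient: abort so the work can be retried.
    producer.abortTransaction();
}
```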
How to prevent it?
Always append something unique to the transactional.id so that no two live producer instances ever share the same one.
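For instance (a sketch; the random-suffix scheme is an assumption, a stable per-instance id would work as well):

```java
// Each producer instance gets its own transactional.id, so a newly started
// instance never fences one that is still supposed to be running.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "money-transfer-" + UUID.randomUUID());
```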