Kafka - Topics, Partitions and Offsets
- Kafka Topic
- Anatomy of a topic
- Retention periods
- Topic replication factor
- Kafka Topic durability
- The need for partitions
- Partitions and Offsets
- Concept of Leader for a partition
- How many topics should I create for my application?
- How many partitions should I create?
- Can we change the number of partitions on the fly?
- Partitions vs Replicas
- How many replicas should my partitions have? What replication factor should I use?
Kafka Topic
- It is a way of organizing data within a Kafka Cluster
- A topic is a collection of partitions.
- A category to which records are published. e.g. if we have a large news site, each news category could be a single Kafka topic.
- To use analogies:
  - they are like a Table in an RDBMS (without all the constraints; there is no data verification)
  - they are similar to Message Queues
  - they are like a stream of data within the Kafka cluster; the sequence of messages is called a data stream
- Each topic is identified by its name.
- We can have as many topics in a cluster as we want. We can have thousands of topics. e.g. user-clicks, logs, purchases, tweets, truck gps, taxi gps, order events, etc.
- They support any kind of message format. e.g. json, avro, text, binary, etc.
- We cannot query topics. Instead, use Kafka producers to send data to topics and Kafka consumers to read data from topics.
- Topics are immutable. Once data is written to a partition, it cannot be changed. We cannot delete it. We cannot update it.
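For example, topics can be created and listed with the kafka-topics.sh CLI. A minimal sketch, assuming a broker listening on localhost:9092 (the topic name is illustrative):

# create a topic named user-clicks with 3 partitions
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic user-clicks \
  --partitions 3 --replication-factor 1

# list all topics in the cluster
kafka-topics.sh --bootstrap-server localhost:9092 --list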
Anatomy of a topic
As and when we create topics, we can look at the directory structure (of the Docker container) to see what's going on with each topic.
[explorer436@explorer436-p50-20eqs27p03 01-kafka-setup]$ tree .
.
├── compose
│ ├── command-to-start.org
│ ├── data
│ │ ├── bootstrap.checkpoint
│ │ ├── cleaner-offset-checkpoint
│ │ ├── __cluster_metadata-0
│ │ │ ├── 00000000000000000000.index
│ │ │ ├── 00000000000000000000.log
│ │ │ ├── 00000000000000000000.timeindex
│ │ │ ├── 00000000000000000044.snapshot
│ │ │ ├── leader-epoch-checkpoint
│ │ │ ├── partition.metadata
│ │ │ └── quorum-state
│ │ ├── hello-world-0
│ │ │ ├── 00000000000000000000.index
│ │ │ ├── 00000000000000000000.log
│ │ │ ├── 00000000000000000000.timeindex
│ │ │ ├── leader-epoch-checkpoint
│ │ │ └── partition.metadata
│ │ ├── log-start-offset-checkpoint
│ │ ├── meta.properties
│ │ ├── recovery-point-offset-checkpoint
│ │ └── replication-offset-checkpoint
│ ├── docker-compose.yaml
│ └── props
│ └── server.properties
└── image
├── Dockerfile
└── runner.sh
7 directories, 23 files
The entries in the data directory correspond to topics in our cluster. Each directory is named topic-partition: hello-world-0 is partition 0 of the hello-world topic, and __cluster_metadata-0 belongs to __cluster_metadata, an internal topic.
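To peek inside one of these partition directories, Kafka ships a log dump tool. A sketch, assuming the directory layout above (adjust the path to wherever your container keeps the data directory):

# decode the first log segment of hello-world partition 0 and print the records
kafka-dump-log.sh --print-data-log \
  --files compose/data/hello-world-0/00000000000000000000.log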
Retention periods
- Data is kept in the topic only for a limited time (default is one week - but this is configurable).
root@ba7f577efce3:/kafka/bin# kafka-topics.sh
Create, delete, describe, or change a topic.
--config <String: name=value> A topic configuration override for the
topic being created or altered. The
following is a list of valid
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs. It is
supported only in combination with --
create if --bootstrap-server option
is used (the kafka-configs CLI
supports altering topic configs with
a --bootstrap-server option).
- https://kafka.apache.org/documentation/#topicconfigs_delete.retention.ms
- https://kafka.apache.org/documentation/#topicconfigs_retention.bytes
The corresponding broker-level configs:
log.retention.bytes
log.retention.ms
log.retention.minutes
log.retention.hours
- https://kafka.apache.org/documentation/#brokerconfigs_log.retention.bytes
- https://kafka.apache.org/documentation/#brokerconfigs_log.retention.hours
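For example, retention can be overridden per topic instead of changing the broker defaults. A sketch, assuming a broker on localhost:9092 and the hello-world topic from earlier:

# keep data for 1 day (86,400,000 ms) instead of the 7-day default
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name hello-world \
  --add-config retention.ms=86400000

# verify the override
kafka-configs.sh --bootstrap-server localhost:9092 \
  --describe --entity-type topics --entity-name hello-world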
Topic replication factor
- Topics in Kafka should have a replication factor > 1 (usually 2 or 3).
- That way, if a broker is down, another broker can serve the data (from a copy of the topic).
- e.g. TopicA with 2 partitions and replication factor = 2:
- If Broker102 goes down, Broker101 and Broker103 can still serve the data.
Broker101                  Broker102                  Broker103
TopicA                     TopicA                     TopicA
Partition0 --replication-> Partition0
                           Partition1 --replication-> Partition1
Kafka Topic durability
- For a topic replication factor = 3, the topic data can survive the loss of 2 brokers.
- If the replication factor is N, you can lose N-1 brokers and still recover your data.
The need for partitions
Problem 1 - Ordering of messages
For a bank account number, if there is a deposit transaction of $100 and a withdrawal transaction of $50, the ordering of these two transactions is important. If the order is not respected, there is a chance that the user will see errors while trying to withdraw cash from the account.
Problem 2 - Scaling of consumer instances
If we assume that the Kafka topic does not have multiple partitions, and if we have two consumer instances, both instances will be reading messages from the same single-partition topic.
Why is this a problem?
Only Consumer1 will be reading all the messages from the topic, and Consumer2 will not be reading or processing any messages at all. Why? Because Kafka delivers messages in the order that it receives them, only one consumer can read from the single ordered sequence.
The load on Consumer1 will be very high, while Consumer2 is idle.
Topic A - Partition 0 |0|1|2|3|4|5|6|7|8|9|10|11|12| --> [Consumer 1 (busy)]   [Consumer 2 (idle)]
Partitions solve two problems at the same time
- Message ordering
- Scaling for consumer instances
All the Kafka messages with the same key will end up in the same partition.
If we use bank account numbers as the keys, all the transactions for that account number will be performed in order.
We have to choose the keys according to our business scenarios.
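The console producer can demonstrate keyed messages. A minimal sketch (the topic name and key separator are illustrative); everything before the : becomes the message key, so both transactions for account-1 land in the same partition, in order:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic bank-transactions \
  --property parse.key=true \
  --property key.separator=:
# then type keyed messages, one per line, e.g.
#   account-1:deposit 100
#   account-1:withdraw 50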
Partitions and Offsets
Topics are split into partitions (e.g. 100 partitions). The messages sent to Kafka topics end up in these partitions. Messages within each partition are ordered.
|-- Partition 0 |0|1|2|3|4|5|6|7|8|9|10|11|12|
|
kafka topic -- |-- Partition 1 |0|1|2|3|4|5|6|7|8|
|
|-- Partition 2 |0|1|2|3|4|5|6|7|8|9|10|
Offsets belong to partitions - not to topics.
- Each message within a partition gets an incremental id. This id is called an offset.
- Offsets only have a meaning for a specific partition. e.g. offset 3 in partition 0 does not represent the same data as offset 3 in partition 1.
- Offsets are not re-used even if previous messages have been deleted.
- Order of messages is guaranteed only within a partition, but not across partitions.
- Data is assigned randomly to a partition unless a key is provided.
# to print offset, time etc
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic hello-world \
--property print.offset=true \
--property print.timestamp=true \
--from-beginning
e.g. scenario for a topic:
- Let's say you have a fleet of trucks.
- Each truck reports its GPS position to kafka.
- Each truck will send a message to kafka every 20 seconds.
- Each message will contain the truck ID and the truck position (latitude and longitude).
- You can have a topic “truck_gps” that contains the positions of all trucks.
- You can choose to create a topic with 10 partitions (arbitrary number).
- Consumers can consume data from this topic and do whatever they want to do with the same stream of data e.g. one consumer can create a location dashboard; another consumer can set up notification services based on this data;
Concept of Leader for a partition
- At any time, only one broker can be the leader for a given partition.
- Producers can only send data to the broker that is the leader of a partition.
- The other brokers will replicate the data.
- Therefore, each partition has one leader and multiple ISRs (in-sync replicas).
- e.g.
- Broker101 is leader for TopicA Partition0
- Broker102 is leader for TopicA Partition1
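The leader and ISR list for every partition can be inspected with kafka-topics.sh (assuming a broker on localhost:9092 and the hello-world topic from earlier):

# prints, per partition: the leader broker id, the replica ids, and the in-sync replica ids
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic hello-world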
How many topics should I create for my application?
Consider this scenario: We are working with an e-commerce application that deals with events.
- order-created events
- order-updated events
- order-cancelled events
Do we need a topic per event type, or a single topic called order-events?
It depends.
How would you solve this with RDBMS tables? Similar to that decision, we need to decide whether to use separate topics or a single topic based on the business scenario.
How many partitions should I create?
It depends on the expected throughput.
Consider that the consumer can process 100 events/second. As per the business use-case, we are going to receive 1000 events/second from the producers. If we create a topic with only one partition, the consumer will not be able to catch up with the producer because the consumer is slow. Based on the math, we need 10 consumers to catch up with the messages that the producer is producing. So, we need a minimum of 10 partitions. To be safe, to provide extra buffer, we can add additional partitions. e.g. 10 + 2 = 12 partitions.
Consider another use-case. The consumer can process 10 million events/second. The topic will receive about 300,000 events/second. In this case, since the consumer's capacity to process the events is high, we can get away with creating just one partition. What about the producer-broker communication? What if the producer is not able to write 300,000 events/second to one single broker? For this reason, even if the consumer doesn't need more partitions, for the producer's sake, we need to create more partitions. e.g. if we have 3 partitions, the producer will be able to work with 3 brokers and the overall solution will be able to support the expected throughput.
So, the answer depends on a few factors like
- what is the expected throughput?
- what is the producer’s capacity? How many messages will it generate per second?
- what is the consumer’s capacity? How many messages will it process per second?
Can we create a topic with too many partitions just to be safe? Kafka has the responsibility to elect leaders and followers for every partition. If we have too many partitions, that is a lot of work for Kafka and we will overwhelm the cluster.
Pick a number based on initial estimations, add a buffer to that number - just to be safe. Worst-case scenario, we can always alter the number of partitions later.
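Putting the first estimate into practice, a sketch (the topic name and replication factor are illustrative, and replication factor 3 assumes at least 3 brokers):

# 10 partitions from the throughput math + 2 as a buffer
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic order-events \
  --partitions 12 --replication-factor 3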
Can we change the number of partitions on the fly?
- We can (see the alter command in the CLI commands, and the example after this list).
- But we have to be very careful, because it can mess up the order in which the messages are processed by the consumer applications.
- When new partitions are added to a topic, the default partitioner's key-to-partition mapping changes (it hashes the key modulo the number of partitions), so the producer will start sending messages with a given key to different partitions than before. If the order of the messages is mission critical, this will lead to serious problems.
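Increasing the partition count is a one-liner, sketched here against the hello-world topic (note that the count can only be increased, never decreased):

kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic hello-world --partitions 12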
How to solve?
- Design upfront properly
- Accept the message ordering issues for a short period of time
- Stop the producer. Drain the existing partitions. Start the producer.
- (If all the above options do not work) create a new topic with 4 (or whatever the desired number is) partitions.
- Let the producer send messages to the new topic.
- Let the consumers consume new partitions once they have processed old topic messages.
Partitions vs Replicas
Partitions are for scaling.
Replicas are for high availability.
How many replicas should my partitions have? What replication factor should I use?
Decide based on the number of brokers that can go down at a given time in the cluster.
If we have a broker that will never go down, we will not need a replica. However, in real life, we will not have brokers that will never go down.
e.g. In a 100 node cluster, if we can expect 5 servers to go down at a time, we need 6 replicas.
If we can expect N servers to go down at a time, we need N+1 replicas.
What about in-sync replicas?
The default value for min.insync.replicas is 1. But if we choose min.insync.replicas=2, we need N+2 replicas for our topic.
e.g. In a 100 node cluster, if we can expect 5 servers to go down at a time, we need 7 replicas for the topic to support the min.insync.replicas criteria.
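Both settings are applied when the topic is created. A sketch (the topic name is illustrative, and the cluster must have enough brokers to host the replicas):

# replication factor covers broker loss; min.insync.replicas guards acks=all writes
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic payment-events \
  --partitions 6 --replication-factor 3 \
  --config min.insync.replicas=2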