Kafka - security
Most of the time, these settings are configured by the team that sets up and operates Kafka.
However, developers still need to understand them, because the Kafka client configuration in the applications has to change to match.
- SASL
  - Simple Authentication and Security Layer
  - SASL mechanism configuration: standard mechanism names are listed here: http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml
  - Some common mechanisms are
    - PLAIN
    - LDAP (strictly a directory backend for checking credentials rather than a SASL mechanism name)
    - OAUTHBEARER (OAuth)
    - …
- JAAS
  - Java Authentication and Authorization Service (API)
Security protocols
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html
- PLAINTEXT - Un-authenticated, non-encrypted channel
- SASL_PLAINTEXT - SASL authenticated, non-encrypted channel (SASL - Simple Authentication and Security Layer)
- SASL_SSL - SASL authenticated, SSL (TLS) encrypted channel
- SSL - SSL (TLS) encrypted channel, no SASL authentication
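The four protocol names above are really two independent switches: is SASL authentication required, and is the channel encrypted. The sketch below spells that out in plain Java. SecurityProtocolSketch and its two methods are hypothetical names used only for illustration; this is not Kafka's own SecurityProtocol enum, and it simply reads the answer off the protocol name.

```java
import java.util.List;

class SecurityProtocolSketch {

    // True when the protocol name says clients must authenticate with SASL.
    static boolean usesSasl(String protocol) {
        return protocol.startsWith("SASL_");
    }

    // True when the protocol name says the channel is encrypted with SSL (TLS).
    static boolean isEncrypted(String protocol) {
        return protocol.endsWith("SSL");
    }

    public static void main(String[] args) {
        // Print one line per protocol listed in the notes above.
        for (String p : List.of("PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL", "SSL")) {
            System.out.printf("%-15s sasl=%-5b encrypted=%b%n", p, usesSasl(p), isEncrypted(p));
        }
    }
}
```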
TODO
- https://kafka.apache.org/10/documentation/streams/developer-guide/security.html
- https://medium.com/@bhattchitrangna/securing-your-kafka-a-beginners-guide-to-kafka-security-ab2978a4d82e
Using SASL + PLAIN security mechanism
What is the difference in kafka set-up in 05-kafka-security-sasl-plain vs 01-kafka-server-setup?
- The first difference is in props/security.properties.
  In this set-up, we are using:
    listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
  In the regular set-up, we used to have:
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093
  With this, Kafka will not allow client applications to connect without a username and a password. The communication between the clients and the Kafka brokers is still not encrypted; it is plain text. The username/password check adds one layer of security.
- One other difference is also in props/security.properties.
  We have to add this:
    # This PLAIN is different from PLAINTEXT above
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.controller.protocol=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
  Other values for sasl.enabled.mechanisms include GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512 and OAUTHBEARER.
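- We also need a file with the credentials: props/jaas.conf. In jaas.conf,
  - username and password are the credentials the broker itself uses for broker to broker communication.
  - user_admin and user_client define the passwords of the users admin and client that the broker accepts. Client applications log in as one of these users for client to broker communication.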
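To make the roles of these entries concrete, here is a small sketch that renders a KafkaServer section in the layout that PlainLoginModule reads on the broker. BrokerJaasSketch and render are hypothetical names, and the broker user "admin" and the password "admin-secret" are assumptions for illustration; only the client user and its "client-secret" password come from these notes. The real props/jaas.conf in the project may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BrokerJaasSketch {

    // Renders a KafkaServer section for SASL/PLAIN as plain text.
    static String render(String brokerUser, String brokerPassword, Map<String, String> users) {
        StringBuilder sb = new StringBuilder();
        sb.append("KafkaServer {\n");
        sb.append("  org.apache.kafka.common.security.plain.PlainLoginModule required\n");
        // Credentials this broker presents when it connects to other brokers.
        sb.append("  username=\"").append(brokerUser).append("\"\n");
        sb.append("  password=\"").append(brokerPassword).append("\"");
        // One user_<name>="<password>" entry per account the broker accepts.
        for (Map.Entry<String, String> e : users.entrySet()) {
            sb.append("\n  user_").append(e.getKey())
              .append("=\"").append(e.getValue()).append("\"");
        }
        // The option list ends with a semicolon, then the section is closed.
        sb.append(";\n};\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> users = new LinkedHashMap<>();
        users.put("admin", "admin-secret");   // assumed password, for illustration only
        users.put("client", "client-secret"); // matches the client password used in these notes
        System.out.print(render("admin", "admin-secret", users));
    }
}
```

Because brokers also authenticate to each other, the broker's own username needs a matching user_ entry, which is why admin appears twice in this sketch.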
How to modify the client applications to use SASL security?
With Spring, the same configuration goes into the application.properties file instead of a Java class. Other than that, there should be no differences.
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;

/*
    goal: to demo a simple kafka producer using SASL_PLAINTEXT
 */
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    public static void main(String[] args) {

        // the last three entries are the security settings; the rest is the usual producer config
        var producerConfig = Map.<String, Object>of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT",
                SaslConfigs.SASL_MECHANISM, "PLAIN",
                SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required serviceName=\"Kafka\" username=\"client\" password=\"client-secret\";"
        );

        var options = SenderOptions.<String, String>create(producerConfig);

        // emit 100 records, one every 100 ms
        var flux = Flux.interval(Duration.ofMillis(100))
                .take(100)
                .map(i -> new ProducerRecord<>("order-events", i.toString(), "order-" + i))
                .map(pr -> SenderRecord.create(pr, pr.key()));

        var sender = KafkaSender.create(options);
        sender.send(flux)
                .doOnNext(r -> log.info("correlation id: {}", r.correlationMetadata()))
                .doOnComplete(sender::close)
                .subscribe();
    }
}
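The three security entries are the same for producers and consumers, so they can be built once and merged into either config map. The sketch below does this with the literal property names ("security.protocol", "sasl.mechanism", "sasl.jaas.config") instead of the Kafka constants, so it compiles without the Kafka client on the classpath. SaslPlainClientConfig and build are hypothetical names, and the sketch assumes the username and password contain no double quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SaslPlainClientConfig {

    // Builds the security-related client settings for SASL/PLAIN over an unencrypted channel.
    // Assumption: username and password contain no double quotes (no escaping is done here).
    static Map<String, Object> build(String username, String password) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("security.protocol", "SASL_PLAINTEXT");
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                username, password));
        return config;
    }

    public static void main(String[] args) {
        // Print the entries in key=value form, as they would appear in a properties file.
        build("client", "client-secret")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned map can be added to the producer or consumer settings with putAll. With Spring Boot, the same keys are typically set in application.properties under the spring.kafka.properties. prefix; check this against the Spring version in use.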
Using SASL + SSL security mechanism
What is the difference in kafka set-up in 06-kafka-security-sasl-ssl vs 01-kafka-server-setup?
- Notice the mapping for certs in docker-compose.yaml.
- The props/jaas.conf file looks similar to the one in "Using SASL + PLAIN security mechanism".
- The first difference is in props/security.properties.
  In this set-up, we are using:
    listeners=SASL_SSL://:9092,CONTROLLER://:9093
  In the regular set-up, we used to have:
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093
  With this, Kafka will not allow client applications to connect without SSL encryption, and it still requires a username and a password. The communication between the clients and the Kafka brokers is encrypted, which adds a second layer of security on top of the username/password combination.
- One other difference is also in props/security.properties.
  We have to add this:
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.controller.protocol=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    ssl.keystore.location=/certs/kafka.keystore.jks
    ssl.keystore.password=changeit
    ssl.truststore.location=/certs/kafka.truststore.jks
    ssl.truststore.password=changeit
The Kafka server presents the certificate from its keystore, and the client applications use the truststore to verify that certificate.
How to modify the client applications to use SASL + SSL security?
With Spring, the same configuration goes into the application.properties file instead of a Java class. Other than that, there should be no differences.
The application needs access to the truststore file (kafka.truststore.jks). Only the security entries of the producer config change:
        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
        SaslConfigs.SASL_MECHANISM, "PLAIN",
        SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required serviceName=\"Kafka\" username=\"client\" password=\"client-secret\";",
        // the truststore lets the client verify the broker's certificate
        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Paths.get("src/main/resources/kafka.truststore.jks").toAbsolutePath().toString(),
        SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit"
);
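Moving a client from SASL_PLAINTEXT to SASL_SSL only changes the protocol and adds the two truststore settings. The sketch below layers those onto an existing map of SASL settings, again using the literal property names ("ssl.truststore.location", "ssl.truststore.password") so that it runs without the Kafka client library. SaslSslClientConfig and withTls are hypothetical names used for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

class SaslSslClientConfig {

    // Returns a copy of the given SASL settings, switched to SASL_SSL and
    // pointing at the truststore used to verify the broker certificate.
    static Map<String, Object> withTls(Map<String, Object> saslConfig,
                                       Path truststore,
                                       String truststorePassword) {
        Map<String, Object> config = new LinkedHashMap<>(saslConfig);
        config.put("security.protocol", "SASL_SSL"); // overrides SASL_PLAINTEXT if present
        config.put("ssl.truststore.location", truststore.toAbsolutePath().toString());
        config.put("ssl.truststore.password", truststorePassword);
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> sasl = new LinkedHashMap<>();
        sasl.put("security.protocol", "SASL_PLAINTEXT");
        sasl.put("sasl.mechanism", "PLAIN");

        // Path and password are the ones used in these notes.
        Map<String, Object> config = withTls(
                sasl, Paths.get("src/main/resources/kafka.truststore.jks"), "changeit");
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The input map is copied rather than modified, so the same SASL settings can still be used for a non-SSL listener in another environment.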