Configure Kafka Producer and Consumer in Spring Boot
Credits : https://codingnconcepts.com/spring-boot/configure-kafka-producer-and-consumer/
This post describes how to configure a Kafka producer and consumer in a Spring Boot application, and how to create service classes that send messages to and receive messages from the configured Kafka topics.
Setup Spring Boot Project
We recommend using Spring Initializr to generate the initial project. The project should include the Web and Kafka dependencies.
Maven Project
Use Spring Initializr to generate a Maven project with the predefined configuration.
A typical pom.xml file for a Kafka project looks like this:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
Gradle Project
Use Spring Initializr to generate a Gradle project with the predefined configuration.
A typical build.gradle file for a Kafka project looks like this:
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
Kafka Configuration
Next, we need to create the Kafka producer and consumer configuration so that we can publish messages to and read messages from a Kafka topic. Spring Boot auto-configures the Kafka producer and consumer for us if the correct configuration is provided through the application.yml or application.properties file, which saves us from writing boilerplate code.
A typical Kafka producer and consumer configuration looks like this:
application.yml
# APP SPECIFIC CUSTOM PROPERTIES
app:
  kafka:
    producer:
      topic: <PRODUCER_TOPIC_NAME>
    consumer:
      topic: <CONSUMER_TOPIC_NAME_1, CONSUMER_TOPIC_NAME_2, CONSUMER_TOPIC_NAME_3>

# SPRING PROPERTIES
spring:
  kafka:
    bootstrap-servers: localhost:9200,localhost:9300,localhost:9400
    properties:
      # Server host name verification is disabled by setting ssl.endpoint.identification.algorithm to an empty string
      ssl.endpoint.identification.algorithm:
    ssl:
      protocol: SSL
      trust-store-location: classpath:/app/store/truststore.jks
      trust-store-password: <TRUST_STORE_PASSWORD>
      key-store-location: classpath:/app/store/keystore.jks
      key-store-password: <KEY_STORE_PASSWORD>
      key-password: <KEY_PASSWORD>
    producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: <CONSUMER_GROUP_ID>
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Kafka SSL Configuration
Note that in the above Kafka SSL configuration, Spring Boot looks for the key-store and trust-store (*.jks) files on the project classpath (the classpath: prefix), which works in your local environment. In production, these files are generally kept outside the generated JAR. In such cases, refer to them using the file: prefix in the configuration.
application-prod.yml
spring:
  kafka:
    properties:
      # Server host name verification is disabled by setting ssl.endpoint.identification.algorithm to an empty string
      ssl.endpoint.identification.algorithm:
    ssl:
      protocol: SSL
      trust-store-location: file:/app/store/truststore.jks
      trust-store-password: <TRUST_STORE_PASSWORD>
      key-store-location: file:/app/store/keystore.jks
      key-store-password: <KEY_STORE_PASSWORD>
      key-password: <KEY_PASSWORD>
We recommend always using an absolute path in a production environment to avoid errors.
# Absolute path
file:/app/store/truststore.jks
# Relative path
file:app/store/truststore.jks
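To see why the relative form is fragile, here is a minimal sketch that uses only java.nio. It is not Spring's resource loader, and the StorePathCheck class name is hypothetical. It assumes that a location such as file:app/store/truststore.jks is treated like a plain relative file path, so it resolves against whatever directory the process happens to start in:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, for illustration only.
final class StorePathCheck {

    // True when the location only makes sense relative to the current working directory.
    static boolean dependsOnWorkingDir(String location) {
        return !Paths.get(location).isAbsolute();
    }

    // Shows where a location would actually point for the running process.
    static Path resolveAgainstWorkingDir(String location) {
        return Paths.get(location).toAbsolutePath().normalize();
    }
}
```

Printing StorePathCheck.resolveAgainstWorkingDir("app/store/truststore.jks") from two different start directories gives two different results, which is the kind of surprise an absolute path avoids.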
ssl.endpoint.identification.algorithm
This is the endpoint identification algorithm that clients use to validate the server host name. The default value is https. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.
If you have enabled SSL for the Kafka server, Spring Boot startup sometimes fails with an error caused by host name verification. You can disable server host name verification by setting the property ssl.endpoint.identification.algorithm to an empty string to avoid the error.
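For a Kafka client configured in code rather than through application.yml, the same switch could be sketched as follows. This is a minimal illustration using only java.util.Map. The HostnameVerification class and its apply method are hypothetical names, while the property key and the https default come from the description above. An empty value turns host name verification off entirely, so it is probably best kept to local or test setups.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class HostnameVerification {

    static final String ENDPOINT_ID_ALGORITHM = "ssl.endpoint.identification.algorithm";

    // Returns a copy of the client config with host name verification switched on or off.
    static Map<String, Object> apply(Map<String, Object> baseConfig, boolean verifyHostname) {
        Map<String, Object> config = new HashMap<>(baseConfig);
        // "https" is the default algorithm; an empty string disables the check.
        config.put(ENDPOINT_ID_ALGORITHM, verifyHostname ? "https" : "");
        return config;
    }
}
```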
Kafka Full Configuration
Refer to the official Spring Boot documentation for the full list of available Kafka producer and consumer configuration properties.
Here is a list of important auto-configuration properties:
Property | Description |
---|---|
spring.kafka.bootstrap-servers | Comma-separated list of Kafka servers (host:port) running as a cluster. Applies to both producer and consumer unless overridden. |
spring.kafka.producer.bootstrap-servers | Kafka bootstrap servers for the producer. Overrides spring.kafka.bootstrap-servers. |
spring.kafka.consumer.bootstrap-servers | Kafka bootstrap servers for the consumer. Overrides spring.kafka.bootstrap-servers. |
spring.kafka.client-id | Client ID to pass to the server when making requests. Used for server-side logging. |
spring.kafka.producer.client-id | Client ID to pass for the producer. Overrides spring.kafka.client-id. |
spring.kafka.consumer.client-id | Client ID to pass for the consumer. Overrides spring.kafka.client-id. |
spring.kafka.ssl.* | Kafka SSL configuration, which provides secure communication between the producer/consumer and the Kafka server. You need to generate key-store and trust-store files and configure their locations and passwords. |
spring.kafka.properties.ssl.endpoint.identification.algorithm | If you have enabled SSL for the Kafka server, host name verification can be disabled by setting this property to an empty string; otherwise Spring Boot startup may throw an error. |
spring.kafka.producer.* | Kafka producer related configuration. |
spring.kafka.consumer.* | Kafka consumer related configuration. |
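The override rules in the table can be pictured with a small sketch. This is not Spring Boot's actual binding code, only an illustration in plain Java with hypothetical names (EffectiveKafkaProperty, resolve) of how a producer- or consumer-specific value wins over the common one when both are present:

```java
import java.util.Map;

// Hypothetical helper, for illustration only.
final class EffectiveKafkaProperty {

    // Returns the client-specific value (e.g. spring.kafka.producer.bootstrap-servers)
    // if it is set, otherwise the common value (e.g. spring.kafka.bootstrap-servers).
    static String resolve(Map<String, String> props, String client, String name) {
        String specific = props.get("spring.kafka." + client + "." + name);
        return specific != null ? specific : props.get("spring.kafka." + name);
    }
}
```

With both spring.kafka.bootstrap-servers and spring.kafka.producer.bootstrap-servers set, resolving for "producer" returns the producer value, while resolving for "consumer" falls back to the common one.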
We have also created application-specific properties to configure the Kafka producer and consumer topics:
Property | Description |
---|---|
app.kafka.producer.topic | Kafka topic name to publish messages to |
app.kafka.consumer.topic | Comma-separated list of Kafka topic names, if you want the consumer service to consume from multiple Kafka topics |
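Because the consumer topic property is a single comma-separated string, it has to be split into individual topic names before use. The sketch below shows one way to do that in plain Java; TopicNames and parse are hypothetical names. String.split on its own keeps any whitespace around the commas, so the sketch trims each entry and drops empty ones.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class TopicNames {

    // Splits a comma-separated property value into clean topic names.
    static List<String> parse(String commaSeparated) {
        List<String> topics = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String topic = part.trim();
            if (!topic.isEmpty()) {
                topics.add(topic);
            }
        }
        return topics;
    }
}
```

For example, TopicNames.parse("orders, payments") yields the two names orders and payments without the stray space.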
Spring Boot Kafka Producer
Create Kafka Producer
Let’s create a KafkaProducerService interface and its implementation to send messages to a Kafka topic. We simply autowire KafkaTemplate and use its send method to publish messages to the topic.
KafkaTemplate comes with overloaded send methods for publishing messages with topic, partition, key, timestamp, and routing information; see its documentation for details.
public interface KafkaProducerService {

    void send(String message);
}
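import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServiceImpl.class);

    // Auto-configured by Spring Boot from the spring.kafka.producer.* properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name taken from the custom app.kafka.producer.topic property
    @Value("${app.kafka.producer.topic}")
    private String topic;

    @Override
    public void send(String message) {
        logger.info("message sent: {}", message);
        kafkaTemplate.send(topic, message);
    }
}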
Note that by default KafkaTemplate uses DefaultKafkaProducerFactory, which Spring Boot initializes automatically from the Kafka producer configuration provided in the application.yml or application.properties file.
If you wish to customize the default configuration, you need to provide your own bean definition of KafkaTemplate.
Customize Kafka Producer Configuration
Let’s create a KafkaProducerConfig class to customize the configuration. We autowire ProducerFactory, which gives us the auto-configured instance of DefaultKafkaProducerFactory, and then add our customized configuration on top of it.
For example, below we have provided encrypted passwords for the trust-store, key-store, and key in our application.yml configuration file for security purposes, and we want to set the decrypted passwords on the ProducerFactory for SSL connections.
We pass this customized ProducerFactory to the KafkaTemplate bean initialization. That’s it!
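import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Auto-configured factory; used here only as the source of the default properties
    @Autowired
    private ProducerFactory<String, String> producerFactory;

    // Application-specific decryption service (implementation not shown)
    @Autowired
    private CryptoService cryptoService;

    public Map<String, Object> producerConfig() {
        Map<String, Object> producerConfig = new HashMap<>(producerFactory.getConfigurationProperties());
        decryptAndAddToProducerConfig(producerConfig, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
        decryptAndAddToProducerConfig(producerConfig, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
        decryptAndAddToProducerConfig(producerConfig, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
        return producerConfig;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
    }

    // Replaces an encrypted password with its decrypted value; absent properties are left untouched
    private void decryptAndAddToProducerConfig(Map<String, Object> config, String property) {
        config.computeIfPresent(property, (k, v) -> cryptoService.decrypt((String) v));
    }
}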
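The decryption step can be tried out in isolation. The sketch below is plain Java with no Spring or Kafka dependency. Base64 decoding stands in for the real CryptoService (whose implementation is not shown in this post), the SslPasswordDecryptor name is hypothetical, and the three keys are the string values behind the SslConfigs constants used above. The sketch uses computeIfPresent so that a password that was never configured stays absent instead of being handed to the decryptor as null.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical helper, for illustration only.
final class SslPasswordDecryptor {

    // String values of SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
    // SSL_KEYSTORE_PASSWORD_CONFIG and SSL_KEY_PASSWORD_CONFIG.
    static final List<String> PASSWORD_KEYS = List.of(
            "ssl.truststore.password",
            "ssl.keystore.password",
            "ssl.key.password");

    // Returns a copy of the config in which every configured password is decrypted.
    static Map<String, Object> decryptPasswords(Map<String, Object> original,
                                                UnaryOperator<String> decrypt) {
        Map<String, Object> config = new HashMap<>(original);
        for (String key : PASSWORD_KEYS) {
            // Only touches keys that are present; missing passwords stay missing.
            config.computeIfPresent(key, (k, v) -> decrypt.apply((String) v));
        }
        return config;
    }

    // Stand-in for a real decryption routine. Base64 is an encoding, NOT encryption.
    static String base64Decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }
}
```

Passing the decryptor in as a function keeps the map handling testable without the real crypto service, and any other entries in the configuration are copied through unchanged.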
Spring Boot Kafka Consumer
Create Kafka Consumer
Let’s create a KafkaConsumerService interface and its implementation to receive messages from a Kafka topic.
We use the @KafkaListener annotation at method level and pass it the Kafka consumer topic names. Spring Boot automatically binds this method to the Kafka consumer instance. As soon as a message is published to those topics, this method receives it in real time.
public interface KafkaConsumerService {

    void receive(String message);
}
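import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);

    // The SpEL expression splits the comma-separated app.kafka.consumer.topic property into topic names
    @Override
    @KafkaListener(topics = {"#{'${app.kafka.consumer.topic}'.split(',')}"})
    public void receive(@Payload String message) {
        logger.info("message received: {}", message);
    }
}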
Note that @KafkaListener uses ConcurrentKafkaListenerContainerFactory to create an instance of the Kafka consumer. This factory uses the default configuration from DefaultKafkaConsumerFactory, which Spring Boot initializes automatically from the Kafka consumer configuration provided in the application.yml or application.properties file.
If you wish to customize the default configuration, you need to provide your own bean definition of ConcurrentKafkaListenerContainerFactory.
Customize Kafka Consumer Configuration
Let’s create a KafkaConsumerConfig class to customize the configuration. We autowire ConsumerFactory, which gives us the auto-configured instance of DefaultKafkaConsumerFactory, and then add our customized configuration on top of it.
For example, below we have provided encrypted passwords for the trust-store, key-store, and key in our application.yml configuration file for security purposes, and we want to set the decrypted passwords on the ConsumerFactory for SSL connections.
We pass this newly created ConsumerFactory to the ConcurrentKafkaListenerContainerFactory bean initialization. That’s it!
We have also used the @EnableKafka annotation at class level, which tells Spring to detect the @KafkaListener annotation on any method of a bean in the application. Those listeners then use the custom container factory defined here, because the bean is named kafkaListenerContainerFactory, which is the name @KafkaListener looks up by default.
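import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // Auto-configured factory; used here only as the source of the default properties
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    // Application-specific decryption service (implementation not shown)
    @Autowired
    private CryptoService cryptoService;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        return factory;
    }

    private Map<String, Object> consumerConfig() {
        Map<String, Object> consumerConfig = new HashMap<>(consumerFactory.getConfigurationProperties());
        decryptAndAddToConsumerConfig(consumerConfig, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
        decryptAndAddToConsumerConfig(consumerConfig, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
        decryptAndAddToConsumerConfig(consumerConfig, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
        return consumerConfig;
    }

    // Replaces an encrypted password with its decrypted value; absent properties are left untouched
    private void decryptAndAddToConsumerConfig(Map<String, Object> config, String property) {
        config.computeIfPresent(property, (k, v) -> cryptoService.decrypt((String) v));
    }
}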
Summary
Spring Boot provides a wrapper over the Java Kafka producer and consumer implementations, which helps us easily configure:
- A Kafka producer using KafkaTemplate, which provides overloaded send methods to send messages in multiple ways with keys, partitions, and routing information.
- A Kafka consumer using the @EnableKafka annotation, which auto-detects the @KafkaListener annotation applied to any method; that method then becomes a Kafka listener.
You can download the complete source code from GitHub and read the official Spring for Apache Kafka documentation for further exploration.