Configure Kafka Producer and Consumer in Spring Boot

Credits: https://codingnconcepts.com/spring-boot/configure-kafka-producer-and-consumer/

This post describes how to configure a Kafka producer and consumer in a Spring Boot application. It also explains how to create service classes that send messages to, and receive messages from, the configured Kafka topics.

Setup Spring Boot Project

We recommend using Spring Initializr to generate the initial project. The project needs the Web and Kafka dependencies.

Maven Project

Click on the link below to generate a Maven project with a pre-defined configuration: Spring Boot Initializr link

A typical pom.xml file for a Kafka project looks like this:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<scope>test</scope>
</dependency>

Gradle Project

Click on the link below to generate a Gradle project with a pre-defined configuration:

https://start.spring.io/#!type=gradle-project&language=java&platformVersion=2.5.0.RELEASE&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=springboot-kafka&name=springboot-kafka&description=Kafka%20producer%20and%20consumer%20configuration&packageName=com.example.kafka&dependencies=web,kafka

A typical build.gradle file for a Kafka project looks like this:

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'org.springframework.kafka:spring-kafka'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.kafka:spring-kafka-test'
}

Kafka Configuration

Next, we need to create the Kafka producer and consumer configuration so that we can publish messages to, and read messages from, Kafka topics. Spring Boot auto-configures the Kafka producer and consumer for us when the correct configuration is provided through the application.yml or application.properties file, which saves us from writing boilerplate code.

A typical Kafka producer and consumer configuration looks like this:

application.yml
#APP SPECIFIC CUSTOM PROPERTIES
app:
  kafka:
    producer:
      topic: <PRODUCER_TOPIC_NAME>
    consumer:
      topic: <CONSUMER_TOPIC_NAME_1, CONSUMER_TOPIC_NAME_2, CONSUMER_TOPIC_NAME_3>
#SPRING PROPERTIES
spring:
  kafka:
    bootstrap-servers: localhost:9200,localhost:9300,localhost:9400
    properties:
      #Server host name verification is disabled by setting ssl.endpoint.identification.algorithm to an empty string
      ssl.endpoint.identification.algorithm:
    ssl:
      protocol: SSL
      trust-store-location: classpath:/app/store/truststore.jks
      trust-store-password: <TRUST_STORE_PASSWORD>
      key-store-location: classpath:/app/store/keystore.jks
      key-store-password: <KEY_STORE_PASSWORD>
      key-password: <KEY_PASSWORD>
    producer:    
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: <CONSUMER_GROUP_ID>
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Kafka SSL Configuration

Please note that in the Kafka SSL configuration above, Spring Boot looks for the key-store and trust-store (*.jks) files on the project classpath (classpath:), which works in your local environment. In production, these files are generally kept outside the generated JAR rather than inside it. In that case, refer to them with the file: prefix in the configuration.

application-prod.yml
spring:
  kafka:
    properties:
      #Server host name verification is disabled by setting ssl.endpoint.identification.algorithm to an empty string
      ssl.endpoint.identification.algorithm:
    ssl:
      protocol: SSL
      trust-store-location: file:/app/store/truststore.jks
      trust-store-password: <TRUST_STORE_PASSWORD>
      key-store-location: file:/app/store/keystore.jks
      key-store-password: <KEY_STORE_PASSWORD>
      key-password: <KEY_PASSWORD>

We recommend always using an absolute path in the production environment to avoid errors.

# Absolute Path
file:/app/store/truststore.jks 

# Relative Path
file:app/store/truststore.jks
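
The difference between the two forms can be checked with a few lines of plain Java. This is only a sketch, assuming a Unix-like file system; the class and method names are made up for illustration, and it approximates rather than reproduces how Spring resolves a file: location. It shows that a relative location depends on the directory the application was started from.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Spring or Kafka.
final class StorePathCheck {

    // Removes a leading "file:" prefix, as used in the configuration above.
    static String stripPrefix(String location) {
        return location.startsWith("file:") ? location.substring("file:".length()) : location;
    }

    // True when the location does not depend on the working directory.
    static boolean isAbsoluteLocation(String location) {
        return Paths.get(stripPrefix(location)).isAbsolute();
    }

    // Shows where a location would point when resolved from the working directory.
    static Path resolveFromWorkingDir(String location) {
        return Paths.get(stripPrefix(location)).toAbsolutePath().normalize();
    }

    public static void main(String[] args) {
        System.out.println(isAbsoluteLocation("file:/app/store/truststore.jks"));
        System.out.println(isAbsoluteLocation("file:app/store/truststore.jks"));
        // The result of this line changes with the directory the JVM was started in.
        System.out.println(resolveFromWorkingDir("file:app/store/truststore.jks"));
    }
}
```

Running the class from two different directories gives two different resolved paths for the relative location, which is the kind of surprise the absolute form avoids.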

ssl.endpoint.identification.algorithm

The endpoint identification algorithm is used by clients to validate the server host name. The default value is https. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.

If you have enabled SSL on the Kafka server, Spring Boot startup can sometimes fail with an error caused by host name verification. You can disable server host name verification by setting the ssl.endpoint.identification.algorithm property to an empty string to avoid the error.

Kafka Full Configuration

You can refer to the official Spring Boot documentation for the full list of available Kafka producer and consumer configuration properties.

Here is a list of important auto-configuration properties:

Property | Description
spring.kafka.bootstrap-servers | Comma-separated list of Kafka servers (host:port) running as a cluster. Applies to both producer and consumer unless overridden.
spring.kafka.producer.bootstrap-servers | Kafka bootstrap servers for the producer. Overrides spring.kafka.bootstrap-servers.
spring.kafka.consumer.bootstrap-servers | Kafka bootstrap servers for the consumer. Overrides spring.kafka.bootstrap-servers.
spring.kafka.client-id | Client ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.producer.client-id | Client ID to pass for the producer. Overrides spring.kafka.client-id.
spring.kafka.consumer.client-id | Client ID to pass for the consumer. Overrides spring.kafka.client-id.
spring.kafka.ssl.* | Kafka SSL configuration, which provides secure communication between the producer/consumer and the Kafka server. You need to generate key-store and trust-store files and configure their location and password.
spring.kafka.properties.ssl.endpoint.identification.algorithm | If you have enabled SSL on the Kafka server, host name verification can be disabled by setting this property to an empty string; otherwise Spring Boot startup may throw an error.
spring.kafka.producer.* | Kafka producer related configuration.
spring.kafka.consumer.* | Kafka consumer related configuration.
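
The "overrides" relationship described for the bootstrap-servers properties can be pictured with a small lookup. This is a sketch of the precedence rule only, written in plain Java with a made-up class name; it is not how Spring Boot implements property binding.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of "client-specific value wins over the shared value".
final class BootstrapServersResolution {

    // client is "producer" or "consumer"; falls back to the shared property when
    // no client-specific value is configured.
    static String effectiveServers(Map<String, String> props, String client) {
        String specific = props.get("spring.kafka." + client + ".bootstrap-servers");
        return specific != null ? specific : props.get("spring.kafka.bootstrap-servers");
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("spring.kafka.bootstrap-servers", "localhost:9200,localhost:9300");
        props.put("spring.kafka.producer.bootstrap-servers", "localhost:9400");

        System.out.println(effectiveServers(props, "producer")); // localhost:9400
        System.out.println(effectiveServers(props, "consumer")); // localhost:9200,localhost:9300
    }
}
```

The same fallback idea applies to spring.kafka.client-id and its producer and consumer variants.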

We have also created application-specific properties to configure the Kafka producer and consumer topics:

Property | Description
app.kafka.producer.topic | Kafka topic name to publish messages to.
app.kafka.consumer.topic | Comma-separated list of Kafka topic names, if you want the consumer service to consume from multiple Kafka topics.

Spring Boot Kafka Producer

Create Kafka Producer

Let’s create a KafkaProducerService interface and its implementation to send messages to a Kafka topic. We just autowire KafkaTemplate and use its send method to publish messages to the topic.

Please read more about KafkaTemplate, which comes with overloaded send methods to publish messages with topic, partition, key, timestamp, and routing information.

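// KafkaProducerService.java
public interface KafkaProducerService {
    void send(String message);
}

// KafkaProducerServiceImpl.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServiceImpl.class);

    // Auto-configured by Spring Boot from the spring.kafka.producer.* properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name taken from the application-specific property
    @Value("${app.kafka.producer.topic}")
    private String topic;

    @Override
    public void send(String message) {
        logger.info("message sent: {}", message);
        kafkaTemplate.send(topic, message);
    }
}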
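
KafkaTemplate's send method is asynchronous and returns a future, so a log line written right before the call does not prove the message reached the broker. The sketch below shows the general idea of reporting the outcome once the future completes. It uses only the JDK (Java 9 or later): AsyncSender is a hypothetical stand-in for the template, not the Spring for Apache Kafka API, and the concrete future type returned by KafkaTemplate depends on the library version.

```java
import java.util.concurrent.CompletableFuture;

final class SendResultLoggingSketch {

    // Hypothetical stand-in for an asynchronous send; not a Spring Kafka type.
    interface AsyncSender {
        CompletableFuture<String> send(String topic, String message);
    }

    // Turns the eventual outcome (success or failure) into a report line.
    static CompletableFuture<String> sendAndReport(AsyncSender sender, String topic, String message) {
        return sender.send(topic, message)
                .handle((ack, error) -> error == null
                        ? "sent to " + topic + ": " + ack
                        : "failed to send to " + topic + ": " + error.getMessage());
    }

    public static void main(String[] args) {
        // Two fake senders: one that always succeeds, one that always fails.
        AsyncSender working = (topic, message) -> CompletableFuture.completedFuture(message);
        AsyncSender broken = (topic, message) ->
                CompletableFuture.failedFuture(new IllegalStateException("broker unavailable"));

        System.out.println(sendAndReport(working, "demo-topic", "hello").join());
        System.out.println(sendAndReport(broken, "demo-topic", "hello").join());
    }
}
```

In the service above, the same pattern would mean attaching a callback to the value returned by kafkaTemplate.send and logging success or failure from there.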

Please note that by default KafkaTemplate uses DefaultKafkaProducerFactory, which Spring Boot initializes automatically based on the Kafka producer configuration provided in the application.yml or application.properties file.

If you wish to customize the default configuration, you need to provide your own bean definition of KafkaTemplate.

Customize Kafka Producer Configuration

Let’s create a KafkaProducerConfig class to customize the configuration. We autowire ProducerFactory, which gives us an instance of DefaultKafkaProducerFactory, and then add our customized configuration on top of it.

For example, below we have provided encrypted passwords for the trust-store, key-store, and key in our application.yml configuration file for security purposes, and we want to set the decrypted passwords on the ProducerFactory for SSL connections.

We pass this customized ProducerFactory to the KafkaTemplate bean initialization. That’s it!

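import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Auto-configured by Spring Boot from the spring.kafka.* properties
    @Autowired
    private ProducerFactory<String, String> producerFactory;

    // Application-specific service that decrypts the configured passwords
    @Autowired
    private CryptoService cryptoService;

    // Copies the auto-configured properties and replaces the encrypted SSL passwords
    public Map<String, Object> producerConfig() {
        Map<String, Object> producerConfig = new HashMap<>(producerFactory.getConfigurationProperties());
        decryptAndAddToProducerConfig(producerConfig, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
        decryptAndAddToProducerConfig(producerConfig, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
        decryptAndAddToProducerConfig(producerConfig, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
        return producerConfig;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
    }

    // Decrypts the property in place; a property that is not set is left untouched
    private void decryptAndAddToProducerConfig(Map<String, Object> config, String property) {
        config.computeIfPresent(property, (k, v) -> cryptoService.decrypt((String) v));
    }
}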
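
The copy-then-decrypt step can be tried without Spring or Kafka on the classpath. The sketch below is a minimal stand-alone version under stated assumptions: Base64 decoding stands in for the CryptoService (it is not encryption and is used only so the example runs), and the class name is invented. The keys are the literal client property names behind the SslConfigs password constants.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class DecryptConfigSketch {

    // Hypothetical stand-in for CryptoService.decrypt. Base64 is NOT encryption.
    static String base64Decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    // Replaces the value in place, but only when the property is actually configured.
    static void decryptInPlace(Map<String, Object> config, String property, UnaryOperator<String> decrypt) {
        config.computeIfPresent(property, (key, value) -> decrypt.apply((String) value));
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9200");
        config.put("ssl.truststore.password",
                Base64.getEncoder().encodeToString("secret".getBytes(StandardCharsets.UTF_8)));

        decryptInPlace(config, "ssl.truststore.password", DecryptConfigSketch::base64Decode);
        decryptInPlace(config, "ssl.keystore.password", DecryptConfigSketch::base64Decode); // not set, no effect

        System.out.println(config.get("ssl.truststore.password"));        // secret
        System.out.println(config.containsKey("ssl.keystore.password"));  // false
    }
}
```

Working on a copy of the factory's properties, as the configuration class does, keeps the auto-configured factory unchanged while the new factory receives the decrypted values.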

Spring Boot Kafka Consumer

Create Kafka Consumer

Let’s create a KafkaConsumerService interface and its implementation to receive messages from a Kafka topic.

We just use the @KafkaListener annotation at method level and pass the Kafka consumer topic names. Spring Boot automatically binds this method to a Kafka consumer instance. As soon as a message is published to one of those topics, this method receives it.

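// KafkaConsumerService.java
public interface KafkaConsumerService {

    void receive(String message);
}

// KafkaConsumerServiceImpl.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);

    // The SpEL expression splits the comma-separated property into individual topic names
    @Override
    @KafkaListener(topics = {"#{'${app.kafka.consumer.topic}'.split(',')}"})
    public void receive(@Payload String message) {
        logger.info("message received: {}", message);
    }
}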
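
The SpEL expression in the listener calls String.split(',') on the property value, so it helps to see what that call returns for a value written with spaces after the commas. The sketch below uses plain Java with made-up topic names and a hypothetical class name. It makes no claim about how the framework treats surrounding whitespace afterwards; it only shows what the split itself produces.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class TopicListSplit {

    // The same call the SpEL expression makes: entries keep any surrounding spaces.
    static List<String> rawSplit(String value) {
        return Arrays.asList(value.split(","));
    }

    // A defensive variant: trims each entry and drops empty ones.
    static List<String> trimmedSplit(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String value = "orders, payments,shipments";
        // The second entry of the raw split is " payments", with a leading space.
        System.out.println(rawSplit(value));
        System.out.println(trimmedSplit(value));
    }
}
```

Writing the property without spaces (for example orders,payments,shipments) sidesteps the question entirely.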

Please note that @KafkaListener uses ConcurrentKafkaListenerContainerFactory to create the Kafka consumer instance. This factory uses the default configuration from DefaultKafkaConsumerFactory, which Spring Boot initializes automatically based on the Kafka consumer configuration provided in the application.yml or application.properties file.

If you wish to customize the default configuration, you need to provide your own bean definition of ConcurrentKafkaListenerContainerFactory.

Customize Kafka Consumer Configuration

Let’s create a KafkaConsumerConfig class to customize the configuration. We autowire ConsumerFactory, which gives us an instance of DefaultKafkaConsumerFactory, and then add our customized configuration on top of it.

For example, below we have provided encrypted passwords for the trust-store, key-store, and key in our application.yml configuration file for security purposes, and we want to set the decrypted passwords on the ConsumerFactory for SSL connections.

We pass this newly created ConsumerFactory to the ConcurrentKafkaListenerContainerFactory bean initialization. That’s it!

We have also used the @EnableKafka annotation at class level, which enables detection of @KafkaListener annotations applied to any method in the Spring Boot application. The listeners pick up the custom configuration because the bean is named kafkaListenerContainerFactory, which is the container factory name that @KafkaListener looks up by default.

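import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // Auto-configured by Spring Boot from the spring.kafka.* properties
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    // Application-specific service that decrypts the configured passwords
    @Autowired
    private CryptoService cryptoService;

    // The bean name matches the default container factory name used by @KafkaListener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        return factory;
    }

    // Copies the auto-configured properties and replaces the encrypted SSL passwords
    private Map<String, Object> consumerConfig() {
        Map<String, Object> consumerConfig = new HashMap<>(consumerFactory.getConfigurationProperties());
        decryptAndAddToConsumerConfig(consumerConfig, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
        decryptAndAddToConsumerConfig(consumerConfig, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
        decryptAndAddToConsumerConfig(consumerConfig, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
        return consumerConfig;
    }

    // Decrypts the property in place; a property that is not set is left untouched
    private void decryptAndAddToConsumerConfig(Map<String, Object> config, String property) {
        config.computeIfPresent(property, (k, v) -> cryptoService.decrypt((String) v));
    }
}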

Summary

Spring Boot provides a wrapper over the Java Kafka producer and consumer implementations, which helps us easily configure:

  • A Kafka producer using KafkaTemplate, which provides overloaded send methods to send messages in multiple ways with keys, partitions, and routing information.
  • A Kafka consumer using the @EnableKafka annotation, which auto-detects the @KafkaListener annotation applied to any method, so that the method becomes a Kafka listener.

You can download the complete source code from GitHub and read the official Spring for Apache Kafka documentation for further exploration.
