Testing Spring Boot Kafka with Testcontainers

Advantages of using Testcontainers

  • Run a single-node Kafka installation with just one line of code (see the snippet below).
  • No need to manage an external ZooKeeper installation, which is required by Kafka.
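
For example, the snippet below, which is reused in the test classes later in this post, is all that is needed to start a single-node broker.

// Start a single-node Kafka broker in a Docker container
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
kafka.start();
// kafka.getBootstrapServers() returns the bootstrap address to point Kafka clients at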

Testing Kafka Producer

1. Adding Dependencies

First, we need to add the Testcontainers-related dependencies and the Awaitility test dependency.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	
	....
	<properties>
		<java.version>11</java.version>
		<testcontainers.version>1.16.3</testcontainers.version>
	</properties>
	<dependencies>
		....
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>junit-jupiter</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>kafka</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.awaitility</groupId>
			<artifactId>awaitility</artifactId>
			<version>4.2.0</version>
			<scope>test</scope>
		</dependency>

	</dependencies>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.testcontainers</groupId>
				<artifactId>testcontainers-bom</artifactId>
				<version>${testcontainers.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	

</project>

2. Add Maven Plugin

Add the Maven Failsafe plugin to run the integration tests. By default, Failsafe picks up test classes whose names end with IT during the integration-test phase.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	...

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-failsafe-plugin</artifactId>
				<version>3.0.0-M5</version>
			</plugin>
		</plugins>
	</build>

</project>

Writing Integration Tests

In our producer application we perform the following tasks:

  1. Create two topics, create-employee-events and springboot-topic, on startup (a sketch of the topic configuration is shown after this list).
  2. Publish Employee objects received from the REST client to the create-employee-events topic.
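
The topics are created on startup by declaring NewTopic beans, which Spring Boot's auto-configured KafkaAdmin creates on the broker. The configuration class below is only an illustrative sketch; the class and method names are assumptions, and the actual producer application may declare these beans differently.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical configuration class - the real producer application may differ
@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin (auto-configured by Spring Boot) creates these topics at startup
    @Bean
    public NewTopic createEmployeeEventsTopic() {
        return TopicBuilder.name("create-employee-events").build();
    }

    @Bean
    public NewTopic springbootTopic() {
        return TopicBuilder.name("springboot-topic").build();
    }
}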

Testing Topic creation.

The KafkaAdmin and AdminClient classes can be used to create and read topics in a Kafka cluster.

When using Spring Boot, a KafkaAdmin bean is registered automatically, so you can simply autowire it into the test.

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
public class SpringBootKafkaProducerIT {
    private static final Logger logger =  LoggerFactory.getLogger(SpringBootKafkaProducerIT.class);

    static KafkaContainer kafka;

    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }
  

    @Autowired
    private KafkaAdmin admin;


    @Test
    public void testCreationOfTopicAtStartup() throws IOException, InterruptedException, ExecutionException {
        AdminClient client = AdminClient.create(admin.getConfigurationProperties());
        Collection<TopicListing> topicList = client.listTopics().listings().get();
        assertNotNull(topicList);
        // assertEquals expects the expected value first; sort the listed names for a deterministic comparison
        assertEquals(Arrays.asList("create-employee-events", "springboot-topic"),
                topicList.stream().map(TopicListing::name).sorted().collect(Collectors.toList()));
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.properties.bootstrap.servers", kafka::getBootstrapServers);
    }

}

Testing publishing events.

  1. First we create a topic named create-employee-events.
  2. Next we publish an event to the topic by invoking the EmployeeController class (a sketch of an assumed controller follows this list).
  3. We create a KafkaConsumer to subscribe to the topic and poll it for the published message, using Awaitility to wait until the message has been read.
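
For context, the EmployeeController invoked in step 2 might look roughly like the sketch below, delegating to a KafkaTemplate. The request mapping and return type are assumptions; only the publishEmployee method name is taken from the test.

import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical sketch - the real EmployeeController in the sample repository may differ
@RestController
public class EmployeeController {

    private final KafkaTemplate<Integer, Employee> kafkaTemplate;

    public EmployeeController(KafkaTemplate<Integer, Employee> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/employee")
    public ResponseEntity<Employee> publishEmployee(@RequestBody Employee employee) {
        // publish the Employee payload to the create-employee-events topic
        kafkaTemplate.send("create-employee-events", employee.getId(), employee);
        return ResponseEntity.ok(employee);
    }
}

The complete integration test for the producer is shown below.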
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
public class SpringBootKafkaProducerIT {
    private static final Logger logger =  LoggerFactory.getLogger(SpringBootKafkaProducerIT.class);

    static KafkaContainer kafka;

    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }

    @Autowired
    EmployeeController employeeController;

    @Autowired
    private KafkaAdmin admin;

    @Test
    public void testPublishEmployee() throws IOException, InterruptedException, ExecutionException {
        // first create the create-employee-events topic
        // you can skip this step if you have already created the topic
        String topicName = "create-employee-events";
        NewTopic topic1 = TopicBuilder.name(topicName).build();

        AdminClient client = AdminClient.create(admin.getConfigurationProperties());
        client.createTopics(Collections.singletonList(topic1));

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ErrorHandlingDeserializer wraps the actual deserializers, which are configured as its delegates
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");

        KafkaConsumer<Integer, Employee> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList(topicName));

        Employee emp = new Employee();
        emp.setId(1);
        emp.setName("Test");

        employeeController.publishEmployee(emp);

        Collection<TopicListing> topicList = client.listTopics().listings().get();
        assertEquals(2, topicList.size());
        List<String> topicNameList = topicList.stream().map(TopicListing::name).collect(Collectors.toList());
        List<String> expectedTopicNameList = Arrays.asList("springboot-topic", "create-employee-events");
        assertTrue(topicNameList.containsAll(expectedTopicNameList) && expectedTopicNameList.containsAll(topicNameList));

        await().atMost(10, TimeUnit.SECONDS).until(() -> {
            ConsumerRecords<Integer, Employee> records = consumer.poll(Duration.ofMillis(100));

            if (records.isEmpty()) {
                return false;
            }
            records.forEach( r -> System.out.println(r.topic() + " *** "+ r.key() + " *** "+ r.value()));
            Assertions.assertThat(records.count()).isEqualTo(1);
            Assertions.assertThat(records.iterator().next().value().getName()).isEqualTo("Test");
            Assertions.assertThat(records.iterator().next().value().getId()).isEqualTo(1);
            return true;
        });

    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.properties.bootstrap.servers", kafka::getBootstrapServers);
    }

}

Running Producer Integration Tests.

Use the following command to run the integration tests.

mvn clean verify

Testing Kafka Consumer

We add the dependencies and the Maven plugin (steps 1 and 2) just as in the producer application.

Testing consuming events

  1. We create a topic named create-employee-events using the Kafka admin client.
  2. Using a KafkaProducer, we publish an event to the topic.
  3. We expect the Kafka consumer application to read the message from the topic. Since the consumer reads the message and prints it to the console, we check that the captured console output contains the expected message (a sketch of such a listener follows this list).
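
For reference, the consumer application's listener might look roughly like the sketch below. The class, method, and group id names are assumptions; what matters for the test is that the listener logs the received Employee so its toString() output appears on the console.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Hypothetical sketch - the real listener in the consumer application may differ
@Service
public class EmployeeEventListener {

    private static final Logger logger = LoggerFactory.getLogger(EmployeeEventListener.class);

    // Logging the Employee produces output like "Employee{id=1, name='Test'}",
    // which the integration test asserts on via OutputCaptureExtension
    @KafkaListener(topics = "create-employee-events", groupId = "employee-group")
    public void onEmployeeEvent(Employee employee) {
        logger.info("Received employee : {}", employee);
    }
}

The complete integration test for the consumer is shown below.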
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@ExtendWith(OutputCaptureExtension.class)
public class SpringBootKafkaConsumerIT {
    private static final Logger logger =  LoggerFactory.getLogger(SpringBootKafkaConsumerIT.class);

    static KafkaContainer kafka;

    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }

    @Autowired
    private KafkaAdmin admin;

    @Test
    public void testPublishEmployee(CapturedOutput output) throws IOException, InterruptedException, ExecutionException {
        // first create the create-employee-events topic
        String topicName = "create-employee-events";
        NewTopic topic1 = TopicBuilder.name(topicName).build();

        AdminClient client = AdminClient.create(admin.getConfigurationProperties());
        client.createTopics(Collections.singletonList(topic1));


        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        KafkaProducer<Integer, Employee> producer = new KafkaProducer<>(props);

        Employee emp = new Employee();
        emp.setId(1);
        emp.setName("Test");

        producer.send(new ProducerRecord<>(topicName, emp.getId(), emp)).get();

        // wait (up to 10 seconds) for the listener to consume the record and log it,
        // instead of relying on a fixed Thread.sleep()
        await().atMost(10, TimeUnit.SECONDS).untilAsserted(
                () -> Assertions.assertThat(output).contains("Employee{id=1, name='Test'}"));


    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

}

Running Consumer Integration Tests.

Use the following command to run the integration tests.

mvn clean verify

The complete code for this blog post can be downloaded from GitHub.

Producer – sureshgadupu/springboot-kafka-producer (github.com)

Consumer – sureshgadupu/springboot-kafka-consumer (github.com)

Credits: https://fullstackcode.dev/2022/04/24/testing-spring-boot-kafka-with-testcontainers/
