Testing Spring Boot Kafka with Testcontainers
Advantages of using Testcontainers
- Running a single node Kafka installation with just one line of code
- No need to manage external Zookeeper installation, required by Kafka.
Testing Kafka Producer
1. Adding Dependencies
First, we need to add the Testcontainers-related dependencies and the Awaitility dependency.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    ....
    <properties>
        <java.version>11</java.version>
        <!-- Single place to set the version used by the Testcontainers BOM import below. -->
        <testcontainers.version>1.16.3</testcontainers.version>
    </properties>

    <dependencies>
        ....
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Testcontainers artifacts carry no version here; it comes from the imported BOM. -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Awaitility is used by the tests to wait for asynchronous Kafka results. -->
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>4.2.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>${testcontainers.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
2. Add Maven Plugin
Add the Maven Failsafe plugin to run the integration tests.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>3.0.0-M5</version>
                <!--
                  Bind the Failsafe goals explicitly so that "mvn verify" runs the *IT classes
                  even when no parent POM binds them for us (some parents, such as
                  spring-boot-starter-parent, are expected to pre-configure these goals).
                -->
                <executions>
                    <execution>
                        <goals>
                            <goal>integration-test</goal>
                            <goal>verify</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Writing Integration Tests
In our producer application, we perform the following tasks:
- Create topics called `create-employee-events` and `springboot-topic` on startup.
- Publish `Employee` objects received from the REST client to the `create-employee-events` topic.
Testing Topic creation.
The KafkaAdmin and AdminClient classes can be used to create and read topics in a Kafka cluster.
When using Spring Boot, a KafkaAdmin bean is registered automatically, so you can simply autowire it wherever it is needed.
@Testcontainers @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @DirtiesContext public class SpringBootKafkaProducerIT { private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaProducerIT.class); static KafkaContainer kafka; static { kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")); kafka.start(); } @Autowired private KafkaAdmin admin; @Test public void testCreationOfTopicAtStartup() throws IOException, InterruptedException, ExecutionException { AdminClient client = AdminClient.create(admin.getConfigurationProperties()); Collection<TopicListing> topicList = client.listTopics().listings().get(); assertNotNull(topicList); assertEquals(topicList.stream().map(l -> l.name()).collect(Collectors.toList()), Arrays.asList("create-employee-events","springboot-topic")); } } @DynamicPropertySource public static void properties(DynamicPropertyRegistry registry) { registry.add("spring.kafka.properties.bootstrap.servers",kafka::getBootstrapServers); } }
Testing publishing events.
- First, we create a topic named `create-employee-events`.
- Next, we publish an event to the topic by invoking the `EmployeeController` class.
- Finally, we create a KafkaConsumer that subscribes to the topic and polls it to read the published message. I have used the Awaitility library to wait until the consumer has read the published message.
@Testcontainers @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @DirtiesContext public class SpringBootKafkaProducerIT { private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaProducerIT.class); static KafkaContainer kafka; static { kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")); kafka.start(); } @Autowired EmployeeController employeeController; @Autowired private KafkaAdmin admin; @Test public void testPublishEmployee() throws IOException, InterruptedException, ExecutionException { // first create the create-employee-events topic // you can ignore this step if you have already created topic String topicName = "create-employee-events"; NewTopic topic1 = TopicBuilder.name(topicName).build(); AdminClient client = AdminClient.create(admin.getConfigurationProperties()); client.createTopics( Collections.singletonList(topic1)); Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class); props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee"); KafkaConsumer<Integer, Employee> consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(topicName)); Employee emp = new Employee(); emp.setId(1); emp.setName("Test"); employeeController.publishEmployee(emp); Collection<TopicListing> topicList = client.listTopics().listings().get(); 
assertEquals(topicList.size(),2); List<String> topicNameList = topicList.stream().map(l -> l.name()).collect(Collectors.toList()); List<String> expectedTopicNameList = Arrays.asList("springboot-topic","create-employee-events"); assertTrue(topicNameList.containsAll(expectedTopicNameList) && expectedTopicNameList.containsAll(topicNameList)); await().atMost(10, TimeUnit.SECONDS).until(() -> { ConsumerRecords<Integer, Employee> records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()) { return false; } records.forEach( r -> System.out.println(r.topic() + " *** "+ r.key() + " *** "+ r.value())); Assertions.assertThat(records.count()).isEqualTo(1); Assertions.assertThat(records.iterator().next().value().getName()).isEqualTo("Test"); Assertions.assertThat(records.iterator().next().value().getId()).isEqualTo(1); return true; }); } @DynamicPropertySource public static void properties(DynamicPropertyRegistry registry) { registry.add("spring.kafka.properties.bootstrap.servers",kafka::getBootstrapServers); } }
Running Producer Integration Tests.
Use the following command to run the integration tests.
mvn clean verify
Testing Kafka Consumer
We add the same dependencies and Maven plugin (steps 1 and 2) as in the producer application.
Testing consuming events
- We create a topic named `create-employee-events` using the Kafka admin client.
- Using a KafkaProducer, we publish an event to the topic.
- We expect the Kafka consumer to read the message from the topic. Since the consumer application reads the message and prints it to the console, we check that the console output contains the expected message.
@Testcontainers @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @DirtiesContext @ExtendWith(OutputCaptureExtension.class) public class SpringBootKafkaConsumerIT { private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaConsumerIT.class); static KafkaContainer kafka; static { kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")); kafka.start(); } @Autowired private KafkaAdmin admin; @Test public void testPublishEmployee(CapturedOutput output) throws IOException, InterruptedException, ExecutionException { // first create the create-employee-events topic String topicName = "create-employee-events"; NewTopic topic1 = TopicBuilder.name(topicName).build(); AdminClient client = AdminClient.create(admin.getConfigurationProperties()); client.createTopics( Collections.singletonList(topic1)); Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); KafkaProducer<Integer, Employee> producer = new KafkaProducer(props); Employee emp = new Employee(); emp.setId(1); emp.setName("Test"); producer.send(new ProducerRecord<>(topicName, emp.getId(), emp)).get(); Thread.sleep(1000); Assertions.assertThat(output).contains("Employee{id=1, name='Test'}"); } @DynamicPropertySource public static void properties(DynamicPropertyRegistry registry) { registry.add("spring.kafka.bootstrap-servers",kafka::getBootstrapServers); } }
Running Consumer Integration Tests.
Use the following command to run the integration tests.
mvn clean verify
Complete code for blog post can be downloaded from GitHub.
Producer – sureshgadupu/springboot-kafka-producer (github.com)
Consumer – sureshgadupu/springboot-kafka-consumer (github.com)
Credits: https://fullstackcode.dev/2022/04/24/testing-spring-boot-kafka-with-testcontainers/