Testing Spring Boot Kafka with Testcontainers
Advantages of using Testcontainers
- Running a single node Kafka installation with just one line of code
- No need to manage the external ZooKeeper installation that Kafka requires.
Testing Kafka Producer
1. Adding Dependencies
First, add the Testcontainers dependencies and the Awaitility dependency to the pom.xml.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    ....
    <properties>
        <java.version>11</java.version>
        <testcontainers.version>1.16.3</testcontainers.version>
    </properties>
    <dependencies>
        ....
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>4.2.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>${testcontainers.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
2. Add Maven Plugin
Add the Maven Failsafe plugin to run the integration tests.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>3.0.0-M5</version>
            </plugin>
        </plugins>
    </build>
</project>
Writing Integration Tests
In our producer application, we perform the following tasks:
- Create topics called create-employee-events and springboot-topic on startup
- Publish Employee objects received from a REST client to the create-employee-events topic
Testing Topic creation.
The KafkaAdmin and AdminClient classes can be used to create and read topics in a Kafka cluster.
When using Spring Boot, a KafkaAdmin bean is registered automatically, so you can simply autowire it into the test.
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
public class SpringBootKafkaProducerIT {

    private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaProducerIT.class);

    // Start a single Kafka broker before the Spring context is created.
    static KafkaContainer kafka;

    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }

    @Autowired
    private KafkaAdmin admin;

    @Test
    public void testCreationOfTopicAtStartup() throws IOException, InterruptedException, ExecutionException {
        // AdminClient is AutoCloseable, so close it when the test finishes.
        try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
            Collection<TopicListing> topicList = client.listTopics().listings().get();
            assertNotNull(topicList);
            // Compare as sets because the order of the returned listings is not guaranteed.
            Set<String> topicNames = topicList.stream().map(TopicListing::name).collect(Collectors.toSet());
            assertEquals(Set.of("create-employee-events", "springboot-topic"), topicNames);
        }
    }

    // Point the application at the broker address exposed by the container.
    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.properties.bootstrap.servers", kafka::getBootstrapServers);
    }
}
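The topic listings come back as a plain collection with no documented ordering, so an assertion that compares two lists can fail even when the right topics exist. The following plain-Java sketch illustrates the difference between list equality and a set-based comparison. TopicNameCheck and sameTopics are hypothetical names used only for illustration; they are not part of the original project.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the original project.
final class TopicNameCheck {

    private TopicNameCheck() {
    }

    // True when both lists hold the same distinct names, in any order.
    static boolean sameTopics(List<String> expected, List<String> actual) {
        Set<String> expectedSet = new HashSet<>(expected);
        Set<String> actualSet = new HashSet<>(actual);
        return expectedSet.equals(actualSet);
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("create-employee-events", "springboot-topic");
        List<String> returnedByBroker = Arrays.asList("springboot-topic", "create-employee-events");

        // List equality is order sensitive, so this prints false.
        System.out.println(expected.equals(returnedByBroker));
        // The set-based comparison ignores order, so this prints true.
        System.out.println(sameTopics(expected, returnedByBroker));
    }
}
```

In a real test the same idea can be expressed by collecting the topic names into a Set before asserting.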
Testing publishing events.
- First, we create a topic named create-employee-events.
- Next, we publish an event to the topic by invoking the EmployeeController class.
- Finally, we create a KafkaConsumer that subscribes to the topic and polls it to read the published message. I have used the Awaitility library to wait for the consumer to read the published message.
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
public class SpringBootKafkaProducerIT {

    private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaProducerIT.class);

    static KafkaContainer kafka;

    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }

    @Autowired
    EmployeeController employeeController;

    @Autowired
    private KafkaAdmin admin;

    @Test
    public void testPublishEmployee() throws IOException, InterruptedException, ExecutionException {
        // first create the create-employee-events topic
        // you can ignore this step if you have already created topic
        String topicName = "create-employee-events";
        NewTopic topic1 = TopicBuilder.name(topicName).build();

        // Settings for the test consumer that reads the published Employee back.
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Read from the beginning so a record published before the first poll is not missed.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");

        // Both clients are AutoCloseable, so close them when the test finishes.
        try (AdminClient client = AdminClient.create(admin.getConfigurationProperties());
             KafkaConsumer<Integer, Employee> consumer = new KafkaConsumer<>(props)) {
            client.createTopics(Collections.singletonList(topic1));
            consumer.subscribe(Collections.singletonList(topicName));

            Employee emp = new Employee();
            emp.setId(1);
            emp.setName("Test");
            employeeController.publishEmployee(emp);

            // Both application topics should exist, in any order.
            Collection<TopicListing> topicList = client.listTopics().listings().get();
            assertEquals(2, topicList.size());
            Set<String> topicNames = topicList.stream().map(TopicListing::name).collect(Collectors.toSet());
            assertEquals(Set.of("springboot-topic", "create-employee-events"), topicNames);

            // Poll until the record arrives or ten seconds pass.
            await().atMost(10, TimeUnit.SECONDS).until(() -> {
                ConsumerRecords<Integer, Employee> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    return false;
                }
                records.forEach(r -> System.out.println(r.topic() + " *** " + r.key() + " *** " + r.value()));
                Assertions.assertThat(records.count()).isEqualTo(1);
                Assertions.assertThat(records.iterator().next().value().getName()).isEqualTo("Test");
                Assertions.assertThat(records.iterator().next().value().getId()).isEqualTo(1);
                return true;
            });
        }
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.properties.bootstrap.servers", kafka::getBootstrapServers);
    }
}
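The await() call in this test keeps re-running the polling lambda until it returns true or the ten-second limit is reached. To make that behaviour concrete, here is a minimal plain-Java sketch of the same idea. Polling and waitUntil are hypothetical names, and this is not how Awaitility is implemented; it only illustrates the "retry until the condition holds or time runs out" pattern, with a counter standing in for consumer.poll().

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper that mimics the idea behind await().atMost(...).until(...).
final class Polling {

    private Polling() {
    }

    // Re-checks the condition every `interval` until it holds or `timeout` has elapsed.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long start = System.nanoTime();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - start >= timeout.toNanos()) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for consumer.poll(): the "record" shows up on the third check.
        AtomicInteger checks = new AtomicInteger();
        boolean received = waitUntil(() -> checks.incrementAndGet() >= 3,
                Duration.ofSeconds(2), Duration.ofMillis(50));
        System.out.println("received=" + received + " after " + checks.get() + " checks");
    }
}
```

Compared with a fixed Thread.sleep, this style returns as soon as the message arrives and only fails after the full timeout, which tends to make integration tests both faster and less flaky.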
Running Producer Integration Tests.
Use the following command to run the integration tests.
mvn clean verify
Testing Kafka Consumer
Add the dependencies and the Maven plugin (steps 1 and 2) as in the producer application.
Testing consuming events
- We create a topic named create-employee-events using the Kafka admin client.
- Using KafkaProducer, we publish an event to the topic.
- We expect the Kafka consumer to read the message from the topic. Because the consumer application reads the message and prints it to the console, we check that the console output contains the expected message.
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@ExtendWith(OutputCaptureExtension.class)
public class SpringBootKafkaConsumerIT {

    private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaConsumerIT.class);

    static KafkaContainer kafka;

    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }

    @Autowired
    private KafkaAdmin admin;

    @Test
    public void testPublishEmployee(CapturedOutput output) throws IOException, InterruptedException, ExecutionException {
        // first create the create-employee-events topic
        String topicName = "create-employee-events";
        NewTopic topic1 = TopicBuilder.name(topicName).build();

        // Settings for the test producer that publishes the Employee event.
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Both clients are AutoCloseable, so close them once the record is sent.
        try (AdminClient client = AdminClient.create(admin.getConfigurationProperties());
             KafkaProducer<Integer, Employee> producer = new KafkaProducer<>(props)) {
            client.createTopics(Collections.singletonList(topic1));

            Employee emp = new Employee();
            emp.setId(1);
            emp.setName("Test");
            // Block until the broker acknowledges the record.
            producer.send(new ProducerRecord<>(topicName, emp.getId(), emp)).get();
        }

        // Wait for the listener to print the record instead of sleeping for a fixed time.
        // await() is statically imported from org.awaitility.Awaitility.
        await().atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> Assertions.assertThat(output).contains("Employee{id=1, name='Test'}"));
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
}
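The assertion on the captured output only passes if the consumer prints the Employee in exactly the format Employee{id=1, name='Test'}, which depends on the DTO's toString() method. The post does not show that class, so the following is a hypothetical sketch of what such a DTO might look like; the field types and the toString() body are assumptions inferred from the setters, getters, and expected string used in the tests.

```java
// Hypothetical stand-in for the Employee DTO; the real class in the project may differ.
final class Employee {

    private Integer id;
    private String name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // Produces the exact text the consumer test looks for in the console output.
    @Override
    public String toString() {
        return "Employee{id=" + id + ", name='" + name + "'}";
    }

    public static void main(String[] args) {
        Employee emp = new Employee();
        emp.setId(1);
        emp.setName("Test");
        System.out.println(emp); // prints Employee{id=1, name='Test'}
    }
}
```

If the DTO's toString() changes, for example after switching to a generated implementation, the expected string in the test has to change with it.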
Running Consumer Integration Tests.
Use the following command to run the integration tests.
mvn clean verify
The complete code for this blog post can be downloaded from GitHub.
Producer – sureshgadupu/springboot-kafka-producer (github.com)
Consumer – sureshgadupu/springboot-kafka-consumer (github.com)
Credits: https://fullstackcode.dev/2022/04/24/testing-spring-boot-kafka-with-testcontainers/