Database and Kafka Integration with Testcontainers in Spring Boot

Before jumping into Testcontainers, it is worth remembering that today’s applications tend to combine many components, frameworks, databases, and event stores, especially with paradigms such as microservices architecture.

As the complexity grows with more and more moving parts, software testing and automation become even more critical. Many software testing methodologies have been developed to test an application’s functional and non-functional requirements, such as:

  • Unit Testing
  • Integration Testing
  • System Testing
  • Acceptance Testing
  • Performance Testing
  • Security Testing

The naming and scope of each test methodology may vary from company to company and from team to team.

This tutorial will demonstrate an integration test scenario built with the Testcontainers library to make sure integrated components work together as intended.

What is Testcontainers? 

Testcontainers is a Java library that supports tests by providing lightweight, throwaway instances of databases, tools, event stores such as Kafka, or anything else that can run in a Docker container. With Testcontainers we can spin up any component from a Docker image and run system or integration tests against real external resources.
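
As a quick taste, here is a minimal sketch (the Redis image and port are just an illustration, not part of this tutorial’s setup) that starts a throwaway container and reads its dynamically mapped address:

// Start a disposable Redis container; it is removed when the block exits.
try (GenericContainer<?> redis = new GenericContainer<>("redis:7-alpine")
       .withExposedPorts(6379)) {
   redis.start();
   // Testcontainers maps container port 6379 to a random free host port
   String address = redis.getHost() + ":" + redis.getMappedPort(6379);
}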

Integration Tests Setup with Testcontainers Library in Java

I will use a PostgreSQL database, a Kafka event store, and a REST API as the components in this example. These components are integrated as explained below:

  • The application has a Kafka consumer that consumes events and stores them in the PostgreSQL database.
  • The application has a GET endpoint that fetches the data from the database and exposes it through the REST API call.

Based on the business structure defined above, I want to ensure that my Kafka consumer consumes events properly and saves them in the database. 

A possible integration test case for this flow:

Step 1: Create a Kafka producer and produce data on the specified topic.

Step 2: Make a REST API call to fetch the data that has been produced to Kafka previously.

Step 3: Assert produced data and REST API response. 

To achieve this test case, I need to create separate containers for PostgreSQL, Kafka, and a container for the Spring Boot application. 

To use Testcontainers, add the following dependency in Gradle:

testImplementation 'org.testcontainers:testcontainers:1.16.3'
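
If you run your tests with JUnit 5, Testcontainers also ships a junit-jupiter module whose @Testcontainers and @Container annotations can manage the container lifecycle for you:

testImplementation 'org.testcontainers:junit-jupiter:1.16.3'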

Set up Postgres Database with Testcontainers

I prefer GenericContainer over PostgreSQLContainer for the Postgres database, as it makes it easier to pass arbitrary environment variables and set up host configs.

  • Pull the official Postgres docker image and instantiate a container on top of that.
  • It will expose 5432 as this is the default Postgres port
  • Bind docker internal port 5432 to exposed port 5432
  • Setup username and password for Postgres DB
  • Use charset en_US.utf8 for Postgres DB

GenericContainer<?> postgresDBContainer = new GenericContainer<>("postgres:latest")
       .withCreateContainerCmdModifier((Consumer<CreateContainerCmd>) cmd -> cmd.withHostConfig(
               new HostConfig().withPortBindings(new PortBinding(Binding.bindPort(5432), new ExposedPort(5432)))
       ))
       .withExposedPorts(5432)
       // POSTGRES_USER takes the username, POSTGRES_PASSWORD the password
       .withEnv("POSTGRES_USER", "<postgres_username>")
       .withEnv("POSTGRES_PASSWORD", "<postgres_password>")
       .withEnv("LANG", "en_US.utf8")
       .withReuse(true);
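
Once the container is started, test code can reach Postgres through the host and mapped port. A minimal sketch, assuming the default postgres database of the official image and plain java.sql (Connection, DriverManager), called from a method that declares throws SQLException:

// Build the JDBC URL from the running container. With the fixed port binding
// above this resolves to port 5432, but getMappedPort keeps it generic.
postgresDBContainer.start();
String jdbcUrl = "jdbc:postgresql://" + postgresDBContainer.getHost()
       + ":" + postgresDBContainer.getMappedPort(5432) + "/postgres";

try (Connection connection = DriverManager.getConnection(
       jdbcUrl, "<postgres_username>", "<postgres_password>")) {
   // run schema setup or a quick sanity check here, e.g. connection.isValid(5)
}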

Set up Kafka Container with Testcontainers

  • Pull a Kafka Docker image (Bitnami’s, in this example) and instantiate a container on top of that.
  • It will expose 9092 as this is the default Kafka port
  • Bind docker internal port 9092 to exposed port 9092
  • Setup log consumer
  • Setup KAFKA_ADVERTISED_HOSTNAME to your IP address
  • Create two topics to be used in integration tests when the container is started

GenericContainer<?> kafkaContainer = new GenericContainer<>("bitnami/kafka:latest")
       .withExposedPorts(9092)
       .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("testcontainers.kafka")))
       .withCreateContainerCmdModifier((Consumer<CreateContainerCmd>) cmd -> cmd.withHostConfig(
               new HostConfig().withPortBindings(
                       new PortBinding(Binding.bindPort(9092), new ExposedPort(9092)))
       ))
       .withEnv("KAFKA_ADVERTISED_HOSTNAME", localHostAddress())
       .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
       .withEnv("KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS", "1000000")
       .withEnv("KAFKA_CREATE_TOPICS", "dev.exceptionly.topic1,dev.exceptionly.topic2")
       .withCommand("run", "-e", "{\"kafka_advertised_hostname\":\"" + localHostAddress() + "\"}")
       // wait until both topics from KAFKA_CREATE_TOPICS have been created
       .waitingFor(Wait.forLogMessage(".*Created topic.*", 2)
               .withStartupTimeout(Duration.ofSeconds(180)))
       .withReuse(true);
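
Once the broker is up, it can be worth sanity-checking it before the tests run. A hedged sketch using the kafka-clients AdminClient (not part of the original setup; call it from a method that declares throws Exception):

// List topics on the containerized broker to confirm it is reachable
// and that both test topics exist.
String bootstrapServers = kafkaContainer.getHost() + ":" + kafkaContainer.getMappedPort(9092);
try (AdminClient admin = AdminClient.create(
       Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
   Set<String> topics = admin.listTopics().names().get(30, TimeUnit.SECONDS);
   // expect dev.exceptionly.topic1 and dev.exceptionly.topic2 in this set
}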

Set up Spring Boot Application with Testcontainers from a Dockerfile

I assume that you already have a Dockerfile for your application. If you don’t have one, check out my tutorial to learn how to Dockerize a Spring Boot Application.

  • Initiate a Docker container based on the Dockerfile of the Spring Boot application
  • Expose port 8080 for the application and 8081 for health checks (optional)
  • Setup environment properties for the Spring Boot application container
  • Setup application.yaml file for the Spring Boot application container
  • Pass values for environment variables
  • Wait for kafkaContainer and postgresDBContainer to be started first.

GenericContainer<?> applicationContainer = new GenericContainer<>(new ImageFromDockerfile()
       .withDockerfile(Paths.get("../Dockerfile").toAbsolutePath()))
       .withExposedPorts(8080, 8081)
       .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("dev.exceptionly")))
       .waitingFor(Wait.forHttp("/healthcheck").forPort(8081))
       .withFileSystemBind(Paths.get("../dependencies/environment.properties").toAbsolutePath().toString(),
               "/opt/app/config/application-integration-test.properties", BindMode.READ_ONLY)
       .withFileSystemBind(Paths.get("src/integrationTest/resources/application.yml").toAbsolutePath().toString(),
               "/opt/app/config/application.yml", BindMode.READ_ONLY)
       .withEnv("database", localHostAddress())
       .withEnv("kafka", localHostAddress())
       .withEnv("SERVER_HOST", localHostAddress())
       .dependsOn(kafkaContainer)
       .dependsOn(postgresDBContainer)
       .withReuse(true);
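
Because of the dependsOn calls, starting the application container brings up Kafka and Postgres first. One way to wire this into the test lifecycle, as a sketch that matches the non-static @BeforeAll used later (i.e., a JUnit 5 class annotated with @TestInstance(Lifecycle.PER_CLASS)):

@BeforeAll
void startContainers() {
   // dependsOn(...) makes Testcontainers start kafkaContainer and
   // postgresDBContainer before the application container itself
   applicationContainer.start();
}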

The host address to be used in the containers:

private static String localHostAddress() {
   try {
       return InetAddress.getLocalHost().getHostAddress();
   } catch (UnknownHostException e) {
       // fail fast: the containers cannot be configured without a host address
       throw new IllegalStateException("Unable to resolve local host address", e);
   }
}

The environment.properties file configuration for the Spring Boot application container:

database=exceptionly-db
kafka=exceptionly-kafka
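
In this tutorial these defaults are overridden at container startup: the withEnv("database", ...) and withEnv("kafka", ...) calls point the application at the host IP, and environment variables take precedence over the mounted properties file in Spring’s property resolution. An alternative sketch (not what this tutorial uses): put all three containers on a shared Docker network and turn these names into network aliases:

// Shared network so the containers can address each other by alias.
Network network = Network.newNetwork();

// Added to the container definitions shown earlier:
// postgresDBContainer.withNetwork(network).withNetworkAliases("exceptionly-db");
// kafkaContainer.withNetwork(network).withNetworkAliases("exceptionly-kafka");
// applicationContainer.withNetwork(network);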

The integration test application.yml configuration for the application Docker container:

Remember that the ${database} and ${kafka} variables are passed in as environment variables when the application container is started.

spring:
  profiles:
    active: integration-tests

  datasource:
    url: jdbc:postgresql://${database}:5432/postgres
    username: <postgres_username>
    password: <postgres_password>

  kafka:
    bootstrap-servers:
      - ${kafka:localhost}:9092

Integration Tests Implementation 

Finally, we have all the containers created and ready for the integration tests, but we are still missing one piece of configuration: a Kafka producer. To test the application’s Kafka consumption, we need to create a Kafka producer and publish events on the specified topic. This imitates the external producer.

public class KafkaEventProducer {

   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventProducer.class);
   private static final String TOPIC_1 = "dev.exceptionly.topic1";
   private final KafkaTemplate<String, ReviewResultEvent> producer;

   public KafkaEventProducer(String bootstrapServers) {
       final Map<String, Object> props = new HashMap<>();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       props.put(ProducerConfig.ACKS_CONFIG, "all");
       props.put(ProducerConfig.RETRIES_CONFIG, 0);
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       // the value is a POJO, so serialize it as JSON
       // (org.springframework.kafka.support.serializer.JsonSerializer)
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
       producer = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
   }

   public void send(ReviewResultEvent event) {
       LOGGER.info("Publishing event: {}", event);
       producer.send(new ProducerRecord<>(TOPIC_1, event));
   }
}

Remember the initial test case that drove me to create all these containers and this setup:

Step 1: Produce data on the specified topic.

Step 2: Make a REST API call that hits the database to fetch the data previously produced to Kafka.

Step 3: Assert produced data and REST API response.

KafkaEventProducer producer;

@BeforeAll
void setup() {
  String bootstrapServers = kafkaContainer.getHost() + ":" + kafkaContainer.getMappedPort(9092);
  producer = new KafkaEventProducer(bootstrapServers);
}

@Test
void shouldFetchProducedDataThroughRESTApi() throws Exception {
  //given
  producer.send( < event1 >);
  producer.send( < event2 >);
  producer.send( < event3 >);

  //when
  var response = given().port(applicationContainer.getMappedPort(8080))
      .auth().basic(USERNAME, PASSWORD)
      .accept(ContentType.JSON)
      .when()
      .get(String.format( < base_url >, < path_param1 >))
      .then()
      .assertThat()
      .statusCode(200)
      .and()
      .extract()
      .response();

  //then
  // deserialize into a typed list (Jackson's TypeReference)
  List<Object> entities = mapper.readValue(response.asString(), new TypeReference<List<Object>>() {});
  assertEquals(3, entities.size());
  entities.forEach(entity -> assertEntity(entity));
}
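
One caveat: Kafka consumption is asynchronous, so the REST call can race the consumer and find fewer than three rows. If that happens, a common remedy (assuming the org.awaitility:awaitility test dependency, which is not part of the original setup) is to poll until the data appears:

// Retry the read side until the consumer has persisted all three events.
// (static import org.awaitility.Awaitility.await)
await().atMost(Duration.ofSeconds(30))
      .pollInterval(Duration.ofSeconds(1))
      .untilAsserted(() -> {
          var body = given().port(applicationContainer.getMappedPort(8080))
                  .auth().basic(USERNAME, PASSWORD)
                  .when().get(String.format( < base_url >, < path_param1 >))
                  .then().statusCode(200)
                  .extract().asString();
          assertEquals(3, mapper.readValue(body, new TypeReference<List<Object>>() {}).size());
      });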

Conclusion

Testcontainers’ official website has excellent documentation and many ready-made modules for different testing purposes.

Credits: https://exceptionly.com/2022/04/09/database-and-kafka-integration-testcontainers-spring-boot/
