Spring Boot Tests with Kotlin and Embedded Kafka

Yauheni Lukashkin
SelectFrom
Published in
3 min readJan 13, 2022

--

Photo by Scott Graham on Unsplash

Hardly anybody develops isolated web applications these days. Almost all the time, developers integrate their applications with other applications and systems. We implement the integrations using different technologies and protocols. Depending on the situation, it can be REST, SOAP, queues, etc.

The question arises from how we can test the integrated application and how we can provide good unit test coverage. Unfortunately, there doesn’t exist a uniform approach to testing our code. Different systems require different testing solutions. All the solutions are based on the emulation of real systems. In this article, we will demonstrate how to emulate Kafka queues using an embedded Kafka approach.

First of all, we have to configure the main beans to listen to messages and send them. Demo examples are presented below in brief form.

  1. Create configurations for @KafkaListener:
@Bean
fun bookConsumerFactory(properties: KafkaProperties) =
DefaultKafkaConsumerFactory(
properties.buildConsumerProperties(),
StringDeserializer(),
JsonDeserializer(Book::class.java)
)
@Bean
fun bookKafkaListenerContainerFactory(
bookConsumerFactory: DefaultKafkaConsumerFactory<String, Book>
) = ConcurrentKafkaListenerContainerFactory<String, Book>()
.apply {
this.consumerFactory = bookConsumerFactory
}
@Service
class BookListener(val bookService: BookService) {
@KafkaListener(
topics = ["bookTopic"],
containerFactory = "bookKafkaListenerContainerFactory"
)
fun bookListener(book: Book?) {
bookService.process(book)
}
}

2. Create configurations for a message sender:

@Bean
fun userProducerFactory(properties: KafkaProperties) =
DefaultKafkaProducerFactory(
properties.buildProducerProperties(),
StringSerializer(),
JsonSerializer<User>()
)
@Bean
fun userKafkaTemplate(
userProducerFactory: DefaultKafkaProducerFactory<String, User>
) = KafkaTemplate(userProducerFactory)
@Component
class UserSender(val userKafkaTemplate: KafkaTemplate<String, User>{
fun send(user: User) {
kafkaTemplate.send("userTopic", user)
}
}

So, the main configurations are ready and we can start testing. We need to make sure that everything is correct.

There are 5 steps to creating small demonstration tests to check our configurations.

  1. Add a dependency:
testImplementation("org.springframework.kafka:spring-kafka-test")

2. Configure reverse configurations to listen to the sender and send messages to the listener:

@TestConfiguration
class TestConfiguration {

@Bean
fun userConsumer(
properties: KafkaProperties,
embeddedKafkaBroker: EmbeddedKafkaBroker
): Consumer<String, User> {
val configs = properties.buildConsumerProperties()
configs[ConsumerConfig.GROUP_ID_CONFIG] = "userGroup"
val consumerFactory = DefaultKafkaConsumerFactory(
configs,
StringDeserializer(),
JsonDeserializer(User::class.java)
)
val consumer: Consumer<String, User> =
consumerFactory.createConsumer()
embeddedKafkaBroker
.consumeFromAnEmbeddedTopic(consumer, "userTopic")
return consumer
}

@Bean
fun bookProducerFactory(properties: KafkaProperties) =
DefaultKafkaProducerFactory(
properties.buildProducerProperties(),
StringSerializer(),
JsonSerializer<Book>()
)

@Bean
fun bookKafkaTemplate(bookProducerFactory:
DefaultKafkaProducerFactory<String, Book>) =
KafkaTemplate(bookProducerFactory)
}

3. Create an abstract test class with common configurations:

@SpringBootTest
@EmbeddedKafka(
partitions = 1,
brokerProperties = [
"listeners=PLAINTEXT://localhost:9092",
"port=9092"
],
topics = ["bookTopic", "userTopic"]
)
@DirtiesContext
@ActiveProfiles("test")
@Import(TestConfiguration::class)
abstract class AbstractKafkaTest

4. Create a unit test to check the listener:

class BookListenerConfigurationTest : AbstractKafkaTest() {

@Autowired
lateinit var bookKafkaTemplate: KafkaTemplate<String, Book>

@MockBean
lateinit var bookService: BookService

@Test
fun testBookListener() {
val book = Book("name", "author")
doNothing()
.`when`(bookService)
.process(any(Book::class.java))

bookKafkaTemplate.send("bookTopic", book)

await().atMost(Durations.TEN_SECONDS).untilAsserted {
verify(bookService).process(any(Book::class.java))
}
}
}

5. Create a unit test to check the sender:

class UserSenderConfigurationTest : AbstractKafkaTest() {

@Autowired
lateinit var userConsumer: Consumer<String, User>

@Autowired
lateinit var userSender: UserSender

@Test
fun testUserSender() {
val user = User(1, "username")

userSender.send(user)

val responseMessage: User = KafkaTestUtils
.getSingleRecord(userConsumer, "userTopic")
.value()

assertEquals(user.id, responseMessage.id)
assertEquals(user.username, responseMessage.username)
}
}

That’s all! It’s a small example of unit testing for Spring Boot Kafka configurations. You can create different tests for your business logic.

As you can see, I have added the command “await” to test the listener. It’s a necessary operation because the sending action needs a little time, like in a real Kafka queue. You can add the ‘disabled’ annotation to skip the tests for each run. Also, you can add a new Spring test profile and run the tests when it is necessary.

Summary

We have tried to test Kafka configurations using the Embedded Kafka approach. The example demonstrates small configurations which are enough to start. This approach is one of the easiest to understand and to be implemented.

Try different technologies and improve your skills!

The world’s fastest cloud data warehouse:

When designing analytics experiences which are consumed by customers in production, even the smallest delays in query response times become critical. Learn how to achieve sub-second performance over TBs of data with Firebolt.

--

--