Concurrency with Kafka and Spring Boot

This article will teach you how to configure concurrency for Kafka consumers with Spring Boot and Spring for Kafka. Concurrency in Spring for Kafka is closely related to Kafka partitions and consumer groups. Each consumer within a consumer group can receive messages from multiple partitions, but a single partition is always consumed by only one member of the group. While each consumer inside a group uses a single thread, the group as a whole consumes messages with multiple threads. Moreover, even though each consumer is single-threaded, the processing of the received records can be spread across additional threads. We will analyze how to achieve this with Spring Boot and Spring for Kafka.

The topic described today, concurrency with Kafka and Spring Boot, covers rather basic issues. If you are looking for something more advanced in this area, you can read some of my other articles: one of them covers Kafka Streams and the Spring Cloud Stream project, and another one describes Kafka transactions with Spring Boot.

Source Code

If you would like to try it out by yourself, you can always take a look at my source code. In order to do that, clone my GitHub repository and go to the no-transactions-service directory. After that, just follow my instructions. Let's begin.

Prerequisites

We will use three different tools in today's exercise. Of course, we will create the Spring Boot consumer app using the latest version of Spring Boot 3 and Java 19. In order to run Kafka locally, we will use Redpanda – a platform compatible with the Kafka API. You can easily start and manage Redpanda with its CLI tool – rpk. If you want to install rpk on your laptop, please follow the installation instructions available here.

Finally, we need a tool for load testing. I'm using the k6 tool and its extension for integration with Kafka. Of course, this is just a suggestion; you can use any other solution you like. With k6 I'm able to generate and send a lot of messages to Kafka quickly. In order to use k6 you need to install it on your laptop. Here are the installation instructions. After that, you need to install the xk6-kafka extension. The k6 documentation contains a full list of available extensions.
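
For reference, a typical way to obtain a k6 binary with the Kafka extension is to build it with the xk6 tool; the exact command may differ between versions, so treat this as a sketch and check the extension's README:

$ xk6 build --with github.com/mostafa/xk6-kafka@latest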

Introduction

For the purpose of this exercise, we will create a simple Spring Boot application that connects to Kafka and receives messages from a single topic. From the business logic perspective, it handles transactions between accounts that are stored in an in-memory database. Here's the list of dependencies we need to include in the Maven pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

Then, let's look at the configuration settings. Our app connects to the Kafka broker using the address set in the KAFKA_URL environment variable. It expects messages in JSON format, so we need to set JsonDeserializer as the value deserializer. The incoming message is deserialized into a pl.piomin.services.common.model.Order object. To make it work, we need to set the spring.json.value.default.type and spring.json.trusted.packages properties. The k6 tool won't set a header with the target JSON type, so we need to disable that feature in Spring for Kafka with the spring.json.use.type.headers property.

spring:
  application.name: no-transactions-service
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        "[spring.json.value.default.type]": "pl.piomin.services.common.model.Order"
        "[spring.json.trusted.packages]": "pl.piomin.services.common.model"
        "[spring.json.use.type.headers]": false
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Here’s the class representing incoming messages.

public class Order {

   private Long id;
   private Long sourceAccountId;
   private Long targetAccountId;
   private int amount;
   private String status;

   // GETTERS AND SETTERS...
}

The last thing we need to do is to enable Spring for Kafka and generate some test accounts for making transactions.

@SpringBootApplication
@EnableKafka
public class NoTransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(NoTransactionsService.class, args);
   }

   private static final Logger LOG = LoggerFactory
      .getLogger(NoTransactionsService.class);

   Random r = new Random();

   @Autowired
   AccountRepository repository;

   @PostConstruct
   public void init() {
      for (int i = 0; i < 1000; i++) {
         repository.save(new Account(r.nextInt(1000, 10000)));
      }
   }

}
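
The Account entity itself is not shown in the article. Based on how it is used above (a constructor taking an initial balance and getBalance/setBalance accessors), a minimal sketch of the JPA entity could look like this:

@Entity
public class Account {

   @Id
   @GeneratedValue
   private Long id;
   private int balance;

   public Account() {
   }

   public Account(int balance) {
      this.balance = balance;
   }

   // GETTERS AND SETTERS...
}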

Running Kafka using Redpanda

Once we have successfully installed the rpk CLI, we can easily run a single-node Kafka broker by executing the following command:

$ rpk container start

Here's the result. As you can see, the address of my broker is localhost:51961. The port number is generated automatically, so yours will probably be different. To simplify the next steps, let's set it as the REDPANDA_BROKERS environment variable.
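
For example, with the port generated in my run (yours will differ):

$ export REDPANDA_BROKERS=localhost:51961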

Once the broker is running, we can create a topic. We will use a topic named transactions in our tests. In the first step, we run the tests with a single partition.

$ rpk topic create transactions -p 1

Prepare Load Tests for Kafka

Our load test will generate and send orders in JSON format with random values. The k6 tool allows us to write tests in JavaScript. We need to use the k6 Kafka extension library. The address of the Kafka broker is retrieved from the KAFKA_URL environment variable. We are incrementing the order’s id field each time we generate a new message.

import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka";
import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

const writer = new Writer({
  brokers: [`${__ENV.KAFKA_URL}`],
  topic: "transactions",
});

const schemaRegistry = new SchemaRegistry();

export function setup() {
  return { index: 1 };
}

export default function (data) {
  writer.produce({
    messages: [
      {
        value: schemaRegistry.serialize({
          data: {
            id: data.index++,
            sourceAccountId: randomIntBetween(1, 1000),
            targetAccountId: randomIntBetween(1, 1000),
            amount: randomIntBetween(10, 50),
            status: "NEW"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}

export function teardown(data) {
  writer.close();
}

Before running the test we need to set the KAFKA_URL environment variable. Then we can use the k6 run command to generate and send a lot of messages.

$ k6 run load-test.js -u 1 -d 30s 
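
Alternatively, instead of exporting the variable, k6 can pass it directly to the script with its -e flag (the broker address below is just an example from my setup):

$ k6 run -e KAFKA_URL=localhost:51961 load-test.js -u 1 -d 30s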

Scenario 1: Single-partition Topic Listener

Let's start with the defaults. Our topic has just a single partition. We are creating the @KafkaListener with just the topic and consumer group names. Once the listener receives an incoming message, it invokes the AccountService bean to process the order.

@Inject
AccountService service;
    
@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

Our Spring Boot Kafka app is prepared for concurrency. We will lock the Account entity during the transaction with the PESSIMISTIC_WRITE mode.

public interface AccountRepository extends CrudRepository<Account, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<Account> findById(Long id);
}

Here's the implementation of our AccountService bean for handling incoming orders. The process(...) method is @Transactional. In the first step, we find the source (1) and target (2) Account entities. Then we perform a transfer between the accounts if there are sufficient funds in the source account (3). I'm also simulating a delay just for test purposes (4). Finally, we send a response asynchronously to another topic using the KafkaTemplate bean (5).

@Service
public class AccountService {

    private static final Logger LOG = LoggerFactory
            .getLogger(AccountService.class);
    private final Random RAND = new Random();

    KafkaTemplate<Long, Order> kafkaTemplate;
    AccountRepository repository;

    public AccountService(KafkaTemplate<Long, Order> kafkaTemplate, 
                          AccountRepository repository) {
        this.kafkaTemplate = kafkaTemplate;
        this.repository = repository;
    }

    @Transactional
    public void process(Order order) {
        Account accountSource = repository
                .findById(order.getSourceAccountId())
                .orElseThrow(); // (1)

        Account accountTarget = repository
                .findById(order.getTargetAccountId())
                .orElseThrow(); // (2)

        if (accountSource.getBalance() >= order.getAmount()) { // (3)
            accountSource.setBalance(accountSource.getBalance() - order.getAmount());
            repository.save(accountSource);
            accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
            repository.save(accountTarget);
            order.setStatus("PROCESSED");
        } else {
            order.setStatus("FAILED");
        }

        try {
            Thread.sleep(RAND.nextLong(1, 20)); // (4)
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        LOG.info("Processed: order->{}", new OrderDTO(order, accountSource, accountTarget));

        // (5)
        CompletableFuture<SendResult<Long, Order>> result = kafkaTemplate
           .send("orders", order.getId(), order);
        result.whenComplete((sr, ex) ->
                LOG.debug("Sent(key={},partition={}): {}",
                        sr.getProducerRecord().key(),
                        sr.getProducerRecord().partition(),
                        sr.getProducerRecord().value()));
    }

}

Let’s set the address of the Kafka broker in the KAFKA_URL environment variable and then start the app.

$ export KAFKA_URL=127.0.0.1:51961
$ mvn spring-boot:run

Let’s analyze what happens. Our listener is connecting to the transactions topic. It establishes just a single connection since there is a single partition.

In that case, we have just a single instance of our app running and a single thread responsible for handling messages. Let’s verify the current lag on the partition for our consumer group. As you see, the messages are processed very slowly. At first glance, you may be quite surprised.
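
A quick way to check it is the rpk group command (the consumer group id a comes from the @KafkaListener shown above):

$ rpk group describe a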

Scenario 2: Multiple-partitions Topic Listener

Let's analyze the next scenario. Now, the transactions topic consists of 10 partitions. We won't change anything in the app code or configuration. We will just remove the previously created topic and create a new one with 10 partitions using the following commands:

$ rpk topic delete transactions
$ rpk topic create transactions -p 10

Once again, we are starting the app using the following Maven command:

$ mvn spring-boot:run

Let’s analyze the app logs. As you see, although we have 10 partitions there is still a single thread listening on them.

So, our situation hasn't changed. The app performance is exactly the same. However, now we can run another instance of our Spring Boot app. Once you do, take a look at the app logs. The new instance takes over 5 partitions.

In that case, a rebalance occurs. The first instance of our Spring Boot app keeps the other 5 partitions. Now the overall performance is twice as good as before.

Of course, we can run more app instances. In that case, we can scale up to 10 instances since there are 10 partitions on the topic.

Scenario 3: Consumer Concurrency with Multiple Partitions

Let's analyze another scenario. Now, we are enabling concurrency at the Kafka listener level. In order to achieve this, we need to set the concurrency field inside the @KafkaListener annotation. This parameter is still related to Kafka partitions, so it makes no sense to set the value higher than the number of partitions. In our case, there are 10 partitions – the same as in the previous scenario.

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "10")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

After that, we can start the Spring Boot app. Let’s see what happens. As you see, we have 10 concurrent connections – each bound to a single thread.

In that case, the performance of a single app instance is around 10 times better than before. There are 10 concurrent threads processing the incoming messages.

However, if we run our load test, the lag on the partitions is still large. Here's the result after sending ~25k messages in 10 seconds.

Theoretically, we could scale up the number of instances to improve the overall performance. However, that approach won't change anything. Why? Let's run another instance and take a look at the logs. Now, only 5 of the 10 threads in each instance are bound to partitions. The other 5 threads are idle. The overall performance of the system does not change.

Scenario 4: Process in Multiple Threads

Finally, the last scenario. We will create a thread pool with the Java ExecutorService. We can still combine the custom thread pool with the Kafka consumer concurrency feature (through the concurrency parameter), as shown below. Each time the listener receives a new message, it processes it in a separate thread.

@Service
public class NoTransactionsListener {

    private static final Logger LOG = LoggerFactory
            .getLogger(NoTransactionsListener.class);

    AccountService service;
    ExecutorService executorService = Executors.newFixedThreadPool(30);

    public NoTransactionsListener(AccountService service) {
        this.service = service;
    }

    @KafkaListener(
            id = "transactions",
            topics = "transactions",
            groupId = "a",
            concurrency = "3")
    public void listen(Order order) {
        LOG.info("Received: {}", order);
        executorService.submit(() -> service.process(order));
    }

}

One thing should be clarified here. With a custom thread pool at the app level, we lose message ordering within a single partition. The previous model guaranteed ordering, since we had one thread per partition. For our Spring Boot app this is not important, because we process each message independently.
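
If per-account ordering did matter, one common workaround (not used in this article – just a hypothetical sketch) is to route each order to a worker chosen by a hash of its key, so that all orders for a given source account are handled by the same single-threaded executor:

@Service
public class OrderedTransactionsListener {

    private final AccountService service;
    // one single-threaded "lane" per slot; the same key always maps to the same lane
    private final ExecutorService[] workers = new ExecutorService[30];

    public OrderedTransactionsListener(AccountService service) {
        this.service = service;
        for (int i = 0; i < workers.length; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    @KafkaListener(
            id = "transactions-ordered",
            topics = "transactions",
            groupId = "a",
            concurrency = "3")
    public void listen(Order order) {
        // pick the lane by the source account id, so orders for one account keep their order
        int lane = Math.floorMod(Long.hashCode(order.getSourceAccountId()), workers.length);
        workers[lane].submit(() -> service.process(order));
    }

}

This keeps per-key ordering at the cost of a less even load distribution across the worker threads.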

Let’s start the app. There are 3 concurrent threads that receive messages from the partitions.

There are 30 threads for processing messages and 3 threads for listening on the partitions. Once a message is received by a consumer thread, it is handed off to one of the worker threads.

We can run more instances of our Spring Boot Kafka app. I'll run another two. The first instance holds 4 partitions, while each of the other two instances takes 3.

Now, we can run the load test again. It generated and sent ~85k messages to our Kafka broker (around 2.7k per second).

Let's verify the lag within the consumer group using the rpk group command. The lag on the partitions is not large. In fact, there are 90 threads across the three app instances that are simultaneously processing the incoming messages. But wait… does that mean that with 90 threads we are able to process 2.7k orders per second? We should also remember the artificial delay of between 1 and 20 ms that we added before (with the Thread.sleep method).

The lag looks fine, although the app is not able to process all the records without any delay. That's because the default offset commit (ack) mode in Spring for Kafka is BATCH. If we change it to RECORD, which commits the offset when the listener returns after processing the record, we should get a more accurate lag value. In order to override that option, we need to define the ConcurrentKafkaListenerContainerFactory bean as shown below.

@Bean
ConcurrentKafkaListenerContainerFactory<Long, Order>
    kafkaListenerContainerFactory(ConsumerFactory<Long, Order> consumerFactory) {
   ConcurrentKafkaListenerContainerFactory<Long, Order> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   factory.setConcurrency(3);
   factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
   return factory;
}
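
Alternatively, if you do not need a custom container factory for anything else, the same ack mode can (to my knowledge) be set with the standard Spring Boot property:

spring:
  kafka:
    listener:
      ack-mode: record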

Let's restart the app and run the load test once again. Now, the lag value is much closer to reality.

Final Thoughts

Concurrency and performance are among the most important things to consider when working with Kafka and Spring Boot. In this article, I wanted to explain some of the basics with simple examples. I hope it clarifies some concerns about using the Spring for Kafka project.

15 COMMENTS

Yevhenii Andriienko

Thanks for the article.
I already want to implement it in my project.

Raicky

Two things to note:

Isn’t the last method bad practice since you’ve already acked the Kafka message but if your app dies for any reason when processing the messages in the executor, you’re essentially losing those records?

2nd, isn’t it better to make use of batch mode from spring cloud stream for these kinds of use cases?

    piotr.minkowski

    No, because in BATCH ackMode the offset is also not committed after processing all messages, but after receiving them.
    Spring Cloud Stream doesn’t change anything. It just simplifies several things, but still uses Spring Kafka under the hood.

Jorge

Piotr,
Greetings from Argentina! I was involved in a huge application that needed to ingest 300 million CSV rows per month, and this Kafka article helped me a lot!

Regards!

    piotr.minkowski

    Hi. Cool 🙂

Pushkar Kumar

Hi Piotr, it really made concurrency and Spring Kafka very clear to me, thanks for the article. In my use case I will also be handling around 3-4k msg/sec, so an ExecutorService combined with the concurrency field seems like a valid option to me.

Just wanted to clarify one thing:-

1. Will using an ExecutorService hinder the offset maintenance part? I don't want to lose any message due to some downtime or exceptions while processing.

Thanks Again!

    piotr.minkowski

    Hi!
    Thanks! Well, I don't think the ExecutorService will hinder anything. Why? It just processes messages using a thread pool – a normal approach. The only question for your app is whether you have to process the messages sequentially or not.

Eduardo Silva

Very good material, thank you for your contribution.

    piotr.minkowski

    Thanks!

Rod

Hello, I just came across this article. One interesting thing about starting your own threads like you do at the end of your article is how to control offset commits. Am I wrong to assume that offsets would be committed in a totally chaotic order? Hence, if offset 8 could be committed before offset 1, it basically means you'd lose all messages with offsets 1 to 7?

In other words, it would be hard to guarantee delivery of any messages?

    piotr.minkowski

    Definitely not. If you take a look at scenario 4 once again, you will see that the concurrency at the consumer level is `3`. The thread pool you are asking about (`Executors.newFixedThreadPool(30)`) processes the already "consumed" messages. Spring Kafka is responsible for committing offsets, so the thread pool has nothing to do with the offset commits.

      Ankit Sood

      Hi,
      I just saw this and had the same question as Rod. Based on the code:
      public void listen(Order order) {
      LOG.info("Received: {}", order);
      executorService.submit(() -> service.process(order));
      }

      We are consuming the message and then submitting it to the thread pool. In this case, the pool size is 30. If all the events are processed in independent threads, then the JVM can choose any thread to do the processing. This can result in a situation where the message at offset x+10 gets processed before the message at offset x.

      I am not sure how acknowledgement works in Spring Kafka, but if we set the ack at the record level, this behaviour seems likely, and if our application goes down at that moment, Kafka would resume from the latest committed offset.

      Please give more clarity on this.

        piotr.minkowski

        Yes, there was a similar question from another reader in the comments. By default, Spring Kafka handles the offset commits. Processing the already received messages in the thread pool has no impact on the offset commits, because they have already been done by Spring Kafka.

Nikita

There are 2 problems with solution 4.

1. Out-of-order processing is possible.
2. Offset commits happen before the messages are processed, so in case of a failure the offset is already committed but the messages will never be processed.
