Knative Eventing with Kafka and Spring Cloud

In this article, you will learn how to run eventing applications on Knative using Kafka and Spring Cloud. I’ll explain what Knative Eventing is and how to integrate it with the Kafka broker. We will build our applications on top of Spring Cloud Function and Spring Cloud Stream. All these solutions seem to be a perfect match. Why? Read on to find out.

However, before we proceed, you should be familiar with the basic Knative concepts. You can start with these two articles: Spring Boot on Knative and Microservices on Knative with GraalVM and Spring Boot. Of course, you can also refer to the Knative documentation.

Source Code

If you would like to try it yourself, you can take a look at my source code. To do that, clone my GitHub repository and follow my instructions.

Today we will rely on a simple architecture that follows the eventual consistency pattern, also known as the SAGA pattern. What exactly is that? The sample system consists of three services. The order-service creates a new order that is related to customers and products. That order is sent to a Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event. After that, they perform a reservation. The customer-service reserves the order’s amount on the customer’s account. Meanwhile, the product-service reserves the number of products specified in the order. Both services send a response to the order-service through a Kafka topic. If the order-service receives positive reservations from both services, it confirms the order. Then, it sends an event with that information. Both customer-service and product-service receive the event and confirm their reservations. The picture below illustrates the whole flow.

knative-eventing-kafka-arch
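The confirmation rule described above can be sketched in plain Java, without Kafka or Spring. This is only an illustration under my own assumptions: the SagaFlowSketch and SagaOrder classes and the onReservation method are hypothetical names, and only the status values follow the ones used in the rest of the article.

```java
// Hypothetical in-memory illustration of the confirmation rule; not the article's real classes.
final class SagaFlowSketch {

    enum Status { NEW, IN_PROGRESS, CONFIRMED }

    // Minimal stand-in for the order tracked by order-service.
    static final class SagaOrder {
        final int id;
        Status status = Status.NEW;

        SagaOrder(int id) {
            this.id = id;
        }
    }

    // Called once per positive reservation response (from customer-service or product-service).
    // Returns true only when this response completes the order, i.e. a confirmation
    // event should now be published.
    static boolean onReservation(SagaOrder order) {
        if (order.status == Status.NEW) {
            // First response: remember it and wait for the other service.
            order.status = Status.IN_PROGRESS;
            return false;
        }
        if (order.status == Status.IN_PROGRESS) {
            // Second response: both services have reserved, so confirm the order.
            order.status = Status.CONFIRMED;
            return true;
        }
        // Already confirmed: nothing more to do.
        return false;
    }

    public static void main(String[] args) {
        SagaOrder order = new SagaOrder(1);
        boolean afterFirst = onReservation(order);  // response from customer-service
        boolean afterSecond = onReservation(order); // response from product-service
        System.out.println(afterFirst + " " + afterSecond + " " + order.status);
    }
}
```

Note that the sketch only covers the positive path. A rejected reservation would need an extra status and a compensating event, which this example does not model.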

Prerequisites

Before we start, we need to install Knative on a Kubernetes cluster. I’m using a local instance of Kubernetes, but you may as well use a remote one such as GKE. Note that the latest version of Knative requires Kubernetes v1.17 or later. Of course, we need to install both the Serving and Eventing components. You may find the detailed installation instructions in the Knative documentation.

That’s not all. We also need to install Kafka Eventing Broker. Here’s the link to the releases site. It includes several deployments and CRDs. You should pay special attention to the KafkaSource and KafkaBinding CRDs, since we will use them later.

Finally, we need to install a Kafka cluster on Kubernetes. The recommended way to do that is with the Strimzi operator. Strimzi provides container images and operators for running Kafka on Kubernetes. It also comes with a set of CRDs for managing the Kafka cluster. Once you install it, you may proceed to the next steps. I installed it in the kafka namespace.

Step 1: Create and configure Knative Kafka Broker

In the first step, we are going to create a Kafka cluster using the Strimzi Kafka CRD. To keep things simple, we won’t use any advanced configuration settings. For example, I used ephemeral storage, which is not recommended in production. I set three instances of Zookeeper. Kafka is planning to drop its dependency on Zookeeper, but the version used here still relies on it.

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

The Knative broker routes events to different event sinks or consumers. We may use different broker providers. When an event is sent to the broker, all request metadata other than the CloudEvent data and context attributes is stripped away. The event delivery mechanism hides the details of event routing from the event producer and consumer. The default broker class is MTChannelBasedBroker. We will change it to Kafka.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

In this article, we won’t use the Kafka broker directly. Instead, we will use the KafkaSource object, which takes events from a particular topic and sends them to the subscriber. If you want to use the Broker, you need to define a Knative Trigger that refers to it.

The broker refers to the ConfigMap kafka-broker-config. The most important thing there is to set the address of the Kafka cluster. If you didn’t change anything in the default Kafka installation files, the host is ${KAFKA_CLUSTER_NAME}-kafka-bootstrap (followed by the namespace when it is accessed from another namespace) and the port is 9092.

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
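To make the naming rule explicit, here is a tiny Java sketch that builds the same bootstrap address from its parts. The BootstrapAddressSketch class and its bootstrapAddress method are hypothetical helpers written for this article only; the cluster name, namespace and port are the values used in the manifests above.

```java
// Hypothetical helper that shows how the bootstrap.servers value is put together.
final class BootstrapAddressSketch {

    static String bootstrapAddress(String clusterName, String namespace, int port) {
        // Strimzi exposes the cluster through a Service named <cluster>-kafka-bootstrap.
        // Appending the namespace makes the name resolvable from other namespaces.
        return clusterName + "-kafka-bootstrap." + namespace + ":" + port;
    }

    public static void main(String[] args) {
        // Prints: my-cluster-kafka-bootstrap.kafka:9092
        System.out.println(bootstrapAddress("my-cluster", "kafka", 9092));
    }
}
```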

Step 2: Create an application with Spring Cloud Stream

Let’s start with dependencies. Each of our applications uses an in-memory H2 database. They integrate with the database using the Spring Data JPA repository pattern. However, the most important thing is that they all rely on Spring Cloud Stream to interact with Kafka topics. Spring Cloud Stream requires a concrete binder implementation on the classpath. That’s why we add the spring-cloud-starter-stream-kafka starter. For some time now, the Spring Cloud Stream programming model has been built on top of Spring Cloud Function. Fortunately, we may easily export functions as HTTP endpoints. This feature will be useful for us later. For now, let’s just take a look at the list of included dependencies.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

Here’s the model class for the order-service. Once the order is created and saved in the database, the order-service sends it to the output Kafka topic.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order {
    @Id
    private Integer id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

We have three functions inside the order-service application main class. Two of them are suppliers that send events to the output destination continuously. The third one, confirm(), waits for incoming events. We will discuss it later. The orderEventSupplier function represents the first step in our scenario. It creates a new order with test data and saves it in the database before sending it.

@SpringBootApplication
@Slf4j
public class OrderSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderSagaApplication.class, args);
    }
    private static int num = 0;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> repository.save(new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW));
    }
    @Bean
    public Supplier<Order> orderConfirmSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> confirm() {
        return this::doConfirm;
    }
    @Autowired
    OrderRepository repository;
    private void doConfirm(Message<Order> msg) {
        Order o = msg.getPayload();
        log.info("Order received: {}", o);
        Order order = repository.findById(o.getId()).orElseThrow();
        if (order.getStatus() == OrderStatus.NEW) {
            order.setStatus(OrderStatus.IN_PROGRESS);
        } else if (order.getStatus() == OrderStatus.IN_PROGRESS) {
            order.setStatus(OrderStatus.CONFIRMED);
            log.info("Order confirmed : {}", order);
            queue.offer(order);
        }
        repository.save(order);
    }
}
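It may help to see how a queue-backed Supplier behaves when it is polled. Spring Cloud Stream invokes each Supplier bean on a schedule (once per second by default). The loop below is a plain Java stand-in for that poller, not the framework’s implementation. It assumes that a null result from queue.poll() means nothing is published for that tick. The SupplierPollingSketch class and the pollTicks method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical simulation of a poller calling a queue-backed Supplier.
final class SupplierPollingSketch {

    // Simulates a fixed number of poller ticks and collects what would be published.
    static <T> List<T> pollTicks(Supplier<T> supplier, int ticks) {
        List<T> publishedEvents = new ArrayList<>();
        for (int i = 0; i < ticks; i++) {
            T event = supplier.get();
            if (event != null) { // assumption: null means "nothing to send on this tick"
                publishedEvents.add(event);
            }
        }
        return publishedEvents;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // poll() is non-blocking and returns null when the queue is empty.
        Supplier<String> orderConfirmSupplier = queue::poll;

        queue.offer("order-1 CONFIRMED");
        // Three ticks, but only one queued event, so only one event is collected.
        System.out.println(pollTicks(orderConfirmSupplier, 3));
    }
}
```

The design choice here is that the Consumer side never talks to Kafka directly. It only puts confirmed orders into the queue, and the Supplier drains the queue on the next tick.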

The name of the output Kafka topic is order-events. We set it for both Supplier functions using the Spring Cloud Stream bindings pattern. On the other hand, the Consumer function will not receive events directly from the Kafka topic. Why? Because it is a part of the Knative Eventing process, which I will explain in the next step. For now, the important point is that only the suppliers are bound to the external destination, and we declare that with the spring.cloud.function.definition property.

spring.application.name: order-saga
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: order-events
spring.cloud.stream.bindings.orderConfirmSupplier-out-0.destination: order-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier;orderConfirmSupplier

Finally, we need to create the KafkaBinding that will inject Kafka bootstrap information into the application container (through the Knative Service). Then, the application can access it as the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-order-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: order-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

Step 3: Create Kafka sources and Spring Cloud Function endpoints

Ok, we have already created a function responsible for generating and sending orders to the Kafka topic inside the order-service. So, now our goal is to receive and handle them on the customer-service and product-service sides. Our applications won’t directly listen for incoming events on the Kafka topic. To clarify, the basic Knative Eventing assumption is that the application doesn’t care how the events are published. It will just receive the events as an HTTP POST. And here comes the KafkaSource object. It takes a list of input topics and a destination sink as parameters. In our case, it gets messages from order-events and sends them as HTTP POST requests to the /customers/reserve endpoint of the customer-saga Knative Service.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve
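To get a feeling for what the sink sees, here is a small self-contained Java sketch that plays both roles: a tiny HTTP sink listening on /customers/reserve, and a client that posts one JSON event to it. It only uses the JDK (no Knative, Kafka or Spring). The HttpDeliverySketch class and the deliver method are hypothetical, and the ce-* header values are illustrative assumptions rather than the exact attributes a real KafkaSource would set.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for "source posts an event to the sink over HTTP".
final class HttpDeliverySketch {

    // Starts a sink on a free local port, posts one event and returns the body the sink received.
    static String deliver(String json) {
        BlockingQueue<String> received = new LinkedBlockingQueue<>();
        HttpServer sink = null;
        try {
            sink = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            sink.createContext("/customers/reserve", exchange -> {
                byte[] body = exchange.getRequestBody().readAllBytes();
                received.offer(new String(body, StandardCharsets.UTF_8));
                exchange.sendResponseHeaders(202, -1); // accepted, no response body
                exchange.close();
            });
            sink.start();

            URI target = URI.create("http://127.0.0.1:" + sink.getAddress().getPort() + "/customers/reserve");
            HttpRequest request = HttpRequest.newBuilder(target)
                    .header("Content-Type", "application/json")
                    // CloudEvents attributes sent as HTTP headers; the values are made up for this sketch.
                    .header("ce-specversion", "1.0")
                    .header("ce-id", "example-1")
                    .header("ce-type", "dev.knative.kafka.event")
                    .header("ce-source", "example/order-events")
                    .POST(HttpRequest.BodyPublishers.ofString(json))
                    .build();
            HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
            HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
            if (response.statusCode() != 202) {
                throw new IllegalStateException("Unexpected status: " + response.statusCode());
            }
            // The handler stores the body before it responds, so it is already available here.
            return received.poll();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (sink != null) {
                sink.stop(0);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(deliver("{\"id\":1,\"status\":\"NEW\"}"));
    }
}
```

The point of the sketch is that the receiving application needs nothing Kafka-specific. Any HTTP endpoint that accepts a POST with a JSON body can act as the sink.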

Here’s an implementation of the customer-saga application. Thanks to Spring Cloud Function Web, it automatically exports the reserve function as an HTTP endpoint with the path /reserve. Once the consumer receives the event, it performs the rest of the business logic. If the input order has the NEW status, the customer-saga creates a reservation for a particular amount on the customer’s account. Then it sends a response event to the order-saga. To do that, it first puts the event into a BlockingQueue. We also use a Supplier function for sending events to the Kafka topic. This time the supplier function takes Order objects from the BlockingQueue. Finally, if our application receives an order confirmation from the order-saga, it commits the whole transaction by removing the reserved amount.

@SpringBootApplication
@Slf4j
public class CustomerSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(CustomerSagaApplication.class, args);
    }
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Autowired
    private CustomerRepository repository;
    
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }
    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
        log.info("Customer: {}", customer);
        if (order.getStatus() == OrderStatus.NEW) {
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
        }
        repository.save(customer);
    }
}
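The reservation arithmetic is easier to follow in isolation. Below is a minimal plain Java sketch of the same two branches, without Spring or JPA. The ReservationSketch class, the Account class and the apply method are hypothetical names, and the starting balance of 1000 is an arbitrary value chosen for the example.

```java
// Hypothetical, database-free version of the reserve/commit logic.
final class ReservationSketch {

    enum Status { NEW, IN_PROGRESS, CONFIRMED }

    // Minimal stand-in for the Customer entity.
    static final class Account {
        int amountAvailable;
        int amountReserved;

        Account(int amountAvailable) {
            this.amountAvailable = amountAvailable;
        }
    }

    // Applies one incoming order event to the account.
    // Returns the status to send back to order-saga, or null when no response is needed.
    static Status apply(Account account, Status orderStatus, int amount) {
        if (orderStatus == Status.NEW) {
            // Reserve: move the amount from "available" to "reserved".
            account.amountReserved += amount;
            account.amountAvailable -= amount;
            return Status.IN_PROGRESS;
        }
        if (orderStatus == Status.CONFIRMED) {
            // Commit: the reserved amount is finally removed.
            account.amountReserved -= amount;
        }
        return null;
    }

    public static void main(String[] args) {
        Account account = new Account(1000);
        apply(account, Status.NEW, 100);
        System.out.println(account.amountAvailable + " / " + account.amountReserved); // 900 / 100
        apply(account, Status.CONFIRMED, 100);
        System.out.println(account.amountAvailable + " / " + account.amountReserved); // 900 / 0
    }
}
```

Like the listing above, this sketch does not check whether the account has enough funds, so it never rejects a reservation.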

We can also set the base context path for HTTP endpoints using the spring.cloud.function.web.path property. So, the final path of our target endpoint is /customers/reserve. It is the same as the address defined in the KafkaSource definition.

spring.cloud.function.web.path: /customers

Here’s a configuration for the customer-saga inside the application.yml file.

spring.application.name: customer-saga
spring.cloud.function.web.path: /customers
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: reserve-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier

The implementation of the business logic inside product-saga is pretty similar to the customer-saga. There is a single Consumer function that receives orders, and a single Supplier responsible for sending a response to the order-saga.

@SpringBootApplication
@Slf4j
public class ProductSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductSagaApplication.class, args);
    }
    @Autowired
    private ProductRepository repository;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }
    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Product product = repository.findById(order.getProductId()).orElseThrow();
        log.info("Product: {}", product);
        if (order.getStatus() == OrderStatus.NEW) {
            product.setReservedItems(product.getReservedItems() + order.getProductsCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            product.setReservedItems(product.getReservedItems() - order.getProductsCount());
        }
        repository.save(product);
    }
}

Step 4: Run applications on Knative Eventing and Kafka

Here’s a typical definition of the Knative Service for our applications. I’m using the dev.local option, but if you run a remote cluster you may replace it with your Docker username or any other repository account you have.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: order-saga
spec:
  template:
    spec:
      containers:
        - image: dev.local/order-saga

I use Skaffold together with Jib Maven Plugin for building and deploying applications on Knative. My target namespace is serverless. With the tail option you may observe logs after deployment. Of course, you may as well use the skaffold dev command.

$ skaffold run --tail -n serverless

After running all our applications on Knative Eventing with Kafka, we may verify the list of services using the kn CLI.

knative-eventing-kafka-services

Then, we may verify that all KafkaBindings have been created. To do that, we can list them with kubectl.

The next important component is KafkaSource. We have already created three sources, one per application.

knative-eventing-kafka-sources

After starting, the order-saga application continuously generates and sends a new order each second. Both product-saga and customer-saga receive events and send responses. Thanks to that, the traffic is exchanged without any interruption. Apart from the application pods, we have three pods with Kafka sources.

Let’s take a look at the logs of the order-saga application. As you can see there, it receives the order reservations from both customer-saga and product-saga. After that, it confirms the order and sends a response back to the order-events topic on Kafka. Basically, that’s what we wanted to achieve.

Final Thoughts

I hope you enjoyed this article. Knative is still a relatively new solution. I think we may expect some new and interesting features in the near future. With Knative Eventing you may use event sources other than Kafka. Personally, I’m waiting for the integration with RabbitMQ, which is under development now. For a full list of available solutions, you may refer to the Knative documentation.

This is my third article about Knative and Spring Boot. You may expect more articles about Knative soon! Next time, I’m going to show you an example with another popular Java framework, Quarkus.
