
Integrating Java APIs with Cats Effect

Krzysztof Atłasik

10 Nov 2022, 9 minutes read


Cats Effect is an excellent and performant framework for asynchronous computations in a functional style. It provides the IO monadic datatype for controlling the effects of your application. Effects wrapped in IO are evaluated lazily. They can be conveniently composed with flatMap into bigger programs, which are then evaluated in a resource-safe runtime.

Cats Effect is not only a very popular library harnessing the power of functional programming; it is also the core of a whole ecosystem of related libraries that make many routine tasks seamless, like communicating with databases, creating HTTP servers or clients, parsing JSON and so on. Some libraries are community-managed and some are developed under the guidance of Typelevel, the organisation that maintains Cats Effect.

Moreover, since Scala compiles to JVM bytecode (there are also Scala.js and Scala Native, but I will focus on the JVM in this article), our programs can take advantage of any Java library. The challenge is that Java libraries are not “cats-aware”: very often they are synchronous and blocking, and sometimes they expose clunky interfaces.

In this article, I will integrate the basic functionalities of Google’s PubSub client library for Java with Cats Effect 3. There are already many projects implementing interop with various GCP resources, and in most cases we should prefer well-maintained libraries over our own solution. The point of this article is not to create a full-fledged, cats-based PubSub client, but rather to explore the tools provided by Cats Effect.

All code examples are written in Scala 3.

Blocking the future

First, I will integrate the publishing of messages. An impure implementation of the publish function using the PubSub Publisher might look like this:

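import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage

// expects the full topic path, e.g. projects/<project>/topics/<topic>
val publisher = Publisher.newBuilder("my-topic-name").build()

def publish(message: String): String = {
  val pubsubMessage = PubsubMessage.newBuilder()
    .setData(ByteString.copyFromUtf8(message))
    .build()
  val future = publisher.publish(pubsubMessage) // ApiFuture[String]
  future.get() // blocks until PubSub returns the message id
}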

As a first step, we can simply wrap all effectful function calls in the IO datatype with IO.apply and compose them with a for-comprehension:

def publish(message: String): IO[String] = for {
  pubsubMessage <- IO(
    PubsubMessage.newBuilder()
      .setData(ByteString.copyFromUtf8(message))
      .build()
  )
  future <- IO(publisher.publish(pubsubMessage)) // suspend the side effect as well
  messageId <- IO.blocking(future.get()) // run the blocking get on the blocking pool
} yield messageId

Method publish returns ApiFuture, which extends the plain old Future from java.util.concurrent. If the API returned a CompletableFuture, we could simply call IO.fromCompletableFuture to create an IO, but there’s no equivalent for the older Future. Therefore, to obtain the outcome of publishing we need to invoke get, which blocks the current thread until a response arrives. This is unsafe on the JVM because it blocks a thread that could otherwise be reused for other computations. It is especially dangerous if we block on a computation-oriented thread pool with a low, fixed number of threads.
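For contrast, here is a minimal sketch of the CompletableFuture case, assuming Scala 3 and Cats Effect 3. The legacyCall method is a hypothetical stand-in for a Java API, not part of PubSub. Note that IO.fromCompletableFuture takes the future wrapped in IO, so the call is deferred until the IO actually runs.

```scala
import cats.effect.{IO, IOApp}
import java.util.concurrent.CompletableFuture

object CompletableFutureInterop extends IOApp.Simple:

  // Hypothetical stand-in for a Java API that returns a CompletableFuture.
  def legacyCall(input: String): CompletableFuture[String] =
    CompletableFuture.supplyAsync(() => input.toUpperCase)

  // The future is created inside IO, so nothing starts until the IO runs,
  // and waiting for the result does not block any thread.
  def call(input: String): IO[String] =
    IO.fromCompletableFuture(IO(legacyCall(input)))

  def run: IO[Unit] = call("hello").flatMap(IO.println)
```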

Looking ahead, this operation might be less wasteful once virtual threads arrive on the JVM, but for now Project Loom is still not production-ready.

The usual approach for dealing with blocking actions is to run them on a separate thread pool, usually an unbounded, caching one (reusing idle threads). To tell the Cats Effect runtime to run an action on its blocking pool, we use the special combinator IO.blocking.
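A minimal sketch of that pattern using only JDK types, assuming Scala 3 and Cats Effect 3: legacyCall is a hypothetical Java-style API returning a plain java.util.concurrent.Future, and the executor behind it is managed as a Resource. Only the get() call is shifted to the blocking pool; creating the future is an ordinary suspended side effect.

```scala
import cats.effect.{IO, IOApp, Resource}
import java.util.concurrent.{Callable, ExecutorService, Executors, Future}

object BlockingGet extends IOApp.Simple:

  // Hypothetical stand-in for a Java API returning a plain Future.
  def legacyCall(executor: ExecutorService, input: String): Future[String] =
    executor.submit(new Callable[String] {
      def call(): String = { Thread.sleep(100); input.reverse }
    })

  val executorResource: Resource[IO, ExecutorService] =
    Resource.make(IO(Executors.newFixedThreadPool(2)))(ex => IO(ex.shutdown()))

  // get() parks the calling thread until the result is ready,
  // so it runs on the runtime's blocking pool instead of a compute thread.
  def call(ex: ExecutorService, input: String): IO[String] =
    IO(legacyCall(ex, input)).flatMap(future => IO.blocking(future.get()))

  val program: IO[String] = executorResource.use(ex => call(ex, "olleh"))

  def run: IO[Unit] = program.flatMap(IO.println)
```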

When a fiber (a green thread implemented by the Cats Effect runtime) is cancelled while it is inside IO.blocking, nothing gets interrupted: effects created with IO.blocking are uncancelable, so the cancellation waits until get returns. If we want cancellation to interrupt the thread waiting on get, we can use IO.interruptible, which interrupts it once, or IO.interruptibleMany, which keeps retrying the interrupt until the blocking call exits.

Keep in mind that interrupting the waiting thread only stops the waiting. The Future itself is not affected by interrupts of its callers (it can only be stopped with its cancel method), so even though nobody will be waiting for the result, the operation will still be pending. Conversely, if an action must never be cancelled, we can wrap it in IO.uncancelable.
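The difference between the two combinators can be observed with a timeout, which cancels the inner effect when it fires. This is a sketch assuming Cats Effect 3.3 or later (where IO.interruptible takes no flag), with Thread.sleep standing in for a blocking future.get(). The interruptible version gives up after roughly 200 ms; the same timeout around blockingSleep would only complete after the full 5 seconds.

```scala
import cats.effect.{IO, IOApp}
import scala.concurrent.duration.*

object CancelBlocking extends IOApp.Simple:

  // On cancellation the runtime interrupts the thread once;
  // Thread.sleep then throws InterruptedException and the fiber stops early.
  val interruptibleSleep: IO[Unit] = IO.interruptible(Thread.sleep(5000))

  // Uncancelable: a timeout around this would still wait the whole 5 seconds.
  // Not run in `program`, shown only for comparison.
  val blockingSleep: IO[Unit] = IO.blocking(Thread.sleep(5000))

  // Returns (whether the call timed out, how long it took).
  val program: IO[(Boolean, FiniteDuration)] =
    interruptibleSleep.timeout(200.millis).attempt.timed.map { case (took, result) =>
      (result.isLeft, took)
    }

  def run: IO[Unit] = program.flatMap(IO.println)
```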

Integrating asynchronous callbacks

Method publish from Google’s library returns an instance of the extended Future interface called ApiFuture, which allows us to attach a callback that will be evaluated when the pending action completes. With the helper method com.google.api.core.ApiFutures.addCallback we can register two callback functions, onSuccess and onFailure. Cats Effect has a dedicated tool for integrating callback-based APIs: the methods IO.async and IO.async_ from the Async typeclass.

First, I’ll discuss IO.async_ (with an underscore at the end). It takes one argument: a function of type (Either[Throwable, A] => Unit) => Unit. It looks scary, but only at first glance. Either[Throwable, A] => Unit is just a function that accepts an Either and returns Unit, and we need to invoke it from our callback: if the action succeeded we pass a Right with the value, and if it failed we pass a Left with the Throwable.
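To see the shape of async_ without PubSub, here is a sketch against a hypothetical Java-style API: Callback and legacyLookup are made up for this example and mimic ApiFutureCallback and ApiFutures.addCallback by completing on another thread. Scala 3 and Cats Effect 3 are assumed.

```scala
import cats.effect.{IO, IOApp}

// Hypothetical Java-style callback interface, mirroring ApiFutureCallback.
trait Callback[A]:
  def onSuccess(value: A): Unit
  def onFailure(error: Throwable): Unit

object AsyncUnderscore extends IOApp.Simple:

  // Hypothetical legacy API: reports the result later, from another thread.
  def legacyLookup(key: String, callback: Callback[Int]): Unit = {
    val worker = new Thread(() => {
      Thread.sleep(50)
      if (key.nonEmpty) callback.onSuccess(key.length)
      else callback.onFailure(new IllegalArgumentException("empty key"))
    })
    worker.start()
  }

  // async_ hands us `cb`; each legacy callback forwards its outcome to it.
  def lookup(key: String): IO[Int] =
    IO.async_[Int] { cb =>
      legacyLookup(
        key,
        new Callback[Int] {
          def onSuccess(value: Int): Unit = cb(Right(value))
          def onFailure(error: Throwable): Unit = cb(Left(error))
        }
      )
    }

  def run: IO[Unit] =
    lookup("pubsub").flatMap(IO.println) *> lookup("").attempt.flatMap(IO.println)
```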

val result: IO[String] = IO.async_[String] { (cb: Either[Throwable, String] => Unit) =>
   ApiFutures.addCallback(
     future,
     new ApiFutureCallback[String]() {
       override def onFailure(error: Throwable): Unit = cb(error.asLeft)

       override def onSuccess(messageId: String): Unit =
         cb(messageId.asRight)
     }
   )
 }

In the example above, when the action fails and the onFailure callback is called, we call cb passing a Left containing the exception. This creates a failed IO with that error.
When onSuccess is called, we invoke cb passing a Right containing the id of the message that was just sent to PubSub. Only the first call of cb has any effect; any further calls of cb are silently ignored. That is fine in our case, since we attach a new callback to every newly created Future.
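The “first call wins” rule is easy to check in isolation. In this small sketch (Scala 3, Cats Effect 3) the registration function invokes cb three times synchronously; the resulting IO completes with the first value and the later calls are dropped.

```scala
import cats.effect.{IO, IOApp}

object FirstCallWins extends IOApp.Simple:

  // Simulates a callback that fires more than once:
  // only the first invocation of `cb` completes the IO.
  val program: IO[String] =
    IO.async_[String] { cb =>
      cb(Right("first"))
      cb(Right("second")) // ignored
      cb(Left(new RuntimeException("third"))) // ignored
    }

  def run: IO[Unit] = program.flatMap(IO.println)
```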

There’s also IO.async (without the underscore). This time it takes a function with an even more convoluted signature: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]. The difference is that this variant lets us optionally return an IO that will be executed when the fiber waiting on IO.async is cancelled (hence Option[IO[Unit]]).

We can use that finalizer to call cancel on the Future. Method cancel takes a boolean parameter, mayInterruptIfRunning: with false, a task that hasn’t started yet will never run, while a task that is already running is allowed to finish; with true, the Future additionally tries to interrupt the thread running it. I will use the cancel(false) variant, since Java’s interruption logic is not very reliable.

The other difference is that IO.async expects us to suspend (wrap in IO) the action that registers the callback.

IO.async[String] { (callback: Either[Throwable, String] => Unit) =>
  IO {
    ApiFutures.addCallback(
      future,
      new ApiFutureCallback[String]() {
        override def onFailure(error: Throwable): Unit =
          callback(error.asLeft)
        override def onSuccess(messageId: String): Unit =
          callback(messageId.asRight)
      }
    )

    Some(IO(future.cancel(false)).void) // finalizer, run on cancellation
  }
}
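A runnable sketch of the finalizer mechanism, using a ScheduledExecutorService as a stand-in for the PubSub future (the delayed helper and the cancelled flag are hypothetical, added only so the effect can be observed). When the timeout fires, the fiber waiting in IO.async is cancelled, the finalizer calls cancel(false) on the pending task, and the Ref records that the cancellation succeeded. Scala 3 and Cats Effect 3 are assumed.

```scala
import cats.effect.{IO, IOApp, Ref, Resource}
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.duration.*

object AsyncWithFinalizer extends IOApp.Simple:

  val scheduler: Resource[IO, ScheduledExecutorService] =
    Resource.make(IO(Executors.newSingleThreadScheduledExecutor()))(s =>
      IO(s.shutdownNow()).void
    )

  // Completes with `value` after `delay`; `cancelled` records whether
  // the finalizer managed to cancel the pending task.
  def delayed(
      s: ScheduledExecutorService,
      delay: FiniteDuration,
      value: String,
      cancelled: Ref[IO, Boolean]
  ): IO[String] =
    IO.async[String] { cb =>
      IO {
        val task = s.schedule(
          (() => cb(Right(value))): Runnable,
          delay.toMillis,
          TimeUnit.MILLISECONDS
        )
        // Finalizer: runs only if the fiber waiting here is cancelled.
        Some(IO(task.cancel(false)).flatMap(cancelled.set))
      }
    }

  // The 100 ms timeout cancels the async, which runs the finalizer.
  val program: IO[(Boolean, Boolean)] =
    scheduler.use { s =>
      for {
        cancelled <- Ref[IO].of(false)
        result <- delayed(s, 5.seconds, "too late", cancelled).timeout(100.millis).attempt
        wasCancelled <- cancelled.get
      } yield (result.isLeft, wasCancelled)
    }

  def run: IO[Unit] = program.flatMap(IO.println)
```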

I will use this last approach based on IO.async for my final implementation. Since the Publisher from the PubSub API allocates resources, I will wrap its creation and shutdown in a Cats Effect Resource. I will also create another resource for the executor that will be passed as the third argument to addCallback.

import com.google.cloud.pubsub.v1.{Publisher => GooglePublisher}
import com.google.pubsub.v1.PubsubMessage
import cats.effect.IO
import cats.effect.Resource
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import com.google.api.core.ApiFutures
import com.google.api.core.ApiFutureCallback
import com.google.protobuf.ByteString
import cats.syntax.all.*

final class Publisher private (
   internal: GooglePublisher,
   executor: ExecutorService
):

 def publish(message: String): IO[String] = for {
   pubSubMessage <- IO(
     PubsubMessage
       .newBuilder()
       .setData(ByteString.copyFromUtf8(message))
       .build()
   )
   future <- IO(internal.publish(pubSubMessage)) // suspend the side-effecting call
   messageId <- IO.async[String] {
     (callback: Either[Throwable, String] => Unit) =>
       IO {
         ApiFutures.addCallback(
           future,
           new ApiFutureCallback[String]() {
             override def onFailure(error: Throwable): Unit =
               callback(error.asLeft)

             override def onSuccess(messageId: String): Unit =
               callback(messageId.asRight)
           },
           executor
         )

          Some(IO(future.cancel(false)).void) // finalizer: cancel the pending publish
       }
   }
 } yield messageId

object Publisher:
 def create(topicName: String): Resource[IO, Publisher] = for {
   executor <- Resource.make(
     IO(Executors.newCachedThreadPool())
   )(ec =>
     // awaitTermination blocks and returns a Boolean, so run it interruptibly and discard it
     IO(ec.shutdown()) *> IO.interruptible(ec.awaitTermination(10, TimeUnit.SECONDS)).void
   )
   googlePublisher <- Resource.make(
     IO(GooglePublisher.newBuilder(topicName).build())
   )(p =>
     IO(p.shutdown()) *> IO.interruptible(
       p.awaitTermination(10, TimeUnit.SECONDS)
     ).void
   )
 } yield Publisher(googlePublisher, executor)
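The order of the two resources in create matters: a Resource for-comprehension releases its resources in reverse order of acquisition, so the Google publisher is shut down before the executor running its callbacks is terminated. The sketch below, with placeholder resources that only log (Scala 3 and Cats Effect 3 assumed), makes that order visible.

```scala
import cats.effect.{IO, IOApp, Ref, Resource}

object ReleaseOrder extends IOApp.Simple:

  // Placeholder resource that logs its acquisition and release.
  def logged(name: String, log: Ref[IO, List[String]]): Resource[IO, String] =
    Resource.make(log.update(_ :+ s"acquire $name").as(name))(_ =>
      log.update(_ :+ s"release $name")
    )

  // Same shape as Publisher.create: executor first, publisher second.
  def resources(log: Ref[IO, List[String]]): Resource[IO, (String, String)] =
    for {
      executor <- logged("executor", log)
      publisher <- logged("publisher", log)
    } yield (executor, publisher)

  val program: IO[List[String]] =
    for {
      log <- Ref[IO].of(List.empty[String])
      _ <- resources(log).use(_ => log.update(_ :+ "use"))
      entries <- log.get
    } yield entries

  def run: IO[Unit] = program.flatMap(IO.println)
```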

Executing unsafe actions with Dispatcher

In the previous section we implemented an integration for the publishing API; now let’s do the same with the consumer API.

Creating an impure subscriber is dead easy:

Subscriber
  .newBuilder(
    "my-subscription-name",
    (message, consumer) => {
      // do something with the message
      consumer.ack()
    }
  ).build()

The builder expects two arguments: the name of the subscription and an instance of MessageReceiver, which is just a functional interface of type (PubsubMessage message, AckReplyConsumer consumer) -> void working as the callback. Since it’s again a callback-based API, we could simply use IO.async like last time, right?

The problem is that the callback passed to the subscriber will be invoked every time a new message arrives, and a limitation of async is that only the first call has any effect.
Fortunately, we can use another approach. Cats Effect has a Queue datatype that we can use as a buffer for incoming messages. We can insert messages into the queue with offer(a: A): F[Unit], and later check whether the queue holds any messages using tryTake: F[Option[A]].
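A quick sketch of those two operations, assuming Cats Effect 3’s cats.effect.std.Queue: offer appends to the buffer, while tryTake returns None instead of waiting when the queue is empty (take, by contrast, would semantically block the fiber until an element arrives).

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue

object QueueBuffer extends IOApp.Simple:

  // Two offers followed by three non-blocking takes.
  val program: IO[List[Option[String]]] =
    for {
      queue <- Queue.unbounded[IO, String]
      _ <- queue.offer("message-1")
      _ <- queue.offer("message-2")
      first <- queue.tryTake
      second <- queue.tryTake
      third <- queue.tryTake // queue is empty now
    } yield List(first, second, third)

  def run: IO[Unit] = program.flatMap(IO.println)
```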
I will go with that approach. First, I need to create a case class holding the payload of the message and an IO wrapping the effect of acknowledging its receipt.

final case class PubSubMessage(id: String, payload: String, ack: IO[Unit])

My first naive implementation of the callback looks like this:

def createReceiver(queue: Queue[IO, PubSubMessage]): MessageReceiver = (message, consumer) => {
   val processMessage: IO[Unit] = for {
     payload <- IO(message.getData().toStringUtf8())
     _ <- queue.offer(
       PubSubMessage(message.getMessageId(), payload, IO(consumer.ack()))
     )
   } yield ()
   ??? //how should I call processMessage?
 }

The issue is that processMessage is an IO[Unit], which is just a lazy description of the computation. I’d need to explicitly run it to put the message into the queue. One way to achieve this would be calling unsafeRunSync on the IO, but then I would break the rule that user-land code should avoid calling this method.

Instead, we can use another tool tailor-made for this situation. A Dispatcher lets us evaluate effects wrapped in IO from an impure callback. Dispatcher offers several methods for running an IO (such as unsafeRunAndForget and unsafeToFuture), but for our use case I will use unsafeRunSync(io: IO[A]), which blocks the calling thread until the effect completes.

def createReceiver(dispatcher: Dispatcher[IO], queue: Queue[IO, PubSubMessage]): MessageReceiver = (message, consumer) => {
   val processMessage: IO[Unit] = for {
     payload <- IO(message.getData().toStringUtf8())
     _ <- queue.offer(
       PubSubMessage(message.getMessageId(), payload, IO(consumer.ack()))
     )
   } yield ()
   dispatcher.unsafeRunSync(processMessage)
}
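The same bridge can be tried without PubSub. In this sketch, Listener and startLegacySource are hypothetical stand-ins for MessageReceiver and the subscriber’s internal threads: a plain Java thread calls the listener, which uses the Dispatcher to push payloads into a Queue. It assumes Cats Effect 3.4 or later, where Dispatcher.parallel is the recommended constructor (Dispatcher[IO] still compiles there but is deprecated).

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.{Dispatcher, Queue}
import cats.syntax.all.*

// Hypothetical Java-style listener, standing in for MessageReceiver.
trait Listener:
  def onMessage(payload: String): Unit

object DispatcherBridge extends IOApp.Simple:

  // Hypothetical impure source: calls the listener from its own thread.
  def startLegacySource(listener: Listener, payloads: List[String]): Unit =
    new Thread(() => payloads.foreach(listener.onMessage)).start()

  val program: IO[List[String]] =
    Dispatcher.parallel[IO].use { dispatcher =>
      for {
        queue <- Queue.unbounded[IO, String]
        listener = new Listener {
          // Runs on a plain Java thread; blocks it until the offer completes.
          def onMessage(payload: String): Unit =
            dispatcher.unsafeRunSync(queue.offer(payload))
        }
        _ <- IO(startLegacySource(listener, List("a", "b", "c")))
        received <- List.fill(3)(queue.take).sequence
      } yield received
    }

  def run: IO[Unit] = program.flatMap(IO.println)
```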

Dispatcher is based on Supervisor, which is a resource responsible for managing the fiber lifecycle. When the Dispatcher is finalized, all of its underlying fibers are cancelled.
Another resource we need to take care of is the Subscriber from the PubSub API. We’ll wrap it in a Resource to ensure it’s properly terminated during the application’s shutdown.

I also need a way to expose the messages buffered in the queue. I will create two methods: poll, returning IO[Option[PubSubMessage]] with Some only if a message is available, and stream, returning an fs2 Stream[IO, PubSubMessage].

Now I can finalize the implementation of my consumer:

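import cats.effect.{IO, Resource}
import cats.effect.std.{Dispatcher, Queue}
import com.google.cloud.pubsub.v1.{MessageReceiver, Subscriber}
import fs2.Stream

final class Consumer private (queue: Queue[IO, PubSubMessage]):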
 def poll: IO[Option[PubSubMessage]] = queue.tryTake

 val stream: Stream[IO, PubSubMessage] = Stream.fromQueueUnterminated(queue)

object Consumer:

 private def receiver(
     dispatcher: Dispatcher[IO],
     queue: Queue[IO, PubSubMessage]
 ): MessageReceiver = (message, consumer) => {
   val p = for {
     payload <- IO(message.getData().toStringUtf8())
     _ <- queue.offer(
       PubSubMessage(message.getMessageId(), payload, IO(consumer.ack()))
     )
   } yield ()
   dispatcher.unsafeRunSync(p)
 }

 def create(subscriptionName: String): Resource[IO, Consumer] = for {
   queue <- Resource.eval(Queue.unbounded[IO, PubSubMessage])
   dispatcher <- Dispatcher[IO]
   subscriber <- Resource.eval(
     IO(
       Subscriber
         .newBuilder(subscriptionName, receiver(dispatcher, queue))
         .build()
     )
   )
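   _ <- Resource.make(
     IO(subscriber.startAsync())
   )(_ =>
     // wait for termination so no callback runs after the Dispatcher is released
     IO(subscriber.stopAsync()) *> IO.blocking(subscriber.awaitTerminated())
   )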
 } yield Consumer(queue)
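To show how the stream side could be consumed, here is a sketch that fills the same kind of queue with fake messages instead of a real subscription, assuming Cats Effect 3 and fs2 3. The case class mirrors the article’s PubSubMessage, and its ack effects only record the acknowledged ids. In a real consumer you would usually acknowledge after processing a message rather than as it enters the stream.

```scala
import cats.effect.{IO, IOApp, Ref}
import cats.effect.std.Queue
import cats.syntax.all.*
import fs2.Stream

// Same shape as the case class defined earlier in the article.
final case class PubSubMessage(id: String, payload: String, ack: IO[Unit])

object StreamConsumer extends IOApp.Simple:

  // Fake messages whose ack appends the id to `acked`.
  val program: IO[(List[String], List[String])] =
    for {
      acked <- Ref[IO].of(List.empty[String])
      queue <- Queue.unbounded[IO, PubSubMessage]
      _ <- List("1", "2", "3").traverse_(id =>
        queue.offer(PubSubMessage(id, s"payload-$id", acked.update(_ :+ id)))
      )
      payloads <- Stream
        .fromQueueUnterminated(queue)
        .take(3)
        .evalTap(_.ack) // ack each message as it is pulled from the queue
        .map(_.payload)
        .compile
        .toList
      ackedIds <- acked.get
    } yield (payloads, ackedIds)

  def run: IO[Unit] = program.flatMap(IO.println)
```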

Wrapping up

Hopefully, this article will be helpful if you had any doubts about how to integrate impure APIs with Cats Effect. You can check the complete code of my cats-friendly PubSub API with usage examples on GitHub. Take care!
