Getting started with ZIO, part 2

Jakub Cichy

27 Jul 2023, 12 minutes read

Welcome to the second part of "Getting started with ZIO" where we will continue explaining ZIO concepts as we proceed with the development of our application. If you haven't seen part 1, definitely start there! Let's look at the system overview.

[Figure: system overview]

After implementing the consumer and buffer components, it's time to tackle the broadcast next.

Concurrency

Our application server must handle clients who subscribe and unsubscribe from specific Kafka topics.

Therefore, we need to handle concurrent subscriptions to the same resource. The Broadcast component will do just that. As always when writing ZIO services, let's start with the service definition.

trait Broadcast {
  // ZIO[Any, Throwable, TDequeue[Message]]
  def subscribe(topic: String): Task[TDequeue[Message]] 

  // ZIO[Any, Nothing, Unit]
  def unsubscribe(topic: String): UIO[Unit] 
}

This looks quite intuitive, right? The return value of subscribe is TDequeue[_], and as its name suggests, it's a queue that can only be dequeued. That fits our use case exactly, since clients only read messages. There is also an Enqueue[_] counterpart that only allows writes. The ZIO Queue, which we used for the buffer implementation, combines the two.

class Queue[A] extends Dequeue[A] with Enqueue[A]
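The read/write split can be sketched with the non-transactional types. This is a minimal illustration, not code from the application: the object name and the produce and consume helpers are hypothetical, and the only point is that one Queue can be handed out as a write-only Enqueue or a read-only Dequeue.

```scala
import zio._

object QueueSidesSketch extends ZIOAppDefault {
  // Hypothetical producer: it only sees the write side of the queue.
  def produce(out: Enqueue[String]): UIO[Unit] =
    out.offerAll(List("a", "b", "c")).unit

  // Hypothetical consumer: it only sees the read side of the queue.
  def consume(in: Dequeue[String]): UIO[Chunk[String]] =
    in.takeAll

  val program: UIO[Chunk[String]] =
    for {
      queue <- Queue.unbounded[String] // a Queue is both an Enqueue and a Dequeue
      _     <- produce(queue)
      items <- consume(queue)
    } yield items

  def run = program.flatMap(items => Console.printLine(items.mkString(", ")))
}
```

Passing the narrower type to each side means a consumer cannot accidentally publish into the queue, which is the same guarantee Broadcast gives its clients.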

But what's the T prefix in TDequeue? It indicates that our dequeue is transactional, and we will find the T prefix on other transactional types in ZIO as well. Let's explain what they actually are using the Broadcast implementation as an example, but first, a word about STM.

STM

To handle subscriptions correctly, we are going to use the ZIO implementation of Software Transactional Memory (STM). When multiple threads operate on the same state, common problems like race conditions and data inconsistency appear. STM brings the idea of database transactions to shared memory as a mechanism for concurrency control.

The idea is not new as it originated back in 1986 and was later popularized under the STM term in 1995 by Nir Shavit. Compared with ACID properties of database transactions, STM brings all except for durability, which makes no sense for in-memory operations.

ZIO and STM

ZIO implements this concept and introduces the ZSTM[-R, +E, +A] type for transactional tasks, which looks and "feels" just like the ZIO type while coding. It's monadic and, therefore, composable. Composing several updates to build up a transaction is probably familiar to Doobie users, where we often define transaction flow as a chain of ConnectionIO's.

There are transactional versions of the most common data structures like an array, set, map, or queue, where each operation (method) yields a value in the ZSTM context. By using them to model our shared state we can easily perform any updates without worrying about concurrency and handling that ourselves by locks or other means.
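As a rough illustration of how transactional values compose, here is a sketch that is separate from the application code. The transfer function and the balances are made up for the example. Several TRef updates are combined into one transaction, and the whole transaction is committed from ten fibers at once.

```scala
import zio._
import zio.stm._

object StmTransferSketch extends ZIOAppDefault {
  // Hypothetical example: move `amount` between two balances as one atomic step.
  def transfer(from: TRef[Int], to: TRef[Int], amount: Int): USTM[Unit] =
    for {
      balance <- from.get
      _       <- STM.check(balance >= amount) // retries the transaction until funds suffice
      _       <- from.update(_ - amount)
      _       <- to.update(_ + amount)
    } yield ()

  val program: UIO[(Int, Int)] =
    for {
      a        <- TRef.make(100).commit
      b        <- TRef.make(0).commit
      // Ten concurrent transfers; each commit is isolated from the others.
      _        <- ZIO.foreachParDiscard(1 to 10)(_ => transfer(a, b, 10).commit)
      balances <- (for { x <- a.get; y <- b.get } yield (x, y)).commit
    } yield balances

  def run = program.flatMap(b => Console.printLine(s"balances: $b"))
}
```

No locks appear anywhere, and no fiber can observe a state where the money has left one balance but not yet reached the other.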

Personally, I value the STM concept and its implementations, like the one from ZIO, and I see a similarity with garbage collection, as it also solves non-trivial runtime memory management problems. Like garbage collection, it introduces some overhead, but I think it's worth it. I would like to see it become more integral to programming languages in general.

Let's examine the broadcast implementation.

case class BroadcastImpl(
    subscribers: TMap[String, Int],
    hubs: TMap[String, THub[Message]]
) extends Broadcast {

  override def subscribe(topic: String): Task[TDequeue[Message]] = {
    val subscribe: USTM[(Int, TDequeue[Message])] = for {
      subscribers <- incrementSubscribers(topic)
      hub         <- resolveHub(topic)
      dequeue     <- hub.subscribe
    } yield (subscribers, dequeue)

    subscribe.commit.flatMap { case (subscribers, dequeue) =>
      ZIO.logInfo(s"Topic $topic has $subscribers subscriber(s)").as(dequeue)
    }
  }

We have a state where we keep track of active subscribers for specific topics and store hubs for each. ZIO Hub and its transactional equivalent THub is a ready-to-use component that solves the problem of distributing values across multiple subscribers. I need to quote ZIO documentation here as it captures the topic's essence.

"Queue represents the optimal solution to the problem of how to distribute values, and a Hub represents the optimal solution to the problem of how to broadcast them."

This is a perfect fit for our application. The val subscribe of type USTM (type aliases work the same as for other ZIO types here) is a description of a set of memory operations. First, we update the active subscribers counter, incrementing it if there are already subscribers for the topic, or setting it to 1 if not.

private def incrementSubscribers(topic: String): USTM[Int] =
  subscribers
    .updateWith(topic) {
      case Some(counter) => Some(counter + 1)
      case _             => Some(1)
    }
    .map(_.get)

Next, we have to resolve a hub, meaning we either want to use the existing one since it's per topic or create a new one.

private def resolveHub(topic: String): USTM[THub[Message]] =
  hubs.get(topic).flatMap {
    case Some(hub) => STM.succeed(hub)
    case _         => THub.unbounded[Message].tap(hubs.put(topic, _))
  }

Finally, we call hub.subscribe to get an instance of TDequeue that will receive all the messages we publish to the hub. The subscribe value we create is only a description of our update transaction; to run it, we have to commit.

Calling that method on ZSTM tells the runtime to perform the operations and return the result, which is of type ZIO again. We executed our memory updates, and now we want to go back to the "regular" context where we can perform other, non-transactional operations.
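To see the hub's broadcasting behaviour in isolation, consider the following minimal sketch. It is not part of the application: the object name and the String payload are placeholders. Two subscribers are registered, one value is published, and each subscriber receives its own copy.

```scala
import zio._
import zio.stm._

object THubSketch extends ZIOAppDefault {
  val program: UIO[(String, String)] =
    for {
      hub    <- THub.unbounded[String].commit
      first  <- hub.subscribe.commit       // each subscription yields its own TDequeue
      second <- hub.subscribe.commit
      _      <- hub.publish("hello").commit // publishing is a transaction as well
      a      <- first.take.commit
      b      <- second.take.commit
    } yield (a, b)

  def run = program.flatMap(r => Console.printLine(r.toString))
}
```

Every step here is an STM value turned into a ZIO effect by commit. In BroadcastImpl the steps are instead composed into a single transaction and committed once, so the counter update and the hub subscription happen atomically.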

The unsubscribe method will also be straightforward, decrementing the subscriber’s counter and removing the hub if the last subscriber leaves.

override def unsubscribe(topic: String): UIO[Unit] = {
  val unsubscribe = for {
    subscribers <- decrementSubscribers(topic)
    _           <- if (subscribers.isEmpty) hubs.delete(topic) else STM.unit
  } yield subscribers

  unsubscribe.commit.flatMap(subscribers =>
    ZIO.logInfo(s"Topic $topic has ${subscribers.getOrElse(0)} subscriber(s)")
  )
}
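The decrementSubscribers helper is not shown above. A possible shape, assuming it mirrors incrementSubscribers and returns None once the last subscriber is gone, could look like the sketch below. The object name and the explicit TMap parameter are assumptions made to keep the example self-contained.

```scala
import zio._
import zio.stm._

object SubscriberCounterSketch extends ZIOAppDefault {
  def increment(counters: TMap[String, Int], topic: String): USTM[Int] =
    counters
      .updateWith(topic) {
        case Some(counter) => Some(counter + 1)
        case None          => Some(1)
      }
      .map(_.getOrElse(0))

  // Assumption: returning None from updateWith removes the entry,
  // so an empty result means "no subscribers left for this topic".
  def decrement(counters: TMap[String, Int], topic: String): USTM[Option[Int]] =
    counters.updateWith(topic) {
      case Some(counter) if counter > 1 => Some(counter - 1)
      case _                            => None
    }

  val program: UIO[(Int, Option[Int], Option[Int])] =
    for {
      counters <- TMap.empty[String, Int].commit
      _        <- increment(counters, "orders").commit
      two      <- increment(counters, "orders").commit
      one      <- decrement(counters, "orders").commit
      none     <- decrement(counters, "orders").commit
    } yield (two, one, none)

  def run = program.flatMap(r => Console.printLine(r.toString))
}
```

With this shape, the subscribers.isEmpty check in unsubscribe is exactly the signal to delete the topic's hub.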

Fibers

Having subscription handling in place, we still need to feed messages into our hubs. They are stacked in the buffer, hence we need to consume them as part of the broadcasting flow. We will take advantage of the effectful nature of ZLayer to do that.

val live: URLayer[Buffer, Broadcast] =
  ZLayer {
    for {
      subscribers <- TMap.empty[String, Int].commit
      hubs        <- TMap.empty[String, THub[Message]].commit
      ...
    } yield BroadcastImpl(subscribers, hubs)

First, we need to instantiate TMaps for subscribers and hubs. Next, we need to poll elements from the buffer and publish them to the corresponding hub if one exists. That sounds like a possibly endless streaming flow, so let's use ZStream for that. Just before we exit the for comprehension above, we define our stream.

_ <-
  ZStream
    .repeatZIO(Buffer.poll())
    .mapZIO {
      case Some(message) =>
        val publish = for {
          hub <- hubs.get(message.topic)
          _ <- hub match {
            case Some(hub) => hub.publish(message).unit
            case _         => STM.unit
          }
        } yield ()

        publish.commit
      case _ => ZIO.unit
    }
    .run(ZSink.drain)
    .forkDaemon

We are repeatedly polling from the buffer. If we get a message we publish it to the correct hub, which will take care of broadcasting it to all subscribers.

The publish value is also a ZSTM transactional memory operation that needs to be committed. Since we don't care about the results from our stream and don't return any values, we run it with a sink that ignores its inputs. Lastly, we call forkDaemon on the ZIO effect, which spawns a fiber running the streaming flow in the background.
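The polling pattern can be tried out on its own with a plain Queue standing in for the Buffer service. This is a sketch under that assumption. Unlike the real flow, it stops after three elements so that it terminates.

```scala
import zio._
import zio.stream._

object PollingStreamSketch extends ZIOAppDefault {
  val program: UIO[Chunk[Int]] =
    for {
      buffer <- Queue.unbounded[Int] // stand-in for the article's Buffer
      _      <- buffer.offerAll(List(1, 2, 3))
      polled <- ZStream
                  .repeatZIO(buffer.poll) // emits Some(element), or None when empty
                  .collectSome            // drop the empty polls
                  .take(3)                // end the demo after three elements
                  .runCollect
    } yield polled

  def run = program.flatMap(c => Console.printLine(c.mkString(", ")))
}
```

When the results are not needed, as in the broadcast flow, runDrain is a shorthand for run(ZSink.drain).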

Fibers are lightweight execution units provided by ZIO that enable running tasks concurrently without needing low-level thread management. Creating fibers is cheap compared to JVM threads.

Another advantage of fibers is that they are composable and typed. ZIO provides structured concurrency, which simply means that fibers are by default scoped to their parents. This is a good approach in general, since it makes the fiber lifecycle easier to reason about. In our example, if we just fork the fiber running the stream, its lifecycle will be bound to the fiber running our effectful layer construction code.

Since we want the process to run in the background until our application shuts down, we want its lifetime to be global. Using forkDaemon does just that by attaching the fiber to the global scope.
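The difference between the two scopes can be observed with a small experiment. This is a sketch with hypothetical names, not the application code. A parent fiber starts a background task and exits immediately, and we then check whether the task managed to finish.

```scala
import zio._

object ForkScopeSketch extends ZIOAppDefault {
  // `start` decides how the background task is forked by the parent fiber.
  def survivesParent(start: UIO[Any] => UIO[Any]): UIO[Boolean] =
    for {
      done   <- Promise.make[Nothing, Unit]
      task    = ZIO.sleep(100.millis) *> done.succeed(())
      parent <- start(task).fork // the parent forks the task and ends right away
      _      <- parent.join
      result <- done.await.timeout(1.second)
    } yield result.isDefined

  val program: UIO[(Boolean, Boolean)] =
    for {
      scoped <- survivesParent(_.fork)       // child is interrupted with its parent
      daemon <- survivesParent(_.forkDaemon) // child lives in the global scope
    } yield (scoped, daemon)

  def run = program.flatMap(r => Console.printLine(r.toString))
}
```

With fork, the task is interrupted as soon as its parent completes, so the promise is never fulfilled. With forkDaemon, it outlives the parent and completes.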

While it's best to avoid working with fibers explicitly wherever possible, some mechanisms, like forking a daemon fiber, come in handy.

Lately, a lot has been happening with virtual threads, as they are now part of the latest JDK releases thanks to the development of Project Loom. SoftwareMill is working on a prototype of a structured concurrency library built on top of JVM virtual threads as part of the Ox project. I encourage you to read about the concept in Adam's text if that interests you.

WebSocket server

The last component in our application is the web socket server that will handle client subscriptions and send messages over the sockets. Service definition is not too exciting this time.

trait Server {
  val serve: Task[ExitCode]
}

For the actual implementation, we are going to use tapir, which is a library for describing endpoint or socket definitions in a type-safe way. The library allows us to describe the endpoint inputs and outputs and provide the actual logic using any effect we like, which is ZIO in our example.

Let's define the socket.

final case class ServerImpl(
    port: Int,
    broadcast: Broadcast,
    ec: ExecutionContext
) extends Server {

  private val kTailSocket: ZServerEndpoint[Any, ZioStreams & WebSockets] =
    endpoint.get
      .in("k-tail" / path[String]("topic"))
      .out(webSocketBodyRaw(ZioStreams))
      .zServerLogic(topic => kTailSocketLogic(topic))

  ...
}      

The socket definition is just a regular value. We define the path and a single path parameter representing the topic a client wishes to connect to. By specifying the output as webSocketBodyRaw(ZioStreams), we tell the framework that this will be a web socket and that the logic implementation will use ZIO streams. That needs to be specified explicitly, since tapir supports many streaming libraries, like Akka Streams or fs2.

Why do we need streams for the implementation of web socket logic? If you think of it, the web socket is a great example of a streaming workflow since it's about processing a possibly infinite source of input frames and sending output frames back to the client.

The zServerLogic method expects us to provide a function with endpoint input as its argument and web socket flow as its return value. Let's examine the kTailSocketLogic.

  private def kTailSocketLogic(
      topic: String
  ): UIO[Stream[Throwable, WebSocketFrame] => Stream[Throwable, WebSocketFrame]] =
    ZIO.succeed { (in: Stream[Throwable, WebSocketFrame]) =>
       ... // out: Stream[Throwable, WebSocketFrame]
    }

First, notice the return value type. It's an effect that returns a function that translates one web socket frame stream (input) to another (output). Here's how we implement the out stream, which returns the output web socket frames.

val out = for {
  isClosed <- Promise.make[Throwable, Unit]
  dequeue  <- broadcast.subscribe(topic)

  control = in.collectZIO {
    case WebSocketFrame.Ping(bytes) =>
      ZIO.succeed(WebSocketFrame.Pong(bytes))
    case close @ WebSocketFrame.Close(_, _) =>
      isClosed
        .succeed(())
        .zipLeft(broadcast.unsubscribe(topic))
        .as(close)
  }

  messages = ZStream
    .fromTQueue(dequeue)
    .map(msg => WebSocketFrame.text(msg.toJson))

  frames = messages.merge(control).interruptWhen(isClosed)
} yield frames

It's an effectful stream description that we create in a for comprehension. First, we define a promise that will be completed when the client closes the connection. We will use that signal to interrupt our stream so that server connections don't hang. Next, we call broadcast.subscribe(topic) and get the message dequeue in return, which will be used as the source of the output stream.

Web socket connections should fulfill some of the protocol requirements, like handling the "heartbeat". The client can occasionally send a ping frame with a bytes payload to the server, which should respond with a pong frame carrying the exact same bytes.

The control value is a separate stream description that reads the frames sent by the client and by using collectZIO with a partial function, transforms selected input frames to output frames. For WebSocketFrame.Ping(bytes) we respond with WebSocketFrame.Pong(bytes) since the bytes payload needs to be the same.

When the client closes the connection, the WebSocketFrame.Close will be received. In response to that, we complete our isClosed promise and unsubscribe from the broadcast. We return the same close frame, which tells the client that our server handled the closing of the connection correctly.

Last but not least, we have to create a stream of frames containing the actual Kafka messages, based on the dequeue we got from the broadcast subscription. ZStream comes with plenty of constructors that let us create streams from, for example, simple iterable collections or queues.

We use fromTQueue, which is the constructor that accepts a transactional dequeue, and map messages to frames with a JSON text payload. The toJson method invoked on the message object is syntax from the zio-json library. It expects an implicit encoder instance, which we define in the Message companion object. The bytes of the key and value of Kafka messages should be returned as base64-encoded strings, hence we provide a custom encoder for Array[Byte].

object Message {
  implicit val bytesEncoder: JsonEncoder[Array[Byte]] =
    JsonEncoder.string.contramap(Base64.getEncoder.encodeToString)
  implicit val encoder: JsonEncoder[Message] =
    DeriveJsonEncoder.gen[Message]
}
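To show what such an encoder produces, here is a sketch with a simplified, hypothetical Message shape. The real class was defined in part 1 and has its own fields. The key and value bytes end up as base64 strings in the JSON output.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import zio.json._

object JsonSketch {
  // Hypothetical, simplified message shape used only for this example.
  final case class Message(topic: String, key: Array[Byte], value: Array[Byte])

  object Message {
    // Byte arrays are rendered as base64 strings instead of JSON arrays of numbers.
    implicit val bytesEncoder: JsonEncoder[Array[Byte]] =
      JsonEncoder.string.contramap(Base64.getEncoder.encodeToString)
    implicit val encoder: JsonEncoder[Message] =
      DeriveJsonEncoder.gen[Message]
  }

  val rendered: String =
    Message("orders", "k".getBytes(UTF_8), "v".getBytes(UTF_8)).toJson
  // {"topic":"orders","key":"aw==","value":"dg=="}
}
```

Because bytesEncoder is defined inside the companion object, it is the instance the derivation picks up for the Array[Byte] fields.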

Our final out stream has to both handle control frames and produce frames with messages. Since both control and messages streams are of the same type, we can just merge them to create the final output stream. We interrupt the stream when the isClosed signal informs us that the connection is closed.
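The merge-and-interrupt combination can be sketched with plain strings instead of web socket frames. Everything in this example is a stand-in: messages never ends on its own, and control emits nothing and merely completes the promise after a delay, the way a close frame would.

```scala
import zio._
import zio.stream._

object MergeInterruptSketch extends ZIOAppDefault {
  val program: UIO[Chunk[String]] =
    for {
      closed  <- Promise.make[Nothing, Unit]
      // Two messages, then the stream stays open forever.
      messages = ZStream("m1", "m2") ++ ZStream.never
      // Simulates the client closing the connection after 200 ms.
      control  = ZStream.fromZIO(closed.succeed(()).delay(200.millis)).drain
      frames  <- messages.merge(control).interruptWhen(closed).runCollect
    } yield frames

  def run = program.flatMap(f => Console.printLine(f.mkString(", ")))
}
```

Without interruptWhen, the merged stream would never end, because messages stays open. Completing the promise is what lets the server release the connection.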

Having the socket description in place, we have to run the actual server. Tapir provides multiple server interpreters, which translate the endpoint definitions to concrete server implementations. There is a dedicated ZIO library for HTTP, and it has a tapir interpreter as well; however, at the moment it does not support web sockets. We are going to use the http4s interpreter, since it supports both ZIO and web sockets.

private val webSocketRoutes: WebSocketBuilder2[Task] => HttpRoutes[Task] =
  ZHttp4sServerInterpreter().fromWebSocket(kTailSocket).toRoutes

override val serve: Task[ExitCode] =
  BlazeServerBuilder[Task]
    .withExecutionContext(ec)
    .bindHttp(port, "localhost")
    .withHttpWebSocketApp(wsb => Router("/" -> webSocketRoutes(wsb)).orNotFound)
    .serve
    .compile
    .drain
    .exitCode

Using the interpreter, we translate our socket endpoint to HttpRoutes from http4s. The serve method starts the blaze server and drains it for as long as the app runs.

Configuration

Let's look at the layer implementation for our server.

object ServerImpl {
  val live: URLayer[KTailConfig & Broadcast, Server] =
    ZLayer {
      for {
        port      <- ZIO.serviceWith[KTailConfig](_.port)
        broadcast <- ZIO.service[Broadcast]
        executor  <- ZIO.executor
      } yield ServerImpl(port, broadcast, executor.asExecutionContext)
    }
}

We need a port number that comes from the configuration. In the previous part, we defined KTailConfig as a case class with all configuration properties of our application. ZIO offers integration with most config provider libraries, like typesafe for HOCON files. Our configuration file is simple and has only one nesting level.

k-tail {
  port: 8080
  port: ${?K_TAIL_PORT}

  ...
}

The configuration is read automatically with the following code.

import zio.config.typesafe.*

case class KTailConfig(...)

object KTailConfig {
  private val config: Config[KTailConfig] = 
    deriveConfig[KTailConfig].nested("k-tail")

  val live: Layer[Config.Error, KTailConfig] =
    ZLayer.fromZIO(
      ZIO.config[KTailConfig](config).tap { config =>
        ZIO.logInfo(...) // log config
      }
    )
}

The magical zio.config.typesafe.* imports bring in the implicit HOCON configuration reader, and translation to the case class is done with typeclass derivation. A lot of macros do the work but for the user it's pretty simple.

Summary

In the second part of the series, we have covered the basics of concurrency and the mechanisms to handle it with ZIO. We used them together with powerful building blocks like hubs to implement a broadcast that is not so trivial in terms of functionality. We also implemented a complete web socket flow with proper control handling. The code from this part is on GitHub, as always.

In part 3, we are going to test the entire solution with zio-test and test containers. Who knows, maybe we can come up with some improvements to the implementation based on the results. Stay tuned!

Reviewed by: Bartłomiej Żyliński