In the previous post we have learned how implement the SNS/SQS based message flow using pass4s. The examples only shown how to exchange basic string messages. In this one I’ll show you how to exchange more complex types and serialize them in transport. Examples shown in this post are also available in https://github.com/majk-p/pass4s-playground/ so feel free to clone and play around.

Pass4s is elastic in terms of message encoding, it provides JSON and XML serialization mechanisms out of the box.

JSON

The JSON serialization is implemented with Circe and can be included in your project by adding a dependency on "com.ocadotechnology" %% "pass4s-circe" % version. Let’s create a simple domain model that we’d like to use for our message.

import io.circe.Codec
import io.circe.generic.semiauto._

final case class DomainMessage(description: String, value: Long)

object DomainMessage {
  implicit val codec: Codec[DomainMessage] = deriveCodec
}

Any data type is okay as long as you can provide the io.circe.Codec. To be precise the consumer will only need Decoder while the producer needs Encoder.

Consumer

Now that we have a data type we can adapt our existing String based consumer to accept our model. First a short refresher how our current consumer looks like:

sqsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)

  IO.println(s"Processor listening for messages on $sqsSource") *>
    broker
      .consumer(sqsSource)
      .consume(message => IO.println(s"Received message: $message"))
      .background
      .void
      .use(_ => IO.never)

}

Let’s use MessageProcessor and circe syntax to enrich our message processing with JSON capabilities

import com.ocadotechnology.pass4s.circe.syntax._
import com.ocadotechnology.pass4s.extra.MessageProcessor
/*...*/
sqsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)

  val processor =
    MessageProcessor
      .init[IO]
      .effectful
      .bindBroker(broker)
      .enrich(_.asJsonConsumer[DomainMessage])

  IO.println(s"Processor listening for messages on $sqsSource") *>
    processor
      .handle(sqsSource) { message =>
        IO.println(s"Received message: $message")
      }
      .use(_ => IO.never)

}

The biggest difference here is the introduction of MessageProcessor. Think of it as a way to apply enrichments to the consumer. It’s usage resembles a builder pattern. We first initialize the builder over a certain effect, in this case IO[_]. After that you can either call effectful like in the snippet or transacted if you wish to perform a transaction when handling a message. The next step is to bind it to a broker using bindBroker so that the processor has access to an existing consumer.

The last call yields MessageHandler instance that provides def enrich[B](f: Consumer[F, A] => Consumer[F, B]) method which allows you to add custom behavior to the consumer. At this point you can add things like logging, tracing or, like in this case, deserialization. The call _.asJsonConsumer[DomainMessage] transforms Consumer[F, Message.Payload] to Consumer[F, DomainMessage].

The MessageHandler provides the handle method:

def handle[R >: P](source: Source[R])(process: A => T[Unit])

This way we provide the logic for handling messages and bind it to a certain source. This way we can provide a common transformation for the consumer first, and then implement logic specific to the data source. The method returns Resource[F, Unit] so we no longer have to call .background.void. Thus this is how our source and logic binding happens:

processor
  .handle(sqsSource) { message =>
    IO.println(s"Received message: $message")
  }

Simple as that, you can now process the incoming JSON messages and model complex data.

Producer

Now we know how to consume JSON messages, but how to produce them? Again a quick recall of how it looked for non-serialized messages:

snsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)

  val message = Message(Message.Payload("hello world!", Map()), snsDestination)

  IO.println(s"Sending message $message to $snsDestination") *>
    broker.sender.sendOne(message) *>
    IO.println("Sent, exiting!").as(ExitCode.Success)
}

Now let’s try producing DomainMessage that can be processed by our customized consumer:

import com.ocadotechnology.pass4s.circe.syntax._
snsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)

  val domainMessageSender = broker.sender.asJsonSender[DomainMessage](snsDestination)

  val domainMessage = DomainMessage("hello world!", 10)

  IO.println(s"Sending message: $domainMessage to $snsDestination") *>
    domainMessageSender.sendOne(domainMessage) *>
    IO.println("Sent, exiting!").as(ExitCode.Success)
}

Yes, it’s that simple. The only thing you need to do is to add an import and apply the serialization using .asJsonSender[DomainMessage] syntax.

Check out https://github.com/majk-p/pass4s-playground/tree/master/json for complete code for the above examples.

XML

The XML serialization is implemented with Phobos, you can add it by including following dependency "com.ocadotechnology" %% "pass4s-phobos" % version.

First let’s model the data:

import ru.tinkoff.phobos.decoding._
import ru.tinkoff.phobos.encoding._
import ru.tinkoff.phobos.syntax._
import ru.tinkoff.phobos.derivation.semiauto._

final case class XmlMessage(description: String, value: Long, rows: List[String])

object XmlMessage {
  implicit val xmlEncoder: XmlEncoder[XmlMessage] = deriveXmlEncoder("xmlMessage")
  implicit val xmlDecoder: XmlDecoder[XmlMessage] = deriveXmlDecoder("xmlMessage")
}

The important thing to notice here is that we had to provide XmlEncoder for producer and XmDecoder for consumer.

Consumer

Applying the deserialization on consumer is almost identical as for JSON, just import the right syntax and enrich the consumer using MessageProcessor

import com.ocadotechnology.pass4s.phobos.syntax._

/**/

val processor =
  MessageProcessor
    .init[IO]
    .effectful
    .bindBroker(broker)
    .enrich(_.map(_.text))
    .enrich(
      _.mapM(rawText => IO.println(s"Raw message text: $rawText").as(rawText))
    )
    .enrich(_.asXmlConsumer[XmlMessage])

Producer

Producer is similar, just import the syntax and apply it on the sender

import com.ocadotechnology.pass4s.phobos.syntax._
snsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)

  val domainMessageSender = broker.sender.asXmlSender[XmlMessage](snsDestination)

  val domainMessage = XmlMessage("hello world!", 10, List("lorem", "ipsum", "dolor"))

  IO.println(s"Sending message: $domainMessage to $snsDestination") *>
    domainMessageSender.sendOne(domainMessage) *>
    IO.println("Sent, exiting!").as(ExitCode.Success)
}

Again the full code example can be found in https://github.com/majk-p/pass4s-playground/tree/master/xml

Summary

Today we have learned how to use pass4s with rich data structures, serializing them in two ways to both JSON and XML. On the way we have used MessageProcessor to simplify building rich message consumers.

In the next post I’ll show you how to proxy the messages through S3 to overcome message size limitations.