The pass4s provides a functional abstraction for messaging in Scala. It provides implementations for AWS SQS/SNS and ActiveMQ.

In this post I want to show you the basic usage of the library to implement the messaging on top of SNS/SQS. All the examples in this post can be found in https://github.com/majk-p/pass4s-playground.

Localstack

Before we see the Scala code, let’s setup localstack, set up a SNS (Simple Notification Service) topic, SQS (Simple Queue Service) queue and a subscription between them. This post assumes you have a basic understanding of SNS/SQS setup. If you need a refresher please refer to this part of AWS docs: https://docs.aws.amazon.com/sns/latest/dg/subscribe-sqs-queue-to-sns-topic.html

I’ll try to make this example a straightforward one so you should be able to get this intuitively. Let’s get started.

docker-compose

For localstack we’ll use docker-compose. Our setup consists of two containers, both based on localstack/localstack image. The first on is the localstack service itself, configured for sns and sqs.

The second one is the client. It starts with a delay and attempts to create the topic, queue and a subscription.

As a refresher, the topic is where you send the outgoing messages. The queue is where you read the messages from, and the subscription is what glues the two together on the AWS side.

The full example looks as shown below:

version: "3.8"

services:
  localstack:
    container_name: localstack_main
    image: localstack/localstack
    hostname: localhost.localstack.cloud
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    environment:
      - DOCKER_HOST=unix:///var/run/docker.sock
      - SERVICES=sqs,sns
      - EAGER_SERVICE_LOADING=1
      - SKIP_SSL_CERT_DOWNLOAD=1
      - HOSTNAME_EXTERNAL=localhost.localstack.cloud
    volumes:
      - "/tmp/localstack:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
  
  setup-resources:
    image: localstack/localstack
    environment:
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=AWSSECRET
      - AWS_DEFAULT_REGION=eu-west-2
    entrypoint: /bin/sh -c
    command: >
      "
        sleep 15
        alias aws='aws --endpoint-url http://localstack:4566'
        # Executing SNS
        aws sns create-topic --name local_sns
        # Executing SQS
        aws sqs create-queue --queue-name local_queue
        # Subscribing to SNS to SQS
        aws sns subscribe --attributes 'RawMessageDelivery=true' --topic-arn arn:aws:sns:eu-west-2:000000000000:local_sns --protocol sqs --notification-endpoint arn:aws:sqs:eu-west-2:000000000000:local_queue
        aws sqs get-queue-url --queue-name local_queue
      "      
    depends_on:
      - localstack

Please notice the --attributes 'RawMessageDelivery=true' in aws sns subscribe line. The pass4s library assumes we are operating on raw messages.

Implementation

Now that we have the infrastructure in place, let’s implement the producer and consumer. Make sure to run the docker-compose up at this point and notice the ARNs and URLs of generated resources:

localstack_main    | 2023-02-25T15:36:31.073  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS sns.CreateTopic => 200
setup-resources_1  | {
setup-resources_1  |     "TopicArn": "arn:aws:sns:eu-west-2:000000000000:local_sns"
setup-resources_1  | }
localstack_main    | 2023-02-25T15:36:31.555 DEBUG --- [   asgi_gw_0] l.services.sqs.provider    : creating queue key=local_queue attributes=None tags=None
localstack_main    | 2023-02-25T15:36:31.556  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS sqs.CreateQueue => 200
setup-resources_1  | {
setup-resources_1  |     "QueueUrl": "http://localstack:4566/000000000000/local_queue"
setup-resources_1  | }
localstack_main    | 2023-02-25T15:36:32.066  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS sns.Subscribe => 200
setup-resources_1  | {
setup-resources_1  |     "SubscriptionArn": "arn:aws:sns:eu-west-2:000000000000:local_sns:2e4211f2-43cb-4b24-91cd-0da0bda8bea6"
setup-resources_1  | }
localstack_main    | 2023-02-25T15:36:32.578  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS sqs.GetQueueUrl => 200
setup-resources_1  | {
setup-resources_1  |     "QueueUrl": "http://localstack:4566/000000000000/local_queue"
setup-resources_1  | }

Consumer

Here’s how you can implement a fully working consumer:

//> using scala "2.13"
//> using lib "com.ocadotechnology::pass4s-kernel:0.2.2"
//> using lib "com.ocadotechnology::pass4s-core:0.2.2"
//> using lib "com.ocadotechnology::pass4s-high:0.2.2"
//> using lib "com.ocadotechnology::pass4s-connector-sqs:0.2.2"
//> using lib "org.typelevel::log4cats-noop:2.5.0"

package net.michalp.pass4splayground

import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.sqs.SqsConnector
import com.ocadotechnology.pass4s.connectors.sqs.SqsEndpoint
import com.ocadotechnology.pass4s.connectors.sqs.SqsSource
import com.ocadotechnology.pass4s.connectors.sqs.SqsUrl
import com.ocadotechnology.pass4s.core.Source
import com.ocadotechnology.pass4s.high.Broker
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.noop.NoOpLogger
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region

import java.net.URI

object BaseConsumer extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    implicit val ioLogger: Logger[IO] = NoOpLogger[IO]

    val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET")
    val localstackURI = new URI("http://localhost:4566")
    val sqsSource = SqsEndpoint(SqsUrl("http://localhost:4566/000000000000/local_queue"))

    val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)
    val sqsConnector =
      SqsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)

    sqsConnector.use { connector =>
      val broker = Broker.fromConnector(connector)

      IO.println(s"Processor listening for messages on $sqsSource") *>
        broker
          .consumer(sqsSource)
          .consume(message => IO.println(s"Received message: $message"))
          .background
          .void
          .use(_ => IO.never)
    }
  }
}

You can find the code in https://github.com/majk-p/pass4s-playground/blob/master/basic/BaseConsumer.scala, if you want to test it locally, you can run it using scala-cli

scala-cli run https://raw.githubusercontent.com/majk-p/pass4s-playground/master/basic/BaseConsumer.scala

Now let’s discuss the most important parts of the code.

We start the implementation by initializing the configuration. Since we are using localstack, we need to provide the credentials and URL override. If you want to use it in production, the SqsConnector provides multiple constructors, some of which don’t require the endpoint override.

val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET")
val localstackURI = new URI("http://localhost:4566")
val sqsSource = SqsEndpoint(SqsUrl("http://localhost:4566/000000000000/local_queue"))

val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)
val sqsConnector =
  SqsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)

The sqsConnector is a Resource[IO,SqsConnector.SqsConnector[IO]], so we’ll have to acquire it.

sqsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)

  IO.println(s"Processor listening for messages on $sqsSource") *>
    broker
      .consumer(sqsSource)
      .consume(message => IO.println(s"Received message: $message"))
      .background
      .void
      .use(_ => IO.never)
}

After acquiring the resource, we create a broker instance. Think of the Broker as a kind of router for all incoming and outgoing messages. This is how it’s defined in the library.

trait Broker[F[_], +P] {
  def consumer[R >: P](source: Source[R]): Consumer[F, Payload]

  def sender[R >: P]: Sender[F, Message[R]]
}

For our simple example we only have a single consumer but this abstraction can handle more complexity if required.

The most interesting part is the binding:

broker
  .consumer(sqsSource)
  .consume(message => IO.println(s"Received message: $message"))
  .background
  .void
  .use(_ => IO.never)

We use the broker to obtain a consumer of sqsSource. Then using consume method we provide an effectful logic for handling the messages (here we just print the message). If required you can use consumeCommit that can apply a database transaction in ConnectionIO assuming you can provide a transactor ConnectionIO ~> IO.

The last three calls are there just to obtain a resource with .background for handling the messages, ignore the resource value with .void and obtain the resource forever using .use(_ => IO.never).

Let’s start the implemented consumer using scala-cli run BaseConsumer.scala and move on.

Producer

Now that we can consume messages, let’s have a second program that is going to produce the messages. Here’s the code

//> using scala "2.13"
//> using lib "com.ocadotechnology::pass4s-kernel:0.2.2"
//> using lib "com.ocadotechnology::pass4s-core:0.2.2"
//> using lib "com.ocadotechnology::pass4s-high:0.2.2"
//> using lib "com.ocadotechnology::pass4s-connector-sns:0.2.2"
//> using lib "org.typelevel::log4cats-noop:2.5.0"

package net.michalp.pass4splayground

import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.sns.SnsArn
import com.ocadotechnology.pass4s.connectors.sns.SnsConnector
import com.ocadotechnology.pass4s.connectors.sns.SnsDestination
import com.ocadotechnology.pass4s.core.Message
import com.ocadotechnology.pass4s.core.Source
import com.ocadotechnology.pass4s.high.Broker
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.noop.NoOpLogger
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region

import java.net.URI

object Producer extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    implicit val ioLogger: Logger[IO] = NoOpLogger[IO]

    val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET");
    val snsDestination = SnsDestination(SnsArn("arn:aws:sns:eu-west-2:000000000000:local_sns"))
    val localstackURI = new URI("http://localhost:4566")

    val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)

    val snsConnector =
      SnsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)

    snsConnector.use { connector =>
      val broker = Broker.fromConnector(connector)

      val message = Message(Message.Payload("hello world!", Map()), snsDestination)

      IO.println(s"Sending message $message to $snsDestination") *>
        broker.sender.sendOne(message) *>
        IO.println("Sent, exiting!").as(ExitCode.Success)
    }
  }
}

As before we start with configuring

val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET");
val snsDestination = SnsDestination(SnsArn("arn:aws:sns:eu-west-2:000000000000:local_sns"))
val localstackURI = new URI("http://localhost:4566")

val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)

val snsConnector =
  SnsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)

Please notice the difference - instead of SqsEndpoint we now create SnsDestination and start the SnsConnector. The connector is a resource, same as before.

snsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)

  val message = Message(Message.Payload("hello world!", Map()), snsDestination)

  IO.println(s"Sending message $message to $snsDestination") *>
    broker.sender.sendOne(message) *>
    IO.println("Sent, exiting!").as(ExitCode.Success)
}

In a similar way we did before, we start off by creating a Broker. Thanks to the broker we have access to the broker.sender instance. For this simple example we’ll just send a single message using def sendOne(msg: A): F[Unit] but you can easily change that to use def send: fs2.Pipe[F, A, Unit] in case you want to produce multiple messages using fs2 Stream.

Outcome

Let’s run the code!

$ scala-cli run Producer.scala

Compiling project (Scala 2.13.10, JVM)
Compiled project (Scala 2.13.10, JVM)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sending message Message(Payload(hello world!,Map()),SnsDestination(SnsArn(arn:aws:sns:eu-west-2:000000000000:local_sns))) to SnsDestination(SnsArn(arn:aws:sns:eu-west-2:000000000000:local_sns))
Sent, exiting!

☝️ you can try launching that couple of times

When you launch the consumer, it should be able to produce similar output:

$ scala-cli run https://raw.githubusercontent.com/majk-p/pass4s-playground/master/basic/BaseConsumer.scala

Compiling project (Scala 2.13.10, JVM)
Compiled project (Scala 2.13.10, JVM)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Processor listening for messages on SqsEndpoint(SqsUrl(http://localhost:4566/000000000000/local_queue),Settings(30 seconds,true,3,10,1))
Received message: Payload(hello world!,Map())
Received message: Payload(hello world!,Map())
Received message: Payload(hello world!,Map())

Summing up

Today we have learned the basic usage of pass4s for AWS SNS/SQS. The library is much more powerful than that, in future posts I’ll show you how to serialize/deserialize messages with circe and proxy larger messages over S3.

Please refer to the library documentation https://ocadotechnology.github.io/pass4s/ and the demo example in the repository https://github.com/ocadotechnology/pass4s/blob/main/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala where you can see how to use the library with JMS.