In the previous post we learned how to pass JSON and XML messages over SNS/SQS using pass4s. This time we’ll focus on another problem often faced when implementing messaging in backend systems: handling large messages.

As with the previous posts, you’ll find all code examples in the companion repository https://github.com/majk-p/pass4s-playground. Along the way I might also refer to the code in that repo.

The problem

Let’s begin with what a typical SNS-SQS setup looks like. A simple diagram might look like this:

message sent via SNS and SQS

The producer prepares the message and sends it directly to the SNS endpoint. The subscription makes sure the message reaches the right SQS queue. Finally, the consumer consumes the message. This works perfectly fine for small messages.

Since we are implementing the messaging over the AWS stack, we need to learn about its limitations. Taking a look at the Quotas and restrictions section of the SNS FAQ, we can find the following information:

Q: How much and what kind of data can go in a message?

With the exception of SMS messages, Amazon SNS messages can contain up to 256 KB of text data, including XML, JSON and unformatted text.

This means such a solution is not very useful for sending large amounts of data. At this point you might either decide to go for a different messaging solution or try to work around this limitation.

Different messaging platforms have different message size limits, and even where large messages are supported, they may come with a performance cost. This is why pass4s implements an S3 proxy that can be used regardless of your messaging implementation.

Solution

In the AWS documentation we can find the preferred solution for this problem. AWS proposes publishing large messages to S3. The reference implementation can be found in the AWS Java SDK, and pass4s follows it closely for compatibility.

The solution is based on the idea that instead of sending the message itself, the sender publishes the message content to S3 and only sends a link to the created S3 object via SNS/SQS. The drawing below presents the difference compared to the non-proxied setup. The white arrows represent S3 operations, and the blue ones represent the message flow.

message sent via SNS and SQS using proxy

Implementation

Hopefully the theory is clear. Let’s try it out ourselves.

Setup with Localstack

To start with, we need to add an S3 bucket for message storage to our Localstack instance. This can be done by extending the docker-compose setup-resources container with the following call:

# Create an S3 bucket for large messages
aws s3 mb s3://large-messages

If you skipped the previous posts on pass4s, this post refers to the docker-compose.yml file published here: https://github.com/majk-p/pass4s-playground/blob/master/docker-compose.yml.

Using the proxy with pass4s

The first thing we need to do is add the pass4s-s3proxy module to our project. This can be done by adding "com.ocadotechnology" %% "pass4s-s3proxy" % "0.2.3" to your build.sbt. The module documentation and example usage can be found here: https://ocadotechnology.github.io/pass4s/docs/modules/s3proxy
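In practice that boils down to a single dependency entry along these lines (adjust the version to match the rest of your build):

// build.sbt
libraryDependencies += "com.ocadotechnology" %% "pass4s-s3proxy" % "0.2.3"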

I’ll assume we are implementing the S3 proxy in an existing codebase, so let’s take https://github.com/majk-p/pass4s-playground/tree/master/json as our starting point.

In both the consumer and the producer we’ll add the following imports for applying the S3 proxy:

import com.ocadotechnology.pass4s.s3proxy.S3ProxyConfig
import com.ocadotechnology.pass4s.s3proxy.S3Client
import com.ocadotechnology.pass4s.s3proxy.syntax._

In both cases applying the proxy means using the .usingS3Proxy(config) syntax provided by the last import. To make that work, we need to provide an implicit instance of S3Client.

Producer

The producer extended with the S3 proxy looks like this:

object S3ProxiedJsonProducer extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    implicit val ioLogger: Logger[IO] = NoOpLogger[IO]

    val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET")
    val snsDestination = SnsDestination(SnsArn("arn:aws:sns:eu-west-2:000000000000:local_sns"))
    val localstackURI = new URI("http://localhost.localstack.cloud:4566")

    val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)

    val bucketName = "large-messages"

    val senderConfig =
      S3ProxyConfig
        .Sender
        .withSnsDefaults(bucketName)
        .copy(minPayloadSize = Some(0))

    val awsResources =
      for {
        sns <- SnsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)
        s3  <- S3Client.usingLocalAws[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)
      } yield (sns, s3)

    awsResources.use { case (connector, s3Client) =>
      implicit val _s3Client: S3Client[IO] = s3Client
      val broker = Broker.fromConnector(connector)

      val domainMessageSender =
        broker
          .sender
          .usingS3Proxy(senderConfig)
          .asJsonSender[PotentiallyLargeMessage](snsDestination)

      val domainMessage = PotentiallyLargeMessage("hello world!", 10)

      IO.println(s"Sending message: $domainMessage to $snsDestination") *>
        domainMessageSender.sendOne(domainMessage) *>
        IO.println("Sent, exiting!").as(ExitCode.Success)
    }
  }
}

The first important thing to notice is the senderConfig:

val senderConfig =
  S3ProxyConfig
    .Sender
    .withSnsDefaults(bucketName)
    .copy(minPayloadSize = Some(0))

This is the sender-side configuration of the S3 proxy. The minPayloadSize parameter is intentionally set to 0, meaning that we want to route every message through S3 regardless of its size.
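For comparison, a more production-like setup would usually keep a higher threshold, so that only payloads approaching the SNS limit get offloaded to S3. A minimal sketch, with an illustrative threshold that is not taken from the repository:

val productionSenderConfig =
  S3ProxyConfig
    .Sender
    .withSnsDefaults(bucketName)
    .copy(minPayloadSize = Some(200 * 1024)) // only payloads above ~200 KB go through S3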

val awsResources =
  for {
    sns <- SnsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)
    s3  <- S3Client.usingLocalAws[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)
  } yield (sns, s3)

The next thing is to instantiate the S3Client. Similarly to the connector, the constructor yields a resource, so the two are combined into a single resource here. The S3Client comes with multiple constructors; for a production use case you are more likely to use the usingRegion one.
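As a rough sketch, wiring the client against real AWS might then look something like the snippet below. I’m assuming here that usingRegion only needs the region, so treat it as an illustration and check the pass4s documentation for the exact signature:

// Sketch only: the exact parameter list of usingRegion may differ.
val prodS3Client: Resource[IO, S3Client[IO]] =
  S3Client.usingRegion[IO](Region.EU_WEST_2)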

awsResources.use { case (connector, s3Client) =>
  implicit val _s3Client: S3Client[IO] = s3Client
  val broker = Broker.fromConnector(connector)

  val domainMessageSender =
    broker
      .sender
      .usingS3Proxy(senderConfig)
      .asJsonSender[PotentiallyLargeMessage](snsDestination)

  val domainMessage = PotentiallyLargeMessage("hello world!", 10)

  IO.println(s"Sending message: $domainMessage to $snsDestination") *>
    domainMessageSender.sendOne(domainMessage) *>
    IO.println("Sent, exiting!").as(ExitCode.Success)
}

The second line here might seem a little hacky. You would usually handle this using implicit0 from better-monadic-for, or by making it a given in Scala 3. We need the instance in the implicit scope to be able to call .usingS3Proxy(senderConfig). This is the only thing you need to do to enable the S3 proxy for your sender; the rest looks exactly the same as the JSON producer from the previous post.
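For reference, the Scala 3 spelling of that binding would use a given instead of the implicit val (a sketch, not code from the repository):

// Scala 3: a given plays the role of the implicit val above
given S3Client[IO] = s3Client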

The producer is ready; let’s see the consumer.

Consumer

Here’s the code of the S3-proxied JSON consumer:

object S3ProxiedJsonConsumer extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    implicit val ioLogger: Logger[IO] = NoOpLogger[IO]

    val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET")

    val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)
    val endpointOverride = new URI("http://localhost.localstack.cloud:4566")

    val sqsSource = SqsEndpoint(SqsUrl("http://localhost.localstack.cloud:4566/000000000000/local_queue"))

    val consumerConfig =
      S3ProxyConfig
        .Consumer
        .withSnsDefaults()
        .copy(
          shouldDeleteAfterProcessing = true // disabled by default, in case there are other listeners
        )

    val awsResources =
      for {
        sqs <- SqsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](endpointOverride, Region.EU_WEST_2, credentialsProvider)
        s3  <- S3Client.usingLocalAws[IO](endpointOverride, Region.EU_WEST_2, credentialsProvider)
      } yield (sqs, s3)

    awsResources.use { case (connector, s3Client) =>
      implicit val _s3Client: S3Client[IO] = s3Client

      val broker = Broker.fromConnector(connector)

      val processor =
        MessageProcessor
          .init[IO]
          .effectful
          .bindBroker(broker)
          .enrich(_.usingS3Proxy(consumerConfig))
          .enrich(_.asJsonConsumer[PotentiallyLargeMessage])

      IO.println(s"Processor listening for messages on $sqsSource") *>
        processor
          .handle(sqsSource) { message =>
            IO.println(s"Received message: $message")
          }
          .use(_ => IO.never)
    }
  }
}

Similarly to the previous example, we need to provide a proxy config. In this snippet I’ve enabled deleting contents from S3 after the message is consumed. This makes sense only if a single app is reading from your source. An alternative approach to cleanup is to set up a retention policy on your bucket, as sketched after the snippet below.

val consumerConfig =
  S3ProxyConfig
    .Consumer
    .withSnsDefaults()
    .copy(
      shouldDeleteAfterProcessing = true // disabled by default, in case there are other listeners
    )
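If you prefer the retention route, an S3 lifecycle rule on the bucket can handle the cleanup for you. The rule below is only an illustration, with an arbitrary 7-day expiry; against the playground it would be run the same way as the other aws calls in setup-resources:

# Expire proxied message payloads after 7 days (illustrative value)
aws s3api put-bucket-lifecycle-configuration \
  --bucket large-messages \
  --lifecycle-configuration '{
    "Rules": [
      {
        "ID": "expire-large-messages",
        "Status": "Enabled",
        "Filter": {"Prefix": ""},
        "Expiration": {"Days": 7}
      }
    ]
  }'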

Similarly to the producer, we need to instantiate the S3 client. This is done exactly the same way as above.

awsResources.use { case (connector, s3Client) =>
  implicit val _s3Client: S3Client[IO] = s3Client

  val broker = Broker.fromConnector(connector)

  val processor =
    MessageProcessor
      .init[IO]
      .effectful
      .bindBroker(broker)
      .enrich(_.usingS3Proxy(consumerConfig))
      .enrich(_.asJsonConsumer[PotentiallyLargeMessage])

  IO.println(s"Processor listening for messages on $sqsSource") *>
    processor
      .handle(sqsSource) { message =>
        IO.println(s"Received message: $message")
      }
      .use(_ => IO.never)

}

The last change is applying the S3 proxy logic to the consumer, which is done as an enrichment with .enrich(_.usingS3Proxy(consumerConfig)).

Let’s run it!

The code is ready, so let’s launch it.

Localstack output:

localstack_main    | 2023-03-04T18:10:24.239  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS sqs.GetQueueUrl => 200
setup-resources_1  | {
setup-resources_1  |     "QueueUrl": "http://localstack:4566/000000000000/local_queue"
setup-resources_1  | }
localstack_main    | 2023-03-04T18:10:24.830  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS s3.CreateBucket => 200
setup-resources_1  | make_bucket: large-messages
pass4s-playground_setup-resources_1 exited with code 0
localstack_main    | 2023-03-04T18:17:43.024  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS sqs.ReceiveMessage => 200
localstack_main    | 2023-03-04T18:17:46.140  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS sqs.ReceiveMessage => 200
localstack_main    | 2023-03-04T18:17:49.148  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS s3.PutObject => 200
localstack_main    | 2023-03-04T18:17:49.163  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS sqs.ReceiveMessage => 200
localstack_main    | 2023-03-04T18:17:49.244  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS sns.Publish => 200
localstack_main    | 2023-03-04T18:17:49.398  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS sqs.ReceiveMessage => 200
localstack_main    | 2023-03-04T18:17:49.660  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS s3.GetObject => 200

Consumer output:

Processor listening for messages on SqsEndpoint(SqsUrl(http://localhost.localstack.cloud:4566/000000000000/local_queue),Settings(30 seconds,true,3,10,1))
Received message: PotentiallyLargeMessage(hello world!,10)

Producer output:

Sending message: PotentiallyLargeMessage(hello world!,10) to SnsDestination(SnsArn(arn:aws:sns:eu-west-2:000000000000:local_sns))
Sent, exiting!

The most interesting part is the Localstack output. You can see the consumer attempting to read messages, then the s3.PutObject call when the producer wants to send a message. After that, sns.Publish indicates that the message was sent to SNS. Finally the consumer polls again, finds the message, and gets the content from S3.

AWS sqs.ReceiveMessage => 200
AWS sqs.ReceiveMessage => 200
AWS s3.PutObject => 200
AWS sqs.ReceiveMessage => 200
AWS sns.Publish => 200
AWS sqs.ReceiveMessage => 200
AWS s3.GetObject => 200

The consumer and producer outputs have not changed compared to the examples from the previous post, and this is the beauty of the solution. You can use the proxy with no refactoring required outside of the producer/consumer initialization.

Side notes

You might have noticed that I’ve replaced localhost with localhost.localstack.cloud in the Localstack URI. This is because Localstack uses virtual hosting of buckets, and the local S3 wouldn’t work without it. The address still resolves to your local machine, so there’s nothing to worry about! Here’s the proof:

$ nslookup localhost.localstack.cloud
Server:         127.0.0.53
Address:        127.0.0.53#53

Non-authoritative answer:
Name:   localhost.localstack.cloud
Address: 127.0.0.1

Summary

This covers the problem of using S3 as a proxy for large messages. Thanks to the abstraction layer provided by pass4s, applying the proxy is very simple.