
Protecting state using virtual threads

Adam Warski

27 Oct 2022 · 10 minutes read


Java 19 contains a preview of the core component of Project Loom: virtual threads. We can now create threads at will, as we're no longer (or at least, much less) constrained by their costs in terms of memory and context switch time.
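To make "threads at will" concrete, here is a minimal sketch that starts a large number of virtual threads, each of which blocks briefly. The class and method names are made up for illustration, and the code assumes JDK 21 or later, where virtual threads are final (on Java 19 the same calls require --enable-preview).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class ManyVirtualThreads {
    // Starts `count` virtual threads that each block briefly, waits for all
    // of them, and returns how many ran to completion.
    static int runAll(int count) {
        var finished = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            threads.add(Thread.startVirtualThread(() -> {
                try {
                    Thread.sleep(10); // blocks only this virtual thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                finished.incrementAndGet();
            }));
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Finished: " + runAll(10_000));
    }
}
```

Creating the same number of platform threads would typically be far more expensive, which is the constraint that virtual threads relax.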

Where there are threads, there's concurrency. So what does this mean for concurrency on the JVM, then? For sure, blocking APIs will make a comeback, replacing asynchronous, Future-based, or reactive code. But to what extent? That still remains to be seen.

Locks & shared memory?

However, regardless of the way we write code (blocking threads, using Futures or reactive extensions), as an industry we've worked out some best practices when dealing with concurrency. One such practice, at a high level, is to avoid coordinating concurrently running processes using shared memory. This also means avoiding locks, which are used to synchronize access to that memory.

The main problem with shared memory and locks is the possibility of deadlocks (if the locks are not always acquired in the same order). Such an approach might also lead to high contention and blocked threads: even if the threads are virtual, they might implement important business functionality, which shouldn't be blocked unless absolutely necessary. Finally, we might encounter data races if a required lock isn't acquired, or is released too early.
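The lock-ordering condition mentioned above can be illustrated with a small sketch. The Account class and the transfer method are hypothetical and not part of the article's code; the only point is that both locks are always taken in the same global order (by id), so two opposite transfers cannot end up each holding one lock while waiting for the other.

```java
import java.util.concurrent.locks.ReentrantLock;

final class LockOrdering {
    static final class Account {
        final int id;
        final ReentrantLock lock = new ReentrantLock();
        long balance;

        Account(int id, long balance) {
            this.id = id;
            this.balance = balance;
        }
    }

    // Always locks the account with the lower id first.
    static void transfer(Account from, Account to, long amount) {
        Account first = from.id < to.id ? from : to;
        Account second = first == from ? to : from;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                from.balance -= amount;
                to.balance += amount;
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }

    // Runs opposite transfers concurrently and returns the combined balance.
    static long demo() {
        var a = new Account(1, 1_000);
        var b = new Account(2, 1_000);
        var t1 = new Thread(() -> { for (int i = 0; i < 10_000; i++) transfer(a, b, 1); });
        var t2 = new Thread(() -> { for (int i = 0; i < 10_000; i++) transfer(b, a, 1); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return a.balance + b.balance;
    }
}
```

Even in this tiny case, correctness depends on every caller following the ordering rule, which is the kind of fragility that message passing tries to avoid.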

Instead of shared memory & locks, we can base our design on message passing. That's the bedrock of the actor model, as implemented in Erlang and Akka, but it also underpins how Go's goroutines communicate, by exchanging values over channels.

In the initial drafts, Project Loom also included a Go-like channel implementation; however, it's not included in the preview. We might suspect, though, that channels will be added to Java's standard library at some later point in time.
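Until such channels exist, a bounded BlockingQueue from java.util.concurrent can play a similar role. The following is a rough sketch of that idea, not a reconstruction of Loom's draft channel API; the class name and the DONE marker are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class QueueAsChannel {
    private static final int DONE = -1; // hypothetical end-of-stream marker

    // A producer thread sends 1..n over the queue; the caller receives and sums them.
    static int sumViaChannel(int n) {
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(16);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    channel.put(i); // blocks while the queue is full
                }
                channel.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            // take() blocks while the queue is empty
            for (int v = channel.take(); v != DONE; v = channel.take()) {
                sum += v;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```

The two threads share no mutable variables; the queue is the only point of contact between them.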

Actors

The good news is, implementing an actor, or at least an actor-like abstraction, became much easier with Loom. But first, what's an actor?

An actor encapsulates some (potentially mutable) state, which it uses when sequentially processing incoming messages from its mailbox. Sending a message (and optionally receiving a reply) is the only way to interact with the actor. The messages are processed serially, one by one, making it impossible to concurrently access the actor's state.

Translating this into more technical terms, an actor runs an infinite loop, which consumes and processes messages from a queue. Before, implementing this required writing a custom scheduler, which multiplexed many actors onto a bounded thread pool. With Java 19, we can create a virtual thread per actor and rely on the JVM's scheduling.

Let's see how we can build a basic actor implementation using Loom. Of course, it will come nowhere near production-grade actor implementations like Akka, which includes features such as supervision, error handling, remoting, location transparency, and more. Still, it might just work if you have some mutable state to which access should be protected and serialized.

Implementing using Loom

In our implementation, an actor will be a pair: a queue (we'll use an unbounded LinkedBlockingQueue) and a virtual thread consuming from that queue. We'll need at least two classes: Actor, which implements the run-loop, and ActorRef (following Akka's terminology), which allows sending messages to the actor. The Actor is single-threaded, and ActorRef can be shared freely across multiple threads.


We'll define a base interface for all messages, requiring that they specify the reply type that the sender might expect in response to the message being processed. This diverges from how actors are usually implemented, which is with uni-directional fire & forget messaging. However, in the simple use-cases of our actors micro-library, having access to the reply might be useful:

public interface Message<REPLY> {
    default Reply reply(REPLY r) {
        return new Reply(r);
    }
}

public class Reply {
    private final Object reply;
    Reply(Object reply) { this.reply = reply; }
    Object getReply() { return reply; }
}

The Reply class is more of a technicality. It wraps the reply that will be sent back to the sender, and it can only be created using Message.reply, which takes a parameter of the correct type (note the package-private constructor). The wrapper is needed because we want the actor's behavior to be expressed as a simple pattern match (see the example below): Java has no flow-typing, so inside a matched branch the compiler doesn't narrow the message's reply type, and we can't express the reply's type using generics alone.
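To see what this buys us, here is a small self-contained sketch. It repeats Message and Reply as nested types inside a made-up holder class (ReplyTypingSketch) purely so that the example compiles on its own; Get is a sample message with an Integer reply.

```java
final class ReplyTypingSketch {
    interface Message<REPLY> {
        default Reply reply(REPLY r) {
            return new Reply(r);
        }
    }

    static final class Reply {
        private final Object reply;
        Reply(Object reply) { this.reply = reply; }
        Object getReply() { return reply; }
    }

    record Get() implements Message<Integer> {}

    static Object demo() {
        // Compiles: the argument matches the reply type declared by Get.
        Reply ok = new Get().reply(42);
        // Would not compile: a String is not an Integer.
        // Reply bad = new Get().reply("42");
        return ok.getReply();
    }
}
```

The type check happens where the Reply is created; once wrapped, the value travels as a plain Object.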

The ActorRef.send method accepts a message and places it on the actor's queue:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ActorRef {
    private final LinkedBlockingQueue<PendingMessage<?>> queue;

    ActorRef(LinkedBlockingQueue<PendingMessage<?>> queue) {
        this.queue = queue;
    }

    public <R> Future<R> send(Message<R> message) {
        var future = new CompletableFuture<R>();
        queue.add(new PendingMessage<>(message, future));
        return future;
    }
}

//

import java.util.concurrent.CompletableFuture;

record PendingMessage<R>(
    Message<R> message, 
    CompletableFuture<R> future) {}

The queue contains both the original message and the CompletableFuture, which will be used to reply to the message, captured in the PendingMessage record.

Each actor supports only a specific subset of Message implementations. Ideally, we would want to add this as a constraint on ActorRef using a generic parameter. Still, this seems to be impossible due to Java's limited generics (or my limited Java API modeling knowledge). Hence, the design isn't as type-safe as we'd ideally like.

Futures!?

Wait, you say—what is a Future doing there? Aren't we done with this wrapper nonsense? Aren't we going to use blocking code everywhere, as we've got lightweight threads?

Not always—Future is still a handy abstraction! Here it captures an important property: the message is processed asynchronously. That's our conscious decision and might be essential to our business process. We might not be interested in the reply at all (in that case, we simply discard the result of send). Or we might need the reply at some later point in time. Finally, we might pipe the reply into another actor's mailbox!

What lightweight threads buy us here is that we can call .get() when and if we need the results. But having an abstraction over a computation running in the background is still valid.
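The three usage patterns described above might look as follows. This is only a sketch: the static send method is a hypothetical stand-in for ActorRef.send that replies with double the input, and piping relies on the result being a CompletableFuture (the article's send exposes it as a plain Future, so that part is an assumption).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

final class FutureUsage {
    // Hypothetical stand-in for ActorRef.send: replies asynchronously with value * 2.
    static CompletableFuture<Integer> send(int value) {
        var future = new CompletableFuture<Integer>();
        new Thread(() -> future.complete(value * 2)).start();
        return future;
    }

    static int demo() {
        // 1. Fire and forget: the returned future is simply discarded.
        send(1);

        // 2. Ask now, read the reply later.
        var later = send(2);

        // 3. Pipe the reply into another mailbox once it arrives.
        BlockingQueue<Integer> otherMailbox = new LinkedBlockingQueue<>();
        send(3).thenAccept(otherMailbox::add);

        try {
            // Block only at the point where the values are actually needed.
            return later.get() + otherMailbox.take();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In all three cases the sender keeps running while the message is processed; blocking happens only in the final get() and take() calls.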

Well-behaved actors

To implement an actor, the user of our micro-library will need to provide its behavior: what to do when a message arrives. Here's the interface that will need to be implemented:

import java.util.concurrent.Future;

public interface ActorBehavior<MSG extends Message<?>> {
    Future<Reply> onMessage(ActorRef self, MSG message) 
        throws Exception;
}

Unlike ActorRef, the behavior is constrained to the subtype of messages that the actor handles. The actor has the option to provide the answer asynchronously if, e.g., it depends on the replies arriving from yet another actor. As mentioned above, the only way for user code to create a Reply is by providing a value of the correct type (specified by Message's generic type) to the Message.reply method.

To create an actor, we have the following static method:

public static <MSG extends Message<?>> ActorRef create(
        ActorBehavior<MSG> behavior) {
    var queue = new LinkedBlockingQueue<PendingMessage<?>>();
    var self = new ActorRef(queue);
    var actor = new Actor(self, queue, behavior);
    Thread.startVirtualThread(actor);
    return self;
}

We start a new virtual thread, which consumes messages from the created queue and returns the associated ActorRef to the caller.

Finally, here's the actor's run-loop. We consume the queue; if the thread is interrupted, it signals the actor to stop. Otherwise, we run the provided behavior:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Actor implements Runnable {
    private final ActorRef self;
    private final LinkedBlockingQueue<PendingMessage<?>> queue;
    private final ActorBehavior<Message<?>> behavior;

    Actor(ActorRef self, LinkedBlockingQueue<PendingMessage<?>> queue, 
            ActorBehavior<?> behavior) {
        this.self = self;
        this.queue = queue;
        this.behavior = (ActorBehavior<Message<?>>) behavior;
    }

    public void run() {
        var running = true;
        while (running) {
            PendingMessage<?> pending = null;
            try {
                pending = queue.poll(1000, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                running = false;
            }

            if (pending != null) {
                try {
                    var reply = behavior
                        .onMessage(self, pending.message());
                    handleReply(pending, reply);
                } catch (Exception e) {
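                    // Uses the JDK's built-in System.Logger so that the snippet
                    // needs no external logging library.
                    System.getLogger(Actor.class.getName()).log(
                        System.Logger.Level.ERROR,
                        "Exception when processing: " + pending.message(), e);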
                    pending.future().completeExceptionally(e);
                }
            }
        }
    }

// …
}
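The interrupt-based shutdown used by the run-loop can be tried out in isolation. The sketch below is not part of the micro-library (whose create method doesn't expose the thread); it assumes we keep a handle to the consumer thread, and it assumes JDK 21 or later for Thread.startVirtualThread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

final class StoppableLoop {
    // Starts a consumer loop, feeds it two messages, then stops it by
    // interrupting. Returns true if the loop's thread has terminated.
    static boolean demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        var handled = new CountDownLatch(2);
        Thread loop = Thread.startVirtualThread(() -> {
            try {
                while (true) {
                    queue.take();        // blocks until a message arrives
                    handled.countDown(); // "process" the message
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: fall through and let the thread end.
            }
        });
        queue.add("first");
        queue.add("second");
        try {
            handled.await();  // both messages were processed
            loop.interrupt(); // ask the loop to stop
            loop.join();      // wait for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !loop.isAlive();
    }
}
```

To offer something similar in the micro-library, create would have to return the Thread (or a stop handle) alongside the ActorRef.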

The reply we get from the behavior is a Future<Reply>. When this is completed, we also need to complete the pending message's future. We create a new virtual thread (they're cheap, after all!) to block on the Future<Reply> and pass the result further. If you know flatMap—that's exactly what we are doing here:

private void handleReply(PendingMessage<?> pending, Future<Reply> reply) {
    if (reply != null) {
        Thread.startVirtualThread(() -> {
            try {
                ((CompletableFuture<Object>) pending.future())
                    .complete(reply.get().getReply());
            } catch (Exception e) {
                pending.future().completeExceptionally(e);
            }
        });
    } else pending.future().complete(null);
}
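As an aside, if behaviors returned a CompletableFuture rather than a plain Future (an assumption; the ActorBehavior interface above uses Future), the same forwarding could be done with a callback instead of a dedicated thread. A minimal sketch, with a hypothetical forward helper:

```java
import java.util.concurrent.CompletableFuture;

final class ForwardReply {
    // Completes `target` with the outcome of `source`, without blocking a thread.
    static <T> void forward(CompletableFuture<? extends T> source,
                            CompletableFuture<T> target) {
        source.whenComplete((value, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(value);
            }
        });
    }

    static String demo() {
        var source = new CompletableFuture<String>();
        var target = new CompletableFuture<String>();
        forward(source, target);
        source.complete("pong");
        return target.join();
    }
}
```

The thread-per-reply version has the advantage of working with any Future, at the price of one (cheap) virtual thread per asynchronous reply.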

And we're done! In about 100 lines of code, we've created an extremely simplified but functioning implementation of actors.

Actors in action

As an example, let's write a simple counter actor which accepts three messages: Get, Increase, and Decrease. We'll model these as a sealed interface:

sealed interface CounterMessage<R> extends Message<R> {}
record Increase(int i) implements CounterMessage<Void> {}
record Decrease(int i) implements CounterMessage<Void> {}
record Get() implements CounterMessage<Integer> {}

Next, we'll specify the behavior of the actor:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class CounterActorBehavior implements ActorBehavior<CounterMessage<?>> {
    int counter = 0;

    @Override
    public Future<Reply> onMessage(ActorRef self, 
            CounterMessage<?> message) {
        Reply reply = null;
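        // Note: named record patterns such as `Increase(int i) inc` are Java 19
        // preview syntax (--enable-preview); later Java versions dropped the
        // trailing identifier.
        switch (message) {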
            case Increase(int i) inc -> {
                System.out.println("Increase message, by: " + i);
                counter += i;
                reply = inc.reply(null);
            }
            case Decrease(int i) dec -> {
                System.out.println("Decrease message, by: " + i);
                counter -= i;
                reply = dec.reply(null);
            }
            case Get() get -> {
                System.out.println("Get message, current state: " + 
                    counter);
                reply = get.reply(counter);
            }
        }
        return CompletableFuture.completedFuture(reply);
    }
}

The actor encapsulates mutable state (counter), to which access is protected. It's guaranteed that this state will only ever be accessed by a single thread.

The contract of onMessage forces us to return some kind of reply—here, we always compute it synchronously. The replies are type-safe by construction of the Message.reply method.

Finally, we tie everything together and do some counting:

public static void main(String[] args) throws Exception {
    var actor = Actor.create(new CounterActorBehavior());

    actor.send(new Increase(10));
    actor.send(new Decrease(8));
    var result = actor.send(new Get()).get();
    System.out.println("Got result: " + result);
}

We only wait for the final reply—as we use a FIFO queue, the Get message should be processed after Increase and Decrease. Hence, when running the code, you should see the expected result of 2.

Try it out

The whole example is available on GitHub. In addition to the counter example, there's also a larger one, implementing a proxy actor (manager) to a limited number of workers. The proxy pairs up awaiting messages with free workers as they become available. Everything happens in a non-blocking way.

Pitfalls

Actors on the JVM have their pitfalls. Some are unique to the Loom implementation; some are inherent due to the generality of the platform and the type system.

First, we must be cautious when blocking inside an actor. With virtual threads, where blocking calls are cheap and therefore tempting, this might be much more of a problem than before. Keep in mind that if you call a blocking operation inside the actor's behavior, no further messages will be processed until it returns. This might be desirable if the business process that the actor implements requires messages from other actors to wait in the meantime. But most often, you'll want to run the blocking operation asynchronously (e.g. in a new virtual thread) and return the result to the actor as a message. For that purpose, an actor has access to the self-ActorRef. Our implementation lacks APIs to express the above elegantly, but that's just a matter of adding a couple of utility methods.
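One possible shape for such a utility is sketched below. To stay self-contained, it models the mailbox as a plain BlockingQueue of strings rather than using ActorRef; pipeToSelf, slowLookup, and the message names are all hypothetical. With the micro-library, the last step would be a self.send(...) call instead. The code assumes JDK 21 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

final class PipeToSelf {
    // Runs `blocking` on a fresh virtual thread and posts the wrapped result
    // back to the actor's own mailbox.
    static <T, M> void pipeToSelf(BlockingQueue<M> selfMailbox,
                                  Supplier<T> blocking,
                                  Function<T, M> wrap) {
        Thread.startVirtualThread(() -> selfMailbox.add(wrap.apply(blocking.get())));
    }

    // Simulated blocking call: it only returns after "ping" has been handled,
    // which shows that the actor loop keeps running in the meantime.
    static String slowLookup(CountDownLatch gate) {
        try {
            gate.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "data";
    }

    // The calling thread plays the role of the actor's run-loop.
    static List<String> demo() {
        BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
        var pingHandled = new CountDownLatch(1);
        List<String> log = new ArrayList<>();
        mailbox.add("fetch");
        mailbox.add("ping");
        try {
            while (log.size() < 3) {
                String msg = mailbox.take();
                log.add(msg);
                if (msg.equals("fetch")) {
                    pipeToSelf(mailbox, () -> slowLookup(pingHandled),
                        (String r) -> "fetched:" + r);
                } else if (msg.equals("ping")) {
                    pingHandled.countDown();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }
}
```

The lookup's result re-enters the actor as an ordinary message, so the state is still touched only by the run-loop.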

Secondly (this hazard is shared with Akka), we must be cautious not to leak the actor's mutable state. If the actor's internal state is, e.g., a mutable collection, before sending the collection outside, we should always make a copy. Moreover, we have to be cautious not to access (read or write!) the actor's state from any kind of callbacks or closures—as these might be run from other threads, by other actors, or when futures complete. This could cause concurrent access to the state—something we wanted to avoid in the first place!
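The copy-before-sending rule could look like this. DefensiveCopy and its handler methods are made-up names standing in for an actor's behavior; the relevant part is that the reply is an immutable snapshot created with List.copyOf, never the live list.

```java
import java.util.ArrayList;
import java.util.List;

final class DefensiveCopy {
    // Actor-private mutable state: only the actor's thread should touch it.
    private final List<String> items = new ArrayList<>();

    void onAdd(String item) {
        items.add(item);
    }

    // Replies with an immutable snapshot instead of the live list.
    List<String> onGetAll() {
        return List.copyOf(items);
    }

    static boolean demo() {
        var actorState = new DefensiveCopy();
        actorState.onAdd("a");
        List<String> snapshot = actorState.onGetAll();
        actorState.onAdd("b"); // a later mutation inside the actor
        // The snapshot handed out earlier is unaffected.
        return snapshot.size() == 1 && actorState.onGetAll().size() == 2;
    }
}
```

Note that the copy is shallow: if the elements themselves are mutable, they need the same care.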

The future

Will actors be the way we do concurrency in post-Loom Java? Or will we settle on other abstractions, such as channels? What kind of libraries will emerge from the community?

These questions are for now unanswered, but if you find yourself wanting to do some shared-memory concurrency, possibly there's an alternative. It might very well be that your use-case requires a shared-memory approach, but it might also be that message-passing offers a simpler and less error-prone solution.

Keep in mind that existing actor implementations continue working just fine on Java 19. However, creating a very basic actor implementation of your own is now also an option.
