Contents

Reactive Event Sourcing in Java, Part 1: Domain

Andrzej Ludwikowski

14 Oct 2021.9 minutes read

This blog post starts a series of articles that will show Event Sourcing pattern implementation very deeply and from many different angles. My main goals behind the upcoming posts are:

  1. convince you that Event Sourcing is not so hard to implement,
  2. present the right tools to help you do this pretty fast,
  3. show how you can model your domain code without any framework dependencies
  4. explain how you can create consistent and scalable applications without optimistic (or pessimistic) locking
  5. and show how you can do this in Java with your favorite stack like Spring.

The "Reactive" notion in the title is not only a catchphrase. The Reactive Manifesto will be our primary driver while working on this small project. Based on the fundament, which is Message Driven or Event Driven (in our case) approach, we will build an elastic and resilient application that will be responsive yet consistent. I hope that this PoC will be also the source of ideas and production-ready solutions for your future Event Sourcing adaptations. I must warn you here, if you enter this path, it will change your mind. It will not happen immediately, but once you notice that you are thinking about events instead of state mutations and designing your systems as events flow, this change will be permanent. From my perspective, it's extremely hard to go back and code as before, the "old way".

Domain

Many people think that Event Sourcing pattern is very complex to implement. I could argue with that but I know that it might be challenging, especially for the first time. What is problematic for me is when you simply don’t have a choice and you need to implement something that is either very close to Event Sourcing or full Event Sourcing adaptation, but you still refuse to follow this path, because this is something new, challenging and you don't have any experience with that. From my observations, this fear drives you towards a classic approach to state persistence with some extras. With time, these extras that mimic Event Sourcing are getting horribly complex to understand and maintain in comparison to implementation based on Event Sourcing that could, surprisingly, simplify a lot.

Let’s start with a basic theory. If you’ve never heard about Event Sourcing before, there are plenty of materials available for reading and watching, also I would recommend tracking this Github repository. I don't want to reapet all this information, since I prefer the approach where you learn by practice. The theoretical parts will be minimized.

From a domain perspective, Event Sourcing is a quite trivial pattern. There are 3 main building blocks:

  • Commands — define what we want to happen in the system,
  • State — it’s usually an aggregate from the DDD approach, which is responsible for keeping some part of the system consistent and valid (aggregate invariants)
  • Events — capture what has happened in the system.

The state/aggregate usually needs to provide 2 entry point methods:

  • List process(Command command),
  • State apply(Event event).

The hardest part of this series was to find a good domain example, not so trivial as in most tutorials, but also not too complex, to focus more on the Event Sourcing parts. Imagine that we are creating an application for selling tickets to cinema shows. In our naive implementation, the state is modeled as the Show aggregate, which represents a cinema show, where you can reserve your seats. Show record contains an id, title, and collection of seats, in our case, a map for convenient usage, but it could be anything. Other fields like the actual time of the show and probably a few others are omitted.

record Show(ShowId id, String title, Map<SeatNumber, Seat> seats) {

    public static final BigDecimal INITIAL_PRICE = new BigDecimal("100");

    public static Show create(ShowId showId) {
        return new Show(showId, "Show title " + showId.id(), SeatsCreator.createSeats(INITIAL_PRICE));
    }
}

Seat encapsulates information like: seat number, status, and price.

record Seat(SeatNumber number, SeatStatus status, BigDecimal price)

enum SeatStatus {
    AVAILABLE, RESERVED
}

So far, the construction of the show is simplified to only one factory method, which will use another helper method to fill the show with 10 available seats with an initial price.

Now, let’s reserve a seat. To implement this simple use case, first, we need a command.

sealed interface ShowCommand extends Serializable {
    ShowId showId();

    record ReserveSeat(ShowId showId, SeatNumber seatNumber) implements ShowCommand {
    }
}

The ReserveSeat command is a record since we don’t want to modify it. It also extends the common interface ShowCommand. This interface is sealed so all its implementations will be inside it. We don’t allow extending it anywhere else since it makes no sense. This command will be consumed by the process method mentioned earlier.

public Either<ShowCommandError, List<ShowEvent>> process(ShowCommand command, Clock clock) {
    return switch (command) {
        case ReserveSeat reserveSeat -> handleReservation(reserveSeat, clock);
    };
}

The signature of this method might be interesting for you. We are using Either type from the Vavr library, so we can return ShowCommandError in case of some failure or a list of events in case of command successful processing. If this is something new for you and you are throwing exceptions in case of business failures, I would really recommend getting familiar with the Vavr library and using the type system more extensively. The Show aggregate is also using Vavr collections (Map and List), so the whole state is immutable (although it is not mandatory, we will talk about this in the next post). Also, we are passing the clock as a method parameter. It’s way easier to test such code because we can always mock the clock.

Handling the reservation is very simple.

private Either<ShowCommandError, List<ShowEvent>> handleReservation(ReserveSeat reserveSeat, Clock clock) {
    SeatNumber seatNumber = reserveSeat.seatNumber();
    return seats.get(seatNumber).<Either<ShowCommandError, List<ShowEvent>>>map(seat -> {
        if (seat.isAvailable()) {
            return right(List.of(new SeatReserved(id, clock.now(), seatNumber)));
        } else {
            return left(SEAT_NOT_AVAILABLE);
        }
    }).getOrElse(left(SEAT_NOT_EXISTS));
}

First, we need to find the seat, if such a seat doesn’t exist, we can return the SEAT_NOT_EXISTS error, or if the seat is not available SEAT_NOT_AVAILABLE. Finally, if the seat is available then we can reserve it. In reality, the domain validation will be more sophisticated, just remember that the process method is the right place for it. Be aware that we are not mutating the state, only returning an event.

sealed interface ShowEvent extends Serializable {
    ShowId showId();

    Instant createdAt();

    record SeatReserved(ShowId showId, Instant createdAt, SeatNumber seatNumber) implements ShowEvent {
    }
}

This might look very similar to the command but it's something completely different. The SeatReserved event is a fact in our system that will be stored in our event store. It’s the essence of Event Sourcing, where commands are only convenient DTOs used for action encapsulation.

As I mentioned, when we leave the process method the state will be exactly the same. We didn’t change it. And we cannot change it here. We are implementing the Event Sourcing pattern, so get used to the fact that the state in our database does not exist anymore. It’s created by replaying all persisted events. Before we change the state, we need to persist the event (or events) and only then can we apply it and get a new version of the state. That’s why we need a separate method just for applying events.

It's extremely important to understand that mutating the state in the process method might result in very nasty bugs where your state after processing the command is different than when you rebuild it from events. The apply event method signature is not returning any errors. This time, if we cannot find the seat, we are throwing an exception. There is no way to handle such a situation gracefully. It’s not a business error. Most likely, it’s an implementation failure or a concurrency problem. Normally, this shouldn’t happen. Another key aspect of the implementation is that this method cannot have any side effects, as those would also be executed during recovery of the state.

public Show apply(ShowEvent event) {
    return switch (event) {
        case SeatReserved seatReserved -> applyReserved(seatReserved);
    };
}

private Show applyReserved(SeatReserved seatReserved) {
    Seat seat = getSeatOrThrow(seatReserved.seatNumber());
    return new Show(id, title, seats.put(seat.number(), seat.reserved()));
}

private Seat getSeatOrThrow(SeatNumber seatNumber) {
    return seats.get(seatNumber).getOrElseThrow(() -> new IllegalStateException("Seat not exists %s".formatted(seatNumber)));
}

Applying SeatReserved, in our case, will simply override the seat with a reserved status, all in an immutable fashion.

This wasn’t so hard, don’t you think? Of course, a lot of big pieces like persistence, concurrency access, etc. are still missing. Don’t worry, we will cover this later. Actually, I’m doing this on purpose since I would like to emphasize one idea. Your domain code is the most important part of the source code. All the frameworks, databases, etc. that you will use are also relevant, but don't let them dictate how you should implement your domain. Let this be our motto.

Testing

With such an approach, we don’t need to launch Spring Context or Actor System (or anything heavy) to test the domain code. Simple unit tests will do the work.

private Clock clock = new FixedClock(Instant.now());

@Test
public void shouldReserveTheSeat() {
    //given
    var show = randomShow();
    var reserveSeat = randomReserveSeat(show.id());

    //when
    var events = show.process(reserveSeat, clock).get();

    //then
    assertThat(events).containsOnly(new SeatReserved(show.id(), clock.now(), reserveSeat.seatNumber()));
}

The first test is rather obvious. We need to check if the returned event is what we expected it to be. As you can notice, we are using a fixed clock for tests.

Sometimes instead of asserting events, we can apply them and assert the state itself.

//when
var events = show.process(reserveSeat, clock).get();
var updatedShow = apply(show, events);

//then
var reservedSeat = updatedShow.seats().get(reserveSeat.seatNumber()).get();
assertThat(events).containsOnly(new SeatReserved(show.id(), clock.now(), reserveSeat.seatNumber()));
assertThat(reservedSeat.isAvailable()).isFalse();

We expected that the selected seat in the updatedShow variable is not available anymore.
The rest of the tests will be available on Github.

Extending the code

How about adding another command? We would also like to cancel our reservation. It’s the same story as before. We need to process the CancelSeatReservation command, return a new SeatReservationCancelled event, and apply this event.

public Either<ShowCommandError, List<ShowEvent>> process(ShowCommand command, Clock clock) {
    return switch (command) {
        case ReserveSeat reserveSeat -> handleReservation(reserveSeat, clock);
        case CancelSeatReservation cancelSeatReservation -> handleCancellation(cancelSeatReservation, clock);
    };
}

public Show apply(ShowEvent event) {
    return switch (event) {
        case SeatReserved seatReserved -> applyReserved(seatReserved);
        case SeatReservationCancelled seatReservationCancelled -> applyReservationCancelled(seatReservationCancelled);
    };
}

You might notice that we are using pattern matching in Java 17 in a switch statement. It is still a preview feature, so you need to explicitly enable it. Probably for production, you could stick to the pattern matching in if statements, but I wanted to have some fun as well while creating these examples. With sealed interfaces, once you add a new command or event, the Java compiler will highlight all the places where you need to update the code.

Summary

That’s it for the first part. The full source code is available here, remember to check out part_1 tag since we will evolve this project in the upcoming weeks. The key takeaways are that our Event Sourcing domain is free of any framework since we don't need them at this level. There are 2 entry points to our domain aggregate: process command and apply event methods. Before you apply the event, it must be persisted.
The plan for the next part of the series is to make this code thread-safe with the Akka Persistence library. You will be able to check and compare a different strategy to locking based on database, described in: Implementing event sourcing using a relational database.

Stay tuned and if you don't want to miss any lesson, subscribe to SoftwareMill Academy.

Blog Comments powered by Disqus.