Spring WebFlux – programowanie reaktywne w Springu
Spring WebFlux – programowanie reaktywne w Springu
Spring WebFlux (reaktywny odpowiednik Spring MVC) to moduł wprowadzony w Springu 5 (Spring Boot 2) który umożliwia pisanie aplikacji z użyciem podejścia reaktywnego. Dlaczego potrzebujemy reaktywności w Javie? Uzyskanie w czystej Javie kodu asynchronicznego który jest czytelny i łatwy w utrzymaniu to trudna sprawa. Programowanie reaktywne rozwiązuje ten problem. Jest to temat szczególnie istotny przy przetwarzaniu dużych zbiorów danych.
Przed Javą 9 programowanie reaktywne mogliśmy uzyskać za pomocą bibliotek takich jak RxJava czy Reactor (Spring WebFlux wykorzystuje bibliotekę Reactor). Zdecydowanie największą zaletą aplikacji napisanych reaktywnie jest zdolność do ich skalowania przy zachowaniu niedużej, stałej ilości dostępnych wątków i mniejszej ilości pamięci. Mogą natomiast pojawić się opóźnienia ze względu na sterowanie wątkami.
Kontener servletów Apache Tomcat z użyciem specyfikacji Java Servlet < 3.1 każdy request obsługuje w osobnym wątku (pula wątków dla Apache Tomcat wynosi 200). Wyczerpanie ilości dostępnych wątków z puli prowadzi do blokady – kolejne żądania muszą czekać na ‘wolny’ wątek w puli. Apache Tomcat który bazuje na specyfikacji Java Servlet >= 3.1 pozwala na obsługę requestów w sposób asynchroniczny! Co to oznacza? Jeśli kasjer potrzebuje pomocy przełożonego ze względu na reklamację klienta to zamiast blokować całą kolejkę (requesty) czekając na pomoc przełożonego obsługuje kolejne osoby stojące przy kasie nie blokując całej kolejki. Jeśli pojawi się przełożony to dopiero wtedy kasjer razem z przełożonym rozpatrują reklamację klienta. Jest to tzw. serwer nieblokujący w którym zazwyczaj jest 1-2 wątki na rdzeń procesora. Zazwyczaj liczba wątków nie jest większa niż liczba rdzeni procesora. Serwer nieblokujący daje możliwość przetworzenia większej liczby żądań bez utraty wydajności i stabilności!
Dokumentowa baza MongoDB posiada specjalny sterownik dla Javy, który implementuje nieblokujące API – mongodb-driver-reactivestream. Asynchroniczny sterownik do bazy nie blokuje żadnego wątku (JDBC do takich nie należy)!
Korzystając z modułu Spring WebFlux należy poznać dwa podstawowe pojęcia:
- Flux<T> – do zwracania strumienia z 0..n danymi, implementuję interfejs Publisher<T>,
- Mono<T> – do zwracania strumienia z 0..1 danymi, implementuję interfejs Publisher<T> np. Mono<Void> nic nie zwraca.
Do dzieła, tworzymy przykładową aplikację w oparciu o Spring WebFlux – plik pom.xml!
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency> <dependency> <groupId>de.flapdoodle.embed</groupId> <artifactId>de.flapdoodle.embed.mongo</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies>
Zależność do de.flapdoodle.embed.mongo pozwala na symulację produkcyjnej bazy danych MongoDB. Zależność do spring-boot-starter-data-mongodb-reactive pozwala na wykorzystanie reaktywnych repozytoriów MongoDB. Testy jednostkowe możliwe są natomiast dzięki zależności do reactor-test.
Tworzymy model (dokument) – klasa Training:
@Document public class Training { private String id; private String name; public Training() { } public Training(String id, String name) { this.id = id; this.name = name; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Training training = (Training) o; return Objects.equals(name, training.name); } @Override public int hashCode() { return Objects.hash(name); } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
Tworzymy reaktywne repozytorium – TrainingRepository :
@Repository public interface TrainingRepository extends ReactiveMongoRepository<Training, String> { }
Tworzymy RestController – TrainingController :
@RestController public class TrainingController { private final TrainingRepository trainingRepository; public TrainingController(TrainingRepository bookRepository) { this.trainingRepository = bookRepository; } @GetMapping(value = "/trainings", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<Training> getBooks() { return trainingRepository.findAll().delayElements(Duration.ofSeconds(1)); } }
APPLICATION_STREAM_JSON_VALUE – aplikacja zwraca strumień elementów, każdy element będzie zserializowany do formatu JSON.
Klasa konfiguracyjna – konieczne jest wskazanie lokalizacji reaktywnych repozytoriów – ReactiveMongoRepository:
@Configuration @EnableReactiveMongoRepositories(basePackageClasses = TrainingRepository.class) public class MongoConfiguration { }
Zaraz po wystartowaniu plikacji uzupełniamy bazę danych MongoDB przykładowymi danymi:
@Component public class SampleDataInitializer implements ApplicationListener<ApplicationReadyEvent> { private TrainingRepository trainingRepository; public SampleDataInitializer(TrainingRepository repository) { this.trainingRepository = repository; } @Override public void onApplicationEvent(ApplicationReadyEvent event) { Flux<Training> trainingFlux = trainingRepository .findAll() .thenMany( Flux .just("JAVA", "SPRING") .map(name -> new Training(UUID.randomUUID().toString(), name)) .flatMap(trainingRepository::save) ); Mono<Void> all = Mono.when(trainingFlux); all.block(); trainingRepository.findAll().subscribe(training -> System.out.println(training.getName())); } }
W pliku application.properties dodajemy niezbędną konfigurację:
server.port = 7774 spring.data.mongodb.uri = mongodb://localhost:27017/test
Po wejściu na adres:
http://localhost:7774/trainings
w odstępie 1 sekundy podawane są dane (dokumenty z bazy MongoDB):
Przykładowy test jednostkowy aplikacji w którym weryfikujemy czy dane zwracane są prawidłowo:
@RunWith(SpringRunner.class) @SpringBootTest @AutoConfigureWebTestClient public class TrainingControllerTest { @Autowired WebTestClient webTestClient; @Autowired private ApplicationContext context; @Autowired private TrainingRepository trainingRepository; @Before public void setUp() { webTestClient = webTestClient .mutate() .responseTimeout(Duration.ofSeconds(50)) .build(); } @Test public void trainingsTest() { trainingRepository.findAll().subscribe(training -> System.out.println("[TRAINING TEST] " + training.getName())); Flux<Training> trainingStreamFlux = webTestClient.get().uri("/trainings") .accept(MediaType.APPLICATION_STREAM_JSON) .exchange() .expectStatus().isOk() .returnResult(Training.class) .getResponseBody(); StepVerifier.create(trainingStreamFlux) .expectNext( new Training(UUID.randomUUID().toString(), "JAVA")) .expectNext( new Training(UUID.randomUUID().toString(), "SPRING")) .thenCancel() .verify(); } }
Jak wystawić usługę sieciową w sposób funkcyjny – czyli tworzymy uchwyt na usługę!
@Component public class TrainingHandler { private final TrainingRepository trainingRepository; public TrainingHandler(TrainingRepository trainingRepository) { this.trainingRepository = trainingRepository; } public Mono<ServerResponse> getTrainings(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.APPLICATION_STREAM_JSON) .body(trainingRepository.findAll().delayElements(Duration.ofSeconds(5)), Training.class); } }
Rejestrujemy beana RouterFunction w klasie konfiguracyjnej – łączenie kodu biznesowego z serwerem WWW:
@Configuration @EnableReactiveMongoRepositories(basePackageClasses = TrainingRepository.class) public class MongoConfiguration { @Bean public RouterFunction<ServerResponse> booksRoute(TrainingHandler trainingHandler) { return RouterFunctions.route(GET("/trainings-handler"), trainingHandler::getTrainings); } }
Po wejściu na adres:
http://localhost:7774/trainings-handler
otrzymujemy wynik:
Moduł WebFlux daje możliwość wywoływania zewnętrznych usług restowych w sposób reaktywny:
@Test public void trainingsTest() { WebClient.create("http://localhost:7774") .get() .uri("/trainings") .retrieve() .bodyToFlux(Training.class) .doOnNext(training -> System.out.println("[training]: " + training.getName())) .blockLast(); }
użycie metody blockLast() jest wymagane ze względu na to, że główny wątek aplikacji w testach może zakończyć się przed zwróceniem wyniku.
Cześć!
Fajny tutorial. Zaciekawiła mnnie struktura testów. W funkcji ‚trainingsTest()’ (tej pierwszej) mamy:
.expectNext( new Training(UUID.randomUUID().toString(), „JAVA”))
.expectNext( new Training(UUID.randomUUID().toString(), „SPRING”))
Taki ‚assert’ nigdy nie przejdzie, ponieważ zawsze porównujemy 2 różne obiekty – pole ID będzie inne w każdym przypadku.
Wydaje mi się, że moglibyśmy ten kod zastąpić takim:
.expectNextMatches(training -> training.getName().equals(„JAVA”))
.expectNextMatches(training -> training.getName().equals(„SPRING”))
Jest to celowe uproszczenie czy coś przeoczyłem?
Cześć Michał,
do metody .expectNext przekazujesz obiekt klasy Training:
.expectNext( new Training(UUID.randomUUID().toString(), „JAVA”) )
zatem tutaj Michał nie masz asercji tylko przekazanie nowego obiektu do metody .expectNext 😉
Jeśli kogoś interesuje programowanie reaktywne, to zachęcam do zapoznania się z moim artykułem na ten temat:
https://arasoftware.pl/2020/07/01/programowanie-reaktywne/