SpringReactorTest.java
package com.paul.testreactivestream.reactor;
import java.util.List;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class SpringReactorTest {
private void subscribeAndEnd(Flux<?> flux) {
flux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
.subscribe(System.out::println);
flux.blockLast();
}
@Test
public void createAFlux_just() throws InterruptedException {
Flux<String> fruitFlux =
Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
.log()
;
fruitFlux.subscribe(
f -> System.out.println(
String.format("[%s] Here's some fruit: %s", Thread.currentThread().getName(), f)
)
)
;
fruitFlux.blockLast();
// Thread.currentThread().join();
}
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux =
Flux.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux =
Flux.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
this.subscribeAndEnd(zippedFlux);
}
@Test
public void map() {
Flux<Player> playerFlux =
Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
})
;
this.subscribeAndEnd(playerFlux);
}
@Test
public void flatMap() {
Flux<Player> playerFlux =
Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(
n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel())
);
this.subscribeAndEnd(playerFlux);
}
@Test
public void buffer() {
Flux<List<String>> fruitFlux =
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry"
)
.buffer(3);
this.subscribeAndEnd(fruitFlux);
}
@Test
public void bufferAsyn() {
Flux<String> flux =
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry"
)
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
// .log()
);
this.subscribeAndEnd(flux);
}
@Test
public void all() {
Mono<Boolean> animalFlux =
Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo"
)
.all(c -> c.contains("a"))
;
animalFlux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
.subscribe(System.out::println);
}
}