paulwong

SPRING REACTOR 使用样例

SpringReactorTest.java

package com.paul.testreactivestream.reactor;


import java.util.List;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SpringReactorTest {
    
    private void subscribeAndEnd(Flux<?> flux) {
        
        flux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
            .subscribe(System.out::println);
        
        flux.blockLast();
    }
    
    @Test
    public void createAFlux_just() throws InterruptedException {
        Flux<String> fruitFlux = 
                Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
                    .log()
                    ;
        fruitFlux.subscribe(
                     f -> System.out.println(
                                 String.format("[%s] Here's some fruit: %s", Thread.currentThread().getName(), f)
                             )
                  )
                 ;
        fruitFlux.blockLast();
        
//        Thread.currentThread().join();
    }
    
    @Test
    public void zipFluxesToObject() {
        Flux<String> characterFlux = 
                Flux.just("Garfield", "Kojak", "Barbossa");
        
        Flux<String> foodFlux = 
                Flux.just("Lasagna", "Lollipops", "Apples");
        
        Flux<String> zippedFlux = 
                Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
        
        this.subscribeAndEnd(zippedFlux);
    }
    
    @Test
    public void map() {
        Flux<Player> playerFlux = 
                Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                    .map(n -> {
                        String[] split = n.split("\\s");
                        return new Player(split[0], split[1]);
                    })
                    ;
        this.subscribeAndEnd(playerFlux);
    }
    
    @Test
    public void flatMap() {
        Flux<Player> playerFlux = 
                Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                    .flatMap(
                        n -> Mono.just(n)
                                 .map(p -> {
                                    String[] split = p.split("\\s");
                                    return new Player(split[0], split[1]);
                                  })
                                 .subscribeOn(Schedulers.parallel())
                     );
        this.subscribeAndEnd(playerFlux);
    }
    
    @Test
    public void buffer() {
        Flux<List<String>> fruitFlux = 
                Flux.just(
                        "apple", "orange", "banana", "kiwi", "strawberry"
                     )
                    .buffer(3);
        this.subscribeAndEnd(fruitFlux);
    }
    
    @Test
    public void bufferAsyn() {
        Flux<String> flux =
            Flux.just(
                    "apple", "orange", "banana", "kiwi", "strawberry"
                 )
                .buffer(3)
                .flatMap(x ->
                    Flux.fromIterable(x)
                        .map(y -> y.toUpperCase())
                        .subscribeOn(Schedulers.parallel())
    //                    .log()
                 );
        this.subscribeAndEnd(flux);
    }
    
    @Test
    public void all() {
        Mono<Boolean> animalFlux = 
                Flux.just(
                        "aardvark", "elephant", "koala", "eagle", "kangaroo"
                     )
                    .all(c -> c.contains("a"))
                    ;
        animalFlux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                  .subscribe(System.out::println);
    
    }

}

posted on 2021-11-23 13:59 paulwong 阅读(333) 评论(0)  编辑  收藏 所属分类: REACTIVE STREAMS


只有注册用户登录后才能发表评论。


网站导航: