Spring Webflux Mono and Flux

2024-04-22
  • 리액터 구성 요소
    • publisher: 발행자, 게시자, 생산자, 방출자 (Flux or Mono)
    • subscriber: 구독자, 소비자
    • emit: 퍼블리셔가 데이터를 내보내는 것
    • sequence: 퍼블리셔가 emit하는 데이터의 연속적인 흐름을 정의해 놓은 것. 연산자 체인 형태로 정의됨
    • subscribe: subscriber가 시퀀스를 구독하는 것
    • dispose: subscriber 시퀀스 구독을 해지하는 것

Mono

0개 또는 1개 요소의 스트림을 나타냄, 단일 값을 내보내는 리액터 요소 (publisher)

사용 예시

단일 값 생성

Mono<String> stringMono = Mono.just("아일릿 민주");

Mono.just는 주로 이미 생성된 객체를 Mono로 변환할 때 사용된다.

null이 될 수도 있는 데이터는 Mono.justOrEmpty를 사용하면 된다.

예외 처리

Mono<Illit> illitMono = findIllitPort.findByName("민주") // Mono<Illit>
													.switchIfEmpty(Mono.error(new UserNotFoundException("민주가 없서요")));

switchIfEmpty는 상위 스트림이 Null value를 반환할 때 사용된다.

  • 마블 다이어그램 switchIfEmpty

    Untitled

    첫번 째 스트림은 empty일 때, 파란색 요소는 not empty일 때를 나타낸다.

    구독을 했는데 반환이 empty일 때 (Complete 신호만 있을 때) switchIf Empty 바디 내의 비동기 함수를 구독해 결과를 반환

    구독을 했는데 데이터가 있을 때 보이는 거와 같이 내부에 들어오고, 업 스트림의 데이터가 방출되는 것을 알 수 있다.

에러 처리

Mono<Illit> illitMono = findIllitPort.findByName("민주") // Mono<Illit>
													.switchIfEmpty(Mono.error(new UserNotFoundException("민주가 없서요")))
													.onErrorMap(e -> e instanceof UserNotFoundException ? e : new TrashException());

onErrorMap을 사용해 상위 스트림에서 발생하는 에러를 다른 에러로 맵핑해줄 수 있다.

  • 마블 다이어그램

    Untitled

Mono<Illit> illitMono = findIllitPort.findByName("민주") // Mono<Illit>
													.switchIfEmpty(Mono.error(new UserNotFoundException("민주가 없서요")))
													.onErrorResume(e -> getDefaultIllit()); // 극단적인 예시임. 원래는 switchIfEmpty, defaultIfEmpty를 사용해 처리하는 게 맞음
													
private Mono<Illit> getDefaultIllit() {
		return Mono.just(
            Illit.builder()
                 .name("아일릿")
                 .age("18")
                 .build()
    );
}

onErrorResume를 사용해 에러가 발생했을 때 어떤 작업을 수행할 수 있도록 만들어준다.

  • 마블 다이어그램

    Untitled

    에러가 발생하면 onErrorResume 바디내의 비동기 함수를 구독해 나온 결과로 새로운 스트림을 발행한다.

연산자 사용

Mono<IllitResponse> illitResponse = findIllitPort.findByName("민주") // Mono<Illit>
													.switchIfEmpty(Mono.error(new UserNotFoundException("민주가 없서요")))
													.map(this::toIllitResponse); // Illit to IllitResponse
													
private IllitResponse toIllitResponse(Illit illit) {
		return IllitResponse.of(illit);
}

Mono.map 연산자는 동기적인 작업, 다운 스트림에 전달될 value가 변하는 작업을 실행할 때 주로 사용함. (Mono,Flux를 반환하지 않는 작업)

  • 마블 다이어그램

    Untitled

    상위 스트림 요소를 가져와 새로운 Mono를 발행한다.

    형태 변환이 없으면 doOnNext와 같은 연산자 사용

Mono<IllitResponse> illitResponse = findIllitPort.findByName("민주") // mono<illit>
													.switchIfEmpty(Mono.error(new UserNotFoundException("민주가 없서요")))
													.flatMap(this::toIllitResponseMono);
													
private **Mono**<IllitResponse> toIllitResponseMono(Illit illit) {
		return Mono.justOrEmpty(IllitResponse.of(illit));
}-

Mono.flatMap 연산자는 주로 비동기적인 작업을 실행할 때 사용함. (Mono, Flux를 반환하는 작업)

  • 마블 다이어그램

    Untitled

    상위 스트림 요소를 가져와 flatMap 바디내의 비동기 함수를 구독해 새 Mono를 발행한다.

public Mono<IllitWithItemResponse> minjuPowerMethod() {
		return findIllitPort.findByName("민주") // Mono<Illit>
									.zipWith(findItemPort.findByName("민주"), (minju, item) -> {
												return new IllitWithItemResponse(minju, item);
									})
}

public Mono<IllitWithItemResponse> minjuPowerMethod() {
		return findIllitPort.findByName("민주") // Mono<Illit>
									.zipWith(findItemPort.findByName("민주")) // Mono<Tuple<Illit, item>>
									.map(tuple -> {
											return new IllitWithItemResponse(tuple.getT1(), tuple.getT2());
									});
}

record IllitWithItemResponse(Illit illit, Item item) {}

기본적으로 zipWith을 사용하면 상위 스트림 결과와 zipWith 바디내의 작업이 결합돼 튜플이 반환되지만, (minju, item)과 같이 명시해서 사용할 수 있다.

  • 마블 다이어그램

    Untitled

    업 스트림의 요소와 zipWith 바디 내부에 있는 요소를 병렬로 실행하고 결합시킨다.

    여기서 하나의 작업이라도 예외가 발생하면 결합이 되지 않는다.

Eager & Lazy

switchIfEmpty

@GetMapping("/test")
public Mono<String> testController() {
    return Mono.justOrEmpty("아일릿 최고입니다.")
            .switchIfEmpty(test());
}

private Mono<String> test() {
    System.out.println("test 호 출");
    return Mono.just("empty입니당");
}

Log : test 호 출, Response : 아일릿 최고입니다.

switchIfEmpty를 사용할 때는 원래의 Mono(상위 스트림)가 비어 있을 경우에 사용할 대신 사용할 Mono를 제공해 test 메서드가 실행은 되지만(Eager) test에서 반환하는 Mono는 원래의 Mono를 대체할 수 없다.

switchIfEmpty를 사용해 만약 상위 스트림이 empty일 때 다른 로직을 수행할 수 있게 한다.

  • 마블 다이어그램 switchIfEmpty
    • 초록색 요소는 empty일 때, 파란색 요소는 not empty일 때를 나타낸다.
    • 구독을 했는데 반환이 empty일 때 (Complete 신호만 있을 때) switchIfEmpty 바디 내의 비동기 함수를 구독해 결과를 반환
    • 구독을 했는데 데이터가 있을 때 보이는 거와 같이 내부에 들어오고, 업 스트림의 데이터가 방출되는 것을 알 수 있다.

defer

@GetMapping("/test")
public Mono<String> testController() {
    return Mono.justOrEmpty("아일릿 최고입니다.")
            .switchIfEmpty(Mono.*defer*(this::test));
}

private Mono<String> test() {
    System.out.println("test 호 출");
    return Mono.just("empty입니당");
}

Log : , Response : 아일릿 최고입니다.

defer를 사용해 실행을 구독 시점까지 지연시켜 Lazy하게 처리할 수 있다.

test가 아예 실행이 안됨

  • 마블 다이어그램 defer

    Untitled

    바디 내의 비동기 함수를 구독해 Mono를 반환 시킨다.

    defer 바디 밖에 구독이 있는데 이게 바로 구독이 된 시점에만 defer 바디를 실행할 수 있게 해준다. (Lazy)

Mono switchIfEmpty() is always called

Flux

0개에서 n개 요소의 스트림을 나타냄, 여러 개의 값을 내보낼 수 있는 리액터 요소 (publisher)

사용 예시

스트리밍 데이터

Flux<Integer> streamInteger = Flux.range(1, 10);

연산자 사용

Flux<IllitResponse> illitResponseFlux = findIllitPort.findAll().map(this::toIllitResponse)

Mono와의 결합

Mono<Illit> illitMono = findIllitPort.findByName("민주");
Flux<Item> itemFlux = illitMono.flatMap(findItemPort::findByIllit);

Mono<IllitWithItemResponse> illitResponseMono = illitMono.zipWith(itemFlux.collectList(),
    (illit, item) -> {
        return toIllitWithItemResponse(illit, item);
    })
																			  
private IllitWithItemResponse toIllitWithItemResponse(Illit illit, List<Item> item) {
		return new IllitWithItemResponse(illit, item)
}
																			  
record IllitWithItemResponse(Illit illit, List<Item> item) {}

filter vs filterWhen

filterWhen은 비동기 작업

filter는 동기 작업