티스토리 뷰

반응형

애플리케이션을 설계하고 개발할 때 다양한 케이스의 예외 상황을 고려해야 하는 것처럼, 리액티브 애플리케이션을 설계할 때도 모든 종류의 예외 상황을 처리 할 수 있어야 한다.

우리가 알고 있는 onError 시그널은 리액티브 스트림 스펙의 필수 요소여서 예외를 처리할 수 있는 경로로 전파 할 수 있다.

그러나 최종 구독자가 onError 시그널에 대한 핸들러를 정의하지 않으면 onError는 UnsupportedOperationException을 발생 시킨다.

그리고 리액티브 스트림에서는 onError가 스트림이 종료되었다고 정의하고 있다.

그렇기 때문에 onError 시그널을 받으면 시퀀스가 실행을 중지하게 된다.

아래 onError 시그널을 처리할 수 있는 다양한 방법을 살펴보자.

1. subscribe : 연산자에서 onError 신호에 대한 핸들러 정의

Flux.just(1,2,3)
	.map((i) -> {
		throw new RuntimeException("error"); })
	.subscribe(
		(data) -> System.out.println("Index : " + data),
		(error) -> System.err.println("Error : " + error.getMessage()),
		()-> System.out.println("Complete"));

<결과> 
Error : error

2. onErrorReturn : 사전 정의된 정적 값 또는 Exception으로 대체

Flux.just(1,2,3)
	.map((i) -> {
		throw new RuntimeException("error");})
	.onErrorReturn(99)
	.subscribe(
		(data) -> System.out.println("Index : " + data),
		(error) -> System.err.println("Error : " + error.getMessage()),
		()-> System.out.println("Complete"));
                    
<결과>
Index : 99
Complete

3. onErrorResume : 예외를 catch하여 대체 워크플로 생성

Flux.just(1,2,3)
	.map((i) -> {
		throw new RuntimeException("error"); })
	.onErrorResume((error) -> Flux.just(4,5,6))
	.subscribe(
		(data) -> System.out.println("Index : " + data),
		(error) -> System.err.println("Error : " + error.getMessage()),
		()-> System.out.println("Complete"));
                    
                    
<결과>

Index : 4
Index : 5
Index : 6

Complete

4. onErrorMap : 예외를 catch하고 다른 예외 (Exception)으로 대체

Flux.just(1,2,3)
	.map((i) -> {
		throw new RuntimeException("error"); })
	.onErrorMap((error) -> {
		throw new RuntimeException("custom error");
	})
	.subscribe(
		(data) -> System.out.println("Index : " + data),
		(error) -> System.err.println("Error : " + error.getMessage()),
		()-> System.out.println("Complete"));
                    
<결과>
Error : custom error

 5.onErrorContinue : 예외 발생시 에러를 처리하고 나머지 시퀀스를 처리

Flux.just(1,2,3)
	.map((i) -> {
		if(i ==2) {
			throw new RuntimeException("error");
		}
		return i;
	})
	.onErrorContinue((error, i) -> {
		System.out.println("Error index : " + i);
	})
	.subscribe(
		(data) -> System.out.println("Index : " + data),
		(error) -> System.err.println("Error : " + error.getMessage()),
		()-> System.out.println("Complete"));

<결과> 

Index : 1
Error index : 2
Index : 3
Complete

6.retry  & retry(n) : 오류 시그널이 발생하면 리액티브 시퀀스를 다시 구독. (재시도는 무한대로 하거나 제한된 시간 동안 할 수 있음  무한 반복하지 않고 횟수를 지정하고 싶다면 retry(n))

Flux.just(1,2,3)
	.map((i) -> {
		if(i ==2) {
			throw new RuntimeException("error");
		}
		return i;
	})
	.retry()
	.subscribe(
		(data) -> System.out.println("Index : " + data),
		(error) -> System.err.println("Error : " + error.getMessage()),
		()-> System.out.println("Complete"));

<결과>

Index : 1
Index : 1
Index : 1
Index : 1
Index : 1
Index : 1
Index : 1
Index : 1
Index : 1
Index : 1

.....

Flux.just(1,2,3)
	.map((i) -> {
		if(i ==2) {
			throw new RuntimeException("error");
		}
		return i;
	})
	.retry(2)
	.subscribe(
		(data) -> System.out.println("Index : " + data),
		(error) -> System.err.println("Error : " + error.getMessage()),
		()-> System.out.println("Complete"));

<결과>

Index : 1
Index : 1
Index : 1
Error : error

7.retryBackOff : 지수적인 백오프 알고리즘을 지원해 재시도 할 때마다 대기 시간을 증가 시킴

Flux.just(1,2,3)
	.map((i) -> {
		if(i ==2) {
			throw new RuntimeException("error");
		}
		return i;
	})
	.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)))
	.subscribe(
		(data) -> System.out.println(LocalTime.now() + " Index : " + data),
		(error) -> System.err.println(LocalTime.now() + "Error : " + error.getMessage()),
		()-> System.out.println(LocalTime.now() + "Complete"));

<결과>

18:24:28.149141 Index : 1
18:24:30.874421 Index : 1
18:24:34.409100 Index : 1

8.timeout : 작업 대기 시간을 제한하고 TimeoutException 발생

Flux.just(1, 2, 3)
	.map((i) -> {
		if (i == 2) {
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				System.out.println("InterruptedException !" + e.getMessage());
			}
			throw new RuntimeException("error");
		}
		return i;
	})
	.timeout(Duration.ofSeconds(1))
	.subscribe(
		(data) -> System.out.println(LocalTime.now() + " Index : " + data),
		(error) -> System.err.println(LocalTime.now() + "Error : " + error.getMessage()),
		() -> System.out.println(LocalTime.now() + "Complete"));

<결과>

Error : Did not observe any item or terminal signal within 1000ms in 'map' (and no fallback has been configured)

9.defaultIfEmpty :비어 있는 스트림이 왔을 경우 기본값을 반환

Flux.empty()
	.defaultIfEmpty("Default Value")
	.subscribe(
		(data) -> System.out.println(LocalTime.now() + " Index : " + data),
		(error) -> System.err.println(LocalTime.now() + "Error : " + error.getMessage()),
		() -> System.out.println(LocalTime.now() + "Complete"));
<결과>
Index : Default Value

10.switchIfEmpty : 비어 있는 스트림이 왔을 경우 다른 리액티브 스트림 반환

Flux.empty()
	.switchIfEmpty(Flux.defer(() -> Flux.just(1,2,3)))
	.subscribe(
		(data) -> System.out.println(LocalTime.now() + " Index : " + data),
		(error) -> System.err.println(LocalTime.now() + "Error : " + error.getMessage()),
		() -> System.out.println(LocalTime.now() + "Complete"));

<결과>

18:32:46.810725 Index : 1
18:32:46.819195 Index : 2
18:32:46.819284 Index : 3
반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함