본문 바로가기

RxJava

[RxJava] 유틸리티 연산자

안녕하세요. 메로나입니다.

 

오늘은 유틸리티 연산자에 대해 공부하겠습니다.

 

Delay
  • 생산자가 데이터를 생성 및 통지를 하지만 설정한 시간만큼 소비자 쪽으로의 데이터 전달 지연시킵니다.
  • 소비자로 데이터를 통지할 때마다 설정한 시간만큼 지연시킬 수도 있습니다.

delay - 마블다이어그램

// 유형 1
public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault());

        Observable.just(1, 2, 3, 4, 5)
                .doOnNext(item -> {
                    String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
                    System.out.println(currentTime + " - ori: " + item);
                })
                .delay(5000L, TimeUnit.MILLISECONDS)
                .doOnNext(item -> {
                    String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
                    System.out.println(currentTime + " - Emitted: " + item);
                })
                .subscribe();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
// 출력
22:18:10.329 - ori: 1
22:18:10.330 - ori: 2
22:18:10.330 - ori: 3
22:18:10.330 - ori: 4
22:18:10.330 - ori: 5
// 5초후에 데이터 통지
22:18:15.336 - Emitted: 1
22:18:15.337 - Emitted: 2
22:18:15.337 - Emitted: 3
22:18:15.338 - Emitted: 4
22:18:15.338 - Emitted: 5    


// 유형 2
public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault());

        Observable.just(1, 2, 3, 4, 5)
                .delay(1000L, TimeUnit.MILLISECONDS)
                .doOnNext(item -> {
                    TimeUnit.MILLISECONDS.sleep(1000L); // 500ms 대기
                    String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
                    System.out.println(currentTime + " - Emitted: " + item);
                })
                .subscribe();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
// 매 초마다 데이터 통지
22:18:50.016 - Emitted: 1
22:18:51.022 - Emitted: 2
22:18:52.027 - Emitted: 3
22:18:53.033 - Emitted: 4
22:18:54.038 - Emitted: 5

 

 

DelaySubscription
  • 생산자가 데이터의 생성 및 통지 자체를 설정한 시간만큼 지연시킵니다.
  • 소비자가 구독을 해도 구독 시점 자체를 지연시킵니다.

delaySubscription - 마블다이어그램

public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault());

        // start 로그
        String currentTime1 = sdf.format(new Date()); // 현재 시간 포맷팅
        System.out.println(currentTime1 + " - start");

        Observable.just(1, 2, 3, 4, 5)
                .doOnNext(item -> {
                    String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
                    System.out.println(currentTime + " - ori: " + item);
                })
                .delaySubscription(5000L, TimeUnit.MILLISECONDS)
                .subscribe(
                        item -> {
                            String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
                            System.out.println(currentTime + " - Emitted: " + item);
                        },
                        error -> System.out.println("Error: " + error),
                        () -> System.out.println("Complete")
                );

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
// 출력
22:22:13.769 - start // 시작 시간
22:22:18.833 - ori: 1 // 발행 시간
22:22:18.833 - Emitted: 1
22:22:18.833 - ori: 2
22:22:18.833 - Emitted: 2
22:22:18.833 - ori: 3
22:22:18.833 - Emitted: 3
22:22:18.834 - ori: 4
22:22:18.834 - Emitted: 4
22:22:18.834 - ori: 5
22:22:18.834 - Emitted: 5
Complete

 

TimeOut
  • 각각의 데이터 통지 시, 지정된 시간 안에 통지가 되지 않으면 에러를 통지합니다.
  • 에러 통지 시 전달되는 에러 객체는 TimeoutException 입니다.

timeout - 마블다이어그램

public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault());
        Observable.range(1, 5)
                .map(num -> {
                    int sleepTime = 0;
                    if (num == 3) {
                        sleepTime = 3500;
                    } else {
                        sleepTime = 1000;
                    }
                    TimeUnit.MILLISECONDS.sleep(sleepTime);
                    return num;
                })
                .timeout(3000L, TimeUnit.MILLISECONDS)
                .subscribe( item -> {
                            String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
                            System.out.println(currentTime + " - Emitted: " + item);
                        },
                        error -> System.out.println("Error: " + error),
                        () -> System.out.println("Complete")
                );

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
// 출력
22:23:43.650 - Emitted: 1
22:23:44.657 - Emitted: 2
// 3초가 넘어서 발행되어서 에러남
Error: java.util.concurrent.TimeoutException: The source did not signal an event for 3000 milliseconds and has been terminated.

 

오늘은 평소에 많이 사용할만한 rxjava 함수를 다뤄본 것 같습니다.

코드를 작성하면 에러 처리의 중요성을 많이 배우는데 이번에 배운 함수도 많이 사용해 봐야 할 것 같습니다.

20000~

'RxJava' 카테고리의 다른 글

[RxJava] 데이터 집계 연산자  (0) 2025.02.22
[RxJava] 조건과 불린 연산자  (0) 2025.02.22
[RxJava] 에러 처리 연산자  (0) 2025.02.19
[RxJava] 데이터 결합 연산자  (0) 2025.02.11
[RxJava] RxJava 퀴즈 문제  (0) 2025.02.09