본문 바로가기

RxJava

[RxJava] 데이터 결합 연산자

안녕하세요. 메로나입니다.

 

오늘은 RxJava 함수 중 데이터 결합 연산자에 대해 공부하겠습니다.

 

merge
  • 통지 시점이 빠른 Observable의 데이터부터 순차적으로 통지되고, 통지 시점이 같을 경우 merge() 함수의 파라미터로 먼저 지정된 Observable의 데이터부터 통지합니다.

merge - 구글

 public static void main(String[] args) {
         Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
                 .take(5); // 5개만 발행

            Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
                    .take(5) // 5개만 발행
                    .map(data -> data + 1000L);

            Observable.merge(observable1, observable2)
                    .subscribe(System.out::println);

            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    }
    
// 출력
0
1
1000
2
3
1001
4
1002
1003
1004

 

 

concat
  • 하나의 Observable에서 통지가 끝나면 다음 Observable에서 연이어서 통지가 됩니다.
  • 각 Observable의 통지 시점과는 상관없이 concat() 함수의 파라미터로 먼저 입력된 Observable의 데이터부터 모두 통지된 후, 다음 Observable의 데이터가 통지됩니다.

concat-구글

public static void main(String[] args) {
        // Observable.interval(200L, TimeUnit.MILLISECONDS) : 200ms마다 0부터 시작하는 Long 데이터를 발행
        // take(5) : 5개의 데이터만 발행
        Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
                .take(5); // 5개만 발행

        // Observable.interval(400L, TimeUnit.MILLISECONDS) : 400ms마다 0부터 시작하는 Long 데이터를 발행
        // take(5) : 5개의 데이터만 발행
        // map(data -> data + 1000L) : 1000씩 증가하는 데이터를 발행
        Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
                .take(5)
                .map(data -> data + 1000L);

        // concat : observable1의 데이터를 모두 발행한 후 observable2의 데이터를 발행
        Observable.concat(observable1, observable2)
                .subscribe(System.out::println);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
// 출력
0
1
2
3
4
1000
1001
1002
1003
1004

 

 

zip
  • Observable에서 통지된 데이터가 모두 모이면 각 Observable에서 동일한 index의 데이터로 새로운 데이터를 생성한 후 통지합니다.
  • 통지하는 데이터 개수가 가장 적은 Observable의 통지 시점을 완료 통지 시점을 맞춥니다.

zip-구글

public static void main(String[] args) {
        Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
                .take(3); // 5개만 발행
        
        Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
                .take(5)
                .map(data -> data + 1000L);

        // zip : observable1과 observable2의 데이터를 1:1로 묶어서 합침
        Observable.zip(observable1, observable2, (data1, data2) -> data1 + " | " + data2)
                .subscribe(System.out::println);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
// 출장
0 | 1000
1 | 1001
2 | 1002

 

combineLatest
  • 각 Observable에서 데이터를 통지할 때마다 모든 Observable에서 마지막으로 통지한 각 데이터를 함수현 인터페이스에 전달하고, 새로운 데이터를 생성해 통지합니다.

combineLatest - 구글

public static void main(String[] args) {
        Observable<Long> observable1 = Observable.interval(500L, TimeUnit.MILLISECONDS)
                .take(4);

        Observable<Long> observable2 = Observable.interval(700L, TimeUnit.MILLISECONDS)
                .take(4)
                .map(data -> data + 1000L);

        // combineLatest : observable1과 observable2의 데이터 중 최신 데이터를 합침
        Observable.combineLatest(observable1, observable2, (data1, data2) -> data1 + " | " + data2)
                .subscribe(System.out::println);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
// 출력
0 | 1000
1 | 1000
1 | 1001
2 | 1001
3 | 1001
3 | 1002
3 | 1003

 

오늘은 데이터 결합 연산자에 대해 공부했습니다.

combineLatest는 조금 헷갈리는데 자주 사용하면서 익히는 시간을 가지겠습니다.

20000~

'RxJava' 카테고리의 다른 글

[RxJava] 유틸리티 연산자  (0) 2025.02.21
[RxJava] 에러 처리 연산자  (0) 2025.02.19
[RxJava] RxJava 퀴즈 문제  (0) 2025.02.09
[RxJava] 데이터 변환 연산자  (0) 2025.02.06
[RxJava] 데이터 필터링 연산자  (0) 2025.02.01