안녕하세요. 메로나입니다.
오늘은 RxJava 함수 중 데이터 결합 연산자에 대해 공부하겠습니다.
merge
- 통지 시점이 빠른 Observable의 데이터부터 순차적으로 통지되고, 통지 시점이 같을 경우 merge() 함수의 파라미터로 먼저 지정된 Observable의 데이터부터 통지합니다.

public static void main(String[] args) {
Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
.take(5); // 5개만 발행
Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
.take(5) // 5개만 발행
.map(data -> data + 1000L);
Observable.merge(observable1, observable2)
.subscribe(System.out::println);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 출력
0
1
1000
2
3
1001
4
1002
1003
1004
concat
- 하나의 Observable에서 통지가 끝나면 다음 Observable에서 연이어서 통지가 됩니다.
- 각 Observable의 통지 시점과는 상관없이 concat() 함수의 파라미터로 먼저 입력된 Observable의 데이터부터 모두 통지된 후, 다음 Observable의 데이터가 통지됩니다.

public static void main(String[] args) {
// Observable.interval(200L, TimeUnit.MILLISECONDS) : 200ms마다 0부터 시작하는 Long 데이터를 발행
// take(5) : 5개의 데이터만 발행
Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
.take(5); // 5개만 발행
// Observable.interval(400L, TimeUnit.MILLISECONDS) : 400ms마다 0부터 시작하는 Long 데이터를 발행
// take(5) : 5개의 데이터만 발행
// map(data -> data + 1000L) : 1000씩 증가하는 데이터를 발행
Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
.take(5)
.map(data -> data + 1000L);
// concat : observable1의 데이터를 모두 발행한 후 observable2의 데이터를 발행
Observable.concat(observable1, observable2)
.subscribe(System.out::println);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 출력
0
1
2
3
4
1000
1001
1002
1003
1004
zip
- Observable에서 통지된 데이터가 모두 모이면 각 Observable에서 동일한 index의 데이터로 새로운 데이터를 생성한 후 통지합니다.
- 통지하는 데이터 개수가 가장 적은 Observable의 통지 시점을 완료 통지 시점을 맞춥니다.

public static void main(String[] args) {
Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
.take(3); // 5개만 발행
Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
.take(5)
.map(data -> data + 1000L);
// zip : observable1과 observable2의 데이터를 1:1로 묶어서 합침
Observable.zip(observable1, observable2, (data1, data2) -> data1 + " | " + data2)
.subscribe(System.out::println);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 출장
0 | 1000
1 | 1001
2 | 1002
combineLatest
- 각 Observable에서 데이터를 통지할 때마다 모든 Observable에서 마지막으로 통지한 각 데이터를 함수현 인터페이스에 전달하고, 새로운 데이터를 생성해 통지합니다.

public static void main(String[] args) {
Observable<Long> observable1 = Observable.interval(500L, TimeUnit.MILLISECONDS)
.take(4);
Observable<Long> observable2 = Observable.interval(700L, TimeUnit.MILLISECONDS)
.take(4)
.map(data -> data + 1000L);
// combineLatest : observable1과 observable2의 데이터 중 최신 데이터를 합침
Observable.combineLatest(observable1, observable2, (data1, data2) -> data1 + " | " + data2)
.subscribe(System.out::println);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 출력
0 | 1000
1 | 1000
1 | 1001
2 | 1001
3 | 1001
3 | 1002
3 | 1003
오늘은 데이터 결합 연산자에 대해 공부했습니다.
combineLatest는 조금 헷갈리는데 자주 사용하면서 익히는 시간을 가지겠습니다.
20000~
'RxJava' 카테고리의 다른 글
| [RxJava] 유틸리티 연산자 (0) | 2025.02.21 |
|---|---|
| [RxJava] 에러 처리 연산자 (0) | 2025.02.19 |
| [RxJava] RxJava 퀴즈 문제 (0) | 2025.02.09 |
| [RxJava] 데이터 변환 연산자 (0) | 2025.02.06 |
| [RxJava] 데이터 필터링 연산자 (0) | 2025.02.01 |
