안녕하세요. 메로나입니다.
오늘은 유틸리티 연산자에 대해 공부하겠습니다.
Delay
- 생산자가 데이터를 생성 및 통지를 하지만 설정한 시간만큼 소비자 쪽으로의 데이터 전달 지연시킵니다.
- 소비자로 데이터를 통지할 때마다 설정한 시간만큼 지연시킬 수도 있습니다.

// 유형 1
public static void main(String[] args) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault());
Observable.just(1, 2, 3, 4, 5)
.doOnNext(item -> {
String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
System.out.println(currentTime + " - ori: " + item);
})
.delay(5000L, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
System.out.println(currentTime + " - Emitted: " + item);
})
.subscribe();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 출력
22:18:10.329 - ori: 1
22:18:10.330 - ori: 2
22:18:10.330 - ori: 3
22:18:10.330 - ori: 4
22:18:10.330 - ori: 5
// 5초후에 데이터 통지
22:18:15.336 - Emitted: 1
22:18:15.337 - Emitted: 2
22:18:15.337 - Emitted: 3
22:18:15.338 - Emitted: 4
22:18:15.338 - Emitted: 5
// 유형 2
public static void main(String[] args) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault());
Observable.just(1, 2, 3, 4, 5)
.delay(1000L, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
TimeUnit.MILLISECONDS.sleep(1000L); // 500ms 대기
String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
System.out.println(currentTime + " - Emitted: " + item);
})
.subscribe();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 매 초마다 데이터 통지
22:18:50.016 - Emitted: 1
22:18:51.022 - Emitted: 2
22:18:52.027 - Emitted: 3
22:18:53.033 - Emitted: 4
22:18:54.038 - Emitted: 5
DelaySubscription
- 생산자가 데이터의 생성 및 통지 자체를 설정한 시간만큼 지연시킵니다.
- 소비자가 구독을 해도 구독 시점 자체를 지연시킵니다.

public static void main(String[] args) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault());
// start 로그
String currentTime1 = sdf.format(new Date()); // 현재 시간 포맷팅
System.out.println(currentTime1 + " - start");
Observable.just(1, 2, 3, 4, 5)
.doOnNext(item -> {
String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
System.out.println(currentTime + " - ori: " + item);
})
.delaySubscription(5000L, TimeUnit.MILLISECONDS)
.subscribe(
item -> {
String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
System.out.println(currentTime + " - Emitted: " + item);
},
error -> System.out.println("Error: " + error),
() -> System.out.println("Complete")
);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 출력
22:22:13.769 - start // 시작 시간
22:22:18.833 - ori: 1 // 발행 시간
22:22:18.833 - Emitted: 1
22:22:18.833 - ori: 2
22:22:18.833 - Emitted: 2
22:22:18.833 - ori: 3
22:22:18.833 - Emitted: 3
22:22:18.834 - ori: 4
22:22:18.834 - Emitted: 4
22:22:18.834 - ori: 5
22:22:18.834 - Emitted: 5
Complete
TimeOut
- 각각의 데이터 통지 시, 지정된 시간 안에 통지가 되지 않으면 에러를 통지합니다.
- 에러 통지 시 전달되는 에러 객체는 TimeoutException 입니다.

public static void main(String[] args) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault());
Observable.range(1, 5)
.map(num -> {
int sleepTime = 0;
if (num == 3) {
sleepTime = 3500;
} else {
sleepTime = 1000;
}
TimeUnit.MILLISECONDS.sleep(sleepTime);
return num;
})
.timeout(3000L, TimeUnit.MILLISECONDS)
.subscribe( item -> {
String currentTime = sdf.format(new Date()); // 현재 시간 포맷팅
System.out.println(currentTime + " - Emitted: " + item);
},
error -> System.out.println("Error: " + error),
() -> System.out.println("Complete")
);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 출력
22:23:43.650 - Emitted: 1
22:23:44.657 - Emitted: 2
// 3초가 넘어서 발행되어서 에러남
Error: java.util.concurrent.TimeoutException: The source did not signal an event for 3000 milliseconds and has been terminated.
오늘은 평소에 많이 사용할만한 rxjava 함수를 다뤄본 것 같습니다.
코드를 작성하면 에러 처리의 중요성을 많이 배우는데 이번에 배운 함수도 많이 사용해 봐야 할 것 같습니다.
20000~
'RxJava' 카테고리의 다른 글
| [RxJava] 데이터 집계 연산자 (0) | 2025.02.22 |
|---|---|
| [RxJava] 조건과 불린 연산자 (0) | 2025.02.22 |
| [RxJava] 에러 처리 연산자 (0) | 2025.02.19 |
| [RxJava] 데이터 결합 연산자 (0) | 2025.02.11 |
| [RxJava] RxJava 퀴즈 문제 (0) | 2025.02.09 |
