본문 바로가기

개발/RxSwift

[RxSwift] Observable Utility Operators

반응형
 

ReactiveX 한국어

 

runebook.dev

Observable Utility Operators

  • Delay
  • Do
  • Materialize / Dematerialize
  • Timeout
  • Using
  • Serialize
  • ObserveOn
  • SubscribeOn

 


 

Delay

Delay

Observable을 특정 시간만큼 이동히여 방출을 늦춘다.
Observable의 각 항목을 방출하기 전의 특정 시간 단위 동안 일시정지하여 Observable을 수정한다.
Observable에 의해 방출된 항목의 전체 시퀀스를 지정된 증가만큼 앞으로 이동시키는 효과가 있다.

Observable.of(1, 2, 3, 4)
    .delay(DispatchTimeInterval.seconds(3), scheduler: MainScheduler.instance)
    .subscribe{ print($0) }
    .disposed(by: disposedBag)
    
// delay : next(1)
// delay : next(2)
// delay : next(3)
// delay : next(4)
// delay : completed

+ Delay Subscription 

Observable의 구독을 지연시킬 수 있다.

Observable.of(1, 2, 3, 4)
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("dealy subscription : \($0)") }
    .disposed(by: disposedBag)

// dealy subscription : next(1)
// dealy subscription : next(2)
// dealy subscription : next(3)
// dealy subscription : next(4)
// dealy subscription : completed

 

 

Do

Do

다양한 Observable를 수명주기 이벤트에 대해 수행할 조치를 등록한다.
Observable에서 특정 이벤트가 발생할 때 ReactiveX가 호출하는 콜백을 등록할 수 있다.
이 콜백은 Observable의 종속과 관련된 일반 알림 세트와 독립적으로 호출된다.

Do 사용 가능한 메소드

do 에서 접근 가능한 생명주기 부분을 적극 활용하자

Observable.of(1, 2, 3, 4)
    .do { print("do : \($0)") }
    .subscribe{ print("scribe : \($0)") }
    .disposed(by: disposedBag)
    
// do : 1
// scribe : next(1)
// do : 2
// scribe : next(2)
// do : 3
// scribe : next(3)
// do : 4
// scribe : next(4)
// scribe : completed

 

 

Materialize(구체화) / Dematerialize(비물질화)

(구Materialize

항목을 내보낼 때, 이벤트에 대한 구독을 함으로 값을 이벤트로 묶어 방출한다.
next(항목), error(항목) 으로 묶어 배출한다.
Error 이벤트를 따로 처리해주지 않은 경우 Observable이 종료되지만
Materialize를 사용하면 결과를 Observable로 감싸주어 문제없이 이어진다,

방출된 항목으로 전송된 알림을 모두 나타내거나 프로세스를 취소한다.
Materialize 연산자는 원래 onNext 알림과 터미널 onCompleted / onError 알림 모두와 같은 일련의 호출을 
Observable이 생성한 일련의 항목으로 변환한다.

Dematerialize

Dematerialize 는 이 프로세스를 반대로 수행한다. 
Materialize에 의해 변환된 Observable에서 작동하여 원래 형태로 되돌린다.

Observable
    .of(1, 2, 3, 4)
    .materialize()
    .map { event -> Event<Int> in
        switch event {
        case .error:
            return .next(-1)
        default:
            return event
        }
    }
    //.dematerialize()
    .subscribe { print($0) }
    .disposed(by: disposedBag)
    
// next(next(1))
// next(next(2))
// next(next(3))
// next(next(4))
// 만약 Error 발생 시 Observable은 종료된다.
// next(completed)
// completed

...
    .dematerialize()
    .subscribe { print($0) }
    .disposed(by: disposedBag)

// next(1)
// next(2)
// next(3)
// next(4)
// completed

 

 

Timeout

Timeout

Observable을 미러링하지만 특정 시간이 경과한 후 방출된 항목이 없으면 오류 알림을 발행한다. 
Observable이 제한 시간을 걸어, 지정된 시간 동안 항목을 내보내지 못하면 onError 종료로 Observable을 중단할 수 있다.

button.rx.tap
    .timeout(5, scheduler: MainScheduler.instance)
    .do(onError: { error in
        if case .timeout = error as? RxError {
            // 에러 처리
        }
    })
    .subscribe { print("timeout : \($0)") }

 

 

Using

Using

Observable과 동일한 생명주기를 가진 일회용 자원을 생성한다.
Observable이 Observable 수명 동안만 존재하고 Observable이 종료될 때 함께 폐기된다.

final class Test : Disposable {
    init() {
        print("init")
    }
    
    func dispose() {
        print("disposed")
    }
}

Observable
    .using { () -> Test in
        return Test()
    } observableFactory: { disposable in
        return Observable<Int>.of(1, 2, 3, 4)
    }
    .subscribe { print("using : \($0)") }
    .disposed(by: disposedBag)
    
// init
// using : next(1)
// using : next(2)
// using : next(3)
// using : next(4)
// using : completed
// disposed

 

 

Serialize

Serialize

Observable이 직렬화된 호출을 하고 정상 동작하도록 강제한다.

Observable이 다른 스레드에서 Observer의 메소드를 비동기식으로 호출할 수 있다.
해당 Observable이 onNext 알림 중 하나 전에 onCompleted / onError 알림을 보내려고 시도하거나 두 개의 다른 스레드에서
동시에 onNext 알림을 만들 수 있기 때문에 Observable 계약을 위반하게 할 수 있다.
Serialize 연산자를 적용하여 이런 Observable 을 올바르게 작동하고 동기적으로 만들 수 있다.

 

 

ObserveOn

ObserveOn

ObserveOn이 호출된 다음 Observable을 관찰할 스케줄러를 지정한다.
ReactiveX의 많은 구현은 Scheduler을 사용하여 다중 스레드 환경에서 스레드 간의 Observable 전환을 관리한다.
ObserveOn 연산자를 사용해 Observable에게 특정 스케줄러의 관찰자에게 알림을 보내도록 지시할 수 있다.

ObserveOn - Error

ObserveOn은 onError 종료 알림을 받으면 즉시 전달하며, 아직 느리게 관찰되지 않은 항목이 처음 인식하는 항목을 수신할 때까지 기다리지 않는다. 위 다이어그램에서 onError 알림이 소스 Observable에서 방출한 항목보다 앞서는 것을 볼 수 있다.

Scheduler

SubscribeOn의 운영은 유사하지만, 관찰을 지시 자체가 지정된 스케줄러에서 작동 뿐만 아니라, 그 스케줄러에 옵저버를 통지한다.

기본적으로 Observable과 적용되는 연산자 체인은 작업을 수행하고 Subscribe 메서드가 호출되는 동일한 스레드에서 관찰자에게 알린다. SubscribeOn 연산자는 Observable이 작동해야하는 다른 스케줄러를 지정하여 동작을 변경한다.
ObserveOn 연산자는 Observable이 관찰자에게 알림을 보내는 데 사용할 다른 스케줄러를 지정한다.

SubscribeOn 연산자는 해당 연산자가 호출되는 연산자 체인의 지점에 관계없이 Observable이 작동하기 시작할 스레드를 지정한다.
Observable 연산자 체인 중 여러 지점에서 ObserveOn을 여러번 호출하여 해당 연산자 중 특정 스레드가 작동하는 스레드를 변경할 수 있다. 

Observable<Int>.of(1, 2, 3, 4)
    .observe(on: ConcurrentDispatchQueueScheduler.init(qos: .background)) // #1
    .subscribe(on: MainScheduler.instance) // #2
    .do(onNext: { _ in
        print("subscribeOn : \(Thread.isMainThread)")
    })
    .observe(on: MainScheduler.instance) //#3
    .subscribe(onNext: { _ in
        print("observeOn 2 : \(Thread.isMainThread)")
    }, onDisposed: {
        print("dispose : \(Thread.isMainThread)")
    })
    .disposed(by: disposedBag) 
    
// subscribeOn : false
// subscribeOn : false
// subscribeOn : false
// subscribeOn : false
// observeOn 2 : true
// observeOn 2 : true
// observeOn 2 : true
// observeOn 2 : true
// dispose : true

#1 : 스케줄러를 background로 지정하여 하단부터 적용을 시작한다.
#2 : Observe 가 지정되지 않은 곳에 MainThread를 적용하나 앞선 Observe에 따라 백그라운드로 동작한다.
#3 : 스케줄러를 MainThread로 지정하여 하단부터 적용을 시작한다.

 

 

SubscribeOn

subscribeOn

Observable이 작동할 스케줄러를 지정하며 SubscribeOn 호출 시점과 상관없이 시작하는 스케줄러를 나타낸다.
중첩해서 사용하는 것은 혼란을 줄 수 있으므로 가급적 한 번을 호출하여 사용하자.

ReactiveX의 많은 구현은 Scheduler을 사용해 다중 스레드 환경에서 스레드간 Observable 전환을 관리한다.
Observable의 SubscribeOn 연산자를 호출하여 Observable이 특정 스케줄러에서 작업을 수행하도록 지시할 수 있다.
ObserveOn의 운영자는 유사하지만 더 제한된다. Observable이 지정된 스케줄러에서 관찰자에게 알림을 보내도록 지시한다.

반응형