본문 바로가기

개발/RxSwift

[RxSwift] Combining Observables & Error Handling Operators

반응형
 

ReactiveX - Operators

Introduction Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementa

reactivex.io

 

Combine Observables

  • CombineLatest (다중, 스트림 전부 항목이 있어야 활성화되며 가장 최근 방출한 값을 합침)
  • SwitchLatest (하나의 스트림만 주목)
  • Zip (같은 순서의 항목을 짝으로 묶어 배출)
  • Merge (다중, 스트림 하나라도 값이 있으면 가장 최근 방출된 항목을 합침)
  • StartWith (우선 실행할 항목 지정)

 


 

CombineLatest

Combine

여러 Observable의 가장 최근에 방출한 값을 합쳐서 하나의 결과로 방출한다.
Comebine에 포함된 Observable 중에 하나라도 아무런 값이 들어가있지 않다면, 아무 값도 방출하지 않는다.

let income = PublishSubject<Int>()
let consume = PublishSubject<Int>()
let unit = PublishSubject<String>()

Observable
    .combineLatest(income, consume, unit) { i, c, u in
        "소득량: \(i)\(u), 소비량: \(c)\(u)"
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

income.onNext(100_000)
consume.onNext(50_000)
unit.onNext("원")

income.onNext(200_000)

// next(소득량: 100000원, 소비량: 50000원)
// next(소득량: 200000원, 소비량: 50000원)

사용 사례는 필수적으로 선택해야하는 여러 값을 한번에 등록하거나 업로드 할 때 유용해보인다. (Ex. 회원가입 시 필수적인 정보들 등록)

+ WithLatestFrom 

첫 번째 Observable에서 항목이 방출될 때마다 다른 Observable의 최신 항목과 결합해서 방출하는 연산자로
CombineLatest와 작동 방식이 유사하지만 지정된 스트림의 유무가 차이점으로 존재한다.

 

SwitchLatest

 

Switch

Observable에서 발행하는 Observable을 다른 Observable로 변경하여 대상을 변경한 시점부터 해당 항목을 발행한다.

let value1 = PublishSubject<Int>()
let value2 = PublishSubject<Int>()
let value3 = PublishSubject<Int>()
let result = PublishSubject<Observable<Int>>()

let observable = result.switchLatest()
    .subscribe { print($0) }

result.onNext(value1) // 관찰할 Observable 지정
value1.onNext(0)
value2.onNext(1000)

result.onNext(value2) // 관찰할 Observable 지정
value1.onNext(1)
value2.onNext(2000)
value2.onNext(2050)

result.onNext(value3) // 관찰할 Observable 지정
value1.onNext(2)
value1.onNext(3)
value2.onNext(3000)
value3.onNext(10000)

result.onNext(value1) // 관찰할 Observable 지정
value1.onNext(4)

observable.disposed(by: disposeBag)

// next(0)
// next(2000)
// next(2050)
// next(10000)
// next(4)

 

 

Zip

Zip

두 개 이상의 Observable에서 발행 순서가 같은 항목을 짝으로 묶어 방출한다. 

Observable
    .zip(income, unit) { ($0, $1) }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

income.onNext(100_000)
income.onNext(200_000)
income.onNext(300_000)
income.onNext(400_000)

unit.onNext("원")
unit.onNext("원")
unit.onNext("불")
unit.onNext("달러")

income.onNext(600_000)
income.onNext(700_000)

// next((100000, "원"))
// next((200000, "원"))
// next((300000, "불"))
// next((400000, "달러"))

 

 

Merge

Merge

여러 Observable을 하나로 합쳐서 결과를 방출한다.
CombineLatest와 다른 점은, 포함된 Observable의 값이 비어있더라도
하나의 Observable에 새로운 값이 입력되면 그 값을 즉시 방출한다는 점이다.

Observable.of(income, consume)
    .merge()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

income.onNext(100_000)
consume.onNext(50_000)
income.onNext(200_000)

// next(100000)
// next(50000)
// next(200000)

 

+ Concat

Concat은 Merge와 비슷하게 동작하나, 여러 개의 스트림을 직렬로 나열하듯 이어 붙인다. 
여러 Observable이 동시에 항목을 배출하더라도 앞선 항목들이 모두 배출된 다음 추가된 Observable을 구독하여 항목을 배출한다.
Concat에 Hot Observable을 붙이게 되면 앞선 Observable이 끝나는 시점에 따라 배출받는 항목이 달라질 수 있다.

 

 

StartWith

StartWith

Observable이 아이템을 발행하기 전 StartWith로 추가한 항목들을 먼저 발행한다.

Observable.of(1, 2, 3)
    .startWith(4, 5, 6)
    .startWith(7)
    .subscribe { print("startWith = \($0)") }
    .disposed(by: disposeBag)
    
// startWith = next(7)
// startWith = next(4)
// startWith = next(5)
// startWith = next(6)
// startWith = next(1)
// startWith = next(2)
// startWith = next(3)
// startWith = completed

 

 

 


Error Handling Operators

  • Catch (에러 캐치 후 onError 방지)
  • Retry (재시도)

 


Catch

Catch

에러 발생 시 특정 값을 반환하여 onError 없이 시퀀스를 종료한다.

let observabled = Observable<String>
    .create { observe -> Disposable in
        observe.onNext("1")
        observe.onNext("2")
        observe.onNext("3")
        observe.onError(NSError(domain: "testError", code: 0, userInfo: nil))
        
        return Disposables.create()
    }

observabled
    .catch { err -> Observable<String> in
        return .just("Error occured")
    }
    .subscribe { print( "catch = \($0)") }
    .disposed(by: disposeBag)

// catch = next(1)
// catch = next(2)
// catch = next(3)
// catch = next(Error occured)
// catch = completed

 

 

Retry

Retry

onErrer 발생 시 해당 오류를 전달하지 않고 Observable을 다시 시도하는 연산자이다.
위 그림처럼 오류와 함게 종료되는 시퀀스에서도 항상 다음 알림을 Observer에게 전달하므로 중복 방출이 일어날 수 있다.
maxAttemptCount로 재시도 횟수를 지정할 수 있다. 

Observable<String>.create { observer in
    observer.onNext("one")
    observer.onNext("two")
    observer.onNext("three")
    
    let error = NSError(domain: "testError", code: 0, userInfo: nil)
    observer.onError(error)
    
    return Disposables.create()
}
.debug()
.retry(2)
.subscribe { print("retry = \($0)") }
.disposed(by: disposeBag)

// - debug : subscribed
// - debug : Event next(one)
// retry = next(one)
// - debug : Event next(two)
// retry = next(two)
// - debug : Event next(three)
// retry = next(three)
// - debug : Event error(Error Domain=testError Code=0 "(null)")
// - debug : isDisposed
//
// - debug : subscribed
// - debug : Event next(one)
// retry = next(one)
// - debug : Event next(two)
// retry = next(two)
// - debug : Event next(three)
// retry = next(three)
// - debug : Event error(Error Domain=testError Code=0 "(null)")
// - debug : isDisposed
// retry = error(Error Domain=testError Code=0 "(null)")
반응형

'개발 > RxSwift' 카테고리의 다른 글

[RxSwift] Conditional & Boolean Operators  (0) 2022.03.19
[RxSwift] Observable Utility Operators  (0) 2022.03.18
[RxSwift] Filtering Observable  (0) 2022.03.11
[RxSwift] Transforming Observable  (0) 2022.03.10
[RxSwift] Observable 생성 연산자  (0) 2022.03.08