본문 바로가기

개발/RxSwift

[RxSwift] Mathematical & Aggregate, Connectable Observable Operators

반응형
 

ReactiveX - Observable

Observable In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits. This pattern facilitates concurrent operations because it does not need to block while waiting for the Ob

reactivex.io

 

Mathematical & Aggregate Operators 종류

  • Concat
  • Reduce

Concat

Concat

둘 이상의 Observable 을 끼워넣어 방출한다.
Observable을 직렬로 나열해 이어붙이는 식이다.

Hot Observable을 사용할 시 주의해야한다.
Concat으로 Hot Observable을 연결하면 이전에 방출한 항목을 읽을 수 없어 손실이 발생한다.
모든 Observable이 완료되고 난 후 Hot Observable을 구독하기 때문이다.

+ Concat과 반대로 Merge가 있다.

Hot Observable 이란?

생성과 동시에 이벤트를 방출하는, 구독되는 시점과 관계없이 이벤트를 방출하는 개념이다.
Observable을 Hot으로 바꾸는 방법은 ConnectableObservable과 Subject 클래스로 가능하다.

ConnectableObservable의 경우, publish() 메소드를 사용해 Cold를 Hot으로 변환할 수 있다.
publish() 후에 connect() 메소르를 사용하면 옵저버에서 데이터를 처리하는 과정을 허용한다.

Subject는 AsyncSubject, PublishSubject, BehaviorSubject, ReplaySubject 를 이용해
Observable을 생성하면 자동으로 Hot으로 선언하게 된다.
Cold Observable 이란?

Hot Observable과 달리 구독되는 시점부터 이벤트를 생성하여 방출하는 개념이다.
Hot Observable에서 쓰이지 않는 방식을 Cold라고 보면 된다.

 

 

Reduce

Reduce

Observable이 방출하는 각 항목에 함수를 순차적으로 적용하고 최종 값을 방출한다.
Observable의 첫 항목에 함수를 적용한 다음, 두 번째 항목과 함께 함수 결과를 다시 함수에 공급하여
Observable이 최종 항목을 방출할 때까지 이 프로세스를 계속한다.
최종적으로 Reduce는 Observable 함수에서 반환된 값을 방출한다.

Scan은 중간에서 일어나는 모든 작업을 방출하고, Reduce는 마지막 값만 방출하는데 차이가 있다.

Observable
    .range(start: 1, count: 5)
    .reduce(0, accumulator: +)
    .subscribe { print("reduce : \($0)") }
    .disposed(by: disposedBag)
    
// reduce : next(15)
// reduce : completed

 


Connectable Observable 

Observable 과 비슷하지만 subscribe 이전에 connect() 메소드를 사용하지 않으면 아무런 요소를 방출하지 않는다는 차이가 있다.
이 방식으로 Observable이 항목을 내보내기 전에 모든 관찰자가 Observable을 구독할 때까지 기다릴 수 있다.

 

Connectable Observable Operators 종류

  • Multicase publish
  • RefCount
  • Share
  • Replay
    - replay
    - shareReplay

 

Multicast publish

Publish

일반 Observable을 연결 가능한 Observable로 변환한다.
Observable의 시퀀스를 하나의 subject를 통해 multicast로 이벤트를 전달하게 된다.
구독 공유의 기본이 되는 연산자지만, 복잡한 구조를 지니고 있기에 많이 사용되지는 않는다.

원본 Observable에서 항목을 방출하면 파라미터로 받은 subject로 들어가 모든 구독자에게 공유한다.

let test = PublishSubject<Int>()
let multicast = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(4)
    .multicast(test)
    
multicast
    .subscribe { print("multicast = \($0)") }
    .disposed(by: disposedBag)

multicast
    .delay(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print($0) }
    .disposed(by: disposedBag)

multicast.connect()

// multicast = next(0)
// multicast = next(1)
// multicast = next(2)
// multicast = next(3)
// multicast = completed
// multicast delay = next(0)
// multicast delay = next(1)
// multicast delay = next(2)
// multicast delay = next(3)
// multicast delay = completed
let unicast = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(4)

unicast
    .subscribe { print("unicast = \($0)") }
    .disposed(by: disposedBag)

unicast
    .delay(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("unicast delay = \($0)") }
    .disposed(by: disposedBag)
    
// unicast = next(0)
// unicast = next(1)
// unicast = next(2)
// unicast = next(3)
// unicast = completed
// unicast delay = next(0)
// unicast delay = next(1)
// unicast delay = next(2)
// unicast delay = next(3)
// unicast delay = completed

 

 

 

RefCount

RefCount

Connectable Observable에 연결하거나 연결을 끊는 프로세스를 자동화한다.
Connectable Observable에서 작동하며 일반 Observable을 반환한다.
첫 번째 관찰자가 Observable을 구독하면 RefCount는 기본 Observable에 연결하고, RefCount는 다른 옵저버가
얼마나 많은 옵저버를 구독하는지 추적하며 마지막 옵저버가 종료될 때까지 Observable에서 연결을 끊지 않는다.
모든 구독자가 구독을 중지하면 sequence 을 중지한다.
다시 구독하면 connect를 호출하면 새로운 sequence를 시작한다.

 

 

Share

공유하는 Observable을 만들 수 있는 연산자다. 
처음 구독을 할 때만 sequence가 생성되고 그 이후는 구독을 공유한다.

let share = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .do { print("share = \($0)") }
    .take(3)
//    .share()

share.subscribe { print("subscribe 1 : \($0)") }
share.subscribe { print("subscribe 2 : \($0)") }

// share = 0
// subscribe 1 : next(0)
// share = 0
// subscribe 2 : next(0)
// share = 1
// subscribe 1 : next(1)
// share = 1
// subscribe 2 : next(1)
// share = 2
// subscribe 1 : next(2)
// subscribe 1 : completed
// share = 2
// subscribe 2 : next(2)
// subscribe 2 : completed

Share() 을 쓰는 경우

let share = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .do { print("share = \($0)") }
    .take(3)
    .share()

share.subscribe { print("subscribe 1 : \($0)") }
share.subscribe { print("subscribe 2 : \($0)") }

// share = 0
// subscribe 1 : next(0)
// subscribe 2 : next(0)
// share = 1
// subscribe 1 : next(1)
// subscribe 2 : next(1)
// share = 2
// subscribe 1 : next(2)
// subscribe 2 : next(2)
// subscribe 1 : completed
// subscribe 2 : completed

+ share(replay: Int)
+ share(replay: Int, scope: SubjectLifetimeScope)

 

 

Replay

Replay

Observable이 항목을 방출하기 시작한 후에 구독하더라도 모든 옵저버가 동일한 시퀀스의 방출된 항목을 볼 수 있다.

let replay = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .replay(5)
    
replay.connect()

 

반응형