본문 바로가기

개발/RxSwift

[RxSwift] Traits, Subject, Scheduler

반응형
 

GitHub - ReactiveX/RxSwift: Reactive Programming in Swift

Reactive Programming in Swift. Contribute to ReactiveX/RxSwift development by creating an account on GitHub.

github.com

 

Traits (특성)

특성은 읽기 전용으로 관측 가능한 단일 시퀀스 속성을 가진 래퍼 구조이다. 
관찰 가능한 시퀀스를 위한 일종의 Builder 패턴 구현이라고 볼 수 있다.
특성이 구축될 때 .asObservable()을 호출하면 시퀀스로 다시 변환된다.

Traits 요소들은 Side Effect, 부작용을 공유하지 않는 특성이 있다.

  • Single (Nonnull)
  • Maybe (Nullable)
  • Completable (Void)

Single (Nonnull)

항상 한 가지 값(오류 / 알림 중 하나)만 배출하고 한 번만 호출한다.
메소드가 호출되면 Single의 생명주기는 끝나고 구독도 종료된다.

Single의 특성을 살린 사용 사례는 응답과 오류만 반환하는 HTTP Requests를 수행하는 것에 주로 사용된다.
Observable을 Single로 변환하고 싶을 때는 .asSingle() 을 사용하면 된다.

Single은 단일 요소만 모델링할 수 있으며 두 메서드만 사용한다.
ㅁ onSuccess = Single은 자신이 배출하는 하나의 값을 이 메서드를 통해 전달한다.
ㅁ onError = Single은 항목을 배출할 수 없을 때 이 메서드를 통해 Throwable 객체를 전달한다.

/**
  Single 생성 
  */

func getRepo(_ repo: String) -> Single<[String: Any]> {
    return Single<[String: Any]>.create { single in
        let task = URLSession.shared.dataTask(with: URL(string: "https://api.github.com/repos/\(repo)")!) { data, _, error in
            if let error = error {
                single(.error(error))
                return
            }

            guard let data = data,
                  let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
                  let result = json as? [String: Any] else {
                single(.error(DataError.cantParseJSON))
                return
            }

            single(.success(result))
        }

        task.resume()

        return Disposables.create { task.cancel() }
    }
}


/**
  Single 결과 처리
  */

getRepo("ReactiveX/RxSwift")
    .subscribe { event in
        switch event {
            case .success(let json):
                print("JSON: ", json)
            case .error(let error):
                print("Error: ", error)
        }
    }
    .disposed(by: disposeBag)

 

Maybe (Nullable)

Single과 completable 사이에 있는 Observable 의 변형이다.
단일 요소를 방출하거나 요소를 방출하지 않고 완료 / 오류를 낼 수 있다.

세 가지 이벤트 중 하나라도 완료 가능성이 종료된다.
즉 완료된 요소는 요소를 방출할 수 없고 요소를 방출한 이벤트는 완료 이벤트를 전송할 수 없다.
개인 해석으로는 Single에 nil 개념이 추가된 연산자라고 본다.

/**
  Maybe 생성
  */

func generateString() -> Maybe<String> {
    return Maybe<String>.create { maybe in
        maybe(.success("RxSwift"))
        // OR
        maybe(.completed)
        // OR
        maybe(.error(error))

        return Disposables.create {}
    }
}


/**
  Maybe 결과 처리
  */
  
generateString()
    .subscribe { maybe in
        switch maybe {
            case .success(let element):
                print("Completed with element \(element)")
            case .completed:
                print("Completed with no element")
            case .error(let error):
                print("Completed with an error \(error.localizedDescription)")
        }
    }
    .disposed(by: disposeBag)

 

Completable (Void)

Completable은 오류만 완료하거나 방출할 수 있는 관측 가능성의 변형이다.
어떠한 원소도 방출하지 않을 것을 보장한다.
제로 요소를 방출하며 완료 이벤트 또는 오류를 발생시킨다.
이 연산자의 유용한 사례는 작업이 완료되었다는 사실에만 관심을 두고 완료의 요소를 고려하지 않는 경우를 모델링 하는 것이다.

/**
  Completable 생성
  */

func cacheLocally() -> Completable {
    return Completable.create { completable in
       // Store some data locally
       ...
  
       guard success else {
           completable(.error(CacheError.failedCaching))
           return Disposables.create {}
       }

       completable(.completed)
       return Disposables.create {}
    }
}


/**
  Completable 결과 처리
  */
  
cacheLocally()
    .subscribe(onCompleted: {
                   print("Completed with no error")
               },
               onError: { error in
                   print("Completed with an error: \(error.localizedDescription)")
               })
    .disposed(by: disposeBag)

 

 


 

Subjects 

Observer나 Observable처럼 행동하는 ReactiveX의 일부 구현체에서 사용 가능한 일종의 브릿지 혹은 프록시라고 볼 수 있다.
Subject는 Observer이기 때문에 하나 이상의 Observable을 구독할 수 있으며 항목을 재배출하고 관찰하며 새로운 항목을 배출한다.
하나의 Subject는 하나의 Observable을 구독하면서, Observable이 항목들을 배출시키도록 동작시킨다.

Subject의 종류

  • AsyncSubject (pop last, include error)
  • BehaviorSubject (pop first, include error)
  • PublishSubject (start from subscribing, include error)
  • ReplaySubject (free from subscribing, include error)

AsyncSubject (pop last, include error)

Observable로부터 배출된 마지막 값 만을 배출하고 Observable의 동작이 완료된 후에야 동작한다.
만약 Observable이 아무 값도 배출하지 않으면 이 역시 아무 값도 배출하지 않는다.
맨 마지막 값을 뒤 이어 오는 옵저버에 전달하는데, Observable이 오류로 인해 종료될 경우 아무 항목을 배출하지 않고 오류를 그대로 전달한다.

+ 사용 예시는 완료 유무가 중요한 로직이나 결제라인이 있다.

 

BehaviorSubject (pop first, include error)

Observable이 기본값 항목의 발행을 시작하며 그 이후 Observable에 의해 발행되는 항목들을 계속 발행한다.
항목이 새로 추가되면 기본 값을 덮어씌우고 이 기점으로 구독을 한 Observer는 최근 추가된 값(기본 값)을 보여준다.
만약 Observable이 오류 때문에 종료되면 BehaviorSubject는 아무런 항목을 배출하지 않고 오류를 그대로 전달한다.

+ 기본 값을 가지고 시작하기에 주로 초기화용으로 쓰인다.
+ 사용 예시는 기본 데이터가 필요한 곳에 (북마크, 최근 둘러본 상품, 기본 유저 프로필 등) 사용된다.

let sub = BehaviorSubject<String>(value: "가")
sub.onNext("나")
sub.subscribe(onNext: {
    print($0)
}).disposed(by: disposeBag)
sub.onNext("다")
sub.onNext("라")

// print
// 나
// 다
// 라

 

PublishSubject (start from subscribing, include error)

구독 이후에 Observable이 배출한 항목만 Observer에게 배출한다.
PublishSubject는 생성 시점에서 즉시 항목들을 배출하기 시작하는 특성이 있다.
이에 Subject가 생성되는 시점과, Observer가 이 Subject를 구독하기 시작하는 사이 배출되는 항목들을 잃어버릴 수 있는 단점이 있다.
Observable이 배출하는 모든 항목을 보장하려면 Create를 사용해 명시적으로 Observable의 구독 여부를 체크하거나, ReplaySubject를 사용해야한다.
만약 Observable이 오류 때문에 종료되면 PublishSubject는 아무런 항목을 배출하지 않고 오류를 그대로 전달한다.

+ 사용 예시는 시작 시간에 민감한 로직(홈쇼핑, 방송, 알림 등)이 있다. 

let sub = PublishSubject<String>()
        sub.onNext("가")
        sub.subscribe(onNext : {
            print($0)
        }).disposed(by: disposeBag)
        sub.onNext("나")
        sub.onNext("다")
        
        //print
        나
        다

 

ReplaySubject

Observer가 구독을 시작한 시점과 관계없이 Observable이 배출한 모든 항목을 Observer에게 배출한다.
다양한 생성자 오버로드가 존재하며, 재생 버퍼의 크기가 특정 이상으로 증가할 경우 처음 배출 이후 지정한 시간이 경과한 오래된 항목들을 제거한다.
Observer 로 사용할 경우, 멀티 스레드 환경에서는 어느 항목을 먼저 재생해야 하는지 알 수 없는 모호함이 발생할 수 있기 때문에 (비순차적) 호출을 유발하는 onNext, on 메소드를 사용하지 않도록 주의해야한다.
구독하던 Subject가 Complete 되었다면 마지막 값을 전달하며 종료한다.
ReplaySubject의 버퍼 크기를 지정해 원하는 만큼 저장하고 있다가 방출할 수 있다.
구독이 먼저 이루어지고 방출이 시작된다면 buffer size만큼 저장하지 않고 바로 방출한다.

+ 사용 예시는 이전 데이터를 필요로 하는 로직인 히스토리, 검색 기록이 있다.

 


 

Scheduler

스케줄러의 확장에 들어가기 전, 스케줄러와 시퀀스의 관계를 정립하고 가자.
시퀀스는 일련의 연속적인 사건들로 Rx에서는 비동기식 이벤트를 말한다.
또한 시퀀스는 Subscribe가 호출될 때 시작한다.
subscribeOn은 시퀀스가 생생될 때 이를 담당할 스케줄러를 지정하는 작업이다.

그럼 스케줄러는 무엇일까?
스케줄러는 작업을 수행하기 위한 매커니즘을 추상화하는 것을 말한다.
조금 더 풀어 설명하자면, 시퀀스(연속적인 사건들)를 백그라운드, 포그라운드 등 어떻게 처리할지 지정하는 방법을 말한다.

매커니즘 종류로는 현재 Thread, Dispatch queues, 작업 queues, 새로운 thread, thread pool 과 실행 루프(run loops)가 있다.
다른 스케줄러에서 작업을 수행하려면 observeOn(scheduler) 연산자를 사용하면 된다.

MainThread 는 UI 단을 담당하는 기본 스레드이다.
이외의 네트워크 작업은 Background에서 비동기(Async)로 작업하고 멀티스레딩을 통해 분기 처리(observeOn)를 해주면 된다.

시퀀스 생성(Subscribe)를 시작하고 특정 스케줄러에 대해 삭제를 호출하려면 subscribeOn(스케줄러)를 사용한다.
subscribeOn이 명시적으로 지정되지 않은 경우 subscribe은 구독(onNext:) 또는 구독이 호출되는 동일한 스레드/스케줄러에서 호출된다. 

 

Scheduler 주 연산자

  • observeOn
  • subscribeOn
sequence1
  .observeOn(backgroundScheduler)
  .map { n in
      print("This is performed on the background scheduler")
  }
  
sequence2
  .subscribeOn(MainScheduler.instance)
  .map { n in
      print("This is performed on the main scheduler")
  }

 

Scheduler 종류

  • Serial, Concurrent Schedulers
  • Custom Schedulers (스케줄러 위주 설명이기에 커스텀은 스킵합니다)
  • Builtin Schedulers

Serial, concurrent Schedulers

스케줄러는 무엇이든 될 수 있으며 시퀀스를 변환하는 모든 연산자는 추가적인 암묵적 보증을 보존해야 하므로 어떤 종류의 스케줄러를 만드는지 중요하다.
스케줄러가 동시인 경우, Rx의 observeOn과 subscribeOn 연산자는 완벽하게 작동시킬 것을 보증한다.
지금까지 최적화는 dispatch queue scheduler이 수행했으나 Rx가 직렬(Serial) 스케줄러를 사용하면 추가 최적화를 수행할 수 있다. 
serial dispatch queue schedulers의 경우 observeOn한 단순한 dispatch_async 호출에 최적화된다.

Observable.just(“800x600”)
// Background 작업
.observeOn(ConcurrentDispatchQueueScheduler(qos: .default))
.map  { $0.replacingOccurreces(of: “X”, with “ /  ”)  } 
.map { “https://picsum.photos/\($0)/?random” }
.map { URL(string: $0) }
            .filter { $0 != nil }
            .map { $0! }
            .map { try Data(contentsOf: $0) }
            .map { UIImage(data: $0) }
// MainThread로 분기 처리
.observeOn(MainScheduler.instance) 
 .subscribe(onNext: { image in
                self.imageView.image = image
            })
            .disposed(by: disposeBag)
 }

 

Builtin Schedulers

Rx는 모든 종류의 스케줄러를 사용할 수 있지만, 스케줄러가 직렬(Serial)일 경우 몇 개의 추가 최적화를 수행할 수 있다.

  • Current Thread Scheduler (Serial Scheduler)
    • 현재 스레드에서 작업 단위를 예약한다.
    • 이 스케줄러는 요소를 생성하는 연산자의 기본(default) 스케줄러다.
    • 트램폴린 스케줄러 (Trampoline Scheduler) 라고도 불린다.
    • 일부 스레드에서 currentThreadScheduler(state) { } 가 처음으로 호출된 경우 스케줄링된 작업이 즉시 실행되며
      모든 재귀적으로 스케줄링된 작업이 일시적으로 대기 상태가 되는 숨겨진 대기열이 생성된다.
    • 호출 스택의 일부 상위 프레임이 이미 현재를 실행 중인 경우, currentThreadScheduler.instance.schedule(state) { } - 스케줄링된 작업은 현재 실행 중인 작업과 이전에 대기 중인 모든 작업의 실행이 완료되면 대기 상태가 되고 실행된다.
  • Main Scheduler (Serial Scheduler)
    • Main Thread에서 수행해야하는 작업을 추상화한다.
    • 스케줄 메소드가 기본 스레드에서 호출되는 경우 예약없이 즉시 작업을 수행한다.
    • 이 스케줄러는 주로 UI 작업을 수행할 때 사용된다.
  • Serial Dispatch Queue Scheduler (Serial Scheduler)
    • 특정 Dispatch_queue_t 에서 수행해야하는 작업을 추상화한다.
    • 동시 발송 대기열이 전달되더라도 직렬로 변환된다. (순차 실행)
    • Serial scheduler이 observeOn의 최적화가 가능하다
    • Main scheduler은 Serial Dispatch Queue Scheduler의 인스턴스이다.
  • Concurrent Dispatch Queue Scheduler (Concurrent Scheduler)
    • 특정 dispatch_queue_t 에서 수행해야하는 작업을 추상화한다. 
    • Serial Dispatch Queue를 전달해도 상관없다.
    • 이 스케줄러는 백그라운드에서 수행하기에 적합하다.
  • Operation Queue Scheduler (Concurrent Scheduler)
    • 특정 NSOperationQueue에서 수행해야하는 작업을 추상화한다.
    • 이 스케줄러는 백그라운드에서 수행해야 하는 작업 규모가 크고 Max Concurrent Operation Count를 사용해 동시 처리를 미세 조정하려는 경우에 적합하다.

 

스케줄링의 전체 프로세스

subscribe(on: 스케줄러)는 Observable에 스케줄링을 지정하는 연산자다.

// Observable 선언
_ = Observable.create() { observer in 
	print(Thread.inMainThread)
	return Diposables.create()
}
// 스케줄러 지정 
.subscribe(on: ConcurrentDispatchQueueScheduler(qos : .background ) )
.map { _ in 
	print(Thread.isMainThread)
}
// 스케줄러 지정 (변경이 안된다)
.subscribe(on: MainScheduler.instance)
.subscribe { event in 
 	print(Thread.isMainThread)
}
.disposed(by: disposBag)

//false 
//false
//false

 


 

추가적인 메모

Q. Thread Pool 이란?

-> 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해놓고 작업큐에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다. 
     작업처리 요청이 폭증되어도 스레드의 전체 개수가 늘어나지 않음으로 시스템 성능이 급격히 저하되지 않는다.

 

 

반응형

'개발 > RxSwift' 카테고리의 다른 글

[RxSwift] Combining Observables & Error Handling Operators  (0) 2022.03.14
[RxSwift] Filtering Observable  (0) 2022.03.11
[RxSwift] Transforming Observable  (0) 2022.03.10
[RxSwift] Observable 생성 연산자  (0) 2022.03.08
[RxSwift] Observables  (0) 2022.03.08