본문 바로가기

개발/RxSwift

[RxSwift] Transforming Observable

반응형

Transforming Observable Operators 

  • Buffer
  • GroupBy
  • Map
  • FlatMap
  • Scan
  • Window

Buffer

Buffer

Observable을 지정된 크기만큼 담고 있다가 버퍼가 꽉 차면 가지고 있던 Observable을 배열로 내보내는 연산자다.
버퍼의 다양한 언어별 구현체에는 어떤 항목이 어떤 버퍼에 들어갈지 선택하는 방법에 따라 다양한 변형이 있다.
Observable이 onError 알림을 실행하면 버퍼가 오류 알림을 실행하기 전에
소스 Observable에서 내보낸 항목을 포함하더라도 버퍼는 먼저 버퍼를 내보내지 않고 이 알림을 즉시 전달한다.

Window 연산자는 Buffer과 유사하지만 항목을 다시 실행하기 전에 데이터 구조로 수집하지 않고 별도의 Observable로 수집한다.
최근 둘러본 상품, 검색 기록 등 리스트가 제한적인 곳에 사용하기 좋아 보인다.

Observable
    .from([1, 2, 3, 4])
    .buffer(timeSpan: .milliseconds(10), count: 2, scheduler: MainScheduler.instance)
    .subscribe { print("buffer = \($0)") }
    .disposed(by: disposeBag)

// buffer = next([1, 2])
// buffer = next([3, 4])
// buffer = next([])
// buffer = completed

 

GroupBy

GroupBy

주어진 조건에 따라 Observable을 그룹으로 만들어 Observable<GroupedObservable<Key, Int>> 형태로 배출한다.
Observable 항목을 원래 Observable 항목과 다른 부분 집합을 내보내는 Observable 집합으로 분할한다.
항목을 내보내는 Observable을 방출이 가능한 Observables로 만들고
각 관측치는 원래 소스 관측치에서 항목의 부분 집합을 내보낸다.

분류가 필요한 데이터에 적용하기 좋아보인다. (예: 식품 - 스팸, 가구 - 욕조) 

Observable
    .range(start: 1, count: 10)
    .groupBy { $0 % 2 == 0 ? "\($0)는 짝수" : "\($0)는 홀수"}
    .subscribe { print("groupBy = \($0.map { $0.key} )") }
    .disposed(by: disposeBag)
    
// groupBy = next(1는 짝수)
// groupBy = next(2는 홀수)
// groupBy = next(3는 짝수)
// groupBy = next(4는 홀수)
// groupBy = next(5는 짝수)
// groupBy = next(6는 홀수)
// groupBy = next(7는 짝수)
// groupBy = next(8는 홀수)
// groupBy = next(9는 짝수)
// groupBy = next(10는 홀수)
// groupBy = completed

//// 그룹화 적용
Observable
    .range(start: 1, count: 10)
    .groupBy { $0 % 2 == 0 ? "짝수" : "홀수"}
    .subscribe { print("groupBy = \($0.map { $0.key} )") }
    .disposed(by: disposeBag)
    
// groupBy = next(홀수)
// groupBy = next(짝수)
// groupBy = completed

 

Map

Map 연산자

Observable Sequence에 함수를 적용해 Observable에서 방출되는 항목을 변환한다.
For 문의 압축 형식으로 모든 항목에 해당 로직을 적용한다고 생각하면 된다.

Observable
    .from([1,2,3,4])
    .map { $0 * 10 }
    .subscribe{ print($0) }
    .disposed(by: disposeBag)
    
// map = next(10)
// map = next(20)
// map = next(30)
// map = next(40)
// map = completed

 

FlatMap

FlatMap

스트림에서 방출되는 항목을 원하는 Observable로 만드는 역할을 한다. (Observable -> T)
두 스트림의 값을 조합할 때 사용한다. (실행 횟수 : n * m)
이들은 FlatMap을 통해 각각의 Observable이 생성되는데, 이 Observable들이 방출하는 아이템들은 다시 하나의 스트림으로
합쳐지기 때문서 아래의 예시와 같이 순서가 섞여 보인다.

FlatMap의 장점은 한 번에 여러 스트림을 이용할 수 있고 아이템의 누락 없이 모든 이벤트에 대한 관찰이 가능한 점이다. 
FlatMap 연산자는 비교적 다양하다.

FlatMap

  • FlatMapFirst
    FlatMap을 통해 먼저 생성된 스트림의 동작이 다 끝날 때까지 새로 방출되는 아이템이 무시된다.
    즉, 첫번째 값의 스트림만 생성된다.
  • FlatMapLatest
    새로운 스트림을 만들고 동작을 수행하는 도중, 새로운 아이템이 방출되면 이전 스트림을 dispose 한다.
    새롭게 들어오는 아이템에 대해 스트림을 생성하여 동작한다.
    즉, 첫번째 값의 스트림을 생성하고 실행하다 다음 값이 있다면 첫번째의 스트림을 멈춘 후 두번째 값의 스트림을 진행한다.
    이를 반복하다 마지막 값의 스트림을 전부 마친 후 종료한다.
  • CombineLatest
    두 가지 스트림 중 하나의 아이템이 방출될 때 지정한 함수를 통해 각 스트림에서 방출된 최신 아이템들을 결합한다.
    이 함수의 return 값에 따른 아이템을 방출한다.
    즉, 각 스트림의 가장 최근 값을 조합해 새로운 항목을 배출한다.
    이용 사례는 마지막에 입력된 값이 중요한 - 회원가입, 로그인 등 입력된 정보를 검사하는 로직에 사용하기 좋다.
  • WithLatestFrom
    기준이 되는 스트림 하나를 지정하여 이 기준이 되는 스트림에 새로운 값이 등록되었을 때,
    타 스트림의 가장 최신 아이템과 지정한 스트림의 가장 최신 아이템의 조합을 얻을 수 있다.
    이용 사례는 CombineLatest와 비슷하게 회원가입, 로그인에 사용할 수 있으다.
    단 여기서는 로그인이나 회원가입이 통과 되었을 때 서버에 로직을 날리는 용도로 사용하면 좋을 것 같다.

let intArray = Observable.of(1, 2, 3)
let stringArray = Observable.of("가", "나", "다")

intArray
    .flatMap { item in stringArray.map { "\(item) : \($0)" } }
    .subscribe { print("flatmap = \($0)")}
    .disposed(by: disposeBag)

// flatmap = next(1 : 가)
// flatmap = next(1 : 나)
// flatmap = next(2 : 가)
// flatmap = next(1 : 다)
// flatmap = next(2 : 나)
// flatmap = next(3 : 가)
// flatmap = next(2 : 다)
// flatmap = next(3 : 나)
// flatmap = next(3 : 다)
// flatmap = completed

+ ObservableConvertibleType
Observable 시퀀스로 변환될 수 있는 타입이다.

+ InfallibleType
Observable과 같은 푸시 스타일 인터페이스이며 오류 이벤트를 방출하지 않는다.
SharedSequence와 달리 리소스를 공유하거나 이벤트를 replay하지 않고 표준의 Obsevable 역할을 한다.
객체를 가져와 시퀀스에 의해 방출되는 이벤트와 함께 객체를 unretained한 안전한 사용(래핑되지 않은 상태)을 제공한다.

 

Scan

Scan

이 연산자는 값이 계속 쌓이는 방식으로 다른 문맥에서는 누적기라고 불리기도 한다.
초기 값을 가지고 들어오는 Observable 항목에 대한 변경을 거친 후 변경된 값을 가지고 새로 들어오는 항목에 사용한다.

관측할 수 있는 각 항목에 함수를 순차적으로 적용하고 각 연속 값을 내보낸다.
Observable에서 방출되는 첫 항목에 함수를 적용한 다음 해당 함수의 결과를 자체적인 첫번째 방출로 내보낸다.
또 함수의 결과를 소스 Observable이 방출하는 두 번째 항목과 함께 함수에 다시 공급하여 두 번째 방출을 발생시킨다.
나머지 시퀀스를 생성하기 위해 관측 가능한 소스로부터 후속 배출물과 함께 후속 배출물을 계속 공급한다.

사용 사례는 월별 소비 계산이나 수의 증가가 계속해 일어나는 곳, 채팅 로그 등에 사용할 수 있다.

Observable
    .repeatElement(10)
    .scan(10, accumulator: { initValue, newValue -> Int in
        initValue + newValue
    })
    .take(5)
    .subscribe { print("scan = \($0)") }
    .disposed(by: disposeBag)

// scan = next(20)
// scan = next(30)
// scan = next(40)
// scan = next(50)
// scan = next(60)
// scan = completed

 

Window

Window

Buffer과 유사하지만 Observable 항목 패킷을 내보내는 대신 Observable Sequence로 변경해 내보낸다.
FlatMap을 함께 사용하면 유용하다.

관찰 가능한 창에서 항목을 한 번에 하나씩 내보내는 대신 주기적으로 관찰 가능한 창으로 세분한 창을 내보낸다.
원래의 Observable을 결과적인 Observable 방출로 세분화하는 방법을 가지고 있으며
각각은 원래 방출된 항목에 대한 Window를 포함하고 있다.
창(Window)이 열리면 새로운 Observable이 방출되고 Observable 소스에서 방출되는 항목이 방출되기 시작함을 의미한다.
창이 닫힐 경우 이는 방출된 Observable 이 관찰가능 소스에서 항목 방출을 중지하고
Observable에게 완료된 알림과 함께 종료됨을 의미한다.

사용 사례는 배출되는 항목을 한번 더 묶어주어야할 경우에 사용된다.
장바구니에서 여러 물품을 주문한 경우 각 물품들에 대한 개별 묶음으로 이용할 수 있어 보인다.

Observable.range(start: 1, count: 10)
    .window(timeSpan: .milliseconds(10), count: 3, scheduler: MainScheduler.instance)
    .subscribe { print("window = \($0)")}
    .disposed(by: disposeBag)
    
// window = next(RxSwift.AddRef<Swift.Int>)
// window = next(RxSwift.AddRef<Swift.Int>)
// window = next(RxSwift.AddRef<Swift.Int>)
// window = next(RxSwift.AddRef<Swift.Int>)
// window = completed


//// 그룹 별로 출력
Observable.range(start: 1, count: 10)
    .window(timeSpan: .milliseconds(10), count: 3, scheduler: MainScheduler.instance)
    .subscribe { value in
        print("map = \(value)")
        
        if let element = value.element {
            element.subscribe { print("map inner = \($0)")}
        }
    }
    .disposed(by: disposeBag)

// map = next(RxSwift.AddRef<Swift.Int>)
// map inner = next(1)
// map inner = next(2)
// map inner = next(3)
// map inner = completed
// map = next(RxSwift.AddRef<Swift.Int>)
// map inner = next(4)
// map inner = next(5)
// map inner = next(6)
// map inner = completed
// map = next(RxSwift.AddRef<Swift.Int>)
// map inner = next(7)
// map inner = next(8)
// map inner = next(9)
// map inner = completed
// map = next(RxSwift.AddRef<Swift.Int>)
// map inner = next(10)
// map inner = completed
// map = completed


//// 전체 출력
Observable.range(start: 1, count: 10)
    .window(timeSpan: .milliseconds(10), count: 3, scheduler: MainScheduler.instance)
    .flatMap { $0 }
    .subscribe { print("window = \($0)")}
    .disposed(by: disposeBag)
    
// window = next(1)
// window = next(2)
// window = next(3)
// window = next(4)
// window = next(5)
// window = next(6)
// window = next(7)
// window = next(8)
// window = next(9)
// window = next(10)
// window = completed

 

반응형