RxSwift

【Swift】RxSwiftを理解する

はじめに

今回RxSwiftを使ってみました。というのも業務で作ってるiOSアプリのコードがMVVMもどきのくちゃくちゃなコードかつファイルごとにどこに書いてるかがばらばらで、新メンバーの共有コストも高く開発効率も悪かったので、しっかり合ったアーキテクチャを選定し採用してリファクタを行うことになりました。

そこで今回採用したのがRxSwiftを用いてのMVVMです。

採用理由としては、

  • 値の変化を検知や非同期処理に強いのでコードがスッキリかけそう
  • RxSwiftを使いこなせばいけてるコードっぽくなりそう!

の主に2つです。
しかし

  • 勉強コストが高い
  • 簡単にかけるからこその気をつけないとややこしいコードになりやすい

のようなデメリットがあります。

今回は自分がRxSwiftを使うにあたって勉強したことをまとめるのとともに、開発チームのメンバーにも理解してもらいたいという目的でこの記事を書きました。

RxSwiftとは

RxはReactive Extensionsの略でFRP(Functional Reactive Programming)の一つとされています。それのSwift用なのでRxSwiftなだけで、RxJavaとかRxJSとか様々な言語で用意されています。

FRPのよく使われる具体例としてはExcelがあります

このようにA1とB1に入力した値を用いてC1が自動で計算されるようにしました。
このように設定すると、

B1の値を変えたとしても瞬時にC1の値が変わります。
このように状態変化を監視して状態変化をトリガーとして処理を走らせるようにできます。

〜参考URL〜
あなたが求めていたリアクティブプログラミング入門
リアクティブプログラミングへの理解がイマイチだったのでまとめてみた

ストリーム

Rxでは、様々な処理をストリームとして扱います。ストリームというのは時間順に並んだ進行中のイベントを表します。ストリーム=川だとよく例えられます。

このようにももが流れてくるようなイメージです。
実際には次のようなダイヤグラムと呼ばれる図を用います。

この図で、

  • ○は何かしらの型の値
  • ☓はエラー
  • 縦棒は完了

を表します。
初めてこの図を見たときに、ストリームはももが流れてくるようなイメージって言われた後にこのダイヤグラムをみて、これを想像しました。

けどこのようなドンとかカッが右から左に流れてくるわけではなく、○は固定で時間が左から右にかわっていくイメージで僕はイメージしています。

僕のイメージとしては高校物理の波動分野で出てくる、ある点における変位の時間変化のy-tグラフに近いイメージなのかなって思います。(僕のイメージなので納得できない人はスルーしてください笑)

このストリームを下の図のように、

抽出したり、値を計算したりして別のストリームを作成することができます。
そうすることで、新しく作成したストリームに対してストリームのイベントを検知して処理を走らせるように設定することができます。

実際にどう書くか

ここまで概念的なことを書いてきましたがここからは実際にRxSwiftではどのようにコードを書いていくかをまとめていきます。

Observable

Observableは文字の意味の通りで、監視可能なものを表すClassを表します。
先程のストリームのダイヤグラムの図で○と☓と縦棒がでてきましたがあの図がまさにObservableを表しています。

なのでObservableが通知するものは、

  • onNext ・・・ 通常のイベント通知(値を入れることができ、何回でも呼ばれる)
  • onError ・・・ エラー通知(1回だけ、呼ばれた時点で終了)
  • onCompleted ・・・ 完了(1回だけ、呼ばれた時点で終了)

これらで、onError,onCompletedに関しては呼ばれた時点で監視が終了します。
コードとしては、

let contentOffset = tableView.rx.contentOffset
        
contentOffset
    .subscribe(onNext: {
        print("next")
    }, onError: { _ in
        print("error")
    }, onCompleted: { 
        print("completed")
    })

このように書きます。これはtableViewのcontentOffsetが変わるたび、すなわちスクロールするたびにイベントが発生し、座標を返します。

コードにいきなり書いてしまいましたが、subscribeと書くことで、Observableを購読することができ、イベントが発生するたびにsubscribeにonNextで渡すクロージャーが呼ばれます。

このコードではスクロールするたびに”next”がプリントされます。
onError,onCompletedの際も、それぞれで渡したクロージャーが呼ばれます。今回みたいにUIKitをsubscribeする場合はとくにエラー処理や完了処理をしないのでその場合はonNextだけ書くことも可能です。

ここで注意ですが、Observableではイベントを検知できるものであり、自身でイベントを発生するものではありません。

Disposable

Observableをsubscribeしたときに返されるのがDisposableです。
Disposableは購読を解除するためのもので、disposeメソッドを呼ぶことで購読を解除します。

このように購読を解除しないとインスタンスが開放されたとしても購読は続いている状態になりメモリーリークにつながってしまいます。

ただし、onError,onCompletedに関しては、この時点で購読を解除するのでdisposeする必要はありません。

とはいえ、disposeを呼んだ時点で購読を解除してしまうので、呼ぶタイミングはどのように設定すればいいのかが難しくなってきます。

先程インスタンスが開放されたとしても購読は続いている状態にあると書きましたが、特に開放されるタイミングを指定したい場合を除けば、インスタンスを開放するタイミングで購読も解除できれば簡単にできると思います、使用するのがDisposeBagです。

DisposeBagは自身が開放されたときに管理下にあるDisposableをすべて解放することができます。DisposeBagを管理下に置くのはaddDisosableToメソッドを呼ぶことでできます。

すなわち、DisposeBagをプロパティとして定義しておけば、インスタンスが開放される瞬間にそのプロパティも開放されるので、DisposeBagの管理下のDisposableを解放することができます。

なので先程のコードは、

private let disposeBag = DisposeBag()
let contentOffset = tableView.rx.contentOffset
        
contentOffset
    .subscribe(onNext: {
        print("next")
    }, onError: { _ in
        print("error")
    }, onCompleted: { 
        print("completed")
    })
    .addDisposableTo(disposeBag)

というように書き直すことができます。
なのでObservableをsubscribeする場合は.addDisposableTo(disposeBag)を書くようにしましょう。

Subject

SubjectはObservableでもあり、Observarでもあります。どういうことかというと、イベントを検知できるものでもあり、イベントを発生させるものでもあります。

Observableの説明の際に自身でイベントを発生させることはできないと書きましたが、Subjectはsubscriveしたら購読することが出来るObservableとしての機能をもちつつ、onNext,onError,onCompletedメソッドを呼ぶことでイベントを発生させることができます。

RxSwiftで使えるSubjectクラスは、

  • PublishSubject
  • ReplaySubject
  • BehaviorSubject
  • Variable

以上の4つがあり使い方が異なります。

PublishSubject

まずは一般的なPublishSubjectを用いて例を上げていきます。

let disposeBag = DisposeBag()
let subject = PublishSubject<String>()

subject.onNext("a") // printはされない

subject.subscribe({ event in
    print(event)
}).addDisposableTo(disposeBag)

subject.onNext("a") // a
subject.onNext("b") // b

このようにonNextメソッドを呼ぶことでイベントを発生させることができ、subscribeすることで購読し、イベントを検知した際の処理を定義することができます。

これならObservableは使わなくてSubjectだけ使えばよくね?ってなると思いますが、そうした場合に弊害があったりします。

Observableは基本的には外部からでもイベントを検知できるように外部に公開します。そうした場合にSubjectはObservableでもあるので外部に公開することになって値の変更も自由にすることができてしまいカプセル化を崩してしまいます。理想としてはサブジェクトの値が変更されイベントが発生するのはクラスの内部で、それを外部に通知できる状態です。

なので、

class Hoge {
    private let hogeSubject = PublishSubject<Int>()  //Subjectはprivate

    var hoge: Observable<Int> { return hogeSubject } //Observableは外部に公開する

    func hoge() {
        hoge.onNext(1) //Subjectでイベント発行
    }
}

というようにするのが一般的です。

ReplaySubject

PublishSubjectはsubscribeされた時点移行のイベントを検知できるものですが、ReplaySubjectは指定した数だけ最新のイベントをキャッシュさせることができ、subscribeされたタイミングでキャッシュされてるイベントが送信されます。以下に例をあげます。

let subject = ReplaySubject<String>.create(bufferSize: 2)

subject.onNext("a")
subject.onNext("b")

subject
    .subscribe(onNext: { event in
        print("1 \(event)")
    })
    .addDisposableTo(disposeBag)

subject.onNext("c")
subject.onNext("d")
  
subject
    .subscribe(onNext: { event in
        print("2 \(event)")
    })
      .addDisposableTo(disposeBag)

subject.onNext("e")
subject.onNext("f")

とした場合には、

1 a
1 b
1 c
1 d
2 c
2 d
1 e
2 e
1 f
2 f

このような結果になります。1でsubscribeされてる時点でそれ以前のキャッシュされていたイベント2個分が1に送信されます。それ以降はPublishSubjectと変わりません。
bufferSizeで指定した数がキャッシュさせる数になっていて、createUnbounded()とすると全てキャッシュされます。

BehaviorSubject

BehaviorSubjectはReplaySubjectのbufferSizeが1のときと同様の挙動を示します。

これまで扱ってきたものはイベントを発生検知することができイベントに値は含まれるものの、常に値を持つものではありませんでした。
BehaviorSubjectは、現在値を取得できるvalueメソッドを持っています。ただし、ObservableでもあるのでonErrorやonCompletedが既に発生している可能性があるのでvalueメソッドを使う際はその分岐を加える必要があります。

do {
    try let hoge = hogeSubject.value()
    if hoge {
        print(hoge)
    }
} catch {
    // hogeSubjectが既にエラーか完了で終了している場合
}

Variable

VariableはRxSwift独自のものでBehaviorSubjectを使いやすくしたものです。VariableはBehaviorSubjectからObservableを取り除いたようなもので、それゆえにonError,onCompletedは発生させません。なのでvalueを取得するさいにBehaviorSubjectのように分岐を加える必要がありません。
Observableとして使いたい場合は、asObservableメソッドを呼ぶことでObservableに変換することができます。すなわち値を格納することもでき、変更を検知したい場合はObservableに変更して購読するようにできます。

let variable: Variable<String> = Variable("a")

variable.asObservable()
    .subscribe(onNext: { value in
        print(value)
    })
    .addDisposableTo(disposeBag)

variable.value = "b"
print("print: \(variable.value)")

// 結果
a // subscribeした時点でのvalue
b
print: b

このようになります。

今回のRxSwiftを用いてリファクタを行う際にVariableをメインで使用しました。

class Viewmodel {
    let articles: Variable<[Article]> = Variable([])

    ︙
    article.value = article
}

class ViewController: UIViewController {
    @IBOutlet weak var tableView: UITableView!

    private let disposeBag = DisposeBag()

    viewmodel.articles.asObservable()
        .subscribe(onNext: { [weak self] articles in
            self?.tableView?.reloadData()
        })
        .addDisposableTo(disposeBag)
}

このようにViewModelでarticlesを持つようにしてarticlesの変更を検知してViewControllerのtableViewをリロードさせるというようにしています。

Hot-Cold

これまでに扱ったObservableはHotなObservableと呼ばれています。
特徴は、

  • どこからでもSubscribeされなくても動作はしている
  • 複数からsubscribeされても同じ動作結果

という特徴があります。Variableを使うことが多い場合ほとんどがHotになることが多いです。

それに対してColdなObservableも存在します。
特徴はHotなObservableの逆で

  • subscribeされないと動作しない、されると動作開始
  • 複数からsubscribeされたら別々に動作する

となります。この特徴を非同期処理に適用しています

static func send<T : SampleRequest>(request: T, callbackQueue: CallbackQueue? = nil) -> Future<T.Response, SessionTaskError> {
        
    let promise = Promise<T.Response, SessionTaskError>()
        
    Session.send(request, callbackQueue: callbackQueue) { result in
            
        switch result {
        case let .success(data):
            promise.success(data)
                
        case let .failure(error):
            promise.failure(error)
        }
    }
        
    return promise.future
}

static func rxSendRequest<T: SampleRequest>(request: T, callbackQueue: CallbackQueue? = nil) -> Observable<T.Response> {
    return Observable.create { observer in
        let task = Session.send(request) { result in
            switch result {
            case .success(let res):
                observer.on(.next(res))
                observer.on(.completed)
            case .failure(let err):
                observer.onError(err)
            }
        }
        return Disposables.create {
            task?.cancel()
        }
    }
}

//呼び出し側
APIManager.rx_sendRequest(request: GetArticlesRequest(queryParameters: params))
    .subscribe { [weak self] result in
        guard let `self` = self else { return }
        switch result {
            case .next(let data):
             
            case .error(let error):

            default:
                break
        }
    }
    .addDisposableTo(disposeBag)

Observable.createと書くことでsubscribeされたタイミングでわたされた処理が開始します。Session.sendでresultが返ってきた際にonErrorかonCompletedがよばれ購読が解除されます。Disposables.createでは解放されるときの処理を書くことができます。rxSendRequestが呼ばれるたびに新しくObservableが生成され、subscribeしたタイミングで初めて処理が開始し、1回通信をしたらその時点で購読が解除されるようになります。

今回リファクタをするにあたって勉強した知識をまとめていきました。基本的かつ実装にあたって最低限必要なことしか学んでないので不十分な点が多いと思いますがこれから知識をさらに深めていきたいと考えています。