RxJavaをキャッチアップした話し

はじめに

こんにちは、OWNR by RENOSY アプリでAndroid開発を担当しているSuGeunです。

研修ではAsyncTaskを使って非同期式の処理を行いましたが 、 OWNR by RENOSYではデータを取得する非同期処理をRxJavaを使っているので本格的にRxJavaのキャッチアップをはじめました。

RxJavaとは

RxJavaは反応形プログラミング(Reactive Programming)を使うためのライブラリで、Reactive ExtensionsのJVM版になります。

Androidコードで非同期処理やサーバーとのデータをやり取りするときに最適化されて、2.Xバージョンから本格的にReactiveStreams仕様に準拠します。

既存の命令形プログラミング方式と異なり、全てを非同期データストリームで処理するのが 特徴です。

github.com

Reactive Programmingとは?

Reactive Programmingとはデータの流れや転送を改めて定義したパラダイムで、データの流れを先に決め、データが変更されると関わっている関数や数式がアップデートされる方式です。

簡単な例でグーグルマップから焼肉屋を検索して経路を確認します。 自分の位置情報が変更されている間ずっとデータを転送して、マップは更新されますが 移動せずに止まるとマップから自分の位置は更新されません。

グーグルマップを使っているユーザーが動いているのか、動いていないのかだけを サーバー上にお知らせして、そのあとデータの更新処理が行われます。

この仕組みは大きく、データが変更されましたと教えてくれるお知らせ側お知らせを受けてデータを更新する側に分けています。

え?それじゃ、Reactive Programmingはlistenerを使ったProgrammingと何が違うの?

両方も何かの反応を感じて結論をだすと言う部分は同じかもしれないが、結論までの仕組みとしては異なります。

例として二つの入力欄に数字を入れると自動的に加算された結果がでる計算機プログラミングあるとします。 Listenerで作られた場合、入力欄に対して欄に何か変化が生じるときに、それを感知, 入力欄の数字を取得した後、データの加工を進めますが、 リアクティブプログラミングではそれぞれの役割が異なります。

入力欄の何かの変化が生じる時に"何かの変化が起きたよ!"お知らせと同時にデータを配信してくれます。 これは役割として単に結果欄にデータを配信することまで責任を負います。

データを生産する生産者(入力欄)はデータを消費する消費者(結果欄)が配信したデータをどうやって加工するか知らなくてもよいのです。 同じく消費者も生産者が何をするか知らなくてもいいので、生産者はすぐ次のデータを生産します。

Observableクラスで“Hello,RxJava”

Observableクラスを使って”Hello,RxJava”を出力して見ましょう。

val myObservable = Observable.just("Hello", "RxJava") 

val myObserver = object : Observer<String> {
   override fun onComplete() {
       println("動作が正常に終わりました。")
   }

   override fun onSubscribe(d: Disposable) {
   }

   override fun onNext(t: String) {
       println("$t~!")
   }

   override fun onError(e: Throwable) {
       println("エラーが起きました。")
   }
}
myObservable.subscribe(myObserver)
出力結果:Hello~!
     RxJava~!
     動作が正常に終わりました。

myObservableはデータを発行するObservableクラスで、myObserverは貰ったデータを加工するObserverになります。

Observableからデータを備えたらObserverを購読(subscribe)することで、Observerは取得したデータを加工することができます。 just 演算子で"Hello", "RxJava"データを発行します。

ここでMap演算子を使うと簡潔に出力することも可能です。

Observable.just("Hello", "RxJava")
       .map { text -> "$text~!" }
       .doOnComplete { println("動作が正常に終わりました。") }
       .subscribe { text -> println(text) }
出力結果:Hello~!
     RxJava~!
     動作が正常に終わりました。

mapはObservableを別途の新たなObservableにして使えるのができます。 RxJavaは色々な演算子が備えているので公式ドキュメントを参考してください。

http://reactivex.io/documentation/operators.html

SingleClassを使ってみよう

RxJavaではObservable,Flowableなど色々なクラスが用意されていますが、 OWNR by RENOSYは一処理で一個のデータ取得する仕組みが多いので仕様に適合しているSingle クラスを主に使ってます。 Observableクラスはデータを無限に発行することができますが、Singleクラスは1個のデータしか発行できません。

結果が唯一なサーバーAPIから結果を貰う時や非同期処理が終わって結果を返す場合等に 使用され、データ一個発行と同時に終了します。

下記のコードはSingleクラスを使って今年の書いたメモのデータを取得するコードになります。

*OWNR by RENOSYではClean Architectureを採用してます。

Presentation Layer
private fun requestMemo(thisYear: Int) {
   getMemoArticlesByYear.execute(thisYear)
           .subscribeOn(schedulerProvider.io())
           .observeOn(schedulerProvider.ui())
           .doOnSubscribe { view?.showProgressBar() }
           .doFinally { view?.hideProgressBar() }
           .subscribeBy({
               //onSuccessの処理
           }, { 
               //onErrorの処理
           })
}

Thread宣言はSubscribeOnとObserveOnで分けて、データの収入とデータの加工の終了時点で使われます。 SubscribeByでは処理が終わった後のonSuccess、onErrorの処理を書きます。

Domain Layer
fun execute(thisYear: Int): Single<ResponseValues> {
   return taxReturnMemosRepository.getMemoArticlesByYear(thisYear)
           .map { entity -> MemoArticlesModel.convertModel(entity) } // EntityデータをModelに加工するmap演算子を使う
           .map { model -> ResponseValues(model) }            
}
Data Layer
override fun getMemoArticlesByYear(thisYear: Int): Single<MemoArticlesEntity> {
   return remote.getMemoArticlesByYear(thisYear)     
}

最後に

RxJavaは非常に学習コストが高く、学習量も膨大で最初学ぶとき紛らわしい部分が多かったです。 RxJavaの流れやイベント処理を視覚的に見ると理解に役に立つので、RxJavaマーブルダイアグラムに関して調べるのもお勧めです。

rxmarbles.com

Androidアプリ開発者向けにもっと優しく使えるようにしてくれるRxKotlin,RxAndroidもあるので興味があるかはぜひ調べてみてください。