RxJavaを使ってReactiveX (リアクティブ エクステンション)に入門した

ちょっとタイミング遅れてるけど、ReactiveX (Rx, Reactive Extensions) を勉強してみました。

TwitterやWeb上の記事などでよく目にしてたので気にはなってたのだけど、先日やっと触ってみました。

備忘録です。メモメモ。

使ったRxJavaのバージョンは、1.0.9。

全体的な流れとしては

  1. Observable, Observer
  2. Observableを操作する
  3. Scheduler
  4. 歴史や理論的な話
  5. 何をObservableにするか
  6. GUIでもRx
  7. Subject

Observable と Observer

まずは、触ってみる。Web + DB Vol.81 のJavaの鉱脈で紹介されてたのを読みながらコードを書く。

WEB+DB PRESS Vol.81

WEB+DB PRESS Vol.81

  • 作者: 長嶋享,藤吾郎,八木俊広,日高一明,滝口健太郎,田中慎司,泉水翔吾,海野弘成,佐藤太一,吉村総一郎,伊藤直也,川上大喜,こしばとしあき,舘野祐一,中島聡,橋本翔,渡邊恵太,はまちや2,竹原,川添貴生,沢渡真雪,WEB+DB PRESS編集部
  • 出版社/メーカー: 技術評論社
  • 発売日: 2014/06/24
  • メディア: 大型本
  • この商品を含むブログ (1件) を見る

こちらの記事も参考になった。

kirimin.hatenablog.com

Web+DBに書いてあった以下の文章が理解するのを大きく助けてくれた。

連続的なデータをイベントハンドラで処理するプログラミングスタイルをリアクティブプログラミングと呼びます。 リアクティブプログラミングでは、データの生成、そのデータ生成に伴うイベントの通知、イベントの処理を分離して記述します。

簡単な例で試す

簡単なサンプルで整理整頓する。大事なのは「データの生成」、「データ生成に伴うイベントの通知」、「イベントの処理」の3つ。

連続的なデータを表すのがObservableオブジェクト。Observableオブジェクトが「データの生成」と「データ生成に伴うイベントの通知」を行う。

「イベントの処理」を行うのがObserverオブジェクト。

Observable<Integer> observable = Observable.create(observer -> {
    IntStream.range(1, 10).forEach(x -> observer.onNext(x));
    observer.onCompleted();
});
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.print("COMPLETE!");
    }

    @Override
    public void onError(Throwable e) {
        System.out.print("ERROR!");
    }

    @Override
    public void onNext(Integer integer) {
        if (integer % 2 == 0) System.out.println(integer * 3);
    }
};
observable.subscribe(observer);

上の例だと、1から10までの数値データを生成している。ラムダ式の引数であるobserveronNextメソッドを呼び出すことでObserverオブジェクトにイベントを通知できる。全てのデータを生成したらobserveronCompletedメソッドを呼び出して「完了」を通知する。

observable.subscribe(observer)で、ObserverオブジェクトをObservableオブジェクトに登録すると、さっきのラムダ式内でobserver.onNext(x)を呼び出したタイミングで、Observerオブジェクトで定義したonNextメソッドが実行される。同じようにonCompletedも実行される。

subscribeメソッドを呼び出して初めてObservableはデータを生成し始める(後で分かったけどそうじゃない場合もあるみたい。とりあえずこの時点ではsubscribeしてからデータの生成を始めると思っておく)。

実行すると結果はこうなる。

6
12
18
24
COMPLETE!

もっとすっきり書く

Observableクラスには、Observableオブジェクトの生成を楽にするためのメソッドが用意されている。

http://reactivex.io/documentation/operators.html#creating

また、subscribeメソッドには、ラムダ式を渡してonNext時の処理を定義することができる。

これらを使うと、さっきの例はもっとシンプルに書ける。ただし、onCompletedとonErrorは実装されてない。

Observable.range(1, 10)
    .subscribe(x -> {
        if (x % 2 == 0) System.out.println(x * 3);
    });

onCompletedとonErrorも実装したい場合はラムダ式を3つ渡す。

Observable.range(1, 10)
    .subscribe(
        x -> {
            if (x % 2 == 0) System.out.println(x * 3);
        },
        (e) -> System.out.print("ERROR!"),
        ()  -> System.out.print("COMPLETE!")
);

Observableの操作

Observableオブジェクトには、Java8のStreamにあるようなラムダ式を引数にとるメソッドがある。

Observable.range(1, 10)
    .filter(x -> x % 2 == 0)
    .map(x -> x * 3)
    .subscribe(
        (x) -> System.out.println(x),
        (e) -> System.out.print("ERROR!"),
        ()  -> System.out.print("COMPLETE!")
    );

このObservableオブジェクトに対するメソッドは色々ある。公式のドキュメントでは以下のように分類されてる。

Scheduler

ここまでだと、Java8のjava.util.Streamを使うのとそんなに変わらない感じがする。java.util.Streamでできることをわざわざイベントハンドラで処理すると何が嬉しいのか。

嬉しいのは、Rxを使うと非同期処理になりますよ、ということ。

Rxでは、データの生成やイベントに対する処理をメインスレッドは別のスレッドで動かすための仕組みが用意されている。それがScheduler。

デフォルトでは全てメインスレッドで動いていた。observable.subscribeOnメソッドを使うと、Observableがデータを生成する部分(Observable.createに渡したラムダ式)やObservableを加工する処理(mapやfilter)、Observerが動くスレッドを指定することができる。

指定できるスレッドの種類はここに書いてある。

http://reactivex.io/documentation/scheduler.html

以下のようにsubscribeOn(Schedulers.computation())を指定するとメインスレッドとは別のスレッドで動いてることが分かる(動かすときはメインスレッドが終わらないように最後にSleepを入れるなどする)。

Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())
        .filter(x -> {
            System.out.println("filter: " + Thread.currentThread().getName());
            return x % 2 == 0;
        })
        .map(x -> {
            System.out.println("map: " + Thread.currentThread().getName());
            return x * 3;
        })
        .subscribe(x -> {
            System.out.println("observer: " + Thread.currentThread().getName());
            System.out.println(x);
        });

さらに、以下のように途中でobserveOnメソッドを呼び出すと、それ以降の処理をまた別のスレッドでやらせることができる。

Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())
        .filter(x -> {
            System.out.println("filter: " + Thread.currentThread().getName());
            return x % 2 == 0;
        })
        .observeOn(Schedulers.newThread())
        .map(x -> {
            System.out.println("map: " + Thread.currentThread().getName());
            return x * 3;
        })
        .subscribe(x -> {
            System.out.println("observer: " + Thread.currentThread().getName());
            System.out.println(x);
        });

ObserverとSubscriber

Observableをつくるときにcreateメソッドにラムダ式を渡していた。実態はOnSubscribeという関数型インタフェース。なのでこういう風にも書ける。

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        IntStream.range(1, 10).forEach(x -> subscriber.onNext(x));
        subscriber.onCompleted();
    }
});

ラムダ式の引数は実はObserverではなくSubscriber。SubscriberはObserverインタフェースとSubscriptionインタフェースを実装している。

observable.subscribe(observer)ってやったときの返り値もSubscription。

Subscriptionには、unsubscribeメソッドとisUnsubscribedメソッドが用意されている。subscribeしたときの返り値に対してunsubscribeメソッドを呼び出すと、通知を受けるのを止めることができる。

isUnsubscribeメソッドでobserverがunsubscribeしたかどうかをチェックできる。あと明示的にunsubscribeメソッドを呼び出さなくてもunsubscribeされるときがある。例えば以下のようにtakeメソッドを使ってObservableから5件だけデータを受け取るようなとき。

Observable.range(1, 10)
    .take(5)
    .filter(x -> x % 2 == 0)
    .map(x -> x * 3)
    .subscribe(
        (x) -> System.out.println(x),
        (e) -> System.out.print("ERROR!"),
        ()  -> System.out.print("COMPLETE!")
    );

そのため、Observable.createメソッドを使うときは、onNextメソッド前にisUnsubscribedメソッドでunsubscribeされているかどうかをチェックした方が良い。ずーっと続くようなObservableの場合はやっておかないと長いこと動いちゃう。

Observable<Integer> observable = Observable.create(subscriber -> {
    int i = 0;
    while(true) {
        if (subscriber.isUnsubscribed()) {
            break;
        } else {
            if (i == Integer.MAX_VALUE) {
                subscriber.onCompleted();
            } else {
                subscriber.onNext(i);
                i = i + 1;
            }
        }
    }
});
observable.subscribeOn(Schedulers.computation())
    .take(5)
    .filter(x -> x % 2 == 0)
    .map(x -> x * 3)
    .subscribe(
        (x) -> System.out.println(x),
        (e) -> System.out.print("ERROR!"),
        ()  -> System.out.print("COMPLETE!")
    );

RxJavaを使う練習

ここまで学んだことの練習として以下の記事を読む。ふむふむ、分かるわかる。

techlife.cookpad.com

概念や歴史的な背景など

歴史的な話はこちらの記事が非常に参考になった。

steps.dodgson.org

色々なリアクティブがある。リアクティブという単語は使わずにリアクティブエクステンションとちゃんと言おうと思った。この文章も大事。

Rx は Future に似ている。そして要求応答の対を 1:1 から 1:N に一般化することでストリームが苦手な Future の弱点を乗り越えている

あとこちらも記事も参考になった。

okapies.hateblo.jp

okapies.hateblo.jp

勉強になるなー。先ほどのReactive Pornの記事と同じようなことが書いてある。

Future/Promise が単一の非同期イベントを一つずつ処理するモデルなのに対し、Rx の Observable は(時間や順序のある)複数イベントのストリームを扱う処理を対象としている点が異なる。

そして、公式サイトにもあるが、単数/複数、同期/非同期の表がすごくわかりやすかった。

ReactiveX - Intro

表には以下の4パターンがまとめられている。

  • 単数 かつ 同期 => 値
  • 複数 かつ 同期 => 配列
  • 単数 かつ 非同期 => Future/Promise
  • 複数 かつ 非同期 => Observable

HTTPリクエストをObservable化

HTTPリクエストを送るのを上の4パターンでやってみる。HTTPクライアントはOkHttpを使う。

まずは「単数 かつ 同期」。

public void exec(String url) throws IOException {
    Response = syncHttpCall(url);
    System.out.println("[" + url + "] OK!");
}

public Response syncHttpCall(String url) throws IOException {
    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder()
            .url(url)
            .build();
    return client.newCall(request).execute();
}

次は「複数 かつ 同期」。

public void exec(List<String> urls) throws IOException {
    for (String url : urls) {
        syncHttpCall(url);
        System.out.println("[" + url + "] OK!");
    }
}

これで複数とHTTP通信ができる。

1つのスレッドで動いているのでレスポンスを待っている時間がもったいない。リクエストを投げたら結果が返ってくるまでに他のHTTPリクエストも投げてしまいたい。なので、非同期でHTTP通信するようにしよう。

まずは「単数 かつ 非同期」から考えてみる。Java8から使えるようになったCompletableFutureで非同期処理を書くことができる。CompletableFutureに渡したラムダは、メインスレッドとは別のスレッドで動作する。

public void exec(List<String> urls) throws IOException {
    final String url = urls.get(0);
    CompletableFuture<Response> future = asyncHttpCall(url);
    future.thenAccept(response -> {
        System.out.println("[" + url + "] OK!");
    });
}

private CompletableFuture<Response> asyncHttpCall(String url) {
    CompletableFuture<Response> future = CompletableFuture.supplyAsync(() -> {
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder()
                .url(url)
                .build();
        try {
            return client.newCall(request).execute();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });
    return future;
}

では、「複数 かつ 非同期」を考える。同じくCompletableFutureを使ってやってみる。

public void exec(List<String> urls) throws IOException {
    for (final String url : urls) {
        CompletableFuture<Response>future = asyncHttpCall(url);
        future.thenAccept(response -> {
            System.out.println("[" + url + "] OK!");
        });
    }
}

これで、複数のHTTPリクエストを非同期に呼び出すことができた。1つ目のHTTPリクエストを投げたあと、レスポンスを待たずに別のスレッドで2つ目のHTTPリクエストを投げることができる。

あれ?「複数 かつ 非同期」もCompletableFutureで実現できてしまった。Observableは不要だったのか。

ここでこの1文を思い出してみる。

Future/Promise が単一の非同期イベントを一つずつ処理するモデルなのに対し、Rx の Observable は(時間や順序のある)複数イベントのストリームを扱う処理を対象としている点が異なる。

(時間や順序のある)複数イベントのストリームと書いてある。時間や順序。

CompletableFutureで実装したコードを実際に動かすと分かるが、レスポンスを処理する順番はバラバラになる。レスポンスが早く返ってきたやつから処理される。つまり順序は意識されなくなる。

では、Observableを使って「(時間や順序のある)複数 かつ 非同期」をやってみる。あんまり自信がないけど、多分こんな感じ。

public void exec(List<String> urls) throws IOException {
    Observable<Response> observable = Observable.empty();
    for (String url : urls) {
        observable = observable.concatWith(Observable.from(asyncHttpCall(url)));
    }
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .zipWith(Observable.from(urls), (r, s) -> new Pair<>(r, s))
            .map(pair -> "[" + pair._2 + "] OK! ")
            .subscribe(System.out::println);
}

Observableオブジェクトは、CompletableFutureからつくることができる。そしてObservable同士はconcatWithメソッドを使うと順番を維持して合成することができる。

データを生成する部分、つまりCompletableFutureからデータを取り出す部分はSchedulers.io()を指定する。発生したイベントを処理する部分、つまり取り出したデータを処理する部分はSchedulers.computation()を指定した。

これで、「(時間や順序のある)複数 かつ 非同期」を実現できた!

しかもObservaleは、Observableをメインスレッドで使うこともできるし値を1つだけ持たせることもできるので、4つのパターン全てに対応できる。

ここまでを整理

以下の3つを分けて理解するのが大事っぽい。それぞれを独立して考えるようにする。

  • どうやってObservable化するか
  • Observableをどうやって操作するか
  • Schedulerの設定をどうするのか

Observableの操作はコレクションに対する高階関数での処理(例えばJava8のStreamに対する処理)の考え方が生かせるので、特に新しいことって感じはない。何をどうやってObservableにするか、というのが部分がRxを使うときに新しいところな気がする。

一度、Observableにしてしまえば、Observableを加工したり、どのスレッドを使うかは同じ考え方で対処できる。

なんでもObservable

何をObservableにするか。以下の記事にはこんなことが書いてある。

ninjinkun.hatenablog.com

全てがストリームにできる。これがFRPのマントラだ

ストリームというのはObservableのこと。全てがObservableにできる!

記事を読みながら実際にRxJsを試してみる。RxJsにはUIのイベントをObservableにするメソッドが用意されていて便利!Future/Promiseだけでなく、UIのイベントもObservableで表すことができることが実感できた。

GUIアプリケーションでもRx

Web + DB vol.85 にMVVMパターンとRxJsの説明がある。これも参考になる。

WEB+DB PRESS Vol.85

WEB+DB PRESS Vol.85

  • 作者: 菅原元気,磯辺和彦,山口与力,澤登亨彦,濱田章吾,宮田淳平,松本亮介,海野弘成,佐藤歩,泉水翔吾,佐藤太一,hide_o_55,青木良樹,武本将英,道井俊介,伊藤直也,橋本翔,渡邊恵太,舘野祐一,中島聡,はまちや2,竹原,牧大輔,工藤春奈,WEB+DB PRESS編集部
  • 出版社/メーカー: 技術評論社
  • 発売日: 2015/02/24
  • メディア: 大型本
  • この商品を含むブログを見る

同じような考え方で、RxAndroidも使えそう(試してはない)。RxAndroidについてはWebにたくさんの資料がある。

Androidの場合は、Schedulerを使ったスレッドの制御がすごく役に立ちそうだなー。趣味アプリで試してみよう。

新たな登場人物Subject

クックパッドさんの記事を読んでいてこんなことが書いてあった。

RxJavaの機能の活用方法は概ね以下の様に分解できます。

  • List処理の抽象化・ストリーム化
  • Optional
  • Promise
  • Data Binding
  • Event Bus

「List処理の抽象化・ストリーム化」ってのは最初にやった。PromiseもCompletableFutureからObservableをつくってみたのでやった。「Optional」はObservableに値を1つだけ入れるか空にするかってやればできそう。

「Data Binding」と「Event Bus」はどうすんだ?と思ってたら、こういう風にも書いてあった。

Data BindingやEvent BusはSubjectを利用するので、Observableを知る前に触れるのはオススメできません

ふむふむ。Observable、Observer、Scheduler以外の登場人物、Subjectというものが出てきた。

公式サイト見てみると、Subjectのページがある。

ReactiveX - Subject

SubjectはObservableとしてもObserverとしても振舞う、って書いてある。実際、JavaDocを見てみるとObservableを継承してObserverを実装している。いわゆるObserverパターンのSubjectとは違うっぽい。

Subject (RxJava Javadoc 1.3.8)

"cold"なObservableと"hot"なObservable

公式サイトを読み進めると、Subjectを使うと元々の"cold"なObservableを変形した"hot"なObservableを得ることができる、というようなことが書いてある。

"cold"なObservableと"hot"なObservable?ググるとこんな記事が。

qiita.com

すごく分かりやすいぞ!"Hot"なObservableは、Observerが登録されいなくてもデータを流すのか。1,2,3というデータを表すObservableがあったとして、1と2の間でObserverがsubscribeされたとしたら、そのObserverは2と3を受け取る(1は既に流されてしまった後であるため)。

これが分かると公式サイトのSubjectのページの図も分かりやすい。Subjectには4種類あって、それぞれどうやってデータをObserverに渡すかが異なってる。

さらにもう1つ。"hot"なObservableは分岐することができるって書いてある。

例えば、以下のように標準入力で入力された文字列をストリームのデータとするObservableを用意する。これは"cold"なObservableになる。

Observable<String> observable = Observable.create(subscriber -> {
    InputStreamReader inputStreamReader = new InputStreamReader(System.in);
    try (BufferedReader in = new BufferedReader(inputStreamReader)) {
        while (true) {
            if (subscriber.isUnsubscribed() == false) {
                System.out.print("> ");
                String s = new String(in.readLine());
                subscriber.onNext(s);
            } else {
                break;
            }
        }
    } catch (IOException e) {
        subscriber.onError(e);
    }
}

"cold"なObserverに2つのObserverをsubscribeしても、1個しか出力されない。

observable.subscribe(System.out::println);
observable.subscribe(System.out::println);

では、間にSubjectを挟んでみよう。SubjectはObservableであるので、Observerをsubscribeメソッドで受け取れる。また、SubjectはObserverでもあるので、Observableのsubscribeメソッドで渡すこともできる。

PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(System.out::println);
subject.subscribe(System.out::println);
observable.subscribe(subject);

Subjectを間に挟むと標準出力にちゃんと2回出力されるようになる。なぜならSubjectを"cold"なObservableのsubscribeメソッドに渡したことで、"hot"なObservableになったから。

ちなみにわざわざObservableとSubjectをつなげないで、Subject単体でも同じことできる。

PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(System.out::println);
subject.subscribe(System.out::println);
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
try (BufferedReader in = new BufferedReader(inputStreamReader)) {
    while (true) {
        System.out.print("> ");
        String s = new String(in.readLine());
        subject.onNext(s);
    }
} catch (IOException e) {
        subject.onError(e);
}

PublishSubjectを使ってEvent Bus

さて、ストリームを分岐するのとPublishSubjectを使ったことでEvent Busのようなことができてる気がする。標準入力というイベントをObservableで表現し、イベントハンドラを複数登録できてる。

PublishSubjectを使う理由を書く。以下のように"add"という文字列を標準入力に入力した場合は、Observerを追加するようにしてみる。

PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(s -> {
    if (s.equalsIgnoreCase("add")) {
        observerCount = observerCount + 1;
        int observerId = observerCount;
        subject.subscribe(string -> System.out.println("[" + observerId + "] " + string));
    }
});
observable.subscribe(subject);

以下のように入力すると、Observerが登録される前の入力文字列は無視されて、登録後の入力文字列だけが出力されていることが分かる。

> hoge
> add
> fuga
[1] fuga
> foo
[1] foo
> add
[1] add
> bar
[1] bar
[2] bar

これはPublishSubjectのおかげ。これがReplaySubjectを使った場合はこうなる。EventBusの挙動としてはおかしい。

> hoge
> add
[1] hoge
[1] add
> fuga
[1] fuga
> foo
[1] foo
> add
[2] hoge
[2] add
[2] fuga
[2] foo
[2] add
[1] add
> bar
[1] bar
[2] bar

BehaviorSubjectを使ってData Binding

残すところData Bindingだが、これはRxJavaだけだとあんまりイメージが・・・。オブジェクトのプロパティを変更したときにイベントが発生するようなフレームワークと組み合わせると良いのかな。とりあえず変更対象のオブジェクトをViewModelというクラスでラップしてやってみる。

private static class View {
    public String value;
}

private static class ViewModel {
    private BehaviorSubject<String> behaviorSubject;
    private String s;

    public ViewModel(String s) {
        this.s = s;
        this.behaviorSubject = BehaviorSubject.create(s);
    }

    public String get() {
        return s;
    }

    public void set(String s) {
        this.s = s;
        behaviorSubject.onNext(s);
    }

    public void bind(View view) {
        behaviorSubject.subscribe(s -> view.value = s);
    }
}

public static void main(String[] args) {

    ViewModel viewModel = new ViewModel("default");

    View view1 = new View();
    System.out.println("######################");
    System.out.println("view1: " + view1.value);

    viewModel.bind(view1);

    System.out.println("######################");
    System.out.println("view1: " + view1.value);

    viewModel.set("hoge");

    System.out.println("######################");
    System.out.println("view1: " + view1.value);

    View view2 = new View();
    viewModel.bind(view2);
    System.out.println("######################");
    System.out.println("view1: " + view1.value);
    System.out.println("view2: " + view2.value);

    viewModel.set("fuga");

    System.out.println("######################");
    System.out.println("view1: " + view1.value);
    System.out.println("view2: " + view2.value);
}

ViewModelにViewオブジェクトをbindしておく。そうするとViewModelのプロパティを書き換えると、それがViewにも反映される。Data Bindingは多分こんな感じ・・・。

実行結果はこうなる。

######################
view1: null
######################
view1: default
######################
view1: hoge
######################
view1: hoge
view2: hoge
######################
view1: fuga
view2: fuga

BehaviorSubjectを使うことで、subjectしたときに最後の値を通知することができる。解決したい課題に対して、適切なSubjectを選択するといいんだな。

まとめ

  • リアクティブプログラミングには色々ある。リアクティブエクステンションとちゃんと言う
  • Rxでは、"時間や順序のある"連続的なデータ(ストリーム)をイベントハンドラで処理する
  • データの生成、そのデータ生成に伴うイベントの通知、イベントの処理を分離して記述する
  • Observableオブジェクトが「データの生成」と「データ生成に伴うイベントの通知」をする
  • Observerオブジェクトが「イベントの処理」をする
  • Observableはコレクションに対する高階関数と同じように操作できる
  • Observable同士を合成できる
  • Schedulerを使って動作するスレッドを指定できる
  • Rxを使うと非同期処理がやりやすい
  • 非同期処理と同期処理を同じようにObservableに対する操作で書ける
  • Observableには"Hot"なObservableと"Cold"なObservableがある
  • "Hot"なObservableはSubjectを使ってつくる
  • Subjectを使うことでEvent BusやData BindingもRxで実現できる
  • 全てをObservableにできる!

思った以上に盛りだくさんだった・・・。色々あって実践っぽいとこで使ってみないと分からない気がするなぁ。趣味Androidアプリで試してみるかなー。