読者です 読者をやめる 読者になる 読者になる

techium

このブログは何かに追われないと頑張れない人たちが週一更新をノルマに技術情報を発信するブログです。もし何か調査して欲しい内容がありましたら、@kobashinG or @muchiki0226 までいただけますと気が向いたら調査するかもしれません。

RxJavaのFiltering Operatorの使い方をまとめてみた

ReactiveExtenstionの根本的な概念としてObservableとOperatorがあります。前回はRxJavaをより深く理解するためにObservableの生成メソッドをまとめてみました。

今回は前回に続いて、Observableから流れてきた値をフィルタするFiltering Operatorの使い方をサンプルを作りながら確認してみます。

Filtering Operator一覧

ReactiveXの公式ページでリストアップされているFiltering Operatorは以下のとおりです。

  1. Debounce
  2. Distinct
  3. ElementAt
  4. Filter
  5. First
  6. IgnoreElements
  7. Last
  8. Sample
  9. Skip
  10. SkipLast
  11. Take
  12. TakeLast

結構多いですが以下で1つずつ試してみます。

Debounce

出力元Observableで値を出力後、指定した時間経過後まで新たな出力が行われなかった場合に値を出力する。文章ではわかりづらいが、簡単に言うとタイマーのようなものと思えば良い。

出力元Observableから値が出力されたら指定した時間のタイマーが動き出し、タイマー作動中に他の値が出力されたら前の値のタイマーはキャンセルされ、新しい値に対するタイマーが起動する。タイマーが発火したらその値が出力される。

以下のサンプルはボタンクリック後、2秒間ボタンクリックを行わなければ値が出力される。2秒以内にボタンを再度クリックすると、クリック時点からまた2秒間のカウントが始まる。

Observable<Void> clickObservable = RxView.clicks(mButton);
RxView.clicks(mButton).debounce(2, TimeUnit.SECONDS).subscribe(v -> {
    Log.d("AAA", "debounce detected");
});

Distinct

Observableから重複した値を取り除く。引数に重複判定用のメソッドを渡すことで任意の重複判定を行うこともできる。

Observable.just(1, 1, 2, 3, 2, 1, 3, 3, 4).distinct().subscribe(i -> {
    Log.d("AAA", "distinct : " + i);
});
// distinct : 1
// distinct : 2
// distinct : 3
// distinct : 4

DistinctUntilChanged

Distinctのバリエーションの1つで、連続で同じ値が出力された場合は1つに圧縮されるが、別の値が出力された後に再び元の値が出力された場合、その値を出力するというオペレータ。

文章だけでは分かりにくいが、例えば[A, A, B, A, B, B, B, C]という値を出力するObservableがあった場合、DistinctUntilChangedでフィルタリングすると[A, B, A, B, C]という出力となる。

Observable.just(1, 1, 2, 3, 2, 1, 3, 3, 4).distinctUntilChanged().subscribe(i -> {
    Log.d("AAA", "distinctUntilChanged : " + i);
});
// distinctUntilChanged : 1
// distinctUntilChanged : 2
// distinctUntilChanged : 3
// distinctUntilChanged : 2
// distinctUntilChanged : 1
// distinctUntilChanged : 3
// distinctUntilChanged : 4

ElementAt

指定したインデックス個目の値のみを出力する。

Observable.range(1, 10).elementAt(4).subscribe(i -> {
    Log.d("AAA", "elementAt(4) : " + i);
});
// elementAt(4) : 5

ElementAtOrDefault

指定したインデックス個目の値のみを出力する。指定したインデックスが存在しない場合にデフォルト値を指定することができる。

Observable.range(1, 10).elementAtOrDefault(20, 100).subscribe(i -> {
    Log.d("AAA", "elementAtOrDefault(20, 100) : " + i);
});
// elementAtOrDefault(20, 100) : 100

Filter

booleanを返すフィルタメソッドを渡すことで、メソッドの戻り値がtrueになる値のみを出力する。

Observable.range(1, 50).filter(num -> {
    return num % 10 == 0;
}).subscribe(filtered -> {
    Log.d("AAA", "filter : " + filtered);
});
// filter : 10
// filter : 20
// filter : 30
// filter : 40
// filter : 50

ofType

Filterの一種で、メソッドではなくクラスを渡すことで、そのクラスと一致する値のみを出力する。静的型付け言語の場合基本的に型は指定されているためあまり活用の機会はないかもしれないが、動的型付け言語の場合はもしかしたら便利なのかも?

以下のサンプルではintからIntegerへの暗黙的型変換が行われるため、range(1, 5)で出力される値がIntegerと判定されて出力されている。しかしjust(1.0, 2.0, 3.0)とした場合はDoubleへの暗黙的型変換が行われるため何も出力されない。

Observable.range(1, 5).ofType(Integer.class).subscribe(filtered -> {
    Log.d("AAA", "ofType(Integer) : " + filtered);
});
// ofType(Integer) : 1
// ofType(Integer) : 2
// ofType(Integer) : 3
// ofType(Integer) : 4
// ofType(Integer) : 5

First

最初にObservableから流れてきた値のみを出力する。

Observable.range(1, 100).first().subscribe(i -> {
    Log.d("AAA", "first : " + i);
});
// first : 1

Firstでは引数にbooleanを返すフィルタメソッドを渡すことで、最初にフィルタの条件に該当した値のみを出力することもできる。例えばFizzBuzzを判定するメソッドを渡すと以下のようになる。

Observable.range(1, 100).first(num -> {
    return fizz(num) && buzz(num);
}).subscribe(firstMatched -> {
    Log.d("AAA", "first(FizzBuzz) : " + firstMatched);
});
// first(FizzBuzz) : 15

private boolean fizz(int num) {
    return num % 3 == 0;
}

private boolean buzz(int num) {
    return num % 5 == 0;
}

IgnoreElements

値は何も出力せずにObservableの終了イベントのみに反応する。つまりonErroronCompleteのみが実行される。

Observable.range(1, 10).ignoreElements().subscribe(
    i -> {
        Log.d("AAA", "ignoreElements : onNext : " + i);
    },
    e -> {
        Log.d("AAA", "ignoreElements : onError ");
    },
    () -> {
        Log.d("AAA", "ignoreElements : onComplete");
    }
);
// ignoreElements : onComplete

Last

Observableから最後に出力された値のみを出力する。

Observable.range(1, 100).last().subscribe(i -> {
    Log.d("AAA", "last : " + i);
});
// last : 100

Firstと同じくこちらもbooleanを返すメソッドを渡すことで、その条件に合致する最後の値のみを出力することができる。Firstと同様にFizzBuzzを判定するメソッドを渡すと以下のようになる。

Observable.range(1, 100).last(num -> {
    return fizz(num) && buzz(num);
}).subscribe(firstMatched -> {
    Log.d("AAA", "last(FizzBuzz) : " + firstMatched);
});
// last(FizzBuzz) : 90

Sample

出力元Observableから出力される値を指定した時間間隔ごとに出力する。いわゆるサンプリング。指定した間隔(ここでは3秒)ごとに、まだフィルタリングされていない最新の値があればそれを出力する。出力元Observableが終了したらサンプリング間隔に関わらず最後の値が出力される。

subscribe直後にサンプリングが行われるのかと思いきや、ここではなぜか1秒経過後*1からサンプリングが開始されているように見える。最初の出力は0になるんじゃないのか?この辺りの挙動は謎。教えてエロい人。

Observable.interval(1, TimeUnit.SECONDS).take(10).sample(3, TimeUnit.SECONDS).subscribe(i -> {
    Log.d("AAA", "sample(3) : " + i);
});
// sample(3) : 1
// sample(3) : 4
// sample(3) : 7
// sample(3) : 9

throttleLast

Sampleの別名。単なるエイリアスとのことなので割愛*2

throttleFirst

Sampleは指定した間隔経過時に出力元Observableから値をフィルタリングするが、こちらは指定した間隔の最初に値をフィルタリングする。これについては公式ページのマーブルダイアグラムを見ればよくわかる。端的に言えば、Sampleはサンプリング間隔の最後でサンプリングを実行し、throttleFirstはサンプリング間隔の最初でサンプリングを実行する。

Observable.interval(1, TimeUnit.SECONDS).take(10).throttleFirst(3, TimeUnit.SECONDS).subscribe(i -> {
    Log.d("AAA", "throttleFirst(3) : " + i);
});
// throttleFirst(3) : 0
// throttleFirst(3) : 3
// throttleFirst(3) : 6
// throttleFirst(3) : 9

Skip

Observableから出力される値のうち、引数で指定した最初のn個を無視する。

Observable.range(1, 10).skip(5).subscribe(i -> {
    Log.d("AAA", "skip(5) : " + i);
});
// skip(5) : 6
// skip(5) : 7
// skip(5) : 8
// skip(5) : 9
// skip(5) : 10

Skipでは時間を指定することもできる。時間(int)と単位(TimeUnit.SECONDS等)を指定すると、指定した時間が経過するまでの出力は無視される。以下は0~9を1秒毎に出力するObservableを8秒間Skipする例。

Observable.interval(1, TimeUnit.SECONDS).take(10).skip(8, TimeUnit.SECONDS).subscribe(i -> {
    Log.d("AAA", "skip(8, TimeUnit.SECONDS) : " + i);
});
// skip(8, TimeUnit.SECONDS) : 7
// skip(8, TimeUnit.SECONDS) : 8
// skip(8, TimeUnit.SECONDS) : 9

SkipLast

Observableから出力される値のうち、引数で指定した最後のn個を無視する。

Observable.range(1, 10).skipLast(5).subscribe(i -> {
    Log.d("AAA", "skipLast(5) : " + i);
});
// skipLast(5) : 1
// skipLast(5) : 2
// skipLast(5) : 3
// skipLast(5) : 4
// skipLast(5) : 5

SkipLastでは時間を指定することもできる。時間(int)と単位(TimeUnit.SECONDS等)を指定すると、出力元Observableが終了後、終了時点から引数で指定した時間以前の出力は無視される。以下は0~9を1秒毎に出力するObservableの最後8秒間をSkipする例。

Observable.interval(1, TimeUnit.SECONDS).take(10).skipLast(8, TimeUnit.SECONDS).subscribe(i -> {
    // Observable終了後に実行される
    Log.d("AAA", "skipLast(8, TimeUnit.SECONDS) : " + i);
});
// skipLast(8, TimeUnit.SECONDS) : 0
// skipLast(8, TimeUnit.SECONDS) : 1

Take

出力元のObservableから指定した個数の値のみ出力する。

Observable.range(1, 10).take(3).subscribe(i -> {
    Log.d("AAA", "take(3) : " + i);
});
// take(3) : 1
// take(3) : 2
// take(3) : 3

Skip同様、時間を指定することもできる。時間(int)と単位(TimeUnit.SECONDS等)を指定すると、指定した時間の間に出力元Observableから出力された値のみをフィルタリングできる。

Observable.interval(1, TimeUnit.SECONDS).take(3, TimeUnit.SECONDS).subscribe(i -> {
    Log.d("AAA", "take(3, TimeUnit.SECONDS) : " + i);
});
// take(3, TimeUnit.SECONDS) : 0
// take(3, TimeUnit.SECONDS) : 1
// take(3, TimeUnit.SECONDS) : 2

TakeLast

出力元Observableから出力された最後のn個の値を出力する。

Observable.range(1, 10).takeLast(3).subscribe(i -> {
    Log.d("AAA", "takeLast(3) : " + i);
});
// takeLast(3) : 8
// takeLast(3) : 9
// takeLast(3) : 10

他のオペレータ同様、このメソッドも時間を指定することができる。時間を指定すると、出力元Observable終了後、終了直前の指定した時間分の値が出力される。

Observable.interval(1, TimeUnit.SECONDS).take(10).takeLast(3, TimeUnit.SECONDS).subscribe(i -> {
    Log.d("AAA", "takeLast(3, TimeUnit.SECONDS) : " + i);
});
// takeLast(3, TimeUnit.SECONDS) : 6
// takeLast(3, TimeUnit.SECONDS) : 7
// takeLast(3, TimeUnit.SECONDS) : 8
// takeLast(3, TimeUnit.SECONDS) : 9

なおこの場合、終了直前3秒間に出力されるのは7, 8, 9なので7, 8, 9が出力されるかと思いきや6も出力されている。これは、9が出力された瞬間を起点に3秒前までの値を出力しているから(9(0秒) -> 8(1秒後) -> 7(2秒後) -> 6(3秒後))と思われる。

TakeLastBuffer

出力元Observable終了後、指定した個数の出力をListにバッファリングして出力する。

Observable.range(1, 10).takeLastBuffer(3).subscribe(list -> {
    Log.d("AAA", "takeLastBuffer(3) : " + list);
});
// takeLastBuffer(3) : [8, 9, 10]

*1:intervalの最初の出力は0なので

*2:ReactiveX - Sample operator