C#でReactive Extensions(Rx)その1:まずはIObserver, IObservableを理解する

今回より、C#でRxの使い方を確認していきます。

Rxはデザインパターンの1つであるのObserverパターンを元にしています。
なのでRxを学ぶ前に、まずはC#でObserverパターンを表現するためのIObserver, IObservableクラスの使い方を確認します。

今回作るのは以下のようなコードです。
ここに出てくる、SimpleEventSource/PrintEventListenerクラスを作るのが今回のゴールです。

public void Sample01() {
    // イベントソースを作る
    SimpleEventSource source = new SimpleEventSource();
 
    // リスナを登録する
    source.Subscribe(new PrintEventListener("A"));
 
  // 処理をexecuteする。(この時、PrintEventListenerへイベントが通知される)
    source.Execute(1);
    source.Execute(2);
    source.Execute(3);
}


まず、今回出てくる2つのクラスの概要を説明します。
SimpleObserverは、Execute()メソッドでなんらかの処理を行うクラスです。
このクラスは処理を実行するだけでなく、Execute()が実行時に登録されているイベントリスナに対して、Execute実行がされたことをイベントとして通知します。

イベントリスナになっているのが、PrintEventListenerクラスです。
イベントリスナはSimpleEventSource::Subscribe()を使って登録します。

ここで、イベントソースであるSimpleEventSourceがIObservableを実装し、PrintEventListenerがIObserverを実装します。

class PrintEventListener : IObserver<int> {
    ...
}
 
class SimpleEventSource : IObservable<int> {
    ...
}


IObserverとIObservableの2つは、.Net Frameworkが用意しているインターフェースです。


IObserverの実装(イベントリスナ)


まずは、処理内容が分かりやすいイベントリスナ側を実装します。
ちなみに、イベントリスナは、Rx上ではサブスクライバ(購読者)と呼ばれています。

// サブスクライバの実装(いわゆるイベントリスナ)
class PrintEventListener : IObserver<int> {
    private string name;
    public PrintEventListener ( string name ) {
        // 区別するための名前を記録(IObserverの実装とは関係のないDebug用)
        this.name = name;
    }
 
    // イベント受信時のハンドラ
    public void OnNext ( int value ) {
        Console.WriteLine( name + ":イベントを受信 val="+value );
    }
 
    // イベント通知終了時のハンドラ(イベントソースから切られた)
    public void OnCompleted () {
        Console.WriteLine( name + ":イベントが終了しました" );
    }
 
    // エラー時のハンドラ
    public void OnError ( Exception error ) {
        Console.WriteLine( name + ":イベント受信中にエラー発生 Msg=" + error.Message );
    }
}


この中で最も大事なのは、OnNext()です。
OnNext()は、イベントが発火したとき、コールされるコールバック(イベントハンドラ)です。
IObserverインターフェースでは、このほかにイベントが最後になった事とエラーになった事を伝えるために、OnCompleted()・OnError()の実装を要求されています。

イベントが最後というのは、たとえば通信プログラムを作っている際、通信が切断されるときなど、イベントソース側からこれ以上イベントが来なくなる事を通知するハンドラです。

エラーになった事というのは文字通り、想定外のエラーが発生しイベントの実行が行えなくなった事で、このときにOnError()をコールされることでエラー通知されます。


IObservableの実装(イベントソース)

次に、イベントを発火させる側の処理です。
イベントソースは、Rxではパブリッシャ(出版者)と呼ばれます。

イベントソース側の処理は長いので、ちょっとづつ見ていきます。
まずはリスナの登録処理である、Subscribe()です。

// オブザーバの実装(いわゆるイベントソース)
class SimpleEventSource : IObservable<int> {
    // イベントリスナの一覧
    private List<IObserver<int>> observerList = new List<IObserver<int>>();
 
    // イベントリスナの登録
    public IDisposable Subscribe( IObserver<int> observer ) {
        observerList.Add(observer);
        return new NotifyDispose(observerList, observer);
    }


Subscribeでは、IObserverのオブジェクトを受け取ることでリスナを登録します。
複数のリスナを登録可能にするため、今回はList<>でリスなの一覧を登録質得ます。

また、リスナの削除を行うためのDisposerオブジェクトを渡しています(詳細は後述)。


次に、イベントの実行処理です。
このメソッド名は何でも良いですが、今回はExecute()にしました。

    // 処理の実行(実行されたことのOnExecute通知を行う)
    public void Execute ( int value ) {
        // do something...
    
        if( value < 0 ) {
            foreach( var ob in observerList ) {
                ob.OnError(new Exception("invalid value:" + value));
            }
            observerList.Clear();
            return;
        }
 
        if( value > 10 ) {
            foreach( var ob in observerList ) {
                ob.OnCompleted();
            }
            observerList.Clear();
            return;
        }
 
        // イベントを通知する
        foreach( var ob in observerList ) {
            ob.OnNext(value);
        }
    }


今回のサンプルでは、負の数が入力されたらエラー、10以上の数値が入力されたら終了。ということにしてみました(この処理が何であるかは、パブリッシャの実装に依存します)。

IObservableインターフェースを実装するものは、エラーが起きた時はOnError()、イベントの終了はOnCompleted()を呼ぶのが決まりとなっています。このためExecute()ではこのルールどおりそれぞれのメソッドを実行しています。
また、IObserverではOnError()およびOnCompleted()を読んだ後は、OnNext()をコールしてはいけないというルールもある為、observerList.Clear();でリスナを削除しています。


最後に、前述のSubscribe()で戻り値としてリターンしていたDisposeオブジェクトです。
今回は、SimpleEventSourceのインナークラスとして実装しています。

    // Listenerの削除管理クラス
    private class NotifyDispose : IDisposable {
        private List<IObserver<int>> observerListRef = null; // SimpleObserverが持つリスナ一覧への参照
        private IObserver<int>       targetObserver  = null;
 
        // コンストラクタ
        public NotifyDispose ( List<IObserver<int>> observerListRef, IObserver<int> targetObserver ) {
            this.observerListRef = observerListRef;
            this.targetObserver  = targetObserver;
        }
 
        // 削除処理
        public void Dispose () {
            if( this.observerListRef == null ) {
                // 既に削除が終わっていたら何もしない
                return;
            }
 
            if( observerListRef.IndexOf(targetObserver) != -1 ) {
                // 監視中だったら、監視対象から外す
                observerListRef.Remove(targetObserver);
            }
 
            // 削除完了をマーキングする
            observerListRef = null;
            targetObserver  = null;
        }
    }
}


ちょっと長いですが、行っていることはシンプルです。
コンストラクタでSimpleEventSourceが持っているリスナ一覧への参照を覚えておき、Dispose()がコールされたらリスナ一覧から登録したリスナを削除しているだけです。


作ったIObserver/IObservableを連携させてみる。

最後に、作ったPrintEventListenerとSimpleEventSourceを連携させて、呼び出しテストを行います。

今回は下記のコードを書いてみました。

public void ObserverableTest () {
    // イベントソースを作る
    SimpleEventSource source = new SimpleEventSource();
 
    // リスナ登録し処理をexecuteする。(この時、OnExecuteのイベントが通知される)
    source.Subscribe(new PrintEventListener("A"));
    source.Execute(1);
    source.Execute(2);
    source.Execute(3);
    source.Execute(100);
    source.Execute(4);  // このイベントは通知されない
 
    // 2つのリスナへ通知を送る. 途中でエラーにさせる
    source.Subscribe(new PrintEventListener("B1"));
    source.Subscribe(new PrintEventListener("B2"));
    source.Execute(1);
    source.Execute(2);
    source.Execute(-10);
    source.Execute(3);  // このイベントは通知されない
 
 
    // 呼び元主導で、Listenを中断してもらう。
    IDisposable disposerC1 = source.Subscribe(new PrintEventListener("C1"));
    IDisposable disposerC2 = source.Subscribe(new PrintEventListener("C2"));
    source.Execute(1);
    source.Execute(2);
 
    disposerC1.Dispose();
    source.Execute(3);   // このイベントはC1には通知されない
 
    disposerC2.Dispose();
    source.Execute(4);   // このイベントは通知されない
}


いろいろなリスナ登録・削除の処理を行っています。


この処理の実行結果は以下のとおりです。

A:イベントを受信 val=1
A:イベントを受信 val=2
A:イベントを受信 val=3
A:イベントが終了しました
 
B1:イベントを受信 val=1
B2:イベントを受信 val=1
B1:イベントを受信 val=2
B2:イベントを受信 val=2
B1:イベント受信中にエラー発生 Msg=invalid value:-10
B2:イベント受信中にエラー発生 Msg=invalid value:-10
 
C1:イベントを受信 val=1
C2:イベントを受信 val=1
C1:イベントを受信 val=2
C2:イベントを受信 val=2
C2:イベントを受信 val=3


思ったとおりの出力になっているか、確認してみてください。


今回は、以上で終了です。
.NetFrameworkが用意しているIObserverおよび、IObservableクラスを利用し、Observerパターンの実装を行いました。


関連記事

コメントを残す

メールアドレスが公開されることはありません。