2018/02/19

AppDomain境界を跨ぐデータ授受を行う一手法

こんなことをやりたい。



ネットワークから大きなデータを受信して、複数のアプリケーションドメインを経由して、最終的にはどこかのアプリケーションドメインで消費される。あるいはその逆の過程を辿ってネットワークにデータを送信する。

そこで気になるのはアプリケーションドメインを跨いでデータを受け渡す方法だ。

標準的な方法では、受信側でMarshalByRefObjectを継承したクラスを作り、送信側でそのクラスのメソッドを呼び出すということになる。

標準的手法を用いると以下のようなコードになる。というか、いつもいつも似たようなコードで申し訳ない。

open System
open System.Reflection

// 受信側になるアプリケーションドメイン
type Receiver() =
    inherit MarshalByRefObject()

    // データを受信する
    member __.receive ( v : byte[] ) =
        //printf "length = %d\n" ( v.Length )
        ()

// 送信側のアプリケーションドメイン
[<EntryPoint>]
let main argv = 
    // アプリケーションドメインを構築する
    let ad = AppDomain.CreateDomain( "dom1" );

    // dom1のアプリケーションドメインで、
    // Receiverクラスのインスタンスを生成する
    let receiver =
        ad.CreateInstanceAndUnwrap(
            Assembly.GetEntryAssembly().FullName,
            typeof< Receiver >.FullName
        ) :?> Receiver

    // 送信すべきデータを受信する
    let send_data : byte[]  = Array.zeroCreate( 128 * 1024 );

    // 開始時刻を取得する
    let startTime = DateTime.Now

    // 一定時間繰り返す
    let rec loop cnt =
        // データを渡す
        receiver.receive send_data
        // 時々経過時間を確認する
        if ( DateTime.Now - startTime ).TotalSeconds < 10.0 then
            loop ( cnt + 1 )
        else
            cnt
    let count = loop 0

    // 秒間の繰り返し回数を算出する
    let timeSpan = DateTime.Now - startTime
    printf "count = %f / s\n" <| ( float count ) * 1000.0 / timeSpan.TotalMilliseconds
    0

だがこの方法には問題がある。

データが受け渡される都度(上記ではreceiveメソッドが呼ばれる都度)、送信側アプリケーションドメインにあるデータが、受信側アプリケーションドメインのヒープに複製されるのである。ということは、データが受け渡されるアプリケーションドメインの個数分だけ複製が作られてメモリを無駄遣いするし、その分GCの負荷も大きくなるということを意味している。

しかも、やってみればわかるが、このアプリケーションドメインを跨ぐデータの複製は、単なるメモリコピーと比較しても時間がかかる。しかも、データ量に比例して時間がかかる。

だから、性能向上を図ろうと思うのなら、アプリケーションドメイン境界を跨ぐデータ授受について、頻度もしくはデータ量を削減しなければならない。

前に、アプリケーションドメイン境界を跨いで共有メモリにアクセスする手法について記載したが、それをここで使う。

やることは単純である。送信元で共有メモリ空間(あるいはどこかの領域)にデータを書き込んで、その後書き込んだ事実を送信先に通知、送信先で共有メモリの内容を読み込めばよい。そうすれば、少なくともマネージドな空間の間でやり取りされる情報はデータ送受信の通知に関する分だけとなり、データ本体は繰り返しコピーする必要がなくなる。

※余談だが、所詮プロセス内でデータのコピーを行っているだけなのだから、「本物の」共有メモリを使用する必然性はどこにもない。GC対象外の領域であればそれが何であっても実のところ問題ないのだが、ここでは単に便利だからという理由だけで、MemoryMappedFileクラスを用いている。

open System
open System.Reflection
open System.IO.MemoryMappedFiles
open Microsoft.FSharp.NativeInterop

let DataLength = 128 * 1024

// 受信側になるアプリケーションドメイン
type Receiver( argAdr : nativeint ) =
    inherit MarshalByRefObject()

    // 受信用のバッファを用意する
    let receive_data : byte[] = Array.zeroCreate( DataLength );

    // 共有メモリの開始アドレス
    let adr = argAdr

    // データを受信する
    member __.receive () =
        // (3)データをバッファにコピーする
        Buffer.MemoryCopy(
            adr.ToPointer(),
            ( NativePtr.toNativeInt( &&receive_data.[0] ) ).ToPointer(),
            int64 DataLength,
            int64 DataLength
        )

// 送信側のアプリケーションドメイン
[<EntryPoint>]
let main argv = 
    // 共有メモリ
    let mmapFile : MemoryMappedFile =
        MemoryMappedFile.CreateOrOpen( "abc", int64 DataLength );
    let handle =
        mmapFile.CreateViewAccessor().SafeMemoryMappedViewHandle
    let mutable p : nativeptr<byte> =
        NativePtr.ofNativeInt( nativeint 0 )
    handle.AcquirePointer( &p )

    // アプリケーションドメインを構築する
    let ad = AppDomain.CreateDomain( "dom1" );

    // dom1のアプリケーションドメインで、
    // Receiverクラスのインスタンスを生成する
    let receiver =
        ad.CreateInstanceAndUnwrap(
            Assembly.GetEntryAssembly().FullName,
            typeof< Receiver >.FullName,
            false,
            BindingFlags.Default,
            null,
            [| NativePtr.toNativeInt p |],
            null,
            null
        ) :?> Receiver

    // 送信すべきデータを用意する
    let send_data : byte[] = Array.zeroCreate( DataLength );

    // 開始時刻を取得する
    let startTime = DateTime.Now

    // 一定時間繰り返す
    let rec loop cnt =
        // (1)データを書き込む
        Buffer.MemoryCopy(
            ( NativePtr.toNativeInt( &&send_data.[0] ) ).ToPointer(),
            ( NativePtr.toNativeInt p ).ToPointer(),
            int64 DataLength,
            int64 DataLength
        )
        // (2)書き込んだことを通知する
        receiver.receive ()
        // 時々経過時間を確認する
        if cnt % 100 <> 0 || ( DateTime.Now - startTime ).TotalSeconds < 10.0 then
            loop ( cnt + 1 )
        else
            cnt
    let count = loop 0

    handle.ReleasePointer()

    // 秒間の繰り返し回数を算出する
    let timeSpan = DateTime.Now - startTime
    printf "count = %f / s\n" <| ( float count ) * 1000.0 / timeSpan.TotalMilliseconds
    0

ここまでに示したプログラムを単純に実行すると、5倍弱程度の性能差があることが確認できる。

ところで、先のプログラムでは送信側も受信側も共有メモリ領域を完全に占有することを前提としていたことに注意してほしい。しかし、それでは俺のやりたいことは実現できない。ソケットからデータは連続して受信し続けたいし、各アプリケーションドメイン内の処理は非同期的に動作できるようにしなければならない。

なので、共有メモリ空間を単一のデータで占有できないので、メモリ空間を切り出して個別に割り当て・解放を管理しなければならない。

ということで、まずは、共有メモリ領域を固定長のページ単位に区切る。その上で、ページごとに使用中か否かの管理を行うものとする。

共有メモリに書き込む前に領域の割り当てを行い、割り当てられたページ番号を取得する。データの受信側、すなわち共有メモリから読み込む処理に対しては、割り当てられたページ番号を通知し、そのページ番号を元にデータの参照を行う。



さらに、非同期的に動作するということは、メモリ領域(もしくはページ)を確保した処理のすぐ後に解放する処理を入れるようなコーディングスタイルは取れない。

C言語的に書くならこうだ。

void Procedure( …… )
{
    // メモリ領域を確保する
    unsigned char *p = (unsigned char*)malloc( いっぱい );

    // データを設定する
    p[ ??? ] = ……;

    // 重要な処理を行う
    jyuuyou_na_syori( p );// ←これが非同期的に実行され、即座に制御を返すとしたらどうなる?

    // 解放する
    free( p );
}

だから、ページ単位に参照カウンタを設けて、参照する処理がなくなったところでページの開放が行われるような方法を考えてみる。



ページ割り当てを行ったときに参照カウンタを1に設定し、その後、アプリケーションドメイン境界を跨いでページ番号が通知されたら参照カウンタを加算する。データを使い終わるか、他の処理に丸投げるかして参照を外す場合には参照カウンタを減算する。やっていることは純朴な参照カウンタによるガベージコレクタの実装そのものだ。

後は、加減算さえ間違えなければよいのだが、それをどう実現するか。

無論1つの方法として、呼び出された箇所や参照が増えた所に加算処理を記述し、参照が不要になる箇所に減算処理をその都度記述するという方法もある。だが、それを間違いなく行うのは至難の業だ。しかも、例外が発生したり、アプリケーションドメインがアンロードされたような場合にも、確実に参照カウンタが減算されるようにしたい。そうでないと、解放されないページが残存して、いつかはメモリ領域が枯渇してしまうことになる。

なので、ページ番号を保持するクラスを作り、そのクラスのコンストラクタで参照カウンタの加算を行い、ファイナライザで減算させるようにする。そうすれば管理が簡単になって、間違いも減るだろう。(なお、アプリケーションドメインがアンロードされる時にも、ファイナライザは実行される)

だがここで疑問になるのが、アプリケーションドメイン境界を跨いでページ番号を渡す時のことだ。シリアライズ可能なクラスを作って、引数に指定してメソッドを呼び出したとして、その時コンストラクタは実行されるのか否か。実行されるとしたら、それはどれなのか?

open System
open System.Reflection

// ページ番号を保持するクラス
[<Serializable>]
type Descriptor( argPages : int[] ) =
    let m_Pages = argPages
    do
        printf "コンストラクタが動いたぜ!\n"

    override __.Finalize() =
        printf "ファイナライザが動いたぜ!\n"

// 受信側になるアプリケーションドメイン
type Receiver() =
    inherit MarshalByRefObject()

    // データを受信する
    member __.receive ( pages : Descriptor ) =
        ()

// 送信側のアプリケーションドメイン
[<EntryPoint>]
let main argv = 
    // アプリケーションドメインを構築する
    let ad = AppDomain.CreateDomain( "dom1" );

    // dom1のアプリケーションドメインで、
    // Receiverクラスのインスタンスを生成する
    let receiver =
        ad.CreateInstanceAndUnwrap(
            Assembly.GetEntryAssembly().FullName,
            typeof< Receiver >.FullName
        ) :?> Receiver

    let pages = new Descriptor( [| 0; 1; 2; |] )
    receiver.receive pages

    0

結論からすれば、この場合、コンストラクタは実行されない。



ファイナライザは想定通り2回動作しているが、コンストラクタは最初の1回しか実行されていない。

どうするのか。

この場合、以下にあるカスタムシリアル化という処理が役に立つ。

https://msdn.microsoft.com/ja-jp/library/ms973893.aspx#objserializ_topic6

結局、シリアル化を解除されるタイミングで参照カウンタの加算を行いたいのだから、そのタイミングを捕まえてやればいいのである。

open System
open System.Reflection
open System.Runtime.Serialization

// ページ番号を保持するクラス
[<Serializable>]
type Descriptor( argPages : int[] ) =
    let m_Pages = argPages
    do
        printf "オブジェクトが作られたぜ!\n"

    override __.Finalize() =
        printf "ファイナライザが動いたぜ!\n"

    interface ISerializable with
        // ここでシリアル化する
        member __.GetObjectData( info : SerializationInfo, context : StreamingContext ) =
            info.AddValue( "a", m_Pages, typeof< int[] > )
        
    // ここでシリアル化を解除する
    new( info : SerializationInfo, context : StreamingContext ) =
        let wv = info.GetValue( "a", typeof< int[] > ) :?> int[]
        // デフォルトのコンストラクタを呼ぶ
        new Descriptor( wv )

// 受信側になるアプリケーションドメイン
type Receiver() =
    inherit MarshalByRefObject()

    // データを受信する
    member __.receive ( pages : Descriptor ) =
        ()

// 送信側のアプリケーションドメイン
[<EntryPoint>]
let main argv = 
    // アプリケーションドメインを構築する
    let ad = AppDomain.CreateDomain( "dom1" );

    // dom1のアプリケーションドメインで、
    // Receiverクラスのインスタンスを生成する
    let receiver =
        ad.CreateInstanceAndUnwrap(
            Assembly.GetEntryAssembly().FullName,
            typeof< Receiver >.FullName
        ) :?> Receiver

    let pages = new Descriptor( [| 0; 1; 2; |] )
    receiver.receive pages

    0

実行するとこうなる。



無事に、コンストラクタが2回と、ファイナライザが2回呼ばれていることが確認できる。

これで後は、割り当てられたページ番号として常にDescriptorクラスを使うように気を付けてあげれば、加減算で間違いはないはずである。

ここで、ページの割り当て・解放を管理するクラスの実装を示しておく。

open System
open System.Reflection
open System.Runtime.Serialization
open System.Threading

// ページ番号を保持するクラス
// pos : 割り当てられたページのページ番号(連続するページの開始ページ番号)
// len : 割り当てられたページのページ数(何ページ分連続しているか)
// argTable : ページ管理用クラスの参照
// alloced : 新規に割り当てられたのか否か
[<Serializable>]
type Descriptor( pos : uint16[], len : byte[], argTable : MMapTable, alloced : bool ) =

    let m_PageNums = pos    // 割り当てられたページ番号
    let m_PageCount = len   // 割り当てられたページ数
    let m_Table = argTable  // メモリ管理用のクラス
    let mutable m_Released : bool = false   // 解放済みか否か

    do
        if not alloced then
            // 新規に割り当てられたのではない場合は、参照カウンタを加算する
            // ※新規に割り当てられた場合は、その時点で参照カウンタが1に設定されるため、
            //   ここで再度参照カウンタを加算してはならない。
            m_Table.AddRef m_PageNums m_PageCount

    interface ISerializable with
        // ここでシリアル化する
        member __.GetObjectData( info : SerializationInfo, context : StreamingContext ) =
            // 意外なことに、AddValueを3回呼ぶよりもレコード型にして
            // 1回で登録してしまった方が早い。
            info.AddValue( "a", ( m_PageNums, m_PageCount, m_Table ), typeof< ( uint16[] * byte[] * MMapTable ) > )
        
    // ここでシリアル化を解除する
    new( info : SerializationInfo, context : StreamingContext ) =
        let wpos, wlen, wtbl =
            info.GetValue( "a", typeof< ( uint16[] * byte[] * MMapTable ) > ) :?> ( uint16[] * byte[] * MMapTable )
        // デフォルトのコンストラクタを呼ぶ
        new Descriptor( wpos, wlen, wtbl, false )

    // ファイナライザ
    override __.Finalize() =
        // (Disposeで)参照カウンタの減算が既に行われているのなら、
        // ここでは何も行わない。
        if not m_Released then
            m_Table.Release m_PageNums m_PageCount
            m_Released <- true

    // 明示的なオブジェクトの開放
    interface IDisposable with
        member __.Dispose () =
            // 明らかに不要とわかるのなら、GCの実行を待たずにページを解放する
            m_Table.Release m_PageNums m_PageCount
            // 実は、ここのタイミングで例外が発生すると、参照カウンタを減算しすぎることになる。
            m_Released <- true
            // 逆に、Releaseメソッドの実行前にm_Released <- trueを行うと、
            // その間で例外が発生した場合に、参照カウンタの減算が不足することになる。
            // しかし、可能性として著しく低いものと判断し、単純に無視する。
            // できるのであれば、誰か対処するべし。

// ページの割り当て状況を管理するクラス
and MMapTable( argPageSize : uint32, argPageCount : uint16 ) =
    inherit MarshalByRefObject()

    let m_PageSize = argPageSize    // 1ページのバイト長
    let m_PageCount = argPageCount  // ページ数
    let m_PageStatus : int[] =      // ページごとの参照カウンタの配列
        Array.zeroCreate( int( m_PageCount ) )

    // メモリ領域の割り当てを行う
    // size : 要求するデータ量(バイト数)
    // 戻り値:割り当てられたページのページ番号(連続するページの開始ページ番号)と
    //         割り当てられたページのページ数(何ページ分連続しているか)のレコード。
    //         割り当てに失敗した場合はNoneを返す。
    member this.Allocate ( size : uint32 ) : ( uint16[] * byte[] ) option =
        // 必要になるページ数を求める
        let needPageCount32 =
            if size % m_PageSize > 0u then
                ( size / m_PageSize ) + 1u
            else
                size / m_PageSize

        // 要求ページ数が総ページ数を超える場合や、0ページの場合は何もしない
        if needPageCount32 > uint32 m_PageCount || needPageCount32 = 0u then
            None
        else
            let needPageCount = int( needPageCount32 )
            // メモリ領域の割り当てを試みる
            let rec loop
                ( idx : int )       // 次に確認すべきページ番号
                ( cnt : int )       // 今までに割り当てたページ数
                ( hold : bool )     // curIdx,curCntに有効な値を保持しているか否か
                ( curIdx : int )    // 連続して確保したページの開始ページ番号
                ( curCnt : int )    // 連続して確保したページのページ数
                ( cont : ( ( int * int ) list * bool ) -> ( ( int * int ) list * bool ) ) = // 継続
                
                if cnt >= needPageCount then
                    // 必要なページ数分割り当てた場合
                    if hold then
                        // 最後に割り当てた連続領域も含めて、戻り値のリストを構築する
                        cont ( [ ( curIdx, curCnt ) ], true )
                    else
                        // ここに来ることは無いはずだが
                        cont ( [], true )
                    
                elif idx >= int ( m_PageCount ) then
                    // 要求された分を割り当てられなかった場合
                    // 今まで割り当ててしまった分のリストを構築して返す
                    cont ( [], false )
                
                elif Interlocked.CompareExchange( &m_PageStatus.[idx], 1, 0 ) = 0 then
                    // ページの割り当てに成功した場合

                    if not hold then
                        // 新規に連続領域が開始する場合
                        loop ( idx + 1 ) ( cnt + 1 ) true idx 1 cont
                    elif curCnt >= 255 then
                        // 連続領域のページ数が既に255を超える場合は、別の連続領域とする
                        loop ( idx + 1 ) ( cnt + 1 ) true idx 1 ( fun ( arg, flg ) -> cont ( ( ( curIdx, curCnt ) :: arg ), flg ) )
                    else
                        // 連続領域のページ数のみを加算する
                        loop ( idx + 1 ) ( cnt + 1 ) true curIdx ( curCnt + 1 ) cont
                else
                    // ページの割り当てができなかった場合

                    if not hold then
                        // 割り当てられない領域が継続している場合
                        loop ( idx + 1 ) cnt false 0 0 cont
                    else
                        // 直前までページが割り当てられていた場合
                        loop ( idx + 1 ) cnt false 0 0 ( fun ( arg, flg ) -> cont ( ( ( curIdx, curCnt ) :: arg ), flg ) )

            // 割り当てる
            let rList, rFlg = loop 0 0 false 0 0 ( fun arg -> arg )
            
            if not rFlg then
                // 必要なページ数分割り当てることができなかった場合
                List.iter
                    ( fun ( pos, cnt ) ->
                        // 割り当ててしまったページを解放する
                        for i = 0 to cnt - 1 do
                            this.ReleasePos <| uint16( pos + i )
                    ) rList
                // 失敗したのでNoneを返す
                None
            else
                // 必要なページの割り当てに成功した場合

                let vPos : uint16[] = Array.zeroCreate( rList.Length )
                let vCnt : byte[] = Array.zeroCreate( rList.Length )

                let rec loop ( idx : int ) list =
                    match list with
                    | ( pos, cnt ) :: tail ->
                        vPos.[idx] <- uint16 pos
                        vCnt.[idx] <- byte cnt
                        loop ( idx + 1 ) tail
                    | [] -> ()
                loop 0 rList

                Some( vPos, vCnt )
         
        // 参照カウンタを加算する
        member __.AddRef ( pos : uint16[] ) ( cnt : byte[] ) =
            for i = 0 to pos.Length - 1 do
                for j = 0 to int( cnt.[i] ) - 1 do
                    let idx = int( pos.[i] ) + j
                    Interlocked.Increment ( &( m_PageStatus.[idx] ) ) |> ignore
        
        // 参照カウンタを減算する
        member __.Release( pos : uint16[] ) ( cnt : byte[] ) =
            for i = 0 to pos.Length - 1 do
                for j = 0 to int( cnt.[i] ) - 1 do
                    let idx = int( pos.[i] ) + j
                    if Interlocked.Decrement( &( m_PageStatus.[idx] ) ) < 0 then
                        // 可能性として、減算しすぎる場合も起こりうる。
                        // 無駄な努力だが、一応加算する処理だけ記述しておく
                        Interlocked.Increment ( &( m_PageStatus.[idx] ) ) |> ignore
                        // あるいはこの場合にはプロセスを落としてしまった方がいいかもしれない

        // 参照カウンタを減算する
        member __.ReleasePos( pos : uint16 ) =
            let idx = int pos
            if Interlocked.Decrement( &( m_PageStatus.[idx] ) ) < 0 then
                Interlocked.Increment ( &( m_PageStatus.[idx] ) ) |> ignore

著しく分かりにくい処理だが、まぁがんばれ。


Descriptorクラスは、Disposeメソッドを追加しているだけで、さっきと変わっていない。参照カウンタの加減算も、さして難しいことは行っていない。

分かりにくいのは、MMapTableテーブルクラスのAllocateメソッドである。



ここでは、参照カウンタの配列を先頭から虱潰しに検索して、0の箇所が存在したらInterlocked.CompareExchangeメソッドでアトミックに1を設定している。それにより確保できたページについて、連続して確保したページの開始ページ番号と、連続して確保したページ数を配列に格納して処理結果として返している。

安直に考えるのなら、確保したページのページ番号だけを配列に入れて返せばいい気もするが、ページ番号の配列はアプリケーションドメイン境界を越えて送受信されることになるため、少しでもデータ量を削減するためにこのような面倒なことを行っている。

最も、1ページごと細切れに確保してしまうような最悪の事態が発生した場合には、最大で1.5倍(データ型がuint16とbyteであるため)になってしまうが、そのようなことは多分発生しない。なぜならば、毎回配列を先頭から検索しているためである。

ここも試案のしどころで、ランダムな位置から開始するとか、前回検索を終了した位置から継続するとか、いろいろ試したが、結局これが一番最速であるとの結論に達した。(特に問題になるのが、ランダムな要素が入ると、空き領域があるはずなのに確保できないという問題が発生する可能性があり、その可能性を排除できなかったからだ)

後は、上記プログラムのAddRefとReleaseのメソッドを呼び出す都度、アプリケーションドメイン境界を越えた呼び出しになる問題だけだ。これも、あと少し工夫すれば排除できる目途があるから、気が向いたらまたここに書き散らす。

0 件のコメント: