Ruby のコードを読む! 別スレッドで例外を起こす

f:id:masm11:20211222213724p:plain

こんにちは masm11 です。

以前の記事で Timeout.timeout のコードを読みました。 記事はこちらです。

blog.ingage.jp

新たなスレッドを起動し、時間になったら元のスレッドで 例外を発生させているらしい、ということは解りました。

しかし、それだけでは socket の read を途中でタイムアウト させることはできません。read でブロックしている間は 例外なんか無力だからです。

今回は、例外をどのように即時発生させているのか、について Ruby の実装を追いかけてみたいと思います。

今回読んだ Ruby は Ruby 3.0.2 です。

割り込み側

例外を起こす際に実行していたのは、Thread#raise でしたね。 まずはここを起点にしてみます。 thread.c に以下の行があります。

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L5469

    rb_define_method(rb_cThread, "raise", thread_raise_m, -1);

thread_raise_m が該当の関数のようです。この関数の中を見ていきます。

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L2649

    threadptr_check_pending_interrupt_queue(target_th);

既に割り込みがあったかどうかをチェックしているようです。 今は関係なさそうなので、次へ行きましょう。

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L2650

    rb_threadptr_raise(target_th, argc, argv);

これですね。該当スレッドで raise してそうです。

rb_threadptr_raise の中を見ると、例外を生成したり、スレッドの生死判定を したりしています。最後に

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L2538

    rb_threadptr_interrupt(target_th);

という行があります。割り込みをかけていそうですね。

その実装が以下になります。

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L487-L511

static void
rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
{
    rb_native_mutex_lock(&th->interrupt_lock);

    if (trap) {
        RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
    }
    else {
        RUBY_VM_SET_INTERRUPT(th->ec);
    }
    if (th->unblock.func != NULL) {
        (th->unblock.func)(th->unblock.arg);
    }
    else {
        /* none */
    }
    rb_native_mutex_unlock(&th->interrupt_lock);
}

void
rb_threadptr_interrupt(rb_thread_t *th)
{
    rb_threadptr_interrupt_common(th, 0);
}

trap が 0 なので、意味のありそうな行と言えば、

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L496

        RUBY_VM_SET_INTERRUPT(th->ec);

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L498-L500

    if (th->unblock.func != NULL) {
        (th->unblock.func)(th->unblock.arg);
    }

の2つですね。前者は

https://github.com/ruby/ruby/blob/v3_0_2/vm_core.h#L1876

#define RUBY_VM_SET_INTERRUPT(ec)               ATOMIC_OR((ec)->interrupt_flag, PENDING_INTERRUPT_MASK)

ということで、フラグを立てているだけで、ターゲットスレッドの中断そのものでは なさそうです。

あとは後者なのですが、何をやっているのかさっぱり解りません。

割り込まれ側

仕方がないので、割り込まれる方を見ていきます。

どのメソッドでもいいのですが、IO#read にしてみましょう。

io.c に

https://github.com/ruby/ruby/blob/v3_0_2/io.c#L13478

    rb_define_method(rb_cIO, "read",  io_read, -1);

とあります。io_read が該当の関数のようです。

io_read はちょっと大きめですが、

https://github.com/ruby/ruby/blob/v3_0_2/io.c#L3302

    n = io_fread(str, 0, len, fptr);

ここで read してそうです。

io_fread を読んでみると、以下の行があります。

https://github.com/ruby/ruby/blob/v3_0_2/io.c#L2650

    rb_str_locktmp_ensure(str, bufread_call, (VALUE)&arg);

rb_str_locktmp_ensure は、String オブジェクトに排他制御のフラグを 立てた状態で bufread_call を読んでいるようです。

bufread_call は3行しかない関数で、以下の行があります。

https://github.com/ruby/ruby/blob/v3_0_2/io.c#L2625

    p->len = io_bufread(p->str_ptr, p->len, p->fptr);

io_bufread はちょっと大きめですが、見たところ、 読みすぎたデータがなければ、read して、必要に応じてリトライしてる だけのようです。rb_read_internal を呼んでいます。

https://github.com/ruby/ruby/blob/v3_0_2/io.c#L2586

            c = rb_read_internal(fptr->fd, ptr+offset, n);

rb_read_internal は構造体を用意して、rb_thread_io_blocking_region を 呼んでいるだけです。

https://github.com/ruby/ruby/blob/v3_0_2/io.c#L1136

    return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fd);

ここでスレッドの処理が出てきました。internal_read_func は read して エラーの場合にリトライしているだけですので、rb_thread_io_blocking_region に 秘密がありそうです。

ここからは thread.c を見ていきます。

rb_thread_io_blocking_region はいきなり複雑で面食らうのですが、 中程に BLOCKING_REGION というマクロを使っています。名前からして ブロックする処理をしていそうです。

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L1810-L1817

    EC_PUSH_TAG(ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        BLOCKING_REGION(wfd->th, {
            val = func(data1);
            saved_errno = errno;
        }, ubf_select, wfd->th, FALSE);
    }
    EC_POP_TAG();

このマクロの2つめの引数がコードになっていて、func を呼んでいます。 func は引数に渡されている internal_read_func のことです。

BLOCKING_REGION の定義は以下のようになっています。

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L191-L199

#define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \
    struct rb_blocking_region_buffer __region; \
    if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
        /* always return true unless fail_if_interrupted */ \
        !only_if_constant(fail_if_interrupted, TRUE)) { \
        exec; \
        blocking_region_end(th, &__region); \
    }; \
} while(0)

マクロにしてはちょっと長いですね。begin と end に囲まれて、2つめの引数 exec を実行しています。 blocking_region_begin は何をしているのでしょうか。

以下の行があります。

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L1614

    if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {

そして unblock_function_set はというと、

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L472-L473

    th->unblock.func = func;
    th->unblock.arg = arg;

こんなものがありました。これは割り込み側で呼び出していました。

はて? どういうことでしょう? func って何? 怪しい…

func は unblock_function_set の2つめの引数です。 呼び出し元をたどりながら func の実体を探すと…

https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L1812-L1815

        BLOCKING_REGION(wfd->th, {
            val = func(data1);
            saved_errno = errno;
        }, ubf_select, wfd->th, FALSE);

この ubf_select でした。これは一体何でしょうか?

ubf_select は thread_pthread.c にあります。 この関数の一番最後に次の行があります。

https://github.com/ruby/ruby/blob/v3_0_2/thread_pthread.c#L1334

    ubf_wakeup_thread(th);

ubf_wakeup_thread は何をしているかというと、

https://github.com/ruby/ruby/blob/v3_0_2/thread_pthread.c#L1290-L1299

/*
 * send a signal to intent that a target thread return from blocking syscall.
 * Maybe any signal is ok, but we chose SIGVTALRM.
 */
static void
ubf_wakeup_thread(rb_thread_t *th)
{
    thread_debug("thread_wait_queue_wakeup (%"PRI_THREAD_ID")\n", thread_id_str(th));
    pthread_kill(th->thread_id, SIGVTALRM);
}

pthread_kill! ありました!

まとめ

つまりどういうことでしょうか?

割り込まれる側はあらかじめ割り込み用の関数を登録しておき、 割り込み側はその関数を呼んで、結局のところ pthread_kill で 該当スレッドに非同期シグナルを送っていたわけです。

非同期シグナルで割り込まれたら、あとは例外の扱いはなんとでも なりますね。

あぁすっきりした。

インゲージでは C も読める古株エンジニアも募集しています。 詳細は以下のページへ!

ingage.co.jp

では良いお年を!