こんにちは masm11 です。
以前の記事で Timeout.timeout のコードを読みました。 記事はこちらです。
新たなスレッドを起動し、時間になったら元のスレッドで 例外を発生させているらしい、ということは解りました。
しかし、それだけでは socket の read を途中でタイムアウト させることはできません。read でブロックしている間は 例外なんか無力だからです。
今回は、例外をどのように即時発生させているのか、について Ruby の実装を追いかけてみたいと思います。
今回読んだ Ruby は Ruby 3.0.2 です。
割り込み側
例外を起こす際に実行していたのは、Thread#raise
でしたね。
まずはここを起点にしてみます。
thread.c に以下の行があります。
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L5469
rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
thread_raise_m が該当の関数のようです。この関数の中を見ていきます。
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L2649
threadptr_check_pending_interrupt_queue(target_th);
既に割り込みがあったかどうかをチェックしているようです。 今は関係なさそうなので、次へ行きましょう。
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L2650
rb_threadptr_raise(target_th, argc, argv);
これですね。該当スレッドで raise してそうです。
rb_threadptr_raise の中を見ると、例外を生成したり、スレッドの生死判定を したりしています。最後に
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L2538
rb_threadptr_interrupt(target_th);
という行があります。割り込みをかけていそうですね。
その実装が以下になります。
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L487-L511
static void rb_threadptr_interrupt_common(rb_thread_t *th, int trap) { rb_native_mutex_lock(&th->interrupt_lock); if (trap) { RUBY_VM_SET_TRAP_INTERRUPT(th->ec); } else { RUBY_VM_SET_INTERRUPT(th->ec); } if (th->unblock.func != NULL) { (th->unblock.func)(th->unblock.arg); } else { /* none */ } rb_native_mutex_unlock(&th->interrupt_lock); } void rb_threadptr_interrupt(rb_thread_t *th) { rb_threadptr_interrupt_common(th, 0); }
trap が 0 なので、意味のありそうな行と言えば、
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L496
RUBY_VM_SET_INTERRUPT(th->ec);
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L498-L500
if (th->unblock.func != NULL) { (th->unblock.func)(th->unblock.arg); }
の2つですね。前者は
https://github.com/ruby/ruby/blob/v3_0_2/vm_core.h#L1876
#define RUBY_VM_SET_INTERRUPT(ec) ATOMIC_OR((ec)->interrupt_flag, PENDING_INTERRUPT_MASK)
ということで、フラグを立てているだけで、ターゲットスレッドの中断そのものでは なさそうです。
あとは後者なのですが、何をやっているのかさっぱり解りません。
割り込まれ側
仕方がないので、割り込まれる方を見ていきます。
どのメソッドでもいいのですが、IO#read
にしてみましょう。
io.c に
https://github.com/ruby/ruby/blob/v3_0_2/io.c#L13478
rb_define_method(rb_cIO, "read", io_read, -1);
とあります。io_read が該当の関数のようです。
io_read はちょっと大きめですが、
https://github.com/ruby/ruby/blob/v3_0_2/io.c#L3302
n = io_fread(str, 0, len, fptr);
ここで read してそうです。
io_fread を読んでみると、以下の行があります。
https://github.com/ruby/ruby/blob/v3_0_2/io.c#L2650
rb_str_locktmp_ensure(str, bufread_call, (VALUE)&arg);
rb_str_locktmp_ensure は、String オブジェクトに排他制御のフラグを 立てた状態で bufread_call を読んでいるようです。
bufread_call は3行しかない関数で、以下の行があります。
https://github.com/ruby/ruby/blob/v3_0_2/io.c#L2625
p->len = io_bufread(p->str_ptr, p->len, p->fptr);
io_bufread はちょっと大きめですが、見たところ、 読みすぎたデータがなければ、read して、必要に応じてリトライしてる だけのようです。rb_read_internal を呼んでいます。
https://github.com/ruby/ruby/blob/v3_0_2/io.c#L2586
c = rb_read_internal(fptr->fd, ptr+offset, n);
rb_read_internal は構造体を用意して、rb_thread_io_blocking_region を 呼んでいるだけです。
https://github.com/ruby/ruby/blob/v3_0_2/io.c#L1136
return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fd);
ここでスレッドの処理が出てきました。internal_read_func は read して エラーの場合にリトライしているだけですので、rb_thread_io_blocking_region に 秘密がありそうです。
ここからは thread.c を見ていきます。
rb_thread_io_blocking_region はいきなり複雑で面食らうのですが、 中程に BLOCKING_REGION というマクロを使っています。名前からして ブロックする処理をしていそうです。
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L1810-L1817
EC_PUSH_TAG(ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { BLOCKING_REGION(wfd->th, { val = func(data1); saved_errno = errno; }, ubf_select, wfd->th, FALSE); } EC_POP_TAG();
このマクロの2つめの引数がコードになっていて、func を呼んでいます。 func は引数に渡されている internal_read_func のことです。
BLOCKING_REGION の定義は以下のようになっています。
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L191-L199
#define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \ struct rb_blocking_region_buffer __region; \ if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \ /* always return true unless fail_if_interrupted */ \ !only_if_constant(fail_if_interrupted, TRUE)) { \ exec; \ blocking_region_end(th, &__region); \ }; \ } while(0)
マクロにしてはちょっと長いですね。begin と end に囲まれて、2つめの引数 exec を実行しています。 blocking_region_begin は何をしているのでしょうか。
以下の行があります。
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L1614
if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {
そして unblock_function_set はというと、
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L472-L473
th->unblock.func = func; th->unblock.arg = arg;
こんなものがありました。これは割り込み側で呼び出していました。
はて? どういうことでしょう? func って何? 怪しい…
func は unblock_function_set の2つめの引数です。 呼び出し元をたどりながら func の実体を探すと…
https://github.com/ruby/ruby/blob/v3_0_2/thread.c#L1812-L1815
BLOCKING_REGION(wfd->th, { val = func(data1); saved_errno = errno; }, ubf_select, wfd->th, FALSE);
この ubf_select でした。これは一体何でしょうか?
ubf_select は thread_pthread.c にあります。 この関数の一番最後に次の行があります。
https://github.com/ruby/ruby/blob/v3_0_2/thread_pthread.c#L1334
ubf_wakeup_thread(th);
ubf_wakeup_thread は何をしているかというと、
https://github.com/ruby/ruby/blob/v3_0_2/thread_pthread.c#L1290-L1299
/* * send a signal to intent that a target thread return from blocking syscall. * Maybe any signal is ok, but we chose SIGVTALRM. */ static void ubf_wakeup_thread(rb_thread_t *th) { thread_debug("thread_wait_queue_wakeup (%"PRI_THREAD_ID")\n", thread_id_str(th)); pthread_kill(th->thread_id, SIGVTALRM); }
pthread_kill! ありました!
まとめ
つまりどういうことでしょうか?
割り込まれる側はあらかじめ割り込み用の関数を登録しておき、 割り込み側はその関数を呼んで、結局のところ pthread_kill で 該当スレッドに非同期シグナルを送っていたわけです。
非同期シグナルで割り込まれたら、あとは例外の扱いはなんとでも なりますね。
あぁすっきりした。
インゲージでは C も読める古株エンジニアも募集しています。 詳細は以下のページへ!
では良いお年を!