マルチプロセス+マルチスレッド同居の闇を試す!

f:id:masm11:20220413205641p:plain

こんにちは、masm11 です。

スレッドは皆さんご存知のことと思います。 並列処理をしたい場合によく使っていると思います。

プロセスの方があまり知られていないのかも知れません。 スレッドより昔からあるのですが…。 一つ以上のスレッドとメモリ空間等のリソースを合わせたもの、 とでも表現しましょうか。

どちらも、増やすことで並列処理をすることができます。 それぞれ、マルチスレッドやマルチプロセスと言われます。

マルチプロセスは、メモリ空間からして独立しているので、 完全に独立した処理を並列に実行したい場合によく使われます。 それに対してマルチスレッドはスレッドどうしが影響を与え合う 場合によく使われます。

しかし、マルチプロセスとマルチスレッドの共存は、 基本的にはご法度です。 理由は簡単で、デッドロックすることがあるからです。

今回は、その場面を見てみましょう。

コード説明

ではまず、今回使用するコードをお見せします。

#!/usr/bin/env python

import sys
import os
import threading
import time

mutex = threading.Lock()

def work1():
    while True:
        time.sleep(1)
        pid = os.fork()
        mutex.acquire()
        mutex.release()
        if pid == 0:
            sys.exit(0)

def work2():
    while True:
        mutex.acquire()
        time.sleep(0.01)
        mutex.release()

thread = threading.Thread(target=work2)
thread.start()

work1()

まずこのコードの終わりの方から説明します。

thread = threading.Thread(target=work2)
thread.start()

スレッドを一つ起動し、work2 関数を実行しています。

work1()

こちらは、現在のスレッドで work1 関数を実行しています。

つまり、2つのスレッドがそれぞれ work1 と work2 を実行する、 ということですね。マルチスレッドです。

work2 を見てみます。

    while True:

無限ループです。その中で何をやっているかと言うと、

        mutex.acquire()
        time.sleep(0.01)
        mutex.release()

mutex の獲得と解放を繰り返しています。 その中で sleep しているのは、別のスレッドがそのタイミングで 処理をしやすくするためです。

一方、work1 は何をしているかと言うと、

    while True:

こちらも無限ループです。その中で何をやっているかと言うと、

        time.sleep(1)

これは処理をゆっくりにするために入れてあります。 超高速でループを繰り返すと、わけがわからなくなりますので…

        pid = os.fork()

プロセスをコピー (fork) しています。 返り値は、子プロセス (コピー側) では 0 が返り、 親プロセス (コピー元) では 0 以外が返ります。

        mutex.acquire()
        mutex.release()

そして、mutex を獲得、解放しています。

        if pid == 0:
            sys.exit(0)

子プロセスの場合はそのまま終了しています。

結局たいしたことは何もしていません。 work2 側の acquire と release の間で、work1 側で fork してみて、 その後 mutex がちゃんと acquire できるか、を確認しています。

実行例

このプログラムを test-dl.py という名前で保存し、実行します。

luna:~ % python test-dl.py

何も出力はありません。

実行したまま、別端末を起動し、様子を見ます。

luna:~ % ps auxww | grep test-dl 
masm      222267  1.1  0.0  90660  8320 pts/1    Sl+  23:01   0:00 python test-dl.py
masm      222269  0.0  0.0  90660  5572 pts/1    S+   23:01   0:00 python test-dl.py
masm      222387  0.0  0.0  90660  5572 pts/1    S+   23:02   0:00 python test-dl.py
masm      222489  0.0  0.0  90660  5572 pts/1    S+   23:02   0:00 python test-dl.py
masm      222539  0.0  0.0   9044  2052 pts/2    S+   23:02   0:00 grep test-dl
luna:~ % 

何度も ps auxww | grep test-dl を実行すると、 ゆっくり増えていることが判ります。

プログラムをもう一度よく見てみましょう。 work1 で、親プロセスは無限にループしていますが、 子プロセスは必ず終了しています。 にも関わらず、プロセスが増えているのです。

何が考えられるでしょうか?

fork したけど exit までたどり着けていない。つまり acquire で待ち続けているのです。

デッドロックですね!

何故?

Linux では fork の際に現在のスレッドしかコピーされません。 子プロセスでは、他のスレッドが存在しないのです。 マニュアルにも記載があります。

子プロセスはシングルスレッドで生成される。つまり、 fork() を呼び出したスレッドとなる。

だから、work2 の acquire と release の間のタイミングで fork してしまうと、子プロセスでは mutex を release する者がおらず、 work1 側が acquire で待ち続けてしまうのです。

どう回避する?

上のページにヒントがあります。pthread_atfork です。ただし、それは C の話です。 python の場合は、os.register_at_fork を使います。

この関数を使うと、fork の前後 (後の場合は、親プロセスと子プロセスそれぞれで) に呼んで欲しい関数を指定できます。

具体的には以下のように使います。先程のコードからの差分だけ示します。

--- test-dl.py    2022-03-03 22:29:23.807615679 +0900
+++ test-dl-2.py    2022-04-13 20:28:08.081962854 +0900
@@ -22,6 +22,17 @@
         time.sleep(0.01)
         mutex.release()
 
+def acquire():
+    mutex.acquire()
+
+def release():
+    mutex.release()
+
+os.register_at_fork(
+    before = acquire,
+    after_in_parent = release,
+    after_in_child = release)
+
 thread = threading.Thread(target=work2)
 thread.start()

こうすることで、fork 前に mutex を acquire しておき、 work2 が mutex.acquire() するのを阻止することができます。 その状態で fork すると、 もちろん親プロセス・子プロセス両方でmutex は獲得されたままですので、 両プロセスで mutex.release() で解放してやります。

こちらを実行して数分後…

luna:~ % ps auxww | grep test-dl-2 
masm      439621  1.3  0.0  90600  7424 pts/1    Sl+  20:42   0:02 python test-dl-2.py
masm      440473  0.0  0.0   9044  1776 pts/2    S+   20:45   0:00 grep test-dl-2
luna:~ % 

増えてませんね。

まとめ

テストプログラムを提示し、デッドロックを発生させ、 理由と回避方法案を提示しました。

ただ、自分が書いたコードならこういう解決策がありますが、 python のモジュールに含まれるコードが対応しているとは限りません。 /usr/lib/python3.10/ 以下を grep してみたところ、いくつかの モジュールは対応してるようですね。

インゲージではまだまだエンジニアを募集しています。 詳細は以下のページへお願いします!

https://ingage.co.jp/recruit/