bash script で並列度を制限しながら並列処理する

masm11 です。

bash script で、時間がかかる仕事を複数の CPU core を有効活用して処理したくなったので、今回はその方法を考えてみました。

要件は以下の通りです。

  • bash script で書く。
  • 時間のかかる仕事やあまりかからない仕事がたくさんある。
  • 各仕事は CPU core を1つしか使わない。
  • CPU core は複数あるので、複数の仕事を並列に処理したい。
  • 各仕事は bash script 内で関数で定義されている。

man bash してたどり着いた結論は、jobs %?ce を使う、ということです。 マニュアルには ce としか書いてありませんが、何でも構いません。 例えば jobs %?foo と実行すると、jobs の実行結果のうち foo を含むものだけを出力してくれます。 foo を含むものがあれば $? が 0 に、なければ 1 になります。

この機能を使うことにしました。 コードは以下のようになります。

#!/bin/sh

job_body () {
    id=$1
    # やりたい仕事。ここでは仮に引数に渡された時間だけ sleep することにする。
    echo "$(date +%H:%M:%S) $id enter."
    sleep $2
    echo "$(date +%H:%M:%S) $id leave."
}

process_pool () {
    JOBIDS=([0]=JOBID_a [1]=JOBID_b)
    while :; do
        for id in ${JOBIDS[*]}; do
            if jobs %?${id} > /dev/null 2>&1; then
                # このジョブはまだ実行中。
                :
            else
                if read args; then
                    # 実行が終了している。新たなジョブを投入。
                    eval "job_body $id $args &"
                else
                    # 新たなジョブもない。
                    if jobs %?JOBID_ > /dev/null 2>&1; then
                        # 何らかのジョブがまだ実行中。
                        :
                    else
                        # 新たなジョブもないし、実行中のジョブもない。終了。
                        return
                    fi
                fi
            fi
        done
        sleep 1
    done
}

# process_pool にジョブを渡す
cat <<EOT | process_pool
10
20
30
40
EOT

ジョブは2つまで並列実行可能として、JOBID_a, JOBID_b とします。 最初にその ID を配列で定義しておいて、あとは以下を繰り返しています。

  1. ジョブがまだ実行中なら何もしない。
  2. ジョブが終了しているなら、新たなジョブがあれば起動する。
  3. 新たなジョブがなければ、実行中のジョブがあるか確認して、あれば待機。
  4. それもなければ、何もないので終了。

関数名が process_pool になっていますが、process pool でもなんでもなく、 単に終了したら新たに起動しているだけです。 id が不要であれば、jobs の個数を数えるだけでも良いかもしれません。

結果は以下の通り。

% bash ./test.sh
22:53:29 JOBID_a enter.
22:53:29 JOBID_b enter.
22:53:39 JOBID_a leave.
22:53:40 JOBID_a enter.
22:53:49 JOBID_b leave.
22:53:50 JOBID_b enter.
22:54:10 JOBID_a leave.
22:54:30 JOBID_b leave.
% 

JOBID_aJOBID_b が並列に動作しているのがわかります。うまくいってますね。

上記のコードで細かいことを言えば、個人的には eval を使っているのが気に入っていません。このままでは たまたま $argsJOBID_a が含まれていた場合に誤動作してしまいます。 ここは今後の課題です。

ボツ案として、実は以下のようなものも考えました。

worker () {
    id=$1
    while read a; do
        job_body $id $a
    done
}

process_pool () {
    worker a &
    worker b &
    wait
}

しかし、read が一行だけ read() してくれる可能性が低いので、諦めました。 こちらの方が process pool っぽいのですが。