Rails のコネクションプールから接続を取り出す処理を追う

こんにちは、masm11 です。

弊社では PostgreSQL のデータベースを Amazon Aurora に移そうとしていますが、 フェイルオーバー時の処理が気になっています。 ググってみたところ、MySQL の情報はたくさん出てくるのですが、PostgreSQL の情報は 少なく、欲しい情報が出てきません。

今回は、弊社で使用している Rails 5.1 でフェイルオーバー時の動作をコードレベルで追いかけてみます。

unicorn は都度接続なのか?

その前に、確認しておきたいことが一つありました。 unicorn が HTTP リクエストごとに DB に接続/切断しているのかどうかです。

config/database.yml には、以下のように設定してあります。

staging:
  adapter:   postgresql
  database:  ...
  username:  ...
  password:  ...
  host:      ...
  statement_limit:      ...

伏せ字ばかりですが、pool: の設定はしていません。

activerecord-5.1.7/lib/active_record/connection_adapters/abstract/connection_pool.rb に以下のコードがあります。

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

省略した場合、デフォルトは 5 ですね。プールしていそうです。

他の確認もしてみました。

$ netstat -anp | grep :5432
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp        0      0 172.31.30.216:24023     172.31.26.254:5432      ESTABLISHED 13853/unicorn worke 
(以下略)

少し時間が経った後、もう一度確認したところ、:24023 の部分が変わっていませんでした。 都度接続しているなら、ここが変わっていくはずです。

やはりコネクションをプールしているようです。 unicorn はシングルスレッドで、コネクションプールから接続を取得して返却し取得して返却し… を 繰り返しているので、デフォルトの 5 でもプロセスごとに1本しか使っていないのでしょう。

都度接続なら、そのたびに Aurora の cluster endpoint を DNS で引けば常に master を向いており、 フェイルオーバー時も新しい master を向いているので、そのまま何もしなくて良いはずです。 しかし、今コネクションプールを使っているアプリケーションを(なんとかして)都度接続に変更すると、 接続のコストが高すぎるように思います。

やはり、フェイルオーバーで接続が切れた場合にどうなるのか、処理を追う必要がありそうです。

Rails のコネクションプールの処理を追う

接続を取得する際によく使うメソッドは ActiveRecord::Base.connection ですね。ここをスタート地点にします。

activerecord-5.1.7/lib/active_record/base.rb では以下のようになっています。なお、途中のコメントやコードは省略して、必要な部分のみ引用しています。

module ActiveRecord #:nodoc:
  class Base
    extend ConnectionHandling

そして activerecord-5.1.7/lib/active_record/connection_handling.rb

module ActiveRecord
  module ConnectionHandling
    # Returns the connection currently associated with the class. This can
    # also be used to "borrow" the connection to do database work unrelated
    # to any of the specific Active Records.
    def connection
      retrieve_connection
    end

とあります。スタート地点はここのようです。

同じ module の中に以下がありました。

    def retrieve_connection
      connection_handler.retrieve_connection(connection_specification_name)
    end

retrieve_connection メソッドを呼び出しています。同名ですが別のメソッドのようです。 探してみたところ、activerecord-5.1.7/lib/active_record/connection_adapters/abstract/connection_pool.rb に以下のメソッドがありました。

module ActiveRecord
  module ConnectionAdapters
    class ConnectionHandler
      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      def retrieve_connection(spec_name) #:nodoc:
        pool = retrieve_connection_pool(spec_name)
        raise ConnectionNotEstablished, "No connection pool with '#{spec_name}' found." unless pool
        conn = pool.connection
        raise ConnectionNotEstablished, "No connection for '#{spec_name}' in connection pool" unless conn
        conn
      end

クラス名から考えて、おそらくこれでしょう。 該当するコネクションプールを取得して、pool.connection で接続を取得しているようです。 同じファイルの中に以下のメソッドがあります。

module ActiveRecord
  module ConnectionAdapters
    class ConnectionPool
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a cache keyed by a thread.
      def connection
        @thread_cached_conns[connection_cache_key(@lock_thread || Thread.current)] ||= checkout
      end

ここでは、スレッドごとに接続をキャッシュしているようです。キャッシュになければ checkout するのですね。 checkout を見ていきましょう。

      def checkout(checkout_timeout = @checkout_timeout)
        checkout_and_verify(acquire_connection(checkout_timeout))
      end

checkout_and_verify。それっぽい名前ですね。接続が生きているか確認してそうです。このメソッドを追います。

        def checkout_and_verify(c)
          c._run_checkout_callbacks do
            c.verify!
          end
          c
        rescue
          remove c
          c.disconnect!
          raise
        end

なんだか複雑です。引数 c には acquire_connection の返り値が渡るので、接続ですね。 c.verify! でチェックしてそうです。

verify!activerecord-5.1.7/lib/active_record/connection_adapters/abstract_adapter.rb にあります。

      def verify!(*ignored)
        if ignored.size > 0
          ActiveSupport::Deprecation.warn("Passing arguments to #verify method of the connection has no effect and has been deprecated. Please remove all arguments from the #verify method call.")
        end
        reconnect! unless active?
      end

active?reconnect! が出てきました。一つずつ見ていきます。

active? は同じファイルの中にもあるのですが、

      def active?
      end

と空っぽです。おそらく他の場所でクラスを継承してオーバーライドしているのでしょう。

activerecord-5.1.7/lib/active_record/connection_adapters/postgresql_adapter.rb にありました。

      def active?
        @lock.synchronize do
          @connection.query "SELECT 1"
        end
        true
      rescue PG::Error
        false
      end

SELECT 1 を実行してみて、問題なければ true、失敗なら false を返しているだけですね。 接続が生きているかの確認はここにありました。

active? は理解したので、引き続き reconnect! を見てみます。

activerecord-5.1.7/lib/active_record/connection_adapters/abstract_adapter.rb

      # Disconnects from the database if already connected, and establishes a
      # new connection with the database. Implementors should call super if they
      # override the default implementation.
      def reconnect!
        clear_cache!
        reset_transaction
      end

とあって、それだけ?? と思いましたが、activerecord-5.1.7/lib/active_record/connection_adapters/postgresql_adapter.rb にもありました。

      # Close then reopen the connection.
      def reconnect!
        @lock.synchronize do
          super
          @connection.reset
          configure_connection
        end
      end

super で先に上げた方の reconnect! を呼び出しているんですね。 そして @connection.reset です。

@connection は同じファイルで以下のように作っています。

        def connect
          @connection = PG.connect(@connection_parameters)
          configure_connection
        rescue ::PG::Error => error
          if error.message.include?("does not exist")
            raise ActiveRecord::NoDatabaseError
          else
            raise
          end
        end

PG.connect で作っています。PG は pg という gem で定義されています。 つまり @connection.resetreset メソッドは pg gem の方にあります。

せっかくここまで来たので、ついでに見ていきます。ここからは C 言語になります。

pg-0.21.0/ext/pg_connection.c で以下のようになっています。

 rb_define_method(rb_cPGconn, "reset", pgconn_reset, 0);

ruby でいう reset というメソッドは、C では pgconn_reset() 関数なのですね。 この関数は以下のようになっています。

/*
 * call-seq:
 *    conn.reset()
 *
 * Resets the backend connection. This method closes the
 * backend connection and tries to re-connect.
 */
static VALUE
pgconn_reset( VALUE self )
{
        pgconn_close_socket_io( self );
        gvl_PQreset( pg_get_pgconn(self) );
        return self;
}

いろいろやってますね。一つずつ見ていきます。

pgconn_close_socket_io() は以下の通りです。

static void
pgconn_close_socket_io( VALUE self )
{
        t_pg_connection *this = pg_get_connection( self );
        VALUE socket_io = this->socket_io;

        if ( RTEST(socket_io) ) {
#if defined(_WIN32) && defined(HAVE_RB_W32_WRAP_IO_HANDLE)
                int ruby_sd = NUM2INT(rb_funcall( socket_io, rb_intern("fileno"), 0 ));
                if( rb_w32_unwrap_io_handle(ruby_sd) ){
                        rb_raise(rb_eConnectionBad, "Could not unwrap win32 socket handle");
                }
#endif
                rb_funcall( socket_io, rb_intern("close"), 0 );
        }

        this->socket_io = Qnil;
}

見づらいので、windows 専用コードを削除してみると、以下のようになります。

static void
pgconn_close_socket_io( VALUE self )
{
        t_pg_connection *this = pg_get_connection( self );
        VALUE socket_io = this->socket_io;

        if ( RTEST(socket_io) ) {
                rb_funcall( socket_io, rb_intern("close"), 0 );
        }

        this->socket_io = Qnil;
}

ruby レベルで socket を close しているようです。

次に、pg_get_pgconn() は以下のようになっています。

PGconn *
pg_get_pgconn( VALUE self )
{
        t_pg_connection *this;
        Data_Get_Struct( self, t_pg_connection, this);

        if ( !this->pgconn )
                rb_raise( rb_eConnectionBad, "connection is closed" );

        return this->pgconn;
}

接続に対応する構造体のポインタを取得しているだけのようです。

最後に gvl_PQreset() です。パッと見、見つかりませんでしたが、gvl_wrappers.c にありました。

FOR_EACH_BLOCKING_FUNCTION( DEFINE_GVL_STUB );

FOR_EACH_BLOCKING_FUNCTION はマクロです。gvl_wrappers.h で以下のようになっています。

#define FOR_EACH_BLOCKING_FUNCTION(function) \
  (略)
        function(PQreset, GVL_TYPE_VOID, void, PGconn *, conn) \
  (略)

DEFINE_GVL_STUB が引数 function に渡されるので、

        DEFINE_GVL_STUB(PQreset, GVL_TYPE_VOID, void, PGconn *, conn)

となります。そして DEFINE_GVL_STUB がまたマクロで、以下のようになっています。

        #define DEFINE_GVL_STUB(name, when_non_void, rettype, lastparamtype, lastparamname) \
                rettype gvl_##name(FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST3) lastparamtype lastparamname){ \
                        return name( FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST1) lastparamname ); \
                }

渡された引数を使って展開すると、以下のようになります。

                void gvl_PQreset(FOR_EACH_PARAM_OF_PQreset(DEFINE_PARAM_LIST3) PGConn *conn){
                        return PQreset( FOR_EACH_PARAM_OF_PQreset(DEFINE_PARAM_LIST1) conn );
                }

FOR_EACH_PARAM_OF_PQreset もマクロで、以下のように定義されています。

#define FOR_EACH_PARAM_OF_PQreset(param)

空です。これも展開すると、以下のようになります。

                void gvl_PQreset(PGConn *conn){
                        return PQreset(conn);
                }

たくさんのコードを列挙しなくて済むようマクロが使われていたので、解読に少し手間取りましたが、 結局 gvl_PQreset()PQreset() を呼んでいるだけですね。

PQreset() のドキュメントはこちらにあります。 同じパラメータを使って同じサーバへ接続するそうです。

まとめ

ActiveRecord::Base.connection は、コネクションプールから接続を取得して返します。 コネクションプールは配列から接続を取り出して、生きていればそれを返し、死んでいたら PQreset() で 復活させてからそれを返すようです。

PQreset() のドキュメントを読むと、同じパラメータを使って同じサーバへ接続するそうでが、 接続時のパラメータとして指定するのはホスト名であって IP アドレスは指定しません。 接続しなおす時に DNS を引き直してくれるのかどうかは判りませんでした。 DNS を引き直さない場合、本当に「同じサーバ」(同じインスタンス) になってしまい、 そこはおそらく read-only です。

ドキュメントの説明が若干曖昧なので、あとは実際にテストしてみるしかないかな、と思いました。