ActiveJob で queue を動的に指定する3つの方法

おはようございます。 @shutooike です!

業務上であるジョブのキューを条件によって動的に変えたい場面に遭遇し、ActiveJobのソースを読んだのでメモを残しておきます。

前提条件

Rails version: 5.2

queue_adapter: Sidekiq

対象のジョブクラスはこんな感じ

class SampleJob < ApplicationJob
  queue_as 'default'
  
  rescue_from(StandardError) do
    retry_job(wait: 5.minutes)
  end
  
  def perform(args)
    do_something
  end
end

方法1

ActiveJob::Core.setActiveJob::Enqueuing#enqueuequeue を渡す

ジョブの呼び出し元で制御するという思いつく限り一番簡単な方法です。

実装

# set を使うパターン
if ConditionModel.exists?(id: condition.id)
  SampleJob.set(queue: 'another_queue').perform_later(args)
else
  SampleJob.perform_later(args)
end

# enqueue に渡すパターン
if ConditionModel.exists?(id: condition.id)
  SampleJob.new(args).enqueue(queue: 'another_queue')
else
  SampleJob.perform_later(args)
end

内部をチラッと覗くと

module ActiveJob
  module Core
...
    module ClassMethods
...
      def set(options = {})
        ConfiguredJob.new(self, options)
      end
    end
...

ActiveJob::Core.set 内で ActiveJob::ConfiguredJob というインスタンスが作られます。

module ActiveJob
  class ConfiguredJob #:nodoc:
    def initialize(job_class, options = {})
      @options = options
      @job_class = job_class
    end
...
    def perform_later(*args)
      @job_class.new(*args).enqueue @options
    end
  end
end

ActiveJob::ConfiguredJob#perform_later はジョブのインスタンスを作って ActiveJob::Enqueuing#enqueue を呼ぶので、enqueue に渡すパターンと行き着く先は同じです。

#enqueue の実装は以下のようになっており、 options['queue'] があれば ジョブインスタンスの queue_name を上書きします。

module ActiveJob
  module Enqueuing
...    
    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      run_callbacks :enqueue do
        if scheduled_at
          self.class.queue_adapter.enqueue_at self, scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end
      end
      self
    end

今回の変更対象のジョブはいろんなところで呼ばれていて、呼び出し元を変更するコストが高かったのでこの方法は 不採用

方法2

before_enqueue#queue_name を動的に変更する

ActiveJob は他の Rails のライブラリと同様コールバックを持っています。

今回はエンキュー前に呼ばれる必要があるので、before_enqueue コールバックを使います。

実装

class SampleJob < ApplicationJob
  queue_as 'default'
  
  rescue_from(StandardError) do
    retry_job(wait: 5.minutes)
  end
  
+  before_enqueue do |job|
+    if ConditionModel.exists?(id: job.arguments[0]) 
+      job.queue_name = 'another_queue'
+    end
+  end

...

これだと変更箇所が一箇所で済むのでいい感じです。

ただ、今回の変更対象のジョブは上の実装のようにジョブの引数を使ってDBにアクセスし queue を変更するか確認する必要がありました。

もしジョブ実行中にDBアクセスに失敗してエラーになった場合、retry_job 時にも before_enqueue が呼ばれてDBアクセスでエラーになり、ジョブのエンキューされないまま失敗してしまう可能性があると指摘をいただきました。なので方法2も 不採用

ただ、方法1を使うのはエンジニアとして負けな気がしたので頑張って探します。

方法3

ActiveJob::Core.queue_as にブロックを渡す

先に実装を示します。

実装

class SampleJob < ApplicationJob
-  queue_as 'default'

+  queue_as do
+    if ConditionModel.exists?(id: self.arguments[0])
+      'another_queue'
+    else
+      'default'
+    end
+  end

...

方法2の before_enqueue とほぼ同じ処理を queue_as のブロックに渡しただけです。

これでうまくいく理由をAJのソースコードと共に順を追って説明していきます。

まず、queue_as の定義を見ます。

module ActiveJob
  module QueueName
    extend ActiveSupport::Concern

    # Includes the ability to override the default queue name and prefix.
    module ClassMethods
...

      def queue_as(part_name = nil, &block)
        if block_given?
          self.queue_name = block
        else
          self.queue_name = queue_name_from_part(part_name)
        end
      end

...

    included do
      class_attribute :queue_name, instance_accessor: false, default: default_queue_name
      class_attribute :queue_name_delimiter, instance_accessor: false, default: "_"
    end

...

ブロックが渡されたらブロックをそのままクラス属性の queue_name に入れ、 それ以外は引数を queue_name_from_part(プレフィックスとかをつける処理)に通してクラス属性の queue_name に入れていますね。

このクラス属性の queue_name はどこで使われるかというと、

module ActiveJob
  module Core
...

    def initialize(*arguments)
      @arguments  = arguments
      @job_id     = SecureRandom.uuid
      @queue_name = self.class.queue_name
      @priority   = self.class.priority
      @executions = 0
    end
...

ジョブのコンストラクタ内で @queue_name に代入されています。

次にジョブが Sidekiq にエンキューされるまでの流れを追います。 まず方法1でも見た ActiveJob::Enqueuing#enqueue をもう一度確認します。

module ActiveJob
  module Enqueuing
...    
    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      run_callbacks :enqueue do
        if scheduled_at
          self.class.queue_adapter.enqueue_at self, scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end
      end
      self
    end

最後に queue_adapterenqueue メソッドが呼ばれています。

弊社はアダプターに Sidekiq を使っているので SidekiqAdapter#enqueue が呼ばれます。

module ActiveJob
  module QueueAdapters
...
    class SidekiqAdapter
      def enqueue(job) #:nodoc:
        # Sidekiq::Client does not support symbols as keys
        job.provider_job_id = Sidekiq::Client.push \
          "class"   => JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end
...

SidekiqAdapter#enqueue はジョブインスタンスを引数に受け、Sidekiq 側に投げる際、 "queue"job.queue_name を指定していることが確認できます。

この job.queue_nameActiveJob::QueueName に定義されています。

module ActiveJob
  module QueueName
...

    # Returns the name of the queue the job will be run on.
    def queue_name
      if @queue_name.is_a?(Proc)
        @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name))
      end
      @queue_name
    end
  end
end

#queue_name はインスタンス変数 @queue_name がブロックなら、instance_exec*1 を使いジョブインスタンスのコンテキストでブロックを実行した結果を @queue_name に代入したあと返し、ブロック以外なら、@queue_name をそのまま返しています。

まとめると、queue_as にブロックを渡した場合、最終的に Sidekiq に "queue" として渡す値は、初回はブロックの実行結果を、2回目以降は初回の実行結果が返すこともわかりました。これにより retry_job 時(2回目以降)にはDBアクセスしないことがわかったので、ジョブのエンキューされないまま失敗してしまう心配もなく動的に queue を変更できるようになりました。ということで今回は方法3を 採用 🎉

補足

なぜリトライ時(2回目以降)も初回のブロック実行結果が引き継がれるの?

ActiveJob から Sidekiq::Client に投げる際、"args"job.serialize を渡しています。

module ActiveJob
  module QueueAdapters
...
    class SidekiqAdapter
      def enqueue(job) #:nodoc:
        # Sidekiq::Client does not support symbols as keys
        job.provider_job_id = Sidekiq::Client.push \
          "class"   => JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end
...

ジョブインスタンスの #serializeActiveJob::Core に定義があって

module ActiveJob
  module Core
...

    # Returns a hash with the job data that can safely be passed to the
    # queueing adapter.
    def serialize
      {
        "job_class"  => self.class.name,
        "job_id"     => job_id,
        "provider_job_id" => provider_job_id,
        "queue_name" => queue_name,
        "priority"   => priority,
        "arguments"  => serialize_arguments_if_needed(arguments),
        "executions" => executions,
        "locale"     => I18n.locale.to_s
      }
    end
...

上記のハッシュを返します。このハッシュに queue_name もありますね。

serialize があれば deserialize もあるはずです。deserialize だけを探してもいいですが、せっかくなので Sidekiq から ActiveJob のジョブが動く流れを追います。(Sidekiq のコードは追いません)

module ActiveJob
  module QueueAdapters
...
    class SidekiqAdapter
      def enqueue(job) #:nodoc:
        # Sidekiq::Client does not support symbols as keys
        job.provider_job_id = Sidekiq::Client.push \
          "class"   => JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end   
...
      class JobWrapper #:nodoc:
        include Sidekiq::Worker

        def perform(job_data)
          Base.execute job_data.merge("provider_job_id" => jid)
        end
      end
...

Sidekiq の worker は Redis からジョブ情報を取得し、 "class"に指定したクラスのインスタンスの perform"args" の一つ目の要素を引数に実行します。

ここで "class" として渡されている JobWrapperperform の処理は

module ActiveJob
  module QueueAdapters
    class SidekiqAdapter
...
      class JobWrapper #:nodoc:
        include Sidekiq::Worker

        def perform(job_data)
          Base.execute job_data.merge("provider_job_id" => jid)
        end
      end
...

ActiveJob::Base.execute を 呼び出しています。

module ActiveJob
  module Execution
...
    module ClassMethods
...
      def execute(job_data) #:nodoc:
        ActiveJob::Callbacks.run_callbacks(:execute) do
          job = deserialize(job_data)
          job.perform_now
        end
      end
    end
...

お、deserialize が出てきました。クラスメソッド の deserialize を探します。

module ActiveJob
  module Core
...
    module ClassMethods
      # Creates a new job instance from a hash created with +serialize+
      def deserialize(job_data)
        job = job_data["job_class"].constantize.new
        job.deserialize(job_data)
        job
      end

serialize で作ったハッシュをもとに新しいジョブのインスタンス作成すると言ってますね。 job_data というハッシュの "job_class" からジョブインスタンスを作成して、インスタンスメソッドの deserialize を呼び出しています。 インスタンスメソッドの deserialize を探します。

module ActiveJob
  module Core
...

    def deserialize(job_data)
      self.job_id               = job_data["job_id"]
      self.provider_job_id      = job_data["provider_job_id"]
      self.queue_name           = job_data["queue_name"]
      self.priority             = job_data["priority"]
      self.serialized_arguments = job_data["arguments"]
      self.executions           = job_data["executions"]
      self.locale               = job_data["locale"] || I18n.locale.to_s
    end
...

やっと辿り着きました。ジョブインスタンスの queue_namejob_data["queue_name"] が代入されています。 job_data["queue_name"] の値はブロックの実行結果が入っているのでリトライ時(2回目以降)に初回のブロック実行結果が引き継がれます。

おわりに

ActiveJob はめちゃくちゃ読みやすかったです!

【宣伝】 弊社では必要とあらばライブラリのソースコードをバリバリ読むぜ!というロックなエンジニアを募集しております!ご興味あればぜひ下記リンクをご覧ください! ingage.co.jp

ではまた!

*1:instance_exec についてはこちらの記事がわかりやすかったです。https://secret-garden.hatenablog.com/entry/2015/10/18/000000