おはようございます。 @shutooike です!
業務上であるジョブのキューを条件によって動的に変えたい場面に遭遇し、ActiveJobのソースを読んだのでメモを残しておきます。
前提条件
Rails version: 5.2
queue_adapter: Sidekiq
対象のジョブクラスはこんな感じ
class SampleJob < ApplicationJob queue_as 'default' rescue_from(StandardError) do retry_job(wait: 5.minutes) end def perform(args) do_something end end
方法1
ActiveJob::Core.set
か ActiveJob::Enqueuing#enqueue
に queue
を渡す
ジョブの呼び出し元で制御するという思いつく限り一番簡単な方法です。
実装
# set を使うパターン if ConditionModel.exists?(id: condition.id) SampleJob.set(queue: 'another_queue').perform_later(args) else SampleJob.perform_later(args) end # enqueue に渡すパターン if ConditionModel.exists?(id: condition.id) SampleJob.new(args).enqueue(queue: 'another_queue') else SampleJob.perform_later(args) end
内部をチラッと覗くと
module ActiveJob module Core ... module ClassMethods ... def set(options = {}) ConfiguredJob.new(self, options) end end ...
ActiveJob::Core.set
内で ActiveJob::ConfiguredJob
というインスタンスが作られます。
module ActiveJob class ConfiguredJob #:nodoc: def initialize(job_class, options = {}) @options = options @job_class = job_class end ... def perform_later(*args) @job_class.new(*args).enqueue @options end end end
ActiveJob::ConfiguredJob#perform_later
はジョブのインスタンスを作って ActiveJob::Enqueuing#enqueue
を呼ぶので、enqueue に渡すパターンと行き着く先は同じです。
#enqueue
の実装は以下のようになっており、 options['queue']
があれば ジョブインスタンスの queue_name
を上書きします。
module ActiveJob module Enqueuing ... def enqueue(options = {}) self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] self.scheduled_at = options[:wait_until].to_f if options[:wait_until] self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] self.priority = options[:priority].to_i if options[:priority] run_callbacks :enqueue do if scheduled_at self.class.queue_adapter.enqueue_at self, scheduled_at else self.class.queue_adapter.enqueue self end end self end
今回の変更対象のジョブはいろんなところで呼ばれていて、呼び出し元を変更するコストが高かったのでこの方法は 不採用
方法2
before_enqueue
で #queue_name
を動的に変更する
ActiveJob は他の Rails のライブラリと同様コールバックを持っています。
今回はエンキュー前に呼ばれる必要があるので、before_enqueue
コールバックを使います。
実装
class SampleJob < ApplicationJob queue_as 'default' rescue_from(StandardError) do retry_job(wait: 5.minutes) end + before_enqueue do |job| + if ConditionModel.exists?(id: job.arguments[0]) + job.queue_name = 'another_queue' + end + end ...
これだと変更箇所が一箇所で済むのでいい感じです。
ただ、今回の変更対象のジョブは上の実装のようにジョブの引数を使ってDBにアクセスし queue
を変更するか確認する必要がありました。
もしジョブ実行中にDBアクセスに失敗してエラーになった場合、retry_job
時にも before_enqueue
が呼ばれてDBアクセスでエラーになり、ジョブのエンキューされないまま失敗してしまう可能性があると指摘をいただきました。なので方法2も 不採用
ただ、方法1を使うのはエンジニアとして負けな気がしたので頑張って探します。
方法3
ActiveJob::Core.queue_as
にブロックを渡す
先に実装を示します。
実装
class SampleJob < ApplicationJob - queue_as 'default' + queue_as do + if ConditionModel.exists?(id: self.arguments[0]) + 'another_queue' + else + 'default' + end + end ...
方法2の before_enqueue
とほぼ同じ処理を queue_as
のブロックに渡しただけです。
これでうまくいく理由をAJのソースコードと共に順を追って説明していきます。
まず、queue_as
の定義を見ます。
module ActiveJob module QueueName extend ActiveSupport::Concern # Includes the ability to override the default queue name and prefix. module ClassMethods ... def queue_as(part_name = nil, &block) if block_given? self.queue_name = block else self.queue_name = queue_name_from_part(part_name) end end ... included do class_attribute :queue_name, instance_accessor: false, default: default_queue_name class_attribute :queue_name_delimiter, instance_accessor: false, default: "_" end ...
ブロックが渡されたらブロックをそのままクラス属性の queue_name
に入れ、
それ以外は引数を queue_name_from_part
(プレフィックスとかをつける処理)に通してクラス属性の queue_name
に入れていますね。
このクラス属性の queue_name
はどこで使われるかというと、
module ActiveJob module Core ... def initialize(*arguments) @arguments = arguments @job_id = SecureRandom.uuid @queue_name = self.class.queue_name @priority = self.class.priority @executions = 0 end ...
ジョブのコンストラクタ内で @queue_name
に代入されています。
次にジョブが Sidekiq にエンキューされるまでの流れを追います。
まず方法1でも見た ActiveJob::Enqueuing#enqueue
をもう一度確認します。
module ActiveJob module Enqueuing ... def enqueue(options = {}) self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] self.scheduled_at = options[:wait_until].to_f if options[:wait_until] self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] self.priority = options[:priority].to_i if options[:priority] run_callbacks :enqueue do if scheduled_at self.class.queue_adapter.enqueue_at self, scheduled_at else self.class.queue_adapter.enqueue self end end self end
最後に queue_adapter
の enqueue
メソッドが呼ばれています。
弊社はアダプターに Sidekiq
を使っているので SidekiqAdapter#enqueue
が呼ばれます。
module ActiveJob module QueueAdapters ... class SidekiqAdapter def enqueue(job) #:nodoc: # Sidekiq::Client does not support symbols as keys job.provider_job_id = Sidekiq::Client.push \ "class" => JobWrapper, "wrapped" => job.class.to_s, "queue" => job.queue_name, "args" => [ job.serialize ] end ...
SidekiqAdapter#enqueue
はジョブインスタンスを引数に受け、Sidekiq
側に投げる際、 "queue"
に job.queue_name
を指定していることが確認できます。
この job.queue_name
は ActiveJob::QueueName
に定義されています。
module ActiveJob module QueueName ... # Returns the name of the queue the job will be run on. def queue_name if @queue_name.is_a?(Proc) @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name)) end @queue_name end end end
#queue_name
はインスタンス変数 @queue_name
がブロックなら、instance_exec
*1 を使いジョブインスタンスのコンテキストでブロックを実行した結果を @queue_name
に代入したあと返し、ブロック以外なら、@queue_name
をそのまま返しています。
まとめると、queue_as
にブロックを渡した場合、最終的に Sidekiq に "queue"
として渡す値は、初回はブロックの実行結果を、2回目以降は初回の実行結果が返すこともわかりました。これにより retry_job
時(2回目以降)にはDBアクセスしないことがわかったので、ジョブのエンキューされないまま失敗してしまう心配もなく動的に queue
を変更できるようになりました。ということで今回は方法3を 採用 🎉
補足
なぜリトライ時(2回目以降)も初回のブロック実行結果が引き継がれるの?
ActiveJob から Sidekiq::Client に投げる際、"args"
に job.serialize
を渡しています。
module ActiveJob module QueueAdapters ... class SidekiqAdapter def enqueue(job) #:nodoc: # Sidekiq::Client does not support symbols as keys job.provider_job_id = Sidekiq::Client.push \ "class" => JobWrapper, "wrapped" => job.class.to_s, "queue" => job.queue_name, "args" => [ job.serialize ] end ...
ジョブインスタンスの #serialize
は ActiveJob::Core
に定義があって
module ActiveJob module Core ... # Returns a hash with the job data that can safely be passed to the # queueing adapter. def serialize { "job_class" => self.class.name, "job_id" => job_id, "provider_job_id" => provider_job_id, "queue_name" => queue_name, "priority" => priority, "arguments" => serialize_arguments_if_needed(arguments), "executions" => executions, "locale" => I18n.locale.to_s } end ...
上記のハッシュを返します。このハッシュに queue_name
もありますね。
serialize
があれば deserialize
もあるはずです。deserialize
だけを探してもいいですが、せっかくなので Sidekiq から ActiveJob のジョブが動く流れを追います。(Sidekiq のコードは追いません)
module ActiveJob module QueueAdapters ... class SidekiqAdapter def enqueue(job) #:nodoc: # Sidekiq::Client does not support symbols as keys job.provider_job_id = Sidekiq::Client.push \ "class" => JobWrapper, "wrapped" => job.class.to_s, "queue" => job.queue_name, "args" => [ job.serialize ] end ... class JobWrapper #:nodoc: include Sidekiq::Worker def perform(job_data) Base.execute job_data.merge("provider_job_id" => jid) end end ...
Sidekiq の worker は Redis からジョブ情報を取得し、 "class"
に指定したクラスのインスタンスの perform
を "args"
の一つ目の要素を引数に実行します。
ここで "class
" として渡されている JobWrapper
の perform
の処理は
module ActiveJob module QueueAdapters class SidekiqAdapter ... class JobWrapper #:nodoc: include Sidekiq::Worker def perform(job_data) Base.execute job_data.merge("provider_job_id" => jid) end end ...
ActiveJob::Base.execute
を 呼び出しています。
module ActiveJob module Execution ... module ClassMethods ... def execute(job_data) #:nodoc: ActiveJob::Callbacks.run_callbacks(:execute) do job = deserialize(job_data) job.perform_now end end end ...
お、deserialize
が出てきました。クラスメソッド の deserialize
を探します。
module ActiveJob module Core ... module ClassMethods # Creates a new job instance from a hash created with +serialize+ def deserialize(job_data) job = job_data["job_class"].constantize.new job.deserialize(job_data) job end
serialize
で作ったハッシュをもとに新しいジョブのインスタンス作成すると言ってますね。
job_data
というハッシュの "job_class"
からジョブインスタンスを作成して、インスタンスメソッドの deserialize
を呼び出しています。
インスタンスメソッドの deserialize
を探します。
module ActiveJob module Core ... def deserialize(job_data) self.job_id = job_data["job_id"] self.provider_job_id = job_data["provider_job_id"] self.queue_name = job_data["queue_name"] self.priority = job_data["priority"] self.serialized_arguments = job_data["arguments"] self.executions = job_data["executions"] self.locale = job_data["locale"] || I18n.locale.to_s end ...
やっと辿り着きました。ジョブインスタンスの queue_name
に job_data["queue_name"]
が代入されています。
job_data["queue_name"]
の値はブロックの実行結果が入っているのでリトライ時(2回目以降)に初回のブロック実行結果が引き継がれます。
おわりに
ActiveJob はめちゃくちゃ読みやすかったです!
【宣伝】 弊社では必要とあらばライブラリのソースコードをバリバリ読むぜ!というロックなエンジニアを募集しております!ご興味あればぜひ下記リンクをご覧ください! ingage.co.jp
ではまた!
*1:instance_exec についてはこちらの記事がわかりやすかったです。https://secret-garden.hatenablog.com/entry/2015/10/18/000000