ActiveJob::Continuableで再開可能なCSVインポートを構築する
長時間実行されるバックグラウンドジョブは、デプロイと不安定な関係にあります。10万行のCSVインポートを開始し、5分後に新しいコードをデプロイすると、ワーカーが再起動します。ジョブは最初からやり直すか、静かに消えてしまいます。多くのチームはカスタムの記録管理でこれを解決しています。last_processed_idカラム、Redisキー、あるいはjob-iterationのような外部gemなどです。
Rails 8.1にはActiveJob::Continuableという組み込みソリューションが搭載されています。ジョブをステップに分割し、カーソルで進捗を追跡し、中断後にジョブが中断した箇所から自動的に再開できます。gemも不要で、カスタムのステートマシンも不要です。
継続の仕組み
ジョブにActiveJob::Continuableをincludeし、stepメソッドでステップを定義します。各ステップはオプションでカーソル(進捗マーカー)を追跡できます。キューアダプタがシャットダウンを通知すると、ジョブは進捗を保存して自身を再エンキューします。
class MyJob < ApplicationJob
include ActiveJob::Continuable
def perform
step :first_step do
# runs once, skipped on resume
end
step :second_step do |step|
records.find_each(start: step.cursor) do |record|
process(record)
step.advance! from: record.id # saves progress + checks for interruption
end
end
step :cleanup do
# runs after second_step completes
end
end
end
3つの重要な概念があります。
- ステップは順番に実行されます。完了したステップはジョブ再開時にスキップされます。
- カーソルはステップ内の進捗を追跡します。再開時にカーソルは復元されます。
- チェックポイントは中断が発生可能な安全なポイントです。各ステップの前に自動チェックポイントがあります(最初のステップを除く)。ステップ内では、
step.advance!、step.set!、またはstep.checkpoint!を呼び出すことでチェックポイントを作成します。
CSVインポートの構築
実際のインポートパイプラインを構築しましょう。ユーザーがActive Storageを通じてCSVファイルをアップロードします。ジョブはファイルを検証し、各行を処理し、完了メールを送信します。
Importモデル
class Import < ApplicationRecord
has_one_attached :file
belongs_to :user
enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end
マイグレーションは以下の通りです。
class CreateImports < ActiveRecord::Migration[8.1]
def change
create_table :imports do |t|
t.references :user, null: false, foreign_key: true
t.integer :status, default: 0, null: false
t.integer :processed_count, default: 0, null: false
t.integer :total_rows, default: 0, null: false
t.integer :error_count, default: 0, null: false
t.jsonb :errors_log, default: []
t.timestamps
end
end
end
ジョブ
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
self.max_resumptions = 50
self.resume_options = { wait: 2.seconds }
def perform(import_id)
@import = Import.find(import_id)
@rows = CSV.parse(@import.file.download, headers: true)
step :validate do
validate_csv
end
step :process_rows do |step|
process_csv_rows(step)
end
step :finalize do
finalize_import
end
end
private
def validate_csv
@import.update!(status: :validating)
required_headers = %w[email first_name last_name]
missing = required_headers - @rows.headers
if missing.any?
@import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
raise "Invalid CSV: missing headers #{missing.join(', ')}"
end
@import.update!(total_rows: @rows.size)
end
def process_csv_rows(step)
@import.update!(status: :processing)
start_index = step.cursor || 0
# Using set! instead of advance! because we're tracking array indices,
# not ActiveRecord IDs where gaps between values are common.
@rows[start_index..].each.with_index(start_index) do |row, index|
process_single_row(row, index)
step.set! index + 1
end
end
def process_single_row(row, index)
user = User.find_or_initialize_by(email: row["email"])
user.assign_attributes(
first_name: row["first_name"],
last_name: row["last_name"]
)
if user.save
@import.increment!(:processed_count)
else
@import.increment!(:error_count)
log_error(index, row, user.errors.full_messages)
end
end
def log_error(index, row, messages)
@import.errors_log << { row: index + 1, email: row["email"], messages: messages }
@import.save!
end
def finalize_import
@import.update!(status: :completed)
ImportMailer.completed(@import).deliver_later
end
end
動作の流れ
CSVが50,000行あり、最初の1万行を処理した後にワーカーがシャットダウンしたとします。
validateステップは既に完了しているため、完了済みとしてマークされています。process_rowsステップのカーソルは10000(次に処理するインデックス)です。- ジョブはこの進捗を保存して自身を再エンキューします。
- 新しいワーカーがジョブを拾うと、
validateをスキップし、10,001行目からprocess_rowsを再開します。 - 全行の処理が完了すると、
finalizeが実行されてメールを送信します。
進捗はジョブのデータ内のcontinuationキーにシリアライズされるため、外部ストレージは不要です。
処理の開始
class ImportsController < ApplicationController
def create
import = Current.user.imports.create!(file: params[:file])
ProcessImportJob.perform_later(import.id)
redirect_to import, notice: "Import started"
end
end
進捗の表示
Importモデルがprocessed_countを追跡しているため、ユーザーに進捗を表示できます。
<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
<p>Status: <%= @import.status.humanize %></p>
<% if @import.processing? %>
<p>Processed <%= @import.processed_count %> rows</p>
<p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
<% end %>
<% if @import.completed? %>
<p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
<% end %>
</div>
Turbo StreamのbroadcastコールバックをあなたのImportモデルに追加すると、リアルタイム更新が実現できます。
after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }
分離されたステップ
ステップが特に長く、そのステップが独自のジョブ実行を確実に取得するようにしたい場合(開始前に進捗が保存されるようにするため)、isolated: trueオプションを使用します。
step :process_rows, isolated: true do |step|
process_csv_rows(step)
end
これにより、ステップの開始前にジョブが強制的に中断・再エンキューされ、それまでの全ての進捗が確実に永続化されます。
設定
3つのクラスレベルの設定が再開動作を制御します。
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
# Maximum number of times the job can be resumed (default: nil, unlimited)
self.max_resumptions = 50
# Options passed to retry_job when resuming (default: { wait: 5.seconds })
self.resume_options = { wait: 2.seconds, queue: :imports }
# Auto-resume if an error occurs after the cursor advanced (default: true)
self.resume_errors_after_advancing = true
end
最後のオプションは注目に値します。ジョブが進捗を記録した後(カーソルを進めた後やステップを完了した後)にエラーを発生させた場合、その進捗を失うのではなく、自動的に再開されます。これはデフォルトで有効です。
注意すべき点
ステップ外のコードは再開のたびに再実行されます。 stepブロックの外にある全てのコードは、再開を含む毎回の実行で実行されます。performの先頭にある@import = Import.find(import_id)やCSVパースの行はセットアップとして問題ありませんが、そこに副作用のある処理を置かないようにしましょう。非常に大きなファイルの場合、再開のたびにCSVを再ダウンロードすると遅くなる可能性があります。前のステップでファイルをローカルの一時パスにダウンロードし、ディスクから読み込むことを検討してください。
ステップ内のチェックポイントは自動ではありません。 ステップ間には自動チェックポイントがありますが、ステップ内ではstep.advance!、step.set!、またはstep.checkpoint!を呼び出した時のみ中断をチェックします。ステップが10分間チェックポイントなしで処理を行う場合、その間は正常に中断できません。ワーカーのシャットダウンタイムアウトよりも頻繁にチェックポイントを設定しましょう。
キューアダプタのサポート。 継続はqueue_adapter.stopping?に依存してシャットダウンを検出します。RailsはSidekiqアダプタとTestアダプタにstopping?の実装を同梱しています。Solid Queueはon_worker_stopフックでサポートできますが、使用しているアダプタのドキュメントを確認してください。アダプタがstopping?を実装していない場合、ジョブは動作しますが、正常な中断はできません。
カーソルはシリアライズ可能でなければなりません。 カーソルはジョブのシリアライズされたデータに格納されるため、ActiveJobがシリアライズできるもの(整数、文字列、配列、またはActiveJobシリアライゼーションを実装したオブジェクト)である必要があります。
まとめ
カスタムのステートマシンを構築したり、外部gemを導入したりする代わりに、Railsにジョブの中断と再開を任せることができるようになりました。ステップを定義し、頻繁にチェックポイントを設定すれば、長時間実行されるジョブを停止してしまう心配なくデプロイできます。
この機能はRails 8.1.0以降で利用可能です。完全な実装についてはPR #55127を参照してください。