ActiveJob::Continuableで再開可能なCSVインポートを構築する

長時間実行されるバックグラウンドジョブは、デプロイと不安定な関係にあります。10万行のCSVインポートを開始し、5分後に新しいコードをデプロイすると、ワーカーが再起動します。ジョブは最初からやり直すか、静かに消えてしまいます。多くのチームはカスタムの記録管理でこれを解決しています。last_processed_idカラム、Redisキー、あるいはjob-iterationのような外部gemなどです。

Rails 8.1にはActiveJob::Continuableという組み込みソリューションが搭載されています。ジョブをステップに分割し、カーソルで進捗を追跡し、中断後にジョブが中断した箇所から自動的に再開できます。gemも不要で、カスタムのステートマシンも不要です。

継続の仕組み

ジョブにActiveJob::Continuableをincludeし、stepメソッドでステップを定義します。各ステップはオプションでカーソル(進捗マーカー)を追跡できます。キューアダプタがシャットダウンを通知すると、ジョブは進捗を保存して自身を再エンキューします。

class MyJob < ApplicationJob
  include ActiveJob::Continuable

  def perform
    step :first_step do
      # runs once, skipped on resume
    end

    step :second_step do |step|
      records.find_each(start: step.cursor) do |record|
        process(record)
        step.advance! from: record.id  # saves progress + checks for interruption
      end
    end

    step :cleanup do
      # runs after second_step completes
    end
  end
end

3つの重要な概念があります。

  1. ステップは順番に実行されます。完了したステップはジョブ再開時にスキップされます。
  2. カーソルはステップ内の進捗を追跡します。再開時にカーソルは復元されます。
  3. チェックポイントは中断が発生可能な安全なポイントです。各ステップの前に自動チェックポイントがあります(最初のステップを除く)。ステップ内では、step.advance!step.set!、またはstep.checkpoint!を呼び出すことでチェックポイントを作成します。

CSVインポートの構築

実際のインポートパイプラインを構築しましょう。ユーザーがActive Storageを通じてCSVファイルをアップロードします。ジョブはファイルを検証し、各行を処理し、完了メールを送信します。

Importモデル

class Import < ApplicationRecord
  has_one_attached :file
  belongs_to :user

  enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end

マイグレーションは以下の通りです。

class CreateImports < ActiveRecord::Migration[8.1]
  def change
    create_table :imports do |t|
      t.references :user, null: false, foreign_key: true
      t.integer :status, default: 0, null: false
      t.integer :processed_count, default: 0, null: false
      t.integer :total_rows, default: 0, null: false
      t.integer :error_count, default: 0, null: false
      t.jsonb :errors_log, default: []
      t.timestamps
    end
  end
end

ジョブ

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  self.max_resumptions = 50
  self.resume_options = { wait: 2.seconds }

  def perform(import_id)
    @import = Import.find(import_id)
    @rows = CSV.parse(@import.file.download, headers: true)

    step :validate do
      validate_csv
    end

    step :process_rows do |step|
      process_csv_rows(step)
    end

    step :finalize do
      finalize_import
    end
  end

  private

  def validate_csv
    @import.update!(status: :validating)

    required_headers = %w[email first_name last_name]
    missing = required_headers - @rows.headers

    if missing.any?
      @import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
      raise "Invalid CSV: missing headers #{missing.join(', ')}"
    end

    @import.update!(total_rows: @rows.size)
  end

  def process_csv_rows(step)
    @import.update!(status: :processing)

    start_index = step.cursor || 0

    # Using set! instead of advance! because we're tracking array indices,
    # not ActiveRecord IDs where gaps between values are common.
    @rows[start_index..].each.with_index(start_index) do |row, index|
      process_single_row(row, index)
      step.set! index + 1
    end
  end

  def process_single_row(row, index)
    user = User.find_or_initialize_by(email: row["email"])
    user.assign_attributes(
      first_name: row["first_name"],
      last_name: row["last_name"]
    )

    if user.save
      @import.increment!(:processed_count)
    else
      @import.increment!(:error_count)
      log_error(index, row, user.errors.full_messages)
    end
  end

  def log_error(index, row, messages)
    @import.errors_log << { row: index + 1, email: row["email"], messages: messages }
    @import.save!
  end

  def finalize_import
    @import.update!(status: :completed)
    ImportMailer.completed(@import).deliver_later
  end
end

動作の流れ

CSVが50,000行あり、最初の1万行を処理した後にワーカーがシャットダウンしたとします。

  1. validateステップは既に完了しているため、完了済みとしてマークされています。
  2. process_rowsステップのカーソルは10000(次に処理するインデックス)です。
  3. ジョブはこの進捗を保存して自身を再エンキューします。
  4. 新しいワーカーがジョブを拾うと、validateをスキップし、10,001行目からprocess_rowsを再開します。
  5. 全行の処理が完了すると、finalizeが実行されてメールを送信します。

進捗はジョブのデータ内のcontinuationキーにシリアライズされるため、外部ストレージは不要です。

処理の開始

class ImportsController < ApplicationController
  def create
    import = Current.user.imports.create!(file: params[:file])
    ProcessImportJob.perform_later(import.id)
    redirect_to import, notice: "Import started"
  end
end

進捗の表示

Importモデルがprocessed_countを追跡しているため、ユーザーに進捗を表示できます。

<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
  <p>Status: <%= @import.status.humanize %></p>

  <% if @import.processing? %>
    <p>Processed <%= @import.processed_count %> rows</p>
    <p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
  <% end %>

  <% if @import.completed? %>
    <p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
  <% end %>
</div>

Turbo StreamのbroadcastコールバックをあなたのImportモデルに追加すると、リアルタイム更新が実現できます。

after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }

分離されたステップ

ステップが特に長く、そのステップが独自のジョブ実行を確実に取得するようにしたい場合(開始前に進捗が保存されるようにするため)、isolated: trueオプションを使用します。

step :process_rows, isolated: true do |step|
  process_csv_rows(step)
end

これにより、ステップの開始前にジョブが強制的に中断・再エンキューされ、それまでの全ての進捗が確実に永続化されます。

設定

3つのクラスレベルの設定が再開動作を制御します。

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  # Maximum number of times the job can be resumed (default: nil, unlimited)
  self.max_resumptions = 50

  # Options passed to retry_job when resuming (default: { wait: 5.seconds })
  self.resume_options = { wait: 2.seconds, queue: :imports }

  # Auto-resume if an error occurs after the cursor advanced (default: true)
  self.resume_errors_after_advancing = true
end

最後のオプションは注目に値します。ジョブが進捗を記録した後(カーソルを進めた後やステップを完了した後)にエラーを発生させた場合、その進捗を失うのではなく、自動的に再開されます。これはデフォルトで有効です。

注意すべき点

ステップ外のコードは再開のたびに再実行されます。 stepブロックの外にある全てのコードは、再開を含む毎回の実行で実行されます。performの先頭にある@import = Import.find(import_id)やCSVパースの行はセットアップとして問題ありませんが、そこに副作用のある処理を置かないようにしましょう。非常に大きなファイルの場合、再開のたびにCSVを再ダウンロードすると遅くなる可能性があります。前のステップでファイルをローカルの一時パスにダウンロードし、ディスクから読み込むことを検討してください。

ステップ内のチェックポイントは自動ではありません。 ステップ間には自動チェックポイントがありますが、ステップ内ではstep.advance!step.set!、またはstep.checkpoint!を呼び出した時のみ中断をチェックします。ステップが10分間チェックポイントなしで処理を行う場合、その間は正常に中断できません。ワーカーのシャットダウンタイムアウトよりも頻繁にチェックポイントを設定しましょう。

キューアダプタのサポート。 継続はqueue_adapter.stopping?に依存してシャットダウンを検出します。RailsはSidekiqアダプタとTestアダプタにstopping?の実装を同梱しています。Solid Queueはon_worker_stopフックでサポートできますが、使用しているアダプタのドキュメントを確認してください。アダプタがstopping?を実装していない場合、ジョブは動作しますが、正常な中断はできません。

カーソルはシリアライズ可能でなければなりません。 カーソルはジョブのシリアライズされたデータに格納されるため、ActiveJobがシリアライズできるもの(整数、文字列、配列、またはActiveJobシリアライゼーションを実装したオブジェクト)である必要があります。

まとめ

カスタムのステートマシンを構築したり、外部gemを導入したりする代わりに、Railsにジョブの中断と再開を任せることができるようになりました。ステップを定義し、頻繁にチェックポイントを設定すれば、長時間実行されるジョブを停止してしまう心配なくデプロイできます。

この機能はRails 8.1.0以降で利用可能です。完全な実装についてはPR #55127を参照してください。