使用 ActiveJob::Continuable 构建可恢复的 CSV 导入

长时间运行的后台任务与部署之间存在一种尴尬的关系。你启动了一个 100,000 行的 CSV 导入,五分钟后部署了新代码,worker 随之重启。任务要么从头开始,要么悄无声息地终止。大多数团队通过自定义簿记来解决这个问题:一个 last_processed_id 列、一个 Redis 键,或者像 job-iteration 这样的外部 gem。

Rails 8.1 提供了一个内置解决方案,叫做 ActiveJob::Continuable。它允许你将任务拆分为多个步骤,使用游标跟踪进度,并在中断后自动从上次停止的位置恢复。无需额外的 gem,无需自定义状态机。

续接的工作原理

在你的任务中引入 ActiveJob::Continuable,然后使用 step 方法定义步骤。每个步骤可以选择性地跟踪一个游标(你的进度标记)。当队列适配器发出关闭信号时,任务会保存进度并重新入队。

class MyJob < ApplicationJob
  include ActiveJob::Continuable

  def perform
    step :first_step do
      # runs once, skipped on resume
    end

    step :second_step do |step|
      records.find_each(start: step.cursor) do |record|
        process(record)
        step.advance! from: record.id  # saves progress + checks for interruption
      end
    end

    step :cleanup do
      # runs after second_step completes
    end
  end
end

三个关键概念:

  1. 步骤按顺序运行。已完成的步骤在任务恢复时会被跳过。
  2. 游标跟踪步骤内的进度。恢复时,游标会被还原。
  3. 检查点是可以发生中断的安全节点。每个步骤之前(第一个步骤除外)都有一个自动检查点。在步骤内部,你可以通过调用 step.advance!step.set!step.checkpoint! 来创建检查点。

构建 CSV 导入

让我们构建一个真实的导入流水线。用户通过 Active Storage 上传 CSV 文件。任务验证文件、处理每一行,然后发送完成通知邮件。

Import 模型

class Import < ApplicationRecord
  has_one_attached :file
  belongs_to :user

  enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end

以及迁移:

class CreateImports < ActiveRecord::Migration[8.1]
  def change
    create_table :imports do |t|
      t.references :user, null: false, foreign_key: true
      t.integer :status, default: 0, null: false
      t.integer :processed_count, default: 0, null: false
      t.integer :total_rows, default: 0, null: false
      t.integer :error_count, default: 0, null: false
      t.jsonb :errors_log, default: []
      t.timestamps
    end
  end
end

任务

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  self.max_resumptions = 50
  self.resume_options = { wait: 2.seconds }

  def perform(import_id)
    @import = Import.find(import_id)
    @rows = CSV.parse(@import.file.download, headers: true)

    step :validate do
      validate_csv
    end

    step :process_rows do |step|
      process_csv_rows(step)
    end

    step :finalize do
      finalize_import
    end
  end

  private

  def validate_csv
    @import.update!(status: :validating)

    required_headers = %w[email first_name last_name]
    missing = required_headers - @rows.headers

    if missing.any?
      @import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
      raise "Invalid CSV: missing headers #{missing.join(', ')}"
    end

    @import.update!(total_rows: @rows.size)
  end

  def process_csv_rows(step)
    @import.update!(status: :processing)

    start_index = step.cursor || 0

    # Using set! instead of advance! because we're tracking array indices,
    # not ActiveRecord IDs where gaps between values are common.
    @rows[start_index..].each.with_index(start_index) do |row, index|
      process_single_row(row, index)
      step.set! index + 1
    end
  end

  def process_single_row(row, index)
    user = User.find_or_initialize_by(email: row["email"])
    user.assign_attributes(
      first_name: row["first_name"],
      last_name: row["last_name"]
    )

    if user.save
      @import.increment!(:processed_count)
    else
      @import.increment!(:error_count)
      log_error(index, row, user.errors.full_messages)
    end
  end

  def log_error(index, row, messages)
    @import.errors_log << { row: index + 1, email: row["email"], messages: messages }
    @import.save!
  end

  def finalize_import
    @import.update!(status: :completed)
    ImportMailer.completed(@import).deliver_later
  end
end

实际执行过程

假设 CSV 有 50,000 行,worker 在处理完前 10,000 行后关闭。

  1. validate 步骤已经完成,因此被标记为已完成。
  2. process_rows 步骤的游标为 10000(下一个要处理的索引)。
  3. 任务携带已保存的进度重新入队。
  4. 当新的 worker 接手时,它跳过 validate,从第 10,001 行开始恢复 process_rows
  5. 所有行处理完毕后,finalize 运行并发送邮件。

进度被序列化存储在任务数据的 continuation 键下,因此不需要外部存储。

启动导入

class ImportsController < ApplicationController
  def create
    import = Current.user.imports.create!(file: params[:file])
    ProcessImportJob.perform_later(import.id)
    redirect_to import, notice: "Import started"
  end
end

显示进度

由于 Import 模型跟踪了 processed_count,你可以向用户展示进度:

<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
  <p>Status: <%= @import.status.humanize %></p>

  <% if @import.processing? %>
    <p>Processed <%= @import.processed_count %> rows</p>
    <p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
  <% end %>

  <% if @import.completed? %>
    <p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
  <% end %>
</div>

将此与 Import 模型上的 Turbo Stream 广播回调配合使用,即可实现实时更新:

after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }

隔离步骤

如果某个步骤特别耗时,你希望确保它在自己的任务执行中运行(这样在它开始之前进度已被保存),可以使用 isolated: true 选项:

step :process_rows, isolated: true do |step|
  process_csv_rows(step)
end

这会强制任务在开始该步骤之前中断并重新入队,确保之前所有的进度都已持久化。

配置

三个类级别的设置控制恢复行为:

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  # Maximum number of times the job can be resumed (default: nil, unlimited)
  self.max_resumptions = 50

  # Options passed to retry_job when resuming (default: { wait: 5.seconds })
  self.resume_options = { wait: 2.seconds, queue: :imports }

  # Auto-resume if an error occurs after the cursor advanced (default: true)
  self.resume_errors_after_advancing = true
end

最后一个选项值得注意。如果你的任务在取得进展(游标前进或完成了某个步骤)之后抛出错误,它会自动恢复,而不是丢失已有的进度。此选项默认启用。

需要注意的事项

步骤外的代码在每次恢复时都会重新执行。 step 块之外的所有代码在每次执行时都会运行,包括恢复执行。perform 顶部的 @import = Import.find(import_id) 和 CSV 解析作为初始化是没问题的,但要避免在那里放置副作用。对于非常大的文件,每次恢复时重新下载 CSV 可能会很慢。可以考虑在前置步骤中将文件下载到本地临时路径,然后从磁盘读取。

步骤内的检查点不是自动的。 步骤之间有自动检查点,但在步骤内部,任务只在你调用 step.advance!step.set!step.checkpoint! 时才检查中断。如果你的步骤执行了十分钟的工作却没有检查点,那么在此期间它无法被优雅地中断。检查点的频率应高于 worker 的关闭超时时间。

队列适配器支持。 续接依赖 queue_adapter.stopping? 来检测关闭信号。Rails 内置了对 Sidekiq 和 Test 适配器的 stopping? 实现。Solid Queue 可以通过 on_worker_stop 钩子来支持它,但请查阅你所用适配器的文档。如果你的适配器没有实现 stopping?,任务仍然可以工作,但无法被优雅地中断。

游标必须是可序列化的。 游标存储在任务的序列化数据中,因此它必须是 ActiveJob 能够序列化的类型:整数、字符串、数组,或任何实现了 ActiveJob 序列化的对象。

总结

与其构建自定义状态机或引入外部 gem,现在你可以让 Rails 为你处理任务的中断和恢复。定义好步骤,频繁设置检查点,然后放心部署,不必担心终止长时间运行的任务。

此功能在 Rails 8.1.0 及更高版本中可用。完整实现请参阅 PR #55127