使用 ActiveJob::Continuable 构建可恢复的 CSV 导入
长时间运行的后台任务与部署之间存在一种尴尬的关系。你启动了一个 100,000 行的 CSV 导入,五分钟后部署了新代码,worker 随之重启。任务要么从头开始,要么悄无声息地终止。大多数团队通过自定义簿记来解决这个问题:一个 last_processed_id 列、一个 Redis 键,或者像 job-iteration 这样的外部 gem。
Rails 8.1 提供了一个内置解决方案,叫做 ActiveJob::Continuable。它允许你将任务拆分为多个步骤,使用游标跟踪进度,并在中断后自动从上次停止的位置恢复。无需额外的 gem,无需自定义状态机。
续接的工作原理
在你的任务中引入 ActiveJob::Continuable,然后使用 step 方法定义步骤。每个步骤可以选择性地跟踪一个游标(你的进度标记)。当队列适配器发出关闭信号时,任务会保存进度并重新入队。
class MyJob < ApplicationJob
include ActiveJob::Continuable
def perform
step :first_step do
# runs once, skipped on resume
end
step :second_step do |step|
records.find_each(start: step.cursor) do |record|
process(record)
step.advance! from: record.id # saves progress + checks for interruption
end
end
step :cleanup do
# runs after second_step completes
end
end
end
三个关键概念:
- 步骤按顺序运行。已完成的步骤在任务恢复时会被跳过。
- 游标跟踪步骤内的进度。恢复时,游标会被还原。
- 检查点是可以发生中断的安全节点。每个步骤之前(第一个步骤除外)都有一个自动检查点。在步骤内部,你可以通过调用
step.advance!、step.set!或step.checkpoint!来创建检查点。
构建 CSV 导入
让我们构建一个真实的导入流水线。用户通过 Active Storage 上传 CSV 文件。任务验证文件、处理每一行,然后发送完成通知邮件。
Import 模型
class Import < ApplicationRecord
has_one_attached :file
belongs_to :user
enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end
以及迁移:
class CreateImports < ActiveRecord::Migration[8.1]
def change
create_table :imports do |t|
t.references :user, null: false, foreign_key: true
t.integer :status, default: 0, null: false
t.integer :processed_count, default: 0, null: false
t.integer :total_rows, default: 0, null: false
t.integer :error_count, default: 0, null: false
t.jsonb :errors_log, default: []
t.timestamps
end
end
end
任务
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
self.max_resumptions = 50
self.resume_options = { wait: 2.seconds }
def perform(import_id)
@import = Import.find(import_id)
@rows = CSV.parse(@import.file.download, headers: true)
step :validate do
validate_csv
end
step :process_rows do |step|
process_csv_rows(step)
end
step :finalize do
finalize_import
end
end
private
def validate_csv
@import.update!(status: :validating)
required_headers = %w[email first_name last_name]
missing = required_headers - @rows.headers
if missing.any?
@import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
raise "Invalid CSV: missing headers #{missing.join(', ')}"
end
@import.update!(total_rows: @rows.size)
end
def process_csv_rows(step)
@import.update!(status: :processing)
start_index = step.cursor || 0
# Using set! instead of advance! because we're tracking array indices,
# not ActiveRecord IDs where gaps between values are common.
@rows[start_index..].each.with_index(start_index) do |row, index|
process_single_row(row, index)
step.set! index + 1
end
end
def process_single_row(row, index)
user = User.find_or_initialize_by(email: row["email"])
user.assign_attributes(
first_name: row["first_name"],
last_name: row["last_name"]
)
if user.save
@import.increment!(:processed_count)
else
@import.increment!(:error_count)
log_error(index, row, user.errors.full_messages)
end
end
def log_error(index, row, messages)
@import.errors_log << { row: index + 1, email: row["email"], messages: messages }
@import.save!
end
def finalize_import
@import.update!(status: :completed)
ImportMailer.completed(@import).deliver_later
end
end
实际执行过程
假设 CSV 有 50,000 行,worker 在处理完前 10,000 行后关闭。
validate步骤已经完成,因此被标记为已完成。process_rows步骤的游标为10000(下一个要处理的索引)。- 任务携带已保存的进度重新入队。
- 当新的 worker 接手时,它跳过
validate,从第 10,001 行开始恢复process_rows。 - 所有行处理完毕后,
finalize运行并发送邮件。
进度被序列化存储在任务数据的 continuation 键下,因此不需要外部存储。
启动导入
class ImportsController < ApplicationController
def create
import = Current.user.imports.create!(file: params[:file])
ProcessImportJob.perform_later(import.id)
redirect_to import, notice: "Import started"
end
end
显示进度
由于 Import 模型跟踪了 processed_count,你可以向用户展示进度:
<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
<p>Status: <%= @import.status.humanize %></p>
<% if @import.processing? %>
<p>Processed <%= @import.processed_count %> rows</p>
<p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
<% end %>
<% if @import.completed? %>
<p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
<% end %>
</div>
将此与 Import 模型上的 Turbo Stream 广播回调配合使用,即可实现实时更新:
after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }
隔离步骤
如果某个步骤特别耗时,你希望确保它在自己的任务执行中运行(这样在它开始之前进度已被保存),可以使用 isolated: true 选项:
step :process_rows, isolated: true do |step|
process_csv_rows(step)
end
这会强制任务在开始该步骤之前中断并重新入队,确保之前所有的进度都已持久化。
配置
三个类级别的设置控制恢复行为:
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
# Maximum number of times the job can be resumed (default: nil, unlimited)
self.max_resumptions = 50
# Options passed to retry_job when resuming (default: { wait: 5.seconds })
self.resume_options = { wait: 2.seconds, queue: :imports }
# Auto-resume if an error occurs after the cursor advanced (default: true)
self.resume_errors_after_advancing = true
end
最后一个选项值得注意。如果你的任务在取得进展(游标前进或完成了某个步骤)之后抛出错误,它会自动恢复,而不是丢失已有的进度。此选项默认启用。
需要注意的事项
步骤外的代码在每次恢复时都会重新执行。 step 块之外的所有代码在每次执行时都会运行,包括恢复执行。perform 顶部的 @import = Import.find(import_id) 和 CSV 解析作为初始化是没问题的,但要避免在那里放置副作用。对于非常大的文件,每次恢复时重新下载 CSV 可能会很慢。可以考虑在前置步骤中将文件下载到本地临时路径,然后从磁盘读取。
步骤内的检查点不是自动的。 步骤之间有自动检查点,但在步骤内部,任务只在你调用 step.advance!、step.set! 或 step.checkpoint! 时才检查中断。如果你的步骤执行了十分钟的工作却没有检查点,那么在此期间它无法被优雅地中断。检查点的频率应高于 worker 的关闭超时时间。
队列适配器支持。 续接依赖 queue_adapter.stopping? 来检测关闭信号。Rails 内置了对 Sidekiq 和 Test 适配器的 stopping? 实现。Solid Queue 可以通过 on_worker_stop 钩子来支持它,但请查阅你所用适配器的文档。如果你的适配器没有实现 stopping?,任务仍然可以工作,但无法被优雅地中断。
游标必须是可序列化的。 游标存储在任务的序列化数据中,因此它必须是 ActiveJob 能够序列化的类型:整数、字符串、数组,或任何实现了 ActiveJob 序列化的对象。
总结
与其构建自定义状态机或引入外部 gem,现在你可以让 Rails 为你处理任务的中断和恢复。定义好步骤,频繁设置检查点,然后放心部署,不必担心终止长时间运行的任务。
此功能在 Rails 8.1.0 及更高版本中可用。完整实现请参阅 PR #55127。