用 Rails 8.1 的 Rails.event 和 Continuations 实时展示后台任务进度
在后台任务运行时给用户展示一张实时进度卡片,一直都比它本该有的样子要费事得多。你需要一个存放进度的地方、任务中在每个分块后调用 broadcast_replace_to 的代码、一个渲染卡片的 partial、一个 Action Cable channel,以及为每个需要进度的任务把这四样东西接好的那份自律。大多数团队干脆跳过它。
Rails 8.1 加入了三个原语,把这件事从“为每个任务接好四样东西”变成小得多的事:一个结构化事件报告器(Rails.event)、框架为 Active Job 每个生命周期阶段发出的结构化事件,以及用于带 cursor 跟踪的多步骤任务的 ActiveJob::Continuable。把它们连起来,你就得到了面向用户的实时进度,任务体内没有 broadcast_* 调用,也没有逐任务的接线。
本文将带你走一遍这个模式,配有配套仓库里可验证的代码。
Rails 8.1 免费提供了什么
三个部分,全部是 Rails 8.1.0 中新增的:
Rails.event 是一个结构化事件报告器。你用 Rails.event.notify("user.signup", user_id: 123) 发出事件,用 Rails.event.subscribe(subscriber) 注册订阅者。每个订阅者会收到一个包含 name、payload、tags、context、timestamp 和 source_location 的 hash。Rails.event 构建在 ActiveSupport::Notifications 之上:框架的结构化事件订阅者消费旧式通知并以更丰富的 schema 重新发出,所以对于新的结构化可观测性代码,只订阅 Rails.event 就够了。已经订阅 ActiveSupport::Notifications 的旧式工具(APM、Skylight 等等)继续照常工作,不受影响。
结构化的 Active Job 事件 由 Rails 自身为每个生命周期阶段发出。对每个任务:active_job.enqueued、active_job.bulk_enqueued、active_job.started、active_job.completed、active_job.retry_scheduled、active_job.retry_stopped、active_job.discarded。对 Continuation 任务你还会得到 active_job.step_started、active_job.step、active_job.step_skipped、active_job.interrupt 和 active_job.resume。它们全都流经 Rails.event.notify,所以同一批订阅者都能看到它们。
ActiveJob::Continuable 让你可以把一个任务拆成带名字的 step 块,并可选地带 cursor。框架会持久化进度,所以任务可以跨 worker 重启、部署和重试被中断与恢复。
框架的生命周期事件和步骤事件,在任务里没有任何代码的情况下,就处理了 UI 需要的那些粗粒度状态(已入队、运行中、步骤边界、完成、失败)。对于像“已导入 200 条中的 47 条”这样的步骤内进度,你的任务在循环内部发出一个小小的自定义事件。下面的例子两者都做了。
任务
注意下面的任务体里 没有 什么:没有 broadcast_replace_to,没有对跟踪记录的 update!,没有进度百分比的计算。唯一看起来像进度埋点的是 step.advance!,而 Continuations 本来就需要它来支持可恢复性。
class ImportContactsJob < ApplicationJob
include ActiveJob::Continuable
before_perform { Rails.event.set_context(user_id: arguments.first, job_id: job_id) }
def perform(user_id, rows:)
step :prepare { Rails.event.notify("import.prepared", total: rows.size) }
step :process_records do |s|
start = s.cursor || 0
rows[start..].each_with_index do |row, i|
Contact.create!(row.merge(user_id: user_id))
Rails.event.notify("import.record_processed", index: start + i)
s.advance! from: start + i
end
end
step :finalize { Rails.event.notify("import.finalized") }
end
end
before_perform 回调是承重的那一块。它把 user_id 和 job_id 放进余下执行过程的环境上下文里,所以任务之后发出的每个事件(包括框架的生命周期事件)都带着它们。订阅者用 user_id 来挑选向哪个 stream 广播,用 job_id 来挑选更新哪个 DOM 元素。
你不需要清理上下文。Rails 把每个任务包在 app.reloader.wrap 里,executor 的 to_complete 钩子会自动调用 Rails.event.clear_context。
订阅者
一个订阅者,只注册一次。它通过同一个通道,既收到框架的生命周期事件,也收到你自定义的 import.* 事件。
# config/initializers/job_progress_subscriber.rb
Rails.event.subscribe(JobProgressBroadcaster.new) do |event|
event[:context][:user_id].present? &&
(event[:name].start_with?("active_job.") || event[:name].start_with?("import."))
end
这个过滤块让订阅者不会看到整个应用的事件流。只有处在 active_job.* 或 import.* 命名空间下、且上下文中带有 user_id 的事件,才会到达 emit。
广播器本身只是对事件名的一个 case,把每一个都变成一次 Turbo Stream 广播。它的形状:
class JobProgressBroadcaster
def emit(event)
user_id = event[:context][:user_id]
job_id = event[:payload][:job_id] || event[:context][:job_id]
update = translate(event[:name], event[:payload])
return unless update && user_id && job_id
Turbo::StreamsChannel.broadcast_replace_to(
"user_#{user_id}_jobs",
target: "job_#{job_id}",
partial: "jobs/progress_card",
locals: { update: update }
)
end
end
translate 对你关心的那些事件(active_job.started、import.prepared、import.record_processed、active_job.completed、active_job.discarded)返回一个 hash,对其他一切返回 nil。完整的 case 在配套仓库里。
视图
应当显示进度的那个页面挂载 stream,并渲染初始卡片:
<%= turbo_stream_from "user_#{current_user.id}_jobs" %>
<div id="job_<%= @job_id %>"><%= render "jobs/progress_card", update: { status: "queued" } %></div>
_progress_card.html.erb 根据 update hash 渲染你的设计所需要的任何形状。这里假设你的 ApplicationCable::Connection 是以 current_user 标识的:turbo_stream_from 用 Rails 的消息验证器对 stream 名称进行签名,使其无法被篡改,但按用户的安全模型依赖于 Action Cable 知道连接的用户是谁。
这就是整条面向用户的流水线。当 ImportContactsJob 运行时,页面会实时更新。没有轮询,没有手写的 JavaScript,没有逐任务的 Cable channel。
你免费得到了什么
三个值得验证的论断。配套仓库对每一个都有一个通过的测试。
生命周期事件。 active_job.enqueued、active_job.started 和 active_job.completed 全都流经 Rails.event,带有相互匹配的 job_id,并且 completed 上带有一个 duration 字段。你的订阅者在任务体内没有任何代码的情况下,就能看到已入队/运行中/完成的状态转换。
带 cursor 的步骤事件。 每个 step 声明都会产生一个 active_job.step_started 事件(带 cursor 和 resumed 字段)和一个 active_job.step 事件(带最终的 cursor 和 duration)。被恢复的任务,在当时进行中的那个步骤上带有 resumed: true,所以 UI 可以显示“从中断处继续”。
来自 before_perform 的上下文。 在 before_perform 回调运行之后发出的每个事件,无论是你的自定义事件还是框架事件,都带着你设置的上下文。这就是订阅者知道该向哪个用户的 stream 广播的方式。
注意事项
一些一旦你去翻源码就会清楚的事情。
Cursor 语义。 step.advance!(from: x) 会调用 x.succ,所以 cursor 反映的是“恢复时要从哪个位置开始”,而不是“最后处理的索引”。如果你在处理完索引 i 之后写 s.advance! from: i + 1,cursor 最终会停在 i + 2,而不是 i + 1。Rails 文档里的标准模式是 s.advance! from: record.id,它把 cursor 设为 record.id.succ,也就是 find_each(start: cursor) 下一个会取到的 ID。对于百分比显示,更应该在你自己的进度事件里计数(Rails.event.notify("import.record_processed", index: i)),而不是去读 cursor。
enqueued 事件在入队的那个线程上触发。 active_job.enqueued 在 perform_later 返回的那一刻触发,就在调用它的那个线程上(通常是你的 controller)。active_job.started 以及之后的一切都在 worker 线程上触发。在 :async 适配器或一个真正的队列适配器下,这正是你想要的。在 :inline 适配器下,整个任务在 perform_later 内部同步运行,所以 started 和 completed 会在 enqueued 之前 触发。如果你曾在测试或控制台里调试这个,值得知道这一点。
因为 enqueued 在 before_perform 运行之前触发,它不会带着任务最终会设置的上下文。它的 event[:context] 是入队线程当时持有的内容,通常是空的。上面的过滤器会把它排除掉。如果你想给用户来一次“已入队”的广播,就在 perform_later 返回后从 controller 里渲染已入队状态(视图示例就是这么做的),或者在入队前一刻在 controller 里设置 Rails.event.set_context(user_id: current_user.id, job_id: job.job_id)。
arguments.first 是位置参数。 before_perform 块读取 arguments.first 来取 user_id。如果你哪天改了 perform 的签名,使 user_id 不再是第一个位置参数,要同时更新上下文接线,否则订阅者会向错误的用户广播。在更大的代码库里,配合 arguments.first[:user_id] 的纯关键字 perform(user_id:, rows:) 更经得起时间考验。
与适配器无关,但时序与适配器有关。 SolidQueue、GoodJob、Sidekiq 和 async 适配器都发出相同的事件,因为事件来自 Active Job,而不是来自适配器。在一个适配器上能用的订阅者在所有适配器上都能用。但是 enqueued 和 started 之间的延迟取决于 worker 的可用性,而不是 Rails。
多租户过滤。 上下文里的 user_id 才是把广播限定到单个用户的东西。如果你的订阅者在做任何敏感的事(发送通知、向用户计费),把过滤逻辑保留在订阅者里,而不是在视图里。过滤逻辑里的 bug,在一个 initializer 里要比散布在每个页面上容易发现得多。
长时间运行的任务与在线状态。 Action Cable 不会重放用户断开期间触发的广播。如果有人在任务进行中关掉标签页,之后再打开,他们在下一个事件触发之前什么都看不到;如果任务已经完成,他们就什么都看不到了。持久化一行由同一个订阅者更新的 TrackedJob,让页面在首次加载时渲染当前状态,而 stream 只是在其上递送更新。
FAQ
What is Rails.event?
Rails 8.1 在 Rails.event 加入了一个结构化事件报告器。你调用 Rails.event.notify(name, payload) 发出一个有类型的事件,用 Rails.event.subscribe(subscriber) 注册订阅者,它们会收到一个包含 name、payload、tags、context、timestamp 和 source_location 的 hash。它构建在 ActiveSupport::Notifications 之上:框架的结构化事件订阅者消费旧式通知并以更丰富的 schema 重新发出,所以对于新的结构化可观测性代码,只订阅 Rails.event 就够了。已经订阅 ActiveSupport::Notifications 的旧式工具继续照常工作,不受影响。
Does this require Rails 8.1?
是的。Rails.event 随 Rails 8.1.0(2025-10-22 发布)一起发布。结构化的 Active Job 事件(active_job.enqueued、started、completed、step_started、step)也在 8.1 中发布。ActiveJob::Continuable 和 step DSL 同样是 Rails 8.1 的特性。
Does this work with Sidekiq, SolidQueue, GoodJob?
是的。无论由哪个适配器运行任务,事件都会流经 Rails.event。你只写一个订阅者,它在每一种适配器上都能工作。
Do users have to refresh the page to see progress?
不需要。把订阅者与 Turbo::StreamsChannel.broadcast_replace_to 搭配,通过 Action Cable 推送更新。事件到达时,用户的浏览器会就地更新进度卡片。
What happens if the worker process dies mid-job?
Continuations 会把进度作为检查点写入任务的序列化状态。当某个 worker 接手被重试的任务时,已完成的步骤会被跳过,进行中的步骤从它最后的 cursor 处恢复。被恢复步骤的 active_job.step_started 事件带有 resumed: true,所以如果你愿意,UI 可以显示“从中断处继续”。
收尾
这些部分单独来看都不是新点子。Action Cable 广播、跟踪模型上的进度列、带 cursor 的 find_each。Rails 8.1 中新的地方在于,你需要的这三个原语(Rails.event、结构化的 Active Job 事件、Continuations)全都对得上号,所以这套集成就是几个小文件,而不是散落在每个任务里的几百行广播管道。
如果你想拿这里的任何论断去对照已发布的 Rails 8.1.3 验证,rails-event-jobs-spike 仓库里有一个 bin/run 的双线程流式演示和五个通过的集成测试。Rails.event 最初的 PR 是 rails/rails#55334,发布公告在 Rails 博客上。