Rails 8.1のRails.eventとContinuationsでバックグラウンドジョブの進捗をライブ表示する

バックグラウンドジョブの実行中にリアルタイムの進捗カードをユーザーに見せることは、本来あるべき以上にずっと手間のかかる作業でした。進捗を保存する場所、チャンクごとに broadcast_replace_to を呼ぶジョブ内のコード、カードを描画するパーシャル、Action Cableチャンネル、そして進捗が必要なすべてのジョブでこの4つを配線し続ける規律が必要です。たいていのチームはこれを省略します。

Rails 8.1は、これを「ジョブごとに4つのものを配線する」から、もっと小さなものへと変える3つのプリミティブを追加しました。構造化イベントレポーター(Rails.event)、Active Jobのすべてのライフサイクル段階に対してフレームワークが発行する構造化イベント、そしてカーソル追跡付きの複数ステップジョブのための ActiveJob::Continuable です。これらをつなぐと、ジョブ本体に broadcast_* の呼び出しもジョブごとの配線もなしに、ユーザー向けのライブ進捗が手に入ります。

この記事ではそのパターンを、コンパニオンリポジトリの検証可能なコードとともに見ていきます。

Rails 8.1が無償で提供するもの

Rails 8.1.0 で新しく入った3つのピース:

Rails.event は構造化イベントレポーターです。Rails.event.notify("user.signup", user_id: 123) でイベントを発行し、Rails.event.subscribe(subscriber) でサブスクライバを登録します。各サブスクライバは namepayloadtagscontexttimestampsource_location を持つハッシュを受け取ります。Rails.eventActiveSupport::Notifications の上に乗っています。フレームワークの構造化イベントサブスクライバがレガシーな通知を消費し、よりリッチなスキーマで再発行するため、新しい構造化オブザーバビリティのコードでは Rails.event だけをサブスクライブすれば十分です。すでに ActiveSupport::Notifications をサブスクライブしている既存のツール(APM、Skylightなど)は変更なしでそのまま動き続けます。

構造化されたActive Jobイベント は、すべてのライフサイクル段階に対してRails自身が発行します。すべてのジョブで: active_job.enqueuedactive_job.bulk_enqueuedactive_job.startedactive_job.completedactive_job.retry_scheduledactive_job.retry_stoppedactive_job.discarded。Continuationジョブではさらに active_job.step_startedactive_job.stepactive_job.step_skippedactive_job.interruptactive_job.resume も得られます。これらはすべて Rails.event.notify を通って流れるので、同じサブスクライバがそれらを見られます。

ActiveJob::Continuable は、ジョブを名前付きの step ブロックに、オプションのカーソル付きで分割できるようにします。フレームワークが進捗を永続化するため、ワーカーの再起動、デプロイ、リトライをまたいでジョブを中断・再開できます。

フレームワークのライフサイクルイベントとステップイベントは、UIが必要とする粗い状態(キュー投入済み、実行中、ステップの境界、完了、失敗)を、ジョブ内のコードなしで処理します。「200件中47件をインポート」のようなステップ途中の進捗については、ジョブがループの内側で小さなカスタムイベントを発行します。下の例は両方を行います。

ジョブ

下の本体に ない ものに注目してください。broadcast_replace_to も、トラッキングレコードへの update! も、進捗パーセンテージの計算もありません。進捗計測らしきものは step.advance! だけですが、これはそもそも再開可能性のためにContinuationsが必要とするものです。

class ImportContactsJob < ApplicationJob
  include ActiveJob::Continuable

  before_perform { Rails.event.set_context(user_id: arguments.first, job_id: job_id) }

  def perform(user_id, rows:)
    step :prepare { Rails.event.notify("import.prepared", total: rows.size) }
    step :process_records do |s|
      start = s.cursor || 0
      rows[start..].each_with_index do |row, i|
        Contact.create!(row.merge(user_id: user_id))
        Rails.event.notify("import.record_processed", index: start + i)
        s.advance! from: start + i
      end
    end
    step :finalize { Rails.event.notify("import.finalized") }
  end
end

before_perform コールバックが要となるピースです。これは user_idjob_id を残りの実行のためのアンビエントなコンテキストに入れるので、その後ジョブが発行するすべてのイベント(フレームワークのライフサイクルイベントを含む)がそれらを運びます。サブスクライバは user_id を使ってどのストリームへブロードキャストするかを選び、job_id を使ってどのDOM要素を更新するかを選びます。

コンテキストをクリアする必要はありません。Railsは各ジョブを app.reloader.wrap で包み、エグゼキュータの to_complete フックが自動的に Rails.event.clear_context を呼びます。

サブスクライバ

サブスクライバは1つ、登録も1回だけ。フレームワークのライフサイクルイベントと、あなたのカスタムな import.* イベントの両方を、同じチャンネルを通して受け取ります。

# config/initializers/job_progress_subscriber.rb
Rails.event.subscribe(JobProgressBroadcaster.new) do |event|
  event[:context][:user_id].present? &&
    (event[:name].start_with?("active_job.") || event[:name].start_with?("import."))
end

フィルタブロックは、サブスクライバがアプリケーション全体のイベントストリームを見ないようにします。active_job.* または import.* の名前空間の下にあり、かつコンテキストに user_id を運ぶイベントだけが emit に届きます。

ブロードキャスタ自体は、イベント名に対する case で、それぞれをTurbo Streamのブロードキャストに変えるだけです。その形:

class JobProgressBroadcaster
  def emit(event)
    user_id = event[:context][:user_id]
    job_id  = event[:payload][:job_id] || event[:context][:job_id]
    update  = translate(event[:name], event[:payload])
    return unless update && user_id && job_id

    Turbo::StreamsChannel.broadcast_replace_to(
      "user_#{user_id}_jobs",
      target: "job_#{job_id}",
      partial: "jobs/progress_card",
      locals: { update: update }
    )
  end
end

translate は、関心のあるイベント(active_job.startedimport.preparedimport.record_processedactive_job.completedactive_job.discarded)についてはハッシュを返し、それ以外はすべて nil を返します。完全な caseコンパニオンリポジトリにあります。

ビュー

進捗を表示すべきページは、ストリームをマウントして初期カードを描画します:

<%= turbo_stream_from "user_#{current_user.id}_jobs" %>
<div id="job_<%= @job_id %>"><%= render "jobs/progress_card", update: { status: "queued" } %></div>

_progress_card.html.erb は、update ハッシュから、デザインが求めるどんな形でも描画します。これは ApplicationCable::Connectioncurrent_user で識別されていることを前提とします。turbo_stream_from はRailsのメッセージ検証器でストリーム名を署名するので改ざんできませんが、ユーザーごとのセキュリティモデルは、接続中のユーザーが誰かをAction Cableが知っていることに依存します。

これがユーザー向けパイプラインのすべてです。ImportContactsJob が実行されると、ページはリアルタイムで更新されます。ポーリングも、手書きのJavaScriptも、ジョブごとのCableチャンネルもありません。

無償で手に入るもの

検証する価値のある3つの主張。コンパニオンリポジトリには、それぞれに対するパスするテストがあります。

ライフサイクルイベント。 active_job.enqueuedactive_job.startedactive_job.completed はすべて、一致する job_id と、completed 上の duration フィールドを伴って Rails.event を通って流れます。サブスクライバは、ジョブ本体に一切コードを書かずに、キュー投入/実行中/完了の遷移を見られます。

カーソル付きステップイベント。step の宣言は、active_job.step_started イベント(cursorresumed フィールド付き)と active_job.step イベント(最終的な cursorduration 付き)を生成します。再開されたジョブは、進行中だったステップに resumed: true を持つので、UIは「中断したところから再開しています」と表示できます。

before_perform からのコンテキスト。 before_perform コールバックが走った後に発行されるすべてのイベントは、カスタムイベントもフレームワークイベントも、あなたが設定したコンテキストを運びます。これが、どのユーザーのストリームへブロードキャストすべきかをサブスクライバが知る仕組みです。

注意点

ソースコードを実際に見てみると明らかになる、いくつかのこと。

カーソルのセマンティクス。 step.advance!(from: x)x.succ を呼ぶので、カーソルは「最後に処理したインデックス」ではなく「再開時に開始する次の位置」を反映します。インデックス i を処理した後に s.advance! from: i + 1 と書くと、カーソルは i + 1 ではなく i + 2 になります。Railsドキュメントにある正規のパターンは s.advance! from: record.id で、これはカーソルを record.id.succ、つまり find_each(start: cursor) が次に拾うIDに設定します。パーセンテージ表示には、カーソルを読むのではなく、自前の進捗イベント(Rails.event.notify("import.record_processed", index: i))の中でカウントする方を選んでください。

enqueued イベントはエンキューしたスレッドで発火する。 active_job.enqueuedperform_later が返った瞬間に、それを呼んだスレッド(通常はあなたのコントローラ)で発火します。active_job.started とそれ以降はすべてワーカースレッドで発火します。:async アダプタや本物のキューアダプタなら、これは望ましい挙動です。:inline アダプタの場合、ジョブ全体が perform_later の中で同期的に実行されるので、startedcompletedenqueued より に発火します。テストやコンソールでこれをデバッグするとき、知っておく価値があります。

enqueuedbefore_perform が走る前に発火するので、ジョブが最終的に設定するコンテキストを運びません。その event[:context] はエンキューしたスレッドが持っていたもので、通常は空です。上のフィルタはこれを除外します。ユーザー向けに「キュー投入済み」のブロードキャストが欲しいなら、perform_later が返った後にコントローラからキュー投入済みの状態を描画する(ビューの例はこれを行っています)か、エンキュー直前にコントローラで Rails.event.set_context(user_id: current_user.id, job_id: job.job_id) を設定してください。

arguments.first は位置引数。 before_perform ブロックは arguments.first を読んで user_id を取得します。perform のシグネチャを変えて user_id が最初の位置引数でなくなった場合、同時にコンテキストの配線も更新しないと、サブスクライバが間違ったユーザー向けにブロードキャストします。arguments.first[:user_id] を伴うキーワードのみの perform(user_id:, rows:) の方が、大規模なコードベースではより耐久性があります。

アダプタに依存しないが、タイミングは依存する。 SolidQueue、GoodJob、Sidekiq、asyncアダプタはすべて同じイベントを発行します。イベントはアダプタからではなくActive Jobから来るからです。1つのアダプタで動くサブスクライバはすべてで動きます。ただし enqueuedstarted の間のレイテンシは、Railsではなくワーカーの空き状況に依存します。

マルチテナントのフィルタリング。 コンテキスト内の user_id が、ブロードキャストを単一ユーザーにスコープするものです。サブスクライバが何か機微なこと(通知の送信、ユーザーへの課金)をしているなら、フィルタリングはビューではなくサブスクライバの中に置いてください。フィルタのバグは、すべてのページにまたがるよりも、1つのイニシャライザの中にある方がずっと見つけやすいです。

長時間実行ジョブとプレゼンス。 Action Cableは、ユーザーが切断中に発火したブロードキャストを再送しません。ジョブの途中で誰かがタブを閉じて後で開き直すと、次のイベントが発火するまで何も見えず、ジョブがすでに完了していれば何も見えないままです。同じサブスクライバが更新する TrackedJob の行を永続化して、ページが初回ロードで現在の状態を描画し、ストリームはその上に更新を届けるだけにしてください。

FAQ

What is Rails.event?

Rails 8.1は Rails.event に構造化イベントレポーターを追加しました。Rails.event.notify(name, payload) を呼んで型付きイベントを発行し、Rails.event.subscribe(subscriber) でサブスクライバを登録すると、namepayloadtagscontexttimestampsource_location を持つハッシュを受け取ります。これは ActiveSupport::Notifications の上に乗っています。フレームワークの構造化イベントサブスクライバがレガシーな通知を消費し、よりリッチなスキーマで再発行するため、新しい構造化オブザーバビリティのコードでは Rails.event だけをサブスクライブすれば十分です。すでに ActiveSupport::Notifications をサブスクライブしている既存のツールは変更なしでそのまま動き続けます。

Does this require Rails 8.1?

はい。Rails.event はRails 8.1.0(2025-10-22リリース)で導入されました。構造化されたActive Jobイベント(active_job.enqueuedstartedcompletedstep_startedstep)も8.1で導入されます。ActiveJob::Continuablestep DSLもRails 8.1の機能です。

Does this work with Sidekiq, SolidQueue, GoodJob?

はい。イベントは、どのアダプタがジョブを実行するかに関係なく Rails.event を通って流れます。サブスクライバを1つ書けば、あらゆるアダプタで動作します。

Do users have to refresh the page to see progress?

いいえ。サブスクライバと Turbo::StreamsChannel.broadcast_replace_to を組み合わせて、Action Cable経由で更新をプッシュします。イベントが届くたびに、ユーザーのブラウザが進捗カードをその場で更新します。

What happens if the worker process dies mid-job?

Continuationsはジョブのシリアライズされた状態に進捗をチェックポイントとして記録します。ワーカーがリトライされたジョブを拾うと、完了済みのステップはスキップされ、進行中だったステップは最後のカーソルから再開します。再開されたステップの active_job.step_started イベントは resumed: true を持つので、必要なら「中断したところから再開しています」とUIに表示できます。

まとめ

個々のピースは新しいアイデアではありません。Action Cableのブロードキャスト、トラッキングモデル上の進捗カラム、カーソル付きの find_each。Rails 8.1で新しいのは、必要な3つのプリミティブ(Rails.event、構造化されたActive Jobイベント、Continuations)がすべて噛み合うことです。だから統合は、すべてのジョブに散らばった数百行のブロードキャスト配管ではなく、いくつかの小さなファイルで済みます。

ここでの主張をリリース済みのRails 8.1.3に対して検証したい場合、rails-event-jobs-spikeリポジトリには bin/run の2スレッドストリーミングデモと、5つのパスする統合テストがあります。Rails.event の元のPRは rails/rails#55334、アナウンスはRailsブログにあります。