Progreso en vivo de jobs en segundo plano en Rails 8.1 con Rails.event y Continuations

Mostrar a los usuarios una tarjeta de progreso en tiempo real mientras un job en segundo plano corre siempre ha dado más trabajo del que debería. Necesitas un sitio donde almacenar el progreso, código en el job que llame a broadcast_replace_to después de cada fragmento, un partial que renderice la tarjeta, un channel de Action Cable, y la disciplina de cablear esas cuatro cosas juntas para cada job que necesite progreso. La mayoría de los equipos se lo saltan.

Rails 8.1 añadió las tres primitivas que convierten esto de «cablear cuatro cosas juntas para cada job» en algo mucho más pequeño: un reporter de eventos estructurado (Rails.event), eventos estructurados emitidos por el framework para cada etapa del ciclo de vida de un Active Job, y ActiveJob::Continuable para jobs multipaso con seguimiento por cursor. Conéctalos y obtienes progreso en vivo de cara al usuario, sin llamadas broadcast_* en el cuerpo del job y sin cableado por job.

Este artículo recorre el patrón, con código verificable en un repositorio compañero.

Lo que Rails 8.1 trae gratis

Tres piezas, todas nuevas en Rails 8.1.0:

Rails.event es un reporter de eventos estructurado. Emites eventos con Rails.event.notify("user.signup", user_id: 123) y registras subscribers con Rails.event.subscribe(subscriber). Cada subscriber recibe un hash con name, payload, tags, context, timestamp y source_location. Rails.event se apoya sobre ActiveSupport::Notifications: los subscribers de eventos estructurados del framework consumen las notificaciones heredadas y las vuelven a emitir con el esquema más rico, así que para código nuevo de observabilidad estructurada, suscribirse solo a Rails.event es suficiente. Las herramientas heredadas que ya se suscriben a ActiveSupport::Notifications (APM, Skylight, etc.) siguen funcionando sin cambios.

Los eventos Active Job estructurados son emitidos por el propio Rails para cada etapa del ciclo de vida. Para cada job: active_job.enqueued, active_job.bulk_enqueued, active_job.started, active_job.completed, active_job.retry_scheduled, active_job.retry_stopped, active_job.discarded. Para los jobs Continuation también obtienes active_job.step_started, active_job.step, active_job.step_skipped, active_job.interrupt y active_job.resume. Todos fluyen a través de Rails.event.notify, así que los mismos subscribers los ven.

ActiveJob::Continuable te permite dividir un job en bloques step con nombre y cursors opcionales. El framework persiste el progreso para que el job pueda interrumpirse y reanudarse a través de reinicios de worker, despliegues y reintentos.

Los eventos de ciclo de vida y de paso del framework gestionan los estados gruesos que una UI necesita (en cola, en ejecución, fronteras de paso, hecho, fallido) sin ningún código en el job. Para el progreso a mitad de paso como «importados 47 de 200», tu job emite un pequeño evento personalizado dentro del bucle. El ejemplo de abajo hace ambas cosas.

El job

Fíjate en lo que no está en el cuerpo de abajo: nada de broadcast_replace_to, nada de update! sobre un registro de seguimiento, nada de cálculos de porcentaje de progreso. Lo único que se parece a instrumentación de progreso es step.advance!, que las Continuations necesitan de todos modos para la reanudabilidad.

class ImportContactsJob < ApplicationJob
  include ActiveJob::Continuable

  before_perform { Rails.event.set_context(user_id: arguments.first, job_id: job_id) }

  def perform(user_id, rows:)
    step :prepare { Rails.event.notify("import.prepared", total: rows.size) }
    step :process_records do |s|
      start = s.cursor || 0
      rows[start..].each_with_index do |row, i|
        Contact.create!(row.merge(user_id: user_id))
        Rails.event.notify("import.record_processed", index: start + i)
        s.advance! from: start + i
      end
    end
    step :finalize { Rails.event.notify("import.finalized") }
  end
end

El callback before_perform es la pieza portante. Mete user_id y job_id en el contexto ambiental para el resto de la ejecución, de modo que cada evento que el job emita después (incluidos los eventos de ciclo de vida del framework) los lleva consigo. El subscriber usa user_id para elegir a qué stream difundir y job_id para elegir qué elemento del DOM actualizar.

No necesitas limpiar el contexto. Rails envuelve cada job en app.reloader.wrap, y el hook to_complete del executor llama a Rails.event.clear_context automáticamente.

El subscriber

Un subscriber, registrado una sola vez. Recibe tanto los eventos de ciclo de vida del framework como tus eventos import.* personalizados a través del mismo canal.

# config/initializers/job_progress_subscriber.rb
Rails.event.subscribe(JobProgressBroadcaster.new) do |event|
  event[:context][:user_id].present? &&
    (event[:name].start_with?("active_job.") || event[:name].start_with?("import."))
end

El bloque de filtro evita que el subscriber vea el flujo de eventos de toda la aplicación. Solo los eventos bajo los namespaces active_job.* o import.* que además llevan un user_id en el contexto llegan a emit.

El broadcaster en sí es solo un case sobre los nombres de evento que convierte cada uno en una difusión Turbo Stream. La forma:

class JobProgressBroadcaster
  def emit(event)
    user_id = event[:context][:user_id]
    job_id  = event[:payload][:job_id] || event[:context][:job_id]
    update  = translate(event[:name], event[:payload])
    return unless update && user_id && job_id

    Turbo::StreamsChannel.broadcast_replace_to(
      "user_#{user_id}_jobs",
      target: "job_#{job_id}",
      partial: "jobs/progress_card",
      locals: { update: update }
    )
  end
end

translate devuelve un hash para los eventos que te importan (active_job.started, import.prepared, import.record_processed, active_job.completed, active_job.discarded) y nil para todo lo demás. El case completo está en el repositorio compañero.

La vista

La página que debe mostrar el progreso monta el stream y renderiza la tarjeta inicial:

<%= turbo_stream_from "user_#{current_user.id}_jobs" %>
<div id="job_<%= @job_id %>"><%= render "jobs/progress_card", update: { status: "queued" } %></div>

_progress_card.html.erb renderiza la forma que pida tu diseño a partir del hash update. Esto asume que tu ApplicationCable::Connection se identifica por current_user: turbo_stream_from firma el nombre del stream con el verificador de mensajes de Rails para que no pueda manipularse, pero el modelo de seguridad por usuario depende de que Action Cable sepa quién es el usuario que se conecta.

Esa es toda la pipeline de cara al usuario. Cuando ImportContactsJob corre, la página se actualiza en tiempo real. Sin polling, sin JavaScript escrito a mano, sin un channel de Cable por job.

Lo que obtienes gratis

Tres afirmaciones que vale la pena verificar. El repositorio compañero tiene un test que pasa para cada una.

Eventos de ciclo de vida. active_job.enqueued, active_job.started y active_job.completed fluyen todos a través de Rails.event con un job_id coincidente y un campo duration en completed. Tu subscriber ve las transiciones en cola / en ejecución / hecho sin ningún código en el cuerpo del job.

Eventos de paso con cursor. Cada declaración step produce un evento active_job.step_started (con los campos cursor y resumed) y un evento active_job.step (con el cursor final y duration). Los jobs reanudados llevan resumed: true en el paso que estaba en curso, así que la UI puede mostrar «Reanudando donde lo dejamos».

Contexto desde before_perform. Cada evento emitido después de que se ejecute el callback before_perform, tanto tus eventos personalizados como los eventos del framework, lleva el contexto que estableciste. Así es como el subscriber sabe al stream de qué usuario difundir.

Advertencias

Algunas cosas que el código fuente deja claras una vez que vas a mirar.

Semántica del cursor. step.advance!(from: x) llama a x.succ, así que el cursor refleja «la siguiente posición desde la que empezar al reanudar», no «el último índice procesado». Si escribes s.advance! from: i + 1 después de procesar el índice i, el cursor acaba en i + 2, no en i + 1. El patrón canónico de la documentación de Rails es s.advance! from: record.id, que fija el cursor a record.id.succ, el siguiente ID que find_each(start: cursor) recogerá. Para mostrar un porcentaje, prefiere contar en tu propio evento de progreso (Rails.event.notify("import.record_processed", index: i)) en lugar de leer el cursor.

El evento enqueued se dispara en el hilo que encola. active_job.enqueued se dispara en el momento en que perform_later retorna, en el hilo que lo llamó (tu controller, normalmente). active_job.started y todo lo que viene después se dispara en el hilo del worker. Con el adaptador :async o un adaptador de cola real, esto es lo que quieres. Con el adaptador :inline, todo el job corre de forma síncrona dentro de perform_later, así que started y completed se disparan antes que enqueued. Vale la pena saberlo si alguna vez depuras esto en test o en una consola.

Como enqueued se dispara antes de que before_perform se ejecute, no lleva el contexto que el job acabará estableciendo. Su event[:context] es lo que tuviera el hilo que encola, que típicamente está vacío. El filtro de arriba lo excluirá. Si quieres una difusión «en cola» para el usuario, renderiza el estado en cola desde el controller después de que perform_later retorne (el ejemplo de la vista hace esto), o establece Rails.event.set_context(user_id: current_user.id, job_id: job.job_id) en el controller justo antes de encolar.

arguments.first es posicional. El bloque before_perform lee arguments.first para coger user_id. Si alguna vez cambias la firma de perform de modo que user_id no sea el primer argumento posicional, actualiza el cableado del contexto al mismo tiempo o el subscriber difundirá para el usuario equivocado. Un perform(user_id:, rows:) solo de palabras clave con arguments.first[:user_id] aguanta mejor en bases de código grandes.

Independiente del adaptador, pero el timing no. SolidQueue, GoodJob, Sidekiq y el adaptador async emiten todos los mismos eventos porque los eventos vienen de Active Job, no del adaptador. Un subscriber que funciona en uno funciona en todos. La latencia entre enqueued y started, sin embargo, depende de la disponibilidad de los workers, no de Rails.

Filtrado multi-tenant. El user_id en el contexto es lo que acota las difusiones a un solo usuario. Si tu subscriber hace algo sensible (enviar notificaciones, facturar al usuario), mantén tu filtrado dentro del subscriber y no en la vista. Un bug en el filtro es mucho más fácil de detectar en un único initializer que a lo largo de todas las páginas.

Jobs de larga duración y presencia. Action Cable no reproduce las difusiones que se dispararon mientras el usuario estaba desconectado. Si alguien cierra la pestaña a mitad del job y la vuelve a abrir más tarde, no verá nada hasta que se dispare el siguiente evento, y si el job ya se ha completado no verá nada en absoluto. Persiste una fila TrackedJob actualizada por el mismo subscriber para que la página renderice el estado actual en la primera carga y el stream solo entregue las actualizaciones encima.

FAQ

What is Rails.event?

Rails 8.1 añadió un reporter de eventos estructurado en Rails.event. Llamas a Rails.event.notify(name, payload) para emitir un evento tipado, registras subscribers con Rails.event.subscribe(subscriber), y reciben un hash con name, payload, tags, context, timestamp y source_location. Se apoya sobre ActiveSupport::Notifications: los subscribers de eventos estructurados del framework consumen las notificaciones heredadas y las vuelven a emitir con el esquema más rico, así que para código nuevo de observabilidad estructurada, suscribirse solo a Rails.event es suficiente. Las herramientas heredadas que ya se suscriben a ActiveSupport::Notifications siguen funcionando sin cambios.

Does this require Rails 8.1?

Sí. Rails.event se incluye en Rails 8.1.0 (publicado el 2025-10-22). Los eventos Active Job estructurados (active_job.enqueued, started, completed, step_started, step) también se incluyen en 8.1. ActiveJob::Continuable y el DSL step también son funcionalidades de Rails 8.1.

Does this work with Sidekiq, SolidQueue, GoodJob?

Sí. Los eventos fluyen a través de Rails.event sin importar qué adaptador ejecute el job. Escribes un único subscriber y funciona en todos los adaptadores.

Do users have to refresh the page to see progress?

No. Combina el subscriber con Turbo::StreamsChannel.broadcast_replace_to para empujar las actualizaciones por Action Cable. El navegador del usuario actualiza la tarjeta de progreso en el sitio a medida que llegan los eventos.

What happens if the worker process dies mid-job?

Las Continuations guardan el progreso como puntos de control en el estado serializado del job. Cuando un worker retoma el job reintentado, los pasos completados se saltan y el paso en curso se reanuda desde su último cursor. El evento active_job.step_started del paso reanudado lleva resumed: true, así que la UI puede mostrar «Reanudando donde lo dejamos» si lo deseas.

Para terminar

Las piezas por separado no son ideas nuevas. Difusiones de Action Cable, columnas de progreso en un modelo de seguimiento, find_each con cursors. Lo nuevo en Rails 8.1 es que las tres primitivas que necesitas (Rails.event, eventos Active Job estructurados, Continuations) encajan todas, de modo que la integración son unos pocos archivos pequeños en lugar de unos cientos de líneas de fontanería de broadcast desperdigadas por cada job.

Si quieres verificar cualquiera de las afirmaciones de aquí contra Rails 8.1.3 tal como se publicó, el repositorio rails-event-jobs-spike tiene una demo de streaming a dos hilos bin/run y cinco tests de integración que pasan. La PR original de Rails.event es rails/rails#55334, y el anuncio está en el blog de Rails.