Progression en direct des jobs en arrière-plan dans Rails 8.1 avec Rails.event et les Continuations
Afficher aux utilisateurs une carte de progression en temps réel pendant qu’un job en arrière-plan tourne a toujours demandé plus de travail que ça ne le devrait. Il vous faut un endroit pour stocker la progression, du code dans le job qui appelle broadcast_replace_to après chaque morceau, un partial qui rend la carte, un channel Action Cable, et la discipline de câbler ces quatre éléments ensemble pour chaque job qui a besoin de progression. La plupart des équipes s’en passent.
Rails 8.1 a ajouté les trois primitives qui font passer ça de « câbler quatre choses ensemble pour chaque job » à quelque chose de bien plus réduit : un reporter d’événements structuré (Rails.event), des événements structurés émis par le framework pour chaque étape du cycle de vie d’un Active Job, et ActiveJob::Continuable pour les jobs multi-étapes suivis par cursor. Reliez-les et vous obtenez une progression en direct côté utilisateur, sans appel broadcast_* dans le corps du job et sans câblage par job.
Cet article parcourt le pattern, avec du code vérifiable dans un dépôt compagnon.
Ce que Rails 8.1 livre gratuitement
Trois pièces, toutes nouvelles dans Rails 8.1.0 :
Rails.event est un reporter d’événements structuré. Vous émettez des événements avec Rails.event.notify("user.signup", user_id: 123) et vous enregistrez des subscribers avec Rails.event.subscribe(subscriber). Chaque subscriber reçoit un hash avec name, payload, tags, context, timestamp et source_location. Rails.event repose sur ActiveSupport::Notifications : les subscribers d’événements structurés du framework consomment les notifications historiques et les ré-émettent avec le schéma plus riche, donc pour du nouveau code d’observabilité structurée, s’abonner uniquement à Rails.event suffit. Les outils historiques qui s’abonnent déjà à ActiveSupport::Notifications (APM, Skylight, etc.) continuent de fonctionner sans changement.
Les événements Active Job structurés sont émis par Rails lui-même pour chaque étape du cycle de vie. Pour chaque job : active_job.enqueued, active_job.bulk_enqueued, active_job.started, active_job.completed, active_job.retry_scheduled, active_job.retry_stopped, active_job.discarded. Pour les jobs Continuation, vous obtenez aussi active_job.step_started, active_job.step, active_job.step_skipped, active_job.interrupt et active_job.resume. Ils passent tous par Rails.event.notify, donc les mêmes subscribers les voient.
ActiveJob::Continuable vous laisse découper un job en blocs step nommés, avec des cursors optionnels. Le framework persiste la progression pour que le job puisse être interrompu et repris à travers les redémarrages de worker, les déploiements et les retries.
Les événements de cycle de vie et d’étape du framework gèrent les états grossiers dont une UI a besoin (en file, en cours, frontières d’étapes, terminé, échoué) sans aucun code dans le job. Pour la progression en milieu d’étape comme « 47 sur 200 importés », votre job émet un petit événement personnalisé à l’intérieur de la boucle. L’exemple ci-dessous fait les deux.
Le job
Remarquez ce qui n’est pas dans le corps ci-dessous : pas de broadcast_replace_to, pas d’update! sur un enregistrement de suivi, pas de calcul de pourcentage de progression. La seule chose qui ressemble à de l’instrumentation de progression est step.advance!, dont les Continuations ont de toute façon besoin pour la reprise.
class ImportContactsJob < ApplicationJob
include ActiveJob::Continuable
before_perform { Rails.event.set_context(user_id: arguments.first, job_id: job_id) }
def perform(user_id, rows:)
step :prepare { Rails.event.notify("import.prepared", total: rows.size) }
step :process_records do |s|
start = s.cursor || 0
rows[start..].each_with_index do |row, i|
Contact.create!(row.merge(user_id: user_id))
Rails.event.notify("import.record_processed", index: start + i)
s.advance! from: start + i
end
end
step :finalize { Rails.event.notify("import.finalized") }
end
end
Le callback before_perform est la pièce porteuse. Il met user_id et job_id dans le contexte ambiant pour le reste de l’exécution, si bien que chaque événement que le job émet ensuite (y compris les événements de cycle de vie du framework) les transporte. Le subscriber utilise user_id pour choisir vers quel stream diffuser et job_id pour choisir quel élément du DOM mettre à jour.
Vous n’avez pas besoin de vider le contexte. Rails enveloppe chaque job dans app.reloader.wrap, et le hook to_complete de l’executor appelle automatiquement Rails.event.clear_context.
Le subscriber
Un subscriber, enregistré une seule fois. Il reçoit à la fois les événements de cycle de vie du framework et vos événements import.* personnalisés via le même canal.
# config/initializers/job_progress_subscriber.rb
Rails.event.subscribe(JobProgressBroadcaster.new) do |event|
event[:context][:user_id].present? &&
(event[:name].start_with?("active_job.") || event[:name].start_with?("import."))
end
Le bloc de filtre empêche le subscriber de voir le flux d’événements de toute l’application. Seuls les événements sous les namespaces active_job.* ou import.* qui portent aussi un user_id dans le contexte atteignent emit.
Le broadcaster lui-même n’est qu’un case sur les noms d’événements qui transforme chacun en une diffusion Turbo Stream. La forme :
class JobProgressBroadcaster
def emit(event)
user_id = event[:context][:user_id]
job_id = event[:payload][:job_id] || event[:context][:job_id]
update = translate(event[:name], event[:payload])
return unless update && user_id && job_id
Turbo::StreamsChannel.broadcast_replace_to(
"user_#{user_id}_jobs",
target: "job_#{job_id}",
partial: "jobs/progress_card",
locals: { update: update }
)
end
end
translate renvoie un hash pour les événements qui vous intéressent (active_job.started, import.prepared, import.record_processed, active_job.completed, active_job.discarded) et nil pour tout le reste. Le case complet est dans le dépôt compagnon.
La vue
La page qui doit afficher la progression monte le stream et rend la carte initiale :
<%= turbo_stream_from "user_#{current_user.id}_jobs" %>
<div id="job_<%= @job_id %>"><%= render "jobs/progress_card", update: { status: "queued" } %></div>
_progress_card.html.erb rend la forme que votre design demande à partir du hash update. Ceci suppose que votre ApplicationCable::Connection est identifié par current_user : turbo_stream_from signe le nom du stream avec le vérificateur de messages de Rails pour qu’il ne puisse pas être falsifié, mais le modèle de sécurité par utilisateur repose sur le fait qu’Action Cable sache qui est l’utilisateur connecté.
C’est là toute la pipeline côté utilisateur. Quand ImportContactsJob s’exécute, la page se met à jour en temps réel. Pas de polling, pas de JavaScript écrit à la main, pas de channel Cable par job.
Ce que vous obtenez gratuitement
Trois affirmations qui valent la peine d’être vérifiées. Le dépôt compagnon a un test qui passe pour chacune.
Événements de cycle de vie. active_job.enqueued, active_job.started et active_job.completed passent tous par Rails.event avec un job_id correspondant et un champ duration sur completed. Votre subscriber voit les transitions en file / en cours / terminé sans aucun code dans le corps du job.
Événements d’étape avec cursor. Chaque déclaration step produit un événement active_job.step_started (avec les champs cursor et resumed) et un événement active_job.step (avec le cursor final et duration). Les jobs repris ont resumed: true sur l’étape qui était en cours, donc l’UI peut afficher « On reprend là où on s’était arrêté ».
Contexte depuis before_perform. Chaque événement émis après l’exécution du callback before_perform, vos événements personnalisés comme les événements du framework, transporte le contexte que vous avez défini. C’est ainsi que le subscriber sait vers le stream de quel utilisateur diffuser.
Points de vigilance
Quelques éléments que le code source rend clairs une fois qu’on va y regarder.
Sémantique du cursor. step.advance!(from: x) appelle x.succ, donc le cursor reflète « la prochaine position d’où démarrer à la reprise », pas « le dernier index traité ». Si vous écrivez s.advance! from: i + 1 après avoir traité l’index i, le cursor finit à i + 2, pas à i + 1. Le pattern canonique de la doc Rails est s.advance! from: record.id, qui fixe le cursor à record.id.succ, le prochain ID que find_each(start: cursor) reprendra. Pour l’affichage d’un pourcentage, préférez compter dans votre propre événement de progression (Rails.event.notify("import.record_processed", index: i)) plutôt que de lire le cursor.
L’événement enqueued se déclenche sur le thread d’enqueue. active_job.enqueued se déclenche au moment où perform_later retourne, sur le thread qui l’a appelé (votre controller, en général). active_job.started et tout ce qui suit se déclenchent sur le thread du worker. Avec l’adaptateur :async ou un vrai adaptateur de queue, c’est ce que vous voulez. Avec l’adaptateur :inline, tout le job s’exécute de façon synchrone à l’intérieur de perform_later, donc started et completed se déclenchent avant enqueued. Bon à savoir si vous déboguez un jour ça en test ou en console.
Comme enqueued se déclenche avant que before_perform ne s’exécute, il ne transporte pas le contexte que le job finira par définir. Son event[:context] est ce que le thread d’enqueue avait, ce qui est typiquement vide. Le filtre ci-dessus l’exclura. Si vous voulez une diffusion « en file » pour l’utilisateur, rendez l’état en file depuis le controller après le retour de perform_later (l’exemple de vue fait ça), ou définissez Rails.event.set_context(user_id: current_user.id, job_id: job.job_id) dans le controller juste avant l’enqueue.
arguments.first est positionnel. Le bloc before_perform lit arguments.first pour récupérer user_id. Si vous changez un jour la signature de perform pour que user_id ne soit plus le premier argument positionnel, mettez à jour le câblage du contexte en même temps, sinon le subscriber diffusera pour le mauvais utilisateur. Un perform(user_id:, rows:) en mots-clés uniquement avec arguments.first[:user_id] est plus durable dans de grandes bases de code.
Indépendant de l’adaptateur, mais pas le timing. SolidQueue, GoodJob, Sidekiq et l’adaptateur async émettent tous les mêmes événements parce que les événements viennent d’Active Job, pas de l’adaptateur. Un subscriber qui fonctionne sur l’un fonctionne sur tous. La latence entre enqueued et started, en revanche, dépend de la disponibilité des workers, pas de Rails.
Filtrage multi-tenant. Le user_id dans le contexte est ce qui restreint les diffusions à un seul utilisateur. Si votre subscriber fait quoi que ce soit de sensible (envoi de notifications, facturation de l’utilisateur), gardez votre filtrage à l’intérieur du subscriber et non dans la vue. Un bug dans le filtre est bien plus facile à repérer dans un seul initializer qu’à travers chaque page.
Jobs de longue durée et présence. Action Cable ne rejoue pas les diffusions qui se sont déclenchées pendant que l’utilisateur était déconnecté. Si quelqu’un ferme l’onglet en plein job et le rouvre plus tard, il ne verra rien jusqu’au prochain événement, et si le job est déjà terminé il ne verra rien du tout. Persistez une ligne TrackedJob mise à jour par le même subscriber pour que la page rende l’état courant au premier chargement et que le stream ne fasse que livrer les mises à jour par-dessus.
FAQ
What is Rails.event?
Rails 8.1 a ajouté un reporter d’événements structuré dans Rails.event. Vous appelez Rails.event.notify(name, payload) pour émettre un événement typé, vous enregistrez des subscribers avec Rails.event.subscribe(subscriber), et ils reçoivent un hash avec name, payload, tags, context, timestamp et source_location. Il repose sur ActiveSupport::Notifications : les subscribers d’événements structurés du framework consomment les notifications historiques et les ré-émettent avec le schéma plus riche, donc pour du nouveau code d’observabilité structurée, s’abonner uniquement à Rails.event suffit. Les outils historiques qui s’abonnent déjà à ActiveSupport::Notifications continuent de fonctionner sans changement.
Does this require Rails 8.1?
Oui. Rails.event est livré dans Rails 8.1.0 (sorti le 2025-10-22). Les événements Active Job structurés (active_job.enqueued, started, completed, step_started, step) sont aussi livrés dans 8.1. ActiveJob::Continuable et le DSL step sont également des fonctionnalités de Rails 8.1.
Does this work with Sidekiq, SolidQueue, GoodJob?
Oui. Les événements passent par Rails.event quel que soit l’adaptateur qui exécute le job. Vous écrivez un seul subscriber et il fonctionne sur tous les adaptateurs.
Do users have to refresh the page to see progress?
Non. Associez le subscriber à Turbo::StreamsChannel.broadcast_replace_to pour pousser les mises à jour via Action Cable. Le navigateur de l’utilisateur met à jour la carte de progression sur place à mesure que les événements arrivent.
What happens if the worker process dies mid-job?
Les Continuations enregistrent la progression sous forme de points de contrôle dans l’état sérialisé du job. Quand un worker reprend le job retenté, les étapes terminées sont sautées et l’étape en cours reprend depuis son dernier cursor. L’événement active_job.step_started de l’étape reprise porte resumed: true, donc l’UI peut afficher « On reprend là où on s’était arrêté » si vous le souhaitez.
Pour conclure
Les pièces prises individuellement ne sont pas des idées neuves. Diffusions Action Cable, colonnes de progression sur un modèle de suivi, find_each avec cursors. Ce qui est nouveau dans Rails 8.1, c’est que les trois primitives dont vous avez besoin (Rails.event, événements Active Job structurés, Continuations) s’alignent toutes, si bien que l’intégration tient en quelques petits fichiers au lieu de quelques centaines de lignes de plomberie de broadcast dispersées dans chaque job.
Si vous voulez vérifier l’une des affirmations ici contre Rails 8.1.3 tel que livré, le dépôt rails-event-jobs-spike a une démo de streaming à deux threads bin/run et cinq tests d’intégration qui passent. La PR d’origine pour Rails.event est rails/rails#55334, et l’annonce est sur le blog Rails.