Construire un import CSV reprise apres interruption avec ActiveJob::Continuable
Les jobs d’arriere-plan de longue duree ont une relation delicate avec les deploiements. Vous lancez un import CSV de 100 000 lignes, deployez du nouveau code cinq minutes plus tard, et le worker redemarre. Le job reprend soit depuis le debut, soit meurt silencieusement. La plupart des equipes resolvent ce probleme avec du suivi manuel : une colonne last_processed_id, une cle Redis, ou un gem externe comme job-iteration.
Rails 8.1 propose une solution integree appelee ActiveJob::Continuable. Elle vous permet de decouper un job en etapes, de suivre la progression avec des curseurs, et de reprendre automatiquement la ou le job s’est arrete apres une interruption. Pas de gem, pas de machine a etats personnalisee.
Comment fonctionnent les continuations
Incluez ActiveJob::Continuable dans votre job et definissez des etapes avec la methode step. Chaque etape peut optionnellement suivre un curseur (votre marqueur de progression). Lorsque l’adaptateur de file d’attente signale un arret, le job sauvegarde sa progression et se remet en file d’attente.
class MyJob < ApplicationJob
include ActiveJob::Continuable
def perform
step :first_step do
# runs once, skipped on resume
end
step :second_step do |step|
records.find_each(start: step.cursor) do |record|
process(record)
step.advance! from: record.id # saves progress + checks for interruption
end
end
step :cleanup do
# runs after second_step completes
end
end
end
Trois concepts cles :
- Les etapes s’executent dans l’ordre. Les etapes terminees sont ignorees lorsque le job reprend.
- Les curseurs suivent la progression au sein d’une etape. Lors de la reprise, le curseur est restaure.
- Les points de controle sont les points surs ou l’interruption peut se produire. Il y a un point de controle automatique avant chaque etape (sauf la premiere). Au sein d’une etape, vous les creez en appelant
step.advance!,step.set!, oustep.checkpoint!.
Construire l’import CSV
Construisons un vrai pipeline d’import. Un utilisateur televerse un fichier CSV via Active Storage. Le job valide le fichier, traite chaque ligne, et envoie un email de fin.
Le modele Import
class Import < ApplicationRecord
has_one_attached :file
belongs_to :user
enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end
Et la migration :
class CreateImports < ActiveRecord::Migration[8.1]
def change
create_table :imports do |t|
t.references :user, null: false, foreign_key: true
t.integer :status, default: 0, null: false
t.integer :processed_count, default: 0, null: false
t.integer :total_rows, default: 0, null: false
t.integer :error_count, default: 0, null: false
t.jsonb :errors_log, default: []
t.timestamps
end
end
end
Le Job
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
self.max_resumptions = 50
self.resume_options = { wait: 2.seconds }
def perform(import_id)
@import = Import.find(import_id)
@rows = CSV.parse(@import.file.download, headers: true)
step :validate do
validate_csv
end
step :process_rows do |step|
process_csv_rows(step)
end
step :finalize do
finalize_import
end
end
private
def validate_csv
@import.update!(status: :validating)
required_headers = %w[email first_name last_name]
missing = required_headers - @rows.headers
if missing.any?
@import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
raise "Invalid CSV: missing headers #{missing.join(', ')}"
end
@import.update!(total_rows: @rows.size)
end
def process_csv_rows(step)
@import.update!(status: :processing)
start_index = step.cursor || 0
# Using set! instead of advance! because we're tracking array indices,
# not ActiveRecord IDs where gaps between values are common.
@rows[start_index..].each.with_index(start_index) do |row, index|
process_single_row(row, index)
step.set! index + 1
end
end
def process_single_row(row, index)
user = User.find_or_initialize_by(email: row["email"])
user.assign_attributes(
first_name: row["first_name"],
last_name: row["last_name"]
)
if user.save
@import.increment!(:processed_count)
else
@import.increment!(:error_count)
log_error(index, row, user.errors.full_messages)
end
end
def log_error(index, row, messages)
@import.errors_log << { row: index + 1, email: row["email"], messages: messages }
@import.save!
end
def finalize_import
@import.update!(status: :completed)
ImportMailer.completed(@import).deliver_later
end
end
Comment cela se deroule
Supposons que le CSV contienne 50 000 lignes et que le worker s’arrete apres avoir traite les 10 000 premieres lignes.
- L’etape
validateest deja terminee, elle est donc marquee comme faite. - L’etape
process_rowsa un curseur a10000(le prochain index a traiter). - Le job se remet en file d’attente avec cette progression sauvegardee.
- Lorsque le nouveau worker le reprend, il saute
validateet reprendprocess_rowsa partir de la ligne 10 001. - Une fois toutes les lignes traitees,
finalizes’execute et envoie l’email.
La progression est serialisee dans les donnees du job sous la cle continuation, donc aucun stockage externe n’est necessaire.
Lancer l’import
class ImportsController < ApplicationController
def create
import = Current.user.imports.create!(file: params[:file])
ProcessImportJob.perform_later(import.id)
redirect_to import, notice: "Import started"
end
end
Afficher la progression
Puisque le modele Import suit le processed_count, vous pouvez afficher la progression a l’utilisateur :
<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
<p>Status: <%= @import.status.humanize %></p>
<% if @import.processing? %>
<p>Processed <%= @import.processed_count %> rows</p>
<p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
<% end %>
<% if @import.completed? %>
<p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
<% end %>
</div>
Combinez cela avec un callback Turbo Stream broadcast sur votre modele Import pour des mises a jour en temps reel :
after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }
Etapes isolees
Si une etape est particulierement longue et que vous voulez garantir qu’elle obtient sa propre execution de job (pour que la progression soit sauvegardee avant qu’elle ne commence), utilisez l’option isolated: true :
step :process_rows, isolated: true do |step|
process_csv_rows(step)
end
Cela force le job a s’interrompre et a se remettre en file d’attente avant de demarrer l’etape, garantissant que toute la progression precedente est persistee.
Configuration
Trois parametres au niveau de la classe controlent le comportement de reprise :
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
# Maximum number of times the job can be resumed (default: nil, unlimited)
self.max_resumptions = 50
# Options passed to retry_job when resuming (default: { wait: 5.seconds })
self.resume_options = { wait: 2.seconds, queue: :imports }
# Auto-resume if an error occurs after the cursor advanced (default: true)
self.resume_errors_after_advancing = true
end
La derniere option merite une attention particuliere. Si votre job leve une erreur apres avoir progresse (avance le curseur ou termine une etape), il reprendra automatiquement au lieu de perdre cette progression. Ce comportement est active par defaut.
Points d’attention
Le code en dehors des etapes se re-execute a chaque reprise. Tout ce qui se trouve en dehors d’un bloc step s’execute a chaque execution, y compris lors des reprises. Les lignes @import = Import.find(import_id) et de parsing CSV en haut de perform conviennent comme initialisation, mais evitez d’y placer des effets de bord. Pour les fichiers tres volumineux, le re-telechargement du CSV a chaque reprise peut etre lent. Envisagez de telecharger le fichier vers un chemin temporaire local dans une etape precedente et de lire depuis le disque a la place.
Les points de controle au sein d’une etape ne sont pas automatiques. Il y a un point de controle automatique entre les etapes, mais au sein d’une etape, le job ne verifie l’interruption que lorsque vous appelez step.advance!, step.set!, ou step.checkpoint!. Si votre etape effectue dix minutes de travail sans point de controle, elle ne peut pas etre interrompue proprement pendant ce temps. Placez des points de controle plus souvent que le delai d’arret de votre worker.
Support de l’adaptateur de file d’attente. Les continuations reposent sur queue_adapter.stopping? pour detecter les arrets. Rails fournit une implementation de stopping? dans les adaptateurs Sidekiq et Test. Solid Queue peut le supporter via le hook on_worker_stop, mais verifiez la documentation de votre adaptateur. Si votre adaptateur n’implemente pas stopping?, le job fonctionnera toujours mais ne sera pas interrompu proprement.
Les curseurs doivent etre serialisables. Le curseur est stocke dans les donnees serialisees du job, il doit donc etre quelque chose qu’ActiveJob peut serialiser : entiers, chaines de caracteres, tableaux, ou tout objet qui implemente la serialisation ActiveJob.
Pour conclure
Au lieu de construire des machines a etats personnalisees ou d’ajouter des gems externes, vous pouvez desormais laisser Rails gerer l’interruption et la reprise des jobs pour vous. Definissez vos etapes, placez des points de controle frequemment, et deployez sans vous soucier de tuer les jobs de longue duree.
Cette fonctionnalite est disponible dans Rails 8.1.0 et versions ulterieures. Consultez la PR #55127 pour l’implementation complete.