Construire un import CSV reprise apres interruption avec ActiveJob::Continuable

Les jobs d’arriere-plan de longue duree ont une relation delicate avec les deploiements. Vous lancez un import CSV de 100 000 lignes, deployez du nouveau code cinq minutes plus tard, et le worker redemarre. Le job reprend soit depuis le debut, soit meurt silencieusement. La plupart des equipes resolvent ce probleme avec du suivi manuel : une colonne last_processed_id, une cle Redis, ou un gem externe comme job-iteration.

Rails 8.1 propose une solution integree appelee ActiveJob::Continuable. Elle vous permet de decouper un job en etapes, de suivre la progression avec des curseurs, et de reprendre automatiquement la ou le job s’est arrete apres une interruption. Pas de gem, pas de machine a etats personnalisee.

Comment fonctionnent les continuations

Incluez ActiveJob::Continuable dans votre job et definissez des etapes avec la methode step. Chaque etape peut optionnellement suivre un curseur (votre marqueur de progression). Lorsque l’adaptateur de file d’attente signale un arret, le job sauvegarde sa progression et se remet en file d’attente.

class MyJob < ApplicationJob
  include ActiveJob::Continuable

  def perform
    step :first_step do
      # runs once, skipped on resume
    end

    step :second_step do |step|
      records.find_each(start: step.cursor) do |record|
        process(record)
        step.advance! from: record.id  # saves progress + checks for interruption
      end
    end

    step :cleanup do
      # runs after second_step completes
    end
  end
end

Trois concepts cles :

  1. Les etapes s’executent dans l’ordre. Les etapes terminees sont ignorees lorsque le job reprend.
  2. Les curseurs suivent la progression au sein d’une etape. Lors de la reprise, le curseur est restaure.
  3. Les points de controle sont les points surs ou l’interruption peut se produire. Il y a un point de controle automatique avant chaque etape (sauf la premiere). Au sein d’une etape, vous les creez en appelant step.advance!, step.set!, ou step.checkpoint!.

Construire l’import CSV

Construisons un vrai pipeline d’import. Un utilisateur televerse un fichier CSV via Active Storage. Le job valide le fichier, traite chaque ligne, et envoie un email de fin.

Le modele Import

class Import < ApplicationRecord
  has_one_attached :file
  belongs_to :user

  enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end

Et la migration :

class CreateImports < ActiveRecord::Migration[8.1]
  def change
    create_table :imports do |t|
      t.references :user, null: false, foreign_key: true
      t.integer :status, default: 0, null: false
      t.integer :processed_count, default: 0, null: false
      t.integer :total_rows, default: 0, null: false
      t.integer :error_count, default: 0, null: false
      t.jsonb :errors_log, default: []
      t.timestamps
    end
  end
end

Le Job

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  self.max_resumptions = 50
  self.resume_options = { wait: 2.seconds }

  def perform(import_id)
    @import = Import.find(import_id)
    @rows = CSV.parse(@import.file.download, headers: true)

    step :validate do
      validate_csv
    end

    step :process_rows do |step|
      process_csv_rows(step)
    end

    step :finalize do
      finalize_import
    end
  end

  private

  def validate_csv
    @import.update!(status: :validating)

    required_headers = %w[email first_name last_name]
    missing = required_headers - @rows.headers

    if missing.any?
      @import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
      raise "Invalid CSV: missing headers #{missing.join(', ')}"
    end

    @import.update!(total_rows: @rows.size)
  end

  def process_csv_rows(step)
    @import.update!(status: :processing)

    start_index = step.cursor || 0

    # Using set! instead of advance! because we're tracking array indices,
    # not ActiveRecord IDs where gaps between values are common.
    @rows[start_index..].each.with_index(start_index) do |row, index|
      process_single_row(row, index)
      step.set! index + 1
    end
  end

  def process_single_row(row, index)
    user = User.find_or_initialize_by(email: row["email"])
    user.assign_attributes(
      first_name: row["first_name"],
      last_name: row["last_name"]
    )

    if user.save
      @import.increment!(:processed_count)
    else
      @import.increment!(:error_count)
      log_error(index, row, user.errors.full_messages)
    end
  end

  def log_error(index, row, messages)
    @import.errors_log << { row: index + 1, email: row["email"], messages: messages }
    @import.save!
  end

  def finalize_import
    @import.update!(status: :completed)
    ImportMailer.completed(@import).deliver_later
  end
end

Comment cela se deroule

Supposons que le CSV contienne 50 000 lignes et que le worker s’arrete apres avoir traite les 10 000 premieres lignes.

  1. L’etape validate est deja terminee, elle est donc marquee comme faite.
  2. L’etape process_rows a un curseur a 10000 (le prochain index a traiter).
  3. Le job se remet en file d’attente avec cette progression sauvegardee.
  4. Lorsque le nouveau worker le reprend, il saute validate et reprend process_rows a partir de la ligne 10 001.
  5. Une fois toutes les lignes traitees, finalize s’execute et envoie l’email.

La progression est serialisee dans les donnees du job sous la cle continuation, donc aucun stockage externe n’est necessaire.

Lancer l’import

class ImportsController < ApplicationController
  def create
    import = Current.user.imports.create!(file: params[:file])
    ProcessImportJob.perform_later(import.id)
    redirect_to import, notice: "Import started"
  end
end

Afficher la progression

Puisque le modele Import suit le processed_count, vous pouvez afficher la progression a l’utilisateur :

<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
  <p>Status: <%= @import.status.humanize %></p>

  <% if @import.processing? %>
    <p>Processed <%= @import.processed_count %> rows</p>
    <p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
  <% end %>

  <% if @import.completed? %>
    <p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
  <% end %>
</div>

Combinez cela avec un callback Turbo Stream broadcast sur votre modele Import pour des mises a jour en temps reel :

after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }

Etapes isolees

Si une etape est particulierement longue et que vous voulez garantir qu’elle obtient sa propre execution de job (pour que la progression soit sauvegardee avant qu’elle ne commence), utilisez l’option isolated: true :

step :process_rows, isolated: true do |step|
  process_csv_rows(step)
end

Cela force le job a s’interrompre et a se remettre en file d’attente avant de demarrer l’etape, garantissant que toute la progression precedente est persistee.

Configuration

Trois parametres au niveau de la classe controlent le comportement de reprise :

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  # Maximum number of times the job can be resumed (default: nil, unlimited)
  self.max_resumptions = 50

  # Options passed to retry_job when resuming (default: { wait: 5.seconds })
  self.resume_options = { wait: 2.seconds, queue: :imports }

  # Auto-resume if an error occurs after the cursor advanced (default: true)
  self.resume_errors_after_advancing = true
end

La derniere option merite une attention particuliere. Si votre job leve une erreur apres avoir progresse (avance le curseur ou termine une etape), il reprendra automatiquement au lieu de perdre cette progression. Ce comportement est active par defaut.

Points d’attention

Le code en dehors des etapes se re-execute a chaque reprise. Tout ce qui se trouve en dehors d’un bloc step s’execute a chaque execution, y compris lors des reprises. Les lignes @import = Import.find(import_id) et de parsing CSV en haut de perform conviennent comme initialisation, mais evitez d’y placer des effets de bord. Pour les fichiers tres volumineux, le re-telechargement du CSV a chaque reprise peut etre lent. Envisagez de telecharger le fichier vers un chemin temporaire local dans une etape precedente et de lire depuis le disque a la place.

Les points de controle au sein d’une etape ne sont pas automatiques. Il y a un point de controle automatique entre les etapes, mais au sein d’une etape, le job ne verifie l’interruption que lorsque vous appelez step.advance!, step.set!, ou step.checkpoint!. Si votre etape effectue dix minutes de travail sans point de controle, elle ne peut pas etre interrompue proprement pendant ce temps. Placez des points de controle plus souvent que le delai d’arret de votre worker.

Support de l’adaptateur de file d’attente. Les continuations reposent sur queue_adapter.stopping? pour detecter les arrets. Rails fournit une implementation de stopping? dans les adaptateurs Sidekiq et Test. Solid Queue peut le supporter via le hook on_worker_stop, mais verifiez la documentation de votre adaptateur. Si votre adaptateur n’implemente pas stopping?, le job fonctionnera toujours mais ne sera pas interrompu proprement.

Les curseurs doivent etre serialisables. Le curseur est stocke dans les donnees serialisees du job, il doit donc etre quelque chose qu’ActiveJob peut serialiser : entiers, chaines de caracteres, tableaux, ou tout objet qui implemente la serialisation ActiveJob.

Pour conclure

Au lieu de construire des machines a etats personnalisees ou d’ajouter des gems externes, vous pouvez desormais laisser Rails gerer l’interruption et la reprise des jobs pour vous. Definissez vos etapes, placez des points de controle frequemment, et deployez sans vous soucier de tuer les jobs de longue duree.

Cette fonctionnalite est disponible dans Rails 8.1.0 et versions ulterieures. Consultez la PR #55127 pour l’implementation complete.