Construye una Importación CSV Reanudable con ActiveJob::Continuable

Los jobs de larga duración en segundo plano tienen una relación incómoda con los despliegues. Inicias una importación CSV de 100,000 filas, despliegas código nuevo cinco minutos después, y el worker se reinicia. El job o empieza desde el principio o muere silenciosamente. La mayoría de los equipos resuelven esto con contabilidad personalizada: una columna last_processed_id, una clave en Redis, o una gema externa como job-iteration.

Rails 8.1 incluye una solución integrada llamada ActiveJob::Continuable. Te permite dividir un job en pasos, rastrear el progreso con cursores, y reanudar automáticamente desde donde el job se quedó después de una interrupción. Sin gemas, sin máquinas de estado personalizadas.

Cómo Funcionan las Continuaciones

Incluye ActiveJob::Continuable en tu job y define pasos con el método step. Cada paso puede opcionalmente rastrear un cursor (tu marcador de progreso). Cuando el adaptador de cola señala un apagado, el job guarda su progreso y se vuelve a encolar.

class MyJob < ApplicationJob
  include ActiveJob::Continuable

  def perform
    step :first_step do
      # runs once, skipped on resume
    end

    step :second_step do |step|
      records.find_each(start: step.cursor) do |record|
        process(record)
        step.advance! from: record.id  # saves progress + checks for interruption
      end
    end

    step :cleanup do
      # runs after second_step completes
    end
  end
end

Tres conceptos clave:

  1. Los pasos se ejecutan en orden. Los pasos completados se omiten cuando el job se reanuda.
  2. Los cursores rastrean el progreso dentro de un paso. Cuando se reanuda, el cursor se restaura.
  3. Los checkpoints son los puntos seguros donde puede ocurrir una interrupción. Hay un checkpoint automático antes de cada paso (excepto el primero). Dentro de un paso, los creas llamando a step.advance!, step.set!, o step.checkpoint!.

Construyendo la Importación CSV

Construyamos un pipeline de importación real. Un usuario sube un archivo CSV a través de Active Storage. El job valida el archivo, procesa cada fila, y envía un correo de finalización.

El Modelo Import

class Import < ApplicationRecord
  has_one_attached :file
  belongs_to :user

  enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end

Y la migración:

class CreateImports < ActiveRecord::Migration[8.1]
  def change
    create_table :imports do |t|
      t.references :user, null: false, foreign_key: true
      t.integer :status, default: 0, null: false
      t.integer :processed_count, default: 0, null: false
      t.integer :total_rows, default: 0, null: false
      t.integer :error_count, default: 0, null: false
      t.jsonb :errors_log, default: []
      t.timestamps
    end
  end
end

El Job

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  self.max_resumptions = 50
  self.resume_options = { wait: 2.seconds }

  def perform(import_id)
    @import = Import.find(import_id)
    @rows = CSV.parse(@import.file.download, headers: true)

    step :validate do
      validate_csv
    end

    step :process_rows do |step|
      process_csv_rows(step)
    end

    step :finalize do
      finalize_import
    end
  end

  private

  def validate_csv
    @import.update!(status: :validating)

    required_headers = %w[email first_name last_name]
    missing = required_headers - @rows.headers

    if missing.any?
      @import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
      raise "Invalid CSV: missing headers #{missing.join(', ')}"
    end

    @import.update!(total_rows: @rows.size)
  end

  def process_csv_rows(step)
    @import.update!(status: :processing)

    start_index = step.cursor || 0

    # Using set! instead of advance! because we're tracking array indices,
    # not ActiveRecord IDs where gaps between values are common.
    @rows[start_index..].each.with_index(start_index) do |row, index|
      process_single_row(row, index)
      step.set! index + 1
    end
  end

  def process_single_row(row, index)
    user = User.find_or_initialize_by(email: row["email"])
    user.assign_attributes(
      first_name: row["first_name"],
      last_name: row["last_name"]
    )

    if user.save
      @import.increment!(:processed_count)
    else
      @import.increment!(:error_count)
      log_error(index, row, user.errors.full_messages)
    end
  end

  def log_error(index, row, messages)
    @import.errors_log << { row: index + 1, email: row["email"], messages: messages }
    @import.save!
  end

  def finalize_import
    @import.update!(status: :completed)
    ImportMailer.completed(@import).deliver_later
  end
end

Cómo Se Desarrolla Esto

Supongamos que el CSV tiene 50,000 filas y el worker se apaga después de procesar las primeras 10k filas.

  1. El paso validate ya se completó, así que está marcado como hecho.
  2. El paso process_rows tiene un cursor de 10000 (el siguiente índice a procesar).
  3. El job se vuelve a encolar con este progreso guardado.
  4. Cuando el nuevo worker lo toma, omite validate y reanuda process_rows comenzando desde la fila 10,001.
  5. Una vez que todas las filas están procesadas, finalize se ejecuta y envía el correo.

El progreso se serializa en los datos del job bajo la clave continuation, así que no se necesita almacenamiento externo.

Iniciando la Importación

class ImportsController < ApplicationController
  def create
    import = Current.user.imports.create!(file: params[:file])
    ProcessImportJob.perform_later(import.id)
    redirect_to import, notice: "Import started"
  end
end

Mostrando el Progreso

Como el modelo Import rastrea processed_count, puedes mostrar el progreso al usuario:

<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
  <p>Status: <%= @import.status.humanize %></p>

  <% if @import.processing? %>
    <p>Processed <%= @import.processed_count %> rows</p>
    <p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
  <% end %>

  <% if @import.completed? %>
    <p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
  <% end %>
</div>

Combina esto con un callback de broadcast de Turbo Stream en tu modelo Import para actualizaciones en vivo:

after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }

Pasos Aislados

Si un paso es particularmente largo y quieres garantizar que obtenga su propia ejecución de job (para que el progreso se guarde antes de que comience), usa la opción isolated: true:

step :process_rows, isolated: true do |step|
  process_csv_rows(step)
end

Esto fuerza al job a interrumpirse y volver a encolarse antes de comenzar el paso, asegurando que todo el progreso anterior esté persistido.

Configuración

Tres configuraciones a nivel de clase controlan el comportamiento de reanudación:

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  # Maximum number of times the job can be resumed (default: nil, unlimited)
  self.max_resumptions = 50

  # Options passed to retry_job when resuming (default: { wait: 5.seconds })
  self.resume_options = { wait: 2.seconds, queue: :imports }

  # Auto-resume if an error occurs after the cursor advanced (default: true)
  self.resume_errors_after_advancing = true
end

La última opción merece atención. Si tu job lanza un error después de haber avanzado (avanzó el cursor o completó un paso), se reanudará automáticamente en lugar de perder ese progreso. Esto está habilitado por defecto.

Cosas a Tener en Cuenta

El código fuera de los pasos se re-ejecuta en cada reanudación. Todo lo que esté fuera de un bloque step se ejecuta en cada ejecución, incluyendo las reanudaciones. Las líneas @import = Import.find(import_id) y el parseo del CSV al inicio de perform están bien como configuración, pero evita poner efectos secundarios ahí. Para archivos muy grandes, la re-descarga del CSV en cada reanudación podría ser lenta. Considera descargar el archivo a una ruta temporal local en un paso anterior y leer desde disco en su lugar.

Los checkpoints dentro de un paso no son automáticos. Hay un checkpoint automático entre pasos, pero dentro de un paso, el job solo verifica interrupciones cuando llamas a step.advance!, step.set!, o step.checkpoint!. Si tu paso hace diez minutos de trabajo sin un checkpoint, no puede ser interrumpido de forma elegante durante ese tiempo. Haz checkpoints con más frecuencia que el timeout de apagado de tu worker.

Soporte del adaptador de cola. Las continuaciones dependen de queue_adapter.stopping? para detectar apagados. Rails incluye stopping? implementado en los adaptadores de Sidekiq y Test. Solid Queue puede soportarlo a través del hook on_worker_stop, pero revisa la documentación de tu adaptador. Si tu adaptador no implementa stopping?, el job seguirá funcionando pero no será interrumpido de forma elegante.

Los cursores deben ser serializables. El cursor se almacena en los datos serializados del job, así que debe ser algo que ActiveJob pueda serializar: enteros, strings, arrays, o cualquier objeto que implemente la serialización de ActiveJob.

Conclusión

En lugar de construir máquinas de estado personalizadas o incorporar gemas externas, ahora puedes dejar que Rails maneje la interrupción y reanudación de jobs por ti. Define tus pasos, haz checkpoints frecuentemente, y despliega sin preocuparte por matar jobs de larga duración.

Esta funcionalidad está disponible en Rails 8.1.0 y versiones posteriores. Consulta el PR #55127 para la implementación completa.