Construye una Importación CSV Reanudable con ActiveJob::Continuable
Los jobs de larga duración en segundo plano tienen una relación incómoda con los despliegues. Inicias una importación CSV de 100,000 filas, despliegas código nuevo cinco minutos después, y el worker se reinicia. El job o empieza desde el principio o muere silenciosamente. La mayoría de los equipos resuelven esto con contabilidad personalizada: una columna last_processed_id, una clave en Redis, o una gema externa como job-iteration.
Rails 8.1 incluye una solución integrada llamada ActiveJob::Continuable. Te permite dividir un job en pasos, rastrear el progreso con cursores, y reanudar automáticamente desde donde el job se quedó después de una interrupción. Sin gemas, sin máquinas de estado personalizadas.
Cómo Funcionan las Continuaciones
Incluye ActiveJob::Continuable en tu job y define pasos con el método step. Cada paso puede opcionalmente rastrear un cursor (tu marcador de progreso). Cuando el adaptador de cola señala un apagado, el job guarda su progreso y se vuelve a encolar.
class MyJob < ApplicationJob
include ActiveJob::Continuable
def perform
step :first_step do
# runs once, skipped on resume
end
step :second_step do |step|
records.find_each(start: step.cursor) do |record|
process(record)
step.advance! from: record.id # saves progress + checks for interruption
end
end
step :cleanup do
# runs after second_step completes
end
end
end
Tres conceptos clave:
- Los pasos se ejecutan en orden. Los pasos completados se omiten cuando el job se reanuda.
- Los cursores rastrean el progreso dentro de un paso. Cuando se reanuda, el cursor se restaura.
- Los checkpoints son los puntos seguros donde puede ocurrir una interrupción. Hay un checkpoint automático antes de cada paso (excepto el primero). Dentro de un paso, los creas llamando a
step.advance!,step.set!, ostep.checkpoint!.
Construyendo la Importación CSV
Construyamos un pipeline de importación real. Un usuario sube un archivo CSV a través de Active Storage. El job valida el archivo, procesa cada fila, y envía un correo de finalización.
El Modelo Import
class Import < ApplicationRecord
has_one_attached :file
belongs_to :user
enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end
Y la migración:
class CreateImports < ActiveRecord::Migration[8.1]
def change
create_table :imports do |t|
t.references :user, null: false, foreign_key: true
t.integer :status, default: 0, null: false
t.integer :processed_count, default: 0, null: false
t.integer :total_rows, default: 0, null: false
t.integer :error_count, default: 0, null: false
t.jsonb :errors_log, default: []
t.timestamps
end
end
end
El Job
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
self.max_resumptions = 50
self.resume_options = { wait: 2.seconds }
def perform(import_id)
@import = Import.find(import_id)
@rows = CSV.parse(@import.file.download, headers: true)
step :validate do
validate_csv
end
step :process_rows do |step|
process_csv_rows(step)
end
step :finalize do
finalize_import
end
end
private
def validate_csv
@import.update!(status: :validating)
required_headers = %w[email first_name last_name]
missing = required_headers - @rows.headers
if missing.any?
@import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
raise "Invalid CSV: missing headers #{missing.join(', ')}"
end
@import.update!(total_rows: @rows.size)
end
def process_csv_rows(step)
@import.update!(status: :processing)
start_index = step.cursor || 0
# Using set! instead of advance! because we're tracking array indices,
# not ActiveRecord IDs where gaps between values are common.
@rows[start_index..].each.with_index(start_index) do |row, index|
process_single_row(row, index)
step.set! index + 1
end
end
def process_single_row(row, index)
user = User.find_or_initialize_by(email: row["email"])
user.assign_attributes(
first_name: row["first_name"],
last_name: row["last_name"]
)
if user.save
@import.increment!(:processed_count)
else
@import.increment!(:error_count)
log_error(index, row, user.errors.full_messages)
end
end
def log_error(index, row, messages)
@import.errors_log << { row: index + 1, email: row["email"], messages: messages }
@import.save!
end
def finalize_import
@import.update!(status: :completed)
ImportMailer.completed(@import).deliver_later
end
end
Cómo Se Desarrolla Esto
Supongamos que el CSV tiene 50,000 filas y el worker se apaga después de procesar las primeras 10k filas.
- El paso
validateya se completó, así que está marcado como hecho. - El paso
process_rowstiene un cursor de10000(el siguiente índice a procesar). - El job se vuelve a encolar con este progreso guardado.
- Cuando el nuevo worker lo toma, omite
validatey reanudaprocess_rowscomenzando desde la fila 10,001. - Una vez que todas las filas están procesadas,
finalizese ejecuta y envía el correo.
El progreso se serializa en los datos del job bajo la clave continuation, así que no se necesita almacenamiento externo.
Iniciando la Importación
class ImportsController < ApplicationController
def create
import = Current.user.imports.create!(file: params[:file])
ProcessImportJob.perform_later(import.id)
redirect_to import, notice: "Import started"
end
end
Mostrando el Progreso
Como el modelo Import rastrea processed_count, puedes mostrar el progreso al usuario:
<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
<p>Status: <%= @import.status.humanize %></p>
<% if @import.processing? %>
<p>Processed <%= @import.processed_count %> rows</p>
<p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
<% end %>
<% if @import.completed? %>
<p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
<% end %>
</div>
Combina esto con un callback de broadcast de Turbo Stream en tu modelo Import para actualizaciones en vivo:
after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }
Pasos Aislados
Si un paso es particularmente largo y quieres garantizar que obtenga su propia ejecución de job (para que el progreso se guarde antes de que comience), usa la opción isolated: true:
step :process_rows, isolated: true do |step|
process_csv_rows(step)
end
Esto fuerza al job a interrumpirse y volver a encolarse antes de comenzar el paso, asegurando que todo el progreso anterior esté persistido.
Configuración
Tres configuraciones a nivel de clase controlan el comportamiento de reanudación:
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
# Maximum number of times the job can be resumed (default: nil, unlimited)
self.max_resumptions = 50
# Options passed to retry_job when resuming (default: { wait: 5.seconds })
self.resume_options = { wait: 2.seconds, queue: :imports }
# Auto-resume if an error occurs after the cursor advanced (default: true)
self.resume_errors_after_advancing = true
end
La última opción merece atención. Si tu job lanza un error después de haber avanzado (avanzó el cursor o completó un paso), se reanudará automáticamente en lugar de perder ese progreso. Esto está habilitado por defecto.
Cosas a Tener en Cuenta
El código fuera de los pasos se re-ejecuta en cada reanudación. Todo lo que esté fuera de un bloque step se ejecuta en cada ejecución, incluyendo las reanudaciones. Las líneas @import = Import.find(import_id) y el parseo del CSV al inicio de perform están bien como configuración, pero evita poner efectos secundarios ahí. Para archivos muy grandes, la re-descarga del CSV en cada reanudación podría ser lenta. Considera descargar el archivo a una ruta temporal local en un paso anterior y leer desde disco en su lugar.
Los checkpoints dentro de un paso no son automáticos. Hay un checkpoint automático entre pasos, pero dentro de un paso, el job solo verifica interrupciones cuando llamas a step.advance!, step.set!, o step.checkpoint!. Si tu paso hace diez minutos de trabajo sin un checkpoint, no puede ser interrumpido de forma elegante durante ese tiempo. Haz checkpoints con más frecuencia que el timeout de apagado de tu worker.
Soporte del adaptador de cola. Las continuaciones dependen de queue_adapter.stopping? para detectar apagados. Rails incluye stopping? implementado en los adaptadores de Sidekiq y Test. Solid Queue puede soportarlo a través del hook on_worker_stop, pero revisa la documentación de tu adaptador. Si tu adaptador no implementa stopping?, el job seguirá funcionando pero no será interrumpido de forma elegante.
Los cursores deben ser serializables. El cursor se almacena en los datos serializados del job, así que debe ser algo que ActiveJob pueda serializar: enteros, strings, arrays, o cualquier objeto que implemente la serialización de ActiveJob.
Conclusión
En lugar de construir máquinas de estado personalizadas o incorporar gemas externas, ahora puedes dejar que Rails maneje la interrupción y reanudación de jobs por ti. Define tus pasos, haz checkpoints frecuentemente, y despliega sin preocuparte por matar jobs de larga duración.
Esta funcionalidad está disponible en Rails 8.1.0 y versiones posteriores. Consulta el PR #55127 para la implementación completa.