Build a Resumable CSV Import with ActiveJob::Continuable
Long-running background jobs have an uncomfortable relationship with deploys. You kick off a 100,000-row CSV import, deploy new code five minutes later, and the worker restarts. The job either starts over from the beginning or silently dies. Most teams solve this with custom bookkeeping: a last_processed_id column, a Redis key, or an external gem like job-iteration.
Rails 8.1 ships a built-in solution called ActiveJob::Continuable. It lets you split a job into steps, track progress with cursors, and automatically resume from where the job left off after an interruption. No gems, no custom state machines.
How Continuations Work
Include ActiveJob::Continuable in your job and define steps with the step method. Each step can optionally track a cursor (your progress marker). When the queue adapter signals a shutdown, the job saves its progress and re-enqueues itself.
class MyJob < ApplicationJob
include ActiveJob::Continuable
def perform
step :first_step do
# runs once, skipped on resume
end
step :second_step do |step|
records.find_each(start: step.cursor) do |record|
process(record)
step.advance! from: record.id # saves progress + checks for interruption
end
end
step :cleanup do
# runs after second_step completes
end
end
end
Three key concepts:
- Steps run in order. Completed steps are skipped when the job resumes.
- Cursors track progress within a step. When resumed, the cursor is restored.
- Checkpoints are the safe points where interruption can happen. There is an automatic checkpoint before each step (except the first). Within a step, you create them by calling
step.advance!,step.set!, orstep.checkpoint!.
Building the CSV Import
Let’s build a real import pipeline. A user uploads a CSV file through Active Storage. The job validates the file, processes each row, and sends a completion email.
The Import Model
class Import < ApplicationRecord
has_one_attached :file
belongs_to :user
enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end
And the migration:
class CreateImports < ActiveRecord::Migration[8.1]
def change
create_table :imports do |t|
t.references :user, null: false, foreign_key: true
t.integer :status, default: 0, null: false
t.integer :processed_count, default: 0, null: false
t.integer :total_rows, default: 0, null: false
t.integer :error_count, default: 0, null: false
t.jsonb :errors_log, default: []
t.timestamps
end
end
end
The Job
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
self.max_resumptions = 50
self.resume_options = { wait: 2.seconds }
def perform(import_id)
@import = Import.find(import_id)
@rows = CSV.parse(@import.file.download, headers: true)
step :validate do
validate_csv
end
step :process_rows do |step|
process_csv_rows(step)
end
step :finalize do
finalize_import
end
end
private
def validate_csv
@import.update!(status: :validating)
required_headers = %w[email first_name last_name]
missing = required_headers - @rows.headers
if missing.any?
@import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
raise "Invalid CSV: missing headers #{missing.join(', ')}"
end
@import.update!(total_rows: @rows.size)
end
def process_csv_rows(step)
@import.update!(status: :processing)
start_index = step.cursor || 0
# Using set! instead of advance! because we're tracking array indices,
# not ActiveRecord IDs where gaps between values are common.
@rows[start_index..].each.with_index(start_index) do |row, index|
process_single_row(row, index)
step.set! index + 1
end
end
def process_single_row(row, index)
user = User.find_or_initialize_by(email: row["email"])
user.assign_attributes(
first_name: row["first_name"],
last_name: row["last_name"]
)
if user.save
@import.increment!(:processed_count)
else
@import.increment!(:error_count)
log_error(index, row, user.errors.full_messages)
end
end
def log_error(index, row, messages)
@import.errors_log << { row: index + 1, email: row["email"], messages: messages }
@import.save!
end
def finalize_import
@import.update!(status: :completed)
ImportMailer.completed(@import).deliver_later
end
end
How This Plays Out
Suppose the CSV has 50,000 rows and the worker shuts down after processing the first 10k rows.
- The
validatestep already completed, so it’s marked as done. - The
process_rowsstep has a cursor of10000(the next index to process). - The job re-enqueues itself with this progress saved.
- When the new worker picks it up, it skips
validateand resumesprocess_rowsstarting from row 10,001. - Once all rows are processed,
finalizeruns and sends the email.
The progress is serialized into the job’s data under the continuation key, so no external storage is needed.
Kicking It Off
class ImportsController < ApplicationController
def create
import = Current.user.imports.create!(file: params[:file])
ProcessImportJob.perform_later(import.id)
redirect_to import, notice: "Import started"
end
end
Showing Progress
Since the Import model tracks processed_count, you can show progress to the user:
<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
<p>Status: <%= @import.status.humanize %></p>
<% if @import.processing? %>
<p>Processed <%= @import.processed_count %> rows</p>
<p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
<% end %>
<% if @import.completed? %>
<p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
<% end %>
</div>
Pair this with a Turbo Stream broadcast callback on your Import model for live updates:
after_update_commit -> { broadcast_replace_to user, target: dom_id(self) }
Isolated Steps
If a step is particularly long and you want to guarantee it gets its own job execution (so progress is saved before it starts), use the isolated: true option:
step :process_rows, isolated: true do |step|
process_csv_rows(step)
end
This forces the job to interrupt and re-enqueue before starting the step, ensuring all previous progress is persisted.
Configuration
Three class-level settings control resumption behavior:
class ProcessImportJob < ApplicationJob
include ActiveJob::Continuable
# Maximum number of times the job can be resumed (default: nil, unlimited)
self.max_resumptions = 50
# Options passed to retry_job when resuming (default: { wait: 5.seconds })
self.resume_options = { wait: 2.seconds, queue: :imports }
# Auto-resume if an error occurs after the cursor advanced (default: true)
self.resume_errors_after_advancing = true
end
The last option is worth noting. If your job raises an error after it has made progress (advanced the cursor or completed a step), it will automatically resume rather than losing that progress. This is enabled by default.
Things to Watch Out For
Code outside steps re-runs on every resume. Everything outside a step block runs on every execution, including resumes. The @import = Import.find(import_id) and CSV parsing lines at the top of perform are fine as setup, but avoid putting side effects there. For very large files, the CSV re-download on each resume could be slow. Consider downloading the file to a local temp path in an earlier step and reading from disk instead.
Checkpoints within a step are not automatic. There is an automatic checkpoint between steps, but within a step, the job only checks for interruption when you call step.advance!, step.set!, or step.checkpoint!. If your step does ten minutes of work without a checkpoint, it can’t be interrupted gracefully during that time. Checkpoint more often than your worker’s shutdown timeout.
Queue adapter support. Continuations rely on queue_adapter.stopping? to detect shutdowns. Rails ships with stopping? implemented in the Sidekiq and Test adapters. Solid Queue can support it via the on_worker_stop hook, but check your adapter’s documentation. If your adapter doesn’t implement stopping?, the job will still work but won’t be interrupted gracefully.
Cursors must be serializable. The cursor is stored in the job’s serialized data, so it must be something ActiveJob can serialize: integers, strings, arrays, or any object that implements ActiveJob serialization.
Wrapping It Up
Instead of building custom state machines or pulling in external gems, you can now let Rails handle job interruption and resumption for you. Define your steps, checkpoint frequently, and deploy without worrying about killing long-running jobs.
This feature is available in Rails 8.1.0 and later. See PR #55127 for the full implementation.