Build a Resumable CSV Import with ActiveJob::Continuable

Long-running background jobs have an uncomfortable relationship with deploys. You kick off a 100,000-row CSV import, deploy new code five minutes later, and the worker restarts. The job either starts over from the beginning or silently dies. Most teams solve this with custom bookkeeping: a last_processed_id column, a Redis key, or an external gem like job-iteration.

Rails 8.1 ships a built-in solution called ActiveJob::Continuable. It lets you split a job into steps, track progress with cursors, and automatically resume from where the job left off after an interruption. No gems, no custom state machines.

How Continuations Work

Include ActiveJob::Continuable in your job and define steps with the step method. Each step can optionally track a cursor (your progress marker). When the queue adapter signals a shutdown, the job saves its progress and re-enqueues itself.

class MyJob < ApplicationJob
  include ActiveJob::Continuable

  def perform
    step :first_step do
      # runs once, skipped on resume
    end

    step :second_step do |step|
      records.find_each(start: step.cursor) do |record|
        process(record)
        step.advance! from: record.id  # saves progress + checks for interruption
      end
    end

    step :cleanup do
      # runs after second_step completes
    end
  end
end

Three key concepts:

  1. Steps run in order. Completed steps are skipped when the job resumes.
  2. Cursors track progress within a step. When resumed, the cursor is restored.
  3. Checkpoints are the safe points where interruption can happen. There is an automatic checkpoint before each step (except the first). Within a step, you create them by calling step.advance!, step.set!, or step.checkpoint! (sketched just below).
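
To make the third point concrete, here's a sketch of checkpoint! in a step where the cursor only moves once a whole record is done. index_chunk and chunks are hypothetical stand-ins for your own per-record sub-work, which must be safe to repeat: an interruption mid-record re-runs that record on resume.

step :rebuild_search_index do |step|
  Document.find_each(start: step.cursor) do |document|
    document.chunks.each do |chunk|
      index_chunk(chunk)
      # checkpoint! allows a graceful stop without moving the cursor,
      # so this document restarts from scratch on resume.
      step.checkpoint!
    end

    # The whole document is indexed; advance the cursor past its id.
    step.advance! from: document.id
  end
end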

Building the CSV Import

Let’s build a real import pipeline. A user uploads a CSV file through Active Storage. The job validates the file, processes each row, and sends a completion email.

The Import Model

class Import < ApplicationRecord
  has_one_attached :file
  belongs_to :user

  enum :status, { pending: 0, validating: 1, processing: 2, completed: 3, failed: 4 }
end

And the migration (t.jsonb assumes PostgreSQL; use t.json on other databases):

class CreateImports < ActiveRecord::Migration[8.1]
  def change
    create_table :imports do |t|
      t.references :user, null: false, foreign_key: true
      t.integer :status, default: 0, null: false
      t.integer :processed_count, default: 0, null: false
      t.integer :total_rows, default: 0, null: false
      t.integer :error_count, default: 0, null: false
      t.jsonb :errors_log, default: []
      t.timestamps
    end
  end
end

The Job

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  self.max_resumptions = 50
  self.resume_options = { wait: 2.seconds }

  def perform(import_id)
    @import = Import.find(import_id)
    @rows = CSV.parse(@import.file.download, headers: true)

    step :validate do
      validate_csv
    end

    step :process_rows do |step|
      process_csv_rows(step)
    end

    step :finalize do
      finalize_import
    end
  end

  private

  def validate_csv
    @import.update!(status: :validating)

    required_headers = %w[email first_name last_name]
    missing = required_headers - @rows.headers

    if missing.any?
      @import.update!(status: :failed, errors_log: [{ row: 0, message: "Missing headers: #{missing.join(', ')}" }])
      raise "Invalid CSV: missing headers #{missing.join(', ')}"
    end

    @import.update!(total_rows: @rows.size)
  end

  def process_csv_rows(step)
    @import.update!(status: :processing)

    start_index = step.cursor || 0

    # set! stores an explicit cursor value. advance! would also work here,
    # since it just moves the cursor to its successor; its from: option is
    # aimed at values with gaps, like ActiveRecord IDs.
    @rows[start_index..].each.with_index(start_index) do |row, index|
      process_single_row(row, index)
      step.set! index + 1
    end
  end

  def process_single_row(row, index)
    user = User.find_or_initialize_by(email: row["email"])
    user.assign_attributes(
      first_name: row["first_name"],
      last_name: row["last_name"]
    )

    if user.save
      @import.increment!(:processed_count)
    else
      @import.increment!(:error_count)
      log_error(index, row, user.errors.full_messages)
    end
  end

  def log_error(index, row, messages)
    @import.errors_log << { row: index + 1, email: row["email"], messages: messages }
    @import.save!
  end

  def finalize_import
    @import.update!(status: :completed)
    ImportMailer.completed(@import).deliver_later
  end
end

How This Plays Out

Suppose the CSV has 50,000 rows and the worker shuts down after processing the first 10,000.

  1. The validate step already completed, so it’s marked as done.
  2. The process_rows step has a cursor of 10,000 (the next index to process).
  3. The job re-enqueues itself with this progress saved.
  4. When the new worker picks it up, it skips validate and resumes process_rows at index 10,000, the 10,001st row.
  5. Once all rows are processed, finalize runs and sends the email.

The progress is serialized into the job’s data under the continuation key, so no external storage is needed.
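
You can peek at this in a console. job.serialize is standard Active Job API; the shape of the value under "continuation" is an internal detail (and likely empty until the job has checkpointed), so treat this as inspection only:

job = ProcessImportJob.new(import.id)
job.serialize["continuation"]  # => the saved progress for this job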

Kicking It Off

class ImportsController < ApplicationController
  def create
    import = Current.user.imports.create!(file: params[:file])
    ProcessImportJob.perform_later(import.id)
    redirect_to import, notice: "Import started"
  end
end
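
A quick way to exercise the pipeline end to end is a job test. This sketch assumes a users(:one) fixture and a small CSV at test/fixtures/files/contacts.csv; under the test adapter the job runs straight through without interruption:

require "test_helper"

class ProcessImportJobTest < ActiveJob::TestCase
  test "processes every row and completes the import" do
    import = users(:one).imports.create!
    import.file.attach(
      io: file_fixture("contacts.csv").open,
      filename: "contacts.csv",
      content_type: "text/csv"
    )

    # only: keeps the completion mailer job out of the run.
    perform_enqueued_jobs(only: ProcessImportJob) do
      ProcessImportJob.perform_later(import.id)
    end

    assert import.reload.completed?
    assert_equal import.total_rows, import.processed_count + import.error_count
  end
end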

Showing Progress

Since the Import model tracks processed_count, you can show progress to the user:

<%# app/views/imports/show.html.erb %>
<div id="<%= dom_id(@import) %>">
  <p>Status: <%= @import.status.humanize %></p>

  <% if @import.processing? %>
    <p>Processed <%= @import.processed_count %> rows</p>
    <p><%= number_to_percentage((@import.processed_count.to_f / [@import.total_rows, 1].max) * 100, precision: 1) %></p>
  <% end %>

  <% if @import.completed? %>
    <p>Done! <%= @import.processed_count %> imported, <%= @import.error_count %> errors.</p>
  <% end %>
</div>

Pair this with a Turbo Stream broadcast callback on your Import model for live updates. Note that dom_id isn't available inside a model by default, and broadcast_replace_to already targets the record's dom_id, so you can leave the target off:

after_update_commit -> { broadcast_replace_to user }
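
By default the broadcast renders the record's partial and replaces the element with the matching dom_id, and the page only receives broadcasts it has subscribed to. A sketch of both pieces, moving the status markup from the show view into the conventional partial:

<%# app/views/imports/show.html.erb %>
<%= turbo_stream_from @import.user %>
<%= render @import %>

<%# app/views/imports/_import.html.erb %>
<div id="<%= dom_id(import) %>">
  <p>Status: <%= import.status.humanize %></p>

  <% if import.processing? %>
    <p>Processed <%= import.processed_count %> of <%= import.total_rows %> rows</p>
  <% end %>

  <% if import.completed? %>
    <p>Done! <%= import.processed_count %> imported, <%= import.error_count %> errors.</p>
  <% end %>
</div>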

Isolated Steps

If a step is particularly long and you want to guarantee it gets its own job execution (so progress is saved before it starts), use the isolated: true option:

step :process_rows, isolated: true do |step|
  process_csv_rows(step)
end

This forces the job to interrupt and re-enqueue before starting the step, ensuring all previous progress is persisted.

Configuration

Three class-level settings control resumption behavior:

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  # Maximum number of times the job can be resumed (default: nil, unlimited)
  self.max_resumptions = 50

  # Options passed to retry_job when resuming (default: { wait: 5.seconds })
  self.resume_options = { wait: 2.seconds, queue: :imports }

  # Auto-resume if an error occurs after the cursor advanced (default: true)
  self.resume_errors_after_advancing = true
end

The last option is worth noting. If your job raises an error after it has made progress (advanced the cursor or completed a step), it will automatically resume rather than losing that progress. This is enabled by default.

Things to Watch Out For

Code outside steps re-runs on every resume. Everything outside a step block runs on every execution, including resumes. The @import = Import.find(import_id) and CSV parsing lines at the top of perform are fine as setup, but avoid putting side effects there. For very large files, the CSV re-download on each resume could be slow. Consider downloading the file to a local temp path in an earlier step and reading from disk instead.
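
Here's one way to do that staging, sketched as a lazy helper with an assumed tmp path rather than a dedicated step: a completed step is skipped on resume, so if the job moved to a fresh machine a step-based download would never re-run. Replace the eager @rows assignment in perform with calls to this rows method:

def rows
  @rows ||= begin
    # Stage the CSV on local disk once per machine; resumes on the
    # same host skip the Active Storage download entirely.
    File.binwrite(staged_path, @import.file.download) unless File.exist?(staged_path)
    CSV.parse(File.read(staged_path), headers: true)
  end
end

def staged_path
  Rails.root.join("tmp", "import-#{@import.id}.csv")
end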

Checkpoints within a step are not automatic. There is an automatic checkpoint between steps, but within a step, the job only checks for interruption when you call step.advance!, step.set!, or step.checkpoint!. If your step does ten minutes of work without a checkpoint, it can’t be interrupted gracefully during that time. Checkpoint more often than your worker’s shutdown timeout.

Queue adapter support. Continuations rely on queue_adapter.stopping? to detect shutdowns. Rails ships with stopping? implemented in the Sidekiq and Test adapters. Solid Queue can support it via the on_worker_stop hook, but check your adapter’s documentation. If your adapter doesn’t implement stopping?, the job will still work but won’t be interrupted gracefully.
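
For a homegrown adapter the contract is small. A hypothetical sketch, assuming your adapter subclasses ActiveJob::QueueAdapters::AbstractAdapter; initiate_stop is a made-up name, and a real adapter still needs its enqueue methods:

class MyQueueAdapter < ActiveJob::QueueAdapters::AbstractAdapter
  # Continuations poll this at every checkpoint.
  def stopping?
    @stopping == true
  end

  # Wire this into the worker's shutdown path, e.g. its TERM handler.
  def initiate_stop
    @stopping = true
  end
end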

Cursors must be serializable. The cursor is stored in the job’s serialized data, so it must be something ActiveJob can serialize: integers, strings, arrays, or any object that implements ActiveJob serialization.
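
Arrays make handy composite cursors for nested iteration. A sketch that tracks a page number and an offset within the page; fetch_page and sync are hypothetical helpers:

step :sync_pages do |step|
  page, offset = step.cursor || [1, 0]

  loop do
    records = fetch_page(page)
    break if records.empty?

    records[offset..].each.with_index(offset) do |record, i|
      sync(record)
      step.set! [page, i + 1]  # resume mid-page after an interruption
    end

    page += 1
    offset = 0
    step.set! [page, 0]  # the next resume starts at the top of the next page
  end
end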

Wrapping It Up

Instead of building custom state machines or pulling in external gems, you can now let Rails handle job interruption and resumption for you. Define your steps, checkpoint frequently, and deploy without worrying about killing long-running jobs.

This feature is available in Rails 8.1.0 and later. See PR #55127 for the full implementation.