在 Rails 中使用基于错误的延迟构建智能重试策略

最近的一个 Rails 变更让你的任务重试逻辑可以检查实际发生的错误。这开启了以前难以实现的重试策略。

旧方式

在此变更之前,retry_on 的 wait proc 只能接收执行次数:

class ApiJob < ApplicationJob
  retry_on ApiError, wait: ->(executions) { executions ** 2 }

  def perform(endpoint)
    ExternalApi.call(endpoint)
  end
end

这对于基本的指数退避有效,但如果 API 告诉你确切的重试时间呢?有速率限制的 API 通常包含 Retry-After 头。仅有执行次数,你无法访问该信息。

新方式

PR #56601 刚刚合并到 Rails main,添加了错误作为可选的第二个参数。这将在 Rails 8.2 中可用。

class ApiJob < ApplicationJob
  retry_on ApiError, wait: ->(executions, error) { error.retry_after || executions ** 2 }

  def perform(endpoint)
    ExternalApi.call(endpoint)
  end
end

现在你可以检查错误并做出明智的决定。此变更向后兼容——参数个数为 1 的 proc 继续只接收执行次数。

模式

以下是一些实践中的使用方式。

模式 1:尊重速率限制

当 API 对你进行速率限制时,它通常会告诉你何时重试:

class RateLimitError < StandardError
  attr_reader :retry_after

  def initialize(message, retry_after: nil)
    super(message)
    @retry_after = retry_after
  end
end

class SyncToStripeJob < ApplicationJob
  retry_on RateLimitError,
    wait: ->(executions, error) {
      # 信任 API 的指导,并提供合理的后备方案
      error.retry_after || (executions * 30.seconds)
    },
    attempts: 10

  def perform(user)
    response = Stripe::Customer.update(user.stripe_id, user.stripe_attributes)
  rescue Stripe::RateLimitError => e
    raise RateLimitError.new(e.message, retry_after: e.http_headers["retry-after"]&.to_i)
  end
end

这尊重 API 的背压信号,而不是盲目地持续请求。

模式 2:从异常消息中提取重试提示

一些异常在其消息中编码了有用的信息。例如,锁超时可能会告诉你哪个资源存在争用:

class LockTimeoutError < StandardError
  attr_reader :lock_wait_time

  def initialize(message, lock_wait_time: nil)
    super(message)
    @lock_wait_time = lock_wait_time
  end
end

class ImportJob < ApplicationJob
  retry_on LockTimeoutError,
    wait: ->(executions, error) {
      # 如果我们知道等待锁的时间,在重试前至少等待那么长,
      # 再加上一些抖动
      base_delay = error.lock_wait_time || executions ** 2
      jitter = rand(0.0..1.0) * base_delay
      base_delay + jitter
    },
    attempts: 5

  def perform(batch)
    Record.transaction do
      batch.each { |row| Record.upsert(row) }
    end
  rescue ActiveRecord::LockWaitTimeout => e
    # 如果你的数据库适配器提供,提取等待时间
    raise LockTimeoutError.new(e.message, lock_wait_time: extract_wait_time(e))
  end

  private

  def extract_wait_time(error)
    # 从错误消息或元数据中解析(如果可用)
    error.message[/waited (\d+)s/, 1]&.to_i
  end
end

重试延迟现在适应实际观察到的争用情况。

模式 3:基于错误详情的上下文感知延迟

一些错误携带应该影响重试时机的上下文:

class WebhookDeliveryError < StandardError
  attr_reader :status_code, :response_body

  def initialize(message, status_code:, response_body: nil)
    super(message)
    @status_code = status_code
    @response_body = response_body
  end

  def transient?
    status_code.in?(500..599) || status_code == 429
  end

  def suggested_delay
    case status_code
    when 429 then 60.seconds  # 速率限制,显著退避
    when 503 then 30.seconds  # 服务不可用,中等退避
    when 500..502, 504..599 then 10.seconds  # 服务器错误,较短延迟
    else 5.seconds
    end
  end
end

class DeliverWebhookJob < ApplicationJob
  retry_on WebhookDeliveryError,
    wait: ->(executions, error) {
      error.suggested_delay * executions
    },
    attempts: 8

  def perform(webhook)
    response = HTTP.post(webhook.url, json: webhook.payload)

    unless response.status.success?
      raise WebhookDeliveryError.new(
        "Webhook delivery failed",
        status_code: response.status,
        response_body: response.body.to_s
      )
    end
  end
end

这对 503 和 500 的处理方式不同,两者与 429 的处理方式也不同。

模式 4:具有共享逻辑的多错误策略

对于可能以多种方式失败的任务,集中你的重试逻辑:

module RetryStrategies
  STRATEGIES = {
    rate_limit: ->(executions, error) {
      error.respond_to?(:retry_after) ? error.retry_after : 60.seconds
    },
    transient: ->(executions, error) {
      (2 ** executions) + rand(0..executions)
    },
    network: ->(executions, error) {
      [5.seconds * executions, 2.minutes].min
    }
  }

  def self.for(type)
    STRATEGIES.fetch(type)
  end
end

class ExternalSyncJob < ApplicationJob
  retry_on RateLimitError, wait: RetryStrategies.for(:rate_limit), attempts: 10
  retry_on Net::OpenTimeout, wait: RetryStrategies.for(:network), attempts: 5
  retry_on Faraday::ServerError, wait: RetryStrategies.for(:transient), attempts: 5

  def perform(record)
    ExternalService.sync(record)
  end
end

这使整个应用程序的重试策略保持一致。

携带上下文的错误类

要充分利用这一点,用有用的上下文包装外部错误:

class ExternalApiError < StandardError
  attr_reader :original_error, :retry_after, :retriable

  def initialize(message, original_error: nil, retry_after: nil, retriable: true)
    super(message)
    @original_error = original_error
    @retry_after = retry_after
    @retriable = retriable
  end

  def self.from_response(response)
    new(
      "API returned #{response.status}",
      retry_after: parse_retry_after(response),
      retriable: response.status.in?(500..599) || response.status == 429
    )
  end

  private_class_method def self.parse_retry_after(response)
    value = response.headers["Retry-After"]
    return nil unless value

    if value.match?(/^\d+$/)
      value.to_i.seconds
    else
      Time.httpdate(value) - Time.current rescue nil
    end
  end
end

然后你的任务可以根据这些详情进行分支:

class ApiSyncJob < ApplicationJob
  retry_on ExternalApiError,
    wait: ->(executions, error) {
      error.retry_after || (executions ** 2).seconds
    },
    attempts: 10

  def perform(resource)
    response = ApiClient.sync(resource)
    raise ExternalApiError.from_response(response) unless response.success?
  end
end

与 discard_on 结合

并非所有错误都应该重试。对于永远不会成功的错误,使用 discard_on

class ProcessPaymentJob < ApplicationJob
  discard_on PaymentDeclinedError  # 不要重试被拒绝的卡

  retry_on PaymentGatewayError,
    wait: ->(executions, error) {
      error.retry_after || (10.seconds * executions)
    },
    attempts: 5

  def perform(order)
    PaymentGateway.charge(order)
  end
end

不再对所有错误一视同仁,你现在可以构建适应特定故障的重试策略。你的任务可以在 API 要求时退避,为重试添加抖动以避免冲突,并在重试无济于事时快速失败。

此功能将在 Rails 8.2 中可用。