在 Rails 中使用基于错误的延迟构建智能重试策略
最近的一个 Rails 变更让你的任务重试逻辑可以检查实际发生的错误。这开启了以前难以实现的重试策略。
旧方式
在此变更之前,retry_on 的 wait proc 只能接收执行次数:
class ApiJob < ApplicationJob
retry_on ApiError, wait: ->(executions) { executions ** 2 }
def perform(endpoint)
ExternalApi.call(endpoint)
end
end
这对于基本的指数退避有效,但如果 API 告诉你确切的重试时间呢?有速率限制的 API 通常包含 Retry-After 头。仅有执行次数,你无法访问该信息。
新方式
PR #56601 刚刚合并到 Rails main,添加了错误作为可选的第二个参数。这将在 Rails 8.2 中可用。
class ApiJob < ApplicationJob
retry_on ApiError, wait: ->(executions, error) { error.retry_after || executions ** 2 }
def perform(endpoint)
ExternalApi.call(endpoint)
end
end
现在你可以检查错误并做出明智的决定。此变更向后兼容——参数个数为 1 的 proc 继续只接收执行次数。
模式
以下是一些实践中的使用方式。
模式 1:尊重速率限制
当 API 对你进行速率限制时,它通常会告诉你何时重试:
class RateLimitError < StandardError
attr_reader :retry_after
def initialize(message, retry_after: nil)
super(message)
@retry_after = retry_after
end
end
class SyncToStripeJob < ApplicationJob
retry_on RateLimitError,
wait: ->(executions, error) {
# 信任 API 的指导,并提供合理的后备方案
error.retry_after || (executions * 30.seconds)
},
attempts: 10
def perform(user)
response = Stripe::Customer.update(user.stripe_id, user.stripe_attributes)
rescue Stripe::RateLimitError => e
raise RateLimitError.new(e.message, retry_after: e.http_headers["retry-after"]&.to_i)
end
end
这尊重 API 的背压信号,而不是盲目地持续请求。
模式 2:从异常消息中提取重试提示
一些异常在其消息中编码了有用的信息。例如,锁超时可能会告诉你哪个资源存在争用:
class LockTimeoutError < StandardError
attr_reader :lock_wait_time
def initialize(message, lock_wait_time: nil)
super(message)
@lock_wait_time = lock_wait_time
end
end
class ImportJob < ApplicationJob
retry_on LockTimeoutError,
wait: ->(executions, error) {
# 如果我们知道等待锁的时间,在重试前至少等待那么长,
# 再加上一些抖动
base_delay = error.lock_wait_time || executions ** 2
jitter = rand(0.0..1.0) * base_delay
base_delay + jitter
},
attempts: 5
def perform(batch)
Record.transaction do
batch.each { |row| Record.upsert(row) }
end
rescue ActiveRecord::LockWaitTimeout => e
# 如果你的数据库适配器提供,提取等待时间
raise LockTimeoutError.new(e.message, lock_wait_time: extract_wait_time(e))
end
private
def extract_wait_time(error)
# 从错误消息或元数据中解析(如果可用)
error.message[/waited (\d+)s/, 1]&.to_i
end
end
重试延迟现在适应实际观察到的争用情况。
模式 3:基于错误详情的上下文感知延迟
一些错误携带应该影响重试时机的上下文:
class WebhookDeliveryError < StandardError
attr_reader :status_code, :response_body
def initialize(message, status_code:, response_body: nil)
super(message)
@status_code = status_code
@response_body = response_body
end
def transient?
status_code.in?(500..599) || status_code == 429
end
def suggested_delay
case status_code
when 429 then 60.seconds # 速率限制,显著退避
when 503 then 30.seconds # 服务不可用,中等退避
when 500..502, 504..599 then 10.seconds # 服务器错误,较短延迟
else 5.seconds
end
end
end
class DeliverWebhookJob < ApplicationJob
retry_on WebhookDeliveryError,
wait: ->(executions, error) {
error.suggested_delay * executions
},
attempts: 8
def perform(webhook)
response = HTTP.post(webhook.url, json: webhook.payload)
unless response.status.success?
raise WebhookDeliveryError.new(
"Webhook delivery failed",
status_code: response.status,
response_body: response.body.to_s
)
end
end
end
这对 503 和 500 的处理方式不同,两者与 429 的处理方式也不同。
模式 4:具有共享逻辑的多错误策略
对于可能以多种方式失败的任务,集中你的重试逻辑:
module RetryStrategies
STRATEGIES = {
rate_limit: ->(executions, error) {
error.respond_to?(:retry_after) ? error.retry_after : 60.seconds
},
transient: ->(executions, error) {
(2 ** executions) + rand(0..executions)
},
network: ->(executions, error) {
[5.seconds * executions, 2.minutes].min
}
}
def self.for(type)
STRATEGIES.fetch(type)
end
end
class ExternalSyncJob < ApplicationJob
retry_on RateLimitError, wait: RetryStrategies.for(:rate_limit), attempts: 10
retry_on Net::OpenTimeout, wait: RetryStrategies.for(:network), attempts: 5
retry_on Faraday::ServerError, wait: RetryStrategies.for(:transient), attempts: 5
def perform(record)
ExternalService.sync(record)
end
end
这使整个应用程序的重试策略保持一致。
携带上下文的错误类
要充分利用这一点,用有用的上下文包装外部错误:
class ExternalApiError < StandardError
attr_reader :original_error, :retry_after, :retriable
def initialize(message, original_error: nil, retry_after: nil, retriable: true)
super(message)
@original_error = original_error
@retry_after = retry_after
@retriable = retriable
end
def self.from_response(response)
new(
"API returned #{response.status}",
retry_after: parse_retry_after(response),
retriable: response.status.in?(500..599) || response.status == 429
)
end
private_class_method def self.parse_retry_after(response)
value = response.headers["Retry-After"]
return nil unless value
if value.match?(/^\d+$/)
value.to_i.seconds
else
Time.httpdate(value) - Time.current rescue nil
end
end
end
然后你的任务可以根据这些详情进行分支:
class ApiSyncJob < ApplicationJob
retry_on ExternalApiError,
wait: ->(executions, error) {
error.retry_after || (executions ** 2).seconds
},
attempts: 10
def perform(resource)
response = ApiClient.sync(resource)
raise ExternalApiError.from_response(response) unless response.success?
end
end
与 discard_on 结合
并非所有错误都应该重试。对于永远不会成功的错误,使用 discard_on:
class ProcessPaymentJob < ApplicationJob
discard_on PaymentDeclinedError # 不要重试被拒绝的卡
retry_on PaymentGatewayError,
wait: ->(executions, error) {
error.retry_after || (10.seconds * executions)
},
attempts: 5
def perform(order)
PaymentGateway.charge(order)
end
end
不再对所有错误一视同仁,你现在可以构建适应特定故障的重试策略。你的任务可以在 API 要求时退避,为重试添加抖动以避免冲突,并在重试无济于事时快速失败。
此功能将在 Rails 8.2 中可用。