Building a File Deduplication System with ActiveStorage

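Every time a user uploads the company logo, a profile picture, or the same PDF they have already uploaded three times, you pay to store it again. ActiveStorage does not deduplicate by default: every upload creates a new blob, even when the content is identical.

Let's fix that. We will build a deduplication system that detects identical files and reuses the existing blob, which cuts storage costs and makes uploads of duplicate files finish instantly. We will scope deduplication to each user, so a user can only deduplicate against their own uploads and files stay secure.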

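How ActiveStorage Checksums Work

Every ActiveStorage blob has a checksum column, which holds the Base64-encoded MD5 hash of the file content. Two files with the same content will always have the same checksum: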

ActiveStorage::Blob.pluck(:checksum).tally
# => {"vckNNU4TN7zbzl+o3tjXPQ==" => 47, "x8K9f2mVhLpWQ..." => 12, ...}

If you see counts greater than 1, you have duplicates. Let's eliminate them.
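
To see where those values come from, here is a minimal sketch that uses only Ruby's standard library. It assumes the default MD5 checksum described above; the content_checksum helper and the sample strings are made up for illustration.

```ruby
require "digest"

# Base64-encoded MD5 digest of the content, the same format as the
# values stored in the checksum column.
def content_checksum(content)
  Digest::MD5.base64digest(content)
end

first_upload  = content_checksum("quarterly report, final version")
second_upload = content_checksum("quarterly report, final version")
edited_upload = content_checksum("quarterly report, final version 2")

puts first_upload == second_upload  # identical content, identical checksum
puts first_upload == edited_upload  # any change produces a different checksum
```

The checksum depends only on the bytes of the file, which is why the filename and the uploader play no part in detecting a duplicate.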

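Adding an Index for Performance

ActiveStorage does not index the checksum column by default. Without an index, deduplication queries do a full table scan, which is fine for small apps but slows down as you grow.

Note: This migration modifies ActiveStorage's schema. Adding an index is low risk (no structural changes), but be aware that you are customizing a table owned by an engine. Document this decision for your team.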

class AddIndexToActiveStorageBlobsChecksum < ActiveRecord::Migration[8.0]
  disable_ddl_transaction!

  def change
    add_index :active_storage_blobs, :checksum, algorithm: :concurrently
  end
end

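The algorithm: :concurrently option (PostgreSQL) builds the index without blocking writes, which is essential for production databases that already hold data. It requires disable_ddl_transaction! because concurrent index creation cannot run inside a transaction.

Step 1: Server-Side Deduplication

When creating a blob, check whether the current user already owns a blob with that checksum.

Create a controller that handles deduplicated direct uploads: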

# app/controllers/deduplicated_uploads_controller.rb
class DeduplicatedUploadsController < ActiveStorage::DirectUploadsController
  before_action :authenticate_user!

  def create
    existing_blob = find_existing_blob_for_user(blob_params[:checksum])

    if existing_blob
      render json: existing_blob_json(existing_blob)
    else
      super
    end
  end

  private

  def find_existing_blob_for_user(checksum)
    return nil if checksum.blank?

    # Only look for blobs the current user has uploaded before.
    # A single query; the ownership check runs as an EXISTS subquery.
    ActiveStorage::Blob
      .joins(:attachments)
      .where(checksum: checksum)
      .where(user_owns_attachment_sql)
      .first
  end

  # SQL fragment that checks whether current_user owns the attached record.
  # EXISTS subqueries keep this efficient (no IDs are loaded into memory).
  def user_owns_attachment_sql
    <<~SQL.squish
      (
        (active_storage_attachments.record_type = 'Document'
         AND EXISTS (SELECT 1 FROM documents WHERE documents.id = active_storage_attachments.record_id AND documents.user_id = #{current_user.id}))
        OR
        (active_storage_attachments.record_type = 'Avatar'
         AND EXISTS (SELECT 1 FROM avatars WHERE avatars.id = active_storage_attachments.record_id AND avatars.user_id = #{current_user.id}))
      )
    SQL
  end

  def existing_blob_json(blob)
    {
      id: blob.id,
      key: blob.key,
      filename: blob.filename.to_s,
      content_type: blob.content_type,
      byte_size: blob.byte_size,
      checksum: blob.checksum,
      signed_id: blob.signed_id,
      direct_upload: nil  # signals the client to skip the upload
    }
  end

  def blob_params
    params.require(:blob).permit(:filename, :byte_size, :checksum, :content_type, metadata: {})
  end
end

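This queries through the active_storage_attachments table to find blobs attached to records the current user owns. Adjust the record types and ownership conditions in user_owns_attachment_sql to match your application's ownership model.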
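
The scoping rule is easier to see without SQL. The following is a minimal in-memory sketch, not the controller's implementation: StoredBlob, owner_ids, and find_blob_for_user are hypothetical names standing in for the blob table and the ownership check.

```ruby
# Hypothetical in-memory record standing in for a blob and the users who
# own a record it is attached to.
StoredBlob = Struct.new(:id, :checksum, :owner_ids)

# Return the first blob with this checksum that is attached to a record
# owned by the given user, or nil when there is none.
def find_blob_for_user(blobs, user_id, checksum)
  return nil if checksum.nil? || checksum.empty?

  blobs.find { |blob| blob.checksum == checksum && blob.owner_ids.include?(user_id) }
end

blobs = [
  StoredBlob.new(1, "vckNNU4TN7zbzl+o3tjXPQ==", [10]),     # attached only by user 10
  StoredBlob.new(2, "1B2M2Y8AsgTpgAmY7PhCfg==", [10, 11])  # attached by users 10 and 11
]

p find_blob_for_user(blobs, 10, "vckNNU4TN7zbzl+o3tjXPQ==")&.id  # user 10 reuses blob 1
p find_blob_for_user(blobs, 11, "vckNNU4TN7zbzl+o3tjXPQ==")      # nil, so user 11 uploads normally
```

Knowing a checksum is not enough to get a signed_id back: the match only counts when the requesting user already owns a record that uses the blob.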

Add the route:

# config/routes.rb
Rails.application.routes.draw do
  post '/rails/active_storage/direct_uploads',
       to: 'deduplicated_uploads#create',
       as: :deduplicated_direct_uploads
end

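How Attachments Work

When we find an existing blob, we return its signed_id. The client submits this signed_id with the form, and ActiveStorage creates a new attachment that points to the existing blob: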

# app/controllers/documents_controller.rb
class DocumentsController < ApplicationController
  def create
    @document = current_user.documents.build(document_params)

    if @document.save
      redirect_to @document
    else
      render :new
    end
  end

  private

  def document_params
    params.require(:document).permit(:title, :file)
  end
end

When params[:document][:file] is a signed_id string (rather than an uploaded file), ActiveStorage finds the blob and attaches it. One blob can have many attachments:

blob = ActiveStorage::Blob.find_by(checksum: "vckNNU4TN7zbzl+o3tjXPQ==")
blob.attachments.count
# => 5  (five different records share this blob)

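This is the key to deduplication: multiple records reference the same stored file.

Step 2: Client-Side Upload Handling

The client needs to know when to skip the upload. When direct_upload is null in the response, the file already exists: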

// app/javascript/controllers/deduplicated_upload_controller.js
import { Controller } from "@hotwired/stimulus"
import { DirectUpload } from "@rails/activestorage"

export default class extends Controller {
  static targets = ["input", "progress"]
  static values = { url: String }

  upload() {
    const file = this.inputTarget.files[0]
    if (!file) return

    const upload = new DirectUpload(file, this.urlValue, this)

    upload.create((error, blob) => {
      if (error) {
        console.error(error)
      } else {
        this.handleSuccess(blob)
      }
    })
  }

  handleSuccess(blob) {
    // Create a hidden input that carries the signed_id
    const input = document.createElement("input")
    input.type = "hidden"
    input.name = this.inputTarget.name
    input.value = blob.signed_id
    this.inputTarget.form.appendChild(input)

    if (blob.direct_upload === null) {
      this.showMessage("File already exists - instant upload!")
    } else {
      this.showMessage("Upload complete")
    }
  }

  // DirectUpload delegate method
  directUploadWillStoreFileWithXHR(request) {
    request.upload.addEventListener("progress", (event) => {
      const progress = (event.loaded / event.total) * 100
      this.progressTarget.style.width = `${progress}%`
    })
  }

  showMessage(text) {
    // Update the UI to show the status
    this.progressTarget.textContent = text
  }
}

Use it in a form:

<%= form_with model: @document do |f| %>
  <div data-controller="deduplicated-upload"
       data-deduplicated-upload-url-value="<%= deduplicated_direct_uploads_url %>">

    <%= f.file_field :file,
        data: {
          deduplicated_upload_target: "input",
          action: "change->deduplicated-upload#upload"
        } %>

    <div data-deduplicated-upload-target="progress"></div>
  </div>

  <%= f.submit %>
<% end %>

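Step 3: Skipping the Network Request Entirely

We can go a step further: compute the checksum on the client and check whether the blob exists before attempting the upload.

Extract the scoping logic into a concern so it can be shared between controllers: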

# app/controllers/concerns/blob_scoping.rb
module BlobScoping
  extend ActiveSupport::Concern

  def find_user_blob(checksum)
    return nil if checksum.blank?

    ActiveStorage::Blob
      .joins(:attachments)
      .where(checksum: checksum)
      .where(user_owns_attachment_sql)
      .first
  end

  private

  def user_owns_attachment_sql
    <<~SQL.squish
      (
        (active_storage_attachments.record_type = 'Document'
         AND EXISTS (SELECT 1 FROM documents WHERE documents.id = active_storage_attachments.record_id AND documents.user_id = #{current_user.id}))
        OR
        (active_storage_attachments.record_type = 'Avatar'
         AND EXISTS (SELECT 1 FROM avatars WHERE avatars.id = active_storage_attachments.record_id AND avatars.user_id = #{current_user.id}))
      )
    SQL
  end
end

Then use it in the lookup controller:

# app/controllers/blob_lookups_controller.rb
class BlobLookupsController < ApplicationController
  include BlobScoping
  before_action :authenticate_user!

  def show
    blob = find_user_blob(params[:checksum])

    if blob
      render json: { exists: true, signed_id: blob.signed_id }
    else
      render json: { exists: false }
    end
  end
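end

# config/routes.rb
# Pass the checksum as a query parameter (Base64 contains +, /, and =, which would need encoding in a path)
get '/blobs/lookup', to: 'blob_lookups#show', as: :blob_lookup

The show action reads the checksum from the query parameter: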

# app/controllers/blob_lookups_controller.rb
def show
  blob = find_user_blob(params[:checksum])
  # ... the rest is unchanged
end
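
To illustrate why the encoding matters, here is a small sketch using Ruby's standard URI module. The lookup_query helper is a hypothetical name; in the browser, URLSearchParams performs the equivalent encoding.

```ruby
require "uri"

# Build the query string for the lookup request (hypothetical helper).
def lookup_query(checksum)
  URI.encode_www_form(checksum: checksum)
end

checksum = "vckNNU4TN7zbzl+o3tjXPQ=="

query = lookup_query(checksum)
puts query  # checksum=vckNNU4TN7zbzl%2Bo3tjXPQ%3D%3D

# Decoding the encoded query restores the original checksum.
puts URI.decode_www_form(query).to_h["checksum"] == checksum

# Sent unencoded, a form-style decoder would read "+" as a space,
# and the lookup would never match.
puts URI.decode_www_form_component(checksum)
```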

Now the JavaScript can check first:

// app/javascript/controllers/smart_upload_controller.js
import { Controller } from "@hotwired/stimulus"
import { DirectUpload, FileChecksum } from "@rails/activestorage"

export default class extends Controller {
  static targets = ["input", "status"]
  static values = {
    lookupUrl: String,
    uploadUrl: String
  }

  async upload() {
    const file = this.inputTarget.files[0]
    if (!file) return

    try {
      this.statusTarget.textContent = "Computing checksum..."
      const checksum = await this.computeChecksum(file)

      this.statusTarget.textContent = "Checking for duplicates..."
      const existing = await this.lookupBlob(checksum)

      if (existing.exists) {
        this.statusTarget.textContent = "File already uploaded - instant!"
        this.attachSignedId(existing.signed_id)
        return
      }

      this.statusTarget.textContent = "Uploading..."
      await this.performUpload(file)
    } catch (error) {
      this.statusTarget.textContent = `Error: ${error.message}`
      console.error("Upload failed:", error)
    }
  }

  computeChecksum(file) {
    return new Promise((resolve, reject) => {
      FileChecksum.create(file, (error, checksum) => {
        if (error) {
          reject(new Error(`Checksum failed: ${error}`))
        } else {
          resolve(checksum)
        }
      })
    })
  }

  async lookupBlob(checksum) {
    const url = new URL(this.lookupUrlValue, window.location.origin)
    url.searchParams.set("checksum", checksum)

    const response = await fetch(url, {
      headers: {
        "X-CSRF-Token": this.csrfToken,
        "Accept": "application/json"
      },
      credentials: "same-origin"
    })

    if (!response.ok) {
      throw new Error(`Lookup failed: ${response.status}`)
    }

    return response.json()
  }

  get csrfToken() {
    const meta = document.querySelector('meta[name="csrf-token"]')
    return meta ? meta.content : ""
  }

  attachSignedId(signedId) {
    const input = document.createElement("input")
    input.type = "hidden"
    input.name = this.inputTarget.name
    input.value = signedId
    this.inputTarget.form.appendChild(input)
  }

  performUpload(file) {
    return new Promise((resolve, reject) => {
      const upload = new DirectUpload(file, this.uploadUrlValue, this)
      upload.create((error, blob) => {
        if (error) {
          reject(new Error(`Upload failed: ${error}`))
        } else {
          this.statusTarget.textContent = "Upload complete"
          this.attachSignedId(blob.signed_id)
          resolve(blob)
        }
      })
    })
  }

  directUploadWillStoreFileWithXHR(request) {
    request.upload.addEventListener("progress", (event) => {
      const percent = Math.round((event.loaded / event.total) * 100)
      this.statusTarget.textContent = `Uploading: ${percent}%`
    })
  }
}

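Expanding the Scope

User-scoped deduplication is the safest default, but some situations call for broader deduplication. Here are the options for widening the scope:

Option A: Organization/Tenant Scope

For SaaS apps where team members share files, deduplicate within the organization. This requires querying the records owned by any member of the organization: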

def find_org_blob(checksum)
  ActiveStorage::Blob
    .joins(:attachments)
    .where(checksum: checksum)
    .where(org_owns_attachment_sql)
    .first
end

def org_owns_attachment_sql
  # Find blobs attached to records owned by any member of the organization
  <<~SQL.squish
    (
      (active_storage_attachments.record_type = 'Document'
       AND EXISTS (
         SELECT 1 FROM documents
         WHERE documents.id = active_storage_attachments.record_id
         AND documents.organization_id = #{current_organization.id}
       ))
      OR
      (active_storage_attachments.record_type = 'Project'
       AND EXISTS (
         SELECT 1 FROM projects
         WHERE projects.id = active_storage_attachments.record_id
         AND projects.organization_id = #{current_organization.id}
       ))
    )
  SQL
end

Option B: Public Files Only

Allow global deduplication for files explicitly marked as public:

def find_public_blob(checksum)
  ActiveStorage::Blob
    .where(checksum: checksum)
    .where("metadata::jsonb->>'public' = ?", "true")  # metadata is a text column by default, so cast it first (PostgreSQL)
    .first
end

def find_existing_blob(checksum)
  find_user_blob(checksum) || find_public_blob(checksum)
end

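Option C: By Content Type (Server-Side Only)

Deduplicate "safe" content such as images globally, but keep documents scoped to the user. This only works for server-side deduplication (Step 1), where we can read content_type from the upload request; it does not work for the pre-upload lookup (Step 3):

# In DeduplicatedUploadsController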
def find_existing_blob_for_user(checksum)
  content_type = blob_params[:content_type]

  if content_type&.start_with?("image/")
    # Images can be deduplicated globally (low risk)
    ActiveStorage::Blob.find_by(checksum: checksum)
  else
    # Documents stay scoped to the user
    find_user_blob(checksum)
  end
end
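
The branching rule above can be isolated into a small pure function, which makes it easy to test on its own. This is a sketch under assumptions: dedup_scope_for and the :global and :user labels are illustrative names, not part of ActiveStorage.

```ruby
# Decide how widely a checksum lookup may search, based on content type.
# A missing content type falls back to the narrower, user-only scope.
def dedup_scope_for(content_type)
  content_type.to_s.start_with?("image/") ? :global : :user
end

p dedup_scope_for("image/png")        # :global
p dedup_scope_for("application/pdf")  # :user
p dedup_scope_for(nil)                # :user
```

Keep in mind that the content type comes from the client's request, so the rule is only as trustworthy as that value.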

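Handling Edge Cases

Race conditions: The same user uploads the same file twice in quick succession. Both lookups return "not found" and both uploads go through, so you end up with two blobs. That is fine. The user-scoped query handles this naturally, and you can use a background job to clean up duplicates per user: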

# app/jobs/deduplicate_user_blobs_job.rb
class DeduplicateUserBlobsJob < ApplicationJob
  def perform(user)
    duplicates = find_duplicate_checksums_for(user)

    duplicates.each do |checksum|
      deduplicate_blobs_with_checksum(user, checksum)
    end
  end

  private

  def find_duplicate_checksums_for(user)
    ActiveStorage::Blob
      .joins(:attachments)
      .where(user_owns_attachment_sql(user))
      .group(:checksum)
      .having("COUNT(DISTINCT active_storage_blobs.id) > 1")
      .pluck(:checksum)
  end

  def deduplicate_blobs_with_checksum(user, checksum)
    ActiveStorage::Blob.transaction do
      # Select ids through a subquery instead of joins + distinct, because
      # PostgreSQL rejects FOR UPDATE combined with DISTINCT.
      owned_ids = ActiveStorage::Blob
        .joins(:attachments)
        .where(checksum: checksum)
        .where(user_owns_attachment_sql(user))
        .select(:id)

      blobs = ActiveStorage::Blob.where(id: owned_ids).lock

      # The oldest blob becomes the canonical copy
      canonical = blobs.order(:created_at).first
      return if canonical.nil?

      # Use where.not rather than offset (offset does not work with find_each)
      blobs.where.not(id: canonical.id).find_each do |duplicate|
        duplicate.attachments.update_all(blob_id: canonical.id)
        duplicate.purge
      end
    end
  end

  def user_owns_attachment_sql(user)
    <<~SQL.squish
      (
        (active_storage_attachments.record_type = 'Document'
         AND EXISTS (SELECT 1 FROM documents WHERE documents.id = active_storage_attachments.record_id AND documents.user_id = #{user.id}))
        OR
        (active_storage_attachments.record_type = 'Avatar'
         AND EXISTS (SELECT 1 FROM avatars WHERE avatars.id = active_storage_attachments.record_id AND avatars.user_id = #{user.id}))
      )
    SQL
  end
end
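
The merge decision in the job above can be sketched without a database. This is an illustration under assumptions, not the job itself: BlobRow and plan_merges are hypothetical names, and the rows stand in for one user's blobs.

```ruby
# Hypothetical stand-in for a row in the blobs table.
BlobRow = Struct.new(:id, :checksum, :created_at)

# For each checksum with more than one blob, keep the oldest blob as the
# canonical copy and list the newer ones as duplicates to merge into it.
def plan_merges(blobs)
  blobs.group_by(&:checksum).filter_map do |checksum, group|
    next if group.size < 2

    canonical, *duplicates = group.sort_by { |blob| [blob.created_at, blob.id] }
    { checksum: checksum, keep: canonical.id, merge: duplicates.map(&:id) }
  end
end

blobs = [
  BlobRow.new(1, "aaa", Time.at(100)),
  BlobRow.new(2, "bbb", Time.at(200)),
  BlobRow.new(3, "aaa", Time.at(300))  # the race: same content stored twice
]

plan_merges(blobs).each do |plan|
  puts "keep blob #{plan[:keep]}, repoint attachments from #{plan[:merge].inspect}"
end
```

Sorting by creation time and then id gives a stable choice of canonical blob, so repeated runs of the cleanup converge on the same copy.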

Orphaned blobs: The find_user_blob method already joins through attachments, so orphaned blobs (those with no attachments) are excluded automatically.

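Different filenames: Same content, different names. The blob keeps the filename from the first upload, and every attachment that reuses the blob shares that name, so store a per-record name on your own model if it matters. That is fine: deduplicate by content, not metadata.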

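Preventing Premature Blob Deletion

Sharing blobs has one critical problem: by default, ActiveStorage purges the blob when you delete a record. If Document A and Document B share a blob, deleting Document A deletes the blob and breaks Document B.

Solve this by disabling automatic purging on the models: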

# app/models/document.rb
class Document < ApplicationRecord
  belongs_to :user
  has_one_attached :file, dependent: false  # do not purge automatically
end

# app/models/avatar.rb
class Avatar < ApplicationRecord
  belongs_to :user
  has_one_attached :image, dependent: false
end

Now the blob is kept even when an attachment is deleted. Use a scheduled job to clean up orphaned blobs (those with zero attachments):

# app/jobs/cleanup_orphaned_blobs_job.rb
class CleanupOrphanedBlobsJob < ApplicationJob
  def perform
    # Find blobs with no attachments that are more than 1 day old (grace period)
    ActiveStorage::Blob
      .left_joins(:attachments)
      .where(active_storage_attachments: { id: nil })
      .where(active_storage_blobs: { created_at: ...1.day.ago })
      .find_each(&:purge)
  end
end

Schedule it to run daily. With Solid Queue (the Rails 8 default):

# config/recurring.yml
cleanup_orphaned_blobs:
  class: CleanupOrphanedBlobsJob
  schedule: every day at 3am

Or with sidekiq-cron:

# config/initializers/sidekiq.rb
Sidekiq::Cron::Job.create(
  name: "Clean up orphaned blobs - daily",
  cron: "0 3 * * *",
  class: "CleanupOrphanedBlobsJob"
)

The 1-day grace period prevents a race condition where a blob has been created but not yet attached.
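
The selection rule behind the cleanup job can be sketched in plain Ruby. BlobInfo, GRACE_PERIOD, and purgeable_blobs are hypothetical names used only for this illustration.

```ruby
# Hypothetical stand-in for a blob with its attachment count.
BlobInfo = Struct.new(:id, :attachment_count, :created_at)

GRACE_PERIOD = 24 * 60 * 60  # one day, in seconds

# Blobs that are safe to purge: no attachments and older than the grace period.
def purgeable_blobs(blobs, now: Time.now)
  cutoff = now - GRACE_PERIOD
  blobs.select { |blob| blob.attachment_count.zero? && blob.created_at < cutoff }
end

now = Time.at(1_000_000)
blobs = [
  BlobInfo.new(1, 0, now - 3 * GRACE_PERIOD),  # orphaned and old
  BlobInfo.new(2, 0, now - 60),                # just uploaded, form not submitted yet
  BlobInfo.new(3, 2, now - 3 * GRACE_PERIOD)   # old but still shared by two records
]

p purgeable_blobs(blobs, now: now).map(&:id)  # [1]
```

Blob 2 shows what the grace period protects: a direct upload that has finished but whose form has not been submitted yet.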

Measuring the Impact

Track your deduplication rate:

# In the controller
def create
  existing_blob = find_user_blob(blob_params[:checksum])

  if existing_blob
    Rails.logger.info "[Dedup] Reused blob #{existing_blob.id} for user #{current_user.id} (saved #{existing_blob.byte_size} bytes)"
    StatsD.increment("uploads.deduplicated")
    StatsD.count("uploads.bytes_saved", existing_blob.byte_size)
    # ...
  end
end

Check the duplication potential for each user:

def duplication_stats_for(user)
  # Blobs attached to records this user owns. The id subquery keeps one row
  # per blob, so a blob with several attachments is not counted twice.
  user_blobs = ActiveStorage::Blob.where(
    id: ActiveStorage::Blob.joins(:attachments).where(user_owns_attachment_sql(user)).select(:id)
  )

  # Find checksums that appear on more than one of this user's blobs
  duplicate_checksums = user_blobs
    .group(:checksum)
    .having("COUNT(*) > 1")
    .pluck(:checksum)

  return { duplicates: 0, wasted_bytes: 0 } if duplicate_checksums.empty?

  # Calculate the wasted space
  duplicate_blobs = user_blobs.where(checksum: duplicate_checksums)

  total_bytes = duplicate_blobs.sum(:byte_size)
  # Only one copy per checksum is needed; identical content has identical size
  unique_bytes = duplicate_blobs.group(:checksum).minimum(:byte_size).values.sum

  {
    duplicates: duplicate_blobs.count - duplicate_checksums.size,
    wasted_bytes: total_bytes - unique_bytes
  }
end

def user_owns_attachment_sql(user)
  <<~SQL.squish
    (
      (active_storage_attachments.record_type = 'Document'
       AND EXISTS (SELECT 1 FROM documents WHERE documents.id = active_storage_attachments.record_id AND documents.user_id = #{user.id}))
      OR
      (active_storage_attachments.record_type = 'Avatar'
       AND EXISTS (SELECT 1 FROM avatars WHERE avatars.id = active_storage_attachments.record_id AND avatars.user_id = #{user.id}))
    )
  SQL
end
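
The arithmetic behind those numbers is simple enough to check by hand. Here is a minimal in-memory sketch; SizedBlob and duplication_stats are hypothetical names, and the byte sizes are invented for the example.

```ruby
# Hypothetical stand-in for a blob row with its size.
SizedBlob = Struct.new(:id, :checksum, :byte_size)

# Every blob beyond the first in a checksum group is a duplicate, and its
# bytes count as wasted storage.
def duplication_stats(blobs)
  duplicate_groups = blobs.group_by(&:checksum).values.select { |group| group.size > 1 }

  {
    duplicates: duplicate_groups.sum { |group| group.size - 1 },
    wasted_bytes: duplicate_groups.sum { |group| group.drop(1).sum(&:byte_size) }
  }
end

blobs = [
  SizedBlob.new(1, "aaa", 1_000),
  SizedBlob.new(2, "aaa", 1_000),
  SizedBlob.new(3, "aaa", 1_000),
  SizedBlob.new(4, "bbb", 500)
]

stats = duplication_stats(blobs)
puts "#{stats[:duplicates]} duplicate blobs, #{stats[:wasted_bytes]} bytes wasted"
```

Three copies of a 1,000-byte file need only one stored copy, so two blobs and 2,000 bytes are redundant; the unique 500-byte file contributes nothing.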

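Summary

Deduplication saves storage costs and makes uploads of duplicate files feel instant. The key insight: ActiveStorage already computes checksums, we just need to use them.

By scoping to the current user, you get the storage savings without the security risk. Users can only deduplicate against their own uploads, which prevents them from claiming access to files they should not have.

Start with server-side deduplication in the controller. If you want to skip the upload entirely, add the client-side lookup. Only widen the scope to the organization or to public files when you have a clear use case.

The code in this tutorial uses the default MD5 checksums. If you are in a FIPS environment, Rails 8.2 now supports SHA256, and the deduplication logic is exactly the same.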