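Building a File Deduplication System with ActiveStorage
Every time a user uploads a company logo, a profile picture, or the same PDF they have already uploaded three times, you pay to store it again. ActiveStorage does not deduplicate by default: every upload creates a new blob, even when the content is identical.
Let's fix that. We will build a deduplication system that detects identical files and reuses the existing blob, which cuts storage costs and makes repeat uploads finish instantly. Deduplication is scoped to each user, so a user can only deduplicate against their own uploads, which keeps files private.
How ActiveStorage Checksums Work
Every ActiveStorage blob has a checksum column holding the Base64-encoded MD5 digest of the file's content. Two files with identical content always have the same checksum: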
ActiveStorage::Blob.pluck(:checksum).tally
# => {"vckNNU4TN7zbzl+o3tjXPQ==" => 47, "x8K9f2mVhLpWQ..." => 12, ...}
A count greater than 1 means you have duplicates. Let's eliminate them.
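To make the checksum concrete, here is a minimal sketch in plain Ruby with no Rails required. It computes a Base64-encoded MD5 digest in chunks, which is the same shape of value stored in the checksum column, and then uses tally to spot duplicated content. The helper name checksum_for, the 5 MB chunk size, and the sample file names are assumptions made for illustration.

```ruby
require "digest"
require "stringio"

# Hypothetical helper: Base64-encoded MD5 of an IO, read in chunks so
# large files never have to fit in memory at once.
def checksum_for(io, chunk_size: 5 * 1024 * 1024)
  digest = Digest::MD5.new
  while (chunk = io.read(chunk_size))
    digest << chunk
  end
  digest.base64digest
end

# Made-up uploads: two files share the same bytes under different names.
uploads = {
  "logo.png"      => "same bytes",
  "logo-copy.png" => "same bytes",
  "report.pdf"    => "different bytes"
}

checksums = uploads.transform_values { |content| checksum_for(StringIO.new(content)) }

# Any checksum that appears more than once marks duplicated content.
duplicated = checksums.values.tally.select { |_checksum, count| count > 1 }
```

The file name plays no part in the digest, which is why logo.png and logo-copy.png collapse to a single checksum.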
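Adding an Index for Performance
ActiveStorage does not index the checksum column by default. Without an index, every deduplication lookup is a full table scan, which is fine for a small app but slows down as the table grows.
Note: this migration modifies ActiveStorage's schema. Adding an index is low risk (there is no structural change), but you are customizing a table owned by an engine. Document the decision for your team.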
class AddIndexToActiveStorageBlobsChecksum < ActiveRecord::Migration[8.0]
disable_ddl_transaction!
def change
add_index :active_storage_blobs, :checksum, algorithm: :concurrently
end
end
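The algorithm: :concurrently option (PostgreSQL) builds the index without blocking writes, which matters for production databases that already hold data. It requires disable_ddl_transaction! because a concurrent index build cannot run inside a transaction.
Step 1: Server-Side Deduplication
When a blob is about to be created, check whether the current user already owns a blob with that checksum.
Create a controller that handles deduplicated direct uploads: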
# app/controllers/deduplicated_uploads_controller.rb
class DeduplicatedUploadsController < ActiveStorage::DirectUploadsController
before_action :authenticate_user!
def create
existing_blob = find_existing_blob_for_user(blob_params[:checksum])
if existing_blob
render json: existing_blob_json(existing_blob)
else
super
end
end
private
def find_existing_blob_for_user(checksum)
return nil if checksum.blank?
# Only look up blobs that the current user has previously uploaded
# Single query; ownership is checked with an EXISTS subquery
ActiveStorage::Blob
.joins(:attachments)
.where(checksum: checksum)
.where(user_owns_attachment_sql)
.first
end
# SQL fragment that checks whether current_user owns the attached record
# Uses EXISTS subqueries so no IDs are loaded into memory
def user_owns_attachment_sql
<<~SQL.squish
(
(active_storage_attachments.record_type = 'Document'
AND EXISTS (SELECT 1 FROM documents WHERE documents.id = active_storage_attachments.record_id AND documents.user_id = #{current_user.id}))
OR
(active_storage_attachments.record_type = 'Avatar'
AND EXISTS (SELECT 1 FROM avatars WHERE avatars.id = active_storage_attachments.record_id AND avatars.user_id = #{current_user.id}))
)
SQL
end
def existing_blob_json(blob)
{
id: blob.id,
key: blob.key,
filename: blob.filename.to_s,
content_type: blob.content_type,
byte_size: blob.byte_size,
checksum: blob.checksum,
signed_id: blob.signed_id,
direct_upload: nil # signals that the upload can be skipped
}
end
def blob_params
params.require(:blob).permit(:filename, :byte_size, :checksum, :content_type, metadata: {})
end
end
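This queries through the active_storage_attachments table to find blobs attached to records the current user owns. Adjust the record_type branches in user_owns_attachment_sql to match your application's ownership model.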
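The ownership rule behind that query can be sketched in plain Ruby with in-memory stand-ins for the three tables. This is an illustration of the matching logic only, not a replacement for the SQL; the Struct definitions, the helper name find_blob_for_user, and the sample data are all hypothetical.

```ruby
# In-memory stand-ins for the blob, attachment, and owning-record tables.
Blob       = Struct.new(:id, :checksum)
Attachment = Struct.new(:blob_id, :record_type, :record_id)
Record     = Struct.new(:type, :id, :user_id)

# Hypothetical helper mirroring the scoped query: a blob matches only if
# at least one of its attachments points at a record owned by the user.
def find_blob_for_user(checksum, user_id, blobs:, attachments:, records:)
  owned = records.select { |r| r.user_id == user_id }.map { |r| [r.type, r.id] }
  blobs.find do |blob|
    blob.checksum == checksum &&
      attachments.any? { |a| a.blob_id == blob.id && owned.include?([a.record_type, a.record_id]) }
  end
end

blobs       = [Blob.new(1, "abc=="), Blob.new(2, "xyz==")]
attachments = [Attachment.new(1, "Document", 10), Attachment.new(2, "Avatar", 20)]
records     = [Record.new("Document", 10, 1), Record.new("Avatar", 20, 2)]
tables      = { blobs: blobs, attachments: attachments, records: records }

own_hit   = find_blob_for_user("abc==", 1, **tables) # user 1 owns Document 10
cross_hit = find_blob_for_user("abc==", 2, **tables) # user 2 never uploaded this content
```

User 2 gets no match for a checksum that only user 1 has uploaded, so knowing a checksum is not enough to obtain a signed_id for someone else's file.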
Add the route:
# config/routes.rb
Rails.application.routes.draw do
post '/rails/active_storage/direct_uploads',
to: 'deduplicated_uploads#create',
as: :deduplicated_direct_uploads
end
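How Attachments Work
When we find an existing blob, we return its signed_id. The client submits this signed_id with the form, and ActiveStorage creates a new attachment pointing to the existing blob: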
# app/controllers/documents_controller.rb
class DocumentsController < ApplicationController
def create
@document = current_user.documents.build(document_params)
if @document.save
redirect_to @document
else
render :new
end
end
private
def document_params
params.require(:document).permit(:title, :file)
end
end
When params[:document][:file] is a signed_id string (rather than an uploaded file), ActiveStorage finds the blob and attaches it. A single blob can have many attachments:
blob = ActiveStorage::Blob.find_by(checksum: "vckNNU4TN7zbzl+o3tjXPQ==")
blob.attachments.count
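# => 5 (five different records share this blob)
This is the key to deduplication: multiple records reference the same stored file.
Step 2: Client-Side Upload Handling
The client needs to know when to skip the upload. When direct_upload is null in the response, the file already exists: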
// app/javascript/controllers/deduplicated_upload_controller.js
import { Controller } from "@hotwired/stimulus"
import { DirectUpload } from "@rails/activestorage"
export default class extends Controller {
static targets = ["input", "progress"]
static values = { url: String }
upload() {
const file = this.inputTarget.files[0]
if (!file) return
const upload = new DirectUpload(file, this.urlValue, this)
upload.create((error, blob) => {
if (error) {
console.error(error)
} else {
this.handleSuccess(blob)
}
})
}
handleSuccess(blob) {
// Create a hidden input carrying the signed_id
const input = document.createElement("input")
input.type = "hidden"
input.name = this.inputTarget.name
input.value = blob.signed_id
this.inputTarget.form.appendChild(input)
if (blob.direct_upload === null) {
this.showMessage("File already exists - instant upload!")
} else {
this.showMessage("Upload complete")
}
}
// DirectUpload delegate method
directUploadWillStoreFileWithXHR(request) {
request.upload.addEventListener("progress", (event) => {
const progress = (event.loaded / event.total) * 100
this.progressTarget.style.width = `${progress}%`
})
}
showMessage(text) {
// Update the UI to show the status
this.progressTarget.textContent = text
}
}
Use it in a form:
<%= form_with model: @document do |f| %>
<div data-controller="deduplicated-upload"
data-deduplicated-upload-url-value="<%= deduplicated_direct_uploads_url %>">
<%= f.file_field :file,
data: {
deduplicated_upload_target: "input",
action: "change->deduplicated-upload#upload"
} %>
<div data-deduplicated-upload-target="progress"></div>
</div>
<%= f.submit %>
<% end %>
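Step 3: Skipping the Upload Request Entirely
We can go a step further: compute the checksum on the client and check whether the blob exists before attempting an upload.
First, extract the scoping logic into a concern so controllers can share it: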
# app/controllers/concerns/blob_scoping.rb
module BlobScoping
extend ActiveSupport::Concern
def find_user_blob(checksum)
return nil if checksum.blank?
ActiveStorage::Blob
.joins(:attachments)
.where(checksum: checksum)
.where(user_owns_attachment_sql)
.first
end
private
def user_owns_attachment_sql
<<~SQL.squish
(
(active_storage_attachments.record_type = 'Document'
AND EXISTS (SELECT 1 FROM documents WHERE documents.id = active_storage_attachments.record_id AND documents.user_id = #{current_user.id}))
OR
(active_storage_attachments.record_type = 'Avatar'
AND EXISTS (SELECT 1 FROM avatars WHERE avatars.id = active_storage_attachments.record_id AND avatars.user_id = #{current_user.id}))
)
SQL
end
end
Then use it in a lookup controller:
# app/controllers/blob_lookups_controller.rb
class BlobLookupsController < ApplicationController
include BlobScoping
before_action :authenticate_user!
def show
blob = find_user_blob(params[:checksum])
if blob
render json: { exists: true, signed_id: blob.signed_id }
else
render json: { exists: false }
end
end
end
# config/routes.rb
# Pass the checksum as a query parameter (Base64 contains +, / and =, which would need encoding in a path)
get '/blobs/lookup', to: 'blob_lookups#show', as: :blob_lookup
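The encoding concern is easy to see in plain Ruby. The sketch below uses the standard library's URI helpers to build the lookup query string for the sample checksum shown earlier and to decode it again; the variable names are illustrative only.

```ruby
require "uri"

checksum = "vckNNU4TN7zbzl+o3tjXPQ=="

# encode_www_form percent-encodes "+" and "=", so the value survives the trip
# instead of "+" being decoded as a space on the server.
query = URI.encode_www_form(checksum: checksum)
lookup_path = "/blobs/lookup?#{query}"

# Decoding the query string returns the original checksum unchanged.
decoded = URI.decode_www_form(query).to_h

# The pitfall: an unencoded "+" in a query string decodes to a space.
mangled = URI.decode_www_form("checksum=#{checksum}").to_h
```

On the JavaScript side, URL.searchParams.set applies the same kind of encoding, which is why the client should build the URL that way rather than concatenating strings.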
The show action reads the checksum from the query parameters:
# app/controllers/blob_lookups_controller.rb
def show
blob = find_user_blob(params[:checksum])
# ... the rest is unchanged
end
Now the JavaScript can check first:
// app/javascript/controllers/smart_upload_controller.js
import { Controller } from "@hotwired/stimulus"
import { DirectUpload, FileChecksum } from "@rails/activestorage"
export default class extends Controller {
static targets = ["input", "status"]
static values = {
lookupUrl: String,
uploadUrl: String
}
async upload() {
const file = this.inputTarget.files[0]
if (!file) return
try {
this.statusTarget.textContent = "Computing checksum..."
const checksum = await this.computeChecksum(file)
this.statusTarget.textContent = "Checking for duplicates..."
const existing = await this.lookupBlob(checksum)
if (existing.exists) {
this.statusTarget.textContent = "File already uploaded - instant!"
this.attachSignedId(existing.signed_id)
return
}
this.statusTarget.textContent = "Uploading..."
await this.performUpload(file)
} catch (error) {
this.statusTarget.textContent = `Error: ${error.message}`
console.error("Upload failed:", error)
}
}
computeChecksum(file) {
return new Promise((resolve, reject) => {
FileChecksum.create(file, (error, checksum) => {
if (error) {
reject(new Error(`Checksum failed: ${error}`))
} else {
resolve(checksum)
}
})
})
}
async lookupBlob(checksum) {
const url = new URL(this.lookupUrlValue, window.location.origin)
url.searchParams.set("checksum", checksum)
const response = await fetch(url, {
headers: {
"X-CSRF-Token": this.csrfToken,
"Accept": "application/json"
},
credentials: "same-origin"
})
if (!response.ok) {
throw new Error(`Lookup failed: ${response.status}`)
}
return response.json()
}
get csrfToken() {
const meta = document.querySelector('meta[name="csrf-token"]')
return meta ? meta.content : ""
}
attachSignedId(signedId) {
const input = document.createElement("input")
input.type = "hidden"
input.name = this.inputTarget.name
input.value = signedId
this.inputTarget.form.appendChild(input)
}
performUpload(file) {
return new Promise((resolve, reject) => {
const upload = new DirectUpload(file, this.uploadUrlValue, this)
upload.create((error, blob) => {
if (error) {
reject(new Error(`Upload failed: ${error}`))
} else {
this.statusTarget.textContent = "Upload complete"
this.attachSignedId(blob.signed_id)
resolve(blob)
}
})
})
}
directUploadWillStoreFileWithXHR(request) {
request.upload.addEventListener("progress", (event) => {
const percent = Math.round((event.loaded / event.total) * 100)
this.statusTarget.textContent = `Uploading: ${percent}%`
})
}
}
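Widening the Scope
User-scoped deduplication is the safest default, but some applications need broader deduplication. Here are options for widening the scope:
Option A: Organization/Tenant Scope
For SaaS apps where team members share files, deduplicate within the organization. This requires querying records that belong to the organization: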
def find_org_blob(checksum)
ActiveStorage::Blob
.joins(:attachments)
.where(checksum: checksum)
.where(org_owns_attachment_sql)
.first
end
def org_owns_attachment_sql
# Find blobs attached to records that belong to the organization
<<~SQL.squish
(
(active_storage_attachments.record_type = 'Document'
AND EXISTS (
SELECT 1 FROM documents
WHERE documents.id = active_storage_attachments.record_id
AND documents.organization_id = #{current_organization.id}
))
OR
(active_storage_attachments.record_type = 'Project'
AND EXISTS (
SELECT 1 FROM projects
WHERE projects.id = active_storage_attachments.record_id
AND projects.organization_id = #{current_organization.id}
))
)
SQL
end
Option B: Public Files Only
Allow global deduplication for files explicitly marked as public:
def find_public_blob(checksum)
ActiveStorage::Blob
.where(checksum: checksum)
.where("metadata::jsonb ->> 'public' = ?", "true") # metadata is a JSON-serialized text column by default, so cast it first (PostgreSQL)
.first
end
def find_existing_blob(checksum)
find_user_blob(checksum) || find_public_blob(checksum)
end
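Option C: By Content Type (Server-Side Only)
Deduplicate globally for "safe" content such as images, while keeping documents user-scoped. This only works for server-side deduplication (Step 1), where content_type is available from the upload request. It does not work for the pre-upload lookup (Step 3), which only sends the checksum:
# In DeduplicatedUploadsController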
def find_existing_blob_for_user(checksum)
content_type = blob_params[:content_type]
if content_type&.start_with?("image/")
# Images can be deduplicated globally (low risk)
ActiveStorage::Blob.find_by(checksum: checksum)
else
# Documents stay user-scoped
find_user_blob(checksum)
end
end
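The branching rule in Option C can be isolated as a small pure function, which makes it easy to unit test apart from the controller. This is a sketch under assumptions: the helper name dedup_scope_for, the GLOBAL_DEDUP_PREFIXES constant, and the :global / :user symbols are all hypothetical.

```ruby
# Hypothetical allow-list of content type prefixes that may be deduplicated globally.
GLOBAL_DEDUP_PREFIXES = ["image/"].freeze

# Hypothetical policy helper: returns which lookup scope applies to an upload.
# Anything missing or not on the allow-list falls back to the user scope.
def dedup_scope_for(content_type)
  return :user if content_type.nil? || content_type.empty?

  if GLOBAL_DEDUP_PREFIXES.any? { |prefix| content_type.start_with?(prefix) }
    :global
  else
    :user
  end
end

image_scope   = dedup_scope_for("image/png")
pdf_scope     = dedup_scope_for("application/pdf")
missing_scope = dedup_scope_for(nil)
```

Defaulting to :user keeps the safer behavior whenever the type is absent or unrecognized. Note that content_type in a direct upload request is supplied by the client, so it is best treated as a hint rather than a guarantee.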
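Handling Edge Cases
Race conditions: the same user uploads the same file twice in quick succession. Both checksum lookups return "not found", so both uploads proceed and you end up with two blobs. That's fine. Both blobs belong to the same user, so later lookups simply match one of them, and a background job can clean up duplicates per user: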
# app/jobs/deduplicate_user_blobs_job.rb
class DeduplicateUserBlobsJob < ApplicationJob
def perform(user)
duplicates = find_duplicate_checksums_for(user)
duplicates.each do |checksum|
deduplicate_blobs_with_checksum(user, checksum)
end
end
private
def find_duplicate_checksums_for(user)
ActiveStorage::Blob
.joins(:attachments)
.where(user_owns_attachment_sql(user))
.group(:checksum)
.having("COUNT(DISTINCT active_storage_blobs.id) > 1")
.pluck(:checksum)
end
def deduplicate_blobs_with_checksum(user, checksum)
ActiveStorage::Blob.transaction do
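# Select the user's blob IDs in a subquery: PostgreSQL rejects FOR UPDATE combined with DISTINCT
blob_ids = ActiveStorage::Blob
.joins(:attachments)
.where(checksum: checksum)
.where(user_owns_attachment_sql(user))
.select(:id)
blobs = ActiveStorage::Blob.where(id: blob_ids).order(:created_at).lock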
canonical = blobs.first
return if canonical.nil?
# Use where.not instead of offset (offset does not work with find_each)
blobs.where.not(id: canonical.id).find_each do |duplicate|
duplicate.attachments.update_all(blob_id: canonical.id)
duplicate.purge
end
end
end
def user_owns_attachment_sql(user)
<<~SQL.squish
(
(active_storage_attachments.record_type = 'Document'
AND EXISTS (SELECT 1 FROM documents WHERE documents.id = active_storage_attachments.record_id AND documents.user_id = #{user.id}))
OR
(active_storage_attachments.record_type = 'Avatar'
AND EXISTS (SELECT 1 FROM avatars WHERE avatars.id = active_storage_attachments.record_id AND avatars.user_id = #{user.id}))
)
SQL
end
end
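The core of that job, keeping the oldest blob per checksum and repointing attachments at it, can be sketched without a database. The Struct definitions, the helper name collapse_duplicates, and the integer timestamps are hypothetical stand-ins for the real tables.

```ruby
# In-memory stand-ins for blobs and attachments.
Blob       = Struct.new(:id, :checksum, :created_at)
Attachment = Struct.new(:id, :blob_id)

# Hypothetical in-memory version of the cleanup: keep the oldest blob per
# checksum, repoint attachments at it, and return the surviving blobs.
def collapse_duplicates(blobs, attachments)
  blobs.group_by(&:checksum).flat_map do |_checksum, group|
    canonical, *duplicates = group.sort_by(&:created_at)
    duplicate_ids = duplicates.map(&:id)
    attachments.each do |attachment|
      attachment.blob_id = canonical.id if duplicate_ids.include?(attachment.blob_id)
    end
    [canonical]
  end
end

blobs = [
  Blob.new(1, "abc==", 100),
  Blob.new(2, "abc==", 105), # uploaded moments later by the same user
  Blob.new(3, "xyz==", 110)
]
attachments = [Attachment.new(1, 1), Attachment.new(2, 2), Attachment.new(3, 3)]

survivors = collapse_duplicates(blobs, attachments)
```

Repointing attachments before discarding the duplicate is the important ordering: no attachment is ever left referencing a blob that is about to be purged.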
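Orphaned blobs: find_user_blob already joins through attachments, so orphaned blobs (those with no attachments) are excluded automatically.
Different filenames: same content, different names. The blob stores the filename from its original upload, so a record that reuses the blob reports that name; keep a display name on your own record (the title column, for example) if it needs to differ. That's fine: we deduplicate by content, not metadata.
Preventing Premature Blob Deletion
Shared blobs raise a critical question: by default, ActiveStorage purges the blob when you delete a record. If Document A and Document B share a blob, deleting Document A would delete the blob and break Document B.
Solve this by disabling automatic purging on the models: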
# app/models/document.rb
class Document < ApplicationRecord
belongs_to :user
has_one_attached :file, dependent: false # do not purge automatically
end
# app/models/avatar.rb
class Avatar < ApplicationRecord
belongs_to :user
has_one_attached :image, dependent: false
end
Now the blob is kept even when an attachment is deleted. Use a scheduled job to clean up orphaned blobs (those with zero attachments):
# app/jobs/cleanup_orphaned_blobs_job.rb
class CleanupOrphanedBlobsJob < ApplicationJob
def perform
# Find blobs with no attachments that are more than 1 day old (grace period)
ActiveStorage::Blob
.left_joins(:attachments)
.where(active_storage_attachments: { id: nil })
.where(active_storage_blobs: { created_at: ...1.day.ago })
.find_each(&:purge)
end
end
Schedule it to run daily. With Solid Queue (the Rails 8 default):
# config/recurring.yml
cleanup_orphaned_blobs:
class: CleanupOrphanedBlobsJob
schedule: every day at 3am
Or with sidekiq-cron:
# config/initializers/sidekiq.rb
Sidekiq::Cron::Job.create(
name: "Clean up orphaned blobs - daily",
cron: "0 3 * * *",
class: "CleanupOrphanedBlobsJob"
)
The 1-day grace period guards against the race where a blob has been created but not yet attached.
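The selection rule the cleanup job applies (no attachments and older than the grace period) can be sketched as a plain Ruby predicate. The Struct, the GRACE_PERIOD constant, the purgeable? helper, and the fixed timestamp are assumptions chosen to keep the example deterministic.

```ruby
# In-memory stand-in for a blob row plus its attachment count.
Blob = Struct.new(:id, :created_at, :attachment_count)

GRACE_PERIOD = 24 * 60 * 60 # one day, in seconds

# Hypothetical predicate: purge only blobs with no attachments that are also
# older than the grace period.
def purgeable?(blob, now:)
  blob.attachment_count.zero? && blob.created_at < now - GRACE_PERIOD
end

now = Time.utc(2025, 1, 2, 3, 0, 0)
blobs = [
  Blob.new(1, now - 2 * GRACE_PERIOD, 0), # old orphan: purge
  Blob.new(2, now - 60, 0),               # just uploaded, form not submitted yet: keep
  Blob.new(3, now - 2 * GRACE_PERIOD, 2)  # old but still attached: keep
]

purge_ids = blobs.select { |blob| purgeable?(blob, now: now) }.map(&:id)
```

Blob 2 is the case the grace period exists for: a direct upload has finished, but the user has not submitted the form that attaches it.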
Measuring the Impact
Track your deduplication rate:
# In the controller
def create
existing_blob = find_user_blob(blob_params[:checksum])
if existing_blob
Rails.logger.info "[Dedup] Reused blob #{existing_blob.id} for user #{current_user.id} (saved #{existing_blob.byte_size} bytes)"
StatsD.increment("uploads.deduplicated")
StatsD.count("uploads.bytes_saved", existing_blob.byte_size)
# ...
end
end
Check the duplication potential per user:
def duplication_stats_for(user)
# Find duplicated checksums among this user's blobs
duplicate_checksums = ActiveStorage::Blob
.joins(:attachments)
.where(user_owns_attachment_sql(user))
.group(:checksum)
.having("COUNT(DISTINCT active_storage_blobs.id) > 1")
.pluck(:checksum)
return { duplicates: 0, wasted_bytes: 0 } if duplicate_checksums.empty?
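# Compute wasted space in Ruby. Calling sum on a DISTINCT relation would emit SUM(DISTINCT byte_size),
# and DISTINCT ON cannot be combined with distinct, so pluck one row per blob instead.
user_blob_ids = ActiveStorage::Blob
.joins(:attachments)
.where(user_owns_attachment_sql(user))
.select(:id)
rows = ActiveStorage::Blob
.where(id: user_blob_ids, checksum: duplicate_checksums)
.pluck(:checksum, :byte_size)
total_bytes = rows.sum { |_checksum, size| size }
unique_bytes = rows.uniq { |checksum, _size| checksum }.sum { |_checksum, size| size }
{
duplicates: rows.size - duplicate_checksums.size,
wasted_bytes: total_bytes - unique_bytes
}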
end
def user_owns_attachment_sql(user)
<<~SQL.squish
(
(active_storage_attachments.record_type = 'Document'
AND EXISTS (SELECT 1 FROM documents WHERE documents.id = active_storage_attachments.record_id AND documents.user_id = #{user.id}))
OR
(active_storage_attachments.record_type = 'Avatar'
AND EXISTS (SELECT 1 FROM avatars WHERE avatars.id = active_storage_attachments.record_id AND avatars.user_id = #{user.id}))
)
SQL
end
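The arithmetic behind those numbers is simple enough to check by hand. The sketch below runs the same calculation over an in-memory list, treating one blob per checksum as the copy worth keeping; the Struct, the helper name duplication_stats, and the sample sizes are hypothetical.

```ruby
# In-memory stand-in for a blob row.
Blob = Struct.new(:id, :checksum, :byte_size)

# Hypothetical helper: counts redundant blobs and the bytes they occupy,
# treating one blob per checksum as the copy worth keeping.
def duplication_stats(blobs)
  groups = blobs.group_by(&:checksum).values.select { |group| group.size > 1 }
  redundant = groups.flat_map { |group| group.drop(1) }
  { duplicates: redundant.size, wasted_bytes: redundant.sum(&:byte_size) }
end

blobs = [
  Blob.new(1, "abc==", 1_000),
  Blob.new(2, "abc==", 1_000),
  Blob.new(3, "abc==", 1_000),
  Blob.new(4, "xyz==", 500)
]

stats = duplication_stats(blobs)
```

Three copies of a 1,000-byte file mean two redundant blobs and 2,000 wasted bytes; the unique 500-byte file contributes nothing.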
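Summary
Deduplication saves storage costs and makes repeat uploads feel instant. The key insight: ActiveStorage already computes checksums, so all we have to do is use them.
Scoping to the current user gives you the storage savings without the security risk. Users can only deduplicate against their own uploads, which prevents them from gaining access to files they should not have by claiming a matching checksum.
Start with server-side deduplication in the controller. Add the client-side lookup if you want to skip uploads entirely. Widen the scope to organizations or public files only when you have a clear use case.
The code in this tutorial uses the default MD5 checksum. If you are in a FIPS environment, Rails 8.2 now supports SHA256, and the deduplication logic is exactly the same.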