MinIO 分段上传

MinIO 支持分段上传(Multipart Upload),这是一种将一个大文件分割成多个部分并并行上传的方法。分段上传可以显著提高大文件上传的效率,同时还允许在上传失败时重试上传单个部分,从而提高可靠性。

分段上传的步骤

  • 初始化分段上传:开始分段上传会返回一个上传 ID,后续的所有分段上传操作都需要用到这个上传 ID。
  • 上传各个分段:将文件分成多个部分并上传,每个部分都会得到一个 ETag(实体标签,用于标识每个部分的内容)。
  • 完成分段上传:所有部分上传完成后,通过上传 ID 和各个部分的 ETag 合并所有部分,完成上传。

下面是如何使用 MinIO 实现分段上传的详细步骤和代码示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main

import (
"bytes"
"context"
"fmt"
"io"
"os"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
endpoint := "play.min.io"
accessKeyID := "YOUR-ACCESSKEYID"
secretAccessKey := "YOUR-SECRETACCESSKEY"
bucketName := "my-bucketname"
objectName := "my-objectname"
filePath := "path/to/largefile"

// 初始化MinIO客户端
minioClient, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: true,
})
if err != nil {
fmt.Println(err)
return
}

// 执行分段上传
err = multipartUpload(minioClient, bucketName, objectName, filePath)
if err != nil {
fmt.Println(err)
}
}

func multipartUpload(minioClient *minio.Client, bucketName, objectName, filePath string) error {
ctx := context.Background()

// 初始化分段上传
uploadID, err := minioClient.NewMultipartUpload(ctx, bucketName, objectName, minio.PutObjectOptions{})
if err != nil {
return fmt.Errorf("failed to initiate multipart upload: %w", err)
}

// 打开文件
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()

var parts []minio.CompletePart
partSize := int64(5 * 1024 * 1024) // 每个分段的大小 (5 MB)
buffer := make([]byte, partSize)
partNumber := 1

for {
bytesRead, err := file.Read(buffer)
if err != nil && err != io.EOF {
return fmt.Errorf("failed to read file: %w", err)
}
if bytesRead == 0 {
break
}

// 上传当前分段
part, err := minioClient.PutObjectPart(ctx, bucketName, objectName, uploadID, partNumber, bytes.NewReader(buffer[:bytesRead]), partSize, minio.PutObjectPartOptions{})
if err != nil {
return fmt.Errorf("failed to upload part %d: %w", partNumber, err)
}
parts = append(parts, minio.CompletePart{
PartNumber: partNumber,
ETag: part.ETag,
})
partNumber++
}

// 完成分段上传
_, err = minioClient.CompleteMultipartUpload(ctx, bucketName, objectName, uploadID, parts)
if err != nil {
return fmt.Errorf("failed to complete multipart upload: %w", err)
}

fmt.Println("Multipart upload completed.")
return nil
}

关键步骤详解

  • 初始化MinIO客户端:使用minio.New函数初始化MinIO客户端。
  • 初始化分段上传:使用minioClient.NewMultipartUpload函数开始一个新的分段上传,并获取上传ID。
  • 上传各个分段:读取文件的每一部分,并使用minioClient.PutObjectPart函数上传每个分段。记录每个分段的PartNumber和ETag。
  • 完成分段上传:使用minioClient.CompleteMultipartUpload函数提交所有分段信息(包括每个分段的PartNumber和ETag),从而合并所有分段,完成文件的上传。

注意事项

  • 分段大小:每个分段的大小应合理设置。MinIO和S3建议每个分段至少为5MB(除了最后一个分段可以小于5MB)。
  • 并行上传:可以通过goroutines并行上传多个分段以提高上传效率。
  • 错误处理:需要实现错误处理机制,例如重试上传失败的分段。

MinIO 分段上传
https://www.xinyublog.com/tools/minio/
作者
蚂蚁
发布于
2024年5月4日
许可协议