extend storagedriver to add bulkdelete #52

Open · wants to merge 1 commit into master
41 changes: 41 additions & 0 deletions registry/storage/driver/inmemory/driver.go
@@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
@@ -236,6 +238,24 @@ func (d *driver) Delete(ctx context.Context, path string) error {
}
}

// BulkDelete deletes all objects at the given paths. Paths that fail to
// delete are reported in the Errors field of the returned output; all other
// paths are reported in Deleted.
func (d *driver) BulkDelete(ctx context.Context, paths []string) (*s3.DeleteObjectsOutput, error) {
	var response s3.DeleteObjectsOutput
	for _, path := range paths {
		if err := d.Delete(ctx, path); err != nil {
			response.Errors = append(response.Errors, &s3.Error{
				Key: aws.String(path),
			})
			// Do not also report a failed path as deleted.
			continue
		}
		response.Deleted = append(response.Deleted, &s3.DeletedObject{
			Key: aws.String(path),
		})
	}

	return &response, nil
}

// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
@@ -342,3 +362,24 @@ func (w *writer) flush() {
w.buffer = []byte{}
w.buffSize = 0
}

// DriverV2 is the in-memory implementation of StorageDriverV2; it embeds the
// StorageDriverV2 interface backed by the in-memory driver.
type DriverV2 struct {
storagedriver.StorageDriverV2
}

// NewV2 constructs a new DriverV2.
func NewV2() *DriverV2 {
d := &driver{
root: &dir{
common: common{
p: "/",
mod: time.Now(),
},
},
}

return &DriverV2{
d,
}
}
108 changes: 108 additions & 0 deletions registry/storage/driver/s3-aws/s3.go
@@ -1314,6 +1314,30 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
return nil
}

// BulkDelete deletes all objects at the given paths with a single S3
// DeleteObjects call. S3 accepts at most 1000 keys per DeleteObjects request,
// so callers must batch larger path lists themselves.
func (d *driver) BulkDelete(ctx context.Context, paths []string) (*s3.DeleteObjectsOutput, error) {
	s := d.s3Client(ctx)
	s3Objects := make([]*s3.ObjectIdentifier, 0, len(paths))

	for _, path := range paths {
		s3Objects = append(s3Objects, &s3.ObjectIdentifier{
			Key: aws.String(d.s3Path(path)),
		})
	}

resp, err := s.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects,
Quiet: aws.Bool(false),
},
})
if err != nil {
return nil, err
}
return resp, nil
}

// directoryDiff finds all directories that are not in common between
// the previous and current paths in sorted order.
//
@@ -1705,3 +1729,87 @@ func (w *writer) flushPart() error {
w.pendingPart = nil
return nil
}

// DriverV2 is the S3 implementation of StorageDriverV2; it embeds the
// StorageDriverV2 interface backed by the underlying S3 driver.
type DriverV2 struct {
storagedriver.StorageDriverV2
}

// NewDriverV2 constructs a new DriverV2 from the given parameters. The S3
// client setup mirrors New, and the returned driver additionally exposes
// BulkDelete.
func NewDriverV2(params DriverParameters) (*DriverV2, error) {

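	// Use the pre-configured S3 client if one was supplied; otherwise build
	// one from the driver parameters.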
s3obj := params.S3
if s3obj == nil {
if !params.V4Auth &&
(params.RegionEndpoint == "" ||
strings.Contains(params.RegionEndpoint, "s3.amazonaws.com")) {
return nil, fmt.Errorf("on Amazon S3 this storage driver can only be used with v4 authentication")
}

awsConfig := aws.NewConfig()

if params.AccessKey != "" && params.SecretKey != "" {
creds := credentials.NewStaticCredentials(
params.AccessKey,
params.SecretKey,
params.SessionToken,
)
awsConfig.WithCredentials(creds)
}

if params.RegionEndpoint != "" {
awsConfig.WithEndpoint(params.RegionEndpoint)
awsConfig.WithS3ForcePathStyle(params.ForcePathStyle)
}

awsConfig.WithS3UseAccelerate(params.Accelerate)
awsConfig.WithRegion(params.Region)
awsConfig.WithDisableSSL(!params.Secure)
if params.UseDualStack {
awsConfig.UseDualStackEndpoint = endpoints.DualStackEndpointStateEnabled
}

if params.SkipVerify {
httpTransport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
awsConfig.WithHTTPClient(&http.Client{
Transport: httpTransport,
})
}

sess, err := session.NewSession(awsConfig)
if err != nil {
return nil, fmt.Errorf("failed to create new session with aws config: %v", err)
}

if params.UserAgent != "" {
sess.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler(params.UserAgent))
}

		// Assign to the s3obj declared above; ':=' here would shadow it and
		// leave the outer variable nil.
		s3obj = s3.New(sess)

// enable S3 compatible signature v2 signing instead
if !params.V4Auth {
setv2Handlers(s3obj)
}
}

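	// Construct the underlying S3 driver from the resolved client and
	// parameters.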
originalDriver := &driver{
S3: s3obj,
Bucket: params.Bucket,
ChunkSize: params.ChunkSize,
Encrypt: params.Encrypt,
KeyID: params.KeyID,
MultipartCopyChunkSize: params.MultipartCopyChunkSize,
MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
MultipartCopyThresholdSize: params.MultipartCopyThresholdSize,
MultipartCombineSmallPart: params.MultipartCombineSmallPart,
RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass,
ObjectACL: params.ObjectACL,
LogS3APIRequests: params.LogS3APIRequests,
LogS3APIResponseHeaders: params.LogS3APIResponseHeaders,
}

// Return a new instance of DriverV2 that embeds the original driver
return &DriverV2{originalDriver}, nil
}
157 changes: 154 additions & 3 deletions registry/storage/driver/s3-aws/s3_test.go
@@ -27,8 +27,9 @@ import (
func Test(t *testing.T) { check.TestingT(t) }

var (
s3DriverConstructor func(rootDirectory, storageClass string) (*Driver, error)
skipS3 func() string
s3DriverConstructor func(rootDirectory, storageClass string) (*Driver, error)
skipS3 func() string
s3DriverV2Constructor func(rootDirectory, storageClass string) (*DriverV2, error)
)

func init() {
@@ -150,6 +151,99 @@ func init() {
return New(parameters)
}

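	// s3DriverV2Constructor mirrors s3DriverConstructor but returns a
	// DriverV2 so that tests can exercise BulkDelete.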
s3DriverV2Constructor = func(rootDirectory, storageClass string) (*DriverV2, error) {
encryptBool := false
if encrypt != "" {
encryptBool, err = strconv.ParseBool(encrypt)
if err != nil {
return nil, err
}
}

secureBool := true
if secure != "" {
secureBool, err = strconv.ParseBool(secure)
if err != nil {
return nil, err
}
}

skipVerifyBool := false
if skipVerify != "" {
skipVerifyBool, err = strconv.ParseBool(skipVerify)
if err != nil {
return nil, err
}
}

v4Bool := true
if v4Auth != "" {
v4Bool, err = strconv.ParseBool(v4Auth)
if err != nil {
return nil, err
}
}
forcePathStyleBool := true
if forcePathStyle != "" {
forcePathStyleBool, err = strconv.ParseBool(forcePathStyle)
if err != nil {
return nil, err
}
}

		useDualStackBool := false
		if useDualStack != "" {
			useDualStackBool, err = strconv.ParseBool(useDualStack)
			if err != nil {
				return nil, err
			}
		}

multipartCombineSmallPart := true
if combineSmallPart != "" {
multipartCombineSmallPart, err = strconv.ParseBool(combineSmallPart)
if err != nil {
return nil, err
}
}

accelerateBool := true
if accelerate != "" {
accelerateBool, err = strconv.ParseBool(accelerate)
if err != nil {
return nil, err
}
}

parameters := DriverParameters{
nil,
accessKey,
secretKey,
bucket,
region,
regionEndpoint,
forcePathStyleBool,
encryptBool,
keyID,
secureBool,
skipVerifyBool,
v4Bool,
minChunkSize,
defaultMultipartCopyChunkSize,
defaultMultipartCopyMaxConcurrency,
defaultMultipartCopyThresholdSize,
multipartCombineSmallPart,
rootDirectory,
storageClass,
driverName + "-test",
objectACL,
sessionToken,
useDualStackBool,
accelerateBool,
false,
map[string]string{},
}

return NewDriverV2(parameters)
}

// Skip S3 storage driver tests if environment variable parameters are not provided
skipS3 = func() string {
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
@@ -159,7 +253,7 @@
}

testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return s3DriverConstructor(root, s3.StorageClassStandard)
return s3DriverV2Constructor(root, s3.StorageClassStandard)
}, skipS3)
}

@@ -825,6 +919,63 @@ func TestListObjectsV2(t *testing.T) {
}
}

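// TestBulkDelete verifies that DriverV2.BulkDelete reports each requested
// path as deleted when run against a live bucket.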
func TestBulkDelete(t *testing.T) {
if skipS3() != "" {
t.Skip(skipS3())
}

rootDir := t.TempDir()

drvr, err := s3DriverV2Constructor(rootDir, s3.StorageClassStandard)
if err != nil {
t.Fatalf("unexpected error creating driver with standard storage: %v", err)
}

files := []string{
"file1",
"file2",
"file3",
"file4",
}

	for _, file := range files {
		if err := drvr.PutContent(context.Background(), file, []byte("content "+file)); err != nil {
			t.Fatalf("unable to create file %s: %v", file, err)
		}
	}

testCases := []struct {
desc string
deleteFiles []string
expectedDeletedFiles []string
expectedDeleteErrorFiles []string
}{
{
desc: "test-case-1",
deleteFiles: []string{"file1", "file2"},
expectedDeletedFiles: []string{"file1", "file2"},
expectedDeleteErrorFiles: nil,
},
}

	for _, testCase := range testCases {
		t.Run(testCase.desc, func(t *testing.T) {
			res, err := drvr.BulkDelete(context.Background(), testCase.deleteFiles)
			if err != nil {
				t.Fatalf("unexpected error deleting: %v", err)
			}

			// res.Deleted holds *s3.DeletedObject entries keyed by full S3 keys
			// (root directory prefix included), so compare counts and match each
			// expected path as a key suffix rather than comparing the slices directly.
			if len(res.Deleted) != len(testCase.expectedDeletedFiles) {
				t.Fatalf("unexpected number of deleted objects: got %d, want %d", len(res.Deleted), len(testCase.expectedDeletedFiles))
			}
			for _, want := range testCase.expectedDeletedFiles {
				found := false
				for _, obj := range res.Deleted {
					if key := *obj.Key; len(key) >= len(want) && key[len(key)-len(want):] == want {
						found = true
						break
					}
				}
				if !found {
					t.Errorf("expected %q to be among the deleted objects", want)
				}
			}
		})
	}

}

func compareWalked(t *testing.T, expected, walked []string) {
if len(walked) != len(expected) {
t.Fatalf("Mismatch number of fileInfo walked %d expected %d; walked %s; expected %s;", len(walked), len(expected), walked, expected)
8 changes: 8 additions & 0 deletions registry/storage/driver/storagedriver.go
@@ -7,6 +7,8 @@ import (
"regexp"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/service/s3"
)

// Version is a string representing the storage driver version, of the form
@@ -212,3 +214,9 @@
return msg
}
}

// StorageDriverV2 extends StorageDriver with bulk deletion of multiple
// objects in a single call.
type StorageDriverV2 interface {
	StorageDriver

	// BulkDelete deletes all objects at the given paths and reports
	// per-object successes and failures in the returned output.
	BulkDelete(ctx context.Context, paths []string) (*s3.DeleteObjectsOutput, error)
}