package s3api

import (
	"bytes"
	"encoding/json"
	"math"
	"sync"

	"github.com/aws/aws-sdk-go/private/protocol/json/jsonutil"
	"github.com/aws/aws-sdk-go/service/s3"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// loadBucketMetadataFromFiler fetches a single bucket entry from the filer and
// converts it into BucketMetaData.
var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) {
	entry, err := filer_pb.GetEntry(r.s3a, util.NewFullPath(r.s3a.option.BucketsPath, bucketName))
	if err != nil {
		return nil, err
	}

	return buildBucketMetadata(entry), nil
}

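// Illustrative sketch, not part of the original file: because loadBucketMetadataFromFiler
// is a package-level variable rather than a method, a test can swap in a stub and restore
// it afterwards. The stubbed return value below is an assumption for demonstration only.
//
//	saved := loadBucketMetadataFromFiler
//	loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) {
//		return &BucketMetaData{
//			Name:            bucketName,
//			ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
//		}, nil
//	}
//	defer func() { loadBucketMetadataFromFiler = saved }()
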
// BucketMetaData is the in-memory representation of a bucket's S3-level settings.
type BucketMetaData struct {
	_ struct{} `type:"structure"`

	Name string

	// By default, when another AWS account uploads an object to an S3 bucket,
	// that account (the object writer) owns the object, has access to it, and
	// can grant other users access to it through ACLs. Object Ownership changes
	// this default behavior so that ACLs are disabled and the bucket owner
	// automatically owns every object in the bucket.
	ObjectOwnership string

	// Container for the bucket owner's display name and ID.
	Owner *s3.Owner `type:"structure"`

	// A list of grants for access controls.
	Acl []*s3.Grant `locationName:"AccessControlList" locationNameList:"Grant" type:"list"`
}

// BucketRegistry caches bucket metadata loaded from the filer. metadataCache is a
// positive cache of known buckets; notFound is a negative cache of bucket names
// that were looked up but do not exist.
type BucketRegistry struct {
	metadataCache     map[string]*BucketMetaData
	metadataCacheLock sync.RWMutex

	notFound     map[string]struct{}
	notFoundLock sync.RWMutex

	s3a *S3ApiServer
}

// NewBucketRegistry builds a registry and pre-populates it with the buckets that
// already exist under the filer's buckets path.
func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry {
	br := &BucketRegistry{
		metadataCache: make(map[string]*BucketMetaData),
		notFound:      make(map[string]struct{}),
		s3a:           s3a,
	}
	err := br.init()
	if err != nil {
		glog.Fatal("init bucket registry failed", err)
		return nil
	}
	return br
}

func (r *BucketRegistry) init() error {
	err := filer_pb.List(r.s3a, r.s3a.option.BucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
		r.LoadBucketMetadata(entry)
		return nil
	}, "", false, math.MaxUint32)
	return err
}

// LoadBucketMetadata converts a filer entry into bucket metadata and stores it in
// the positive cache.
func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) {
	bucketMetadata := buildBucketMetadata(entry)
	r.metadataCacheLock.Lock()
	defer r.metadataCacheLock.Unlock()
	r.metadataCache[entry.Name] = bucketMetadata
}

// buildBucketMetadata derives bucket metadata from a filer entry. Ownership and the
// access control policy are read from the entry's extended attributes when present;
// otherwise the defaults (ACLs disabled, owned by AccountAdmin) apply.
func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData {
	entryJson, _ := json.Marshal(entry)
	glog.V(3).Infof("build bucket metadata, entry=%s", entryJson)
	bucketMetadata := &BucketMetaData{
		Name: entry.Name,

		// Default ownership: OwnershipBucketOwnerEnforced, which means ACLs are disabled
		ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,

		// Default owner: `AccountAdmin`
		Owner: &s3.Owner{
			ID:          &AccountAdmin.Id,
			DisplayName: &AccountAdmin.Name,
		},
	}
	if entry.Extended != nil {
		// ownership control
		ownership, ok := entry.Extended[s3_constants.ExtOwnershipKey]
		if ok {
			ownership := string(ownership)
			valid := s3_constants.ValidateOwnership(ownership)
			if valid {
				bucketMetadata.ObjectOwnership = ownership
			} else {
				glog.Warningf("Invalid ownership: %s, bucket: %s", ownership, bucketMetadata.Name)
			}
		}

		// access control policy
		acpBytes, ok := entry.Extended[s3_constants.ExtAcpKey]
		if ok {
			var acp s3.AccessControlPolicy
			err := jsonutil.UnmarshalJSON(&acp, bytes.NewReader(acpBytes))
			if err == nil {
				// validate owner
				if acp.Owner != nil && acp.Owner.ID != nil {
					bucketMetadata.Owner = acp.Owner
				} else {
					glog.Warningf("bucket ownerId is empty! bucket: %s", bucketMetadata.Name)
				}

				// acl
				bucketMetadata.Acl = acp.Grants
			} else {
				glog.Warningf("Unmarshal ACP: %s(%v), bucket: %s", string(acpBytes), err, bucketMetadata.Name)
			}
		}
	}
	return bucketMetadata
}

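// A minimal sketch, not part of the original file, of how a filer entry's extended
// attributes flow into buildBucketMetadata. The literal entry below is an assumption
// for illustration; real entries are written by the S3 API handlers.
//
//	entry := &filer_pb.Entry{
//		Name: "example-bucket",
//		Extended: map[string][]byte{
//			s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced),
//		},
//	}
//	metadata := buildBucketMetadata(entry)
//	// metadata.ObjectOwnership carries the stored ownership value;
//	// metadata.Owner falls back to AccountAdmin because no ACP is stored.
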
// RemoveBucketMetadata drops a bucket from both the positive and the negative cache.
func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) {
	r.removeMetadataCache(entry.Name)
	r.unMarkNotFound(entry.Name)
}

// GetBucketMetadata returns the metadata for bucketName, consulting the positive
// cache, then the negative cache, and finally the filer.
func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
	r.metadataCacheLock.RLock()
	bucketMetadata, ok := r.metadataCache[bucketName]
	r.metadataCacheLock.RUnlock()
	if ok {
		return bucketMetadata, s3err.ErrNone
	}

	r.notFoundLock.RLock()
	_, ok = r.notFound[bucketName]
	r.notFoundLock.RUnlock()
	if ok {
		return nil, s3err.ErrNoSuchBucket
	}

	bucketMetadata, errCode := r.LoadBucketMetadataFromFiler(bucketName)
	if errCode != s3err.ErrNone {
		return nil, errCode
	}

	r.setMetadataCache(bucketMetadata)
	r.unMarkNotFound(bucketName)
	return bucketMetadata, s3err.ErrNone
}

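// Illustrative usage, not part of the original file; the bucketRegistry field on
// S3ApiServer and the handler context around it are assumptions here:
//
//	metadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
//	if errCode != s3err.ErrNone {
//		// surface ErrNoSuchBucket / ErrInternalError to the client
//		return
//	}
//	if metadata.ObjectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
//		// ACLs are disabled for this bucket; skip grant evaluation
//	}
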
// LoadBucketMetadataFromFiler loads bucket metadata from the filer, recording
// missing buckets in the negative cache so they are not looked up repeatedly.
func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
	r.notFoundLock.Lock()
	defer r.notFoundLock.Unlock()

	// check if it already exists
	r.metadataCacheLock.RLock()
	bucketMetaData, ok := r.metadataCache[bucketName]
	r.metadataCacheLock.RUnlock()
	if ok {
		return bucketMetaData, s3err.ErrNone
	}

	// if it does not exist, load from the filer
	bucketMetadata, err := loadBucketMetadataFromFiler(r, bucketName)
	if err != nil {
		if err == filer_pb.ErrNotFound {
			// The bucket does not actually exist and should no longer be loaded from the filer
			r.notFound[bucketName] = struct{}{}
			return nil, s3err.ErrNoSuchBucket
		}
		return nil, s3err.ErrInternalError
	}
	return bucketMetadata, s3err.ErrNone
}

func (r *BucketRegistry) setMetadataCache(metadata *BucketMetaData) {
	r.metadataCacheLock.Lock()
	defer r.metadataCacheLock.Unlock()
	r.metadataCache[metadata.Name] = metadata
}

func (r *BucketRegistry) removeMetadataCache(bucket string) {
	r.metadataCacheLock.Lock()
	defer r.metadataCacheLock.Unlock()
	delete(r.metadataCache, bucket)
}

func (r *BucketRegistry) markNotFound(bucket string) {
	r.notFoundLock.Lock()
	defer r.notFoundLock.Unlock()
	r.notFound[bucket] = struct{}{}
}

func (r *BucketRegistry) unMarkNotFound(bucket string) {
	r.notFoundLock.Lock()
	defer r.notFoundLock.Unlock()
	delete(r.notFound, bucket)
}