mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-24 19:19:11 +08:00
filer: support TTL for all filer stores
This commit is contained in:
parent
8a899992f2
commit
89eb05b50f
@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||||
)
|
)
|
||||||
@ -41,6 +42,7 @@ type CopyOptions struct {
|
|||||||
grpcDialOption grpc.DialOption
|
grpcDialOption grpc.DialOption
|
||||||
masters []string
|
masters []string
|
||||||
cipher bool
|
cipher bool
|
||||||
|
ttlSec int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -124,6 +126,13 @@ func runCopy(cmd *Command, args []string) bool {
|
|||||||
copy.masters = masters
|
copy.masters = masters
|
||||||
copy.cipher = cipher
|
copy.cipher = cipher
|
||||||
|
|
||||||
|
ttl, err := needle.ReadTTL(*copy.ttl)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
copy.ttlSec = int32(ttl.Minutes()) * 60
|
||||||
|
|
||||||
if *cmdCopy.IsDebug {
|
if *cmdCopy.IsDebug {
|
||||||
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
|
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
|
||||||
}
|
}
|
||||||
@ -286,7 +295,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
|
|||||||
Count: 1,
|
Count: 1,
|
||||||
Replication: *worker.options.replication,
|
Replication: *worker.options.replication,
|
||||||
Collection: *worker.options.collection,
|
Collection: *worker.options.collection,
|
||||||
TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
|
TtlSec: worker.options.ttlSec,
|
||||||
ParentPath: task.destinationUrlPath,
|
ParentPath: task.destinationUrlPath,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -342,7 +351,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
|
|||||||
Mime: mimeType,
|
Mime: mimeType,
|
||||||
Replication: *worker.options.replication,
|
Replication: *worker.options.replication,
|
||||||
Collection: *worker.options.collection,
|
Collection: *worker.options.collection,
|
||||||
TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
|
TtlSec: worker.options.ttlSec,
|
||||||
},
|
},
|
||||||
Chunks: chunks,
|
Chunks: chunks,
|
||||||
},
|
},
|
||||||
@ -388,7 +397,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
|
|||||||
Count: 1,
|
Count: 1,
|
||||||
Replication: *worker.options.replication,
|
Replication: *worker.options.replication,
|
||||||
Collection: *worker.options.collection,
|
Collection: *worker.options.collection,
|
||||||
TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
|
TtlSec: worker.options.ttlSec,
|
||||||
ParentPath: task.destinationUrlPath,
|
ParentPath: task.destinationUrlPath,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -469,7 +478,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
|
|||||||
Mime: mimeType,
|
Mime: mimeType,
|
||||||
Replication: replication,
|
Replication: replication,
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
|
TtlSec: worker.options.ttlSec,
|
||||||
},
|
},
|
||||||
Chunks: chunks,
|
Chunks: chunks,
|
||||||
},
|
},
|
||||||
|
@ -223,14 +223,36 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
|
|||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
return f.store.FindEntry(ctx, p)
|
entry, err = f.store.FindEntry(ctx, p)
|
||||||
|
if entry != nil && entry.TtlSec > 0 {
|
||||||
|
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
|
||||||
|
f.store.DeleteEntry(ctx, p.Child(entry.Name()))
|
||||||
|
return nil, filer_pb.ErrNotFound
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
|
func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, err error) {
|
||||||
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
|
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
|
||||||
p = p[0 : len(p)-1]
|
p = p[0 : len(p)-1]
|
||||||
}
|
}
|
||||||
return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
|
listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
|
||||||
|
if listErr != nil {
|
||||||
|
return listedEntries, expiredCount, err
|
||||||
|
}
|
||||||
|
for _, entry := range listedEntries {
|
||||||
|
if entry.TtlSec > 0 {
|
||||||
|
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
|
||||||
|
f.store.DeleteEntry(ctx, p.Child(entry.Name()))
|
||||||
|
expiredCount++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
entries = append(entries, entry)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Filer) cacheDelDirectory(dirpath string) {
|
func (f *Filer) cacheDelDirectory(dirpath string) {
|
||||||
|
@ -28,7 +28,7 @@ func (f *Filer) LoadBuckets(dirBucketsPath string) {
|
|||||||
|
|
||||||
limit := math.MaxInt32
|
limit := math.MaxInt32
|
||||||
|
|
||||||
entries, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit)
|
entries, _, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(1).Infof("no buckets found: %v", err)
|
glog.V(1).Infof("no buckets found: %v", err)
|
||||||
|
@ -57,7 +57,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
|
|||||||
lastFileName := ""
|
lastFileName := ""
|
||||||
includeLastFile := false
|
includeLastFile := false
|
||||||
for {
|
for {
|
||||||
entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
|
entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("list folder %s: %v", entry.FullPath, err)
|
glog.Errorf("list folder %s: %v", entry.FullPath, err)
|
||||||
return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
|
return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
|
||||||
|
@ -48,14 +48,14 @@ func TestCreateAndFind(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// checking one upper directory
|
// checking one upper directory
|
||||||
entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
|
entries, _, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
|
||||||
if len(entries) != 1 {
|
if len(entries) != 1 {
|
||||||
t.Errorf("list entries count: %v", len(entries))
|
t.Errorf("list entries count: %v", len(entries))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// checking one upper directory
|
// checking one upper directory
|
||||||
entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
entries, _, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||||
if len(entries) != 1 {
|
if len(entries) != 1 {
|
||||||
t.Errorf("list entries count: %v", len(entries))
|
t.Errorf("list entries count: %v", len(entries))
|
||||||
return
|
return
|
||||||
@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// checking one upper directory
|
// checking one upper directory
|
||||||
entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
entries, _, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("list entries: %v", err)
|
t.Errorf("list entries: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -48,14 +48,14 @@ func TestCreateAndFind(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// checking one upper directory
|
// checking one upper directory
|
||||||
entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
|
entries, _, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
|
||||||
if len(entries) != 1 {
|
if len(entries) != 1 {
|
||||||
t.Errorf("list entries count: %v", len(entries))
|
t.Errorf("list entries count: %v", len(entries))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// checking one upper directory
|
// checking one upper directory
|
||||||
entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
entries, _, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||||
if len(entries) != 1 {
|
if len(entries) != 1 {
|
||||||
t.Errorf("list entries count: %v", len(entries))
|
t.Errorf("list entries count: %v", len(entries))
|
||||||
return
|
return
|
||||||
@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// checking one upper directory
|
// checking one upper directory
|
||||||
entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
entries, _, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("list entries: %v", err)
|
t.Errorf("list entries: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -53,7 +53,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
|
|||||||
lastFileName := req.StartFromFileName
|
lastFileName := req.StartFromFileName
|
||||||
includeLastFile := req.InclusiveStartFrom
|
includeLastFile := req.InclusiveStartFrom
|
||||||
for limit > 0 {
|
for limit > 0 {
|
||||||
entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
|
entries, expiredCount, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -92,7 +92,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(entries) < paginationLimit {
|
if len(entries)+expiredCount < paginationLimit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer
|
|||||||
includeLastFile := false
|
includeLastFile := false
|
||||||
for {
|
for {
|
||||||
|
|
||||||
entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
|
entries, _, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
|
|
||||||
lastFileName := r.FormValue("lastFileName")
|
lastFileName := r.FormValue("lastFileName")
|
||||||
|
|
||||||
entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit)
|
entries, expiredCount, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
|
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
|
||||||
@ -40,7 +40,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
shouldDisplayLoadMore := len(entries) == limit
|
shouldDisplayLoadMore := len(entries)+expiredCount == limit
|
||||||
if path == "/" {
|
if path == "/" {
|
||||||
path = ""
|
path = ""
|
||||||
}
|
}
|
||||||
|
@ -180,9 +180,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
|
|||||||
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
|
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
|
glog.V(2).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
|
||||||
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
|
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
|
||||||
glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
|
glog.V(2).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
|
||||||
if int64(v.Ttl.Minutes()) < livedMinutes {
|
if int64(v.Ttl.Minutes()) < livedMinutes {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user