seaweedfs/weed/filesys/dirty_page.go

217 lines
5.6 KiB
Go
Raw Normal View History

package filesys
import (
"bytes"
"context"
2018-07-22 08:39:10 +08:00
"fmt"
"io"
2019-02-16 01:59:22 +08:00
"sync"
2018-07-22 08:39:10 +08:00
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
2018-07-22 08:39:10 +08:00
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2019-02-16 01:59:22 +08:00
"github.com/chrislusf/seaweedfs/weed/security"
)
// ContinuousDirtyPages holds data written to a File that has not yet been
// uploaded, tracked as a set of intervals, together with the state needed to
// upload it as file chunks.
type ContinuousDirtyPages struct {
	// intervals stores the buffered writes.
	intervals *ContinuousIntervals
	// f is the file these dirty pages belong to.
	f *File
	// lock is held by AddPage, FlushToStorage and ReadDirtyData for their
	// whole duration.
	lock sync.Mutex
	// collection and replication are overwritten with the values returned by
	// each successful volume assignment in saveToStorage.
	collection, replication string
}
2018-05-29 16:21:21 +08:00
func newDirtyPages(file *File) *ContinuousDirtyPages {
return &ContinuousDirtyPages{
intervals: &ContinuousIntervals{},
f: file,
}
2018-05-29 16:21:21 +08:00
}
2018-12-28 19:27:48 +08:00
func (pages *ContinuousDirtyPages) releaseResource() {
2018-11-15 14:48:54 +08:00
}
2019-01-01 18:14:40 +08:00
// counter is a package-level int32 that starts at zero.
// NOTE(review): nothing in the visible part of this file reads or writes it;
// confirm whether it is still used elsewhere in the package.
var counter int32
func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
pages.lock.Lock()
defer pages.lock.Unlock()
glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
2019-01-01 18:14:40 +08:00
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
2018-05-31 13:09:24 +08:00
// this is more than what buffer can hold.
return pages.flushAndSave(ctx, offset, data)
}
2020-01-27 05:01:11 +08:00
pages.intervals.AddInterval(data, offset)
var chunk *filer_pb.FileChunk
var hasSavedData bool
if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit {
chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage(ctx)
if hasSavedData {
chunks = append(chunks, chunk)
2018-09-10 17:39:41 +08:00
}
}
return
}
func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
var chunk *filer_pb.FileChunk
var newChunks []*filer_pb.FileChunk
// flush existing
if newChunks, err = pages.saveExistingPagesToStorage(ctx); err == nil {
if newChunks != nil {
chunks = append(chunks, newChunks...)
}
} else {
return
}
// flush the new page
if chunk, err = pages.saveToStorage(ctx, bytes.NewReader(data), offset, int64(len(data))); err == nil {
if chunk != nil {
2019-06-22 02:46:12 +08:00
glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
chunks = append(chunks, chunk)
}
} else {
glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
return
}
return
}
// FlushToStorage uploads all buffered intervals while holding the pages lock
// and returns the resulting chunks along with any error from
// saveExistingPagesToStorage.
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunks []*filer_pb.FileChunk, err error) {
	pages.lock.Lock()
	defer pages.lock.Unlock()

	chunks, err = pages.saveExistingPagesToStorage(ctx)
	return chunks, err
}
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (chunks []*filer_pb.FileChunk, err error) {
var hasSavedData bool
var chunk *filer_pb.FileChunk
for {
chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage(ctx)
if !hasSavedData {
return chunks, err
}
if err == nil {
chunks = append(chunks, chunk)
} else {
return
2018-05-29 05:32:16 +08:00
}
}
}
// saveExistingLargestPageToStorage removes the largest interval list from the
// buffer and uploads it as one chunk.
//
// hasSavedData is true only when an interval was removed and its upload
// succeeded. When the buffer yields no interval it returns (nil, false, nil).
// When the upload fails it returns the error with hasSavedData false; the
// removed interval is not put back by this function.
func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, hasSavedData bool, err error) {
	largest := pages.intervals.RemoveLargestIntervalLinkedList()
	if largest == nil {
		return nil, false, nil
	}

	chunk, err = pages.saveToStorage(ctx, largest.ToReader(), largest.Offset(), largest.Size())
	if err != nil {
		glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), largest.Offset(), largest.Offset()+largest.Size(), err)
		return chunk, false, err
	}

	glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), largest.Offset(), largest.Offset()+largest.Size(), chunk.FileId)
	return chunk, true, nil
}
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
var fileId, host string
2019-02-16 01:59:22 +08:00
var auth security.EncodedJwt
dir, _ := pages.f.fullpath().DirAndName()
if err := pages.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
2018-07-22 16:14:36 +08:00
Replication: pages.f.wfs.option.Replication,
Collection: pages.f.wfs.option.Collection,
TtlSec: pages.f.wfs.option.TtlSec,
DataCenter: pages.f.wfs.option.DataCenter,
ParentPath: dir,
}
resp, err := client.AssignVolume(ctx, request)
if err != nil {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
if resp.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
2019-02-16 01:59:22 +08:00
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
pages.collection, pages.replication = resp.Collection, resp.Replication
return nil
}); err != nil {
2018-06-06 14:37:41 +08:00
return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
uploadResult, err := operation.Upload(fileUrl, pages.f.Name, reader, false, "", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload data: %v", err)
}
if uploadResult.Error != "" {
glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
return &filer_pb.FileChunk{
FileId: fileId,
Offset: offset,
Size: uint64(size),
Mtime: time.Now().UnixNano(),
2018-09-23 13:12:21 +08:00
ETag: uploadResult.ETag,
}, nil
}
2018-05-29 16:21:21 +08:00
// max returns the larger of x and y.
func max(x, y int64) int64 {
	if y >= x {
		return y
	}
	return x
}
// min returns the smaller of x and y.
func min(x, y int64) int64 {
	if y <= x {
		return y
	}
	return x
}
func (pages *ContinuousDirtyPages) ReadDirtyData(ctx context.Context, data []byte, startOffset int64) (offset int64, size int) {
2020-01-23 07:38:25 +08:00
pages.lock.Lock()
defer pages.lock.Unlock()
return pages.intervals.ReadData(data, startOffset)
}