2018-05-29 03:30:17 +08:00
|
|
|
package filesys
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
2018-07-22 08:39:10 +08:00
|
|
|
"fmt"
|
2019-02-16 01:59:22 +08:00
|
|
|
"sync"
|
2019-01-01 18:14:40 +08:00
|
|
|
"sync/atomic"
|
2018-07-22 08:39:10 +08:00
|
|
|
"time"
|
2018-05-29 03:30:17 +08:00
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
2018-07-22 08:39:10 +08:00
|
|
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
2019-02-16 01:59:22 +08:00
|
|
|
"github.com/chrislusf/seaweedfs/weed/security"
|
2018-05-29 03:30:17 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
type ContinuousDirtyPages struct {
|
2018-05-29 16:21:21 +08:00
|
|
|
hasData bool
|
|
|
|
Offset int64
|
|
|
|
Size int64
|
|
|
|
Data []byte
|
|
|
|
f *File
|
2018-09-08 04:11:43 +08:00
|
|
|
lock sync.Mutex
|
2018-05-29 03:30:17 +08:00
|
|
|
}
|
|
|
|
|
2018-05-29 16:21:21 +08:00
|
|
|
func newDirtyPages(file *File) *ContinuousDirtyPages {
|
|
|
|
return &ContinuousDirtyPages{
|
2018-12-28 19:27:48 +08:00
|
|
|
Data: nil,
|
2018-05-29 16:21:21 +08:00
|
|
|
f: file,
|
2018-05-29 03:30:17 +08:00
|
|
|
}
|
2018-05-29 16:21:21 +08:00
|
|
|
}
|
2018-05-29 03:30:17 +08:00
|
|
|
|
2018-12-28 19:27:48 +08:00
|
|
|
func (pages *ContinuousDirtyPages) releaseResource() {
|
|
|
|
if pages.Data != nil {
|
|
|
|
pages.f.wfs.bufPool.Put(pages.Data)
|
2019-01-01 18:14:40 +08:00
|
|
|
pages.Data = nil
|
|
|
|
atomic.AddInt32(&counter, -1)
|
2019-01-01 19:04:44 +08:00
|
|
|
glog.V(3).Infof("%s/%s releasing resource %d", pages.f.dir.Path, pages.f.Name, counter)
|
2018-11-15 14:48:54 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-01-01 18:14:40 +08:00
|
|
|
// counter tracks how many pooled buffers are currently held by dirty pages.
// It is updated with atomic.AddInt32.
var counter int32
|
|
|
|
|
2018-05-31 13:02:21 +08:00
|
|
|
func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
|
|
|
|
|
2018-09-08 04:11:43 +08:00
|
|
|
pages.lock.Lock()
|
|
|
|
defer pages.lock.Unlock()
|
|
|
|
|
2018-05-31 13:02:21 +08:00
|
|
|
var chunk *filer_pb.FileChunk
|
2018-05-29 03:30:17 +08:00
|
|
|
|
2019-01-01 18:14:40 +08:00
|
|
|
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
|
2018-05-31 13:09:24 +08:00
|
|
|
// this is more than what buffer can hold.
|
2018-09-10 17:21:57 +08:00
|
|
|
return pages.flushAndSave(ctx, offset, data)
|
2018-05-29 03:30:17 +08:00
|
|
|
}
|
|
|
|
|
2019-01-01 18:14:40 +08:00
|
|
|
if pages.Data == nil {
|
|
|
|
pages.Data = pages.f.wfs.bufPool.Get().([]byte)
|
|
|
|
atomic.AddInt32(&counter, 1)
|
2019-01-01 19:04:44 +08:00
|
|
|
glog.V(3).Infof("%s/%s acquire resource %d", pages.f.dir.Path, pages.f.Name, counter)
|
2019-01-01 18:14:40 +08:00
|
|
|
}
|
|
|
|
|
2018-05-29 16:21:21 +08:00
|
|
|
if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
|
|
|
|
pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) {
|
|
|
|
// if the data is out of range,
|
|
|
|
// or buffer is full if adding new data,
|
|
|
|
// flush current buffer and add new data
|
|
|
|
|
2020-01-21 12:21:01 +08:00
|
|
|
glog.V(4).Infof("offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data))
|
2018-05-29 16:21:21 +08:00
|
|
|
|
2018-05-31 13:02:21 +08:00
|
|
|
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
2018-05-29 16:21:21 +08:00
|
|
|
if chunk != nil {
|
|
|
|
glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
2018-05-31 13:02:21 +08:00
|
|
|
chunks = append(chunks, chunk)
|
2018-05-29 16:21:21 +08:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
|
|
|
return
|
2018-05-29 04:42:25 +08:00
|
|
|
}
|
2018-05-29 16:21:21 +08:00
|
|
|
pages.Offset = offset
|
2020-01-21 12:21:01 +08:00
|
|
|
glog.V(4).Infof("copy data0: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data))
|
2018-05-29 16:21:21 +08:00
|
|
|
copy(pages.Data, data)
|
2018-09-08 04:11:43 +08:00
|
|
|
pages.Size = int64(len(data))
|
2018-05-29 04:42:25 +08:00
|
|
|
return
|
2018-05-29 03:30:17 +08:00
|
|
|
}
|
|
|
|
|
2018-09-08 04:11:43 +08:00
|
|
|
if offset != pages.Offset+pages.Size {
|
|
|
|
// when this happens, debug shows the data overlapping with existing data is empty
|
|
|
|
// the data is not just append
|
2018-09-22 15:11:46 +08:00
|
|
|
if offset == pages.Offset && int(pages.Size) < len(data) {
|
2020-01-21 12:21:01 +08:00
|
|
|
glog.V(4).Infof("copy data1: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data))
|
2018-09-10 17:39:41 +08:00
|
|
|
copy(pages.Data[pages.Size:], data[pages.Size:])
|
|
|
|
} else {
|
|
|
|
if pages.Size != 0 {
|
2018-11-11 16:07:46 +08:00
|
|
|
glog.V(1).Infof("%s/%s add page: pages [%d, %d) write [%d, %d)", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data)))
|
2018-09-10 17:39:41 +08:00
|
|
|
}
|
|
|
|
return pages.flushAndSave(ctx, offset, data)
|
|
|
|
}
|
|
|
|
} else {
|
2020-01-21 12:21:01 +08:00
|
|
|
glog.V(4).Infof("copy data2: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data))
|
2018-09-10 17:39:41 +08:00
|
|
|
copy(pages.Data[offset-pages.Offset:], data)
|
2018-09-08 04:11:43 +08:00
|
|
|
}
|
2018-09-10 17:21:57 +08:00
|
|
|
|
2018-05-29 16:21:21 +08:00
|
|
|
pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
|
2018-05-29 03:30:17 +08:00
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-09-10 17:21:57 +08:00
|
|
|
func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
|
|
|
|
|
|
|
|
var chunk *filer_pb.FileChunk
|
|
|
|
|
|
|
|
// flush existing
|
|
|
|
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
|
|
|
if chunk != nil {
|
2019-06-22 02:46:12 +08:00
|
|
|
glog.V(4).Infof("%s/%s flush existing [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
|
2018-09-10 17:21:57 +08:00
|
|
|
chunks = append(chunks, chunk)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
pages.Size = 0
|
|
|
|
pages.Offset = 0
|
|
|
|
|
|
|
|
// flush the new page
|
|
|
|
if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
|
|
|
|
if chunk != nil {
|
2019-06-22 02:46:12 +08:00
|
|
|
glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
|
2018-09-10 17:21:57 +08:00
|
|
|
chunks = append(chunks, chunk)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-05-29 03:30:17 +08:00
|
|
|
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
|
|
|
|
|
2018-09-08 04:11:43 +08:00
|
|
|
pages.lock.Lock()
|
|
|
|
defer pages.lock.Unlock()
|
|
|
|
|
2018-05-29 16:21:21 +08:00
|
|
|
if pages.Size == 0 {
|
|
|
|
return nil, nil
|
|
|
|
}
|
2018-05-29 03:30:17 +08:00
|
|
|
|
2018-05-31 13:02:21 +08:00
|
|
|
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
2018-05-29 16:21:21 +08:00
|
|
|
pages.Size = 0
|
2018-09-08 04:11:43 +08:00
|
|
|
pages.Offset = 0
|
2018-05-29 05:32:16 +08:00
|
|
|
if chunk != nil {
|
2018-05-29 13:45:52 +08:00
|
|
|
glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
2018-05-29 05:32:16 +08:00
|
|
|
}
|
2018-05-29 03:30:17 +08:00
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-05-31 13:02:21 +08:00
|
|
|
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
|
2018-09-08 04:11:43 +08:00
|
|
|
|
|
|
|
if pages.Size == 0 {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
2020-01-21 12:21:01 +08:00
|
|
|
glog.V(0).Infof("%s/%s saveExistingPagesToStorage [%d,%d): Data len=%d", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Size, len(pages.Data))
|
|
|
|
|
2018-05-31 13:02:21 +08:00
|
|
|
return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
|
2018-05-29 03:30:17 +08:00
|
|
|
|
|
|
|
var fileId, host string
|
2019-02-16 01:59:22 +08:00
|
|
|
var auth security.EncodedJwt
|
2018-05-29 03:30:17 +08:00
|
|
|
|
2019-05-03 15:24:35 +08:00
|
|
|
if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
2018-05-29 03:30:17 +08:00
|
|
|
|
|
|
|
request := &filer_pb.AssignVolumeRequest{
|
|
|
|
Count: 1,
|
2018-07-22 16:14:36 +08:00
|
|
|
Replication: pages.f.wfs.option.Replication,
|
|
|
|
Collection: pages.f.wfs.option.Collection,
|
|
|
|
TtlSec: pages.f.wfs.option.TtlSec,
|
|
|
|
DataCenter: pages.f.wfs.option.DataCenter,
|
2018-05-29 03:30:17 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
resp, err := client.AssignVolume(ctx, request)
|
|
|
|
if err != nil {
|
|
|
|
glog.V(0).Infof("assign volume failure %v: %v", request, err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-02-16 01:59:22 +08:00
|
|
|
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
|
2018-05-29 03:30:17 +08:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}); err != nil {
|
2018-06-06 14:37:41 +08:00
|
|
|
return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
|
2018-05-29 03:30:17 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
avoid slice out of bounds
avoid this problem
2018/09/04 16:27:14 fuse: panic in handler for Write [ID=0x27c0d Node=0x2 Uid=0 Gid=0 Pid=0] 0x1 131072 @10607788032 fl=WriteCache lock=0 ffl=OpenReadOnly: runtime error: slice bounds out of range
goroutine 211141 [running]:
bazil.org/fuse/fs.(*Server).serve.func2(0x10d3e60, 0xc00014be30, 0xc00052fef8, 0xc00052fe77)
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:857 +0x1ac
panic(0xe2d080, 0x17f62b0)
/home/travis/.gimme/versions/go/src/runtime/panic.go:513 +0x1b9
github.com/chrislusf/seaweedfs/weed/filesys.(*ContinuousDirtyPages).saveToStorage(0xc0000aca80, 0x10d7ba0, 0xc0003fcc00, 0xc0005dc000, 0x20000, 0x1000000, 0x276720000, 0xc0003feaa0, 0x0, 0x0)
/home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/filesys/dirty_page.go:142 +0x8ec
github.com/chrislusf/seaweedfs/weed/filesys.(*ContinuousDirtyPages).saveExistingPagesToStorage(0xc0000aca80, 0x10d7ba0, 0xc0003fcc00, 0x0, 0x0, 0x0)
/home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/filesys/dirty_page.go:107 +0x6c
github.com/chrislusf/seaweedfs/weed/filesys.(*ContinuousDirtyPages).AddPage(0xc0000aca80, 0x10d7ba0, 0xc0003fcc00, 0x278460000, 0xc011966050, 0x20000, 0x20fb0, 0x6fc23ac00, 0x4a817c800, 0x0, ...)
/home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/filesys/dirty_page.go:70 +0x8f
github.com/chrislusf/seaweedfs/weed/filesys.(*FileHandle).Write(0xc000548410, 0x10d7ba0, 0xc0003fcc00, 0xc00014be30, 0xc011946af8, 0x47fa01, 0x0)
/home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/filesys/filehandle.go:141 +0x245
bazil.org/fuse/fs.(*Server).handleRequest(0xc0002cc0c0, 0x10d7ba0, 0xc0003fcc00, 0x10cb020, 0xc000394140, 0xc0000acac0, 0x10d3e60, 0xc00014be30, 0xc00052fef8, 0x10ca6a0, ...)
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:1265 +0x1599
bazil.org/fuse/fs.(*Server).serve(0xc0002cc0c0, 0x10d3e60, 0xc00014be30)
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:878 +0x410
bazil.org/fuse/fs.(*Server).Serve.func1(0xc0002cc0c0, 0x10d3e60, 0xc00014be30)
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:425 +0x6e
created by bazil.org/fuse/fs.(*Server).Serve
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:423 +0x321
2018-09-05 17:17:04 +08:00
|
|
|
bufReader := bytes.NewReader(buf)
|
2019-12-16 12:57:08 +08:00
|
|
|
uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "", nil, auth)
|
2018-05-29 03:30:17 +08:00
|
|
|
if err != nil {
|
|
|
|
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
|
|
|
|
return nil, fmt.Errorf("upload data: %v", err)
|
|
|
|
}
|
|
|
|
if uploadResult.Error != "" {
|
|
|
|
glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
|
|
|
|
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &filer_pb.FileChunk{
|
|
|
|
FileId: fileId,
|
2018-05-31 13:02:21 +08:00
|
|
|
Offset: offset,
|
|
|
|
Size: uint64(len(buf)),
|
2018-05-29 03:30:17 +08:00
|
|
|
Mtime: time.Now().UnixNano(),
|
2018-09-23 13:12:21 +08:00
|
|
|
ETag: uploadResult.ETag,
|
2018-05-29 03:30:17 +08:00
|
|
|
}, nil
|
|
|
|
|
|
|
|
}
|
2018-05-29 16:21:21 +08:00
|
|
|
|
|
|
|
// max returns the larger of x and y.
func max(x, y int64) int64 {
	if y >= x {
		return y
	}
	return x
}
|
2020-01-23 05:42:03 +08:00
|
|
|
// min returns the smaller of x and y.
func min(x, y int64) int64 {
	if y <= x {
		return y
	}
	return x
}
|
|
|
|
|
|
|
|
func (pages *ContinuousDirtyPages) ReadDirtyData(ctx context.Context, data []byte, startOffset int64) (offset int64, size int, err error) {
|
|
|
|
bufSize := int64(len(data))
|
2020-01-23 07:38:25 +08:00
|
|
|
|
|
|
|
pages.lock.Lock()
|
|
|
|
defer pages.lock.Unlock()
|
|
|
|
|
2020-01-23 05:42:03 +08:00
|
|
|
if startOffset+bufSize < pages.Offset {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if startOffset >= pages.Offset+pages.Size {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
offset = max(pages.Offset, startOffset)
|
|
|
|
stopOffset := min(pages.Offset+pages.Size, startOffset+bufSize)
|
|
|
|
size = int(stopOffset - offset)
|
|
|
|
copy(data[offset-startOffset:], pages.Data[offset-pages.Offset:stopOffset-pages.Offset])
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|