diff --git a/go.mod b/go.mod index d25bf41e4..79896c7d3 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/chrislusf/seaweedfs - go 1.12 + require ( cloud.google.com/go v0.44.3 contrib.go.opencensus.io/exporter/aws v0.0.0-20190807220307-c50fb1bd7f21 // indirect @@ -119,4 +119,4 @@ require ( pack.ag/amqp v0.12.1 // indirect ) -replace github.com/satori/go.uuid v1.2.0 => github.com/satori/go.uuid v0.0.0-20181028125025-b2ce2384e17b +replace github.com/satori/go.uuid v1.2.0 => github.com/satori/go.uuid v0.0.0-20181028125025-b2ce2384e17b \ No newline at end of file diff --git a/go.sum b/go.sum index 3cf8c49ca..d6ee0dc70 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92 h1:lM9SFsh0EPXkyJyrTJqLZPAIJBtNFP6LNkYXu2MnSZI= github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92/go.mod h1:4jyiUCD5y548+yKW+oiHtccBiMaLCCbFBpK2t7X4eUo= +github.com/chrislusf/seaweedfs v0.0.0-20190912032620-ae53f636804e h1:PmqW1XGq0V6KnwOFa3hOSqsqa/bH66zxWzCVMOo5Yi4= +github.com/chrislusf/seaweedfs v0.0.0-20190912032620-ae53f636804e/go.mod h1:e5Pz27e2DxLCFt6GbCBP5/qJygD4TkOL5xqSFYFq+2U= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s= @@ -262,6 +264,7 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/joeslay/seaweedfs v0.0.0-20190912104409-d8c34b032fb6/go.mod h1:ljVry+CyFSNBLlKiell2UlxOKCvXXHjyBhiGDzXa+0c= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= diff --git a/weed/command/backup.go b/weed/command/backup.go index cd5843dd3..505de4ae6 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -112,7 +112,7 @@ func runBackup(cmd *Command, args []string) bool { return true } } - v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0) + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true @@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool { // remove the old data v.Destroy() // recreate an empty volume - v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0) + v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index 79d50c095..4a54f5670 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -38,7 +38,7 @@ func runCompact(cmd *Command, args []string) bool { vid := needle.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, - storage.NeedleMapInMemory, nil, nil, preallocate) + storage.NeedleMapInMemory, nil, nil, preallocate, 0) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/weed/os_overloads/file_windows.go b/weed/os_overloads/file_windows.go new file mode 100644 index 000000000..05aa384e2 --- /dev/null +++ b/weed/os_overloads/file_windows.go @@ -0,0 +1,168 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package os_overloads + +import ( + "os" + "syscall" + + "golang.org/x/sys/windows" +) + +func isAbs(path string) (b bool) { + v := volumeName(path) + if v == "" { + return false + } + path = path[len(v):] + if path == "" { + return false + } + return os.IsPathSeparator(path[0]) +} + +func volumeName(path string) (v string) { + if len(path) < 2 { + return "" + } + // with drive letter + c := path[0] + if path[1] == ':' && + ('0' <= c && c <= '9' || 'a' <= c && c <= 'z' || + 'A' <= c && c <= 'Z') { + return path[:2] + } + // is it UNC + if l := len(path); l >= 5 && os.IsPathSeparator(path[0]) && os.IsPathSeparator(path[1]) && + !os.IsPathSeparator(path[2]) && path[2] != '.' { + // first, leading `\\` and next shouldn't be `\`. its server name. + for n := 3; n < l-1; n++ { + // second, next '\' shouldn't be repeated. + if os.IsPathSeparator(path[n]) { + n++ + // third, following something characters. its share name. + if !os.IsPathSeparator(path[n]) { + if path[n] == '.' { + break + } + for ; n < l; n++ { + if os.IsPathSeparator(path[n]) { + break + } + } + return path[:n] + } + break + } + } + } + return "" +} + +// fixLongPath returns the extended-length (\\?\-prefixed) form of +// path when needed, in order to avoid the default 260 character file +// path limit imposed by Windows. If path is not easily converted to +// the extended-length form (for example, if path is a relative path +// or contains .. elements), or is short enough, fixLongPath returns +// path unmodified. +// +// See https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath +func fixLongPath(path string) string { + // Do nothing (and don't allocate) if the path is "short". + // Empirically (at least on the Windows Server 2013 builder), + // the kernel is arbitrarily okay with < 248 bytes. That + // matches what the docs above say: + // "When using an API to create a directory, the specified + // path cannot be so long that you cannot append an 8.3 file + // name (that is, the directory name cannot exceed MAX_PATH + // minus 12)." Since MAX_PATH is 260, 260 - 12 = 248. + // + // The MSDN docs appear to say that a normal path that is 248 bytes long + // will work; empirically the path must be less then 248 bytes long. + if len(path) < 248 { + // Don't fix. (This is how Go 1.7 and earlier worked, + // not automatically generating the \\?\ form) + return path + } + + // The extended form begins with \\?\, as in + // \\?\c:\windows\foo.txt or \\?\UNC\server\share\foo.txt. + // The extended form disables evaluation of . and .. path + // elements and disables the interpretation of / as equivalent + // to \. The conversion here rewrites / to \ and elides + // . elements as well as trailing or duplicate separators. For + // simplicity it avoids the conversion entirely for relative + // paths or paths containing .. elements. For now, + // \\server\share paths are not converted to + // \\?\UNC\server\share paths because the rules for doing so + // are less well-specified. + if len(path) >= 2 && path[:2] == `\\` { + // Don't canonicalize UNC paths. + return path + } + if !isAbs(path) { + // Relative path + return path + } + + const prefix = `\\?` + + pathbuf := make([]byte, len(prefix)+len(path)+len(`\`)) + copy(pathbuf, prefix) + n := len(path) + r, w := 0, len(prefix) + for r < n { + switch { + case os.IsPathSeparator(path[r]): + // empty block + r++ + case path[r] == '.' && (r+1 == n || os.IsPathSeparator(path[r+1])): + // /./ + r++ + case r+1 < n && path[r] == '.' && path[r+1] == '.' && (r+2 == n || os.IsPathSeparator(path[r+2])): + // /../ is currently unhandled + return path + default: + pathbuf[w] = '\\' + w++ + for ; r < n && !os.IsPathSeparator(path[r]); r++ { + pathbuf[w] = path[r] + w++ + } + } + } + // A drive's root directory needs a trailing \ + if w == len(`\\?\c:`) { + pathbuf[w] = '\\' + w++ + } + return string(pathbuf[:w]) +} + +// syscallMode returns the syscall-specific mode bits from Go's portable mode bits. +func syscallMode(i os.FileMode) (o uint32) { + o |= uint32(i.Perm()) + if i&os.ModeSetuid != 0 { + o |= syscall.S_ISUID + } + if i&os.ModeSetgid != 0 { + o |= syscall.S_ISGID + } + if i&os.ModeSticky != 0 { + o |= syscall.S_ISVTX + } + // No mapping for Go's ModeTemporary (plan9 only). + return +} + +//If the bool is set to true then the file is opened with the parameters FILE_ATTRIBUTE_TEMPORARY and +// FILE_FLAG_DELETE_ON_CLOSE +func OpenFile(name string, flag int, perm os.FileMode, setToTempAndDelete bool) (file *os.File, err error) { + r, e := Open(fixLongPath(name), flag|windows.O_CLOEXEC, syscallMode(perm), setToTempAndDelete) + if e != nil { + return nil, e + } + return os.NewFile(uintptr(r), name), nil +} diff --git a/weed/os_overloads/syscall_windows.go b/weed/os_overloads/syscall_windows.go new file mode 100644 index 000000000..081cba431 --- /dev/null +++ b/weed/os_overloads/syscall_windows.go @@ -0,0 +1,80 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Windows system calls. + +package os_overloads + +import ( + "syscall" + "unsafe" + + "golang.org/x/sys/windows" +) + +// windows api calls + +//sys CreateFile(name *uint16, access uint32, mode uint32, sa *SecurityAttributes, createmode uint32, attrs uint32, templatefile int32) (handle Handle, err error) [failretval==InvalidHandle] = CreateFileW + +func makeInheritSa() *syscall.SecurityAttributes { + var sa syscall.SecurityAttributes + sa.Length = uint32(unsafe.Sizeof(sa)) + sa.InheritHandle = 1 + return &sa +} + +// opens the +func Open(path string, mode int, perm uint32, setToTempAndDelete bool) (fd syscall.Handle, err error) { + if len(path) == 0 { + return syscall.InvalidHandle, windows.ERROR_FILE_NOT_FOUND + } + pathp, err := syscall.UTF16PtrFromString(path) + if err != nil { + return syscall.InvalidHandle, err + } + var access uint32 + switch mode & (windows.O_RDONLY | windows.O_WRONLY | windows.O_RDWR) { + case windows.O_RDONLY: + access = windows.GENERIC_READ + case windows.O_WRONLY: + access = windows.GENERIC_WRITE + case windows.O_RDWR: + access = windows.GENERIC_READ | windows.GENERIC_WRITE + } + if mode&windows.O_CREAT != 0 { + access |= windows.GENERIC_WRITE + } + if mode&windows.O_APPEND != 0 { + access &^= windows.GENERIC_WRITE + access |= windows.FILE_APPEND_DATA + } + sharemode := uint32(windows.FILE_SHARE_READ | windows.FILE_SHARE_WRITE) + var sa *syscall.SecurityAttributes + if mode&windows.O_CLOEXEC == 0 { + sa = makeInheritSa() + } + var createmode uint32 + switch { + case mode&(windows.O_CREAT|windows.O_EXCL) == (windows.O_CREAT | windows.O_EXCL): + createmode = windows.CREATE_NEW + case mode&(windows.O_CREAT|windows.O_TRUNC) == (windows.O_CREAT | windows.O_TRUNC): + createmode = windows.CREATE_ALWAYS + case mode&windows.O_CREAT == windows.O_CREAT: + createmode = windows.OPEN_ALWAYS + case mode&windows.O_TRUNC == windows.O_TRUNC: + createmode = windows.TRUNCATE_EXISTING + default: + createmode = windows.OPEN_EXISTING + } + + var h syscall.Handle + var e error + + if setToTempAndDelete { + h, e = syscall.CreateFile(pathp, access, sharemode, sa, createmode, (windows.FILE_ATTRIBUTE_TEMPORARY | FILE_FLAG_DELETE_ON_CLOSE), 0) + } else { + h, e = syscall.CreateFile(pathp, access, sharemode, sa, createmode, windows.FILE_ATTRIBUTE_NORMAL, 0) + } + return h, e +} diff --git a/weed/os_overloads/types_windows.go b/weed/os_overloads/types_windows.go new file mode 100644 index 000000000..254ba3002 --- /dev/null +++ b/weed/os_overloads/types_windows.go @@ -0,0 +1,9 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package os_overloads + +const ( + FILE_FLAG_DELETE_ON_CLOSE = 0x04000000 +) diff --git a/weed/pb/master.proto b/weed/pb/master.proto index 9cf46ab92..f4230d029 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -139,6 +139,7 @@ message AssignRequest { string data_center = 5; string rack = 6; string data_node = 7; + uint32 MemoryMapMaxSizeMB = 8; } message AssignResponse { string fid = 1; diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index f2d9420f0..a5d665289 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -44,12 +44,15 @@ It has these top-level messages: */ package master_pb -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - import ( + fmt "fmt" + + proto "github.com/golang/protobuf/proto" + + math "math" + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" ) @@ -648,13 +651,14 @@ func (m *Location) GetPublicUrl() string { } type AssignRequest struct { - Count uint64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"` - Replication string `protobuf:"bytes,2,opt,name=replication" json:"replication,omitempty"` - Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` - Ttl string `protobuf:"bytes,4,opt,name=ttl" json:"ttl,omitempty"` - DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"` - Rack string `protobuf:"bytes,6,opt,name=rack" json:"rack,omitempty"` - DataNode string `protobuf:"bytes,7,opt,name=data_node,json=dataNode" json:"data_node,omitempty"` + Count uint64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"` + Replication string `protobuf:"bytes,2,opt,name=replication" json:"replication,omitempty"` + Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` + Ttl string `protobuf:"bytes,4,opt,name=ttl" json:"ttl,omitempty"` + DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"` + Rack string `protobuf:"bytes,6,opt,name=rack" json:"rack,omitempty"` + DataNode string `protobuf:"bytes,7,opt,name=data_node,json=dataNode" json:"data_node,omitempty"` + MemoryMapMaxSizeMB uint32 `protobuf:"varint,8,opt,name=memorymapmaxsizemb" json:"memorymapmaxsizemb,omitempty"` } func (m *AssignRequest) Reset() { *m = AssignRequest{} } @@ -711,6 +715,13 @@ func (m *AssignRequest) GetDataNode() string { return "" } +func (m *AssignRequest) GetMemoryMapMaxSizeMB() uint32 { + if m != nil { + return m.MemoryMapMaxSizeMB + } + return 0 +} + type AssignResponse struct { Fid string `protobuf:"bytes,1,opt,name=fid" json:"fid,omitempty"` Url string `protobuf:"bytes,2,opt,name=url" json:"url,omitempty"` diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index c330f7fb2..b3d49aaed 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -131,6 +131,7 @@ message AllocateVolumeRequest { int64 preallocate = 3; string replication = 4; string ttl = 5; + int32 memorymapmaxsizemb = 6; } message AllocateVolumeResponse { } diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 8b84f1cfd..7060c602f 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -70,12 +70,15 @@ It has these top-level messages: */ package volume_server_pb -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - import ( + fmt "fmt" + + proto "github.com/golang/protobuf/proto" + + math "math" + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" ) @@ -315,11 +318,12 @@ func (*DeleteCollectionResponse) ProtoMessage() {} func (*DeleteCollectionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } type AllocateVolumeRequest struct { - VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` - Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` - Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"` - Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"` - Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` + Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` + Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"` + Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"` + Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"` + MemoryMapMaxSizeMB uint32 `protobuf:"varint,6,opt,name=memorymapmaxsizemb" json:"memorymapmaxsizemb,omitempty"` } func (m *AllocateVolumeRequest) Reset() { *m = AllocateVolumeRequest{} } @@ -362,6 +366,13 @@ func (m *AllocateVolumeRequest) GetTtl() string { return "" } +func (m *AllocateVolumeRequest) GetMemoryMapMaxSizeMB() uint32 { + if m != nil { + return m.MemoryMapMaxSizeMB + } + return 0 +} + type AllocateVolumeResponse struct { } @@ -2359,6 +2370,7 @@ func _VolumeServer_DeleteCollection_Handler(srv interface{}, ctx context.Context func _VolumeServer_AllocateVolume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(AllocateVolumeRequest) + if err := dec(in); err != nil { return nil, err } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 19064bcde..16ce5356a 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -62,13 +62,14 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } option := &topology.VolumeGrowOption{ - Collection: req.Collection, - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - Prealloacte: ms.preallocateSize, - DataCenter: req.DataCenter, - Rack: req.Rack, - DataNode: req.DataNode, + Collection: req.Collection, + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + Prealloacte: ms.preallocateSize, + DataCenter: req.DataCenter, + Rack: req.Rack, + DataNode: req.DataNode, + MemoryMapMaxSizeMB: req.MemoryMapMaxSizeMB, } if !ms.Topo.HasWritableVolume(option) { diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 6b5da1132..2acde740e 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -148,6 +148,11 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } + memoryMapMaxSizeMB, err := needle.ReadMemoryMapMaxSizeMB(r.FormValue("memorymapmaxsizemb")) + if err != nil { + return nil, err + } + preallocate := ms.preallocateSize if r.FormValue("preallocate") != "" { preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64) @@ -156,13 +161,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr } } volumeGrowOption := &topology.VolumeGrowOption{ - Collection: r.FormValue("collection"), - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - Prealloacte: preallocate, - DataCenter: r.FormValue("dataCenter"), - Rack: r.FormValue("rack"), - DataNode: r.FormValue("dataNode"), + Collection: r.FormValue("collection"), + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + Prealloacte: preallocate, + DataCenter: r.FormValue("dataCenter"), + Rack: r.FormValue("rack"), + DataNode: r.FormValue("dataNode"), + MemoryMapMaxSizeMB: memoryMapMaxSizeMB, } return volumeGrowOption, nil } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 35c2508a6..ad7920161 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -35,6 +35,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p req.Replication, req.Ttl, req.Preallocate, + req.MemoryMapMaxSizeMB, ) if err != nil { diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index e61623fc7..c7faa57a6 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -60,7 +60,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne _, found := l.volumes[vid] l.RUnlock() if !found { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil { l.Lock() l.volumes[vid] = v l.Unlock() diff --git a/weed/storage/memory_map/memory_map.go b/weed/storage/memory_map/memory_map.go new file mode 100644 index 000000000..5f0327ea7 --- /dev/null +++ b/weed/storage/memory_map/memory_map.go @@ -0,0 +1,42 @@ +// +build !windows + +package memory_map + +import ( + "fmt" + "os" +) + +type MemoryBuffer struct { + aligned_length uint64 + length uint64 + aligned_ptr uintptr + ptr uintptr + Buffer []byte +} + +type MemoryMap struct { + File *os.File + file_memory_map_handle uintptr + write_map_views []MemoryBuffer + max_length uint64 + End_of_file int64 +} + +var FileMemoryMap = make(map[string]*MemoryMap) + +func (mMap *MemoryMap) CreateMemoryMap(file *os.File, maxLength uint64) { +} + +func (mMap *MemoryMap) WriteMemory(offset uint64, length uint64, data []byte) { + +} + +func (mMap *MemoryMap) ReadMemory(offset uint64, length uint64) ([]byte, error) { + dataSlice := []byte{} + return dataSlice, fmt.Errorf("Memory Map not implemented for this platform") +} + +func (mBuffer *MemoryMap) DeleteFileAndMemoryMap() { + +} diff --git a/weed/storage/memory_map/memory_map_windows.go b/weed/storage/memory_map/memory_map_windows.go new file mode 100644 index 000000000..ac4493188 --- /dev/null +++ b/weed/storage/memory_map/memory_map_windows.go @@ -0,0 +1,343 @@ +// +build windows + +package memory_map + +import ( + "os" + "reflect" + "syscall" + "unsafe" + + "golang.org/x/sys/windows" +) + +type MemoryBuffer struct { + aligned_length uint64 + length uint64 + aligned_ptr uintptr + ptr uintptr + Buffer []byte +} + +type MemoryMap struct { + File *os.File + file_memory_map_handle uintptr + write_map_views []MemoryBuffer + max_length uint64 + End_of_file int64 +} + +var FileMemoryMap = make(map[string]*MemoryMap) + +type DWORDLONG = uint64 +type DWORD = uint32 +type WORD = uint16 + +var ( + modkernel32 = syscall.NewLazyDLL("kernel32.dll") + + procGetSystemInfo = modkernel32.NewProc("GetSystemInfo") + procGlobalMemoryStatusEx = modkernel32.NewProc("GlobalMemoryStatusEx") + procGetProcessWorkingSetSize = modkernel32.NewProc("GetProcessWorkingSetSize") + procSetProcessWorkingSetSize = modkernel32.NewProc("SetProcessWorkingSetSize") +) + +var currentProcess, _ = windows.GetCurrentProcess() +var currentMinWorkingSet uint64 = 0 +var currentMaxWorkingSet uint64 = 0 +var _ = getProcessWorkingSetSize(uintptr(currentProcess), ¤tMinWorkingSet, ¤tMaxWorkingSet) + +var systemInfo, _ = getSystemInfo() +var chunkSize = uint64(systemInfo.dwAllocationGranularity) * 128 + +var memoryStatusEx, _ = globalMemoryStatusEx() +var maxMemoryLimitBytes = uint64(float64(memoryStatusEx.ullTotalPhys) * 0.8) + +func (mMap *MemoryMap) CreateMemoryMap(file *os.File, maxLength uint64) { + + chunks := (maxLength / chunkSize) + if chunks*chunkSize < maxLength { + chunks = chunks + 1 + } + + alignedMaxLength := chunks * chunkSize + + maxLength_high := uint32(alignedMaxLength >> 32) + maxLength_low := uint32(alignedMaxLength & 0xFFFFFFFF) + file_memory_map_handle, err := windows.CreateFileMapping(windows.Handle(file.Fd()), nil, windows.PAGE_READWRITE, maxLength_high, maxLength_low, nil) + + if err == nil { + mMap.File = file + mMap.file_memory_map_handle = uintptr(file_memory_map_handle) + mMap.write_map_views = make([]MemoryBuffer, 0, alignedMaxLength/chunkSize) + mMap.max_length = alignedMaxLength + mMap.End_of_file = -1 + } +} + +func (mMap *MemoryMap) DeleteFileAndMemoryMap() { + //First we close the file handles first to delete the file, + //Then we unmap the memory to ensure the unmapping process doesn't write the data to disk + windows.CloseHandle(windows.Handle(mMap.file_memory_map_handle)) + windows.CloseHandle(windows.Handle(mMap.File.Fd())) + + for _, view := range mMap.write_map_views { + view.releaseMemory() + } + + mMap.write_map_views = nil + mMap.max_length = 0 +} + +func min(x, y uint64) uint64 { + if x < y { + return x + } + return y +} + +func (mMap *MemoryMap) WriteMemory(offset uint64, length uint64, data []byte) { + + for { + if ((offset+length)/chunkSize)+1 > uint64(len(mMap.write_map_views)) { + allocateChunk(mMap) + } else { + break + } + } + + remaining_length := length + sliceIndex := offset / chunkSize + sliceOffset := offset - (sliceIndex * chunkSize) + dataOffset := uint64(0) + + for { + writeEnd := min((remaining_length + sliceOffset), chunkSize) + copy(mMap.write_map_views[sliceIndex].Buffer[sliceOffset:writeEnd], data[dataOffset:]) + remaining_length -= (writeEnd - sliceOffset) + dataOffset += (writeEnd - sliceOffset) + + if remaining_length > 0 { + sliceIndex += 1 + sliceOffset = 0 + } else { + break + } + } + + if mMap.End_of_file < int64(offset+length-1) { + mMap.End_of_file = int64(offset + length - 1) + } +} + +func (mMap *MemoryMap) ReadMemory(offset uint64, length uint64) (dataSlice []byte, err error) { + dataSlice = make([]byte, length) + mBuffer, err := allocate(windows.Handle(mMap.file_memory_map_handle), offset, length, false) + copy(dataSlice, mBuffer.Buffer) + mBuffer.releaseMemory() + return dataSlice, err +} + +func (mBuffer *MemoryBuffer) releaseMemory() { + + windows.VirtualUnlock(mBuffer.aligned_ptr, uintptr(mBuffer.aligned_length)) + windows.UnmapViewOfFile(mBuffer.aligned_ptr) + + currentMinWorkingSet -= mBuffer.aligned_length + currentMaxWorkingSet -= mBuffer.aligned_length + + if currentMinWorkingSet < maxMemoryLimitBytes { + var _ = setProcessWorkingSetSize(uintptr(currentProcess), currentMinWorkingSet, currentMaxWorkingSet) + } + + mBuffer.ptr = 0 + mBuffer.aligned_ptr = 0 + mBuffer.length = 0 + mBuffer.aligned_length = 0 + mBuffer.Buffer = nil +} + +func allocateChunk(mMap *MemoryMap) { + start := uint64(len(mMap.write_map_views)) * chunkSize + mBuffer, err := allocate(windows.Handle(mMap.file_memory_map_handle), start, chunkSize, true) + + if err == nil { + mMap.write_map_views = append(mMap.write_map_views, mBuffer) + } +} + +func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool) (MemoryBuffer, error) { + + mBuffer := MemoryBuffer{} + + //align memory allocations to the minium virtal memory allocation size + dwSysGran := systemInfo.dwAllocationGranularity + + start := (offset / uint64(dwSysGran)) * uint64(dwSysGran) + diff := offset - start + aligned_length := diff + length + + offset_high := uint32(start >> 32) + offset_low := uint32(start & 0xFFFFFFFF) + + access := windows.FILE_MAP_READ + + if write { + access = windows.FILE_MAP_WRITE + } + + currentMinWorkingSet += aligned_length + currentMaxWorkingSet += aligned_length + + if currentMinWorkingSet < maxMemoryLimitBytes { + // increase the process working set size to hint to windows memory manager to + // prioritise keeping this memory mapped in physical memory over other standby memory + var _ = setProcessWorkingSetSize(uintptr(currentProcess), currentMinWorkingSet, currentMaxWorkingSet) + } + + addr_ptr, errno := windows.MapViewOfFile(hMapFile, + uint32(access), // read/write permission + offset_high, + offset_low, + uintptr(aligned_length)) + + if addr_ptr == 0 { + return mBuffer, errno + } + + if currentMinWorkingSet < maxMemoryLimitBytes { + windows.VirtualLock(mBuffer.aligned_ptr, uintptr(mBuffer.aligned_length)) + } + + mBuffer.aligned_ptr = addr_ptr + mBuffer.aligned_length = aligned_length + mBuffer.ptr = addr_ptr + uintptr(diff) + mBuffer.length = length + + slice_header := (*reflect.SliceHeader)(unsafe.Pointer(&mBuffer.Buffer)) + slice_header.Data = addr_ptr + uintptr(diff) + slice_header.Len = int(length) + slice_header.Cap = int(length) + + return mBuffer, nil +} + +//typedef struct _MEMORYSTATUSEX { +// DWORD dwLength; +// DWORD dwMemoryLoad; +// DWORDLONG ullTotalPhys; +// DWORDLONG ullAvailPhys; +// DWORDLONG ullTotalPageFile; +// DWORDLONG ullAvailPageFile; +// DWORDLONG ullTotalVirtual; +// DWORDLONG ullAvailVirtual; +// DWORDLONG ullAvailExtendedVirtual; +// } MEMORYSTATUSEX, *LPMEMORYSTATUSEX; +//https://docs.microsoft.com/en-gb/windows/win32/api/sysinfoapi/ns-sysinfoapi-memorystatusex + +type _MEMORYSTATUSEX struct { + dwLength DWORD + dwMemoryLoad DWORD + ullTotalPhys DWORDLONG + ullAvailPhys DWORDLONG + ullTotalPageFile DWORDLONG + ullAvailPageFile DWORDLONG + ullTotalVirtual DWORDLONG + ullAvailVirtual DWORDLONG + ullAvailExtendedVirtual DWORDLONG +} + +// BOOL GlobalMemoryStatusEx( +// LPMEMORYSTATUSEX lpBuffer +// ); +// https://docs.microsoft.com/en-gb/windows/win32/api/sysinfoapi/nf-sysinfoapi-globalmemorystatusex +func globalMemoryStatusEx() (_MEMORYSTATUSEX, error) { + var mem_status _MEMORYSTATUSEX + + mem_status.dwLength = uint32(unsafe.Sizeof(mem_status)) + _, _, err := procGlobalMemoryStatusEx.Call(uintptr(unsafe.Pointer(&mem_status))) + + if err != syscall.Errno(0) { + return mem_status, err + } + return mem_status, nil +} + +// typedef struct _SYSTEM_INFO { +// union { +// DWORD dwOemId; +// struct { +// WORD wProcessorArchitecture; +// WORD wReserved; +// }; +// }; +// DWORD dwPageSize; +// LPVOID lpMinimumApplicationAddress; +// LPVOID lpMaximumApplicationAddress; +// DWORD_PTR dwActiveProcessorMask; +// DWORD dwNumberOfProcessors; +// DWORD dwProcessorType; +// DWORD dwAllocationGranularity; +// WORD wProcessorLevel; +// WORD wProcessorRevision; +// } SYSTEM_INFO; +// https://docs.microsoft.com/en-gb/windows/win32/api/sysinfoapi/ns-sysinfoapi-system_info +type _SYSTEM_INFO struct { + dwOemId DWORD + dwPageSize DWORD + lpMinimumApplicationAddress uintptr + lpMaximumApplicationAddress uintptr + dwActiveProcessorMask uintptr + dwNumberOfProcessors DWORD + dwProcessorType DWORD + dwAllocationGranularity DWORD + wProcessorLevel WORD + wProcessorRevision WORD +} + +// void WINAPI GetSystemInfo( +// _Out_ LPSYSTEM_INFO lpSystemInfo +// ); +// https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/nf-sysinfoapi-getsysteminfo +func getSystemInfo() (_SYSTEM_INFO, error) { + var si _SYSTEM_INFO + _, _, err := procGetSystemInfo.Call(uintptr(unsafe.Pointer(&si))) + if err != syscall.Errno(0) { + return si, err + } + return si, nil +} + +// BOOL GetProcessWorkingSetSize( +// HANDLE hProcess, +// PSIZE_T lpMinimumWorkingSetSize, +// PSIZE_T lpMaximumWorkingSetSize +// ); +// https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getprocessworkingsetsize + +func getProcessWorkingSetSize(process uintptr, dwMinWorkingSet *uint64, dwMaxWorkingSet *uint64) error { + r1, _, err := syscall.Syscall(procGetProcessWorkingSetSize.Addr(), 3, process, uintptr(unsafe.Pointer(dwMinWorkingSet)), uintptr(unsafe.Pointer(dwMaxWorkingSet))) + if r1 == 0 { + if err != syscall.Errno(0) { + return err + } + } + return nil +} + +// BOOL SetProcessWorkingSetSize( +// HANDLE hProcess, +// SIZE_T dwMinimumWorkingSetSize, +// SIZE_T dwMaximumWorkingSetSize +// ); +// https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-setprocessworkingsetsize + +func setProcessWorkingSetSize(process uintptr, dwMinWorkingSet uint64, dwMaxWorkingSet uint64) error { + r1, _, err := syscall.Syscall(procSetProcessWorkingSetSize.Addr(), 3, process, uintptr(dwMinWorkingSet), uintptr(dwMaxWorkingSet)) + if r1 == 0 { + if err != syscall.Errno(0) { + return err + } + } + return nil +} diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 75aefdf16..5f8bfee8b 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -9,6 +9,7 @@ import ( "math" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/memory_map" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -29,39 +30,25 @@ func (n *Needle) DiskSize(version Version) int64 { return GetActualSize(n.Size, version) } -func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) { - if end, e := w.Seek(0, io.SeekEnd); e == nil { - defer func(w *os.File, off int64) { - if err != nil { - if te := w.Truncate(end); te != nil { - glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) - } - } - }(w, end) - offset = uint64(end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) - return - } +func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, error) { + + writeBytes := make([]byte, 0) + switch version { case Version1: header := make([]byte, NeedleHeaderSize) CookieToBytes(header[0:CookieSize], n.Cookie) NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) n.Size = uint32(len(n.Data)) - size = n.Size util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) - if _, err = w.Write(header); err != nil { - return - } - if _, err = w.Write(n.Data); err != nil { - return - } - actualSize = NeedleHeaderSize + int64(n.Size) + size := n.Size + actualSize := NeedleHeaderSize + int64(n.Size) + writeBytes = append(writeBytes, header...) + writeBytes = append(writeBytes, n.Data...) padding := PaddingLength(n.Size, version) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) - return + writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...) + return writeBytes, size, actualSize, nil case Version2, Version3: header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation CookieToBytes(header[0:CookieSize], n.Cookie) @@ -92,82 +79,101 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32 } else { n.Size = 0 } - size = n.DataSize util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) - if _, err = w.Write(header[0:NeedleHeaderSize]); err != nil { - return - } + writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...) if n.DataSize > 0 { util.Uint32toBytes(header[0:4], n.DataSize) - if _, err = w.Write(header[0:4]); err != nil { - return - } - if _, err = w.Write(n.Data); err != nil { - return - } + writeBytes = append(writeBytes, header[0:4]...) + writeBytes = append(writeBytes, n.Data...) util.Uint8toBytes(header[0:1], n.Flags) - if _, err = w.Write(header[0:1]); err != nil { - return - } + writeBytes = append(writeBytes, header[0:1]...) if n.HasName() { util.Uint8toBytes(header[0:1], n.NameSize) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if _, err = w.Write(n.Name[:n.NameSize]); err != nil { - return - } + writeBytes = append(writeBytes, header[0:1]...) + writeBytes = append(writeBytes, n.Name[:n.NameSize]...) } if n.HasMime() { util.Uint8toBytes(header[0:1], n.MimeSize) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if _, err = w.Write(n.Mime); err != nil { - return - } + writeBytes = append(writeBytes, header[0:1]...) + writeBytes = append(writeBytes, n.Mime...) } if n.HasLastModifiedDate() { util.Uint64toBytes(header[0:8], n.LastModified) - if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil { - return - } + writeBytes = append(writeBytes, header[8-LastModifiedBytesLength:8]...) } if n.HasTtl() && n.Ttl != nil { n.Ttl.ToBytes(header[0:TtlBytesLength]) - if _, err = w.Write(header[0:TtlBytesLength]); err != nil { - return - } + writeBytes = append(writeBytes, header[0:TtlBytesLength]...) } if n.HasPairs() { util.Uint16toBytes(header[0:2], n.PairsSize) - if _, err = w.Write(header[0:2]); err != nil { - return - } - if _, err = w.Write(n.Pairs); err != nil { - return - } + writeBytes = append(writeBytes, header[0:2]...) + writeBytes = append(writeBytes, n.Pairs...) } } padding := PaddingLength(n.Size, version) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) if version == Version2 { - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...) } else { // version3 util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs) - _, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding]) + writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...) } - return offset, n.DataSize, GetActualSize(n.Size, version), err + return writeBytes, n.DataSize, GetActualSize(n.Size, version), nil } - return 0, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) + + return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) +} + +func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) { + + mMap, exists := memory_map.FileMemoryMap[w.Name()] + if !exists { + if end, e := w.Seek(0, io.SeekEnd); e == nil { + defer func(w *os.File, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) + } + } + }(w, end) + offset = uint64(end) + } else { + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return + } + } else { + offset = uint64(mMap.End_of_file + 1) + } + + bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version) + + if err == nil { + if exists { + mMap.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite) + } else { + _, err = w.Write(bytesToWrite) + } + } + + return offset, size, actualSize, err } func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) { - dataSlice = make([]byte, int(GetActualSize(size, version))) - _, err = r.ReadAt(dataSlice, offset) - return dataSlice, err + + dataSize := GetActualSize(size, version) + dataSlice = make([]byte, int(dataSize)) + + mMap, exists := memory_map.FileMemoryMap[r.Name()] + if exists { + dataSlice, err := mMap.ReadMemory(uint64(offset), uint64(dataSize)) + return dataSlice, err + } else { + _, err = r.ReadAt(dataSlice, offset) + return dataSlice, err + } } // ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set. @@ -280,14 +286,24 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, byt n = new(Needle) if version == Version1 || version == Version2 || version == Version3 { bytes = make([]byte, NeedleHeaderSize) - var count int - count, err = r.ReadAt(bytes, offset) - if count <= 0 || err != nil { - return nil, bytes, 0, err + + mMap, exists := memory_map.FileMemoryMap[r.Name()] + if exists { + bytes, err = mMap.ReadMemory(uint64(offset), NeedleHeaderSize) + if err != nil { + return nil, bytes, 0, err + } + } else { + var count int + count, err = r.ReadAt(bytes, offset) + if count <= 0 || err != nil { + return nil, bytes, 0, err + } } n.ParseNeedleHeader(bytes) bodyLength = NeedleBodyLength(n.Size, version) } + return } diff --git a/weed/storage/needle/volume_memory_map_max_size.go b/weed/storage/needle/volume_memory_map_max_size.go new file mode 100644 index 000000000..ee6c6ede6 --- /dev/null +++ b/weed/storage/needle/volume_memory_map_max_size.go @@ -0,0 +1,14 @@ +package needle + +import "strconv" + +func ReadMemoryMapMaxSizeMB(MemoryMapMaxSizeMBString string) (uint32, error) { + if MemoryMapMaxSizeMBString == "" { + return 0, nil + } + var MemoryMapMaxSize64 uint64 + var err error + MemoryMapMaxSize64, err = strconv.ParseUint(MemoryMapMaxSizeMBString, 10, 32) + MemoryMapMaxSize := uint32(MemoryMapMaxSize64) + return MemoryMapMaxSize, err +} diff --git a/weed/storage/needle/volume_memory_map_max_size_test.go b/weed/storage/needle/volume_memory_map_max_size_test.go new file mode 100644 index 000000000..ee9923f2b --- /dev/null +++ b/weed/storage/needle/volume_memory_map_max_size_test.go @@ -0,0 +1,10 @@ +package needle + +import "testing" + +func TestMemoryMapMaxSizeReadWrite(t *testing.T) { + memoryMapSize, _ := ReadMemoryMapMaxSizeMB("5000") + if memoryMapSize != 5000 { + t.Errorf("empty memoryMapSize:%v", memoryMapSize) + } +} diff --git a/weed/storage/store.go b/weed/storage/store.go index f0dc90790..948dd1f0a 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -59,7 +59,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, memoryMapMaxSizeMB uint32) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -68,7 +68,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, memoryMapMaxSizeMB) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -101,14 +101,14 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMB uint32) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.FindFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil { + if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMB); err == nil { location.SetVolume(vid, volume) glog.V(0).Infof("add volume %d", vid) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 301610ab6..75f889689 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -18,13 +18,14 @@ import ( ) type Volume struct { - Id needle.VolumeId - dir string - Collection string - dataFile *os.File - nm NeedleMapper - needleMapKind NeedleMapType - readOnly bool + Id needle.VolumeId + dir string + Collection string + dataFile *os.File + nm NeedleMapper + needleMapKind NeedleMapType + readOnly bool + MemoryMapMaxSizeMB uint32 SuperBlock @@ -38,9 +39,9 @@ type Volume struct { isCompacting bool } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMB uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk - v = &Volume{dir: dirname, Collection: collection, Id: id} + v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMB: memoryMapMaxSizeMB} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind e = v.load(true, true, needleMapKind, preallocate) diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go index 76231c533..ef58e5871 100644 --- a/weed/storage/volume_create.go +++ b/weed/storage/volume_create.go @@ -1,4 +1,4 @@ -// +build !linux +// +build !linux,!windows package storage @@ -8,8 +8,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" ) -func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) { - file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { + file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go index 355392c88..d9dfc3862 100644 --- a/weed/storage/volume_create_linux.go +++ b/weed/storage/volume_create_linux.go @@ -9,8 +9,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" ) -func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) { - file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { + file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if preallocate != 0 { syscall.Fallocate(int(file.Fd()), 1, 0, preallocate) glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName) diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go new file mode 100644 index 000000000..4c9a146f5 --- /dev/null +++ b/weed/storage/volume_create_windows.go @@ -0,0 +1,38 @@ +// +build windows + +package storage + +import ( + "os" + + "github.com/chrislusf/seaweedfs/weed/storage/memory_map" + "golang.org/x/sys/windows" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/os_overloads" +) + +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { + + mMap, exists := memory_map.FileMemoryMap[fileName] + if !exists { + + if preallocate > 0 { + glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) + } + + if memoryMapSizeMB > 0 { + file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true) + memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap) + + new_mMap := memory_map.FileMemoryMap[fileName] + new_mMap.CreateMemoryMap(file, 1024*1024*uint64(memoryMapSizeMB)) + return file, e + } else { + file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false) + return file, e + } + } else { + return mMap.File, nil + } +} diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 19a45c842..0f2a19bd1 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -42,7 +42,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } else { if createDatIfMissing { - v.dataFile, e = createVolumeFile(fileName+".dat", preallocate) + v.dataFile, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMB) } else { return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index c0b1ca933..767a318c8 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -9,6 +9,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/memory_map" "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -44,6 +45,12 @@ func (v *Volume) Destroy() (err error) { err = fmt.Errorf("volume %d is compacting", v.Id) return } + mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()] + if exists { + mMap.DeleteFileAndMemoryMap() + delete(memory_map.FileMemoryMap, v.dataFile.Name()) + } + v.Close() os.Remove(v.FileName() + ".dat") os.Remove(v.FileName() + ".idx") diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index 47cddaba4..e4dc985b0 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -4,11 +4,13 @@ import ( "fmt" "os" + "github.com/chrislusf/seaweedfs/weed/storage/memory_map" + + "github.com/golang/protobuf/proto" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" ) const ( @@ -74,24 +76,34 @@ func (s *SuperBlock) Initialized() bool { } func (v *Volume) maybeWriteSuperBlock() error { - stat, e := v.dataFile.Stat() - if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e) - return e - } - if stat.Size() == 0 { - v.SuperBlock.version = needle.CurrentVersion - _, e = v.dataFile.Write(v.SuperBlock.Bytes()) - if e != nil && os.IsPermission(e) { - //read-only, but zero length - recreate it! - if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { - if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { - v.readOnly = false + + mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()] + if exists { + if mMap.End_of_file == -1 { + v.SuperBlock.version = needle.CurrentVersion + mMap.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes()) + } + return nil + } else { + stat, e := v.dataFile.Stat() + if e != nil { + glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e) + return e + } + if stat.Size() == 0 { + v.SuperBlock.version = needle.CurrentVersion + _, e = v.dataFile.Write(v.SuperBlock.Bytes()) + if e != nil && os.IsPermission(e) { + //read-only, but zero length - recreate it! + if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { + if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { + v.readOnly = false + } } } } + return e } - return e } func (v *Volume) readSuperBlock() (err error) { @@ -101,15 +113,26 @@ func (v *Volume) readSuperBlock() (err error) { // ReadSuperBlock reads from data file and load it into volume's super block func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) { - if _, err = dataFile.Seek(0, 0); err != nil { - err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err) - return - } + header := make([]byte, _SuperBlockSize) - if _, e := dataFile.Read(header); e != nil { - err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e) - return + mMap, exists := memory_map.FileMemoryMap[dataFile.Name()] + if exists { + header, err = mMap.ReadMemory(0, _SuperBlockSize) + if err != nil { + err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), err) + return + } + } else { + if _, err = dataFile.Seek(0, 0); err != nil { + err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err) + return + } + if _, e := dataFile.Read(header); e != nil { + err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e) + return + } } + superBlock.version = needle.Version(header[0]) if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { err = fmt.Errorf("cannot read replica type: %s", err.Error()) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index c021c4c18..5c10eb20f 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -22,86 +22,98 @@ func (v *Volume) garbageLevel() float64 { } func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { - glog.V(3).Infof("Compacting volume %d ...", v.Id) - //no need to lock for copy on write - //v.accessLock.Lock() - //defer v.accessLock.Unlock() - //glog.V(3).Infof("Got Compaction lock...") - v.isCompacting = true - defer func() { - v.isCompacting = false - }() - filePath := v.FileName() - v.lastCompactIndexOffset = v.IndexFileSize() - v.lastCompactRevision = v.SuperBlock.CompactionRevision - glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) + if v.MemoryMapMaxSizeMB > 0 { //it makes no sense to compact in memory + glog.V(3).Infof("Compacting volume %d ...", v.Id) + //no need to lock for copy on write + //v.accessLock.Lock() + //defer v.accessLock.Unlock() + //glog.V(3).Infof("Got Compaction lock...") + v.isCompacting = true + defer func() { + v.isCompacting = false + }() + + filePath := v.FileName() + v.lastCompactIndexOffset = v.IndexFileSize() + v.lastCompactRevision = v.SuperBlock.CompactionRevision + glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) + } else { + return nil + } } func (v *Volume) Compact2() error { - glog.V(3).Infof("Compact2 volume %d ...", v.Id) - v.isCompacting = true - defer func() { - v.isCompacting = false - }() + if v.MemoryMapMaxSizeMB > 0 { //it makes no sense to compact in memory + glog.V(3).Infof("Compact2 volume %d ...", v.Id) - filePath := v.FileName() - glog.V(3).Infof("creating copies for volume %d ...", v.Id) - return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx") + v.isCompacting = true + defer func() { + v.isCompacting = false + }() + + filePath := v.FileName() + glog.V(3).Infof("creating copies for volume %d ...", v.Id) + return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx") + } else { + return nil + } } func (v *Volume) CommitCompact() error { - glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) + if v.MemoryMapMaxSizeMB > 0 { //it makes no sense to compact in memory + glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) - v.isCompacting = true - defer func() { - v.isCompacting = false - }() + v.isCompacting = true + defer func() { + v.isCompacting = false + }() - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() - glog.V(3).Infof("Got volume %d committing lock...", v.Id) - v.nm.Close() - if err := v.dataFile.Close(); err != nil { - glog.V(0).Infof("fail to close volume %d", v.Id) - } - v.dataFile = nil - stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() - - var e error - if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { - glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e) - e = os.Remove(v.FileName() + ".cpd") - if e != nil { - return e + glog.V(3).Infof("Got volume %d committing lock...", v.Id) + v.nm.Close() + if err := v.dataFile.Close(); err != nil { + glog.V(0).Infof("fail to close volume %d", v.Id) } - e = os.Remove(v.FileName() + ".cpx") - if e != nil { - return e - } - } else { + v.dataFile = nil + stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() + var e error - if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { - return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e) + if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { + glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e) + e = os.Remove(v.FileName() + ".cpd") + if e != nil { + return e + } + e = os.Remove(v.FileName() + ".cpx") + if e != nil { + return e + } + } else { + var e error + if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { + return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e) + } + if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { + return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e) + } } - if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { - return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e) + + //glog.V(3).Infof("Pretending to be vacuuming...") + //time.Sleep(20 * time.Second) + + os.RemoveAll(v.FileName() + ".ldb") + os.RemoveAll(v.FileName() + ".bdb") + + glog.V(3).Infof("Loading volume %d commit file...", v.Id) + if e = v.load(true, false, v.needleMapKind, 0); e != nil { + return e } } - - //glog.V(3).Infof("Pretending to be vacuuming...") - //time.Sleep(20 * time.Second) - - os.RemoveAll(v.FileName() + ".ldb") - os.RemoveAll(v.FileName() + ".bdb") - - glog.V(3).Infof("Loading volume %d commit file...", v.Id) - if e = v.load(true, false, v.needleMapKind, 0); e != nil { - return e - } return nil } @@ -299,7 +311,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca var ( dst, idx *os.File ) - if dst, err = createVolumeFile(dstName, preallocate); err != nil { + if dst, err = createVolumeFile(dstName, preallocate, v.MemoryMapMaxSizeMB); err != nil { return } defer dst.Close() diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 54899c788..ba1e59f2c 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -68,7 +68,7 @@ func TestCompaction(t *testing.T) { } defer os.RemoveAll(dir) // clean up - v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0) + v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -95,7 +95,7 @@ func TestCompaction(t *testing.T) { v.Close() - v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0) + v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0) if err != nil { t.Fatalf("volume reloading: %v", err) } diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index 48336092f..342ca6579 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -18,11 +18,12 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{ - VolumeId: uint32(vid), - Collection: option.Collection, - Replication: option.ReplicaPlacement.String(), - Ttl: option.Ttl.String(), - Preallocate: option.Prealloacte, + VolumeId: uint32(vid), + Collection: option.Collection, + Replication: option.ReplicaPlacement.String(), + Ttl: option.Ttl.String(), + Preallocate: option.Prealloacte, + MemoryMapMaxSizeMB: option.MemoryMapMaxSizeMB, }) return deleteErr }) diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index ff02044a1..14f94fbd2 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -21,13 +21,14 @@ This package is created to resolve these replica placement issues: */ type VolumeGrowOption struct { - Collection string - ReplicaPlacement *storage.ReplicaPlacement - Ttl *needle.TTL - Prealloacte int64 - DataCenter string - Rack string - DataNode string + Collection string + ReplicaPlacement *storage.ReplicaPlacement + Ttl *needle.TTL + Prealloacte int64 + DataCenter string + Rack string + DataNode string + MemoryMapMaxSizeMB uint32 } type VolumeGrowth struct {